/ core / sync / input_processor.py
input_processor.py
  1  """
  2  Input Processor for Sovereign OS
  3  
  4  Processes human input from FO-INPUT.md and MC-INPUT.md,
  5  routing appropriately based on attention gradient.
  6  
  7  Route Types:
  8  - SIDE: Stays peripheral, FO/MC responds in sidebar
  9  - BUBBLE: Enters focal attention, injected into main thread
 10  - DECISION: Routes to specific thread, unblocks pending items
 11  - BROADCAST: All threads receive via RESONANCE-ALERT
 12  """
 13  
 14  import re
 15  import logging
 16  from pathlib import Path
 17  from dataclasses import dataclass
 18  from typing import List, Optional, Dict
 19  from datetime import datetime
 20  from enum import Enum
 21  
 22  logger = logging.getLogger(__name__)
 23  
 24  
 25  class RouteType(Enum):
 26      """Types of input routing."""
 27      SIDE = "side"           # Stays peripheral
 28      BUBBLE = "bubble"       # Enters focal attention
 29      DECISION = "decision"   # Routes to thread, unblocks
 30      BROADCAST = "broadcast" # All threads receive
 31  
 32  
 33  @dataclass
 34  class HumanInput:
 35      """A parsed human input block."""
 36      
 37      timestamp: datetime
 38      title: str
 39      route: RouteType
 40      threads: List[str]
 41      content: str
 42      source_file: str  # "fo" or "mc"
 43      processed: bool = False
 44      
 45      @property
 46      def urgency(self) -> float:
 47          """Calculate urgency based on route type."""
 48          urgencies = {
 49              RouteType.SIDE: 0.2,
 50              RouteType.BUBBLE: 0.8,
 51              RouteType.DECISION: 0.9,
 52              RouteType.BROADCAST: 1.0
 53          }
 54          return urgencies.get(self.route, 0.5)
 55      
 56      def to_acknowledgment(self) -> str:
 57          """Generate acknowledgment text."""
 58          return f"""
 59  ### Processed: {self.timestamp.strftime('%H:%M')}
 60  
 61  **Route:** {self.route.value.upper()}
 62  **Threads:** {', '.join(self.threads)}
 63  **Urgency:** {self.urgency}
 64  
 65  ✓ Input received and routed.
 66  
 67  ---
 68  """
 69  
 70  
 71  class InputProcessor:
 72      """
 73      Processes human input from dashboard files.
 74      
 75      Watches FO-INPUT.md and MC-INPUT.md for new input blocks,
 76      parses them, and routes appropriately.
 77      """
 78      
 79      # Pattern to match input blocks
 80      INPUT_PATTERN = re.compile(
 81          r"## (\d{4}-\d{2}-\d{2} \d{2}:\d{2}) (.+?)\n\n"
 82          r"\*\*Route:\*\* (SIDE|BUBBLE|DECISION|BROADCAST)\n"
 83          r"\*\*(?:Thread|Threads):\*\* (.+?)\n\n"
 84          r"(.*?)\n\n---",
 85          re.DOTALL | re.IGNORECASE
 86      )
 87      
 88      def __init__(self, sessions_dir: Path):
 89          self.sessions_dir = Path(sessions_dir)
 90          self._processed_inputs: Dict[str, List[str]] = {
 91              "fo": [],
 92              "mc": []
 93          }
 94      
 95      def check_for_inputs(self) -> List[HumanInput]:
 96          """Check both input files for new inputs."""
 97          inputs = []
 98          
 99          # Check FO-INPUT.md
100          fo_inputs = self._parse_input_file(
101              self.sessions_dir / "FO-INPUT.md",
102              "fo"
103          )
104          inputs.extend(fo_inputs)
105          
106          # Check MC-INPUT.md
107          mc_inputs = self._parse_input_file(
108              self.sessions_dir / "MC-INPUT.md",
109              "mc"
110          )
111          inputs.extend(mc_inputs)
112          
113          return inputs
114      
115      def _parse_input_file(
116          self, 
117          filepath: Path, 
118          source: str
119      ) -> List[HumanInput]:
120          """Parse an input file for new input blocks."""
121          if not filepath.exists():
122              return []
123          
124          content = filepath.read_text(encoding="utf-8")
125          inputs = []
126          
127          for match in self.INPUT_PATTERN.finditer(content):
128              timestamp_str = match.group(1)
129              title = match.group(2).strip()
130              route_str = match.group(3).upper()
131              threads_str = match.group(4).strip()
132              message = match.group(5).strip()
133              
134              # Create unique ID for this input
135              input_id = f"{timestamp_str}-{title[:20]}"
136              
137              # Skip if already processed
138              if input_id in self._processed_inputs[source]:
139                  continue
140              
141              try:
142                  timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M")
143                  route = RouteType[route_str]
144                  threads = [t.strip() for t in threads_str.split(",")]
145                  if threads == ["current"]:
146                      threads = ["main"]
147                  
148                  human_input = HumanInput(
149                      timestamp=timestamp,
150                      title=title,
151                      route=route,
152                      threads=threads,
153                      content=message,
154                      source_file=source
155                  )
156                  
157                  inputs.append(human_input)
158                  self._processed_inputs[source].append(input_id)
159                  
160                  logger.info(
161                      f"New input detected: {title} "
162                      f"(route: {route.value}, threads: {threads})"
163                  )
164                  
165              except (ValueError, KeyError) as e:
166                  logger.warning(f"Failed to parse input block: {e}")
167          
168          return inputs
169      
170      def route_input(self, human_input: HumanInput) -> None:
171          """Route an input based on its type."""
172          
173          if human_input.route == RouteType.SIDE:
174              self._route_side(human_input)
175          elif human_input.route == RouteType.BUBBLE:
176              self._route_bubble(human_input)
177          elif human_input.route == RouteType.DECISION:
178              self._route_decision(human_input)
179          elif human_input.route == RouteType.BROADCAST:
180              self._route_broadcast(human_input)
181          
182          human_input.processed = True
183      
184      def _route_side(self, human_input: HumanInput) -> None:
185          """Route a SIDE input - stays peripheral."""
186          logger.info(f"Routing SIDE input: {human_input.title}")
187          
188          # Append to appropriate compression file's sidebar section
189          if human_input.source_file == "fo":
190              self._append_to_sidebar(
191                  self.sessions_dir / "LIVE-COMPRESSION.md",
192                  human_input
193              )
194          else:
195              self._append_to_sidebar(
196                  self.sessions_dir / "DAILY-SYNTHESIS.md",
197                  human_input
198              )
199      
200      def _route_bubble(self, human_input: HumanInput) -> None:
201          """Route a BUBBLE input - enters focal attention."""
202          logger.info(f"Routing BUBBLE input: {human_input.title}")
203          
204          # Create a bubble file that FO will pick up
205          bubble_path = self.sessions_dir / "PENDING-BUBBLES.md"
206          
207          bubble_content = f"""
208  ## Bubble: {human_input.title}
209  
210  **From:** {human_input.source_file.upper()}-INPUT
211  **Time:** {human_input.timestamp.strftime('%H:%M')}
212  **Urgency:** {human_input.urgency}
213  
214  {human_input.content}
215  
216  **Status:** PENDING - Will enter main thread at next FO checkpoint
217  
218  ---
219  """
220          
221          # Append to bubbles file
222          if bubble_path.exists():
223              existing = bubble_path.read_text()
224              bubble_path.write_text(existing + bubble_content)
225          else:
226              bubble_path.write_text(f"# Pending Bubbles\n\n{bubble_content}")
227      
228      def _route_decision(self, human_input: HumanInput) -> None:
229          """Route a DECISION input - unblocks threads."""
230          logger.info(f"Routing DECISION input: {human_input.title}")
231          
232          # Write to decisions log
233          decisions_path = self.sessions_dir / "DECISIONS-LOG.md"
234          
235          decision_content = f"""
236  ## Decision: {human_input.title}
237  
238  **Time:** {human_input.timestamp.strftime('%Y-%m-%d %H:%M')}
239  **Affects:** {', '.join(human_input.threads)}
240  
241  {human_input.content}
242  
243  **Status:** ROUTED - Threads notified
244  
245  ---
246  """
247          
248          if decisions_path.exists():
249              existing = decisions_path.read_text()
250              decisions_path.write_text(existing + decision_content)
251          else:
252              decisions_path.write_text(f"# Decisions Log\n\n{decision_content}")
253          
254          # Create resonance alert for affected threads
255          self._create_resonance_alert(human_input, "DECISION")
256      
257      def _route_broadcast(self, human_input: HumanInput) -> None:
258          """Route a BROADCAST input - all threads receive."""
259          logger.info(f"Routing BROADCAST input: {human_input.title}")
260          
261          # Create resonance alert for all threads
262          self._create_resonance_alert(human_input, "BROADCAST")
263      
264      def _append_to_sidebar(self, filepath: Path, human_input: HumanInput) -> None:
265          """Append input acknowledgment to a file's sidebar section."""
266          if not filepath.exists():
267              return
268          
269          content = filepath.read_text()
270          
271          sidebar_entry = f"""
272  ### Human Input ({human_input.timestamp.strftime('%H:%M')})
273  
274  **{human_input.title}**
275  
276  {human_input.content}
277  
278  *Acknowledged. Awaiting FO response.*
279  
280  ---
281  """
282          
283          # Try to find a sidebar section, or append at end
284          if "## Sidebar" in content:
285              content = content.replace(
286                  "## Sidebar",
287                  f"## Sidebar\n{sidebar_entry}"
288              )
289          else:
290              content += f"\n## Sidebar\n{sidebar_entry}"
291          
292          filepath.write_text(content)
293      
294      def _create_resonance_alert(
295          self, 
296          human_input: HumanInput, 
297          alert_type: str
298      ) -> None:
299          """Create a resonance alert for the input."""
300          alerts_dir = self.sessions_dir / "RESONANCE-ALERTS"
301          alerts_dir.mkdir(exist_ok=True)
302          
303          timestamp = human_input.timestamp.strftime("%Y%m%d-%H%M%S")
304          filename = f"{timestamp}-human-{alert_type.lower()}.md"
305          filepath = alerts_dir / filename
306          
307          threads_list = ', '.join(human_input.threads)
308          
309          alert_content = f"""# Human Input Alert - {human_input.timestamp.strftime('%Y-%m-%d %H:%M')}
310  
311  **Type:** {alert_type}
312  **Source:** {human_input.source_file.upper()}-INPUT
313  **Affected Threads:** {threads_list}
314  **Urgency:** {human_input.urgency}
315  
316  ---
317  
318  ## Human Input
319  
320  **{human_input.title}**
321  
322  {human_input.content}
323  
324  ---
325  
326  ## Required Action
327  
328  {"All FOs should acknowledge this input at next checkpoint." if alert_type == "BROADCAST" else "Affected thread FOs should process this decision."}
329  
330  ---
331  
332  *Generated by Input Processor | {datetime.now().strftime('%Y-%m-%d %H:%M')}*
333  """
334          
335          filepath.write_text(alert_content)
336          logger.info(f"Created resonance alert: {filename}")
337  
338  
339  # CLI for testing
340  if __name__ == "__main__":
341      import sys
342      
343      logging.basicConfig(level=logging.INFO)
344      
345      sessions_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("./sessions")
346      
347      processor = InputProcessor(sessions_dir)
348      inputs = processor.check_for_inputs()
349      
350      print(f"Found {len(inputs)} new inputs:")
351      for inp in inputs:
352          print(f"  - {inp.title} (route: {inp.route.value})")
353          processor.route_input(inp)