input_processor.py
1 """ 2 Input Processor for Sovereign OS 3 4 Processes human input from FO-INPUT.md and MC-INPUT.md, 5 routing appropriately based on attention gradient. 6 7 Route Types: 8 - SIDE: Stays peripheral, FO/MC responds in sidebar 9 - BUBBLE: Enters focal attention, injected into main thread 10 - DECISION: Routes to specific thread, unblocks pending items 11 - BROADCAST: All threads receive via RESONANCE-ALERT 12 """ 13 14 import re 15 import logging 16 from pathlib import Path 17 from dataclasses import dataclass 18 from typing import List, Optional, Dict 19 from datetime import datetime 20 from enum import Enum 21 22 logger = logging.getLogger(__name__) 23 24 25 class RouteType(Enum): 26 """Types of input routing.""" 27 SIDE = "side" # Stays peripheral 28 BUBBLE = "bubble" # Enters focal attention 29 DECISION = "decision" # Routes to thread, unblocks 30 BROADCAST = "broadcast" # All threads receive 31 32 33 @dataclass 34 class HumanInput: 35 """A parsed human input block.""" 36 37 timestamp: datetime 38 title: str 39 route: RouteType 40 threads: List[str] 41 content: str 42 source_file: str # "fo" or "mc" 43 processed: bool = False 44 45 @property 46 def urgency(self) -> float: 47 """Calculate urgency based on route type.""" 48 urgencies = { 49 RouteType.SIDE: 0.2, 50 RouteType.BUBBLE: 0.8, 51 RouteType.DECISION: 0.9, 52 RouteType.BROADCAST: 1.0 53 } 54 return urgencies.get(self.route, 0.5) 55 56 def to_acknowledgment(self) -> str: 57 """Generate acknowledgment text.""" 58 return f""" 59 ### Processed: {self.timestamp.strftime('%H:%M')} 60 61 **Route:** {self.route.value.upper()} 62 **Threads:** {', '.join(self.threads)} 63 **Urgency:** {self.urgency} 64 65 ✓ Input received and routed. 66 67 --- 68 """ 69 70 71 class InputProcessor: 72 """ 73 Processes human input from dashboard files. 74 75 Watches FO-INPUT.md and MC-INPUT.md for new input blocks, 76 parses them, and routes appropriately. 77 """ 78 79 # Pattern to match input blocks 80 INPUT_PATTERN = re.compile( 81 r"## (\d{4}-\d{2}-\d{2} \d{2}:\d{2}) (.+?)\n\n" 82 r"\*\*Route:\*\* (SIDE|BUBBLE|DECISION|BROADCAST)\n" 83 r"\*\*(?:Thread|Threads):\*\* (.+?)\n\n" 84 r"(.*?)\n\n---", 85 re.DOTALL | re.IGNORECASE 86 ) 87 88 def __init__(self, sessions_dir: Path): 89 self.sessions_dir = Path(sessions_dir) 90 self._processed_inputs: Dict[str, List[str]] = { 91 "fo": [], 92 "mc": [] 93 } 94 95 def check_for_inputs(self) -> List[HumanInput]: 96 """Check both input files for new inputs.""" 97 inputs = [] 98 99 # Check FO-INPUT.md 100 fo_inputs = self._parse_input_file( 101 self.sessions_dir / "FO-INPUT.md", 102 "fo" 103 ) 104 inputs.extend(fo_inputs) 105 106 # Check MC-INPUT.md 107 mc_inputs = self._parse_input_file( 108 self.sessions_dir / "MC-INPUT.md", 109 "mc" 110 ) 111 inputs.extend(mc_inputs) 112 113 return inputs 114 115 def _parse_input_file( 116 self, 117 filepath: Path, 118 source: str 119 ) -> List[HumanInput]: 120 """Parse an input file for new input blocks.""" 121 if not filepath.exists(): 122 return [] 123 124 content = filepath.read_text(encoding="utf-8") 125 inputs = [] 126 127 for match in self.INPUT_PATTERN.finditer(content): 128 timestamp_str = match.group(1) 129 title = match.group(2).strip() 130 route_str = match.group(3).upper() 131 threads_str = match.group(4).strip() 132 message = match.group(5).strip() 133 134 # Create unique ID for this input 135 input_id = f"{timestamp_str}-{title[:20]}" 136 137 # Skip if already processed 138 if input_id in self._processed_inputs[source]: 139 continue 140 141 try: 142 timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M") 143 route = RouteType[route_str] 144 threads = [t.strip() for t in threads_str.split(",")] 145 if threads == ["current"]: 146 threads = ["main"] 147 148 human_input = HumanInput( 149 timestamp=timestamp, 150 title=title, 151 route=route, 152 threads=threads, 153 content=message, 154 source_file=source 155 ) 156 157 inputs.append(human_input) 158 self._processed_inputs[source].append(input_id) 159 160 logger.info( 161 f"New input detected: {title} " 162 f"(route: {route.value}, threads: {threads})" 163 ) 164 165 except (ValueError, KeyError) as e: 166 logger.warning(f"Failed to parse input block: {e}") 167 168 return inputs 169 170 def route_input(self, human_input: HumanInput) -> None: 171 """Route an input based on its type.""" 172 173 if human_input.route == RouteType.SIDE: 174 self._route_side(human_input) 175 elif human_input.route == RouteType.BUBBLE: 176 self._route_bubble(human_input) 177 elif human_input.route == RouteType.DECISION: 178 self._route_decision(human_input) 179 elif human_input.route == RouteType.BROADCAST: 180 self._route_broadcast(human_input) 181 182 human_input.processed = True 183 184 def _route_side(self, human_input: HumanInput) -> None: 185 """Route a SIDE input - stays peripheral.""" 186 logger.info(f"Routing SIDE input: {human_input.title}") 187 188 # Append to appropriate compression file's sidebar section 189 if human_input.source_file == "fo": 190 self._append_to_sidebar( 191 self.sessions_dir / "LIVE-COMPRESSION.md", 192 human_input 193 ) 194 else: 195 self._append_to_sidebar( 196 self.sessions_dir / "DAILY-SYNTHESIS.md", 197 human_input 198 ) 199 200 def _route_bubble(self, human_input: HumanInput) -> None: 201 """Route a BUBBLE input - enters focal attention.""" 202 logger.info(f"Routing BUBBLE input: {human_input.title}") 203 204 # Create a bubble file that FO will pick up 205 bubble_path = self.sessions_dir / "PENDING-BUBBLES.md" 206 207 bubble_content = f""" 208 ## Bubble: {human_input.title} 209 210 **From:** {human_input.source_file.upper()}-INPUT 211 **Time:** {human_input.timestamp.strftime('%H:%M')} 212 **Urgency:** {human_input.urgency} 213 214 {human_input.content} 215 216 **Status:** PENDING - Will enter main thread at next FO checkpoint 217 218 --- 219 """ 220 221 # Append to bubbles file 222 if bubble_path.exists(): 223 existing = bubble_path.read_text() 224 bubble_path.write_text(existing + bubble_content) 225 else: 226 bubble_path.write_text(f"# Pending Bubbles\n\n{bubble_content}") 227 228 def _route_decision(self, human_input: HumanInput) -> None: 229 """Route a DECISION input - unblocks threads.""" 230 logger.info(f"Routing DECISION input: {human_input.title}") 231 232 # Write to decisions log 233 decisions_path = self.sessions_dir / "DECISIONS-LOG.md" 234 235 decision_content = f""" 236 ## Decision: {human_input.title} 237 238 **Time:** {human_input.timestamp.strftime('%Y-%m-%d %H:%M')} 239 **Affects:** {', '.join(human_input.threads)} 240 241 {human_input.content} 242 243 **Status:** ROUTED - Threads notified 244 245 --- 246 """ 247 248 if decisions_path.exists(): 249 existing = decisions_path.read_text() 250 decisions_path.write_text(existing + decision_content) 251 else: 252 decisions_path.write_text(f"# Decisions Log\n\n{decision_content}") 253 254 # Create resonance alert for affected threads 255 self._create_resonance_alert(human_input, "DECISION") 256 257 def _route_broadcast(self, human_input: HumanInput) -> None: 258 """Route a BROADCAST input - all threads receive.""" 259 logger.info(f"Routing BROADCAST input: {human_input.title}") 260 261 # Create resonance alert for all threads 262 self._create_resonance_alert(human_input, "BROADCAST") 263 264 def _append_to_sidebar(self, filepath: Path, human_input: HumanInput) -> None: 265 """Append input acknowledgment to a file's sidebar section.""" 266 if not filepath.exists(): 267 return 268 269 content = filepath.read_text() 270 271 sidebar_entry = f""" 272 ### Human Input ({human_input.timestamp.strftime('%H:%M')}) 273 274 **{human_input.title}** 275 276 {human_input.content} 277 278 *Acknowledged. Awaiting FO response.* 279 280 --- 281 """ 282 283 # Try to find a sidebar section, or append at end 284 if "## Sidebar" in content: 285 content = content.replace( 286 "## Sidebar", 287 f"## Sidebar\n{sidebar_entry}" 288 ) 289 else: 290 content += f"\n## Sidebar\n{sidebar_entry}" 291 292 filepath.write_text(content) 293 294 def _create_resonance_alert( 295 self, 296 human_input: HumanInput, 297 alert_type: str 298 ) -> None: 299 """Create a resonance alert for the input.""" 300 alerts_dir = self.sessions_dir / "RESONANCE-ALERTS" 301 alerts_dir.mkdir(exist_ok=True) 302 303 timestamp = human_input.timestamp.strftime("%Y%m%d-%H%M%S") 304 filename = f"{timestamp}-human-{alert_type.lower()}.md" 305 filepath = alerts_dir / filename 306 307 threads_list = ', '.join(human_input.threads) 308 309 alert_content = f"""# Human Input Alert - {human_input.timestamp.strftime('%Y-%m-%d %H:%M')} 310 311 **Type:** {alert_type} 312 **Source:** {human_input.source_file.upper()}-INPUT 313 **Affected Threads:** {threads_list} 314 **Urgency:** {human_input.urgency} 315 316 --- 317 318 ## Human Input 319 320 **{human_input.title}** 321 322 {human_input.content} 323 324 --- 325 326 ## Required Action 327 328 {"All FOs should acknowledge this input at next checkpoint." if alert_type == "BROADCAST" else "Affected thread FOs should process this decision."} 329 330 --- 331 332 *Generated by Input Processor | {datetime.now().strftime('%Y-%m-%d %H:%M')}* 333 """ 334 335 filepath.write_text(alert_content) 336 logger.info(f"Created resonance alert: {filename}") 337 338 339 # CLI for testing 340 if __name__ == "__main__": 341 import sys 342 343 logging.basicConfig(level=logging.INFO) 344 345 sessions_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("./sessions") 346 347 processor = InputProcessor(sessions_dir) 348 inputs = processor.check_for_inputs() 349 350 print(f"Found {len(inputs)} new inputs:") 351 for inp in inputs: 352 print(f" - {inp.title} (route: {inp.route.value})") 353 processor.route_input(inp)