#!/usr/bin/env python3
"""
Alpha/Delta Protocol — CSPEC-Native Orchestrator

All worker communication uses .cspec files exclusively.
Human-readable documentation is a derived artifact.

Windows Path: C:\\Users\\MarcoAniballi\\Radicle\\alpha-delta-orchestrator\\

Usage:
    python orchestrator.py start
    python orchestrator.py status
    python orchestrator.py add-task "title" --role executor-alpha
    python orchestrator.py respond
"""

import os
import sys
import re
import json
import time
import shlex
import logging
import argparse
import threading
import subprocess
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, List, Any, Tuple

try:
    from rich.console import Console
    from rich.table import Table
    from rich.panel import Panel
    RICH_AVAILABLE = True
except ImportError:
    RICH_AVAILABLE = False


# Every file this module reads or writes uses one explicit encoding so the
# result does not depend on the platform's locale encoding.
_ENCODING = "utf-8"

# Name of the marker file (inside ORCH_ROOT) written by the "stop" command and
# consumed by the running orchestrator loop.
_STOP_FILE = ".stop"


def _meta_section(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return the ``meta`` block of parsed cspec data, or ``{}`` if it is absent or not a block."""
    meta = data.get("meta")
    return meta if isinstance(meta, dict) else {}


class CspecParser:
    """Parse and write .cspec files.

    The format handled here is line based: ``key: value`` pairs, ``name {`` ...
    ``}`` blocks (which may nest), and ``#`` comment lines.
    """

    @staticmethod
    def parse(content: str) -> Dict[str, Any]:
        """Parse cspec text into a dict; blocks become nested dicts.

        Blank lines, comment lines and lines without a ``:`` are ignored.
        """
        result: Dict[str, Any] = {}
        lines = content.split('\n')
        i = 0

        while i < len(lines):
            stripped = lines[i].strip()

            if not stripped or stripped.startswith('#'):
                i += 1
                continue

            if stripped.endswith('{'):
                name = stripped[:-1].strip()
                block, end_i = CspecParser._parse_block(lines, i + 1)
                result[name] = block
                i = end_i + 1
                continue

            if ':' in stripped:
                key, value = stripped.split(':', 1)
                result[key.strip()] = CspecParser._parse_value(value.strip())

            i += 1

        return result

    @staticmethod
    def _parse_block(lines: List[str], start: int) -> Tuple[Dict[str, Any], int]:
        """Parse the body of one block starting at ``lines[start]``.

        Returns ``(block, index)`` where ``index`` is the position of the
        block's closing ``}``, or a value >= ``len(lines)`` when the block is
        never closed.
        """
        result: Dict[str, Any] = {}
        i = start

        while i < len(lines):
            stripped = lines[i].strip()

            # Comments are skipped before any brace test, matching parse().
            if not stripped or stripped.startswith('#'):
                i += 1
                continue

            if stripped == '}':
                return result, i

            if stripped.endswith('{'):
                name = stripped[:-1].strip()
                # The recursive call consumes the nested block up to and
                # including its own '}', so no depth counter is needed here;
                # the next '}' seen at this level closes *this* block.
                block, end_i = CspecParser._parse_block(lines, i + 1)
                result[name] = block
                i = end_i + 1
                continue

            if ':' in stripped:
                key, value = stripped.split(':', 1)
                result[key.strip()] = CspecParser._parse_value(value.strip())

            i += 1

        return result, i

    @staticmethod
    def _parse_value(value: str) -> Any:
        """Convert one scalar/list token to a Python value.

        ``[a, b]`` -> list of strings (items are split on every comma),
        ``true``/``false`` -> bool, decimal digits -> int,
        ``null``/``?``/empty -> None, anything else -> string without
        surrounding quotes.
        """
        if value.startswith('[') and value.endswith(']'):
            items = value[1:-1].split(',')
            return [item.strip().strip('"\'') for item in items if item.strip()]
        if value.lower() in ('true', 'false'):
            return value.lower() == 'true'
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as superscripts that int() rejects with ValueError.
        if value.isdecimal():
            return int(value)
        if value in ('null', '?', ''):
            return None
        return value.strip('"\'')

    @staticmethod
    def write(data: Dict[str, Any], indent: int = 0) -> str:
        """Serialize a dict to cspec text, indenting nested blocks by two spaces per level."""
        lines = []
        ind = "  " * indent

        for key, value in data.items():
            if isinstance(value, dict):
                lines.append(f"{ind}{key} {{")
                lines.append(CspecParser.write(value, indent + 1))
                lines.append(f"{ind}}}")
            elif isinstance(value, list):
                items = ', '.join(f'"{v}"' for v in value)
                lines.append(f'{ind}{key}: [{items}]')
            elif isinstance(value, bool):
                lines.append(f"{ind}{key}: {str(value).lower()}")
            elif value is None:
                lines.append(f"{ind}{key}: null")
            else:
                # Only values containing a space are quoted.
                lines.append(f'{ind}{key}: "{value}"' if ' ' in str(value) else f"{ind}{key}: {value}")

        return '\n'.join(lines)


class OrchestratorConfig:
    """Configuration with Windows paths.

    Args:
        radicle_root: Optional root directory that holds the context,
            protocol and orchestrator directories. When omitted, the
            platform default is used (the fixed Windows path on win32,
            ``~/Radicle`` elsewhere).

    Constructing a config creates the orchestrator directory tree.
    """

    def __init__(self, radicle_root: Optional[Path] = None):
        if radicle_root is not None:
            self.RADICLE_ROOT = Path(radicle_root)
        elif sys.platform == "win32":
            self.RADICLE_ROOT = Path(r"C:\Users\MarcoAniballi\Radicle")
        else:
            self.RADICLE_ROOT = Path.home() / "Radicle"

        self.WORKER_ROLES = ["planner", "executor-alpha", "executor-delta", "executor-infra", "docs"]
        self.CLAUDE_CMD = "claude"
        self.AUTO_ACCEPT = True
        self.POLL_INTERVAL = 2.0

        self._ensure_dirs()

    @property
    def CONTEXT_REPO(self) -> Path:
        return self.RADICLE_ROOT / "alpha-delta-context"

    @property
    def PROTOCOL_REPO(self) -> Path:
        return self.RADICLE_ROOT / "alpha-delta-protocol"

    @property
    def ORCH_ROOT(self) -> Path:
        return self.RADICLE_ROOT / "alpha-delta-orchestrator"

    @property
    def WORKERS_DIR(self) -> Path:
        return self.ORCH_ROOT / "workers"

    @property
    def QUEUES_DIR(self) -> Path:
        return self.ORCH_ROOT / "queues"

    @property
    def LOGS_DIR(self) -> Path:
        return self.ORCH_ROOT / "logs"

    def _ensure_dirs(self) -> None:
        """Create the orchestrator root, queue directories and per-role worker directories."""
        self.ORCH_ROOT.mkdir(parents=True, exist_ok=True)
        for d in [self.WORKERS_DIR, self.QUEUES_DIR, self.LOGS_DIR]:
            d.mkdir(exist_ok=True)
        for sub in ["pending", "active", "completed", "questions"]:
            (self.QUEUES_DIR / sub).mkdir(exist_ok=True)
        for role in self.WORKER_ROLES:
            wd = self.WORKERS_DIR / role
            wd.mkdir(exist_ok=True)
            for sub in ["inbox", "outbox", "archive"]:
                (wd / sub).mkdir(exist_ok=True)


def gen_id(prefix: str, suffix: str = "") -> str:
    """Build ``prefix_YYYYmmdd_HHMMSS[_suffix]`` from the current local time.

    The suffix is lower-cased, every character outside ``a-z0-9`` becomes
    ``_``, and it is truncated to 20 characters. The timestamp has one-second
    resolution, so two calls with the same arguments within a second return
    the same id.
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    if suffix:
        suffix = re.sub(r'[^a-z0-9]', '_', suffix.lower())[:20]
        return f"{prefix}_{ts}_{suffix}"
    return f"{prefix}_{ts}"


def create_task(title: str, objective: str, role: str, priority: str = "normal",
                context_load: Optional[List[str]] = None,
                task_id: Optional[str] = None) -> str:
    """Return the cspec text of a new task.

    Args:
        title: Task title; also used to derive the id when ``task_id`` is None.
        objective: What the worker should achieve.
        role: Worker role stored in ``meta.role``.
        priority: Value stored in ``meta.priority``.
        context_load: Files for ``context.load``; defaults to the machine index.
        task_id: Id to store in ``meta.id``. Pass it when the caller also
            needs the id (e.g. for the file name) so both are guaranteed equal.
    """
    if task_id is None:
        task_id = gen_id("task", title)
    created = datetime.now().isoformat()
    data = {
        "meta": {"id": task_id, "created": created, "priority": priority, "role": role},
        "context": {"load": context_load or ["project/architecture/machine/index.cspec"]},
        "task": {"title": title, "objective": objective},
        "refs": {}
    }
    return f"# Task: {title}\n# Generated: {datetime.now().isoformat()}\n\n{CspecParser.write(data)}"


class WorkerProcess:
    """Manages a Claude Code worker.

    A worker owns ``inbox/``, ``outbox/`` and ``archive/`` directories under
    ``WORKERS_DIR/<role>``. A background thread polls the outbox and routes
    each .cspec file it finds into the orchestrator queues.
    """

    def __init__(self, role: str, config: OrchestratorConfig):
        self.role = role
        self.config = config
        self.status = "stopped"
        self.current_task = None
        self.completed = 0
        self._stop = threading.Event()
        self._thread = None

    @property
    def inbox(self) -> Path:
        return self.config.WORKERS_DIR / self.role / "inbox"

    @property
    def outbox(self) -> Path:
        return self.config.WORKERS_DIR / self.role / "outbox"

    @property
    def worker_dir(self) -> Path:
        return self.config.WORKERS_DIR / self.role

    def _prompt(self) -> str:
        """Return the markdown instructions written to the worker's prompt file."""
        return f"""# CSPEC-Native Worker: {self.role}

## Protocol
- Tasks arrive in: inbox/
- Results go to: outbox/
- Load types.cspec first, then task.context.load[]

## Paths
Context: {self.config.CONTEXT_REPO}
Protocol: {self.config.PROTOCOL_REPO}
Inbox: {self.inbox}

## Now
Check inbox/ for .cspec tasks. Execute and write result to outbox/."""

    def start(self):
        """Write the prompt file, launch the worker terminal and start the outbox monitor."""
        prompt_file = self.worker_dir / ".worker_prompt.md"
        prompt_file.write_text(self._prompt(), encoding=_ENCODING)

        init = "Read .worker_prompt.md, check inbox/ for tasks."

        if sys.platform == "win32":
            # Opens a new console window that stays open (cmd /k).
            cmd = f'start "Claude-{self.role}" cmd /k "cd /d {self.worker_dir} && {self.config.CLAUDE_CMD}'
            if self.config.AUTO_ACCEPT:
                cmd += ' --dangerously-skip-permissions'
            cmd += f' "{init}""'
            subprocess.Popen(cmd, shell=True)
        else:
            session = f"claude-{self.role}"
            # Drop any leftover session with the same name before creating a new one.
            subprocess.run(["tmux", "kill-session", "-t", session], capture_output=True)
            subprocess.run(["tmux", "new-session", "-d", "-s", session, "-c", str(self.worker_dir)])

            # shlex.quote keeps the typed command line valid whatever
            # characters the key or prompt contain.
            # NOTE(review): the key is typed into the session, so it is
            # visible in that terminal — confirm this is acceptable.
            api_key = os.environ.get('ANTHROPIC_API_KEY', '')
            claude = f"export ANTHROPIC_API_KEY={shlex.quote(api_key)} && {self.config.CLAUDE_CMD}"
            if self.config.AUTO_ACCEPT:
                claude += " --dangerously-skip-permissions"
            claude += f" {shlex.quote(init)}"
            subprocess.run(["tmux", "send-keys", "-t", session, claude, "Enter"])

        self.status = "idle"
        self._stop.clear()
        self._thread = threading.Thread(target=self._monitor, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the monitor thread to exit and, off Windows, kill the tmux session."""
        self._stop.set()
        if sys.platform != "win32":
            session = f"claude-{self.role}"
            subprocess.run(["tmux", "send-keys", "-t", session, "C-c"], capture_output=True)
            subprocess.run(["tmux", "kill-session", "-t", session], capture_output=True)
        self.status = "stopped"

    def assign(self, task_path: Path):
        """Copy ``task_path`` into the inbox and mark this worker as working on it."""
        text = task_path.read_text(encoding=_ENCODING)
        (self.inbox / task_path.name).write_text(text, encoding=_ENCODING)
        self.current_task = _meta_section(CspecParser.parse(text)).get("id")
        self.status = "working"

    def _route(self, f: Path) -> None:
        """Copy one outbox file into the matching queue, then move it to archive/."""
        text = f.read_text(encoding=_ENCODING)
        data = CspecParser.parse(text)

        if "question" in data:
            (self.config.QUEUES_DIR / "questions" / f.name).write_text(text, encoding=_ENCODING)
            self.status = "blocked"
        # NOTE(review): any file with a "meta" block is treated as a result,
        # so the "task" branch below only fires for files without "meta" or
        # "output" — confirm this matches the worker output schema.
        elif "output" in data or "meta" in data:
            (self.config.QUEUES_DIR / "completed" / f.name).write_text(text, encoding=_ENCODING)
            self.completed += 1
            self.current_task = None
            self.status = "idle"
        elif "task" in data:
            (self.config.QUEUES_DIR / "pending" / f.name).write_text(text, encoding=_ENCODING)

        archive = self.worker_dir / "archive"
        archive.mkdir(exist_ok=True)
        # replace() overwrites an existing archive entry on every platform;
        # rename() raises on Windows when the target exists, which would make
        # the same file be re-processed on every poll.
        f.replace(archive / f.name)

    def _monitor(self):
        """Poll the outbox until stopped, routing every .cspec file found."""
        while not self._stop.is_set():
            try:
                # Snapshot first: files are moved while we process them.
                found = sorted(self.outbox.glob("*.cspec"))
            except Exception as e:
                logging.error(f"Monitor [{self.role}]: {e}")
                found = []

            for f in found:
                # Handle files one by one so a single bad file does not
                # prevent the others from being routed.
                try:
                    self._route(f)
                except Exception as e:
                    logging.error(f"Monitor [{self.role}]: {e}")

            # Event.wait returns as soon as stop() is called instead of
            # sleeping out the full interval.
            self._stop.wait(self.config.POLL_INTERVAL)


class Orchestrator:
    """CSPEC-native orchestrator.

    Starts one WorkerProcess per configured role, hands pending tasks to idle
    workers and shows a status table (when ``rich`` is installed).
    """

    def __init__(self, config: Optional[OrchestratorConfig] = None):
        self.config = config or OrchestratorConfig()
        self.workers: Dict[str, WorkerProcess] = {}
        self._running = False

        if RICH_AVAILABLE:
            self.console = Console()

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            handlers=[
                # Explicit UTF-8: log messages contain non-ASCII characters
                # which a locale-encoded log file may not be able to store.
                logging.FileHandler(self.config.LOGS_DIR / "orch.log", encoding=_ENCODING),
                logging.StreamHandler(),
            ]
        )
        self.log = logging.getLogger("orch")

    def _queue_count(self, name: str) -> int:
        """Return the number of .cspec files in queue directory ``name``."""
        return sum(1 for _ in (self.config.QUEUES_DIR / name).glob("*.cspec"))

    def _consume_stop_signal(self) -> bool:
        """Return True (and remove the marker) if the stop marker file exists."""
        stop_file = self.config.ORCH_ROOT / _STOP_FILE
        if not stop_file.exists():
            return False
        try:
            stop_file.unlink()
        except OSError as e:
            self.log.warning(f"Could not remove stop signal: {e}")
        return True

    def start(self):
        """Start all workers, seed the initial task if nothing is pending, then run the loop."""
        self.log.info(f"Starting orchestrator at {self.config.ORCH_ROOT}")

        # A marker left behind while nothing was running must not stop this run.
        if self._consume_stop_signal():
            self.log.info("Discarded stale stop signal")

        for role in self.config.WORKER_ROLES:
            self.workers[role] = WorkerProcess(role, self.config)

        self._running = True

        for role, w in self.workers.items():
            self.log.info(f"Starting {role}")
            w.start()

        if self._queue_count("pending") == 0:
            self._init_task()

        self._loop()

    def _init_task(self):
        """Queue the initial planner task."""
        content = create_task(
            "Initial Architecture Analysis",
            "Analyze project and create implementation tasks",
            "planner", "high",
            ["project/architecture/machine/types.cspec", "project/architecture/machine/index.cspec"]
        )
        (self.config.QUEUES_DIR / "pending" / "task_initial.cspec").write_text(content, encoding=_ENCODING)
        self.log.info("Created initial task")

    def _loop(self):
        """Assign and display until stopped by Ctrl-C or the stop marker file."""
        try:
            while self._running:
                if self._consume_stop_signal():
                    self.log.info("Stop signal received")
                    self.stop()
                    break
                self._assign()
                self._display()
                time.sleep(self.config.POLL_INTERVAL)
        except KeyboardInterrupt:
            self.stop()

    def _assign(self):
        """Hand each pending task (in file-name order) to its role's worker if that worker is idle."""
        for f in sorted((self.config.QUEUES_DIR / "pending").glob("*.cspec")):
            try:
                data = CspecParser.parse(f.read_text(encoding=_ENCODING))
            except (OSError, UnicodeError) as e:
                # An unreadable task must not take the whole loop down.
                self.log.error(f"Cannot read {f.name}: {e}")
                continue

            role = _meta_section(data).get("role", "executor-alpha")
            w = self.workers.get(role) if isinstance(role, str) else None
            if w and w.status == "idle":
                self.log.info(f"{f.name} → {role}")
                w.assign(f)
                # replace() instead of rename(): rename() raises on Windows if
                # a file of the same name is already in active/.
                f.replace(self.config.QUEUES_DIR / "active" / f.name)

    def _display(self):
        """Render the worker table and queue counters; no-op without ``rich``."""
        if not RICH_AVAILABLE:
            return

        self.console.clear()
        self.console.print(Panel.fit("[bold blue]Alpha/Delta CSPEC Orchestrator[/bold blue]"))

        t = Table(title="Workers")
        t.add_column("Role", style="cyan")
        t.add_column("Status")
        t.add_column("Task")
        t.add_column("Done", style="blue")

        for role, w in self.workers.items():
            st = {"idle": "green", "working": "yellow", "blocked": "red"}.get(w.status, "dim")
            t.add_row(role, f"[{st}]{w.status}[/{st}]", w.current_task or "—", str(w.completed))

        self.console.print(t)

        p = self._queue_count("pending")
        a = self._queue_count("active")
        c = self._queue_count("completed")
        q = self._queue_count("questions")

        self.console.print(f"\n[yellow]{p}[/yellow] pending | [cyan]{a}[/cyan] active | [green]{c}[/green] done")
        if q:
            self.console.print(f"[bold red]⚠ {q} questions pending[/bold red]")

    def add_task(self, title: str, objective: str = None, role: str = "executor-alpha", priority: str = "normal"):
        """Write a new task into the pending queue and return its id.

        The id is generated once and used for both the file name and
        ``meta.id``, so the two always match.
        """
        tid = gen_id("task", title)
        content = create_task(title, objective or title, role, priority, task_id=tid)
        (self.config.QUEUES_DIR / "pending" / f"{tid}.cspec").write_text(content, encoding=_ENCODING)
        return tid

    def respond(self, qfile: str, answer: str):
        """Answer a queued question and archive it.

        ``qfile`` is a file name in the questions queue, or a fragment that is
        matched with ``*fragment*``. The answer is written to the inbox of the
        worker named in the question's ``meta.worker``.
        """
        qdir = self.config.QUEUES_DIR / "questions"
        qpath = qdir / qfile
        if not qpath.is_file():
            matches = sorted(p for p in qdir.glob(f"*{qfile}*") if p.is_file())
            qpath = matches[0] if matches else None

        if not qpath:
            self.log.error(f"Not found: {qfile}")
            return

        data = CspecParser.parse(qpath.read_text(encoding=_ENCODING))
        data["response"] = {"answered": datetime.now().isoformat(), "answer": answer}

        worker = _meta_section(data).get("worker")
        inbox = None
        if isinstance(worker, str):
            if worker in self.workers:
                inbox = self.workers[worker].inbox
                self.workers[worker].status = "working"
            elif worker in self.config.WORKER_ROLES:
                # When invoked from the CLI no WorkerProcess objects exist in
                # this process; the role's inbox directory is still the right
                # place to deliver the answer.
                inbox = self.config.WORKERS_DIR / worker / "inbox"

        if inbox is not None:
            (inbox / f"response_{qpath.stem}.cspec").write_text(CspecParser.write(data), encoding=_ENCODING)
        else:
            self.log.warning(f"Answer for {qpath.name} not delivered: unknown worker {worker!r}")

        archive = qdir.parent / "questions_archive"
        archive.mkdir(exist_ok=True)
        qpath.replace(archive / qpath.name)

    def stop(self):
        """Stop the main loop and every worker."""
        self._running = False
        for w in self.workers.values():
            w.stop()
        self.log.info("Stopped")

    def status(self):
        """Return worker states known to this process and the current queue sizes."""
        return {
            "workers": {r: {"status": w.status, "task": w.current_task, "done": w.completed}
                        for r, w in self.workers.items()},
            "queues": {
                name: self._queue_count(name)
                for name in ("pending", "active", "completed", "questions")
            }
        }


def main():
    """Command-line entry point: start, status, add-task, respond, stop."""
    p = argparse.ArgumentParser(description="Alpha/Delta CSPEC Orchestrator")
    sub = p.add_subparsers(dest="cmd")

    sub.add_parser("start")
    sub.add_parser("status")

    add = sub.add_parser("add-task")
    add.add_argument("title")
    add.add_argument("-o", "--objective")
    add.add_argument("-r", "--role", default="executor-alpha",
                     choices=["planner", "executor-alpha", "executor-delta", "executor-infra", "docs"])
    add.add_argument("-p", "--priority", default="normal", choices=["critical", "high", "normal", "low"])

    resp = sub.add_parser("respond")
    resp.add_argument("question", nargs="?")
    resp.add_argument("answer", nargs="?")

    sub.add_parser("stop")

    args = p.parse_args()

    if not args.cmd:
        p.print_help()
        return

    cfg = OrchestratorConfig()
    orch = Orchestrator(cfg)

    if args.cmd == "start":
        if not os.environ.get("ANTHROPIC_API_KEY"):
            print("Error: ANTHROPIC_API_KEY not set")
            sys.exit(1)
        orch.start()
    elif args.cmd == "status":
        print(json.dumps(orch.status(), indent=2))
    elif args.cmd == "add-task":
        print(f"Created: {orch.add_task(args.title, args.objective, args.role, args.priority)}")
    elif args.cmd == "respond":
        if not args.question:
            # No arguments: list the open questions.
            for q in (cfg.QUEUES_DIR / "questions").glob("*.cspec"):
                d = CspecParser.parse(q.read_text(encoding=_ENCODING))
                question = d.get('question', {})
                # "question" may be a block or a plain "question: text" line.
                if isinstance(question, dict):
                    summary = question.get('summary', '')
                else:
                    summary = '' if question is None else question
                print(f"- {q.name}: {summary}")
        elif not args.answer:
            print("Usage: respond <question> <answer>")
        else:
            orch.respond(args.question, args.answer)
    elif args.cmd == "stop":
        # The running orchestrator polls for this marker in its main loop.
        (cfg.ORCH_ROOT / _STOP_FILE).touch()
        print("Stop signal sent")


if __name__ == "__main__":
    main()