/ orchestrator.py
orchestrator.py
  1  #!/usr/bin/env python3
  2  """
  3  Alpha/Delta Protocol — CSPEC-Native Orchestrator
  4  
  5  All worker communication uses .cspec files exclusively.
  6  Human-readable documentation is a derived artifact.
  7  
  8  Windows Path: C:\\Users\\MarcoAniballi\\Radicle\\alpha-delta-orchestrator\\
  9  
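Usage:
    python orchestrator.py start
    python orchestrator.py status
    python orchestrator.py add-task "title" --role executor-alpha
    python orchestrator.py respond                          # list open questions
    python orchestrator.py respond <question> "<answer>"    # answer a question
    python orchestrator.py stop                             # write the .stop sentinel file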
 15  """
 16  
 17  import os
 18  import sys
 19  import re
 20  import json
 21  import time
 22  import logging
 23  import argparse
 24  import threading
 25  import subprocess
 26  from pathlib import Path
 27  from datetime import datetime
 28  from typing import Optional, Dict, List, Any
 29  
# rich is an optional dependency: RICH_AVAILABLE records whether it imported,
# and Orchestrator only creates a Console / renders the dashboard when it did.
try:
    from rich.console import Console
    from rich.table import Table
    from rich.panel import Panel
    RICH_AVAILABLE = True
except ImportError:
    RICH_AVAILABLE = False
 37  
 38  
 39  class CspecParser:
 40      """Parse and write .cspec files."""
 41      
 42      @staticmethod
 43      def parse(content: str) -> Dict[str, Any]:
 44          result = {}
 45          lines = content.split('\n')
 46          i = 0
 47          
 48          while i < len(lines):
 49              line = lines[i].rstrip()
 50              stripped = line.lstrip()
 51              
 52              if not stripped or stripped.startswith('#'):
 53                  i += 1
 54                  continue
 55              
 56              if stripped.endswith('{'):
 57                  name = stripped[:-1].strip()
 58                  block, end_i = CspecParser._parse_block(lines, i + 1)
 59                  result[name] = block
 60                  i = end_i + 1
 61                  continue
 62              
 63              if ':' in stripped:
 64                  key, value = stripped.split(':', 1)
 65                  result[key.strip()] = CspecParser._parse_value(value.strip())
 66              
 67              i += 1
 68          
 69          return result
 70      
 71      @staticmethod
 72      def _parse_block(lines: List[str], start: int) -> tuple:
 73          result = {}
 74          i = start
 75          depth = 1
 76          
 77          while i < len(lines) and depth > 0:
 78              line = lines[i].rstrip()
 79              stripped = line.lstrip()
 80              
 81              if stripped == '}':
 82                  depth -= 1
 83                  if depth == 0:
 84                      return result, i
 85              elif stripped.endswith('{'):
 86                  depth += 1
 87                  name = stripped[:-1].strip()
 88                  block, end_i = CspecParser._parse_block(lines, i + 1)
 89                  result[name] = block
 90                  i = end_i
 91              elif ':' in stripped and not stripped.startswith('#'):
 92                  key, value = stripped.split(':', 1)
 93                  result[key.strip()] = CspecParser._parse_value(value.strip())
 94              i += 1
 95          
 96          return result, i
 97      
 98      @staticmethod
 99      def _parse_value(value: str) -> Any:
100          if value.startswith('[') and value.endswith(']'):
101              items = value[1:-1].split(',')
102              return [i.strip().strip('"\'') for i in items if i.strip()]
103          if value.lower() in ('true', 'false'):
104              return value.lower() == 'true'
105          if value.isdigit():
106              return int(value)
107          if value in ('null', '?', ''):
108              return None
109          return value.strip('"\'')
110      
111      @staticmethod
112      def write(data: Dict[str, Any], indent: int = 0) -> str:
113          lines = []
114          ind = "  " * indent
115          
116          for key, value in data.items():
117              if isinstance(value, dict):
118                  lines.append(f"{ind}{key} {{")
119                  lines.append(CspecParser.write(value, indent + 1))
120                  lines.append(f"{ind}}}")
121              elif isinstance(value, list):
122                  items = ', '.join(f'"{v}"' for v in value)
123                  lines.append(f'{ind}{key}: [{items}]')
124              elif isinstance(value, bool):
125                  lines.append(f"{ind}{key}: {str(value).lower()}")
126              elif value is None:
127                  lines.append(f"{ind}{key}: null")
128              else:
129                  lines.append(f'{ind}{key}: "{value}"' if ' ' in str(value) else f"{ind}{key}: {value}")
130          
131          return '\n'.join(lines)
132  
133  
class OrchestratorConfig:
    """Filesystem layout and runtime settings for the orchestrator.

    Every directory is derived from ``RADICLE_ROOT``. Constructing the object
    creates the orchestrator directory tree (queues, logs and per-role worker
    mailboxes) if it does not exist yet.
    """

    def __init__(self, root: Optional[Path] = None):
        """Initialise settings and create the directory tree.

        Args:
            root: Optional override for ``RADICLE_ROOT``. When omitted, the
                platform default is used: a fixed path on Windows and
                ``~/Radicle`` elsewhere.
        """
        if root is not None:
            self.RADICLE_ROOT = Path(root)
        elif sys.platform == "win32":
            self.RADICLE_ROOT = Path(r"C:\Users\MarcoAniballi\Radicle")
        else:
            self.RADICLE_ROOT = Path.home() / "Radicle"

        self.WORKER_ROLES = ["planner", "executor-alpha", "executor-delta", "executor-infra", "docs"]
        self.CLAUDE_CMD = "claude"
        self.AUTO_ACCEPT = True
        self.POLL_INTERVAL = 2.0

        self._ensure_dirs()

    @property
    def CONTEXT_REPO(self):
        """Path of the context repository under the root."""
        return self.RADICLE_ROOT / "alpha-delta-context"

    @property
    def PROTOCOL_REPO(self):
        """Path of the protocol repository under the root."""
        return self.RADICLE_ROOT / "alpha-delta-protocol"

    @property
    def ORCH_ROOT(self):
        """Path of the orchestrator's own working directory."""
        return self.RADICLE_ROOT / "alpha-delta-orchestrator"

    @property
    def WORKERS_DIR(self):
        """Directory holding one sub-directory per worker role."""
        return self.ORCH_ROOT / "workers"

    @property
    def QUEUES_DIR(self):
        """Directory holding the pending/active/completed/questions queues."""
        return self.ORCH_ROOT / "queues"

    @property
    def LOGS_DIR(self):
        """Directory for orchestrator log files."""
        return self.ORCH_ROOT / "logs"

    def _ensure_dirs(self):
        """Create every directory the orchestrator and its workers rely on."""
        wanted = [self.ORCH_ROOT, self.WORKERS_DIR, self.QUEUES_DIR, self.LOGS_DIR]
        wanted += [self.QUEUES_DIR / sub for sub in ("pending", "active", "completed", "questions")]
        for role in self.WORKER_ROLES:
            worker_dir = self.WORKERS_DIR / role
            wanted.append(worker_dir)
            wanted += [worker_dir / box for box in ("inbox", "outbox", "archive")]

        for directory in wanted:
            # parents=True keeps each mkdir independent of creation order.
            directory.mkdir(parents=True, exist_ok=True)
175  
176  
def gen_id(prefix: str, suffix: str = "") -> str:
    """Return ``prefix_<YYYYMMDD_HHMMSS>`` with an optional sanitised suffix.

    A non-empty ``suffix`` is lower-cased, every character outside ``a-z0-9``
    is replaced by ``_``, and the result is truncated to 20 characters before
    being appended after the timestamp.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    parts = [prefix, stamp]
    if suffix:
        slug = re.sub(r'[^a-z0-9]', '_', suffix.lower())
        parts.append(slug[:20])
    return "_".join(parts)
183  
184  
def create_task(title: str, objective: str, role: str, priority: str = "normal",
                context_load: List[str] = None) -> str:
    """Render a new task as .cspec text.

    The document starts with two comment lines (title and generation time),
    followed by ``meta``, ``context``, ``task`` and ``refs`` blocks. When
    ``context_load`` is empty or omitted the default index file is loaded.
    """
    task_id = gen_id("task", title)

    meta = {"id": task_id, "created": datetime.now().isoformat(), "priority": priority, "role": role}
    context = {"load": context_load or ["project/architecture/machine/index.cspec"]}
    task = {"title": title, "objective": objective}

    document = {"meta": meta, "context": context, "task": task, "refs": {}}

    header = f"# Task: {title}\n# Generated: {datetime.now().isoformat()}\n\n"
    return header + CspecParser.write(document)
195  
196  
class WorkerProcess:
    """Manages a Claude Code worker.

    Each worker owns a directory with ``inbox``, ``outbox`` and ``archive``
    sub-directories. ``start`` launches the Claude CLI in a new console
    (Windows) or a detached tmux session (elsewhere) and starts a background
    thread that polls the outbox and routes result files into the
    orchestrator queues.

    Attributes set here: ``status`` is one of "stopped", "idle", "working" or
    "blocked"; ``current_task`` is the id of the assigned task or None;
    ``completed`` counts outbox files routed to the completed queue.
    """

    def __init__(self, role: str, config: "OrchestratorConfig"):
        self.role = role
        self.config = config
        self.status = "stopped"
        self.current_task = None
        self.completed = 0
        self._stop = threading.Event()
        self._thread = None

    @property
    def inbox(self):
        """Directory where tasks for this worker are dropped."""
        return self.config.WORKERS_DIR / self.role / "inbox"

    @property
    def outbox(self):
        """Directory the worker writes its result files to."""
        return self.config.WORKERS_DIR / self.role / "outbox"

    @property
    def worker_dir(self):
        """Root directory of this worker."""
        return self.config.WORKERS_DIR / self.role

    def _prompt(self) -> str:
        """Return the markdown instructions written to ``.worker_prompt.md``."""
        return f"""# CSPEC-Native Worker: {self.role}

## Protocol
- Tasks arrive in: inbox/
- Results go to: outbox/
- Load types.cspec first, then task.context.load[]

## Paths
Context: {self.config.CONTEXT_REPO}
Protocol: {self.config.PROTOCOL_REPO}
Inbox: {self.inbox}

## Now
Check inbox/ for .cspec tasks. Execute and write result to outbox/."""

    def start(self):
        """Write the worker prompt, launch the Claude CLI and start monitoring."""
        prompt_file = self.worker_dir / ".worker_prompt.md"
        prompt_file.write_text(self._prompt())

        init = "Read .worker_prompt.md, check inbox/ for tasks."

        if sys.platform == "win32":
            # Opens a new console window titled after the role and keeps it open (cmd /k).
            cmd = f'start "Claude-{self.role}" cmd /k "cd /d {self.worker_dir} && {self.config.CLAUDE_CMD}'
            if self.config.AUTO_ACCEPT:
                cmd += ' --dangerously-skip-permissions'
            cmd += f' "{init}""'
            subprocess.Popen(cmd, shell=True)
        else:
            session = f"claude-{self.role}"
            # Drop any session left over from a previous run before creating a new one.
            subprocess.run(["tmux", "kill-session", "-t", session], capture_output=True)
            subprocess.run(["tmux", "new-session", "-d", "-s", session, "-c", str(self.worker_dir)])

            # NOTE(review): the API key is typed into the tmux pane as part of
            # the command line, so it is visible in the pane and may land in
            # shell history — consider passing it through the session
            # environment instead.
            claude = f"export ANTHROPIC_API_KEY='{os.environ.get('ANTHROPIC_API_KEY','')}' && {self.config.CLAUDE_CMD}"
            if self.config.AUTO_ACCEPT:
                claude += " --dangerously-skip-permissions"
            claude += f' "{init}"'
            subprocess.run(["tmux", "send-keys", "-t", session, claude, "Enter"])

        self.status = "idle"
        self._stop.clear()
        self._thread = threading.Thread(target=self._monitor, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the monitor thread to exit and, off Windows, kill the tmux session."""
        self._stop.set()
        if sys.platform != "win32":
            session = f"claude-{self.role}"
            subprocess.run(["tmux", "send-keys", "-t", session, "C-c"], capture_output=True)
            subprocess.run(["tmux", "kill-session", "-t", session], capture_output=True)
        self.status = "stopped"

    def assign(self, task_path: Path):
        """Copy a task file into the inbox and mark the worker as working.

        Args:
            task_path: Path of the .cspec task file to hand to the worker.
        """
        # Read once so the copy and the parsed id come from the same content.
        text = task_path.read_text()
        (self.inbox / task_path.name).write_text(text)
        data = CspecParser.parse(text)
        self.current_task = data.get("meta", {}).get("id")
        self.status = "working"

    def _monitor(self):
        """Poll the outbox until stopped, routing every ``*.cspec`` file found."""
        while not self._stop.is_set():
            try:
                # Materialise the listing first: files are moved while we iterate.
                found = sorted(self.outbox.glob("*.cspec"))
            except OSError as e:
                logging.error(f"Monitor [{self.role}]: {e}")
                found = []

            for f in found:
                # Handle failures per file so one bad file cannot block the
                # files that come after it in the same scan.
                try:
                    self._route(f)
                except Exception as e:
                    logging.error(f"Monitor [{self.role}]: {e}")

            # Event.wait returns as soon as stop() is called instead of
            # sleeping out the whole interval.
            self._stop.wait(self.config.POLL_INTERVAL)

    def _route(self, f: Path) -> None:
        """Copy one outbox file into the matching queue, then archive it."""
        text = f.read_text()
        data = CspecParser.parse(text)
        queues = self.config.QUEUES_DIR

        # NOTE(review): a file containing "meta" is treated as a completed
        # result before the "task" branch is considered, so a new task that
        # also carries a meta block never reaches the pending queue — confirm
        # the intended result/task schema before reordering these checks.
        if "question" in data:
            (queues / "questions" / f.name).write_text(text)
            self.status = "blocked"
        elif "output" in data or "meta" in data:
            (queues / "completed" / f.name).write_text(text)
            self.completed += 1
            self.current_task = None
            self.status = "idle"
        elif "task" in data:
            (queues / "pending" / f.name).write_text(text)
        else:
            logging.warning(f"Monitor [{self.role}]: unrecognised file {f.name} archived without routing")

        archive = self.worker_dir / "archive"
        archive.mkdir(exist_ok=True)
        # replace() overwrites an existing archive entry on every platform;
        # rename() raises on Windows when the target exists, which would leave
        # the file in the outbox to be re-processed on every poll.
        f.replace(archive / f.name)
302  
303  
class Orchestrator:
    """CSPEC-native orchestrator.

    Owns one WorkerProcess per configured role, moves task files from the
    pending queue into idle workers' inboxes, renders a dashboard when rich
    is available, and relays answers to worker questions.
    """

    def __init__(self, config: Optional["OrchestratorConfig"] = None):
        """Set up configuration, the optional rich console and logging.

        Args:
            config: Configuration to use; a default OrchestratorConfig is
                created when omitted.
        """
        self.config = config or OrchestratorConfig()
        self.workers: Dict[str, "WorkerProcess"] = {}
        self._running = False

        if RICH_AVAILABLE:
            self.console = Console()

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            handlers=[
                # Explicit UTF-8: assignment messages contain a non-ASCII
                # arrow that a legacy locale encoding cannot write.
                logging.FileHandler(self.config.LOGS_DIR / "orch.log", encoding="utf-8"),
                logging.StreamHandler(),
            ],
        )
        self.log = logging.getLogger("orch")

    def start(self):
        """Start every worker, seed an initial task if needed and run the loop."""
        self.log.info(f"Starting orchestrator at {self.config.ORCH_ROOT}")

        # Discard a sentinel left behind by a `stop` issued while nothing was
        # running, otherwise this run would shut down immediately.
        self._consume_stop_request()

        for role in self.config.WORKER_ROLES:
            self.workers[role] = WorkerProcess(role, self.config)

        self._running = True

        for role, w in self.workers.items():
            self.log.info(f"Starting {role}")
            w.start()

        if not list((self.config.QUEUES_DIR / "pending").glob("*.cspec")):
            self._init_task()

        self._loop()

    def _init_task(self):
        """Write the bootstrap planner task into the pending queue."""
        content = create_task(
            "Initial Architecture Analysis",
            "Analyze project and create implementation tasks",
            "planner", "high",
            ["project/architecture/machine/types.cspec", "project/architecture/machine/index.cspec"]
        )
        (self.config.QUEUES_DIR / "pending" / "task_initial.cspec").write_text(content)
        self.log.info("Created initial task")

    def _consume_stop_request(self) -> bool:
        """Remove the ``.stop`` sentinel file; return True if it was present."""
        try:
            (self.config.ORCH_ROOT / ".stop").unlink()
        except FileNotFoundError:
            return False
        return True

    def _loop(self):
        """Assign tasks and refresh the display until stopped.

        The loop ends on Ctrl-C or when the ``.stop`` sentinel file (written
        by the ``stop`` CLI command) appears in the orchestrator root.
        """
        try:
            while self._running:
                if self._consume_stop_request():
                    self.log.info("Stop signal received")
                    self.stop()
                    break
                self._assign()
                self._display()
                time.sleep(self.config.POLL_INTERVAL)
        except KeyboardInterrupt:
            self.stop()

    def _assign(self):
        """Hand each pending task to its target worker if that worker is idle."""
        pending_dir = self.config.QUEUES_DIR / "pending"
        active_dir = self.config.QUEUES_DIR / "active"

        for f in sorted(pending_dir.glob("*.cspec")):
            # A malformed or unreadable task must not take the whole loop
            # down and leave the workers running unattended.
            try:
                data = CspecParser.parse(f.read_text())
                role = data.get("meta", {}).get("role", "executor-alpha")
                w = self.workers.get(role)
                if w and w.status == "idle":
                    self.log.info(f"{f.name} → {role}")
                    w.assign(f)
                    # replace() also succeeds on Windows when the target exists.
                    f.replace(active_dir / f.name)
            except Exception as e:
                self.log.error(f"Assign [{f.name}]: {e}")

    def _queue_count(self, name: str) -> int:
        """Return the number of ``*.cspec`` files in the named queue directory."""
        return sum(1 for _ in (self.config.QUEUES_DIR / name).glob("*.cspec"))

    def _display(self):
        """Render the worker table and queue counters (no-op without rich)."""
        if not RICH_AVAILABLE:
            return

        self.console.clear()
        self.console.print(Panel.fit("[bold blue]Alpha/Delta CSPEC Orchestrator[/bold blue]"))

        t = Table(title="Workers")
        t.add_column("Role", style="cyan")
        t.add_column("Status")
        t.add_column("Task")
        t.add_column("Done", style="blue")

        colours = {"idle": "green", "working": "yellow", "blocked": "red"}
        for role, w in self.workers.items():
            st = colours.get(w.status, "dim")
            t.add_row(role, f"[{st}]{w.status}[/{st}]", w.current_task or "—", str(w.completed))

        self.console.print(t)

        p = self._queue_count("pending")
        a = self._queue_count("active")
        c = self._queue_count("completed")
        q = self._queue_count("questions")

        self.console.print(f"\n[yellow]{p}[/yellow] pending | [cyan]{a}[/cyan] active | [green]{c}[/green] done")
        if q:
            self.console.print(f"[bold red]⚠ {q} questions pending[/bold red]")

    def add_task(self, title: str, objective: str = None, role: str = "executor-alpha", priority: str = "normal"):
        """Create a task file in the pending queue.

        Args:
            title: Task title; also used as the objective when none is given.
            objective: Optional longer description of the goal.
            role: Worker role the task is addressed to.
            priority: Priority label stored in the task's meta block.

        Returns:
            The task id, which is also the file name stem.
        """
        content = create_task(title, objective or title, role, priority)
        # Reuse the id embedded in the document so the file name always
        # matches meta.id; generating a second id could land in a different
        # second and disagree with it.
        tid = CspecParser.parse(content).get("meta", {}).get("id") or gen_id("task", title)
        (self.config.QUEUES_DIR / "pending" / f"{tid}.cspec").write_text(content)
        return tid

    def respond(self, qfile: str, answer: str):
        """Attach an answer to a queued question and deliver it to its worker.

        Args:
            qfile: File name of the question, or a fragment of it.
            answer: Answer text to store in the ``response`` block.

        The response is written to the inbox of the worker named in
        ``meta.worker``. This works from a separate CLI process too, where no
        WorkerProcess objects exist. If the worker cannot be determined the
        question stays in the queue and an error is logged, so the answer is
        not silently lost.
        """
        qdir = self.config.QUEUES_DIR / "questions"
        qpath = qdir / qfile
        if not qpath.exists():
            matches = sorted(qdir.glob(f"*{qfile}*"))
            qpath = matches[0] if matches else None

        if not qpath:
            self.log.error(f"Not found: {qfile}")
            return

        data = CspecParser.parse(qpath.read_text())
        data["response"] = {"answered": datetime.now().isoformat(), "answer": answer}

        meta = data.get("meta")
        worker = meta.get("worker") if isinstance(meta, dict) else None
        known_roles = set(self.config.WORKER_ROLES) | set(self.workers)
        if not isinstance(worker, str) or worker not in known_roles:
            self.log.error(
                f"Cannot deliver answer for {qpath.name}: meta.worker is {worker!r}; question left in queue"
            )
            return

        inbox = self.config.WORKERS_DIR / worker / "inbox"
        inbox.mkdir(parents=True, exist_ok=True)
        (inbox / f"response_{qpath.stem}.cspec").write_text(CspecParser.write(data))
        if worker in self.workers:
            self.workers[worker].status = "working"

        archive = qdir.parent / "questions_archive"
        archive.mkdir(exist_ok=True)
        qpath.replace(archive / qpath.name)

    def stop(self):
        """Stop the main loop and every worker."""
        self._running = False
        for w in self.workers.values():
            w.stop()
        self.log.info("Stopped")

    def status(self):
        """Return a JSON-serialisable snapshot of workers and queue sizes."""
        return {
            "workers": {
                r: {"status": w.status, "task": w.current_task, "done": w.completed}
                for r, w in self.workers.items()
            },
            "queues": {
                name: self._queue_count(name)
                for name in ("pending", "active", "completed", "questions")
            },
        }
443  
444  
def main():
    """Command-line entry point: parse arguments and run the chosen sub-command."""
    parser = argparse.ArgumentParser(description="Alpha/Delta CSPEC Orchestrator")
    commands = parser.add_subparsers(dest="cmd")

    commands.add_parser("start")
    commands.add_parser("status")

    add_cmd = commands.add_parser("add-task")
    add_cmd.add_argument("title")
    add_cmd.add_argument("-o", "--objective")
    add_cmd.add_argument(
        "-r", "--role",
        default="executor-alpha",
        choices=["planner", "executor-alpha", "executor-delta", "executor-infra", "docs"],
    )
    add_cmd.add_argument(
        "-p", "--priority",
        default="normal",
        choices=["critical", "high", "normal", "low"],
    )

    respond_cmd = commands.add_parser("respond")
    respond_cmd.add_argument("question", nargs="?")
    respond_cmd.add_argument("answer", nargs="?")

    commands.add_parser("stop")

    args = parser.parse_args()
    command = args.cmd

    # No sub-command given: show help and do nothing else.
    if not command:
        parser.print_help()
        return

    cfg = OrchestratorConfig()
    orch = Orchestrator(cfg)

    if command == "start":
        # Workers need the API key, so refuse to start without it.
        if not os.environ.get("ANTHROPIC_API_KEY"):
            print("Error: ANTHROPIC_API_KEY not set")
            sys.exit(1)
        orch.start()
        return

    if command == "status":
        print(json.dumps(orch.status(), indent=2))
        return

    if command == "add-task":
        created = orch.add_task(args.title, args.objective, args.role, args.priority)
        print(f"Created: {created}")
        return

    if command == "respond":
        if args.question and args.answer:
            orch.respond(args.question, args.answer)
        elif args.question:
            print("Usage: respond <question> <answer>")
        else:
            # Without arguments, list every open question with its summary.
            for q in (cfg.QUEUES_DIR / "questions").glob("*.cspec"):
                d = CspecParser.parse(q.read_text())
                print(f"- {q.name}: {d.get('question', {}).get('summary', '')}")
        return

    if command == "stop":
        (cfg.ORCH_ROOT / ".stop").touch()
        print("Stop signal sent")
495  
496  
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()