/ scripts / claude-workflow-runner.py
claude-workflow-runner.py
  1  #!/usr/bin/env python3
  2  """
  3  Claude Workflow Runner (Token-Optimized)
  4  Executes predefined Claude Code agent workflows from cron or CLI.
  5  Uses compressed cspec-style prompts with grammar expansion.
  6  
  7  Usage:
  8      ./claude-workflow-runner.py ci-repair           # Run workflow
  9      ./claude-workflow-runner.py ci-repair --dry-run # Show command
 10      ./claude-workflow-runner.py --list              # List workflows
 11  """
 12  
 13  import argparse
 14  import json
 15  import os
 16  import re
 17  import subprocess
 18  import sys
 19  from datetime import datetime
 20  from pathlib import Path
 21  
 22  import yaml
 23  
 24  
 25  # Default paths
 26  SCRIPT_DIR = Path(__file__).parent
 27  WORKFLOWS_FILE = SCRIPT_DIR / "workflows.yaml"
 28  GRAMMAR_FILE = SCRIPT_DIR / "workflow-grammar.yaml"
 29  LOG_DIR = Path("/var/log/claude-workflows")
 30  CONTEXT_DIR = Path("/home/devops/working-repos/alpha-delta-context")
 31  
 32  
 33  def load_yaml(file_path: Path) -> dict:
 34      """Load YAML file."""
 35      if not file_path.exists():
 36          return {}
 37      with open(file_path, 'r') as f:
 38          return yaml.safe_load(f) or {}
 39  
 40  
 41  def load_workflows(workflows_file: Path) -> dict:
 42      """Load workflow definitions from YAML file."""
 43      if not workflows_file.exists():
 44          print(f"Error: Workflows file not found: {workflows_file}")
 45          sys.exit(1)
 46      return load_yaml(workflows_file)
 47  
 48  
 49  def load_grammar() -> dict:
 50      """Load prompt grammar for expansion."""
 51      return load_yaml(GRAMMAR_FILE)
 52  
 53  
 54  def build_system_prompt(grammar: dict) -> str:
 55      """Build compressed system prompt from grammar."""
 56      if not grammar:
 57          return ""
 58  
 59      lines = [
 60          "# Workflow Grammar (expand compressed prompts)",
 61          "",
 62          "## Commands"
 63      ]
 64  
 65      for cmd, spec in grammar.get("commands", {}).items():
 66          lines.append(f"{cmd}: {spec.get('expand', '').strip()}")
 67  
 68      lines.append("")
 69      lines.append("## Repos")
 70      for list_name, repos in grammar.get("repo_lists", {}).items():
 71          if isinstance(repos, dict):
 72              flat = []
 73              for category, items in repos.items():
 74                  flat.extend(items)
 75              lines.append(f"{list_name}: {','.join(flat)}")
 76          else:
 77              lines.append(f"{list_name}: {','.join(repos)}")
 78  
 79      lines.append("")
 80      lines.append("## Tools")
 81      for tool, cmd in grammar.get("tool_mappings", {}).items():
 82          lines.append(f"{tool}: `{cmd}`")
 83  
 84      return "\n".join(lines)
 85  
 86  
 87  def list_workflows(workflows: dict):
 88      """Print available workflows."""
 89      print("Available workflows:")
 90      print()
 91  
 92      # Calculate token savings
 93      total_original = 0
 94      total_compressed = 0
 95  
 96      for name, config in workflows.get("workflows", {}).items():
 97          schedule = config.get("schedule", "manual")
 98          desc = config.get("description", "No description")
 99          prompt = config.get("prompt", "")
100          turns = config.get("max_turns", 30)
101  
102          # Estimate tokens (rough: 4 chars = 1 token)
103          compressed_tokens = len(prompt) // 4
104  
105          print(f"  {name}")
106          print(f"    Schedule: {schedule}")
107          print(f"    Turns: {turns}, Prompt: ~{compressed_tokens} tokens")
108          print()
109  
110          total_compressed += compressed_tokens
111  
112      print(f"Total prompt tokens: ~{total_compressed}")
113      print(f"Grammar overhead: ~500 tokens (one-time)")
114  
115  
116  def build_claude_command(workflow: dict, workflow_name: str, grammar: dict) -> tuple:
117      """Build the claude CLI command from workflow config.
118      Returns (cmd_list, working_directory).
119      """
120      # Build system prompt with grammar
121      system_prompt = build_system_prompt(grammar)
122  
123      # Combine system prompt and user prompt
124      full_prompt = workflow["prompt"]
125      if system_prompt:
126          full_prompt = f"{system_prompt}\n\n---\n\nTask: {workflow['prompt']}"
127  
128      cmd = ["claude", "-p", full_prompt]
129  
130      # Add allowed tools
131      if "allowed_tools" in workflow:
132          tools = ",".join(workflow["allowed_tools"])
133          cmd.extend(["--allowedTools", tools])
134  
135      # Add output format (default to JSON for machine parsing)
136      output_format = workflow.get("output_format", "json")
137      if output_format == "json":
138          cmd.extend(["--output-format", "json"])
139  
140      # Note: max_turns is tracked for documentation but not a CLI flag
141      # Could use --max-budget-usd for cost control instead
142  
143      # Skip permissions for automated runs
144      if workflow.get("skip_permissions", True):
145          cmd.append("--dangerously-skip-permissions")
146  
147      # Get working directory (will cd to this before running)
148      cwd = workflow.get("cwd", str(CONTEXT_DIR))
149  
150      return cmd, cwd
151  
152  
153  def run_workflow(workflow_name: str, workflows: dict, grammar: dict,
154                   dry_run: bool = False, verbose: bool = False):
155      """Execute a workflow."""
156      if workflow_name not in workflows.get("workflows", {}):
157          print(f"Error: Unknown workflow '{workflow_name}'")
158          print("Use --list to see available workflows")
159          sys.exit(1)
160  
161      workflow = workflows["workflows"][workflow_name]
162      timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
163  
164      print(f"[{timestamp}] {workflow_name}")
165      print(f"  {workflow.get('description', 'N/A')}")
166  
167      # Build command
168      cmd, cwd = build_claude_command(workflow, workflow_name, grammar)
169  
170      # Token estimate
171      prompt_tokens = len(workflow.get("prompt", "")) // 4
172      grammar_tokens = 500  # Approximate
173      print(f"  Tokens: ~{prompt_tokens} prompt + ~{grammar_tokens} grammar")
174      print(f"  Working dir: {cwd}")
175  
176      if verbose or dry_run:
177          print(f"\nCommand: {' '.join(cmd[:6])}...")  # Truncate for readability
178  
179      if dry_run:
180          print("\n[DRY RUN] Would execute above command")
181          return 0
182  
183      # Ensure log directory exists
184      LOG_DIR.mkdir(parents=True, exist_ok=True)
185      log_file = LOG_DIR / f"{workflow_name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}.log"
186  
187      # Check for authentication (Claude Code auto-uses ~/.claude/.credentials.json)
188      api_key = os.environ.get("ANTHROPIC_API_KEY")
189      oauth_file = Path.home() / ".claude" / ".credentials.json"
190  
191      if api_key:
192          print(f"  Auth: API key")
193      elif oauth_file.exists():
194          print(f"  Auth: OAuth ({oauth_file})")
195      else:
196          print("\n⚠ No auth found. Run 'claude login' or set ANTHROPIC_API_KEY")
197  
198      print(f"\nLog: {log_file}")
199      print("-" * 40)
200  
201      try:
202          with open(log_file, 'w') as log:
203              log.write(f"# {workflow_name} @ {timestamp}\n")
204              log.write(f"# Tokens: ~{prompt_tokens + grammar_tokens}\n")
205              log.write(f"# Working dir: {cwd}\n\n")
206  
207              process = subprocess.Popen(
208                  cmd,
209                  stdout=subprocess.PIPE,
210                  stderr=subprocess.STDOUT,
211                  text=True,
212                  bufsize=1,
213                  cwd=cwd  # Run from workflow's working directory
214              )
215  
216              output_lines = []
217              for line in process.stdout:
218                  print(line, end='')
219                  log.write(line)
220                  output_lines.append(line)
221                  log.flush()
222  
223              process.wait()
224              exit_code = process.returncode
225  
226              # Parse JSON output if available
227              end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
228              log.write(f"\n# Finished: {end_time}, Exit: {exit_code}\n")
229  
230      except KeyboardInterrupt:
231          print("\n⚠ Interrupted")
232          return 130
233      except Exception as e:
234          print(f"\n✗ Error: {e}")
235          return 1
236  
237      print("-" * 40)
238      status = "✓" if exit_code == 0 else "✗"
239      print(f"{status} {workflow_name}: exit {exit_code}")
240  
241      # Post-hooks
242      if "post_hooks" in workflow and exit_code == 0:
243          for hook in workflow["post_hooks"]:
244              subprocess.run(hook, shell=True, check=False)
245  
246      return exit_code
247  
248  
249  def main():
250      parser = argparse.ArgumentParser(
251          description="Claude Workflow Runner (Token-Optimized)",
252          formatter_class=argparse.RawDescriptionHelpFormatter,
253          epilog="""
254  Auth: ANTHROPIC_API_KEY or 'claude login' (Max subscription)
255  Env:  CLAUDE_WORKFLOWS to override workflows.yaml path
256          """
257      )
258  
259      parser.add_argument("workflow", nargs="?", help="Workflow to execute")
260      parser.add_argument("-l", "--list", action="store_true", help="List workflows")
261      parser.add_argument("-n", "--dry-run", action="store_true", help="Show command only")
262      parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
263      parser.add_argument("-f", "--workflows-file", type=Path, default=WORKFLOWS_FILE)
264  
265      args = parser.parse_args()
266  
267      # Load config
268      workflows_file = Path(os.environ.get("CLAUDE_WORKFLOWS", args.workflows_file))
269      workflows = load_workflows(workflows_file)
270      grammar = load_grammar()
271  
272      if args.list:
273          list_workflows(workflows)
274          return 0
275  
276      if not args.workflow:
277          parser.print_help()
278          return 1
279  
280      return run_workflow(args.workflow, workflows, grammar,
281                          dry_run=args.dry_run, verbose=args.verbose)
282  
283  
284  if __name__ == "__main__":
285      sys.exit(main())