/ orchestrator.py
orchestrator.py
1 import asyncio 2 import json 3 import logging 4 from typing import Any 5 6 import dependencies 7 from settings import config 8 9 definitions, enabled_rules_definition = dependencies.get_venvs_definitions() 10 dependencies.generate_venvs(definitions) 11 12 13 async def run_rule(rule: str, entry_payload: str) -> tuple[str, dict[str, Any]]: 14 venv_python = dependencies.get_venv_path(enabled_rules_definition[rule]) 15 16 proc = await asyncio.create_subprocess_exec( 17 venv_python, 18 "-u", 19 "rules/run.py", 20 rule, 21 entry_payload, 22 stdout=asyncio.subprocess.PIPE, 23 stderr=asyncio.subprocess.PIPE, 24 ) 25 26 try: 27 stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=config.rule_timeout_s) 28 except TimeoutError: 29 proc.kill() 30 stdout_b, stderr_b = await proc.communicate() 31 return rule, { 32 "ok": False, 33 "error": "timeout", 34 "timeout_s": config.rule_timeout_s, 35 "stdout": stdout_b.decode("utf-8", "replace"), 36 "stderr": stderr_b.decode("utf-8", "replace"), 37 } 38 39 stdout = stdout_b.decode("utf-8", "replace").strip() 40 stderr = stderr_b.decode("utf-8", "replace").strip() 41 42 if stderr: 43 logging.error("Rule %s stderr: %s", rule, stderr) 44 45 if proc.returncode != 0: 46 return rule, { 47 "ok": False, 48 "error": "nonzero_exit", 49 "returncode": proc.returncode, 50 "stdout": stdout, 51 "stderr": stderr, 52 } 53 54 try: 55 data = json.loads(stdout) if stdout else None 56 except json.JSONDecodeError: 57 return rule, { 58 "ok": False, 59 "error": "invalid_json_stdout", 60 "stdout": stdout, 61 "stderr": stderr, 62 } 63 64 return rule, { 65 "ok": True, 66 "data": data, 67 "stderr": stderr, 68 }