/ main.py
main.py
1 import ast 2 import asyncio 3 import json 4 import os 5 import sys 6 from typing import Any, TypedDict 7 8 import dependencies 9 from custom_types.NIP_35 import Nip35Kind2003Event 10 from rules.simple_types import RuleResult 11 from fastapi import FastAPI 12 from orchestrator import run_rule 13 from settings import config 14 15 16 def validate_rules_at_startup(enabled_rules: dict): 17 """Preflight validation: ensure all enabled rules parse and have main().""" 18 failed = [] 19 for rule_id, _rule_def in enabled_rules.items(): 20 rule_path = f"rules/{rule_id}.py" 21 if not os.path.exists(rule_path): 22 failed.append((rule_id, f"file not found: {rule_path}")) 23 continue 24 try: 25 with open(rule_path) as f: 26 content = f.read() 27 ast.parse(content) 28 # Check for main function 29 if "def main(" not in content: 30 failed.append((rule_id, "missing main() function")) 31 except SyntaxError as e: 32 failed.append((rule_id, f"syntax error: {e}")) 33 34 if failed: 35 print("FATAL: Rule validation failed at startup:", file=sys.stderr) 36 for rule_id, error in failed: 37 print(f" {rule_id}: {error}", file=sys.stderr) 38 sys.exit(1) 39 else: 40 print(f"Preflight check passed: {len(enabled_rules)} rules validated") 41 42 43 # Initialize venvs 44 definitions, enabled_rules_definition = dependencies.get_venvs_definitions() 45 dependencies.generate_venvs(definitions) 46 47 # Validate all enabled rules at startup 48 validate_rules_at_startup(enabled_rules_definition) 49 50 app = FastAPI( 51 title="Utopeer - Curator", 52 version="0.1.0", 53 ) 54 55 56 class HealthcheckResult(TypedDict): 57 healthy: bool 58 59 60 class EntryValidationResponse(TypedDict): 61 passed: bool 62 rules: dict[str, RuleResult] 63 64 65 @app.get("/", summary="Healthcheck", tags=["healthcheck"]) 66 async def root() -> HealthcheckResult: 67 return {"healthy": True} 68 69 70 @app.get("/health", summary="Standard healthcheck", tags=["healthcheck"]) 71 async def health() -> HealthcheckResult: 72 return {"healthy": True} 73 74 75 @app.put("/validate/kind-2003", summary="Validate a kind-2003 formatted entry (NIP-35)", tags=["curation"]) 76 async def validate_entry(entry: Nip35Kind2003Event) -> EntryValidationResponse: 77 enabled_rules = list(enabled_rules_definition.keys()) 78 79 try: 80 entry_dict = entry.model_dump() 81 except AttributeError: 82 entry_dict = vars(entry) 83 84 entry_payload = json.dumps(entry_dict, ensure_ascii=False) 85 86 sem = asyncio.Semaphore(config.max_concurrency) 87 outputs: dict[str, Any] = {} 88 89 async def _bounded(rule: str): 90 async with sem: 91 return await run_rule(rule, entry_payload) 92 93 tasks = [asyncio.create_task(_bounded(rule)) for rule in enabled_rules] 94 95 for coro in asyncio.as_completed(tasks): 96 rule, res = await coro 97 if res.get("ok", True) and "data" in res: 98 outputs[rule] = res["data"] 99 else: 100 # Include error in output, don't drop silently 101 outputs[rule] = { 102 "passed": False, 103 "error": res.get("error", "rule execution failed"), 104 "details": str(res) 105 } 106 107 passed = all(v.get("passed", False) for v in outputs.values()) 108 109 return { 110 "passed": passed, 111 "rules": outputs, 112 }