/ main.py
main.py
  1  import ast
  2  import asyncio
  3  import json
  4  import os
  5  import sys
  6  from typing import Any, TypedDict
  7  
  8  import dependencies
  9  from custom_types.NIP_35 import Nip35Kind2003Event
 10  from rules.simple_types import RuleResult
 11  from fastapi import FastAPI
 12  from orchestrator import run_rule
 13  from settings import config
 14  
 15  
 16  def validate_rules_at_startup(enabled_rules: dict):
 17      """Preflight validation: ensure all enabled rules parse and have main()."""
 18      failed = []
 19      for rule_id, _rule_def in enabled_rules.items():
 20          rule_path = f"rules/{rule_id}.py"
 21          if not os.path.exists(rule_path):
 22              failed.append((rule_id, f"file not found: {rule_path}"))
 23              continue
 24          try:
 25              with open(rule_path) as f:
 26                  content = f.read()
 27              ast.parse(content)
 28              # Check for main function
 29              if "def main(" not in content:
 30                  failed.append((rule_id, "missing main() function"))
 31          except SyntaxError as e:
 32              failed.append((rule_id, f"syntax error: {e}"))
 33  
 34      if failed:
 35          print("FATAL: Rule validation failed at startup:", file=sys.stderr)
 36          for rule_id, error in failed:
 37              print(f"  {rule_id}: {error}", file=sys.stderr)
 38          sys.exit(1)
 39      else:
 40          print(f"Preflight check passed: {len(enabled_rules)} rules validated")
 41  
 42  
# Module-level startup sequence: everything below runs once, at import time.
# get_venvs_definitions() yields the venv definitions plus the enabled-rule
# definitions (a mapping keyed by rule id); the venv definitions are then
# handed to generate_venvs (presumably one virtual environment is built per
# definition -- confirm in dependencies.py).
definitions, enabled_rules_definition = dependencies.get_venvs_definitions()
dependencies.generate_venvs(definitions)

# Validate all enabled rules at startup. On any failure this calls
# sys.exit(1), so importing this module terminates the process before the
# application object below is created.
validate_rules_at_startup(enabled_rules_definition)

# Application object; the route handlers below register themselves on it.
app = FastAPI(
    title="Utopeer - Curator",
    version="0.1.0",
)
 54  
 55  
# Response body shared by the two healthcheck endpoints ("/" and "/health").
class HealthcheckResult(TypedDict):
    # Both endpoints in this module always set this to True.
    healthy: bool
 58  
 59  
# Response body of the kind-2003 validation endpoint.
class EntryValidationResponse(TypedDict):
    # True only when every entry in `rules` reports passed.
    passed: bool
    # Per-rule outcome, keyed by the rule name returned by run_rule.
    # NOTE(review): for a failed rule the handler stores a
    # {"passed", "error", "details"} dict here -- confirm RuleResult
    # accepts that shape.
    rules: dict[str, RuleResult]
 63  
 64  
 65  @app.get("/", summary="Healthcheck", tags=["healthcheck"])
 66  async def root() -> HealthcheckResult:
 67      return {"healthy": True}
 68  
 69  
 70  @app.get("/health", summary="Standard healthcheck", tags=["healthcheck"])
 71  async def health() -> HealthcheckResult:
 72      return {"healthy": True}
 73  
 74  
 75  @app.put("/validate/kind-2003", summary="Validate a kind-2003 formatted entry (NIP-35)", tags=["curation"])
 76  async def validate_entry(entry: Nip35Kind2003Event) -> EntryValidationResponse:
 77      enabled_rules = list(enabled_rules_definition.keys())
 78  
 79      try:
 80          entry_dict = entry.model_dump()
 81      except AttributeError:
 82          entry_dict = vars(entry)
 83  
 84      entry_payload = json.dumps(entry_dict, ensure_ascii=False)
 85  
 86      sem = asyncio.Semaphore(config.max_concurrency)
 87      outputs: dict[str, Any] = {}
 88  
 89      async def _bounded(rule: str):
 90          async with sem:
 91              return await run_rule(rule, entry_payload)
 92  
 93      tasks = [asyncio.create_task(_bounded(rule)) for rule in enabled_rules]
 94  
 95      for coro in asyncio.as_completed(tasks):
 96          rule, res = await coro
 97          if res.get("ok", True) and "data" in res:
 98              outputs[rule] = res["data"]
 99          else:
100              # Include error in output, don't drop silently
101              outputs[rule] = {
102                  "passed": False,
103                  "error": res.get("error", "rule execution failed"),
104                  "details": str(res)
105              }
106  
107      passed = all(v.get("passed", False) for v in outputs.values())
108  
109      return {
110          "passed": passed,
111          "rules": outputs,
112      }