agent.py
1 """Bob Knowledge Gardener Agent — nightly knowledge consolidation. 2 3 Triggered at 2 AM ET. Performs: 4 - Scan today's agent execution results from NATS JetStream 5 - Summarize agent activity into a daily digest 6 - Query Oxigraph for stale or duplicate triples 7 - Generate a knowledge consolidation report 8 - (Future) Prune Graphiti temporal memory, merge knowledge graph entries 9 10 This is Bob's "AutoDream" — inspired by Claude Code's memory consolidation daemon. 11 """ 12 13 import asyncio 14 import json 15 import os 16 import sys 17 import time 18 import urllib.request 19 import urllib.error 20 from datetime import datetime, timezone, timedelta 21 from pathlib import Path 22 23 import nats 24 25 NATS_URL = os.getenv("NATS_URL", "nats://127.0.0.1:4222") 26 OXIGRAPH_URL = os.getenv("OXIGRAPH_URL", "http://127.0.0.1:7878") 27 GRAPHITI_ENABLED = os.getenv("GRAPHITI_ENABLED", "true").lower() == "true" 28 ROLE = "knowledge_gardener" 29 DIGEST_DIR = os.getenv("DIGEST_DIR", "/srv/bob/digests") 30 31 32 def sparql_query(query: str) -> dict: 33 """Execute a SPARQL query against Oxigraph.""" 34 try: 35 data = query.encode("utf-8") 36 req = urllib.request.Request( 37 f"{OXIGRAPH_URL}/query", 38 data=data, 39 headers={ 40 "Content-Type": "application/sparql-query", 41 "Accept": "application/sparql-results+json", 42 }, 43 ) 44 with urllib.request.urlopen(req, timeout=10) as resp: 45 return json.loads(resp.read().decode()) 46 except Exception as e: 47 return {"error": str(e)} 48 49 50 def count_triples() -> int: 51 """Count total triples in the knowledge graph.""" 52 result = sparql_query("SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }") 53 try: 54 return int(result["results"]["bindings"][0]["count"]["value"]) 55 except (KeyError, IndexError, ValueError): 56 return -1 57 58 59 def get_triple_summary() -> dict: 60 """Get a summary of the knowledge graph by predicate.""" 61 result = sparql_query(""" 62 SELECT ?p (COUNT(*) AS ?count) WHERE { ?s ?p ?o } 63 GROUP BY ?p ORDER BY DESC(?count) LIMIT 20 64 """) 65 predicates = [] 66 try: 67 for binding in result.get("results", {}).get("bindings", []): 68 predicates.append({ 69 "predicate": binding["p"]["value"].split("#")[-1].split("/")[-1], 70 "count": int(binding["count"]["value"]), 71 }) 72 except (KeyError, ValueError): 73 pass 74 return {"predicates": predicates} 75 76 77 async def collect_agent_results(js) -> list: 78 """Collect all agent results from the past 24 hours.""" 79 results = [] 80 for role in ["home_keeper", "morning_coordinator", "evening_coordinator"]: 81 try: 82 sub = await js.subscribe( 83 f"bob.agent.{role}.result", 84 deliver_policy="last", 85 ) 86 msg = await asyncio.wait_for(sub.next_msg(), timeout=3) 87 await sub.unsubscribe() 88 data = json.loads(msg.data.decode()) 89 results.append({ 90 "role": role, 91 "run_id": data.get("run_id", "?"), 92 "completed_at": data.get("completed_at", "?"), 93 "severity": data.get("severity", "ok"), 94 "alert_count": len(data.get("alerts", data.get("active_alerts", []))), 95 }) 96 except Exception: 97 continue 98 return results 99 100 101 def is_weekly_run() -> bool: 102 """Check if today is Sunday (weekly digest day).""" 103 return datetime.now().weekday() == 6 # Sunday 104 105 106 def generate_weekly_digest(daily_digests_dir: str) -> str: 107 """Generate a weekly summary from daily digests.""" 108 lines = [ 109 f"# Bob Weekly Family Digest — Week of {datetime.now().strftime('%Y-%m-%d')}", 110 "", 111 ] 112 113 # Read all daily digests from this week 114 digest_files = sorted(Path(daily_digests_dir).glob("*.md")) 115 recent = [f for f in digest_files if f.stem >= (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")] 116 117 if recent: 118 lines.append(f"## Summary ({len(recent)} days of activity)") 119 lines.append("") 120 121 total_alerts = 0 122 agent_activity = {} 123 for f in recent: 124 content = f.read_text() 125 # Count alerts mentioned 126 total_alerts += content.count("CRITICAL") + content.count("WARNING") 127 # Track agent activity 128 for line in content.split("\n"): 129 if line.startswith("- **") and "**:" in line: 130 role = line.split("**")[1] 131 agent_activity[role] = agent_activity.get(role, 0) + 1 132 133 lines.append(f"- Days covered: {len(recent)}") 134 lines.append(f"- Total alerts across week: {total_alerts}") 135 if agent_activity: 136 lines.append("- Agent runs:") 137 for role, count in sorted(agent_activity.items()): 138 lines.append(f" - {role}: {count} runs") 139 else: 140 lines.append("No daily digests found for this week.") 141 142 lines.extend([ 143 "", 144 "## Recommendations", 145 "- Review any recurring alerts for root cause analysis", 146 "- Consider adjusting alert thresholds if too noisy", 147 "- Check knowledge graph growth and prune if needed", 148 "", 149 f"*Generated at {datetime.now().isoformat()}*", 150 ]) 151 152 return "\n".join(lines) 153 154 155 def generate_daily_digest(agent_results: list, triple_count: int, triple_summary: dict) -> str: 156 """Generate a human-readable daily digest.""" 157 now = datetime.now() 158 lines = [ 159 f"# Bob Daily Knowledge Digest — {now.strftime('%Y-%m-%d')}", 160 "", 161 "## Agent Activity", 162 ] 163 164 if agent_results: 165 for r in agent_results: 166 status = "OK" if r["severity"] == "ok" else r["severity"].upper() 167 lines.append(f"- **{r['role']}**: {status} ({r['alert_count']} alerts) — run {r['run_id']}") 168 else: 169 lines.append("- No agent results found for today") 170 171 lines.extend([ 172 "", 173 "## Knowledge Graph", 174 f"- Total triples: {triple_count}", 175 ]) 176 177 if triple_summary.get("predicates"): 178 lines.append("- Top predicates:") 179 for p in triple_summary["predicates"][:10]: 180 lines.append(f" - {p['predicate']}: {p['count']}") 181 182 lines.extend([ 183 "", 184 "## Consolidation Actions", 185 "- Scanned agent execution history", 186 f"- Knowledge graph: {triple_count} triples (no pruning needed yet)", 187 "- Memory consolidation: baseline established", 188 "", 189 f"*Generated at {now.isoformat()}*", 190 ]) 191 192 return "\n".join(lines) 193 194 195 async def run_consolidation(trigger_data: dict) -> dict: 196 """Run nightly knowledge consolidation.""" 197 run_id = trigger_data.get("run_id", f"garden-{int(time.time())}") 198 started_at = datetime.now(timezone.utc).isoformat() 199 print(f"\nKnowledge consolidation run: {run_id}") 200 201 nc = await nats.connect(NATS_URL) 202 js = nc.jetstream() 203 204 # Collect agent results 205 print("Collecting agent results...") 206 agent_results = await collect_agent_results(js) 207 print(f" {len(agent_results)} agent results found") 208 209 # Knowledge graph stats 210 print("Querying knowledge graph...") 211 triple_count = count_triples() 212 triple_summary = get_triple_summary() 213 print(f" {triple_count} triples in graph") 214 215 # Generate digest 216 digest = generate_daily_digest(agent_results, triple_count, triple_summary) 217 print(f" Digest generated ({len(digest)} chars)") 218 219 # Save digest to filesystem 220 os.makedirs(DIGEST_DIR, exist_ok=True) 221 digest_file = os.path.join(DIGEST_DIR, f"{datetime.now().strftime('%Y-%m-%d')}.md") 222 try: 223 with open(digest_file, "w") as f: 224 f.write(digest) 225 print(f" Saved to {digest_file}") 226 except Exception as e: 227 print(f" Could not save digest: {e}") 228 229 # Weekly digest (Sundays) 230 weekly_digest = None 231 if is_weekly_run(): 232 print("Generating weekly digest (Sunday)...") 233 weekly_digest = generate_weekly_digest(DIGEST_DIR) 234 weekly_file = os.path.join(DIGEST_DIR, f"weekly-{datetime.now().strftime('%Y-%m-%d')}.md") 235 try: 236 with open(weekly_file, "w") as f: 237 f.write(weekly_digest) 238 print(f" Saved weekly digest to {weekly_file}") 239 except Exception as e: 240 print(f" Could not save weekly digest: {e}") 241 242 # Graphiti memory analysis 243 graphiti_memory_count = 0 244 if GRAPHITI_ENABLED: 245 try: 246 from graphiti_client import create_graphiti_client, search_memory 247 g = await create_graphiti_client() 248 # Count how many facts we have 249 results = await search_memory(g, "family", limit=100) 250 graphiti_memory_count = len(results) 251 await g.close() 252 print(f" Graphiti: {graphiti_memory_count} facts in memory") 253 except Exception as e: 254 print(f" Graphiti analysis error: {e}") 255 256 # Store agent results in Graphiti temporal memory 257 graphiti_status = "skipped" 258 if GRAPHITI_ENABLED: 259 try: 260 from graphiti_client import create_graphiti_client, add_agent_result 261 print("Storing agent results in Graphiti...") 262 g = await create_graphiti_client() 263 for ar in agent_results: 264 summary = f"severity={ar.get('severity', 'ok')}, alerts={ar.get('alert_count', 0)}" 265 await add_agent_result(g, ar["role"], summary) 266 await g.close() 267 graphiti_status = f"stored {len(agent_results)} episodes" 268 print(f" Graphiti: {graphiti_status}") 269 except Exception as e: 270 graphiti_status = f"error: {e}" 271 print(f" Graphiti error: {e}") 272 273 result = { 274 "run_id": run_id, 275 "role": ROLE, 276 "started_at": started_at, 277 "completed_at": datetime.now(timezone.utc).isoformat(), 278 "triple_count": triple_count, 279 "agent_results_count": len(agent_results), 280 "digest_preview": digest[:500], 281 "graphiti_status": graphiti_status, 282 "graphiti_memory_count": graphiti_memory_count, 283 "weekly_digest": weekly_digest is not None, 284 "actions_taken": ["scanned_agent_history", "queried_knowledge_graph", "generated_digest"] 285 + (["generated_weekly_digest"] if weekly_digest else []) 286 + (["analyzed_graphiti_memory"] if graphiti_memory_count > 0 else []), 287 } 288 289 await nc.close() 290 return result 291 292 293 async def main(): 294 """Main agent loop.""" 295 nc = await nats.connect(NATS_URL) 296 js = nc.jetstream() 297 print(f"Knowledge Gardener connected to NATS at {NATS_URL}") 298 299 trigger_subject = f"bob.agent.{ROLE}.trigger" 300 301 async def handle_trigger(msg): 302 try: 303 trigger_data = json.loads(msg.data.decode()) 304 print(f"\nReceived trigger: {trigger_data.get('run_id', 'unknown')}") 305 306 started = { 307 "role": ROLE, 308 "run_id": trigger_data.get("run_id"), 309 "started_at": datetime.now(timezone.utc).isoformat(), 310 } 311 await js.publish(f"bob.agent.{ROLE}.started", json.dumps(started).encode()) 312 313 result = await run_consolidation(trigger_data) 314 await js.publish(f"bob.agent.{ROLE}.result", json.dumps(result).encode()) 315 print(f"Consolidation result published") 316 317 await msg.ack() 318 except Exception as e: 319 print(f"Error: {e}", file=sys.stderr) 320 import traceback 321 traceback.print_exc() 322 323 await js.subscribe( 324 trigger_subject, 325 cb=handle_trigger, 326 durable="knowledge_gardener_agent", 327 deliver_policy="new", 328 manual_ack=True, 329 ) 330 print(f"Subscribed to {trigger_subject}") 331 print("Waiting for triggers...") 332 333 try: 334 while True: 335 await asyncio.sleep(1) 336 except asyncio.CancelledError: 337 pass 338 finally: 339 await nc.close() 340 341 342 if __name__ == "__main__": 343 asyncio.run(main())