agent.py
  1  """Bob Knowledge Gardener Agent — nightly knowledge consolidation.
  2  
  3  Triggered at 2 AM ET. Performs:
  4  - Scan today's agent execution results from NATS JetStream
  5  - Summarize agent activity into a daily digest
  6  - Query Oxigraph for stale or duplicate triples
  7  - Generate a knowledge consolidation report
  8  - (Future) Prune Graphiti temporal memory, merge knowledge graph entries
  9  
 10  This is Bob's "AutoDream" — inspired by Claude Code's memory consolidation daemon.
 11  """
 12  
 13  import asyncio
 14  import json
 15  import os
 16  import sys
 17  import time
 18  import urllib.request
 19  import urllib.error
 20  from datetime import datetime, timezone, timedelta
 21  from pathlib import Path
 22  
 23  import nats
 24  
 25  NATS_URL = os.getenv("NATS_URL", "nats://127.0.0.1:4222")
 26  OXIGRAPH_URL = os.getenv("OXIGRAPH_URL", "http://127.0.0.1:7878")
 27  GRAPHITI_ENABLED = os.getenv("GRAPHITI_ENABLED", "true").lower() == "true"
 28  ROLE = "knowledge_gardener"
 29  DIGEST_DIR = os.getenv("DIGEST_DIR", "/srv/bob/digests")
 30  
 31  
 32  def sparql_query(query: str) -> dict:
 33      """Execute a SPARQL query against Oxigraph."""
 34      try:
 35          data = query.encode("utf-8")
 36          req = urllib.request.Request(
 37              f"{OXIGRAPH_URL}/query",
 38              data=data,
 39              headers={
 40                  "Content-Type": "application/sparql-query",
 41                  "Accept": "application/sparql-results+json",
 42              },
 43          )
 44          with urllib.request.urlopen(req, timeout=10) as resp:
 45              return json.loads(resp.read().decode())
 46      except Exception as e:
 47          return {"error": str(e)}
 48  
 49  
 50  def count_triples() -> int:
 51      """Count total triples in the knowledge graph."""
 52      result = sparql_query("SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }")
 53      try:
 54          return int(result["results"]["bindings"][0]["count"]["value"])
 55      except (KeyError, IndexError, ValueError):
 56          return -1
 57  
 58  
 59  def get_triple_summary() -> dict:
 60      """Get a summary of the knowledge graph by predicate."""
 61      result = sparql_query("""
 62          SELECT ?p (COUNT(*) AS ?count) WHERE { ?s ?p ?o }
 63          GROUP BY ?p ORDER BY DESC(?count) LIMIT 20
 64      """)
 65      predicates = []
 66      try:
 67          for binding in result.get("results", {}).get("bindings", []):
 68              predicates.append({
 69                  "predicate": binding["p"]["value"].split("#")[-1].split("/")[-1],
 70                  "count": int(binding["count"]["value"]),
 71              })
 72      except (KeyError, ValueError):
 73          pass
 74      return {"predicates": predicates}
 75  
 76  
 77  async def collect_agent_results(js) -> list:
 78      """Collect all agent results from the past 24 hours."""
 79      results = []
 80      for role in ["home_keeper", "morning_coordinator", "evening_coordinator"]:
 81          try:
 82              sub = await js.subscribe(
 83                  f"bob.agent.{role}.result",
 84                  deliver_policy="last",
 85              )
 86              msg = await asyncio.wait_for(sub.next_msg(), timeout=3)
 87              await sub.unsubscribe()
 88              data = json.loads(msg.data.decode())
 89              results.append({
 90                  "role": role,
 91                  "run_id": data.get("run_id", "?"),
 92                  "completed_at": data.get("completed_at", "?"),
 93                  "severity": data.get("severity", "ok"),
 94                  "alert_count": len(data.get("alerts", data.get("active_alerts", []))),
 95              })
 96          except Exception:
 97              continue
 98      return results
 99  
100  
def is_weekly_run() -> bool:
    """Report whether the current local date is a Sunday (weekly digest day)."""
    today = datetime.now()
    # isoweekday() numbers Monday..Sunday as 1..7.
    return today.isoweekday() == 7
104  
105  
def generate_weekly_digest(daily_digests_dir: str) -> str:
    """Build the weekly Markdown summary from the daily digest files.

    Only files named ``YYYY-MM-DD.md`` and dated within the seven calendar
    days ending today (local time) are read. Other Markdown files in the
    directory, such as earlier ``weekly-*.md`` reports, are ignored.

    Args:
        daily_digests_dir: Directory containing the daily digest files.

    Returns:
        The weekly digest as Markdown text.
    """
    now = datetime.now()
    today = now.strftime("%Y-%m-%d")
    # Today plus the six preceding days makes a seven-day window. Subtracting
    # seven days with an inclusive comparison would cover eight.
    first_day = (now - timedelta(days=6)).strftime("%Y-%m-%d")

    def is_daily_digest_in_window(stem: str) -> bool:
        """Accept only zero-padded ISO date stems inside the window."""
        try:
            datetime.strptime(stem, "%Y-%m-%d")
        except ValueError:
            # e.g. "weekly-2024-01-07": not a daily digest.
            return False
        # The length check rejects unpadded dates, which would break the
        # lexicographic comparison below.
        return len(stem) == 10 and first_day <= stem <= today

    lines = [
        f"# Bob Weekly Family Digest — Week of {today}",
        "",
    ]

    recent = [
        path
        for path in sorted(Path(daily_digests_dir).glob("*.md"))
        if is_daily_digest_in_window(path.stem)
    ]

    if recent:
        lines.append(f"## Summary ({len(recent)} days of activity)")
        lines.append("")

        total_alerts = 0
        agent_activity = {}
        for path in recent:
            content = path.read_text()
            # Counts keyword occurrences (one per flagged agent line in a
            # daily digest), not the sum of the per-agent alert counts.
            total_alerts += content.count("CRITICAL") + content.count("WARNING")
            # Daily digests list agents as "- **role**: ..." lines.
            for line in content.split("\n"):
                if line.startswith("- **") and "**:" in line:
                    role = line.split("**")[1]
                    agent_activity[role] = agent_activity.get(role, 0) + 1

        lines.append(f"- Days covered: {len(recent)}")
        lines.append(f"- Total alerts across week: {total_alerts}")
        if agent_activity:
            lines.append("- Agent runs:")
            for role, count in sorted(agent_activity.items()):
                lines.append(f"  - {role}: {count} runs")
    else:
        lines.append("No daily digests found for this week.")

    lines.extend([
        "",
        "## Recommendations",
        "- Review any recurring alerts for root cause analysis",
        "- Consider adjusting alert thresholds if too noisy",
        "- Check knowledge graph growth and prune if needed",
        "",
        f"*Generated at {now.isoformat()}*",
    ])

    return "\n".join(lines)
153  
154  
def generate_daily_digest(agent_results: list, triple_count: int, triple_summary: dict) -> str:
    """Render the daily digest as Markdown text.

    Args:
        agent_results: Per-agent summaries with ``role``, ``severity``,
            ``alert_count`` and ``run_id`` keys.
        triple_count: Total number of triples to report.
        triple_summary: Mapping whose optional ``predicates`` list holds
            ``{"predicate", "count"}`` entries; at most ten are shown.

    Returns:
        The digest, with sections joined by newlines.
    """
    stamp = datetime.now()

    def describe(entry: dict) -> str:
        """Format one agent line; any severity other than "ok" is upper-cased."""
        severity = entry["severity"]
        label = severity.upper() if severity != "ok" else "OK"
        return f"- **{entry['role']}**: {label} ({entry['alert_count']} alerts) — run {entry['run_id']}"

    if not agent_results:
        activity = ["- No agent results found for today"]
    else:
        activity = [describe(entry) for entry in agent_results]

    graph = ["", "## Knowledge Graph", f"- Total triples: {triple_count}"]
    top = triple_summary.get("predicates")
    if top:
        graph.append("- Top predicates:")
        graph.extend(f"  - {item['predicate']}: {item['count']}" for item in top[:10])

    actions = [
        "",
        "## Consolidation Actions",
        "- Scanned agent execution history",
        f"- Knowledge graph: {triple_count} triples (no pruning needed yet)",
        "- Memory consolidation: baseline established",
        "",
        f"*Generated at {stamp.isoformat()}*",
    ]

    header = [
        f"# Bob Daily Knowledge Digest — {stamp.strftime('%Y-%m-%d')}",
        "",
        "## Agent Activity",
    ]
    return "\n".join(header + activity + graph + actions)
193  
194  
async def run_consolidation(trigger_data: dict) -> dict:
    """Run one nightly knowledge consolidation pass.

    Steps: collect the latest agent results over NATS, query Oxigraph for
    triple statistics, write the daily digest (plus the weekly digest on
    Sundays) under ``DIGEST_DIR``, and, when ``GRAPHITI_ENABLED`` is set,
    sample Graphiti memory and store the agent results as episodes. File
    writes and Graphiti steps are best-effort: failures are printed and the
    run continues.

    Args:
        trigger_data: Trigger payload. Its optional ``run_id`` names the run;
            otherwise a ``garden-<epoch seconds>`` id is generated.

    Returns:
        A JSON-serializable summary of the run (ids, timestamps, counts, a
        digest preview, Graphiti status and the list of actions taken).
    """
    run_id = trigger_data.get("run_id", f"garden-{int(time.time())}")
    started_at = datetime.now(timezone.utc).isoformat()
    print(f"\nKnowledge consolidation run: {run_id}")

    # The NATS connection is only needed to collect agent results. Closing it
    # in a finally block stops a later failure from leaking it.
    nc = await nats.connect(NATS_URL)
    try:
        js = nc.jetstream()
        print("Collecting agent results...")
        agent_results = await collect_agent_results(js)
        print(f"  {len(agent_results)} agent results found")
    finally:
        await nc.close()

    # Knowledge graph stats
    print("Querying knowledge graph...")
    triple_count = count_triples()
    triple_summary = get_triple_summary()
    print(f"  {triple_count} triples in graph")

    # Generate digest
    digest = generate_daily_digest(agent_results, triple_count, triple_summary)
    print(f"  Digest generated ({len(digest)} chars)")

    # Use one date stamp so both file names agree even across midnight.
    today = datetime.now().strftime('%Y-%m-%d')

    # Save digest to filesystem. Directory creation sits inside the try so an
    # unwritable DIGEST_DIR degrades to a message instead of aborting the run.
    digest_file = os.path.join(DIGEST_DIR, f"{today}.md")
    try:
        os.makedirs(DIGEST_DIR, exist_ok=True)
        with open(digest_file, "w") as f:
            f.write(digest)
        print(f"  Saved to {digest_file}")
    except (OSError, UnicodeError) as e:
        print(f"  Could not save digest: {e}")

    # Weekly digest (Sundays)
    weekly_digest = None
    if is_weekly_run():
        print("Generating weekly digest (Sunday)...")
        weekly_file = os.path.join(DIGEST_DIR, f"weekly-{today}.md")
        try:
            # Generation reads the daily files, so it can fail with I/O
            # errors just like the write that follows.
            weekly_digest = generate_weekly_digest(DIGEST_DIR)
            with open(weekly_file, "w") as f:
                f.write(weekly_digest)
            print(f"  Saved weekly digest to {weekly_file}")
        except (OSError, UnicodeError) as e:
            print(f"  Could not save weekly digest: {e}")

    # Graphiti memory analysis
    graphiti_memory_count = 0
    if GRAPHITI_ENABLED:
        try:
            from graphiti_client import create_graphiti_client, search_memory
            g = await create_graphiti_client()
            try:
                # Number of hits for the "family" search; limit=100 is passed,
                # so this is presumably capped at 100 rather than a true total.
                results = await search_memory(g, "family", limit=100)
            finally:
                await g.close()
            graphiti_memory_count = len(results)
            print(f"  Graphiti: {graphiti_memory_count} facts in memory")
        except Exception as e:
            # Covers a missing graphiti_client module as well as client errors.
            print(f"  Graphiti analysis error: {e}")

    # Store agent results in Graphiti temporal memory
    graphiti_status = "skipped"
    if GRAPHITI_ENABLED:
        try:
            from graphiti_client import create_graphiti_client, add_agent_result
            print("Storing agent results in Graphiti...")
            g = await create_graphiti_client()
            try:
                for ar in agent_results:
                    summary = f"severity={ar.get('severity', 'ok')}, alerts={ar.get('alert_count', 0)}"
                    await add_agent_result(g, ar["role"], summary)
            finally:
                # Close the client even when storing an episode fails.
                await g.close()
            graphiti_status = f"stored {len(agent_results)} episodes"
            print(f"  Graphiti: {graphiti_status}")
        except Exception as e:
            graphiti_status = f"error: {e}"
            print(f"  Graphiti error: {e}")

    actions_taken = ["scanned_agent_history", "queried_knowledge_graph", "generated_digest"]
    if weekly_digest:
        actions_taken.append("generated_weekly_digest")
    if graphiti_memory_count > 0:
        actions_taken.append("analyzed_graphiti_memory")

    return {
        "run_id": run_id,
        "role": ROLE,
        "started_at": started_at,
        "completed_at": datetime.now(timezone.utc).isoformat(),
        "triple_count": triple_count,
        "agent_results_count": len(agent_results),
        "digest_preview": digest[:500],
        "graphiti_status": graphiti_status,
        "graphiti_memory_count": graphiti_memory_count,
        "weekly_digest": weekly_digest is not None,
        "actions_taken": actions_taken,
    }
291  
292  
async def main():
    """Run the agent: subscribe to trigger messages and process them until cancelled.

    Connects to NATS, registers a durable JetStream subscription on
    ``bob.agent.<ROLE>.trigger`` with manual acknowledgement, and then waits
    indefinitely. The connection is closed when the task is cancelled.
    """
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    print(f"Knowledge Gardener connected to NATS at {NATS_URL}")

    trigger_subject = f"bob.agent.{ROLE}.trigger"

    async def handle_trigger(msg):
        """Process one trigger: publish 'started', run, publish the result, ack."""
        try:
            try:
                trigger_data = json.loads(msg.data.decode())
                if not isinstance(trigger_data, dict):
                    raise ValueError("trigger payload must be a JSON object")
            except ValueError as e:
                # Invalid UTF-8, invalid JSON, or a non-object payload can
                # never succeed. Terminate it so it is not redelivered
                # forever.
                print(f"Discarding malformed trigger: {e}", file=sys.stderr)
                await msg.term()
                return

            print(f"\nReceived trigger: {trigger_data.get('run_id', 'unknown')}")

            started = {
                "role": ROLE,
                "run_id": trigger_data.get("run_id"),
                "started_at": datetime.now(timezone.utc).isoformat(),
            }
            await js.publish(f"bob.agent.{ROLE}.started", json.dumps(started).encode())

            result = await run_consolidation(trigger_data)
            await js.publish(f"bob.agent.{ROLE}.result", json.dumps(result).encode())
            print("Consolidation result published")

            # Ack only after the result is published. A failed run is left
            # unacknowledged, as before.
            await msg.ack()
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            import traceback
            traceback.print_exc()

    await js.subscribe(
        trigger_subject,
        cb=handle_trigger,
        durable="knowledge_gardener_agent",
        deliver_policy="new",
        manual_ack=True,
    )
    print(f"Subscribed to {trigger_subject}")
    print("Waiting for triggers...")

    try:
        # Park this task without polling; triggers are handled by the callback.
        await asyncio.Event().wait()
    except asyncio.CancelledError:
        pass
    finally:
        await nc.close()
340  
341  
# Script entry point: start the agent's asyncio loop only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())