# agent.py
"""Bob Evening Coordinator Agent — daily summary and next-day preparation.

Listens for trigger messages on ``bob.agent.evening_coordinator.trigger``
(the scheduler is expected to fire at 8:00 PM ET) and assembles:

- the most recent published result of each reporting agent
- the most recent ``home_keeper`` alert list
- tomorrow's weather forecast from Open-Meteo

The summary is published to ``bob.agent.evening_coordinator.result`` and a
voice announcement is published to ``bob.announce.voice``.

NOTE: maintenance recommendations are not produced by this module.
"""

import asyncio
import json
import os
import sys
import time
import traceback
import urllib.request
from datetime import datetime, timezone
from urllib.parse import urlencode

NATS_URL = os.getenv("NATS_URL", "nats://127.0.0.1:4222")
ROLE = "evening_coordinator"
LATITUDE = float(os.getenv("LATITUDE", "27.9506"))
LONGITUDE = float(os.getenv("LONGITUDE", "-82.4572"))
LOCATION_NAME = os.getenv("LOCATION_NAME", "Tampa, FL")
# Used both for the forecast request and for naming "today" in the summary.
TIMEZONE = os.getenv("TIMEZONE", "America/New_York")

# Agents whose latest result is included in the activity section.
REPORTING_ROLES = ("home_keeper", "morning_coordinator")
# Seconds to wait for the last stored message on a subject.
FETCH_TIMEOUT_S = 3

NO_FORECAST_ERROR = "No forecast data for tomorrow"

# WMO weather interpretation codes as returned by Open-Meteo.
WEATHER_CODES = {
    0: "clear",
    1: "mainly clear",
    2: "partly cloudy",
    3: "overcast",
    45: "foggy",
    48: "rime fog",
    51: "drizzle",
    53: "moderate drizzle",
    55: "heavy drizzle",
    56: "light freezing drizzle",
    57: "heavy freezing drizzle",
    61: "rain",
    63: "moderate rain",
    65: "heavy rain",
    66: "light freezing rain",
    67: "heavy freezing rain",
    71: "snow",
    73: "moderate snow",
    75: "heavy snow",
    77: "snow grains",
    80: "rain showers",
    81: "moderate rain showers",
    82: "violent rain showers",
    85: "snow showers",
    86: "heavy snow showers",
    95: "thunderstorm",
    96: "thunderstorm with hail",
    99: "thunderstorm with heavy hail",
}


def build_forecast_url() -> str:
    """Return the Open-Meteo URL for a two-day (today + tomorrow) forecast."""
    query = urlencode(
        {
            "latitude": LATITUDE,
            "longitude": LONGITUDE,
            "daily": (
                "temperature_2m_max,temperature_2m_min,"
                "precipitation_probability_max,weather_code"
            ),
            "temperature_unit": "fahrenheit",
            "wind_speed_unit": "mph",
            "timezone": TIMEZONE,
            "forecast_days": 2,
        },
        # Keep list commas and the tz slash literal, as the API examples do.
        safe=",/",
    )
    return f"https://api.open-meteo.com/v1/forecast?{query}"


def _item(values, index, default=None):
    """Return ``values[index]`` or *default* when the list is missing/short."""
    if isinstance(values, (list, tuple)) and len(values) > index:
        return values[index]
    return default


def parse_tomorrow_forecast(data: dict) -> dict:
    """Extract tomorrow's entry (index 1) from an Open-Meteo daily payload.

    Returns a dict with ``location``, ``high_f``, ``low_f``,
    ``rain_chance_pct`` and ``conditions``, or ``{"error": ...}`` when the
    payload has no second day of maximum temperatures. Optional series that
    are missing or too short degrade to a neutral value instead of
    discarding the whole forecast. Never raises.
    """
    daily = data.get("daily") if isinstance(data, dict) else None
    if not isinstance(daily, dict):
        return {"error": NO_FORECAST_ERROR}

    highs = daily.get("temperature_2m_max")
    if not isinstance(highs, (list, tuple)) or len(highs) < 2:
        return {"error": NO_FORECAST_ERROR}

    code = _item(daily.get("weather_code"), 1)
    if isinstance(code, (int, float)):
        conditions = WEATHER_CODES.get(code, "unknown")
    else:
        # Missing code: say so rather than claiming clear skies.
        conditions = "unknown"

    return {
        "location": LOCATION_NAME,
        "high_f": highs[1],
        "low_f": _item(daily.get("temperature_2m_min"), 1),
        "rain_chance_pct": _item(
            daily.get("precipitation_probability_max"), 1, 0
        ),
        "conditions": conditions,
    }


def fetch_tomorrow_weather() -> dict:
    """Fetch tomorrow's weather from Open-Meteo.

    Blocking (uses ``urllib``); call it from a worker thread when inside the
    event loop. Best-effort: any network or decoding failure is returned as
    ``{"error": "<message>"}`` rather than raised.
    """
    try:
        with urllib.request.urlopen(build_forecast_url(), timeout=10) as resp:
            payload = json.loads(resp.read().decode())
    except Exception as exc:  # boundary: the summary must survive any failure
        return {"error": str(exc)}
    return parse_tomorrow_forecast(payload)


async def _connect():
    """Open a NATS connection to ``NATS_URL``.

    ``nats`` is imported here rather than at module level so the pure
    helpers (forecast parsing, summary formatting) can be imported and
    tested without the NATS client installed.
    """
    import nats

    return await nats.connect(NATS_URL)


async def _fetch_last_message(js, subject: str, timeout: float = FETCH_TIMEOUT_S):
    """Return the decoded JSON payload of the last message on *subject*.

    Creates a temporary JetStream subscription with ``deliver_policy="last"``
    and always unsubscribes, even on timeout or decode failure. Returns
    ``None`` when no message arrives within *timeout* seconds. Other errors
    (subscribe failure, invalid JSON) propagate to the caller.
    """
    sub = await js.subscribe(subject, deliver_policy="last")
    try:
        # Pass the timeout to next_msg itself: its own default is shorter
        # than FETCH_TIMEOUT_S and would otherwise win.
        msg = await sub.next_msg(timeout=timeout)
    except asyncio.TimeoutError:
        return None
    finally:
        await sub.unsubscribe()
    return json.loads(msg.data.decode())


async def get_todays_agent_results(js) -> list:
    """Fetch the latest result published by each reporting agent.

    Only the last stored message per role is read; it is not filtered by
    date, so a stale result from an earlier day is still reported.

    Returns a list of ``{"role", "summary"}`` dicts. ``summary`` is the
    payload's ``summary`` field, or the first 200 characters of
    ``briefing_text`` when ``summary`` is absent. Roles whose result cannot
    be fetched are skipped (the error is printed to stderr).
    """
    results = []
    for role in REPORTING_ROLES:
        try:
            data = await _fetch_last_message(js, f"bob.agent.{role}.result")
        except Exception as exc:  # best-effort: one bad role must not abort
            print(f"  Could not fetch result for {role}: {exc}", file=sys.stderr)
            continue
        if not isinstance(data, dict):
            continue  # timeout (None) or an unexpected payload shape
        if "summary" in data:
            summary = data["summary"]
        else:
            summary = str(data.get("briefing_text") or "")[:200]
        results.append({"role": role, "summary": summary})
    return results


async def get_latest_alerts(js) -> list:
    """Fetch the ``alerts`` list from the last ``home_keeper`` alert message.

    Returns an empty list when no message is available, the fetch fails
    (error printed to stderr), or the payload carries no alerts.
    """
    try:
        data = await _fetch_last_message(js, "bob.agent.home_keeper.alert")
    except Exception as exc:  # best-effort: alerts are optional
        print(f"  Could not fetch alerts: {exc}", file=sys.stderr)
        return []
    if not isinstance(data, dict):
        return []
    alerts = data.get("alerts") or []
    # Callers slice and count the result, so always hand back a list.
    return alerts if isinstance(alerts, list) else [alerts]


def _local_now() -> datetime:
    """Return the current time in ``TIMEZONE`` (host local time as fallback)."""
    try:
        from zoneinfo import ZoneInfo

        return datetime.now(ZoneInfo(TIMEZONE))
    except (ImportError, KeyError, ValueError, OSError):
        # No zoneinfo module, no tz database, or an invalid TIMEZONE value.
        return datetime.now()


def _plural(count: int, noun: str) -> str:
    """Return *noun*, with an "s" appended unless *count* is exactly one."""
    return noun if count == 1 else f"{noun}s"


def _alert_text(alert) -> str:
    """Render one alert as text so keyword checks work on any payload shape."""
    if isinstance(alert, str):
        return alert
    if isinstance(alert, dict):
        return " ".join(str(value) for value in alert.values())
    return str(alert)


def _shown(value):
    """Return *value*, or "?" when it is missing."""
    return "?" if value is None else value


def format_summary_text(
    tomorrow_weather: dict, agent_results: list, alerts: list, now=None
) -> str:
    """Format the evening summary as natural language.

    Args:
        tomorrow_weather: Forecast dict from ``fetch_tomorrow_weather``; the
            forecast section is omitted when it is empty or has an
            ``"error"`` key.
        agent_results: Agent result entries; only the count is reported.
        alerts: Alert entries. An alert is critical when its text contains
            ``"CRITICAL"`` or ``"DOWN"``; at most three critical alerts are
            listed.
        now: Datetime used for the greeting's date. Defaults to the current
            time in ``TIMEZONE``.

    Returns:
        The summary sentences joined by single spaces.
    """
    if now is None:
        now = _local_now()
    lines = [
        f"Good evening. Here's your daily summary for {now.strftime('%A, %B %d')}."
    ]

    # Agent activity
    if agent_results:
        ran = len(agent_results)
        lines.append(f"Bob ran {ran} agent {_plural(ran, 'task')} today.")

    # Current alerts
    alert_texts = [_alert_text(alert) for alert in alerts or []]
    if alert_texts:
        critical = [t for t in alert_texts if "CRITICAL" in t or "DOWN" in t]
        shown = critical or alert_texts
        count = len(shown)
        verb = "is" if count == 1 else "are"
        if critical:
            lines.append(
                f"There {verb} {count} critical {_plural(count, 'alert')} still active."
            )
            lines.extend(f"  - {text}" for text in critical[:3])
        else:
            lines.append(
                f"There {verb} {count} minor {_plural(count, 'alert')}, nothing critical."
            )
    else:
        lines.append("All systems are running smoothly.")

    # Tomorrow's weather
    tw = tomorrow_weather
    if isinstance(tw, dict) and tw and "error" not in tw:
        forecast = (
            f"Tomorrow's forecast for {tw.get('location') or 'your area'}: "
            f"high of {_shown(tw.get('high_f'))}, low of {_shown(tw.get('low_f'))}."
        )
        conditions = tw.get("conditions")
        if conditions:
            forecast += f" {conditions}."
        lines.append(forecast)
        rain = tw.get("rain_chance_pct")
        if isinstance(rain, (int, float)) and rain > 30:
            lines.append(f"There's a {rain}% chance of rain — plan accordingly.")

    lines.append("Have a good night!")
    return " ".join(lines)


async def run_summary(trigger_data: dict, js=None) -> dict:
    """Assemble the evening summary.

    Args:
        trigger_data: Decoded trigger payload; ``run_id`` is used when
            present, otherwise one is generated from the current time.
        js: Optional JetStream context to reuse. When omitted, a dedicated
            NATS connection is opened and closed again before returning,
            including on error.

    Returns:
        The result dict (run id, timestamps, summary text, forecast, agent
        activity and up to ten alerts). Publishing is left to the caller.
    """
    run_id = trigger_data.get("run_id") or f"evening-{int(time.time())}"
    started_at = datetime.now(timezone.utc).isoformat()
    print(f"\nEvening summary run: {run_id}")

    own_connection = None
    if js is None:
        own_connection = await _connect()
        js = own_connection.jetstream()

    try:
        # Tomorrow's weather: the HTTP call blocks, so keep it off the loop.
        print("Fetching tomorrow's weather...")
        loop = asyncio.get_running_loop()
        tomorrow = await loop.run_in_executor(None, fetch_tomorrow_weather)
        if "error" not in tomorrow:
            print(
                f"  Tomorrow: {tomorrow.get('high_f')}F high, "
                f"{tomorrow.get('conditions')}"
            )
        else:
            print(f"  Weather unavailable: {tomorrow['error']}", file=sys.stderr)

        # Today's agent activity
        print("Fetching today's agent results...")
        agent_results = await get_todays_agent_results(js)
        print(f"  {len(agent_results)} agent results found")

        # Current alerts
        print("Fetching alerts...")
        alerts = await get_latest_alerts(js)
        print(f"  {len(alerts)} active alerts")

        # Format
        summary_text = format_summary_text(tomorrow, agent_results, alerts)
        print(f"\nSummary: {summary_text[:200]}...")

        return {
            "run_id": run_id,
            "role": ROLE,
            "started_at": started_at,
            "completed_at": datetime.now(timezone.utc).isoformat(),
            "summary_text": summary_text,
            "tomorrow_weather": tomorrow,
            "agent_activity": agent_results,
            "active_alerts": alerts[:10],
        }
    finally:
        if own_connection is not None:
            await own_connection.close()


async def main():
    """Main agent loop: subscribe to triggers and run until cancelled."""
    nc = await _connect()
    js = nc.jetstream()
    print(f"Evening Coordinator connected to NATS at {NATS_URL}")

    trigger_subject = f"bob.agent.{ROLE}.trigger"

    async def handle_trigger(msg):
        """Run one summary for a trigger message and publish the outcome."""
        try:
            trigger_data = json.loads(msg.data.decode())
            if not isinstance(trigger_data, dict):
                raise ValueError("trigger payload must be a JSON object")
        except ValueError as exc:  # includes JSON and Unicode decode errors
            # A malformed payload can never succeed; terminate it so it is
            # not redelivered indefinitely.
            print(f"Discarding malformed trigger: {exc}", file=sys.stderr)
            await msg.term()
            return

        try:
            # Fix the run id once so "started" and "result" events agree.
            run_id = trigger_data.get("run_id") or f"evening-{int(time.time())}"
            trigger_data["run_id"] = run_id
            print(f"\nReceived trigger: {run_id}")

            started = {
                "role": ROLE,
                "run_id": run_id,
                "started_at": datetime.now(timezone.utc).isoformat(),
            }
            await js.publish(
                f"bob.agent.{ROLE}.started", json.dumps(started).encode()
            )

            result = await run_summary(trigger_data, js=js)
            await js.publish(
                f"bob.agent.{ROLE}.result", json.dumps(result).encode()
            )
            print(f"Summary published to bob.agent.{ROLE}.result")

            # Publish voice announcement
            announcement = {
                "text": result["summary_text"],
                "voice": "ray_porter",
                "target": "all",
                "priority": "normal",
            }
            await js.publish(
                "bob.announce.voice", json.dumps(announcement).encode()
            )
            print("Voice announcement published")

            await msg.ack()
        except Exception as exc:
            # Left un-acked on purpose: JetStream redelivers after ack-wait.
            print(f"Error: {exc}", file=sys.stderr)
            traceback.print_exc()

    await js.subscribe(
        trigger_subject,
        cb=handle_trigger,
        durable="evening_coordinator_agent",
        deliver_policy="new",
        manual_ack=True,
    )
    print(f"Subscribed to {trigger_subject}")
    print("Waiting for triggers...")

    try:
        # Park until the task is cancelled (e.g. Ctrl-C under asyncio.run).
        await asyncio.Event().wait()
    except asyncio.CancelledError:
        pass
    finally:
        await nc.close()


if __name__ == "__main__":
    asyncio.run(main())