agent.py
  1  """Bob Evening Coordinator Agent — daily summary and next-day preparation.
  2  
  3  Triggered by the scheduler at 8:00 PM ET. Assembles:
  4  - Day's activity summary (agent execution results)
  5  - System health status
  6  - Tomorrow's weather forecast
  7  - Maintenance recommendations
  8  
  9  Publishes summary to NATS.
 10  """
 11  
 12  import asyncio
 13  import json
 14  import os
 15  import sys
 16  import time
 17  import urllib.request
 18  from datetime import datetime, timezone
 19  
 20  import nats
 21  
# NATS server URL; overridable through the NATS_URL environment variable.
NATS_URL = os.getenv("NATS_URL", "nats://127.0.0.1:4222")
# Role name used to build this agent's subjects (bob.agent.<ROLE>.trigger/.started/.result).
ROLE = "evening_coordinator"
# Coordinates sent to the Open-Meteo forecast request. float() raises
# ValueError at import time if the environment value is not numeric.
LATITUDE = float(os.getenv("LATITUDE", "27.9506"))
LONGITUDE = float(os.getenv("LONGITUDE", "-82.4572"))
# Human-readable place name echoed back in the forecast dict and the summary text.
LOCATION_NAME = os.getenv("LOCATION_NAME", "Tampa, FL")
 27  
 28  
 29  def fetch_tomorrow_weather() -> dict:
 30      """Fetch tomorrow's weather from Open-Meteo."""
 31      try:
 32          url = (
 33              f"https://api.open-meteo.com/v1/forecast?"
 34              f"latitude={LATITUDE}&longitude={LONGITUDE}"
 35              f"&daily=temperature_2m_max,temperature_2m_min,precipitation_probability_max,weather_code"
 36              f"&temperature_unit=fahrenheit&wind_speed_unit=mph"
 37              f"&timezone=America/New_York&forecast_days=2"
 38          )
 39          with urllib.request.urlopen(url, timeout=10) as resp:
 40              data = json.loads(resp.read().decode())
 41  
 42          codes = {
 43              0: "clear", 1: "mainly clear", 2: "partly cloudy", 3: "overcast",
 44              45: "foggy", 51: "drizzle", 61: "rain", 63: "moderate rain",
 45              65: "heavy rain", 71: "snow", 80: "rain showers", 95: "thunderstorm",
 46          }
 47  
 48          daily = data.get("daily", {})
 49          if len(daily.get("temperature_2m_max", [])) > 1:
 50              return {
 51                  "location": LOCATION_NAME,
 52                  "high_f": daily["temperature_2m_max"][1],
 53                  "low_f": daily["temperature_2m_min"][1],
 54                  "rain_chance_pct": daily.get("precipitation_probability_max", [0, 0])[1],
 55                  "conditions": codes.get(daily.get("weather_code", [0, 0])[1], "unknown"),
 56              }
 57          return {"error": "No forecast data for tomorrow"}
 58      except Exception as e:
 59          return {"error": str(e)}
 60  
 61  
 62  async def get_todays_agent_results(js) -> list:
 63      """Fetch all agent results from today."""
 64      results = []
 65      for role in ["home_keeper", "morning_coordinator"]:
 66          try:
 67              sub = await js.subscribe(
 68                  f"bob.agent.{role}.result",
 69                  deliver_policy="last",
 70              )
 71              msg = await asyncio.wait_for(sub.next_msg(), timeout=3)
 72              await sub.unsubscribe()
 73              data = json.loads(msg.data.decode())
 74              results.append({"role": role, "summary": data.get("summary", data.get("briefing_text", "")[:200])})
 75          except Exception:
 76              continue
 77      return results
 78  
 79  
 80  async def get_latest_alerts(js) -> list:
 81      """Fetch any unresolved alerts."""
 82      alerts = []
 83      try:
 84          sub = await js.subscribe(
 85              "bob.agent.home_keeper.alert",
 86              deliver_policy="last",
 87          )
 88          msg = await asyncio.wait_for(sub.next_msg(), timeout=3)
 89          await sub.unsubscribe()
 90          data = json.loads(msg.data.decode())
 91          alerts = data.get("alerts", [])
 92      except Exception:
 93          pass
 94      return alerts
 95  
 96  
 97  def format_summary_text(tomorrow_weather: dict, agent_results: list, alerts: list) -> str:
 98      """Format evening summary as natural language."""
 99      lines = []
100      now = datetime.now()
101      lines.append(f"Good evening. Here's your daily summary for {now.strftime('%A, %B %d')}.")
102  
103      # Agent activity
104      if agent_results:
105          lines.append(f"Bob ran {len(agent_results)} agent tasks today.")
106  
107      # Current alerts
108      if alerts:
109          critical = [a for a in alerts if "CRITICAL" in a or "DOWN" in a]
110          if critical:
111              lines.append(f"There are {len(critical)} critical alerts still active.")
112              for a in critical[:3]:
113                  lines.append(f"  - {a}")
114          else:
115              lines.append(f"There are {len(alerts)} minor alerts, nothing critical.")
116      else:
117          lines.append("All systems are running smoothly.")
118  
119      # Tomorrow's weather
120      tw = tomorrow_weather
121      if "error" not in tw:
122          lines.append(
123              f"Tomorrow's forecast for {tw.get('location', 'your area')}: "
124              f"high of {tw.get('high_f', '?')}, low of {tw.get('low_f', '?')}. "
125              f"{tw.get('conditions', '')}."
126          )
127          rain = tw.get("rain_chance_pct", 0)
128          if rain and rain > 30:
129              lines.append(f"There's a {rain}% chance of rain — plan accordingly.")
130  
131      lines.append("Have a good night!")
132      return " ".join(lines)
133  
134  
async def run_summary(trigger_data: dict) -> dict:
    """Assemble the evening summary for one trigger.

    Opens its own NATS connection, gathers tomorrow's forecast, the latest
    agent results and the latest alerts, and formats the summary text. The
    connection is closed on every exit path, including errors.

    Args:
        trigger_data: Decoded trigger payload; ``run_id`` is used when
            present and non-empty, otherwise one is generated from the
            current epoch time.

    Returns:
        A JSON-serialisable dict with ``run_id``, ``role``, ``started_at``,
        ``completed_at`` (UTC ISO-8601), ``summary_text``,
        ``tomorrow_weather``, ``agent_activity`` and ``active_alerts`` (at
        most the first 10 alerts).

    Raises:
        Whatever ``nats.connect`` raises when the server is unreachable.
    """
    run_id = trigger_data.get("run_id") or f"evening-{int(time.time())}"
    started_at = datetime.now(timezone.utc).isoformat()
    print(f"\nEvening summary run: {run_id}")

    nc = await nats.connect(NATS_URL)
    try:
        js = nc.jetstream()

        # Tomorrow's weather. The fetch is blocking urllib I/O, so run it in
        # the default executor to keep the event loop (and the NATS client's
        # background tasks) responsive.
        print("Fetching tomorrow's weather...")
        loop = asyncio.get_running_loop()
        tomorrow = await loop.run_in_executor(None, fetch_tomorrow_weather)
        if "error" not in tomorrow:
            print(f"  Tomorrow: {tomorrow.get('high_f')}F high, {tomorrow.get('conditions')}")
        else:
            print(f"  Weather unavailable: {tomorrow['error']}", file=sys.stderr)

        # Today's agent activity
        print("Fetching today's agent results...")
        agent_results = await get_todays_agent_results(js)
        print(f"  {len(agent_results)} agent results found")

        # Current alerts
        print("Fetching alerts...")
        alerts = await get_latest_alerts(js)
        print(f"  {len(alerts)} active alerts")

        # Format
        summary_text = format_summary_text(tomorrow, agent_results, alerts)
        print(f"\nSummary: {summary_text[:200]}...")

        return {
            "run_id": run_id,
            "role": ROLE,
            "started_at": started_at,
            "completed_at": datetime.now(timezone.utc).isoformat(),
            "summary_text": summary_text,
            "tomorrow_weather": tomorrow,
            "agent_activity": agent_results,
            "active_alerts": alerts[:10],
        }
    finally:
        # Close the per-run connection even when a step above fails.
        await nc.close()
177  
178  
async def main():
    """Main agent loop.

    Connects to NATS, subscribes to ``bob.agent.<ROLE>.trigger`` with a
    durable, manually-acknowledged JetStream consumer, and then waits until
    cancelled. For each trigger it publishes a ``started`` event, the summary
    result and a voice announcement, then acknowledges the trigger. The
    connection is closed on every exit path.
    """
    import traceback

    nc = await nats.connect(NATS_URL)
    try:
        js = nc.jetstream()
        print(f"Evening Coordinator connected to NATS at {NATS_URL}")

        trigger_subject = f"bob.agent.{ROLE}.trigger"

        async def handle_trigger(msg):
            # A payload that cannot be decoded into a JSON object will never
            # succeed, so terminate it instead of letting the durable
            # consumer redeliver it indefinitely.
            try:
                trigger_data = json.loads(msg.data.decode())
                if not isinstance(trigger_data, dict):
                    raise ValueError("trigger payload must be a JSON object")
            except ValueError as e:
                # JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
                print(f"Error: discarding malformed trigger: {e}", file=sys.stderr)
                try:
                    await msg.term()
                except Exception as term_err:
                    print(f"Error: could not terminate trigger: {term_err}", file=sys.stderr)
                return

            try:
                print(f"\nReceived trigger: {trigger_data.get('run_id', 'unknown')}")

                started = {
                    "role": ROLE,
                    "run_id": trigger_data.get("run_id"),
                    "started_at": datetime.now(timezone.utc).isoformat(),
                }
                await js.publish(f"bob.agent.{ROLE}.started", json.dumps(started).encode())

                result = await run_summary(trigger_data)
                await js.publish(f"bob.agent.{ROLE}.result", json.dumps(result).encode())
                print(f"Summary published to bob.agent.{ROLE}.result")

                # Publish voice announcement
                announcement = {
                    "text": result["summary_text"],
                    "voice": "ray_porter",
                    "target": "all",
                    "priority": "normal",
                }
                await js.publish("bob.announce.voice", json.dumps(announcement).encode())
                print("Voice announcement published")

                await msg.ack()
            except Exception as e:
                # Deliberately left unacknowledged: a transient failure is
                # retried when JetStream redelivers after the ack wait.
                print(f"Error: {e}", file=sys.stderr)
                traceback.print_exc()

        await js.subscribe(
            trigger_subject,
            cb=handle_trigger,
            durable="evening_coordinator_agent",
            deliver_policy="new",
            manual_ack=True,
        )
        print(f"Subscribed to {trigger_subject}")
        print("Waiting for triggers...")

        # Park until the task is cancelled; the event is never set.
        await asyncio.Event().wait()
    except asyncio.CancelledError:
        pass
    finally:
        await nc.close()
236  
237  
# Script entry point: run the agent loop until the process is interrupted.
if __name__ == "__main__":
    asyncio.run(main())