events.go
1 package daemon 2 3 import ( 4 "encoding/json" 5 "sync" 6 ) 7 8 // Event types emitted by the daemon. 9 const ( 10 EventMessageReceived = "message_received" 11 EventAgentReply = "agent_reply" 12 EventApprovalRequest = "approval_request" 13 EventApprovalResolved = "approval_resolved" 14 EventAgentError = "agent_error" 15 EventHeartbeatAlert = "heartbeat_alert" 16 EventToolStatus = "tool_status" 17 EventUsage = "usage" // per-LLM-call usage snapshot for the run 18 EventCloudAgent = "cloud_agent" 19 EventCloudProgress = "cloud_progress" 20 EventCloudPlan = "cloud_plan" 21 EventNotification = "notification" 22 EventRunStatus = "run_status" // watchdog soft/hard events, LLM retries, etc. 23 ) 24 25 // Event is a daemon lifecycle event pushed to SSE subscribers. 26 type Event struct { 27 ID uint64 `json:"id,omitempty"` 28 Type string `json:"type"` 29 Payload json.RawMessage `json:"payload"` 30 } 31 32 const ringSize = 512 33 34 // EventBus is a simple pub/sub bus for daemon events. 35 // It maintains a ring buffer of the last ringSize events so that 36 // reconnecting clients can replay missed events via EventsSince. 37 type EventBus struct { 38 mu sync.RWMutex 39 subscribers map[<-chan Event]chan Event 40 ring [ringSize]Event 41 ringLen int // number of valid events in ring (≤ ringSize) 42 ringHead int // next write position 43 nextID uint64 // monotonically increasing event ID, starts at 1 44 } 45 46 // NewEventBus creates a new EventBus. 47 func NewEventBus() *EventBus { 48 return &EventBus{ 49 subscribers: make(map[<-chan Event]chan Event), 50 } 51 } 52 53 // Subscribe returns a channel that receives all emitted events. 54 // Caller must call Unsubscribe when done. 55 func (b *EventBus) Subscribe() <-chan Event { 56 ch := make(chan Event, 64) 57 b.mu.Lock() 58 b.subscribers[ch] = ch 59 b.mu.Unlock() 60 return ch 61 } 62 63 // Unsubscribe removes a subscriber. No further events will be sent to ch. 64 // The channel is not closed; callers should stop reading after Unsubscribe. 65 func (b *EventBus) Unsubscribe(ch <-chan Event) { 66 b.mu.Lock() 67 delete(b.subscribers, ch) 68 b.mu.Unlock() 69 } 70 71 // Emit sends an event to all subscribers. Non-blocking: if a subscriber's 72 // buffer is full, the event is dropped for that subscriber. 73 func (b *EventBus) Emit(evt Event) { 74 _ = b.EmitTo(evt) 75 } 76 77 // EmitTo sends an event to all subscribers and returns the number of 78 // subscribers that actually accepted the event (i.e. had buffer space). 79 // Subscribers whose buffer was full are counted as drops. Callers that need 80 // to make a real delivery decision — e.g. the notify tool choosing between 81 // the Desktop path and the osascript fallback — should use this method; a 82 // zero return value means "nobody got the event, fall back". 83 // 84 // Known limitation: EmitTo cannot distinguish a Desktop client from, say, a 85 // curl session debugging /events. It only reports best-effort delivery to 86 // any current subscriber. Capability negotiation on the /events endpoint is 87 // tracked as future work if this becomes a real problem. 88 func (b *EventBus) EmitTo(evt Event) int { 89 b.mu.Lock() 90 defer b.mu.Unlock() 91 92 // Assign monotonically increasing ID. 93 b.nextID++ 94 evt.ID = b.nextID 95 96 delivered := 0 97 for _, ch := range b.subscribers { 98 select { 99 case ch <- evt: 100 delivered++ 101 default: 102 // subscriber too slow, drop 103 } 104 } 105 106 // Write to ring buffer only after delivery attempt. Notification events 107 // that were not delivered (delivered == 0) are excluded: the caller 108 // (runner.go notify handler) falls back to osascript in that case, and 109 // replaying the notification on reconnect would produce a duplicate banner. 110 if evt.Type != EventNotification || delivered > 0 { 111 b.ring[b.ringHead] = evt 112 b.ringHead = (b.ringHead + 1) % ringSize 113 if b.ringLen < ringSize { 114 b.ringLen++ 115 } 116 } 117 118 return delivered 119 } 120 121 // SubscribeWithReplay atomically registers a subscriber and returns all 122 // events with ID > lastID from the ring buffer. Because both operations 123 // happen under a single write lock, no events can be emitted between the 124 // replay snapshot and the subscriber registration — closing the gap that 125 // would exist if EventsSince and Subscribe were called separately. 126 func (b *EventBus) SubscribeWithReplay(lastID uint64) ([]Event, <-chan Event) { 127 ch := make(chan Event, 64) 128 b.mu.Lock() 129 defer b.mu.Unlock() 130 b.subscribers[ch] = ch 131 var missed []Event 132 if b.ringLen > 0 && lastID < b.nextID { 133 start := (b.ringHead - b.ringLen + ringSize) % ringSize 134 for i := 0; i < b.ringLen; i++ { 135 idx := (start + i) % ringSize 136 if b.ring[idx].ID > lastID { 137 missed = append(missed, b.ring[idx]) 138 } 139 } 140 } 141 return missed, ch 142 } 143 144 // EventsSince returns events with ID > lastID from the ring buffer. 145 // Returns nil if the buffer is empty or the client is already up to date. 146 func (b *EventBus) EventsSince(lastID uint64) []Event { 147 b.mu.RLock() 148 defer b.mu.RUnlock() 149 if b.ringLen == 0 || lastID >= b.nextID { 150 return nil 151 } 152 var result []Event 153 start := (b.ringHead - b.ringLen + ringSize) % ringSize 154 for i := 0; i < b.ringLen; i++ { 155 idx := (start + i) % ringSize 156 if b.ring[idx].ID > lastID { 157 result = append(result, b.ring[idx]) 158 } 159 } 160 return result 161 } 162 163 // HasSubscribers reports whether at least one subscriber is currently attached. 164 // Retained for callers that only need a cheap liveness check. New delivery 165 // decisions should prefer EmitTo's return value instead. 166 func (b *EventBus) HasSubscribers() bool { 167 b.mu.RLock() 168 defer b.mu.RUnlock() 169 return len(b.subscribers) > 0 170 }