/ internal / daemon / events.go
events.go
  1  package daemon
  2  
  3  import (
  4  	"encoding/json"
  5  	"sync"
  6  )
  7  
  8  // Event types emitted by the daemon.
  9  const (
 10  	EventMessageReceived  = "message_received"
 11  	EventAgentReply       = "agent_reply"
 12  	EventApprovalRequest  = "approval_request"
 13  	EventApprovalResolved = "approval_resolved"
 14  	EventAgentError       = "agent_error"
 15  	EventHeartbeatAlert   = "heartbeat_alert"
 16  	EventToolStatus       = "tool_status"
 17  	EventUsage            = "usage" // per-LLM-call usage snapshot for the run
 18  	EventCloudAgent       = "cloud_agent"
 19  	EventCloudProgress    = "cloud_progress"
 20  	EventCloudPlan        = "cloud_plan"
 21  	EventNotification     = "notification"
 22  	EventRunStatus        = "run_status" // watchdog soft/hard events, LLM retries, etc.
 23  )
 24  
// Event is a daemon lifecycle event pushed to SSE subscribers.
type Event struct {
	// ID is the sequence number of the event. EventBus.EmitTo overwrites it
	// with the next bus-wide ID on every emit, so callers need not set it.
	// A zero ID is omitted from the JSON encoding.
	ID      uint64          `json:"id,omitempty"`
	// Type names the kind of event; EmitTo compares it against
	// EventNotification to decide whether the event is kept for replay.
	Type    string          `json:"type"`
	// Payload is the event body as already-encoded JSON; being a
	// json.RawMessage it is embedded as-is when the Event is marshaled.
	Payload json.RawMessage `json:"payload"`
}
 31  
// ringSize is the capacity of the EventBus replay ring: the maximum number
// of past events retained for EventsSince and SubscribeWithReplay.
const ringSize = 512
 33  
// EventBus is a simple pub/sub bus for daemon events.
// It maintains a ring buffer of the last ringSize events so that
// reconnecting clients can replay missed events via EventsSince.
type EventBus struct {
	// mu guards every field below: methods that modify the bus take the
	// write lock, read-only queries take the read lock.
	mu          sync.RWMutex
	// subscribers maps the receive-only channel handed out to a caller to
	// the same channel in its bidirectional form, which EmitTo sends on.
	subscribers map[<-chan Event]chan Event
	// ring is the fixed-size circular store of recently emitted events.
	ring        [ringSize]Event
	// ringLen is the number of valid events in ring (≤ ringSize).
	ringLen     int
	// ringHead is the next write position in ring; once the ring is full it
	// is also the position of the oldest retained event.
	ringHead    int
	// nextID is the ID given to the most recently emitted event. EmitTo
	// increments it before assigning, so the first event gets ID 1 and IDs
	// increase monotonically.
	nextID      uint64
}
 45  
 46  // NewEventBus creates a new EventBus.
 47  func NewEventBus() *EventBus {
 48  	return &EventBus{
 49  		subscribers: make(map[<-chan Event]chan Event),
 50  	}
 51  }
 52  
 53  // Subscribe returns a channel that receives all emitted events.
 54  // Caller must call Unsubscribe when done.
 55  func (b *EventBus) Subscribe() <-chan Event {
 56  	ch := make(chan Event, 64)
 57  	b.mu.Lock()
 58  	b.subscribers[ch] = ch
 59  	b.mu.Unlock()
 60  	return ch
 61  }
 62  
 63  // Unsubscribe removes a subscriber. No further events will be sent to ch.
 64  // The channel is not closed; callers should stop reading after Unsubscribe.
 65  func (b *EventBus) Unsubscribe(ch <-chan Event) {
 66  	b.mu.Lock()
 67  	delete(b.subscribers, ch)
 68  	b.mu.Unlock()
 69  }
 70  
 71  // Emit sends an event to all subscribers. Non-blocking: if a subscriber's
 72  // buffer is full, the event is dropped for that subscriber.
 73  func (b *EventBus) Emit(evt Event) {
 74  	_ = b.EmitTo(evt)
 75  }
 76  
 77  // EmitTo sends an event to all subscribers and returns the number of
 78  // subscribers that actually accepted the event (i.e. had buffer space).
 79  // Subscribers whose buffer was full are counted as drops. Callers that need
 80  // to make a real delivery decision — e.g. the notify tool choosing between
 81  // the Desktop path and the osascript fallback — should use this method; a
 82  // zero return value means "nobody got the event, fall back".
 83  //
 84  // Known limitation: EmitTo cannot distinguish a Desktop client from, say, a
 85  // curl session debugging /events. It only reports best-effort delivery to
 86  // any current subscriber. Capability negotiation on the /events endpoint is
 87  // tracked as future work if this becomes a real problem.
 88  func (b *EventBus) EmitTo(evt Event) int {
 89  	b.mu.Lock()
 90  	defer b.mu.Unlock()
 91  
 92  	// Assign monotonically increasing ID.
 93  	b.nextID++
 94  	evt.ID = b.nextID
 95  
 96  	delivered := 0
 97  	for _, ch := range b.subscribers {
 98  		select {
 99  		case ch <- evt:
100  			delivered++
101  		default:
102  			// subscriber too slow, drop
103  		}
104  	}
105  
106  	// Write to ring buffer only after delivery attempt. Notification events
107  	// that were not delivered (delivered == 0) are excluded: the caller
108  	// (runner.go notify handler) falls back to osascript in that case, and
109  	// replaying the notification on reconnect would produce a duplicate banner.
110  	if evt.Type != EventNotification || delivered > 0 {
111  		b.ring[b.ringHead] = evt
112  		b.ringHead = (b.ringHead + 1) % ringSize
113  		if b.ringLen < ringSize {
114  			b.ringLen++
115  		}
116  	}
117  
118  	return delivered
119  }
120  
121  // SubscribeWithReplay atomically registers a subscriber and returns all
122  // events with ID > lastID from the ring buffer. Because both operations
123  // happen under a single write lock, no events can be emitted between the
124  // replay snapshot and the subscriber registration — closing the gap that
125  // would exist if EventsSince and Subscribe were called separately.
126  func (b *EventBus) SubscribeWithReplay(lastID uint64) ([]Event, <-chan Event) {
127  	ch := make(chan Event, 64)
128  	b.mu.Lock()
129  	defer b.mu.Unlock()
130  	b.subscribers[ch] = ch
131  	var missed []Event
132  	if b.ringLen > 0 && lastID < b.nextID {
133  		start := (b.ringHead - b.ringLen + ringSize) % ringSize
134  		for i := 0; i < b.ringLen; i++ {
135  			idx := (start + i) % ringSize
136  			if b.ring[idx].ID > lastID {
137  				missed = append(missed, b.ring[idx])
138  			}
139  		}
140  	}
141  	return missed, ch
142  }
143  
144  // EventsSince returns events with ID > lastID from the ring buffer.
145  // Returns nil if the buffer is empty or the client is already up to date.
146  func (b *EventBus) EventsSince(lastID uint64) []Event {
147  	b.mu.RLock()
148  	defer b.mu.RUnlock()
149  	if b.ringLen == 0 || lastID >= b.nextID {
150  		return nil
151  	}
152  	var result []Event
153  	start := (b.ringHead - b.ringLen + ringSize) % ringSize
154  	for i := 0; i < b.ringLen; i++ {
155  		idx := (start + i) % ringSize
156  		if b.ring[idx].ID > lastID {
157  			result = append(result, b.ring[idx])
158  		}
159  	}
160  	return result
161  }
162  
163  // HasSubscribers reports whether at least one subscriber is currently attached.
164  // Retained for callers that only need a cheap liveness check. New delivery
165  // decisions should prefer EmitTo's return value instead.
166  func (b *EventBus) HasSubscribers() bool {
167  	b.mu.RLock()
168  	defer b.mu.RUnlock()
169  	return len(b.subscribers) > 0
170  }