mailbox.go
1 package actor 2 3 import ( 4 "context" 5 "iter" 6 "sync" 7 "sync/atomic" 8 ) 9 10 // Mailbox represents the message queue for an actor. It provides methods for 11 // sending messages and receiving them via an iterator pattern. 12 type Mailbox[M Message, R any] interface { 13 // Send attempts to send an envelope to the mailbox with context-based 14 // cancellation. Returns true if sent successfully, false if the 15 // context was cancelled or the mailbox is closed. 16 Send(ctx context.Context, env envelope[M, R]) bool 17 18 // TrySend attempts to send without blocking. Returns true if the 19 // envelope was sent, false if the mailbox is full or closed. 20 TrySend(env envelope[M, R]) bool 21 22 // Receive returns an iterator for consuming messages from the mailbox. 23 // The iterator will yield messages until the mailbox is closed or the 24 // context is cancelled. 25 Receive(ctx context.Context) iter.Seq[envelope[M, R]] 26 27 // Close closes the mailbox, preventing new messages from being sent. 28 // Any remaining messages can still be consumed via Receive. 29 Close() 30 31 // IsClosed returns true if the mailbox has been closed. 32 IsClosed() bool 33 34 // Drain returns an iterator that yields all remaining messages in the 35 // mailbox after it has been closed. This is useful for cleanup. 36 Drain() iter.Seq[envelope[M, R]] 37 } 38 39 // ChannelMailbox is a channel-based implementation of the Mailbox interface. 40 type ChannelMailbox[M Message, R any] struct { 41 ch chan envelope[M, R] 42 closed atomic.Bool 43 44 // mu protects Send/TrySend operations to prevent send-on-closed-channel 45 // panics. Close() acquires write lock, Send/TrySend acquire read lock. 46 mu sync.RWMutex 47 48 // closeOnce ensures Close() executes exactly once. 49 closeOnce sync.Once 50 51 // actorCtx is the actor's context for lifecycle management. 52 actorCtx context.Context 53 } 54 55 // NewChannelMailbox creates a new channel-based mailbox with the specified 56 // buffer capacity and actor context. 57 func NewChannelMailbox[M Message, R any](actorCtx context.Context, 58 capacity int) *ChannelMailbox[M, R] { 59 60 if capacity <= 0 { 61 capacity = 1 62 } 63 return &ChannelMailbox[M, R]{ 64 ch: make(chan envelope[M, R], capacity), 65 actorCtx: actorCtx, 66 } 67 } 68 69 // Send implements Mailbox.Send with context-aware blocking send. 70 func (m *ChannelMailbox[M, R]) Send(ctx context.Context, 71 env envelope[M, R]) bool { 72 73 m.mu.RLock() 74 defer m.mu.RUnlock() 75 76 if m.IsClosed() { 77 return false 78 } 79 80 select { 81 case m.ch <- env: 82 return true 83 case <-ctx.Done(): 84 return false 85 case <-m.actorCtx.Done(): 86 // Actor is shutting down. 87 return false 88 } 89 } 90 91 // TrySend implements Mailbox.TrySend with non-blocking send. 92 func (m *ChannelMailbox[M, R]) TrySend(env envelope[M, R]) bool { 93 m.mu.RLock() 94 defer m.mu.RUnlock() 95 96 if m.IsClosed() { 97 return false 98 } 99 100 select { 101 case m.ch <- env: 102 return true 103 default: 104 return false 105 } 106 } 107 108 // Receive implements Mailbox.Receive using iter.Seq pattern. 109 func (m *ChannelMailbox[M, R]) Receive( 110 ctx context.Context) iter.Seq[envelope[M, R]] { 111 return func(yield func(envelope[M, R]) bool) { 112 for { 113 select { 114 case env, ok := <-m.ch: 115 if !ok { 116 return 117 } 118 119 if !yield(env) { 120 return 121 } 122 123 case <-ctx.Done(): 124 return 125 126 case <-m.actorCtx.Done(): 127 return 128 } 129 } 130 } 131 } 132 133 // Close implements Mailbox.Close. 134 func (m *ChannelMailbox[M, R]) Close() { 135 m.closeOnce.Do(func() { 136 m.mu.Lock() 137 defer m.mu.Unlock() 138 139 m.closed.Store(true) 140 141 close(m.ch) 142 }) 143 } 144 145 // IsClosed implements Mailbox.IsClosed. 146 func (m *ChannelMailbox[M, R]) IsClosed() bool { 147 return m.closed.Load() 148 } 149 150 // Drain implements Mailbox.Drain for cleanup after close. 151 func (m *ChannelMailbox[M, R]) Drain() iter.Seq[envelope[M, R]] { 152 return func(yield func(envelope[M, R]) bool) { 153 // Only drain if closed. 154 if !m.IsClosed() { 155 return 156 } 157 158 // Drain all remaining messages from the channel. 159 for { 160 select { 161 case env, ok := <-m.ch: 162 // Channel closed, nothing left to drain. 163 if !ok { 164 return 165 } 166 167 if !yield(env) { 168 return 169 } 170 default: 171 // Channel empty, done draining. 172 return 173 } 174 } 175 } 176 }