/ actor / mailbox.go
mailbox.go
  1  package actor
  2  
  3  import (
  4  	"context"
  5  	"iter"
  6  	"sync"
  7  	"sync/atomic"
  8  )
  9  
 10  // Mailbox represents the message queue for an actor. It provides methods for
 11  // sending messages and receiving them via an iterator pattern.
 12  type Mailbox[M Message, R any] interface {
 13  	// Send attempts to send an envelope to the mailbox with context-based
 14  	// cancellation. Returns true if sent successfully, false if the
 15  	// context was cancelled or the mailbox is closed.
 16  	Send(ctx context.Context, env envelope[M, R]) bool
 17  
 18  	// TrySend attempts to send without blocking. Returns true if the
 19  	// envelope was sent, false if the mailbox is full or closed.
 20  	TrySend(env envelope[M, R]) bool
 21  
 22  	// Receive returns an iterator for consuming messages from the mailbox.
 23  	// The iterator will yield messages until the mailbox is closed or the
 24  	// context is cancelled.
 25  	Receive(ctx context.Context) iter.Seq[envelope[M, R]]
 26  
 27  	// Close closes the mailbox, preventing new messages from being sent.
 28  	// Any remaining messages can still be consumed via Receive.
 29  	Close()
 30  
 31  	// IsClosed returns true if the mailbox has been closed.
 32  	IsClosed() bool
 33  
 34  	// Drain returns an iterator that yields all remaining messages in the
 35  	// mailbox after it has been closed. This is useful for cleanup.
 36  	Drain() iter.Seq[envelope[M, R]]
 37  }
 38  
 39  // ChannelMailbox is a channel-based implementation of the Mailbox interface.
 40  type ChannelMailbox[M Message, R any] struct {
 41  	ch     chan envelope[M, R]
 42  	closed atomic.Bool
 43  
 44  	// mu protects Send/TrySend operations to prevent send-on-closed-channel
 45  	// panics. Close() acquires write lock, Send/TrySend acquire read lock.
 46  	mu sync.RWMutex
 47  
 48  	// closeOnce ensures Close() executes exactly once.
 49  	closeOnce sync.Once
 50  
 51  	// actorCtx is the actor's context for lifecycle management.
 52  	actorCtx context.Context
 53  }
 54  
 55  // NewChannelMailbox creates a new channel-based mailbox with the specified
 56  // buffer capacity and actor context.
 57  func NewChannelMailbox[M Message, R any](actorCtx context.Context,
 58  	capacity int) *ChannelMailbox[M, R] {
 59  
 60  	if capacity <= 0 {
 61  		capacity = 1
 62  	}
 63  	return &ChannelMailbox[M, R]{
 64  		ch:       make(chan envelope[M, R], capacity),
 65  		actorCtx: actorCtx,
 66  	}
 67  }
 68  
 69  // Send implements Mailbox.Send with context-aware blocking send.
 70  func (m *ChannelMailbox[M, R]) Send(ctx context.Context,
 71  	env envelope[M, R]) bool {
 72  
 73  	m.mu.RLock()
 74  	defer m.mu.RUnlock()
 75  
 76  	if m.IsClosed() {
 77  		return false
 78  	}
 79  
 80  	select {
 81  	case m.ch <- env:
 82  		return true
 83  	case <-ctx.Done():
 84  		return false
 85  	case <-m.actorCtx.Done():
 86  		// Actor is shutting down.
 87  		return false
 88  	}
 89  }
 90  
 91  // TrySend implements Mailbox.TrySend with non-blocking send.
 92  func (m *ChannelMailbox[M, R]) TrySend(env envelope[M, R]) bool {
 93  	m.mu.RLock()
 94  	defer m.mu.RUnlock()
 95  
 96  	if m.IsClosed() {
 97  		return false
 98  	}
 99  
100  	select {
101  	case m.ch <- env:
102  		return true
103  	default:
104  		return false
105  	}
106  }
107  
108  // Receive implements Mailbox.Receive using iter.Seq pattern.
109  func (m *ChannelMailbox[M, R]) Receive(
110  	ctx context.Context) iter.Seq[envelope[M, R]] {
111  	return func(yield func(envelope[M, R]) bool) {
112  		for {
113  			select {
114  			case env, ok := <-m.ch:
115  				if !ok {
116  					return
117  				}
118  
119  				if !yield(env) {
120  					return
121  				}
122  
123  			case <-ctx.Done():
124  				return
125  
126  			case <-m.actorCtx.Done():
127  				return
128  			}
129  		}
130  	}
131  }
132  
133  // Close implements Mailbox.Close.
134  func (m *ChannelMailbox[M, R]) Close() {
135  	m.closeOnce.Do(func() {
136  		m.mu.Lock()
137  		defer m.mu.Unlock()
138  
139  		m.closed.Store(true)
140  
141  		close(m.ch)
142  	})
143  }
144  
145  // IsClosed implements Mailbox.IsClosed.
146  func (m *ChannelMailbox[M, R]) IsClosed() bool {
147  	return m.closed.Load()
148  }
149  
150  // Drain implements Mailbox.Drain for cleanup after close.
151  func (m *ChannelMailbox[M, R]) Drain() iter.Seq[envelope[M, R]] {
152  	return func(yield func(envelope[M, R]) bool) {
153  		// Only drain if closed.
154  		if !m.IsClosed() {
155  			return
156  		}
157  
158  		// Drain all remaining messages from the channel.
159  		for {
160  			select {
161  			case env, ok := <-m.ch:
162  				// Channel closed, nothing left to drain.
163  				if !ok {
164  					return
165  				}
166  
167  				if !yield(env) {
168  					return
169  				}
170  			default:
171  				// Channel empty, done draining.
172  				return
173  			}
174  		}
175  	}
176  }