/ actor / actor.go
actor.go
  1  package actor
  2  
  3  import (
  4  	"context"
  5  	"sync"
  6  
  7  	"github.com/lightningnetwork/lnd/fn/v2"
  8  )
  9  
// ActorConfig holds the configuration parameters for creating a new Actor.
// It is generic over M (Message type) and R (Response type) to accommodate
// the actor's specific behavior.
//
// NewActor rejects a config whose ID is empty (ErrEmptyActorID) or whose
// Behavior is nil (ErrNilBehavior). DLO and MailboxSize are optional.
type ActorConfig[M Message, R any] struct {
	// ID is the unique identifier for the actor. It must be non-empty.
	ID string

	// Behavior defines how the actor responds to messages. It must be
	// non-nil.
	Behavior ActorBehavior[M, R]

	// DLO is a reference to the dead letter office for this actor system.
	// When set, messages sent after the actor has stopped, and messages
	// still queued when it shuts down, are forwarded to it. When nil,
	// those messages are dropped.
	DLO ActorRef[Message, any]

	// MailboxSize defines the buffer capacity of the actor's mailbox.
	// NewActor replaces any value less than or equal to zero with a
	// capacity of 1.
	MailboxSize int
}
 28  
// envelope wraps a message with its associated promise. This allows the sender
// of an "ask" message to await a response. If the promise is nil, it
// signifies a "tell" operation (fire-and-forget).
type envelope[M Message, R any] struct {
	// message is the payload handed to the actor's behavior.
	message M

	// promise, when non-nil, is completed with the behavior's result, or
	// with ErrActorTerminated if the envelope is drained during shutdown.
	promise Promise[R]
}
 36  
// Actor represents a concrete actor implementation. It encapsulates a behavior,
// manages its internal state implicitly through that behavior, and processes
// messages from its mailbox sequentially in its own goroutine.
//
// An Actor is created with NewActor, begins processing once Start is called,
// and shuts down after Stop cancels its context.
type Actor[M Message, R any] struct {
	// id is the unique identifier for the actor.
	id string

	// behavior defines how the actor responds to messages. Its Receive
	// method is invoked once per message taken from the mailbox.
	behavior ActorBehavior[M, R]

	// mailbox is the incoming message queue for the actor.
	mailbox Mailbox[M, R]

	// ctx is the context governing the actor's lifecycle. Once it is done,
	// Tell and Ask treat the actor as terminated.
	ctx context.Context

	// cancel is the function to cancel the actor's context. It is invoked
	// by Stop.
	cancel context.CancelFunc

	// dlo is a reference to the dead letter office for this actor system.
	// It may be nil, in which case undeliverable messages are dropped.
	dlo ActorRef[Message, any]

	// startOnce ensures the actor's processing loop is started only once.
	startOnce sync.Once

	// stopOnce ensures the actor's context is cancelled only once.
	stopOnce sync.Once

	// ref is the cached ActorRef for this actor. It is created in NewActor
	// and returned by Ref and TellRef.
	ref ActorRef[M, R]
}
 68  
 69  // NewActor creates a new actor instance with the given ID and behavior.
 70  // It initializes the actor's internal structures but does not start its
 71  // message processing goroutine. The Start() method must be called to begin
 72  // processing messages.
 73  func NewActor[M Message, R any](cfg ActorConfig[M, R]) (*Actor[M, R],
 74  	error) {
 75  
 76  	if cfg.ID == "" {
 77  		return nil, ErrEmptyActorID
 78  	}
 79  
 80  	if cfg.Behavior == nil {
 81  		return nil, ErrNilBehavior
 82  	}
 83  
 84  	ctx, cancel := context.WithCancel(context.Background())
 85  
 86  	// Ensure MailboxSize has a sane default if not specified or zero. A
 87  	// capacity of 0 would make the channel unbuffered, which is generally
 88  	// not desired for actor mailboxes.
 89  	mailboxCapacity := cfg.MailboxSize
 90  	if mailboxCapacity <= 0 {
 91  		// Default to a small capacity if an invalid one is given. This
 92  		// could also come from a global constant.
 93  		mailboxCapacity = 1
 94  	}
 95  
 96  	// Create mailbox - could be injected via config in the future.
 97  	mailbox := NewChannelMailbox[M, R](ctx, mailboxCapacity)
 98  
 99  	actor := &Actor[M, R]{
100  		id:       cfg.ID,
101  		behavior: cfg.Behavior,
102  		mailbox:  mailbox,
103  		ctx:      ctx,
104  		cancel:   cancel,
105  		dlo:      cfg.DLO,
106  	}
107  
108  	// Create and cache the actor's own reference.
109  	actor.ref = &actorRefImpl[M, R]{
110  		actor: actor,
111  	}
112  
113  	return actor, nil
114  }
115  
116  // Start initiates the actor's message processing loop in a new goroutine. This
117  // method should be called once after the actor is created.
118  func (a *Actor[M, R]) Start() {
119  	a.startOnce.Do(func() {
120  		log.Infof("Actor %s: starting", a.id)
121  
122  		go a.process()
123  	})
124  }
125  
126  // process is the main event loop for the actor. It continuously monitors its
127  // mailbox for incoming messages and its context for cancellation signals.
128  func (a *Actor[M, R]) process() {
129  	// Use the new iterator pattern for receiving messages.
130  	for env := range a.mailbox.Receive(a.ctx) {
131  		result := a.behavior.Receive(a.ctx, env.message)
132  
133  		// If a promise was provided (i.e., it was an "ask"
134  		// operation), complete the promise with the result from
135  		// the behavior.
136  		if env.promise != nil {
137  			env.promise.Complete(result)
138  		}
139  	}
140  
141  	// Context was cancelled or mailbox closed, drain remaining messages.
142  	a.mailbox.Close()
143  
144  	for env := range a.mailbox.Drain() {
145  		// If a DLO is configured, send the original message there
146  		// for auditing or potential manual reprocessing.
147  		if a.dlo != nil {
148  			a.dlo.Tell(context.Background(), env.message)
149  		}
150  
151  		// If it was an Ask, complete the promise with an error
152  		// indicating the actor terminated.
153  		if env.promise != nil {
154  			env.promise.Complete(fn.Err[R](ErrActorTerminated))
155  		}
156  	}
157  }
158  
159  // Stop signals the actor to terminate its processing loop and shut down.
160  // This is achieved by cancelling the actor's internal context. The actor's
161  // goroutine will exit once it detects the context cancellation.
162  func (a *Actor[M, R]) Stop() {
163  	a.stopOnce.Do(func() {
164  		log.Infof("Actor %s: stopping", a.id)
165  
166  		a.cancel()
167  	})
168  }
169  
// actorRefImpl provides a concrete implementation of the ActorRef interface. It
// holds a reference to the target Actor instance, enabling message sending.
type actorRefImpl[M Message, R any] struct {
	// actor is the Actor whose mailbox, context and DLO this reference
	// operates on.
	actor *Actor[M, R]
}
175  
176  // Tell sends a message without waiting for a response. If the context is
177  // cancelled before the message can be sent to the actor's mailbox, the message
178  // may be dropped.
179  //
180  //nolint:ll
181  func (ref *actorRefImpl[M, R]) Tell(ctx context.Context, msg M) {
182  	// If the actor's own context is already done, don't try to send.
183  	// Route to DLO if available.
184  	if ref.actor.ctx.Err() != nil {
185  		ref.trySendToDLO(msg)
186  		return
187  	}
188  
189  	env := envelope[M, R]{message: msg, promise: nil}
190  
191  	// Use mailbox Send method which internally checks both contexts.
192  	if !ref.actor.mailbox.Send(ctx, env) {
193  		// Failed to send - check if actor terminated.
194  		if ref.actor.ctx.Err() != nil {
195  			ref.trySendToDLO(msg)
196  		}
197  		// Otherwise it was the caller's context that cancelled.
198  	}
199  }
200  
201  // Ask sends a message and returns a Future for the response. The Future will be
202  // completed with the actor's reply or an error if the operation fails (e.g.,
203  // context cancellation before send).
204  //
205  //nolint:ll
206  func (ref *actorRefImpl[M, R]) Ask(ctx context.Context, msg M) Future[R] {
207  	// Create a new promise that will be fulfilled with the actor's response.
208  	promise := NewPromise[R]()
209  
210  	// If the actor's own context is already done, complete the promise with
211  	// ErrActorTerminated and return immediately. This is the primary guard
212  	// against trying to send to a stopped actor.
213  	if ref.actor.ctx.Err() != nil {
214  		promise.Complete(fn.Err[R](ErrActorTerminated))
215  		return promise.Future()
216  	}
217  
218  	// Check if the context is already done before attempting to send. This
219  	// ensures deterministic behavior and prevents a race where the message
220  	// could be enqueued even though the context was already cancelled.
221  	if ctx.Err() != nil {
222  		promise.Complete(fn.Err[R](ctx.Err()))
223  		return promise.Future()
224  	}
225  
226  	env := envelope[M, R]{message: msg, promise: promise}
227  
228  	// Use mailbox Send method which internally checks both contexts.
229  	if !ref.actor.mailbox.Send(ctx, env) {
230  		// Determine the error based on what failed.
231  		if ref.actor.ctx.Err() != nil {
232  			promise.Complete(fn.Err[R](ErrActorTerminated))
233  		} else {
234  			promise.Complete(fn.Err[R](ctx.Err()))
235  		}
236  	}
237  
238  	// Return the future associated with the promise, allowing the caller to
239  	// await the response.
240  	return promise.Future()
241  }
242  
243  // trySendToDLO attempts to send the message to the actor's DLO if configured.
244  func (ref *actorRefImpl[M, R]) trySendToDLO(msg M) {
245  	if ref.actor.dlo != nil {
246  		// Use context.Background() for sending to DLO as the
247  		// original context might be done or the operation
248  		// should not be bound by it.
249  		// This Tell to DLO is fire-and-forget.
250  		ref.actor.dlo.Tell(context.Background(), msg)
251  	}
252  }
253  
254  // ID returns the unique identifier for this actor.
255  func (ref *actorRefImpl[M, R]) ID() string {
256  	return ref.actor.id
257  }
258  
259  // Ref returns an ActorRef for this actor. This allows clients to interact with
260  // the actor (send messages) without having direct access to the Actor struct
261  // itself, promoting encapsulation and location transparency.
262  func (a *Actor[M, R]) Ref() ActorRef[M, R] {
263  	return a.ref
264  }
265  
266  // TellRef returns a TellOnlyRef for this actor. This allows clients to send
267  // messages to the actor using only the "tell" pattern (fire-and-forget),
268  // without having access to "ask" capabilities.
269  func (a *Actor[M, R]) TellRef() TellOnlyRef[M] {
270  	return a.ref
271  }