actor.go
1 package actor 2 3 import ( 4 "context" 5 "sync" 6 7 "github.com/lightningnetwork/lnd/fn/v2" 8 ) 9 10 // ActorConfig holds the configuration parameters for creating a new Actor. 11 // It is generic over M (Message type) and R (Response type) to accommodate 12 // the actor's specific behavior. 13 type ActorConfig[M Message, R any] struct { 14 // ID is the unique identifier for the actor. 15 ID string 16 17 // Behavior defines how the actor responds to messages. 18 Behavior ActorBehavior[M, R] 19 20 // DLO is a reference to the dead letter office for this actor system. 21 // If nil, undeliverable messages during shutdown or due to a full 22 // mailbox (if such logic were added) might be dropped. 23 DLO ActorRef[Message, any] 24 25 // MailboxSize defines the buffer capacity of the actor's mailbox. 26 MailboxSize int 27 } 28 29 // envelope wraps a message with its associated promise. This allows the sender 30 // of an "ask" message to await a response. If the promise is nil, it 31 // signifies a "tell" operation (fire-and-forget). 32 type envelope[M Message, R any] struct { 33 message M 34 promise Promise[R] 35 } 36 37 // Actor represents a concrete actor implementation. It encapsulates a behavior, 38 // manages its internal state implicitly through that behavior, and processes 39 // messages from its mailbox sequentially in its own goroutine. 40 type Actor[M Message, R any] struct { 41 // id is the unique identifier for the actor. 42 id string 43 44 // behavior defines how the actor responds to messages. 45 behavior ActorBehavior[M, R] 46 47 // mailbox is the incoming message queue for the actor. 48 mailbox Mailbox[M, R] 49 50 // ctx is the context governing the actor's lifecycle. 51 ctx context.Context 52 53 // cancel is the function to cancel the actor's context. 54 cancel context.CancelFunc 55 56 // dlo is a reference to the dead letter office for this actor system. 57 dlo ActorRef[Message, any] 58 59 // startOnce ensures the actor's processing loop is started only once. 60 startOnce sync.Once 61 62 // stopOnce ensures the actor's processing loop is stopped only once. 63 stopOnce sync.Once 64 65 // ref is the cached ActorRef for this actor. 66 ref ActorRef[M, R] 67 } 68 69 // NewActor creates a new actor instance with the given ID and behavior. 70 // It initializes the actor's internal structures but does not start its 71 // message processing goroutine. The Start() method must be called to begin 72 // processing messages. 73 func NewActor[M Message, R any](cfg ActorConfig[M, R]) (*Actor[M, R], 74 error) { 75 76 if cfg.ID == "" { 77 return nil, ErrEmptyActorID 78 } 79 80 if cfg.Behavior == nil { 81 return nil, ErrNilBehavior 82 } 83 84 ctx, cancel := context.WithCancel(context.Background()) 85 86 // Ensure MailboxSize has a sane default if not specified or zero. A 87 // capacity of 0 would make the channel unbuffered, which is generally 88 // not desired for actor mailboxes. 89 mailboxCapacity := cfg.MailboxSize 90 if mailboxCapacity <= 0 { 91 // Default to a small capacity if an invalid one is given. This 92 // could also come from a global constant. 93 mailboxCapacity = 1 94 } 95 96 // Create mailbox - could be injected via config in the future. 97 mailbox := NewChannelMailbox[M, R](ctx, mailboxCapacity) 98 99 actor := &Actor[M, R]{ 100 id: cfg.ID, 101 behavior: cfg.Behavior, 102 mailbox: mailbox, 103 ctx: ctx, 104 cancel: cancel, 105 dlo: cfg.DLO, 106 } 107 108 // Create and cache the actor's own reference. 109 actor.ref = &actorRefImpl[M, R]{ 110 actor: actor, 111 } 112 113 return actor, nil 114 } 115 116 // Start initiates the actor's message processing loop in a new goroutine. This 117 // method should be called once after the actor is created. 118 func (a *Actor[M, R]) Start() { 119 a.startOnce.Do(func() { 120 log.Infof("Actor %s: starting", a.id) 121 122 go a.process() 123 }) 124 } 125 126 // process is the main event loop for the actor. It continuously monitors its 127 // mailbox for incoming messages and its context for cancellation signals. 128 func (a *Actor[M, R]) process() { 129 // Use the new iterator pattern for receiving messages. 130 for env := range a.mailbox.Receive(a.ctx) { 131 result := a.behavior.Receive(a.ctx, env.message) 132 133 // If a promise was provided (i.e., it was an "ask" 134 // operation), complete the promise with the result from 135 // the behavior. 136 if env.promise != nil { 137 env.promise.Complete(result) 138 } 139 } 140 141 // Context was cancelled or mailbox closed, drain remaining messages. 142 a.mailbox.Close() 143 144 for env := range a.mailbox.Drain() { 145 // If a DLO is configured, send the original message there 146 // for auditing or potential manual reprocessing. 147 if a.dlo != nil { 148 a.dlo.Tell(context.Background(), env.message) 149 } 150 151 // If it was an Ask, complete the promise with an error 152 // indicating the actor terminated. 153 if env.promise != nil { 154 env.promise.Complete(fn.Err[R](ErrActorTerminated)) 155 } 156 } 157 } 158 159 // Stop signals the actor to terminate its processing loop and shut down. 160 // This is achieved by cancelling the actor's internal context. The actor's 161 // goroutine will exit once it detects the context cancellation. 162 func (a *Actor[M, R]) Stop() { 163 a.stopOnce.Do(func() { 164 log.Infof("Actor %s: stopping", a.id) 165 166 a.cancel() 167 }) 168 } 169 170 // actorRefImpl provides a concrete implementation of the ActorRef interface. It 171 // holds a reference to the target Actor instance, enabling message sending. 172 type actorRefImpl[M Message, R any] struct { 173 actor *Actor[M, R] 174 } 175 176 // Tell sends a message without waiting for a response. If the context is 177 // cancelled before the message can be sent to the actor's mailbox, the message 178 // may be dropped. 179 // 180 //nolint:ll 181 func (ref *actorRefImpl[M, R]) Tell(ctx context.Context, msg M) { 182 // If the actor's own context is already done, don't try to send. 183 // Route to DLO if available. 184 if ref.actor.ctx.Err() != nil { 185 ref.trySendToDLO(msg) 186 return 187 } 188 189 env := envelope[M, R]{message: msg, promise: nil} 190 191 // Use mailbox Send method which internally checks both contexts. 192 if !ref.actor.mailbox.Send(ctx, env) { 193 // Failed to send - check if actor terminated. 194 if ref.actor.ctx.Err() != nil { 195 ref.trySendToDLO(msg) 196 } 197 // Otherwise it was the caller's context that cancelled. 198 } 199 } 200 201 // Ask sends a message and returns a Future for the response. The Future will be 202 // completed with the actor's reply or an error if the operation fails (e.g., 203 // context cancellation before send). 204 // 205 //nolint:ll 206 func (ref *actorRefImpl[M, R]) Ask(ctx context.Context, msg M) Future[R] { 207 // Create a new promise that will be fulfilled with the actor's response. 208 promise := NewPromise[R]() 209 210 // If the actor's own context is already done, complete the promise with 211 // ErrActorTerminated and return immediately. This is the primary guard 212 // against trying to send to a stopped actor. 213 if ref.actor.ctx.Err() != nil { 214 promise.Complete(fn.Err[R](ErrActorTerminated)) 215 return promise.Future() 216 } 217 218 // Check if the context is already done before attempting to send. This 219 // ensures deterministic behavior and prevents a race where the message 220 // could be enqueued even though the context was already cancelled. 221 if ctx.Err() != nil { 222 promise.Complete(fn.Err[R](ctx.Err())) 223 return promise.Future() 224 } 225 226 env := envelope[M, R]{message: msg, promise: promise} 227 228 // Use mailbox Send method which internally checks both contexts. 229 if !ref.actor.mailbox.Send(ctx, env) { 230 // Determine the error based on what failed. 231 if ref.actor.ctx.Err() != nil { 232 promise.Complete(fn.Err[R](ErrActorTerminated)) 233 } else { 234 promise.Complete(fn.Err[R](ctx.Err())) 235 } 236 } 237 238 // Return the future associated with the promise, allowing the caller to 239 // await the response. 240 return promise.Future() 241 } 242 243 // trySendToDLO attempts to send the message to the actor's DLO if configured. 244 func (ref *actorRefImpl[M, R]) trySendToDLO(msg M) { 245 if ref.actor.dlo != nil { 246 // Use context.Background() for sending to DLO as the 247 // original context might be done or the operation 248 // should not be bound by it. 249 // This Tell to DLO is fire-and-forget. 250 ref.actor.dlo.Tell(context.Background(), msg) 251 } 252 } 253 254 // ID returns the unique identifier for this actor. 255 func (ref *actorRefImpl[M, R]) ID() string { 256 return ref.actor.id 257 } 258 259 // Ref returns an ActorRef for this actor. This allows clients to interact with 260 // the actor (send messages) without having direct access to the Actor struct 261 // itself, promoting encapsulation and location transparency. 262 func (a *Actor[M, R]) Ref() ActorRef[M, R] { 263 return a.ref 264 } 265 266 // TellRef returns a TellOnlyRef for this actor. This allows clients to send 267 // messages to the actor using only the "tell" pattern (fire-and-forget), 268 // without having access to "ask" capabilities. 269 func (a *Actor[M, R]) TellRef() TellOnlyRef[M] { 270 return a.ref 271 }