system.go
1 package actor 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "sync" 8 9 "github.com/lightningnetwork/lnd/fn/v2" 10 ) 11 12 // stoppable defines an interface for components that can be stopped. 13 // This is unexported as it's an internal detail of ActorSystem for managing 14 // actors that need to be shut down. 15 type stoppable interface { 16 Stop() 17 } 18 19 // SystemConfig holds configuration parameters for the ActorSystem. 20 type SystemConfig struct { 21 // MailboxCapacity is the default capacity for actor mailboxes. 22 MailboxCapacity int 23 } 24 25 // DefaultConfig returns a default configuration for the ActorSystem. 26 // The default mailbox capacity of 100 means each actor can buffer up to 100 27 // pending messages (envelopes). Each envelope holds a message and an optional 28 // promise pointer, so the memory overhead per actor is roughly proportional to 29 // the size of the messages being sent multiplied by this capacity. 30 func DefaultConfig() SystemConfig { 31 return SystemConfig{ 32 MailboxCapacity: 100, 33 } 34 } 35 36 // ActorSystem manages the lifecycle of actors and provides coordination 37 // services such as a receptionist for actor discovery and a dead letter office 38 // for undeliverable messages. It also handles the graceful shutdown of all 39 // managed actors. 40 type ActorSystem struct { 41 // receptionist is used for actor discovery. 42 receptionist *Receptionist 43 44 // actors stores all actors managed by the system, keyed by their ID. 45 // This includes the deadLetterActor. 46 actors map[string]stoppable 47 48 // deadLetterActor handles undeliverable messages. 49 deadLetterActor ActorRef[Message, any] 50 51 // config holds the system-wide configuration. 52 config SystemConfig 53 54 // mu protects the 'actors' map. 55 mu sync.RWMutex 56 57 // ctx is the main context for the actor system. 58 ctx context.Context 59 60 // cancel cancels the main system context. 61 cancel context.CancelFunc 62 } 63 64 // NewActorSystem creates a new actor system using the default configuration. 65 func NewActorSystem() *ActorSystem { 66 return NewActorSystemWithConfig(DefaultConfig()) 67 } 68 69 // NewActorSystemWithConfig creates a new actor system with custom configuration 70 func NewActorSystemWithConfig(config SystemConfig) *ActorSystem { 71 ctx, cancel := context.WithCancel(context.Background()) 72 73 // Initialize the core ActorSystem components. 74 system := &ActorSystem{ 75 receptionist: newReceptionist(), 76 config: config, 77 actors: make(map[string]stoppable), 78 ctx: ctx, 79 cancel: cancel, 80 } 81 82 // Define the behavior for the dead letter actor. It logs undeliverable 83 // messages and returns an error. 84 deadLetterBehavior := NewFunctionBehavior( 85 func(ctx context.Context, msg Message) fn.Result[any] { 86 log.Warnf("Dead letter received: message type=%s", 87 msg.MessageType()) 88 89 return fn.Err[any](errors.New( 90 "message undeliverable: " + msg.MessageType(), 91 )) 92 }, 93 ) 94 95 // Create the raw dead letter actor (*Actor instance). The DLO's own DLO 96 // reference is nil to prevent loops if messages to the DLO itself fail. 97 deadLetterActorCfg := ActorConfig[Message, any]{ 98 ID: "dead-letters", 99 Behavior: deadLetterBehavior, 100 DLO: nil, 101 MailboxSize: config.MailboxCapacity, 102 } 103 deadLetterRawActor, err := NewActor[Message, any](deadLetterActorCfg) 104 if err != nil { 105 // This should never happen since we control the DLO config. 106 panic("failed to create dead letter actor: " + err.Error()) 107 } 108 deadLetterRawActor.Start() 109 system.deadLetterActor = deadLetterRawActor.Ref() 110 111 // Add the raw actor to the map of stoppable actors. No lock needed here 112 // as 'system' is not yet accessible concurrently. 113 system.actors[deadLetterRawActor.id] = deadLetterRawActor 114 115 // The system is now fully initialized and ready. 116 return system 117 } 118 119 // RegisterWithSystem creates an actor with the given ID, service key, and 120 // behavior within the specified ActorSystem. It starts the actor, adds it to 121 // the system's management, registers it with the receptionist using the 122 // provided key, and returns its ActorRef. 123 func RegisterWithSystem[M Message, R any](as *ActorSystem, id string, 124 key ServiceKey[M, R], 125 behavior ActorBehavior[M, R]) (ActorRef[M, R], error) { 126 127 actorCfg := ActorConfig[M, R]{ 128 ID: id, 129 Behavior: behavior, 130 DLO: as.deadLetterActor, 131 MailboxSize: as.config.MailboxCapacity, 132 } 133 // Check for duplicate actor ID before creating the actor. 134 as.mu.Lock() 135 if _, exists := as.actors[id]; exists { 136 as.mu.Unlock() 137 138 return nil, fmt.Errorf("%w: %s", ErrDuplicateActorID, id) 139 } 140 141 actorInstance, err := NewActor(actorCfg) 142 if err != nil { 143 as.mu.Unlock() 144 145 return nil, err 146 } 147 actorInstance.Start() 148 149 // Add the actor instance to the system's list of stoppable actors. 150 as.actors[actorInstance.id] = actorInstance 151 as.mu.Unlock() 152 153 log.Infof("ActorSystem: registered actor %s with service key %s", 154 id, key.name) 155 156 // Register the actor's reference with the receptionist under the given 157 // service key, making it discoverable by other parts of the system. 158 RegisterWithReceptionist(as.receptionist, key, actorInstance.Ref()) 159 160 return actorInstance.Ref(), nil 161 } 162 163 // Receptionist returns the system's receptionist, which can be used for 164 // actor service discovery (finding actors by ServiceKey). 165 func (as *ActorSystem) Receptionist() *Receptionist { 166 return as.receptionist 167 } 168 169 // DeadLetters returns a reference to the system's dead letter actor. Messages 170 // that cannot be delivered to their intended recipient (e.g., if an Ask 171 // context is cancelled before enqueuing) may be routed here if not otherwise 172 // handled. 173 func (as *ActorSystem) DeadLetters() ActorRef[Message, any] { 174 return as.deadLetterActor 175 } 176 177 // Shutdown gracefully stops the actor system. It iterates through all managed 178 // actors, including the dead letter actor, and calls their Stop method. 179 // After initiating the stop for all actors, it cancels the main system context. 180 // This method is safe for concurrent use. 181 func (as *ActorSystem) Shutdown() error { 182 log.Infof("ActorSystem: initiating shutdown") 183 184 // Create a slice of actors to stop. This avoids holding the lock while 185 // calling Stop() on each actor, and includes the dead letter actor. 186 var actorsToStop []stoppable 187 as.mu.RLock() 188 for _, actor := range as.actors { 189 actorsToStop = append(actorsToStop, actor) 190 } 191 as.mu.RUnlock() 192 193 // Notify all managed actors to stop. Actor.Stop() is non-blocking. 194 // Each actor's Stop method will cancel its internal context, leading 195 // to the termination of its processing goroutine. 196 for _, actor := range actorsToStop { 197 actor.Stop() 198 } 199 200 // Clear the actors map after initiating their shutdown. 201 as.mu.Lock() 202 as.actors = nil 203 as.mu.Unlock() 204 205 // Finally cancel the main context 206 // This signals to any other components observing the system's context 207 // that shutdown has been initiated. 208 as.cancel() 209 210 return nil 211 } 212 213 // StopAndRemoveActor stops a specific actor by its ID and removes it from the 214 // ActorSystem's management. It returns true if the actor was found and stopped, 215 // false otherwise. 216 func (as *ActorSystem) StopAndRemoveActor(id string) bool { 217 as.mu.Lock() 218 defer as.mu.Unlock() 219 220 actorToStop, exists := as.actors[id] 221 if !exists { 222 return false 223 } 224 225 // Stop the actor. This is non-blocking. 226 actorToStop.Stop() 227 228 // Remove from the system's management. 229 delete(as.actors, id) 230 231 return true 232 } 233 234 // UnregisterFromReceptionist removes an actor reference from a service key in 235 // the given receptionist. It returns true if the reference was found and 236 // removed, and false otherwise. This is a package-level generic function 237 // because methods cannot have their own type parameters in Go. 238 func UnregisterFromReceptionist[M Message, R any](r *Receptionist, 239 key ServiceKey[M, R], refToRemove ActorRef[M, R]) bool { 240 241 r.mu.Lock() 242 defer r.mu.Unlock() 243 244 refs, exists := r.registrations[key.name] 245 if !exists { 246 return false 247 } 248 249 found := false 250 251 // Build a new slice containing only the references that are not the one 252 // to be removed. 253 newRefs := make([]any, 0, max(0, len(refs)-1)) 254 for _, itemInSlice := range refs { 255 // Try to assert the item from the slice to the specific 256 // ActorRef[M,R] type we are trying to remove. 257 if specificActorRef, ok := itemInSlice.(ActorRef[M, R]); ok { 258 // If the type assertion is successful and it's the one 259 // we want to remove, mark as found and skip adding it 260 // to newRefs. 261 if specificActorRef == refToRemove { 262 found = true 263 continue 264 } 265 } 266 newRefs = append(newRefs, itemInSlice) 267 } 268 269 if !found { 270 return false 271 } 272 273 // If the new list of references is empty, remove the key from the map. 274 // Otherwise, update the map with the new slice. 275 if len(newRefs) == 0 { 276 delete(r.registrations, key.name) 277 } else { 278 r.registrations[key.name] = newRefs 279 } 280 281 return true 282 } 283 284 // ServiceKey is a type-safe identifier used for registering and discovering 285 // actors via the Receptionist. The generic type parameters M (Message) and R 286 // (Response) ensure that only actors handling compatible message/response types 287 // are associated with and retrieved for this key. 288 type ServiceKey[M Message, R any] struct { 289 name string 290 } 291 292 // NewServiceKey creates a new service key with the given name. The name is used 293 // as the lookup key within the Receptionist. 294 func NewServiceKey[M Message, R any](name string) ServiceKey[M, R] { 295 return ServiceKey[M, R]{name: name} 296 } 297 298 // Spawn registers an actor for this service key within the given ActorSystem. 299 // It's a convenience method that calls RegisterWithSystem, starting the actor 300 // and registering it with the receptionist. 301 func (sk ServiceKey[M, R]) Spawn(as *ActorSystem, id string, 302 behavior ActorBehavior[M, R]) (ActorRef[M, R], error) { 303 304 return RegisterWithSystem(as, id, sk, behavior) 305 } 306 307 // Unregister removes an actor reference associated with this service key from 308 // the ActorSystem's receptionist and also stops the actor. 309 // It returns true if the actor was successfully unregistered from the 310 // receptionist AND successfully stopped and removed from the system's 311 // management. Otherwise, it returns false. 312 func (sk ServiceKey[M, R]) Unregister(as *ActorSystem, 313 refToRemove ActorRef[M, R]) bool { 314 315 unregisteredFromReceptionist := UnregisterFromReceptionist( 316 as.Receptionist(), sk, refToRemove, 317 ) 318 319 // If not found in receptionist, no need to try stopping. 320 if !unregisteredFromReceptionist { 321 return false 322 } 323 324 // Attempt to stop and remove the actor from the system. 325 stoppedAndRemoved := as.StopAndRemoveActor(refToRemove.ID()) 326 327 return unregisteredFromReceptionist && stoppedAndRemoved 328 } 329 330 // UnregisterAll finds all actor references associated with this service key in 331 // the ActorSystem's receptionist. For each found actor, it attempts to stop it 332 // and remove it from system management, and also unregisters it from the 333 // receptionist. 334 func (sk ServiceKey[M, R]) UnregisterAll(as *ActorSystem) int { 335 // First find all the refs that match this service key. 336 refsFound := FindInReceptionist(as.Receptionist(), sk) 337 338 actorsStoppedCount := 0 339 for _, ref := range refsFound { 340 // Attempt to stop and remove the actor from the system's active 341 // management. This is the primary action to deactivate the 342 // actor. If StopAndRemoveActor returns true, it means an active 343 // actor was found in the system's `actors` map and was stopped. 344 if as.StopAndRemoveActor(ref.ID()) { 345 actorsStoppedCount++ 346 } 347 348 // Regardless of whether the actor was actively managed by the 349 // system (i.e., found in as.actors), attempt to unregister its 350 // reference from the receptionist. This helps clean up any 351 // potentially stale entries in the receptionist if an actor was 352 // removed from the system's management without also being 353 // unregistered from the receptionist. 354 UnregisterFromReceptionist(as.Receptionist(), sk, ref) 355 } 356 357 return actorsStoppedCount 358 } 359 360 // Receptionist provides service discovery for actors. Actors can be registered 361 // under a ServiceKey and later discovered by other actors or system components. 362 type Receptionist struct { 363 // registrations stores ActorRef instances, keyed by ServiceKey.name. 364 registrations map[string][]any 365 366 // mu protects access to registrations. 367 mu sync.RWMutex 368 } 369 370 // newReceptionist creates a new Receptionist instance. 371 func newReceptionist() *Receptionist { 372 return &Receptionist{ 373 registrations: make(map[string][]any), 374 } 375 } 376 377 // RegisterWithReceptionist registers an actor with a service key in the given 378 // receptionist. This is a package-level generic function because methods 379 // cannot have their own type parameters in Go (as of the current version). 380 // It appends the actor reference to the list associated with the key's name. 381 func RegisterWithReceptionist[M Message, R any](r *Receptionist, 382 key ServiceKey[M, R], ref ActorRef[M, R]) { 383 384 r.mu.Lock() 385 defer r.mu.Unlock() 386 387 // Initialize the slice for this key if it's the first registration. 388 if _, exists := r.registrations[key.name]; !exists { 389 r.registrations[key.name] = make([]any, 0) 390 } 391 392 r.registrations[key.name] = append(r.registrations[key.name], ref) 393 } 394 395 // FindInReceptionist returns all actors registered with a service key in the 396 // given receptionist. This is a package-level generic function because methods 397 // cannot have their own type parameters. It performs a type assertion to ensure 398 // that only ActorRefs matching the ServiceKey's generic types (M, R) are 399 // returned, providing type safety. 400 func FindInReceptionist[M Message, R any](r *Receptionist, 401 key ServiceKey[M, R]) []ActorRef[M, R] { 402 403 r.mu.RLock() 404 defer r.mu.RUnlock() 405 406 if refs, exists := r.registrations[key.name]; exists { 407 typedRefs := make([]ActorRef[M, R], 0, len(refs)) 408 for _, ref := range refs { 409 // Make sure that the reference is of the correct type. 410 // This type assertion is crucial for type safety, ensuring 411 // that the returned ActorRefs match the expected M and R. 412 if typedRef, ok := ref.(ActorRef[M, R]); ok { 413 typedRefs = append(typedRefs, typedRef) 414 } 415 } 416 417 return typedRefs 418 } 419 420 return nil 421 }