/ actor / system.go
system.go
  1  package actor
  2  
  3  import (
  4  	"context"
  5  	"errors"
  6  	"fmt"
  7  	"sync"
  8  
  9  	"github.com/lightningnetwork/lnd/fn/v2"
 10  )
 11  
// stoppable is the minimal lifecycle contract the ActorSystem needs from the
// actors it manages: the ability to ask them to stop. It is unexported because
// it only exists so the system can keep actors with different message and
// response type parameters in a single map and shut them down uniformly.
type stoppable interface {
	// Stop asks the component to shut down. The callers in this file
	// treat the call as non-blocking.
	Stop()
}
 18  
// SystemConfig holds configuration parameters for the ActorSystem.
type SystemConfig struct {
	// MailboxCapacity is the default capacity for actor mailboxes. The
	// system passes it as the MailboxSize of every actor it creates,
	// including the dead letter actor.
	MailboxCapacity int
}
 24  
 25  // DefaultConfig returns a default configuration for the ActorSystem.
 26  // The default mailbox capacity of 100 means each actor can buffer up to 100
 27  // pending messages (envelopes). Each envelope holds a message and an optional
 28  // promise pointer, so the memory overhead per actor is roughly proportional to
 29  // the size of the messages being sent multiplied by this capacity.
 30  func DefaultConfig() SystemConfig {
 31  	return SystemConfig{
 32  		MailboxCapacity: 100,
 33  	}
 34  }
 35  
// ActorSystem manages the lifecycle of actors and provides coordination
// services such as a receptionist for actor discovery and a dead letter office
// for undeliverable messages. It also handles the graceful shutdown of all
// managed actors.
type ActorSystem struct {
	// receptionist is used for actor discovery. It is created once in
	// NewActorSystemWithConfig and exposed through Receptionist().
	receptionist *Receptionist

	// actors stores all actors managed by the system, keyed by their ID.
	// This includes the deadLetterActor. Shutdown sets this map to nil.
	actors map[string]stoppable

	// deadLetterActor handles undeliverable messages. It is handed to
	// every actor created through RegisterWithSystem as its DLO.
	deadLetterActor ActorRef[Message, any]

	// config holds the system-wide configuration.
	config SystemConfig

	// mu protects the 'actors' map.
	mu sync.RWMutex

	// ctx is the main context for the actor system. It is derived from
	// context.Background() and Shutdown cancels it.
	ctx context.Context

	// cancel cancels the main system context.
	cancel context.CancelFunc
}
 63  
 64  // NewActorSystem creates a new actor system using the default configuration.
 65  func NewActorSystem() *ActorSystem {
 66  	return NewActorSystemWithConfig(DefaultConfig())
 67  }
 68  
 69  // NewActorSystemWithConfig creates a new actor system with custom configuration
 70  func NewActorSystemWithConfig(config SystemConfig) *ActorSystem {
 71  	ctx, cancel := context.WithCancel(context.Background())
 72  
 73  	// Initialize the core ActorSystem components.
 74  	system := &ActorSystem{
 75  		receptionist: newReceptionist(),
 76  		config:       config,
 77  		actors:       make(map[string]stoppable),
 78  		ctx:          ctx,
 79  		cancel:       cancel,
 80  	}
 81  
 82  	// Define the behavior for the dead letter actor. It logs undeliverable
 83  	// messages and returns an error.
 84  	deadLetterBehavior := NewFunctionBehavior(
 85  		func(ctx context.Context, msg Message) fn.Result[any] {
 86  			log.Warnf("Dead letter received: message type=%s",
 87  				msg.MessageType())
 88  
 89  			return fn.Err[any](errors.New(
 90  				"message undeliverable: " + msg.MessageType(),
 91  			))
 92  		},
 93  	)
 94  
 95  	// Create the raw dead letter actor (*Actor instance). The DLO's own DLO
 96  	// reference is nil to prevent loops if messages to the DLO itself fail.
 97  	deadLetterActorCfg := ActorConfig[Message, any]{
 98  		ID:          "dead-letters",
 99  		Behavior:    deadLetterBehavior,
100  		DLO:         nil,
101  		MailboxSize: config.MailboxCapacity,
102  	}
103  	deadLetterRawActor, err := NewActor[Message, any](deadLetterActorCfg)
104  	if err != nil {
105  		// This should never happen since we control the DLO config.
106  		panic("failed to create dead letter actor: " + err.Error())
107  	}
108  	deadLetterRawActor.Start()
109  	system.deadLetterActor = deadLetterRawActor.Ref()
110  
111  	// Add the raw actor to the map of stoppable actors. No lock needed here
112  	// as 'system' is not yet accessible concurrently.
113  	system.actors[deadLetterRawActor.id] = deadLetterRawActor
114  
115  	// The system is now fully initialized and ready.
116  	return system
117  }
118  
119  // RegisterWithSystem creates an actor with the given ID, service key, and
120  // behavior within the specified ActorSystem. It starts the actor, adds it to
121  // the system's management, registers it with the receptionist using the
122  // provided key, and returns its ActorRef.
123  func RegisterWithSystem[M Message, R any](as *ActorSystem, id string,
124  	key ServiceKey[M, R],
125  	behavior ActorBehavior[M, R]) (ActorRef[M, R], error) {
126  
127  	actorCfg := ActorConfig[M, R]{
128  		ID:          id,
129  		Behavior:    behavior,
130  		DLO:         as.deadLetterActor,
131  		MailboxSize: as.config.MailboxCapacity,
132  	}
133  	// Check for duplicate actor ID before creating the actor.
134  	as.mu.Lock()
135  	if _, exists := as.actors[id]; exists {
136  		as.mu.Unlock()
137  
138  		return nil, fmt.Errorf("%w: %s", ErrDuplicateActorID, id)
139  	}
140  
141  	actorInstance, err := NewActor(actorCfg)
142  	if err != nil {
143  		as.mu.Unlock()
144  
145  		return nil, err
146  	}
147  	actorInstance.Start()
148  
149  	// Add the actor instance to the system's list of stoppable actors.
150  	as.actors[actorInstance.id] = actorInstance
151  	as.mu.Unlock()
152  
153  	log.Infof("ActorSystem: registered actor %s with service key %s",
154  		id, key.name)
155  
156  	// Register the actor's reference with the receptionist under the given
157  	// service key, making it discoverable by other parts of the system.
158  	RegisterWithReceptionist(as.receptionist, key, actorInstance.Ref())
159  
160  	return actorInstance.Ref(), nil
161  }
162  
163  // Receptionist returns the system's receptionist, which can be used for
164  // actor service discovery (finding actors by ServiceKey).
165  func (as *ActorSystem) Receptionist() *Receptionist {
166  	return as.receptionist
167  }
168  
169  // DeadLetters returns a reference to the system's dead letter actor. Messages
170  // that cannot be delivered to their intended recipient (e.g., if an Ask
171  // context is cancelled before enqueuing) may be routed here if not otherwise
172  // handled.
173  func (as *ActorSystem) DeadLetters() ActorRef[Message, any] {
174  	return as.deadLetterActor
175  }
176  
177  // Shutdown gracefully stops the actor system. It iterates through all managed
178  // actors, including the dead letter actor, and calls their Stop method.
179  // After initiating the stop for all actors, it cancels the main system context.
180  // This method is safe for concurrent use.
181  func (as *ActorSystem) Shutdown() error {
182  	log.Infof("ActorSystem: initiating shutdown")
183  
184  	// Create a slice of actors to stop. This avoids holding the lock while
185  	// calling Stop() on each actor, and includes the dead letter actor.
186  	var actorsToStop []stoppable
187  	as.mu.RLock()
188  	for _, actor := range as.actors {
189  		actorsToStop = append(actorsToStop, actor)
190  	}
191  	as.mu.RUnlock()
192  
193  	// Notify all managed actors to stop. Actor.Stop() is non-blocking.
194  	// Each actor's Stop method will cancel its internal context, leading
195  	// to the termination of its processing goroutine.
196  	for _, actor := range actorsToStop {
197  		actor.Stop()
198  	}
199  
200  	// Clear the actors map after initiating their shutdown.
201  	as.mu.Lock()
202  	as.actors = nil
203  	as.mu.Unlock()
204  
205  	// Finally cancel the main context
206  	// This signals to any other components observing the system's context
207  	// that shutdown has been initiated.
208  	as.cancel()
209  
210  	return nil
211  }
212  
213  // StopAndRemoveActor stops a specific actor by its ID and removes it from the
214  // ActorSystem's management. It returns true if the actor was found and stopped,
215  // false otherwise.
216  func (as *ActorSystem) StopAndRemoveActor(id string) bool {
217  	as.mu.Lock()
218  	defer as.mu.Unlock()
219  
220  	actorToStop, exists := as.actors[id]
221  	if !exists {
222  		return false
223  	}
224  
225  	// Stop the actor. This is non-blocking.
226  	actorToStop.Stop()
227  
228  	// Remove from the system's management.
229  	delete(as.actors, id)
230  
231  	return true
232  }
233  
234  // UnregisterFromReceptionist removes an actor reference from a service key in
235  // the given receptionist. It returns true if the reference was found and
236  // removed, and false otherwise. This is a package-level generic function
237  // because methods cannot have their own type parameters in Go.
238  func UnregisterFromReceptionist[M Message, R any](r *Receptionist,
239  	key ServiceKey[M, R], refToRemove ActorRef[M, R]) bool {
240  
241  	r.mu.Lock()
242  	defer r.mu.Unlock()
243  
244  	refs, exists := r.registrations[key.name]
245  	if !exists {
246  		return false
247  	}
248  
249  	found := false
250  
251  	// Build a new slice containing only the references that are not the one
252  	// to be removed.
253  	newRefs := make([]any, 0, max(0, len(refs)-1))
254  	for _, itemInSlice := range refs {
255  		// Try to assert the item from the slice to the specific
256  		// ActorRef[M,R] type we are trying to remove.
257  		if specificActorRef, ok := itemInSlice.(ActorRef[M, R]); ok {
258  			// If the type assertion is successful and it's the one
259  			// we want to remove, mark as found and skip adding it
260  			// to newRefs.
261  			if specificActorRef == refToRemove {
262  				found = true
263  				continue
264  			}
265  		}
266  		newRefs = append(newRefs, itemInSlice)
267  	}
268  
269  	if !found {
270  		return false
271  	}
272  
273  	// If the new list of references is empty, remove the key from the map.
274  	// Otherwise, update the map with the new slice.
275  	if len(newRefs) == 0 {
276  		delete(r.registrations, key.name)
277  	} else {
278  		r.registrations[key.name] = newRefs
279  	}
280  
281  	return true
282  }
283  
// ServiceKey is a type-safe identifier used for registering and discovering
// actors via the Receptionist. The generic type parameters M (Message) and R
// (Response) ensure that only actors handling compatible message/response types
// are associated with and retrieved for this key.
type ServiceKey[M Message, R any] struct {
	// name is the string under which references are stored in the
	// Receptionist's registrations map.
	name string
}
291  
292  // NewServiceKey creates a new service key with the given name. The name is used
293  // as the lookup key within the Receptionist.
294  func NewServiceKey[M Message, R any](name string) ServiceKey[M, R] {
295  	return ServiceKey[M, R]{name: name}
296  }
297  
298  // Spawn registers an actor for this service key within the given ActorSystem.
299  // It's a convenience method that calls RegisterWithSystem, starting the actor
300  // and registering it with the receptionist.
301  func (sk ServiceKey[M, R]) Spawn(as *ActorSystem, id string,
302  	behavior ActorBehavior[M, R]) (ActorRef[M, R], error) {
303  
304  	return RegisterWithSystem(as, id, sk, behavior)
305  }
306  
307  // Unregister removes an actor reference associated with this service key from
308  // the ActorSystem's receptionist and also stops the actor.
309  // It returns true if the actor was successfully unregistered from the
310  // receptionist AND successfully stopped and removed from the system's
311  // management. Otherwise, it returns false.
312  func (sk ServiceKey[M, R]) Unregister(as *ActorSystem,
313  	refToRemove ActorRef[M, R]) bool {
314  
315  	unregisteredFromReceptionist := UnregisterFromReceptionist(
316  		as.Receptionist(), sk, refToRemove,
317  	)
318  
319  	// If not found in receptionist, no need to try stopping.
320  	if !unregisteredFromReceptionist {
321  		return false
322  	}
323  
324  	// Attempt to stop and remove the actor from the system.
325  	stoppedAndRemoved := as.StopAndRemoveActor(refToRemove.ID())
326  
327  	return unregisteredFromReceptionist && stoppedAndRemoved
328  }
329  
330  // UnregisterAll finds all actor references associated with this service key in
331  // the ActorSystem's receptionist. For each found actor, it attempts to stop it
332  // and remove it from system management, and also unregisters it from the
333  // receptionist.
334  func (sk ServiceKey[M, R]) UnregisterAll(as *ActorSystem) int {
335  	// First find all the refs that match this service key.
336  	refsFound := FindInReceptionist(as.Receptionist(), sk)
337  
338  	actorsStoppedCount := 0
339  	for _, ref := range refsFound {
340  		// Attempt to stop and remove the actor from the system's active
341  		// management. This is the primary action to deactivate the
342  		// actor. If StopAndRemoveActor returns true, it means an active
343  		// actor was found in the system's `actors` map and was stopped.
344  		if as.StopAndRemoveActor(ref.ID()) {
345  			actorsStoppedCount++
346  		}
347  
348  		// Regardless of whether the actor was actively managed by the
349  		// system (i.e., found in as.actors), attempt to unregister its
350  		// reference from the receptionist. This helps clean up any
351  		// potentially stale entries in the receptionist if an actor was
352  		// removed from the system's management without also being
353  		// unregistered from the receptionist.
354  		UnregisterFromReceptionist(as.Receptionist(), sk, ref)
355  	}
356  
357  	return actorsStoppedCount
358  }
359  
// Receptionist provides service discovery for actors. Actors can be registered
// under a ServiceKey and later discovered by other actors or system components.
type Receptionist struct {
	// registrations stores ActorRef instances, keyed by ServiceKey.name.
	// The references are held as 'any' because keys with different type
	// parameters share this map; lookups recover the concrete
	// ActorRef[M, R] with a type assertion.
	registrations map[string][]any

	// mu protects access to registrations.
	mu sync.RWMutex
}
369  
370  // newReceptionist creates a new Receptionist instance.
371  func newReceptionist() *Receptionist {
372  	return &Receptionist{
373  		registrations: make(map[string][]any),
374  	}
375  }
376  
377  // RegisterWithReceptionist registers an actor with a service key in the given
378  // receptionist. This is a package-level generic function because methods
379  // cannot have their own type parameters in Go (as of the current version).
380  // It appends the actor reference to the list associated with the key's name.
381  func RegisterWithReceptionist[M Message, R any](r *Receptionist,
382  	key ServiceKey[M, R], ref ActorRef[M, R]) {
383  
384  	r.mu.Lock()
385  	defer r.mu.Unlock()
386  
387  	// Initialize the slice for this key if it's the first registration.
388  	if _, exists := r.registrations[key.name]; !exists {
389  		r.registrations[key.name] = make([]any, 0)
390  	}
391  
392  	r.registrations[key.name] = append(r.registrations[key.name], ref)
393  }
394  
395  // FindInReceptionist returns all actors registered with a service key in the
396  // given receptionist. This is a package-level generic function because methods
397  // cannot have their own type parameters. It performs a type assertion to ensure
398  // that only ActorRefs matching the ServiceKey's generic types (M, R) are
399  // returned, providing type safety.
400  func FindInReceptionist[M Message, R any](r *Receptionist,
401  	key ServiceKey[M, R]) []ActorRef[M, R] {
402  
403  	r.mu.RLock()
404  	defer r.mu.RUnlock()
405  
406  	if refs, exists := r.registrations[key.name]; exists {
407  		typedRefs := make([]ActorRef[M, R], 0, len(refs))
408  		for _, ref := range refs {
409  			// Make sure that the reference is of the correct type.
410  			// This type assertion is crucial for type safety, ensuring
411  			// that the returned ActorRefs match the expected M and R.
412  			if typedRef, ok := ref.(ActorRef[M, R]); ok {
413  				typedRefs = append(typedRefs, typedRef)
414  			}
415  		}
416  
417  		return typedRefs
418  	}
419  
420  	return nil
421  }