// router.go

1 package actor 2 3 import ( 4 "context" 5 "errors" 6 "sync/atomic" 7 8 "github.com/lightningnetwork/lnd/fn/v2" 9 ) 10 11 // ErrNoActorsAvailable is returned when a router cannot find any actors 12 // registered for its service key to forward a message to. 13 var ErrNoActorsAvailable = errors.New("no actors available for service key") 14 15 // Compile-time assertion that Router satisfies the ActorRef interface. 16 var _ ActorRef[Message, any] = (*Router[Message, any])(nil) 17 18 // RoutingStrategy defines the interface for selecting an actor from a list of 19 // available actors. 20 // The M (Message) and R (Response) type parameters ensure that the strategy 21 // is compatible with the types of actors it will be selecting. 22 type RoutingStrategy[M Message, R any] interface { 23 // Select chooses an ActorRef from the provided slice. It returns the 24 // selected actor or an error if no actor can be selected (e.g., if the 25 // list is empty or another strategy-specific issue occurs). 26 Select(refs []ActorRef[M, R]) (ActorRef[M, R], error) 27 } 28 29 // RoundRobinStrategy implements a round-robin selection strategy. It is generic 30 // over M and R to match the RoutingStrategy interface, though its logic doesn't 31 // depend on these types directly for the selection mechanism itself. 32 type RoundRobinStrategy[M Message, R any] struct { 33 // index is used to pick the next actor in a round-robin fashion. It 34 // must be accessed atomically to ensure thread-safety if multiple 35 // goroutines use the same strategy instance (which they will via the 36 // router). 37 index uint64 38 } 39 40 // NewRoundRobinStrategy creates a new RoundRobinStrategy, initialized for 41 // round-robin selection. 42 func NewRoundRobinStrategy[M Message, R any]() *RoundRobinStrategy[M, R] { 43 return &RoundRobinStrategy[M, R]{} 44 } 45 46 // Select picks an actor from the list using a round-robin algorithm. 
47 func (s *RoundRobinStrategy[M, R]) Select( 48 refs []ActorRef[M, R], 49 ) (ActorRef[M, R], error) { 50 if len(refs) == 0 { 51 return nil, ErrNoActorsAvailable 52 } 53 54 // Atomically increment and get the current index for selection. 55 // We subtract 1 because AddUint64 returns the new value (which is 56 // 1-based for the first call after initialization to 0), and slice 57 // indexing is 0-based. 58 idx := atomic.AddUint64(&s.index, 1) - 1 59 selectedRef := refs[idx%uint64(len(refs))] 60 61 return selectedRef, nil 62 } 63 64 // Router is a message-dispatching component that fronts multiple actors 65 // registered under a specific ServiceKey. It uses a RoutingStrategy to 66 // distribute messages to one of the available actors. It is generic over M 67 // (Message type) and R (Response type) to match the actors it routes to. 68 type Router[M Message, R any] struct { 69 receptionist *Receptionist 70 serviceKey ServiceKey[M, R] 71 strategy RoutingStrategy[M, R] 72 dlo ActorRef[Message, any] // Dead Letter Office reference. 73 } 74 75 // NewRouter creates a new Router for a given service key and strategy. The 76 // receptionist is used to discover actors registered with the service key. 77 // The router itself is not an actor but a message dispatcher that behaves like 78 // an ActorRef from the sender's perspective. 79 func NewRouter[M Message, R any](receptionist *Receptionist, 80 key ServiceKey[M, R], strategy RoutingStrategy[M, R], 81 dlo ActorRef[Message, any]) *Router[M, R] { 82 83 return &Router[M, R]{ 84 receptionist: receptionist, 85 serviceKey: key, 86 strategy: strategy, 87 dlo: dlo, 88 } 89 } 90 91 // getActor dynamically finds available actors for the service key and selects 92 // one using the configured strategy. This method is called internally by Tell 93 // and Ask on each invocation to ensure up-to-date actor discovery. 94 func (r *Router[M, R]) getActor() (ActorRef[M, R], error) { 95 // Discover available actors from the receptionist. 
96 availableActors := FindInReceptionist(r.receptionist, r.serviceKey) 97 if len(availableActors) == 0 { 98 return nil, ErrNoActorsAvailable 99 } 100 101 // Select one actor using the strategy. 102 return r.strategy.Select(availableActors) 103 } 104 105 // Tell sends a message to one of the actors managed by the router, selected by 106 // the routing strategy. If no actors are available or the send context is 107 // cancelled before the message can be enqueued in the target actor's mailbox, 108 // the message may be dropped. Errors during actor selection (e.g., 109 // ErrNoActorsAvailable) are currently not propagated from Tell, aligning with 110 // its fire-and-forget nature. Such errors could be logged internally if needed. 111 func (r *Router[M, R]) Tell(ctx context.Context, msg M) { 112 selectedActor, err := r.getActor() 113 if err != nil { 114 // If no actors are available for the service, and a DLO is 115 // configured, forward the message there. 116 if errors.Is(err, ErrNoActorsAvailable) && r.dlo != nil { 117 r.dlo.Tell(context.Background(), msg) 118 } else { 119 log.Warnf("Router(%s): message %s dropped "+ 120 "(no actors available, no DLO configured)", 121 r.serviceKey.name, msg.MessageType()) 122 } 123 124 return 125 } 126 127 selectedActor.Tell(ctx, msg) 128 } 129 130 // Ask sends a message to one of the actors managed by the router, selected by 131 // the routing strategy, and returns a Future for the response. If no actors are 132 // available (ErrNoActorsAvailable), the Future will be completed with this 133 // error. If the send context is cancelled before the message can be enqueued in 134 // the chosen actor's mailbox, the Future will be completed with the context's 135 // error. 136 func (r *Router[M, R]) Ask(ctx context.Context, msg M) Future[R] { 137 selectedActor, err := r.getActor() 138 if err != nil { 139 // If no actor could be selected (e.g., none available), 140 // complete the promise immediately with the selection error. 
141 promise := NewPromise[R]() 142 promise.Complete(fn.Err[R](err)) 143 return promise.Future() 144 } 145 146 return selectedActor.Ask(ctx, msg) 147 } 148 149 // ID provides an identifier for the router. Since a router isn't an actor 150 // itself but a dispatcher for a service, its ID can be based on the service 151 // key. 152 func (r *Router[M, R]) ID() string { 153 return "router(" + r.serviceKey.name + ")" 154 }