/ actor / router.go
router.go
  1  package actor
  2  
  3  import (
  4  	"context"
  5  	"errors"
  6  	"sync/atomic"
  7  
  8  	"github.com/lightningnetwork/lnd/fn/v2"
  9  )
 10  
 11  // ErrNoActorsAvailable is returned when a router cannot find any actors
 12  // registered for its service key to forward a message to.
 13  var ErrNoActorsAvailable = errors.New("no actors available for service key")
 14  
 15  // Compile-time assertion that Router satisfies the ActorRef interface.
 16  var _ ActorRef[Message, any] = (*Router[Message, any])(nil)
 17  
 18  // RoutingStrategy defines the interface for selecting an actor from a list of
 19  // available actors.
 20  // The M (Message) and R (Response) type parameters ensure that the strategy
 21  // is compatible with the types of actors it will be selecting.
 22  type RoutingStrategy[M Message, R any] interface {
 23  	// Select chooses an ActorRef from the provided slice. It returns the
 24  	// selected actor or an error if no actor can be selected (e.g., if the
 25  	// list is empty or another strategy-specific issue occurs).
 26  	Select(refs []ActorRef[M, R]) (ActorRef[M, R], error)
 27  }
 28  
 29  // RoundRobinStrategy implements a round-robin selection strategy. It is generic
 30  // over M and R to match the RoutingStrategy interface, though its logic doesn't
 31  // depend on these types directly for the selection mechanism itself.
 32  type RoundRobinStrategy[M Message, R any] struct {
 33  	// index is used to pick the next actor in a round-robin fashion. It
 34  	// must be accessed atomically to ensure thread-safety if multiple
 35  	// goroutines use the same strategy instance (which they will via the
 36  	// router).
 37  	index uint64
 38  }
 39  
 40  // NewRoundRobinStrategy creates a new RoundRobinStrategy, initialized for
 41  // round-robin selection.
 42  func NewRoundRobinStrategy[M Message, R any]() *RoundRobinStrategy[M, R] {
 43  	return &RoundRobinStrategy[M, R]{}
 44  }
 45  
 46  // Select picks an actor from the list using a round-robin algorithm.
 47  func (s *RoundRobinStrategy[M, R]) Select(
 48  	refs []ActorRef[M, R],
 49  ) (ActorRef[M, R], error) {
 50  	if len(refs) == 0 {
 51  		return nil, ErrNoActorsAvailable
 52  	}
 53  
 54  	// Atomically increment and get the current index for selection.
 55  	// We subtract 1 because AddUint64 returns the new value (which is
 56  	// 1-based for the first call after initialization to 0), and slice
 57  	// indexing is 0-based.
 58  	idx := atomic.AddUint64(&s.index, 1) - 1
 59  	selectedRef := refs[idx%uint64(len(refs))]
 60  
 61  	return selectedRef, nil
 62  }
 63  
 64  // Router is a message-dispatching component that fronts multiple actors
 65  // registered under a specific ServiceKey. It uses a RoutingStrategy to
 66  // distribute messages to one of the available actors. It is generic over M
 67  // (Message type) and R (Response type) to match the actors it routes to.
 68  type Router[M Message, R any] struct {
 69  	receptionist *Receptionist
 70  	serviceKey   ServiceKey[M, R]
 71  	strategy     RoutingStrategy[M, R]
 72  	dlo          ActorRef[Message, any] // Dead Letter Office reference.
 73  }
 74  
 75  // NewRouter creates a new Router for a given service key and strategy. The
 76  // receptionist is used to discover actors registered with the service key.
 77  // The router itself is not an actor but a message dispatcher that behaves like
 78  // an ActorRef from the sender's perspective.
 79  func NewRouter[M Message, R any](receptionist *Receptionist,
 80  	key ServiceKey[M, R], strategy RoutingStrategy[M, R],
 81  	dlo ActorRef[Message, any]) *Router[M, R] {
 82  
 83  	return &Router[M, R]{
 84  		receptionist: receptionist,
 85  		serviceKey:   key,
 86  		strategy:     strategy,
 87  		dlo:          dlo,
 88  	}
 89  }
 90  
 91  // getActor dynamically finds available actors for the service key and selects
 92  // one using the configured strategy. This method is called internally by Tell
 93  // and Ask on each invocation to ensure up-to-date actor discovery.
 94  func (r *Router[M, R]) getActor() (ActorRef[M, R], error) {
 95  	// Discover available actors from the receptionist.
 96  	availableActors := FindInReceptionist(r.receptionist, r.serviceKey)
 97  	if len(availableActors) == 0 {
 98  		return nil, ErrNoActorsAvailable
 99  	}
100  
101  	// Select one actor using the strategy.
102  	return r.strategy.Select(availableActors)
103  }
104  
105  // Tell sends a message to one of the actors managed by the router, selected by
106  // the routing strategy. If no actors are available or the send context is
107  // cancelled before the message can be enqueued in the target actor's mailbox,
108  // the message may be dropped. Errors during actor selection (e.g.,
109  // ErrNoActorsAvailable) are currently not propagated from Tell, aligning with
110  // its fire-and-forget nature. Such errors could be logged internally if needed.
111  func (r *Router[M, R]) Tell(ctx context.Context, msg M) {
112  	selectedActor, err := r.getActor()
113  	if err != nil {
114  		// If no actors are available for the service, and a DLO is
115  		// configured, forward the message there.
116  		if errors.Is(err, ErrNoActorsAvailable) && r.dlo != nil {
117  			r.dlo.Tell(context.Background(), msg)
118  		} else {
119  			log.Warnf("Router(%s): message %s dropped "+
120  				"(no actors available, no DLO configured)",
121  				r.serviceKey.name, msg.MessageType())
122  		}
123  
124  		return
125  	}
126  
127  	selectedActor.Tell(ctx, msg)
128  }
129  
130  // Ask sends a message to one of the actors managed by the router, selected by
131  // the routing strategy, and returns a Future for the response. If no actors are
132  // available (ErrNoActorsAvailable), the Future will be completed with this
133  // error. If the send context is cancelled before the message can be enqueued in
134  // the chosen actor's mailbox, the Future will be completed with the context's
135  // error.
136  func (r *Router[M, R]) Ask(ctx context.Context, msg M) Future[R] {
137  	selectedActor, err := r.getActor()
138  	if err != nil {
139  		// If no actor could be selected (e.g., none available),
140  		// complete the promise immediately with the selection error.
141  		promise := NewPromise[R]()
142  		promise.Complete(fn.Err[R](err))
143  		return promise.Future()
144  	}
145  
146  	return selectedActor.Ask(ctx, msg)
147  }
148  
149  // ID provides an identifier for the router. Since a router isn't an actor
150  // itself but a dispatcher for a service, its ID can be based on the service
151  // key.
152  func (r *Router[M, R]) ID() string {
153  	return "router(" + r.serviceKey.name + ")"
154  }