/ actor / system_test.go
system_test.go
  1  package actor
  2  
  3  import (
  4  	"context"
  5  	"errors"
  6  	"fmt"
  7  	"sync/atomic"
  8  	"testing"
  9  	"time"
 10  
 11  	"github.com/lightningnetwork/lnd/fn/v2"
 12  	"github.com/stretchr/testify/require"
 13  )
 14  
 15  // TestActorSystemNewActorSystem verifies the basic initialization of an
 16  // ActorSystem, including its default DLO.
 17  func TestActorSystemNewActorSystem(t *testing.T) {
 18  	t.Parallel()
 19  
 20  	as := NewActorSystem()
 21  	require.NotNil(t, as, "newActorSystem should not return nil")
 22  	require.NotNil(t, as.Receptionist(), "receptionist should not be nil")
 23  	require.NotNil(t, as.DeadLetters(), "deadLetters should not be nil")
 24  	require.Equal(t, "dead-letters", as.DeadLetters().ID(), "dLO ID mismatch")
 25  
 26  	// Test the DLO's behavior (it should return an error for Ask).
 27  	testDLOMsg := newTestMsg("to-dlo")
 28  	future := as.DeadLetters().Ask(context.Background(), testDLOMsg)
 29  	result := future.Await(context.Background())
 30  
 31  	// We should get back an error for asks.
 32  	require.True(
 33  		t, result.IsErr(), "system DLO should return an error on Ask",
 34  	)
 35  	expectedErrStr := "message undeliverable: " + testDLOMsg.MessageType()
 36  	require.EqualError(
 37  		t, result.Err(), expectedErrStr, "dLO error message mismatch",
 38  	)
 39  
 40  	// Shutdown the system to clean up resources.
 41  	err := as.Shutdown()
 42  	require.NoError(t, err, "actorSystem shutdown failed")
 43  }
 44  
 45  // TestActorSystemRegisterWithSystem verifies actor registration, lifecycle
 46  // management within the system.
 47  func TestActorSystemRegisterWithSystem(t *testing.T) {
 48  	t.Parallel()
 49  
 50  	as := NewActorSystem()
 51  	defer func() {
 52  		err := as.Shutdown()
 53  		require.NoError(t, err)
 54  	}()
 55  
 56  	actorID := "test-actor-sys-reg"
 57  	serviceKey := NewServiceKey[*testMsg, string]("test-service")
 58  
 59  	// Using echoBehavior from actor_test.go (implicitly available)
 60  	beh := newEchoBehavior(t, 0)
 61  
 62  	// We'll start off by registering the actor.
 63  	actorRef, err := RegisterWithSystem(as, actorID, serviceKey, beh)
 64  	require.NoError(t, err)
 65  	require.NotNil(t, actorRef, "registerWithSystem should return a valid ActorRef")
 66  	require.Equal(t, actorID, actorRef.ID(), "registered actor ID mismatch")
 67  
 68  	// The actor should be found in the receptionist.
 69  	foundActors := FindInReceptionist(as.Receptionist(), serviceKey)
 70  	require.Len(t, foundActors, 1, "actor not found in receptionist")
 71  	require.Equal(t, actorRef, foundActors[0], "incorrect actor in receptionist")
 72  
 73  	// Next, we'll send out a simple tell, using our reply channel to make
 74  	// sure it's actually processed.
 75  	msgData := "hello-system-actor"
 76  	replyChan := make(chan string, 1)
 77  	actorRef.Tell(context.Background(), newTestMsgWithReply(msgData, replyChan))
 78  
 79  	received, err := fn.RecvOrTimeout(replyChan, 100*time.Millisecond)
 80  	require.NoError(t, err, "timed out waiting for actor to process message")
 81  	require.Equal(t, msgData, received, "actor did not process message")
 82  
 83  	// Stop the actor through the system.
 84  	stopped := as.StopAndRemoveActor(actorID)
 85  	require.True(t, stopped, "StopAndRemoveActor failed")
 86  
 87  	// Wait for actor to fully stop.
 88  	time.Sleep(50 * time.Millisecond)
 89  
 90  	// Send a message to the now-stopped actor's ref. This should go to the
 91  	// system's DLO.
 92  	afterStopMsg := newTestMsg("after-stop-to-dlo")
 93  	require.NotPanics(t, func() {
 94  		actorRef.Tell(context.Background(), afterStopMsg)
 95  	}, "tell to stopped actor should not panic")
 96  }
 97  
 98  // TestActorSystemShutdown verifies that all actors are stopped and the system
 99  // context is cancelled upon shutdown.
100  func TestActorSystemShutdown(t *testing.T) {
101  	t.Parallel()
102  
103  	as := NewActorSystem()
104  
105  	// We'll start by making 3 new actors, each with a unique ID.
106  	numActors := 3
107  	actorRefs := make([]ActorRef[*testMsg, string], numActors)
108  	for i := 0; i < numActors; i++ {
109  		actorID := fmt.Sprintf("shutdown-test-actor-%d", i)
110  		key := NewServiceKey[*testMsg, string](
111  			fmt.Sprintf("service-%d", i),
112  		)
113  		beh := newEchoBehavior(t, 0)
114  		ref, regErr := RegisterWithSystem(as, actorID, key, beh)
115  		require.NoError(t, regErr)
116  		actorRefs[i] = ref
117  	}
118  
119  	// We'll now send a message to each actor to ensure that they're
120  	// running.
121  	for i, ref := range actorRefs {
122  		future := ref.Ask(
123  			context.Background(),
124  			newTestMsg(fmt.Sprintf("ping-%d", i)),
125  		)
126  		ctxAwait, cancelAwait := context.WithTimeout(
127  			context.Background(), time.Second,
128  		)
129  		res := future.Await(ctxAwait)
130  		cancelAwait()
131  		require.False(
132  			t, res.IsErr(),
133  			"actor %d failed to respond before shutdown: %v",
134  			i, res.Err(),
135  		)
136  	}
137  
138  	// Next, trigger a shutdown, and assert that the done channel gets
139  	// closed.
140  	err := as.Shutdown()
141  	require.NoError(t, err, "actorSystem shutdown failed")
142  
143  	// Check if the system context is done using RecvOrTimeout with a zero
144  	// timeout for a non-blocking check.
145  	_, err = fn.RecvOrTimeout(as.ctx.Done(), time.Millisecond*100)
146  	require.NoError(t, err, "actorSystem context not cancelled after shutdown")
147  
148  	// We'll now try to send a message to each of the actors, this should
149  	// result in an error.
150  	for i, ref := range actorRefs {
151  		future := ref.Ask(
152  			context.Background(),
153  			newTestMsg(fmt.Sprintf("ping-after-shutdown-%d", i)),
154  		)
155  		res := future.Await(context.Background())
156  		require.True(
157  			t, res.IsErr(),
158  			"actor %d Ask should fail after shutdown", i,
159  		)
160  		require.ErrorIs(t, res.Err(), ErrActorTerminated)
161  	}
162  
163  	as.mu.RLock()
164  	require.Nil(t, as.actors, "actors map should be nil after shutdown")
165  	as.mu.RUnlock()
166  
167  	// Once shutdown, we shouldn't be able to send to the DLO either.
168  	dloRef := as.DeadLetters()
169  	futureDLO := dloRef.Ask(
170  		context.Background(), newTestMsg("ping-dlo-after-shutdown"),
171  	)
172  	resDLO := futureDLO.Await(context.Background())
173  	require.True(
174  		t, resDLO.IsErr(), "DLO Ask should fail after system shutdown",
175  	)
176  	require.ErrorIs(
177  		t, resDLO.Err(), ErrActorTerminated,
178  	)
179  }
180  
181  // TestActorSystemStopAndRemoveActor verifies specific actor stopping and
182  // removal.
183  func TestActorSystemStopAndRemoveActor(t *testing.T) {
184  	t.Parallel()
185  
186  	as := NewActorSystem()
187  	defer func() {
188  		err := as.Shutdown()
189  		require.NoError(t, err)
190  	}()
191  
192  	// Make some actor IDs, then unique service keys, then use that to
193  	// register two actors.
194  	actor1ID := "actor-to-stop"
195  	actor2ID := "actor-to-keep"
196  	key1 := NewServiceKey[*testMsg, string]("service1")
197  	key2 := NewServiceKey[*testMsg, string]("service2")
198  	beh := newEchoBehavior(t, 0)
199  
200  	ref1, err := RegisterWithSystem(as, actor1ID, key1, beh)
201  	require.NoError(t, err)
202  	ref2, err := RegisterWithSystem(as, actor2ID, key2, beh)
203  	require.NoError(t, err)
204  
205  	// If we remove one actor, then try to send to it, we should get an
206  	// error.
207  	stopped := as.StopAndRemoveActor(actor1ID)
208  	require.True(t, stopped, "failed to stop and remove actor1")
209  
210  	future1 := ref1.Ask(context.Background(), newTestMsg("ping-actor1"))
211  	res1 := future1.Await(context.Background())
212  	require.True(t, res1.IsErr(), "actor1 should be stopped")
213  	require.ErrorIs(t, res1.Err(), ErrActorTerminated)
214  
215  	as.mu.RLock()
216  	_, exists := as.actors[actor1ID]
217  	as.mu.RUnlock()
218  
219  	// The actor should no longer be found.
220  	require.False(t, exists, "actor1 still in system's actor map")
221  
222  	// Make sure that we can still send messages to the existing actor.
223  	future2 := ref2.Ask(
224  		context.Background(), newTestMsg("ping-actor2"),
225  	)
226  
227  	ctxAwait2, cancelAwait2 := context.WithTimeout(
228  		context.Background(), time.Second,
229  	)
230  	res2 := future2.Await(ctxAwait2)
231  	cancelAwait2()
232  
233  	require.False(
234  		t, res2.IsErr(), "actor2 should still be running: %v",
235  		res2.Err(),
236  	)
237  	res2.WhenOk(func(s string) {
238  		require.Equal(t, "echo: ping-actor2", s)
239  	})
240  
241  	stoppedNonExistent := as.StopAndRemoveActor("non-existent-actor")
242  	require.False(
243  		t, stoppedNonExistent, "stopping non-existent actor should "+
244  			"return false",
245  	)
246  }
247  
248  // TestReceptionist covers basic registration, finding, and unregistration.
249  func TestReceptionist(t *testing.T) {
250  	t.Parallel()
251  
252  	as := NewActorSystem()
253  	defer func() {
254  		err := as.Shutdown()
255  		require.NoError(t, err)
256  	}()
257  	receptionist := as.Receptionist()
258  
259  	key1 := NewServiceKey[*testMsg, string]("key1")
260  	key2 := NewServiceKey[*testMsg, string]("key2")
261  	key1Again := NewServiceKey[*testMsg, string]("key1")
262  
263  	// Register 3 actor instance using the service keys we created above.
264  	beh := newEchoBehavior(t, 0)
265  	actor1Ref, err := RegisterWithSystem(as, "actor1-rec", key1, beh)
266  	require.NoError(t, err)
267  	actor2Ref, err := RegisterWithSystem(as, "actor2-rec", key1, beh)
268  	require.NoError(t, err)
269  	actor3Ref, err := RegisterWithSystem(as, "actor3-rec", key2, beh)
270  	require.NoError(t, err)
271  
272  	// We should be able to find the actors we registered.
273  	foundForKey1 := FindInReceptionist(receptionist, key1)
274  	require.Len(t, foundForKey1, 2, "should find 2 actors for key1")
275  	require.Contains(t, foundForKey1, actor1Ref)
276  	require.Contains(t, foundForKey1, actor2Ref)
277  
278  	foundForKey1Again := FindInReceptionist(receptionist, key1Again)
279  	require.ElementsMatch(t, foundForKey1, foundForKey1Again)
280  
281  	// Same goes for the second key we added.
282  	foundForKey2 := FindInReceptionist(receptionist, key2)
283  	require.Len(t, foundForKey2, 1, "should find 1 actor for key2")
284  	require.Equal(t, actor3Ref, foundForKey2[0])
285  
286  	// We shouldn't be able to find a key we didn't add.
287  	nonExistentKey := NewServiceKey[*testMsg, string]("non-existent")
288  	foundForNonExistent := FindInReceptionist(receptionist, nonExistentKey)
289  	require.Empty(t, foundForNonExistent)
290  
291  	// We should be able to unregister the actors we added.
292  	unregistered := UnregisterFromReceptionist(
293  		receptionist, key1, actor1Ref,
294  	)
295  	require.True(t, unregistered, "failed to unregister actor1Ref")
296  
297  	foundForKey1AfterUnreg := FindInReceptionist(receptionist, key1)
298  	require.Len(t, foundForKey1AfterUnreg, 1)
299  	require.Equal(t, actor2Ref, foundForKey1AfterUnreg[0])
300  
301  	// If we try to unregister the same actor again, it should fail.
302  	unregisteredAgain := UnregisterFromReceptionist(receptionist, key1, actor1Ref)
303  	require.False(t, unregisteredAgain)
304  
305  	unregisteredLast := UnregisterFromReceptionist(receptionist, key1, actor2Ref)
306  	require.True(t, unregisteredLast)
307  	foundForKey1AfterAllUnreg := FindInReceptionist(receptionist, key1)
308  	require.Empty(t, foundForKey1AfterAllUnreg)
309  
310  	receptionist.mu.RLock()
311  	_, exists := receptionist.registrations[key1.name]
312  	receptionist.mu.RUnlock()
313  	require.False(t, exists, "key1 should be removed from registrations map")
314  
315  	// Finally, if we use the wrong key, or one that doesn't exist, that
316  	// should also fail.
317  	unregisteredWrongKey := UnregisterFromReceptionist(receptionist, key1, actor3Ref)
318  	require.False(t, unregisteredWrongKey)
319  	unregisteredNonExistentKey := UnregisterFromReceptionist(receptionist, nonExistentKey, actor1Ref)
320  	require.False(t, unregisteredNonExistentKey)
321  }
322  
323  // TestServiceKeyMethods tests Spawn and Unregister methods on ServiceKey.
324  func TestServiceKeyMethods(t *testing.T) {
325  	t.Parallel()
326  
327  	as := NewActorSystem()
328  	defer func() {
329  		err := as.Shutdown()
330  		require.NoError(t, err)
331  	}()
332  
333  	key := NewServiceKey[*testMsg, string]("sk-service")
334  	beh := newEchoBehavior(t, 0)
335  
336  	// Attempt to spawn a new actor using the service key and desired
337  	// behavior.
338  	actorRef, err := key.Spawn(as, "actor-sk-spawn", beh)
339  	require.NoError(t, err)
340  	require.NotNil(t, actorRef)
341  	require.Equal(t, "actor-sk-spawn", actorRef.ID())
342  
343  	// We should be able to find the actor in the receptionist.
344  	found := FindInReceptionist(as.Receptionist(), key)
345  	require.Len(t, found, 1)
346  	require.Equal(t, actorRef, found[0])
347  
348  	as.mu.RLock()
349  	_, sysExists := as.actors[actorRef.ID()]
350  	as.mu.RUnlock()
351  	require.True(t, sysExists)
352  
353  	// Next, try to unregister the actor using the service key.
354  	success := key.Unregister(as, actorRef)
355  	require.True(t, success, "serviceKey.Unregister failed")
356  
357  	// The actor should no longer be found in the receptionist.
358  	foundAfter := FindInReceptionist(as.Receptionist(), key)
359  	require.Empty(t, foundAfter)
360  
361  	as.mu.RLock()
362  	_, sysExistsAfter := as.actors[actorRef.ID()]
363  	as.mu.RUnlock()
364  	require.False(t, sysExistsAfter)
365  
366  	// If we try to send a message to the actor after unregistering it, then
367  	// we should get an error.
368  	future := actorRef.Ask(context.Background(), newTestMsg("ping"))
369  	res := future.Await(context.Background())
370  	require.True(t, res.IsErr() && errors.Is(res.Err(), ErrActorTerminated))
371  
372  	successAgain := key.Unregister(as, actorRef)
373  	require.False(t, successAgain)
374  
375  	otherSys := NewActorSystem() // Create a different actor system
376  	defer func() {
377  		err := otherSys.Shutdown()
378  		require.NoError(t, err)
379  	}()
380  
381  	// Create a dummy actor in otherSys of the correct generic type for the
382  	// key. This actor won't be found in 'as', so Unregister should fail.
383  	dummyBehOther := newEchoBehavior(t, 0)
384  	dummyKeyOther := NewServiceKey[*testMsg, string]("dummy-other")
385  	dummyActorRefOtherSys, err := RegisterWithSystem(
386  		otherSys, "dummy-other-actor", dummyKeyOther, dummyBehOther,
387  	)
388  	require.NoError(t, err)
389  
390  	successNonMember := key.Unregister(as, dummyActorRefOtherSys)
391  	require.False(t, successNonMember)
392  }
393  
394  // TestServiceKeyUnregisterAll tests the UnregisterAll method on ServiceKey.
395  // It covers scenarios including basic unregistration of multiple actors,
396  // attempting to unregister with no actors present, unregistering actors for
397  // one key while leaving others intact, and the idempotency of the operation.
398  func TestServiceKeyUnregisterAll(t *testing.T) {
399  	t.Parallel()
400  
401  	// Common setup for all sub-tests.
402  	as := NewActorSystem()
403  	defer func() {
404  		err := as.Shutdown()
405  		require.NoError(t, err, "ActorSystem shutdown failed.")
406  	}()
407  
408  	// Common behavior for test actors used across sub-tests.
409  	beh := newEchoBehavior(t, 0)
410  
411  	t.Run("unregister all multiple actors", func(st *testing.T) {
412  		key1 := NewServiceKey[*testMsg, string]("sk-ua-key1")
413  		actor1Key1, err := key1.Spawn(as, "actor1-k1-ua", beh)
414  		require.NoError(st, err)
415  		actor2Key1, err := key1.Spawn(as, "actor2-k1-ua", beh)
416  		require.NoError(st, err)
417  
418  		// Verify they are registered in the receptionist.
419  		foundActorsForKey1 := FindInReceptionist(
420  			as.Receptionist(), key1,
421  		)
422  		require.Len(
423  			st, foundActorsForKey1, 2,
424  			"actors for key1 not in receptionist initially.",
425  		)
426  
427  		// Verify they are in the system's actor map.
428  		as.mu.RLock()
429  		_, actor1Key1Exists := as.actors[actor1Key1.ID()]
430  		_, actor2Key1Exists := as.actors[actor2Key1.ID()]
431  		as.mu.RUnlock()
432  		require.True(
433  			st, actor1Key1Exists,
434  			"actor1 for key1 not in system actors map initially.",
435  		)
436  		require.True(
437  			st, actor2Key1Exists,
438  			"actor2 for key1 not in system actors map initially.",
439  		)
440  
441  		// Unregister all for key1.
442  		stoppedCountKey1 := key1.UnregisterAll(as)
443  		require.Equal(
444  			st, 2, stoppedCountKey1,
445  			"UnregisterAll for key1 returned incorrect count.",
446  		)
447  
448  		// Verify they are unregistered from the receptionist.
449  		foundActorsForKey1After := FindInReceptionist(
450  			as.Receptionist(), key1,
451  		)
452  		require.Empty(
453  			st, foundActorsForKey1After,
454  			"actors for key1 still in receptionist after "+
455  				"UnregisterAll.",
456  		)
457  
458  		// Verify they are removed from system actors map.
459  		as.mu.RLock()
460  		_, actor1Key1ExistsAfter := as.actors[actor1Key1.ID()]
461  		_, actor2Key1ExistsAfter := as.actors[actor2Key1.ID()]
462  		as.mu.RUnlock()
463  		require.False(
464  			st, actor1Key1ExistsAfter,
465  			"Actor1 for key1 still in system actors "+
466  				"map after UnregisterAll.",
467  		)
468  		require.False(
469  			st, actor2Key1ExistsAfter,
470  			"Actor2 for key1 still in system actors "+
471  				"map after UnregisterAll.",
472  		)
473  
474  		// Verify actors are stopped.
475  		resultActor1Key1 := actor1Key1.Ask(
476  			context.Background(), newTestMsg("ping-k1-a1"),
477  		).Await(context.Background())
478  		require.True(
479  			st, resultActor1Key1.IsErr(),
480  			"Actor1 key1 Ask should fail after UnregisterAll.",
481  		)
482  		require.ErrorIs(
483  			st, resultActor1Key1.Err(), ErrActorTerminated,
484  			"Actor1 key1 not terminated with correct error.",
485  		)
486  
487  		resultActor2Key1 := actor2Key1.Ask(
488  			context.Background(), newTestMsg("ping-k1-a2"),
489  		).Await(context.Background())
490  		require.True(
491  			st, resultActor2Key1.IsErr(),
492  			"Actor2 key1 Ask should fail after UnregisterAll.",
493  		)
494  		require.ErrorIs(
495  			st, resultActor2Key1.Err(), ErrActorTerminated,
496  			"Actor2 key1 not terminated with correct error.",
497  		)
498  	})
499  
500  	t.Run("unregister all with no actors for the key", func(st *testing.T) {
501  		keyEmpty := NewServiceKey[*testMsg, string]("sk-ua-key-empty")
502  		stoppedCountEmptyKey := keyEmpty.UnregisterAll(as)
503  		require.Equal(
504  			st, 0, stoppedCountEmptyKey,
505  			"UnregisterAll for empty key returned non-zero count.",
506  		)
507  
508  		foundActorsForKeyEmpty := FindInReceptionist(
509  			as.Receptionist(), keyEmpty,
510  		)
511  		require.Empty(
512  			st, foundActorsForKeyEmpty,
513  			"Receptionist not empty for keyEmpty "+
514  				"after UnregisterAll.",
515  		)
516  	})
517  
518  	t.Run("unregister all with mixed keys", func(st *testing.T) {
519  		keyA := NewServiceKey[*testMsg, string]("sk-ua-keyA")
520  		keyB := NewServiceKey[*testMsg, string]("sk-ua-keyB")
521  
522  		// Spawn 3 actors, two of them will share the same service key.
523  		actorA1, err := keyA.Spawn(as, "actorA1-ua-mixed", beh)
524  		require.NoError(st, err)
525  		actorA2, err := keyA.Spawn(as, "actorA2-ua-mixed", beh)
526  		require.NoError(st, err)
527  		actorB1, err := keyB.Spawn(as, "actorB1-ua-mixed", beh)
528  		require.NoError(st, err)
529  
530  		// Make sure we're able to find them in the receptionist.
531  		require.Len(
532  			st, FindInReceptionist(as.Receptionist(), keyA), 2,
533  			"KeyA initial registration count mismatch.",
534  		)
535  		require.Len(
536  			st, FindInReceptionist(as.Receptionist(), keyB), 1,
537  			"KeyB initial registration count mismatch.",
538  		)
539  
540  		// We'll start by unregistering all actors for keyA.
541  		stoppedCountKeyA := keyA.UnregisterAll(as)
542  		require.Equal(
543  			st, 2, stoppedCountKeyA,
544  			"UnregisterAll for keyA returned incorrect count.",
545  		)
546  
547  		// Verify keyA actors are gone from receptionist, keyB actor
548  		// remains.
549  		require.Empty(
550  			st, FindInReceptionist(as.Receptionist(), keyA),
551  			"actors for keyA still in receptionist after "+
552  				"UnregisterAll.",
553  		)
554  		foundActorsForKeyBAfterA := FindInReceptionist(
555  			as.Receptionist(), keyB,
556  		)
557  		require.Len(
558  			st, foundActorsForKeyBAfterA, 1,
559  			"Actor for keyB affected by UnregisterAll on keyA.",
560  		)
561  		require.Equal(
562  			st, actorB1, foundActorsForKeyBAfterA[0],
563  			"Wrong actor found for keyB.",
564  		)
565  
566  		// Verify keyA actors are removed from system map, keyB actor
567  		// remains.
568  		as.mu.RLock()
569  		_, actorA1ExistsAfterMixed := as.actors[actorA1.ID()]
570  		_, actorA2ExistsAfterMixed := as.actors[actorA2.ID()]
571  		_, actorB1ExistsAfterMixed := as.actors[actorB1.ID()]
572  		as.mu.RUnlock()
573  		require.False(
574  			st, actorA1ExistsAfterMixed,
575  			"ActorA1 still in system actors map after "+
576  				"mixed UnregisterAll.",
577  		)
578  		require.False(
579  			st, actorA2ExistsAfterMixed,
580  			"ActorA2 still in system actors map after "+
581  				"mixed UnregisterAll.",
582  		)
583  		require.True(
584  			st, actorB1ExistsAfterMixed,
585  			"ActorB1 removed from system actors map incorrectly.",
586  		)
587  
588  		// Verify keyA actors are stopped, keyB actor is running.
589  		resultActorA1Mixed := actorA1.Ask(
590  			context.Background(), newTestMsg("ping-kA-a1"),
591  		).Await(context.Background())
592  		require.True(st, resultActorA1Mixed.IsErr())
593  		require.ErrorIs(
594  			st, resultActorA1Mixed.Err(), ErrActorTerminated,
595  		)
596  
597  		resultActorB1Mixed := actorB1.Ask(
598  			context.Background(), newTestMsg("ping-kB-a1"),
599  		).Await(context.Background())
600  		require.False(
601  			st, resultActorB1Mixed.IsErr(),
602  			"ActorB1 terminated incorrectly (mixed test): %v",
603  			resultActorB1Mixed.Err(),
604  		)
605  		resultActorB1Mixed.WhenOk(func(s string) {
606  			require.Equal(st, "echo: ping-kB-a1", s)
607  		})
608  	})
609  
610  	t.Run("idempotency of UnregisterAll", func(st *testing.T) {
611  		keyIdempotent := NewServiceKey[*testMsg, string](
612  			"sk-ua-key-idem",
613  		)
614  		actorIdem, err := keyIdempotent.Spawn(as, "actor-idem-ua", beh)
615  		require.NoError(st, err)
616  
617  		// First call should unregister and stop.
618  		stoppedCountFirstCall := keyIdempotent.UnregisterAll(as)
619  		require.Equal(
620  			st, 1, stoppedCountFirstCall,
621  			"UnregisterAll (first call) incorrect count.",
622  		)
623  
624  		// Second call should do nothing and return 0.
625  		stoppedCountSecondCall := keyIdempotent.UnregisterAll(as)
626  		require.Equal(
627  			st, 0, stoppedCountSecondCall,
628  			"UnregisterAll (second call) incorrect count, not "+
629  				"idempotent.",
630  		)
631  
632  		// Verify actor is gone from receptionist and system map, and is
633  		// stopped.
634  		require.Empty(
635  			st, FindInReceptionist(as.Receptionist(), keyIdempotent),
636  			"Actors for keyIdempotent still in receptionist "+
637  				"after calls.",
638  		)
639  
640  		as.mu.RLock()
641  		_, actorIdemExistsAfter := as.actors[actorIdem.ID()]
642  		as.mu.RUnlock()
643  		require.False(
644  			st, actorIdemExistsAfter,
645  			"ActorIdem still in system actors map after calls.",
646  		)
647  
648  		resultActorIdem := actorIdem.Ask(
649  			context.Background(), newTestMsg("ping-kidem-a1"),
650  		).Await(context.Background())
651  		require.True(st, resultActorIdem.IsErr())
652  		require.ErrorIs(st, resultActorIdem.Err(), ErrActorTerminated)
653  	})
654  }
655  
656  // routerTestHarness helps set up routers and their associated actors for testing.
657  // It uses an actorTestHarness internally for DLO observation for the router.
658  type routerTestHarness struct {
659  	*actorTestHarness
660  	as           *ActorSystem
661  	receptionist *Receptionist
662  }
663  
664  // newRouterTestHarness sets up a new harness for router testing.
665  // It creates an ActorSystem for actors that the router will route to,
666  // and uses the embedded actorTestHarness for the router's own DLO.
667  func newRouterTestHarness(t *testing.T) *routerTestHarness {
668  	t.Helper()
669  	system := NewActorSystem()
670  	t.Cleanup(func() {
671  		err := system.Shutdown()
672  		require.NoError(t, err, "router test actor system shutdown failed")
673  	})
674  
675  	// The DLO for the router itself will come from actorTestHarness.
676  	// Actors managed by `system` (router targets) will use `system.DeadLetters()`.
677  	return &routerTestHarness{
678  		actorTestHarness: newActorTestHarness(t),
679  		as:               system,
680  		receptionist:     system.Receptionist(),
681  	}
682  }
683  
684  // newRouterTargetActor creates an actor, registers it with the harness's
685  // ActorSystem (h.as) and Receptionist under the given service key. This actor
686  // is intended to be a target for the router.
687  func (h *routerTestHarness) newRouterTargetActor(id string,
688  	key ServiceKey[*testMsg, string],
689  	beh ActorBehavior[*testMsg, string]) ActorRef[*testMsg, string] {
690  
691  	h.t.Helper()
692  
693  	ref, err := RegisterWithSystem(h.as, id, key, beh)
694  	require.NoError(h.t, err)
695  
696  	return ref
697  }
698  
699  // TestRouterNewRouter verifies that a new router can be created as expected.
700  func TestRouterNewRouter(t *testing.T) {
701  	t.Parallel()
702  	h := newRouterTestHarness(t)
703  
704  	key := NewServiceKey[*testMsg, string]("router-service")
705  	strategy := NewRoundRobinStrategy[*testMsg, string]()
706  
707  	router := NewRouter(h.receptionist, key, strategy, h.dlo.Ref())
708  	require.NotNil(t, router, "newRouter should not return nil")
709  	require.Equal(t, "router(router-service)", router.ID(), "router ID mismatch")
710  }
711  
712  // countingEchoBehavior is an echo behavior that also counts how many messages
713  // it has processed.
714  type countingEchoBehavior struct {
715  	*echoBehavior
716  	id            string
717  	processedMsgs int64
718  }
719  
720  func newCountingEchoBehavior(t *testing.T, id string) *countingEchoBehavior {
721  	return &countingEchoBehavior{
722  		echoBehavior: newEchoBehavior(t, 0),
723  		id:           id,
724  	}
725  }
726  
727  func (b *countingEchoBehavior) Receive(ctx context.Context,
728  	msg *testMsg) fn.Result[string] {
729  
730  	atomic.AddInt64(&b.processedMsgs, 1)
731  
732  	// Include actor ID in reply for easier verification.
733  	res := b.echoBehavior.Receive(ctx, msg)
734  	val, err := res.Unpack()
735  	if err == nil {
736  		return fn.Ok(fmt.Sprintf("%s:%s", b.id, val))
737  	}
738  	return res
739  }
740  
741  // TestRouterTellAndAskRoundRobin verifies that the router distributes messages
742  // in a round robin properly.
743  func TestRouterTellAndAskRoundRobin(t *testing.T) {
744  	t.Parallel()
745  	h := newRouterTestHarness(t)
746  
747  	// Make a new router for the given service key and round robin strategy.
748  	serviceKey := NewServiceKey[*testMsg, string]("rr-service")
749  	strategy := NewRoundRobinStrategy[*testMsg, string]()
750  	router := NewRouter(h.receptionist, serviceKey, strategy, h.dlo.Ref())
751  
752  	// We'll now register two actors with the router, each with a different
753  	// service key.
754  	actor1Beh := newCountingEchoBehavior(t, "actor1")
755  	actor2Beh := newCountingEchoBehavior(t, "actor2")
756  	_ = h.newRouterTargetActor("actor1-rr", serviceKey, actor1Beh)
757  	_ = h.newRouterTargetActor("actor2-rr", serviceKey, actor2Beh)
758  
759  	// Nxet, we'll send a mix of Tell and Ask messages to the router.
760  	numMessages := 6
761  	for i := 0; i < numMessages; i++ {
762  		msgData := fmt.Sprintf("message-%d", i)
763  		if i%2 == 0 {
764  			router.Tell(context.Background(), newTestMsg(msgData))
765  		} else {
766  			future := router.Ask(
767  				context.Background(), newTestMsg(msgData),
768  			)
769  			ctxAwait, cancelAwait := context.WithTimeout(
770  				context.Background(), time.Second,
771  			)
772  
773  			result := future.Await(ctxAwait)
774  			cancelAwait()
775  			require.False(
776  				t, result.IsErr(), "ask failed: %v", result.Err(),
777  			)
778  		}
779  	}
780  
781  	// Wait a bit for Tell messages to be processed.
782  	time.Sleep(100 * time.Millisecond)
783  
784  	// Each actor should have processed numMessages / 2 messages.
785  	require.EqualValues(
786  		t, numMessages/2, atomic.LoadInt64(&actor1Beh.processedMsgs),
787  		"actor1 processed message count mismatch",
788  	)
789  	require.EqualValues(
790  		t, numMessages/2, atomic.LoadInt64(&actor2Beh.processedMsgs),
791  		"actor2 processed message count mismatch",
792  	)
793  
794  	// Router's DLO should be empty.
795  	h.assertNoDLOMessages()
796  }
797  
798  // TestRouterNoActorsAvailable verifies that if no actors are available for the
799  // message, then an error is returned.
800  func TestRouterNoActorsAvailable(t *testing.T) {
801  	t.Parallel()
802  	h := newRouterTestHarness(t)
803  
804  	serviceKey := NewServiceKey[*testMsg, string]("no-actor-service")
805  	strategy := NewRoundRobinStrategy[*testMsg, string]()
806  	router := NewRouter(h.receptionist, serviceKey, strategy, h.dlo.Ref())
807  
808  	// We'll send a message, then assert that it goes to the DLO.
809  	tellMsg := newTestMsg("tell-no-actor")
810  	router.Tell(context.Background(), tellMsg)
811  	h.assertDLOMessage(tellMsg)
812  
813  	// If we use an ask instead, then we should get an error.
814  	askMsg := newTestMsg("ask-no-actor")
815  	future := router.Ask(context.Background(), askMsg)
816  	result := future.Await(context.Background())
817  
818  	require.True(
819  		t, result.IsErr(), "ask should fail when no actors are available",
820  	)
821  	require.ErrorIs(t, result.Err(), ErrNoActorsAvailable, "error mismatch")
822  }
823  
824  // TestRouterTellAskContextCancellation verifies that if the context is
825  // canceled, then sending aborts.
826  func TestRouterTellAskContextCancellation(t *testing.T) {
827  	t.Parallel()
828  	h := newRouterTestHarness(t)
829  
830  	serviceKey := NewServiceKey[*testMsg, string]("ctx-cancel-service")
831  	strategy := NewRoundRobinStrategy[*testMsg, string]()
832  	router := NewRouter(h.receptionist, serviceKey, strategy, h.dlo.Ref())
833  
834  	// Use a regular echo actor, but we'll control context for Tell/Ask.
835  	targetActorBeh := newEchoBehavior(t, 50*time.Millisecond)
836  	_ = h.newRouterTargetActor("target-ctx", serviceKey, targetActorBeh)
837  
838  	// Next, we'll send a Tell message with a context that will be cancelled
839  	// before we even send.
840  	ctxTell, cancelTell := context.WithCancel(context.Background())
841  	cancelTell()
842  	router.Tell(ctxTell, newTestMsg("tell-ctx-cancelled"))
843  
844  	// The Message should be dropped by actorRefImpl.Tell if ctx is
845  	// cancelled. Router's DLO should not receive it from this path.
846  	h.assertNoDLOMessages()
847  
848  	// Next, we'll do the same for Ask. This time, we should get an error.
849  	ctxAsk, cancelAsk := context.WithCancel(context.Background())
850  	cancelAsk()
851  	futureAsk := router.Ask(ctxAsk, newTestMsg("ask-ctx-cancelled"))
852  	resultAsk := futureAsk.Await(context.Background())
853  
854  	require.True(
855  		t, resultAsk.IsErr(), "ask with cancelled context should fail",
856  	)
857  	require.ErrorIs(
858  		t, resultAsk.Err(), context.Canceled,
859  		"error should be context.Canceled",
860  	)
861  }
862  
863  // TestRouterDynamicActorRegistration tests that we're able to dynamically add
864  // and remove actors from the router.
865  func TestRouterDynamicActorRegistration(t *testing.T) {
866  	t.Parallel()
867  	h := newRouterTestHarness(t)
868  
869  	serviceKey := NewServiceKey[*testMsg, string]("dynamic-service")
870  	strategy := NewRoundRobinStrategy[*testMsg, string]()
871  	router := NewRouter(h.receptionist, serviceKey, strategy, h.dlo.Ref())
872  
873  	// If we try to send a mesasge to the router before any actors are
874  	// added, we should get an error.
875  	futureNoActor := router.Ask(context.Background(), newTestMsg("ping-no-actors"))
876  	resNoActor := futureNoActor.Await(context.Background())
877  	require.ErrorIs(t, resNoActor.Err(), ErrNoActorsAvailable)
878  
879  	actor1Beh := newCountingEchoBehavior(t, "actor1")
880  	actor1Ref := h.newRouterTargetActor("actor1-dynamic", serviceKey, actor1Beh)
881  
882  	// At this point, we have a new actor added, but we'll try to send a
883  	// message to a different actor ID. This should go to the router's DLO.
884  	futureActor1 := router.Ask(context.Background(), newTestMsg("ping-actor1"))
885  	ctxAwaitA1, cancelAwaitA1 := context.WithTimeout(context.Background(), time.Second)
886  	resActor1 := futureActor1.Await(ctxAwaitA1)
887  	cancelAwaitA1()
888  	require.False(t, resActor1.IsErr(), "ask to actor1 failed: %v", resActor1.Err())
889  	resActor1.WhenOk(func(s string) {
890  		require.Equal(t, "actor1:echo: ping-actor1", s)
891  	})
892  
893  	actor2Beh := newCountingEchoBehavior(t, "actor2")
894  	actor2Ref := h.newRouterTargetActor(
895  		"actor2-dynamic", serviceKey, actor2Beh,
896  	)
897  
898  	// Now that we've added two actors above, we should round robin between
899  	// them when sending.
900  	ctxAwaitDA1, cancelAwaitDA1 := context.WithTimeout(
901  		context.Background(), time.Second,
902  	)
903  	router.Ask(context.Background(), newTestMsg("dynamic-ask1")).Await(
904  		ctxAwaitDA1,
905  	)
906  	cancelAwaitDA1()
907  
908  	ctxAwaitDA2, cancelAwaitDA2 := context.WithTimeout(context.Background(), time.Second)
909  	router.Ask(context.Background(), newTestMsg("dynamic-ask2")).Await(ctxAwaitDA2)
910  	cancelAwaitDA2()
911  
912  	time.Sleep(50 * time.Millisecond)
913  
914  	// actor1 should have processed 2 messages (ping-actor1, dynamic-ask1),
915  	require.EqualValues(t, 2, atomic.LoadInt64(&actor1Beh.processedMsgs))
916  	require.EqualValues(t, 1, atomic.LoadInt64(&actor2Beh.processedMsgs))
917  
918  	// Next, we'll unregister the first actor ref.
919  	unregistered := UnregisterFromReceptionist(
920  		h.receptionist, serviceKey, actor1Ref,
921  	)
922  	require.True(t, unregistered)
923  
924  	// All the messages should now go to the second actor.
925  	for i := 0; i < 2; i++ {
926  		msgData := fmt.Sprintf("to-actor2-%d", i)
927  		future := router.Ask(context.Background(), newTestMsg(msgData))
928  		ctxAwaitLoop, cancelAwaitLoop := context.WithTimeout(
929  			context.Background(), time.Second,
930  		)
931  
932  		res := future.Await(ctxAwaitLoop)
933  		cancelAwaitLoop()
934  
935  		require.False(
936  			t, res.IsErr(), "ask to actor2 failed: %v", res.Err(),
937  		)
938  		res.WhenOk(func(s string) {
939  			require.Equal(t, "actor2:echo: "+msgData, s)
940  		})
941  	}
942  
943  	// Actor 1 shouldn't have got any of the messages, they should go to
944  	// actor 2.
945  	require.EqualValues(t, 2, atomic.LoadInt64(&actor1Beh.processedMsgs))
946  	require.EqualValues(t, 1+2, atomic.LoadInt64(&actor2Beh.processedMsgs))
947  
948  	// Next, we'll unregister the second actor ref.
949  	unregistered2 := UnregisterFromReceptionist(
950  		h.receptionist, serviceKey, actor2Ref,
951  	)
952  	require.True(t, unregistered2)
953  
954  	// If we try to send another message, it should go to the DL.
955  	tellMsg := newTestMsg("dynamic-tell-no-actors")
956  	router.Tell(context.Background(), tellMsg)
957  	h.assertDLOMessage(tellMsg)
958  }