system_test.go
1 package actor 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "sync/atomic" 8 "testing" 9 "time" 10 11 "github.com/lightningnetwork/lnd/fn/v2" 12 "github.com/stretchr/testify/require" 13 ) 14 15 // TestActorSystemNewActorSystem verifies the basic initialization of an 16 // ActorSystem, including its default DLO. 17 func TestActorSystemNewActorSystem(t *testing.T) { 18 t.Parallel() 19 20 as := NewActorSystem() 21 require.NotNil(t, as, "newActorSystem should not return nil") 22 require.NotNil(t, as.Receptionist(), "receptionist should not be nil") 23 require.NotNil(t, as.DeadLetters(), "deadLetters should not be nil") 24 require.Equal(t, "dead-letters", as.DeadLetters().ID(), "dLO ID mismatch") 25 26 // Test the DLO's behavior (it should return an error for Ask). 27 testDLOMsg := newTestMsg("to-dlo") 28 future := as.DeadLetters().Ask(context.Background(), testDLOMsg) 29 result := future.Await(context.Background()) 30 31 // We should get back an error for asks. 32 require.True( 33 t, result.IsErr(), "system DLO should return an error on Ask", 34 ) 35 expectedErrStr := "message undeliverable: " + testDLOMsg.MessageType() 36 require.EqualError( 37 t, result.Err(), expectedErrStr, "dLO error message mismatch", 38 ) 39 40 // Shutdown the system to clean up resources. 41 err := as.Shutdown() 42 require.NoError(t, err, "actorSystem shutdown failed") 43 } 44 45 // TestActorSystemRegisterWithSystem verifies actor registration, lifecycle 46 // management within the system. 47 func TestActorSystemRegisterWithSystem(t *testing.T) { 48 t.Parallel() 49 50 as := NewActorSystem() 51 defer func() { 52 err := as.Shutdown() 53 require.NoError(t, err) 54 }() 55 56 actorID := "test-actor-sys-reg" 57 serviceKey := NewServiceKey[*testMsg, string]("test-service") 58 59 // Using echoBehavior from actor_test.go (implicitly available) 60 beh := newEchoBehavior(t, 0) 61 62 // We'll start off by registering the actor. 63 actorRef, err := RegisterWithSystem(as, actorID, serviceKey, beh) 64 require.NoError(t, err) 65 require.NotNil(t, actorRef, "registerWithSystem should return a valid ActorRef") 66 require.Equal(t, actorID, actorRef.ID(), "registered actor ID mismatch") 67 68 // The actor should be found in the receptionist. 69 foundActors := FindInReceptionist(as.Receptionist(), serviceKey) 70 require.Len(t, foundActors, 1, "actor not found in receptionist") 71 require.Equal(t, actorRef, foundActors[0], "incorrect actor in receptionist") 72 73 // Next, we'll send out a simple tell, using our reply channel to make 74 // sure it's actually processed. 75 msgData := "hello-system-actor" 76 replyChan := make(chan string, 1) 77 actorRef.Tell(context.Background(), newTestMsgWithReply(msgData, replyChan)) 78 79 received, err := fn.RecvOrTimeout(replyChan, 100*time.Millisecond) 80 require.NoError(t, err, "timed out waiting for actor to process message") 81 require.Equal(t, msgData, received, "actor did not process message") 82 83 // Stop the actor through the system. 84 stopped := as.StopAndRemoveActor(actorID) 85 require.True(t, stopped, "StopAndRemoveActor failed") 86 87 // Wait for actor to fully stop. 88 time.Sleep(50 * time.Millisecond) 89 90 // Send a message to the now-stopped actor's ref. This should go to the 91 // system's DLO. 92 afterStopMsg := newTestMsg("after-stop-to-dlo") 93 require.NotPanics(t, func() { 94 actorRef.Tell(context.Background(), afterStopMsg) 95 }, "tell to stopped actor should not panic") 96 } 97 98 // TestActorSystemShutdown verifies that all actors are stopped and the system 99 // context is cancelled upon shutdown. 100 func TestActorSystemShutdown(t *testing.T) { 101 t.Parallel() 102 103 as := NewActorSystem() 104 105 // We'll start by making 3 new actors, each with a unique ID. 106 numActors := 3 107 actorRefs := make([]ActorRef[*testMsg, string], numActors) 108 for i := 0; i < numActors; i++ { 109 actorID := fmt.Sprintf("shutdown-test-actor-%d", i) 110 key := NewServiceKey[*testMsg, string]( 111 fmt.Sprintf("service-%d", i), 112 ) 113 beh := newEchoBehavior(t, 0) 114 ref, regErr := RegisterWithSystem(as, actorID, key, beh) 115 require.NoError(t, regErr) 116 actorRefs[i] = ref 117 } 118 119 // We'll now send a message to each actor to ensure that they're 120 // running. 121 for i, ref := range actorRefs { 122 future := ref.Ask( 123 context.Background(), 124 newTestMsg(fmt.Sprintf("ping-%d", i)), 125 ) 126 ctxAwait, cancelAwait := context.WithTimeout( 127 context.Background(), time.Second, 128 ) 129 res := future.Await(ctxAwait) 130 cancelAwait() 131 require.False( 132 t, res.IsErr(), 133 "actor %d failed to respond before shutdown: %v", 134 i, res.Err(), 135 ) 136 } 137 138 // Next, trigger a shutdown, and assert that the done channel gets 139 // closed. 140 err := as.Shutdown() 141 require.NoError(t, err, "actorSystem shutdown failed") 142 143 // Check if the system context is done using RecvOrTimeout with a zero 144 // timeout for a non-blocking check. 145 _, err = fn.RecvOrTimeout(as.ctx.Done(), time.Millisecond*100) 146 require.NoError(t, err, "actorSystem context not cancelled after shutdown") 147 148 // We'll now try to send a message to each of the actors, this should 149 // result in an error. 150 for i, ref := range actorRefs { 151 future := ref.Ask( 152 context.Background(), 153 newTestMsg(fmt.Sprintf("ping-after-shutdown-%d", i)), 154 ) 155 res := future.Await(context.Background()) 156 require.True( 157 t, res.IsErr(), 158 "actor %d Ask should fail after shutdown", i, 159 ) 160 require.ErrorIs(t, res.Err(), ErrActorTerminated) 161 } 162 163 as.mu.RLock() 164 require.Nil(t, as.actors, "actors map should be nil after shutdown") 165 as.mu.RUnlock() 166 167 // Once shutdown, we shouldn't be able to send to the DLO either. 168 dloRef := as.DeadLetters() 169 futureDLO := dloRef.Ask( 170 context.Background(), newTestMsg("ping-dlo-after-shutdown"), 171 ) 172 resDLO := futureDLO.Await(context.Background()) 173 require.True( 174 t, resDLO.IsErr(), "DLO Ask should fail after system shutdown", 175 ) 176 require.ErrorIs( 177 t, resDLO.Err(), ErrActorTerminated, 178 ) 179 } 180 181 // TestActorSystemStopAndRemoveActor verifies specific actor stopping and 182 // removal. 183 func TestActorSystemStopAndRemoveActor(t *testing.T) { 184 t.Parallel() 185 186 as := NewActorSystem() 187 defer func() { 188 err := as.Shutdown() 189 require.NoError(t, err) 190 }() 191 192 // Make some actor IDs, then unique service keys, then use that to 193 // register two actors. 194 actor1ID := "actor-to-stop" 195 actor2ID := "actor-to-keep" 196 key1 := NewServiceKey[*testMsg, string]("service1") 197 key2 := NewServiceKey[*testMsg, string]("service2") 198 beh := newEchoBehavior(t, 0) 199 200 ref1, err := RegisterWithSystem(as, actor1ID, key1, beh) 201 require.NoError(t, err) 202 ref2, err := RegisterWithSystem(as, actor2ID, key2, beh) 203 require.NoError(t, err) 204 205 // If we remove one actor, then try to send to it, we should get an 206 // error. 207 stopped := as.StopAndRemoveActor(actor1ID) 208 require.True(t, stopped, "failed to stop and remove actor1") 209 210 future1 := ref1.Ask(context.Background(), newTestMsg("ping-actor1")) 211 res1 := future1.Await(context.Background()) 212 require.True(t, res1.IsErr(), "actor1 should be stopped") 213 require.ErrorIs(t, res1.Err(), ErrActorTerminated) 214 215 as.mu.RLock() 216 _, exists := as.actors[actor1ID] 217 as.mu.RUnlock() 218 219 // The actor should no longer be found. 220 require.False(t, exists, "actor1 still in system's actor map") 221 222 // Make sure that we can still send messages to the existing actor. 223 future2 := ref2.Ask( 224 context.Background(), newTestMsg("ping-actor2"), 225 ) 226 227 ctxAwait2, cancelAwait2 := context.WithTimeout( 228 context.Background(), time.Second, 229 ) 230 res2 := future2.Await(ctxAwait2) 231 cancelAwait2() 232 233 require.False( 234 t, res2.IsErr(), "actor2 should still be running: %v", 235 res2.Err(), 236 ) 237 res2.WhenOk(func(s string) { 238 require.Equal(t, "echo: ping-actor2", s) 239 }) 240 241 stoppedNonExistent := as.StopAndRemoveActor("non-existent-actor") 242 require.False( 243 t, stoppedNonExistent, "stopping non-existent actor should "+ 244 "return false", 245 ) 246 } 247 248 // TestReceptionist covers basic registration, finding, and unregistration. 249 func TestReceptionist(t *testing.T) { 250 t.Parallel() 251 252 as := NewActorSystem() 253 defer func() { 254 err := as.Shutdown() 255 require.NoError(t, err) 256 }() 257 receptionist := as.Receptionist() 258 259 key1 := NewServiceKey[*testMsg, string]("key1") 260 key2 := NewServiceKey[*testMsg, string]("key2") 261 key1Again := NewServiceKey[*testMsg, string]("key1") 262 263 // Register 3 actor instance using the service keys we created above. 264 beh := newEchoBehavior(t, 0) 265 actor1Ref, err := RegisterWithSystem(as, "actor1-rec", key1, beh) 266 require.NoError(t, err) 267 actor2Ref, err := RegisterWithSystem(as, "actor2-rec", key1, beh) 268 require.NoError(t, err) 269 actor3Ref, err := RegisterWithSystem(as, "actor3-rec", key2, beh) 270 require.NoError(t, err) 271 272 // We should be able to find the actors we registered. 273 foundForKey1 := FindInReceptionist(receptionist, key1) 274 require.Len(t, foundForKey1, 2, "should find 2 actors for key1") 275 require.Contains(t, foundForKey1, actor1Ref) 276 require.Contains(t, foundForKey1, actor2Ref) 277 278 foundForKey1Again := FindInReceptionist(receptionist, key1Again) 279 require.ElementsMatch(t, foundForKey1, foundForKey1Again) 280 281 // Same goes for the second key we added. 282 foundForKey2 := FindInReceptionist(receptionist, key2) 283 require.Len(t, foundForKey2, 1, "should find 1 actor for key2") 284 require.Equal(t, actor3Ref, foundForKey2[0]) 285 286 // We shouldn't be able to find a key we didn't add. 287 nonExistentKey := NewServiceKey[*testMsg, string]("non-existent") 288 foundForNonExistent := FindInReceptionist(receptionist, nonExistentKey) 289 require.Empty(t, foundForNonExistent) 290 291 // We should be able to unregister the actors we added. 292 unregistered := UnregisterFromReceptionist( 293 receptionist, key1, actor1Ref, 294 ) 295 require.True(t, unregistered, "failed to unregister actor1Ref") 296 297 foundForKey1AfterUnreg := FindInReceptionist(receptionist, key1) 298 require.Len(t, foundForKey1AfterUnreg, 1) 299 require.Equal(t, actor2Ref, foundForKey1AfterUnreg[0]) 300 301 // If we try to unregister the same actor again, it should fail. 302 unregisteredAgain := UnregisterFromReceptionist(receptionist, key1, actor1Ref) 303 require.False(t, unregisteredAgain) 304 305 unregisteredLast := UnregisterFromReceptionist(receptionist, key1, actor2Ref) 306 require.True(t, unregisteredLast) 307 foundForKey1AfterAllUnreg := FindInReceptionist(receptionist, key1) 308 require.Empty(t, foundForKey1AfterAllUnreg) 309 310 receptionist.mu.RLock() 311 _, exists := receptionist.registrations[key1.name] 312 receptionist.mu.RUnlock() 313 require.False(t, exists, "key1 should be removed from registrations map") 314 315 // Finally, if we use the wrong key, or one that doesn't exist, that 316 // should also fail. 317 unregisteredWrongKey := UnregisterFromReceptionist(receptionist, key1, actor3Ref) 318 require.False(t, unregisteredWrongKey) 319 unregisteredNonExistentKey := UnregisterFromReceptionist(receptionist, nonExistentKey, actor1Ref) 320 require.False(t, unregisteredNonExistentKey) 321 } 322 323 // TestServiceKeyMethods tests Spawn and Unregister methods on ServiceKey. 324 func TestServiceKeyMethods(t *testing.T) { 325 t.Parallel() 326 327 as := NewActorSystem() 328 defer func() { 329 err := as.Shutdown() 330 require.NoError(t, err) 331 }() 332 333 key := NewServiceKey[*testMsg, string]("sk-service") 334 beh := newEchoBehavior(t, 0) 335 336 // Attempt to spawn a new actor using the service key and desired 337 // behavior. 338 actorRef, err := key.Spawn(as, "actor-sk-spawn", beh) 339 require.NoError(t, err) 340 require.NotNil(t, actorRef) 341 require.Equal(t, "actor-sk-spawn", actorRef.ID()) 342 343 // We should be able to find the actor in the receptionist. 344 found := FindInReceptionist(as.Receptionist(), key) 345 require.Len(t, found, 1) 346 require.Equal(t, actorRef, found[0]) 347 348 as.mu.RLock() 349 _, sysExists := as.actors[actorRef.ID()] 350 as.mu.RUnlock() 351 require.True(t, sysExists) 352 353 // Next, try to unregister the actor using the service key. 354 success := key.Unregister(as, actorRef) 355 require.True(t, success, "serviceKey.Unregister failed") 356 357 // The actor should no longer be found in the receptionist. 358 foundAfter := FindInReceptionist(as.Receptionist(), key) 359 require.Empty(t, foundAfter) 360 361 as.mu.RLock() 362 _, sysExistsAfter := as.actors[actorRef.ID()] 363 as.mu.RUnlock() 364 require.False(t, sysExistsAfter) 365 366 // If we try to send a message to the actor after unregistering it, then 367 // we should get an error. 368 future := actorRef.Ask(context.Background(), newTestMsg("ping")) 369 res := future.Await(context.Background()) 370 require.True(t, res.IsErr() && errors.Is(res.Err(), ErrActorTerminated)) 371 372 successAgain := key.Unregister(as, actorRef) 373 require.False(t, successAgain) 374 375 otherSys := NewActorSystem() // Create a different actor system 376 defer func() { 377 err := otherSys.Shutdown() 378 require.NoError(t, err) 379 }() 380 381 // Create a dummy actor in otherSys of the correct generic type for the 382 // key. This actor won't be found in 'as', so Unregister should fail. 383 dummyBehOther := newEchoBehavior(t, 0) 384 dummyKeyOther := NewServiceKey[*testMsg, string]("dummy-other") 385 dummyActorRefOtherSys, err := RegisterWithSystem( 386 otherSys, "dummy-other-actor", dummyKeyOther, dummyBehOther, 387 ) 388 require.NoError(t, err) 389 390 successNonMember := key.Unregister(as, dummyActorRefOtherSys) 391 require.False(t, successNonMember) 392 } 393 394 // TestServiceKeyUnregisterAll tests the UnregisterAll method on ServiceKey. 395 // It covers scenarios including basic unregistration of multiple actors, 396 // attempting to unregister with no actors present, unregistering actors for 397 // one key while leaving others intact, and the idempotency of the operation. 398 func TestServiceKeyUnregisterAll(t *testing.T) { 399 t.Parallel() 400 401 // Common setup for all sub-tests. 402 as := NewActorSystem() 403 defer func() { 404 err := as.Shutdown() 405 require.NoError(t, err, "ActorSystem shutdown failed.") 406 }() 407 408 // Common behavior for test actors used across sub-tests. 409 beh := newEchoBehavior(t, 0) 410 411 t.Run("unregister all multiple actors", func(st *testing.T) { 412 key1 := NewServiceKey[*testMsg, string]("sk-ua-key1") 413 actor1Key1, err := key1.Spawn(as, "actor1-k1-ua", beh) 414 require.NoError(st, err) 415 actor2Key1, err := key1.Spawn(as, "actor2-k1-ua", beh) 416 require.NoError(st, err) 417 418 // Verify they are registered in the receptionist. 419 foundActorsForKey1 := FindInReceptionist( 420 as.Receptionist(), key1, 421 ) 422 require.Len( 423 st, foundActorsForKey1, 2, 424 "actors for key1 not in receptionist initially.", 425 ) 426 427 // Verify they are in the system's actor map. 428 as.mu.RLock() 429 _, actor1Key1Exists := as.actors[actor1Key1.ID()] 430 _, actor2Key1Exists := as.actors[actor2Key1.ID()] 431 as.mu.RUnlock() 432 require.True( 433 st, actor1Key1Exists, 434 "actor1 for key1 not in system actors map initially.", 435 ) 436 require.True( 437 st, actor2Key1Exists, 438 "actor2 for key1 not in system actors map initially.", 439 ) 440 441 // Unregister all for key1. 442 stoppedCountKey1 := key1.UnregisterAll(as) 443 require.Equal( 444 st, 2, stoppedCountKey1, 445 "UnregisterAll for key1 returned incorrect count.", 446 ) 447 448 // Verify they are unregistered from the receptionist. 449 foundActorsForKey1After := FindInReceptionist( 450 as.Receptionist(), key1, 451 ) 452 require.Empty( 453 st, foundActorsForKey1After, 454 "actors for key1 still in receptionist after "+ 455 "UnregisterAll.", 456 ) 457 458 // Verify they are removed from system actors map. 459 as.mu.RLock() 460 _, actor1Key1ExistsAfter := as.actors[actor1Key1.ID()] 461 _, actor2Key1ExistsAfter := as.actors[actor2Key1.ID()] 462 as.mu.RUnlock() 463 require.False( 464 st, actor1Key1ExistsAfter, 465 "Actor1 for key1 still in system actors "+ 466 "map after UnregisterAll.", 467 ) 468 require.False( 469 st, actor2Key1ExistsAfter, 470 "Actor2 for key1 still in system actors "+ 471 "map after UnregisterAll.", 472 ) 473 474 // Verify actors are stopped. 475 resultActor1Key1 := actor1Key1.Ask( 476 context.Background(), newTestMsg("ping-k1-a1"), 477 ).Await(context.Background()) 478 require.True( 479 st, resultActor1Key1.IsErr(), 480 "Actor1 key1 Ask should fail after UnregisterAll.", 481 ) 482 require.ErrorIs( 483 st, resultActor1Key1.Err(), ErrActorTerminated, 484 "Actor1 key1 not terminated with correct error.", 485 ) 486 487 resultActor2Key1 := actor2Key1.Ask( 488 context.Background(), newTestMsg("ping-k1-a2"), 489 ).Await(context.Background()) 490 require.True( 491 st, resultActor2Key1.IsErr(), 492 "Actor2 key1 Ask should fail after UnregisterAll.", 493 ) 494 require.ErrorIs( 495 st, resultActor2Key1.Err(), ErrActorTerminated, 496 "Actor2 key1 not terminated with correct error.", 497 ) 498 }) 499 500 t.Run("unregister all with no actors for the key", func(st *testing.T) { 501 keyEmpty := NewServiceKey[*testMsg, string]("sk-ua-key-empty") 502 stoppedCountEmptyKey := keyEmpty.UnregisterAll(as) 503 require.Equal( 504 st, 0, stoppedCountEmptyKey, 505 "UnregisterAll for empty key returned non-zero count.", 506 ) 507 508 foundActorsForKeyEmpty := FindInReceptionist( 509 as.Receptionist(), keyEmpty, 510 ) 511 require.Empty( 512 st, foundActorsForKeyEmpty, 513 "Receptionist not empty for keyEmpty "+ 514 "after UnregisterAll.", 515 ) 516 }) 517 518 t.Run("unregister all with mixed keys", func(st *testing.T) { 519 keyA := NewServiceKey[*testMsg, string]("sk-ua-keyA") 520 keyB := NewServiceKey[*testMsg, string]("sk-ua-keyB") 521 522 // Spawn 3 actors, two of them will share the same service key. 523 actorA1, err := keyA.Spawn(as, "actorA1-ua-mixed", beh) 524 require.NoError(st, err) 525 actorA2, err := keyA.Spawn(as, "actorA2-ua-mixed", beh) 526 require.NoError(st, err) 527 actorB1, err := keyB.Spawn(as, "actorB1-ua-mixed", beh) 528 require.NoError(st, err) 529 530 // Make sure we're able to find them in the receptionist. 531 require.Len( 532 st, FindInReceptionist(as.Receptionist(), keyA), 2, 533 "KeyA initial registration count mismatch.", 534 ) 535 require.Len( 536 st, FindInReceptionist(as.Receptionist(), keyB), 1, 537 "KeyB initial registration count mismatch.", 538 ) 539 540 // We'll start by unregistering all actors for keyA. 541 stoppedCountKeyA := keyA.UnregisterAll(as) 542 require.Equal( 543 st, 2, stoppedCountKeyA, 544 "UnregisterAll for keyA returned incorrect count.", 545 ) 546 547 // Verify keyA actors are gone from receptionist, keyB actor 548 // remains. 549 require.Empty( 550 st, FindInReceptionist(as.Receptionist(), keyA), 551 "actors for keyA still in receptionist after "+ 552 "UnregisterAll.", 553 ) 554 foundActorsForKeyBAfterA := FindInReceptionist( 555 as.Receptionist(), keyB, 556 ) 557 require.Len( 558 st, foundActorsForKeyBAfterA, 1, 559 "Actor for keyB affected by UnregisterAll on keyA.", 560 ) 561 require.Equal( 562 st, actorB1, foundActorsForKeyBAfterA[0], 563 "Wrong actor found for keyB.", 564 ) 565 566 // Verify keyA actors are removed from system map, keyB actor 567 // remains. 568 as.mu.RLock() 569 _, actorA1ExistsAfterMixed := as.actors[actorA1.ID()] 570 _, actorA2ExistsAfterMixed := as.actors[actorA2.ID()] 571 _, actorB1ExistsAfterMixed := as.actors[actorB1.ID()] 572 as.mu.RUnlock() 573 require.False( 574 st, actorA1ExistsAfterMixed, 575 "ActorA1 still in system actors map after "+ 576 "mixed UnregisterAll.", 577 ) 578 require.False( 579 st, actorA2ExistsAfterMixed, 580 "ActorA2 still in system actors map after "+ 581 "mixed UnregisterAll.", 582 ) 583 require.True( 584 st, actorB1ExistsAfterMixed, 585 "ActorB1 removed from system actors map incorrectly.", 586 ) 587 588 // Verify keyA actors are stopped, keyB actor is running. 589 resultActorA1Mixed := actorA1.Ask( 590 context.Background(), newTestMsg("ping-kA-a1"), 591 ).Await(context.Background()) 592 require.True(st, resultActorA1Mixed.IsErr()) 593 require.ErrorIs( 594 st, resultActorA1Mixed.Err(), ErrActorTerminated, 595 ) 596 597 resultActorB1Mixed := actorB1.Ask( 598 context.Background(), newTestMsg("ping-kB-a1"), 599 ).Await(context.Background()) 600 require.False( 601 st, resultActorB1Mixed.IsErr(), 602 "ActorB1 terminated incorrectly (mixed test): %v", 603 resultActorB1Mixed.Err(), 604 ) 605 resultActorB1Mixed.WhenOk(func(s string) { 606 require.Equal(st, "echo: ping-kB-a1", s) 607 }) 608 }) 609 610 t.Run("idempotency of UnregisterAll", func(st *testing.T) { 611 keyIdempotent := NewServiceKey[*testMsg, string]( 612 "sk-ua-key-idem", 613 ) 614 actorIdem, err := keyIdempotent.Spawn(as, "actor-idem-ua", beh) 615 require.NoError(st, err) 616 617 // First call should unregister and stop. 618 stoppedCountFirstCall := keyIdempotent.UnregisterAll(as) 619 require.Equal( 620 st, 1, stoppedCountFirstCall, 621 "UnregisterAll (first call) incorrect count.", 622 ) 623 624 // Second call should do nothing and return 0. 625 stoppedCountSecondCall := keyIdempotent.UnregisterAll(as) 626 require.Equal( 627 st, 0, stoppedCountSecondCall, 628 "UnregisterAll (second call) incorrect count, not "+ 629 "idempotent.", 630 ) 631 632 // Verify actor is gone from receptionist and system map, and is 633 // stopped. 634 require.Empty( 635 st, FindInReceptionist(as.Receptionist(), keyIdempotent), 636 "Actors for keyIdempotent still in receptionist "+ 637 "after calls.", 638 ) 639 640 as.mu.RLock() 641 _, actorIdemExistsAfter := as.actors[actorIdem.ID()] 642 as.mu.RUnlock() 643 require.False( 644 st, actorIdemExistsAfter, 645 "ActorIdem still in system actors map after calls.", 646 ) 647 648 resultActorIdem := actorIdem.Ask( 649 context.Background(), newTestMsg("ping-kidem-a1"), 650 ).Await(context.Background()) 651 require.True(st, resultActorIdem.IsErr()) 652 require.ErrorIs(st, resultActorIdem.Err(), ErrActorTerminated) 653 }) 654 } 655 656 // routerTestHarness helps set up routers and their associated actors for testing. 657 // It uses an actorTestHarness internally for DLO observation for the router. 658 type routerTestHarness struct { 659 *actorTestHarness 660 as *ActorSystem 661 receptionist *Receptionist 662 } 663 664 // newRouterTestHarness sets up a new harness for router testing. 665 // It creates an ActorSystem for actors that the router will route to, 666 // and uses the embedded actorTestHarness for the router's own DLO. 667 func newRouterTestHarness(t *testing.T) *routerTestHarness { 668 t.Helper() 669 system := NewActorSystem() 670 t.Cleanup(func() { 671 err := system.Shutdown() 672 require.NoError(t, err, "router test actor system shutdown failed") 673 }) 674 675 // The DLO for the router itself will come from actorTestHarness. 676 // Actors managed by `system` (router targets) will use `system.DeadLetters()`. 677 return &routerTestHarness{ 678 actorTestHarness: newActorTestHarness(t), 679 as: system, 680 receptionist: system.Receptionist(), 681 } 682 } 683 684 // newRouterTargetActor creates an actor, registers it with the harness's 685 // ActorSystem (h.as) and Receptionist under the given service key. This actor 686 // is intended to be a target for the router. 687 func (h *routerTestHarness) newRouterTargetActor(id string, 688 key ServiceKey[*testMsg, string], 689 beh ActorBehavior[*testMsg, string]) ActorRef[*testMsg, string] { 690 691 h.t.Helper() 692 693 ref, err := RegisterWithSystem(h.as, id, key, beh) 694 require.NoError(h.t, err) 695 696 return ref 697 } 698 699 // TestRouterNewRouter verifies that a new router can be created as expected. 700 func TestRouterNewRouter(t *testing.T) { 701 t.Parallel() 702 h := newRouterTestHarness(t) 703 704 key := NewServiceKey[*testMsg, string]("router-service") 705 strategy := NewRoundRobinStrategy[*testMsg, string]() 706 707 router := NewRouter(h.receptionist, key, strategy, h.dlo.Ref()) 708 require.NotNil(t, router, "newRouter should not return nil") 709 require.Equal(t, "router(router-service)", router.ID(), "router ID mismatch") 710 } 711 712 // countingEchoBehavior is an echo behavior that also counts how many messages 713 // it has processed. 714 type countingEchoBehavior struct { 715 *echoBehavior 716 id string 717 processedMsgs int64 718 } 719 720 func newCountingEchoBehavior(t *testing.T, id string) *countingEchoBehavior { 721 return &countingEchoBehavior{ 722 echoBehavior: newEchoBehavior(t, 0), 723 id: id, 724 } 725 } 726 727 func (b *countingEchoBehavior) Receive(ctx context.Context, 728 msg *testMsg) fn.Result[string] { 729 730 atomic.AddInt64(&b.processedMsgs, 1) 731 732 // Include actor ID in reply for easier verification. 733 res := b.echoBehavior.Receive(ctx, msg) 734 val, err := res.Unpack() 735 if err == nil { 736 return fn.Ok(fmt.Sprintf("%s:%s", b.id, val)) 737 } 738 return res 739 } 740 741 // TestRouterTellAndAskRoundRobin verifies that the router distributes messages 742 // in a round robin properly. 743 func TestRouterTellAndAskRoundRobin(t *testing.T) { 744 t.Parallel() 745 h := newRouterTestHarness(t) 746 747 // Make a new router for the given service key and round robin strategy. 748 serviceKey := NewServiceKey[*testMsg, string]("rr-service") 749 strategy := NewRoundRobinStrategy[*testMsg, string]() 750 router := NewRouter(h.receptionist, serviceKey, strategy, h.dlo.Ref()) 751 752 // We'll now register two actors with the router, each with a different 753 // service key. 754 actor1Beh := newCountingEchoBehavior(t, "actor1") 755 actor2Beh := newCountingEchoBehavior(t, "actor2") 756 _ = h.newRouterTargetActor("actor1-rr", serviceKey, actor1Beh) 757 _ = h.newRouterTargetActor("actor2-rr", serviceKey, actor2Beh) 758 759 // Nxet, we'll send a mix of Tell and Ask messages to the router. 760 numMessages := 6 761 for i := 0; i < numMessages; i++ { 762 msgData := fmt.Sprintf("message-%d", i) 763 if i%2 == 0 { 764 router.Tell(context.Background(), newTestMsg(msgData)) 765 } else { 766 future := router.Ask( 767 context.Background(), newTestMsg(msgData), 768 ) 769 ctxAwait, cancelAwait := context.WithTimeout( 770 context.Background(), time.Second, 771 ) 772 773 result := future.Await(ctxAwait) 774 cancelAwait() 775 require.False( 776 t, result.IsErr(), "ask failed: %v", result.Err(), 777 ) 778 } 779 } 780 781 // Wait a bit for Tell messages to be processed. 782 time.Sleep(100 * time.Millisecond) 783 784 // Each actor should have processed numMessages / 2 messages. 785 require.EqualValues( 786 t, numMessages/2, atomic.LoadInt64(&actor1Beh.processedMsgs), 787 "actor1 processed message count mismatch", 788 ) 789 require.EqualValues( 790 t, numMessages/2, atomic.LoadInt64(&actor2Beh.processedMsgs), 791 "actor2 processed message count mismatch", 792 ) 793 794 // Router's DLO should be empty. 795 h.assertNoDLOMessages() 796 } 797 798 // TestRouterNoActorsAvailable verifies that if no actors are available for the 799 // message, then an error is returned. 800 func TestRouterNoActorsAvailable(t *testing.T) { 801 t.Parallel() 802 h := newRouterTestHarness(t) 803 804 serviceKey := NewServiceKey[*testMsg, string]("no-actor-service") 805 strategy := NewRoundRobinStrategy[*testMsg, string]() 806 router := NewRouter(h.receptionist, serviceKey, strategy, h.dlo.Ref()) 807 808 // We'll send a message, then assert that it goes to the DLO. 809 tellMsg := newTestMsg("tell-no-actor") 810 router.Tell(context.Background(), tellMsg) 811 h.assertDLOMessage(tellMsg) 812 813 // If we use an ask instead, then we should get an error. 814 askMsg := newTestMsg("ask-no-actor") 815 future := router.Ask(context.Background(), askMsg) 816 result := future.Await(context.Background()) 817 818 require.True( 819 t, result.IsErr(), "ask should fail when no actors are available", 820 ) 821 require.ErrorIs(t, result.Err(), ErrNoActorsAvailable, "error mismatch") 822 } 823 824 // TestRouterTellAskContextCancellation verifies that if the context is 825 // canceled, then sending aborts. 826 func TestRouterTellAskContextCancellation(t *testing.T) { 827 t.Parallel() 828 h := newRouterTestHarness(t) 829 830 serviceKey := NewServiceKey[*testMsg, string]("ctx-cancel-service") 831 strategy := NewRoundRobinStrategy[*testMsg, string]() 832 router := NewRouter(h.receptionist, serviceKey, strategy, h.dlo.Ref()) 833 834 // Use a regular echo actor, but we'll control context for Tell/Ask. 835 targetActorBeh := newEchoBehavior(t, 50*time.Millisecond) 836 _ = h.newRouterTargetActor("target-ctx", serviceKey, targetActorBeh) 837 838 // Next, we'll send a Tell message with a context that will be cancelled 839 // before we even send. 840 ctxTell, cancelTell := context.WithCancel(context.Background()) 841 cancelTell() 842 router.Tell(ctxTell, newTestMsg("tell-ctx-cancelled")) 843 844 // The Message should be dropped by actorRefImpl.Tell if ctx is 845 // cancelled. Router's DLO should not receive it from this path. 846 h.assertNoDLOMessages() 847 848 // Next, we'll do the same for Ask. This time, we should get an error. 849 ctxAsk, cancelAsk := context.WithCancel(context.Background()) 850 cancelAsk() 851 futureAsk := router.Ask(ctxAsk, newTestMsg("ask-ctx-cancelled")) 852 resultAsk := futureAsk.Await(context.Background()) 853 854 require.True( 855 t, resultAsk.IsErr(), "ask with cancelled context should fail", 856 ) 857 require.ErrorIs( 858 t, resultAsk.Err(), context.Canceled, 859 "error should be context.Canceled", 860 ) 861 } 862 863 // TestRouterDynamicActorRegistration tests that we're able to dynamically add 864 // and remove actors from the router. 865 func TestRouterDynamicActorRegistration(t *testing.T) { 866 t.Parallel() 867 h := newRouterTestHarness(t) 868 869 serviceKey := NewServiceKey[*testMsg, string]("dynamic-service") 870 strategy := NewRoundRobinStrategy[*testMsg, string]() 871 router := NewRouter(h.receptionist, serviceKey, strategy, h.dlo.Ref()) 872 873 // If we try to send a mesasge to the router before any actors are 874 // added, we should get an error. 875 futureNoActor := router.Ask(context.Background(), newTestMsg("ping-no-actors")) 876 resNoActor := futureNoActor.Await(context.Background()) 877 require.ErrorIs(t, resNoActor.Err(), ErrNoActorsAvailable) 878 879 actor1Beh := newCountingEchoBehavior(t, "actor1") 880 actor1Ref := h.newRouterTargetActor("actor1-dynamic", serviceKey, actor1Beh) 881 882 // At this point, we have a new actor added, but we'll try to send a 883 // message to a different actor ID. This should go to the router's DLO. 884 futureActor1 := router.Ask(context.Background(), newTestMsg("ping-actor1")) 885 ctxAwaitA1, cancelAwaitA1 := context.WithTimeout(context.Background(), time.Second) 886 resActor1 := futureActor1.Await(ctxAwaitA1) 887 cancelAwaitA1() 888 require.False(t, resActor1.IsErr(), "ask to actor1 failed: %v", resActor1.Err()) 889 resActor1.WhenOk(func(s string) { 890 require.Equal(t, "actor1:echo: ping-actor1", s) 891 }) 892 893 actor2Beh := newCountingEchoBehavior(t, "actor2") 894 actor2Ref := h.newRouterTargetActor( 895 "actor2-dynamic", serviceKey, actor2Beh, 896 ) 897 898 // Now that we've added two actors above, we should round robin between 899 // them when sending. 900 ctxAwaitDA1, cancelAwaitDA1 := context.WithTimeout( 901 context.Background(), time.Second, 902 ) 903 router.Ask(context.Background(), newTestMsg("dynamic-ask1")).Await( 904 ctxAwaitDA1, 905 ) 906 cancelAwaitDA1() 907 908 ctxAwaitDA2, cancelAwaitDA2 := context.WithTimeout(context.Background(), time.Second) 909 router.Ask(context.Background(), newTestMsg("dynamic-ask2")).Await(ctxAwaitDA2) 910 cancelAwaitDA2() 911 912 time.Sleep(50 * time.Millisecond) 913 914 // actor1 should have processed 2 messages (ping-actor1, dynamic-ask1), 915 require.EqualValues(t, 2, atomic.LoadInt64(&actor1Beh.processedMsgs)) 916 require.EqualValues(t, 1, atomic.LoadInt64(&actor2Beh.processedMsgs)) 917 918 // Next, we'll unregister the first actor ref. 919 unregistered := UnregisterFromReceptionist( 920 h.receptionist, serviceKey, actor1Ref, 921 ) 922 require.True(t, unregistered) 923 924 // All the messages should now go to the second actor. 925 for i := 0; i < 2; i++ { 926 msgData := fmt.Sprintf("to-actor2-%d", i) 927 future := router.Ask(context.Background(), newTestMsg(msgData)) 928 ctxAwaitLoop, cancelAwaitLoop := context.WithTimeout( 929 context.Background(), time.Second, 930 ) 931 932 res := future.Await(ctxAwaitLoop) 933 cancelAwaitLoop() 934 935 require.False( 936 t, res.IsErr(), "ask to actor2 failed: %v", res.Err(), 937 ) 938 res.WhenOk(func(s string) { 939 require.Equal(t, "actor2:echo: "+msgData, s) 940 }) 941 } 942 943 // Actor 1 shouldn't have got any of the messages, they should go to 944 // actor 2. 945 require.EqualValues(t, 2, atomic.LoadInt64(&actor1Beh.processedMsgs)) 946 require.EqualValues(t, 1+2, atomic.LoadInt64(&actor2Beh.processedMsgs)) 947 948 // Next, we'll unregister the second actor ref. 949 unregistered2 := UnregisterFromReceptionist( 950 h.receptionist, serviceKey, actor2Ref, 951 ) 952 require.True(t, unregistered2) 953 954 // If we try to send another message, it should go to the DL. 955 tellMsg := newTestMsg("dynamic-tell-no-actors") 956 router.Tell(context.Background(), tellMsg) 957 h.assertDLOMessage(tellMsg) 958 }