actor_test.go
1 package actor 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "reflect" 8 "sync" 9 "sync/atomic" 10 "testing" 11 "time" 12 13 "github.com/lightningnetwork/lnd/fn/v2" 14 "github.com/stretchr/testify/require" 15 ) 16 17 // testMsg is a simple message type for testing. It embeds BaseMessage to 18 // satisfy the actor.Message interface. 19 type testMsg struct { 20 BaseMessage 21 data string 22 23 replyChan chan string 24 } 25 26 // MessageType returns the type name of the message. 27 func (m *testMsg) MessageType() string { 28 return "testMsg" 29 } 30 31 // newTestMsg creates a new test message. 32 func newTestMsg(data string) *testMsg { 33 return &testMsg{data: data} 34 } 35 36 // newTestMsgWithReply creates a new test message that includes a reply channel. 37 // This can be used by test behaviors to send data back to the test 38 // synchronously, especially for Tell operations. 39 func newTestMsgWithReply(data string, replyChan chan string) *testMsg { 40 return &testMsg{data: data, replyChan: replyChan} 41 } 42 43 // echoBehavior is a simple actor behavior that processes *testMsg messages. It 44 // stores the last message's data and, for Ask, echoes it back. For Tell, if 45 // replyChan is set in testMsg, it sends data back on it. 46 type echoBehavior struct { 47 lastMsgData atomic.Value 48 processingDelay time.Duration 49 t *testing.T 50 } 51 52 // newEchoBehavior creates a new echoBehavior. 53 func newEchoBehavior(t *testing.T, delay time.Duration) *echoBehavior { 54 return &echoBehavior{t: t, processingDelay: delay} 55 } 56 57 // Receive handles incoming messages. It simulates work if processingDelay is 58 // set, stores the message data, and responds for Ask operations or via 59 // replyChan for Tell. 60 func (b *echoBehavior) Receive(_ context.Context, 61 msg *testMsg) fn.Result[string] { 62 63 if b.processingDelay > 0 { 64 time.Sleep(b.processingDelay) 65 } 66 67 b.lastMsgData.Store(msg.data) 68 69 if msg.replyChan != nil { 70 // Attempt to send the data on the reply channel, but quit if 71 // it takes longer than 1 second (e.g., channel unbuffered 72 // and no receiver). 73 select { 74 case msg.replyChan <- msg.data: 75 case <-time.After(time.Second): 76 b.t.Logf("warning: replyChan send timed out") 77 } 78 } 79 80 return fn.Ok(fmt.Sprintf("echo: %s", msg.data)) 81 } 82 83 // GetLastMsgData retrieves the data from the last message processed. 84 func (b *echoBehavior) GetLastMsgData() (string, bool) { 85 val := b.lastMsgData.Load() 86 if val == nil { 87 return "", false 88 } 89 data, ok := val.(string) 90 return data, ok 91 } 92 93 // errorBehavior is an actor behavior that always returns a predefined error 94 // upon receiving a message. 95 type errorBehavior struct { 96 err error 97 } 98 99 // newErrorBehavior creates a new errorBehavior. 100 func newErrorBehavior(err error) *errorBehavior { 101 return &errorBehavior{err: err} 102 } 103 104 // Receive always returns the configured error. 105 func (b *errorBehavior) Receive(_ context.Context, 106 _ *testMsg) fn.Result[string] { 107 108 return fn.Err[string](b.err) 109 } 110 111 // blockingBehavior is an actor behavior that blocks until its actorCtx is done. 112 type blockingBehavior struct{} 113 114 // Receive blocks until the actor's context is cancelled, then returns the 115 // context's error. 116 func (b *blockingBehavior) Receive(actorCtx context.Context, 117 _ *testMsg) fn.Result[string] { 118 119 <-actorCtx.Done() 120 return fn.Err[string](actorCtx.Err()) 121 } 122 123 // deadLetterTestMsg is a distinct message type used for testing DLO 124 // interactions. 125 type deadLetterTestMsg struct { 126 BaseMessage 127 id string 128 } 129 130 // MessageType returns the type name of the message. 131 func (m *deadLetterTestMsg) MessageType() string { 132 return "deadLetterTestMsg" 133 } 134 135 // deadLetterObserverBehavior is a behavior for a test Dead Letter Office actor. 136 // It records all messages sent to it, allowing tests to verify DLO 137 // interactions. 138 type deadLetterObserverBehavior struct { 139 mu sync.Mutex 140 receivedMsgs []Message 141 } 142 143 // newDeadLetterObserverBehavior creates a new deadLetterObserverBehavior. 144 func newDeadLetterObserverBehavior() *deadLetterObserverBehavior { 145 return &deadLetterObserverBehavior{ 146 receivedMsgs: make([]Message, 0), 147 } 148 } 149 150 // Receive records the incoming message and returns a successful result. 151 func (b *deadLetterObserverBehavior) Receive(_ context.Context, 152 msg Message) fn.Result[any] { 153 154 b.mu.Lock() 155 b.receivedMsgs = append(b.receivedMsgs, msg) 156 b.mu.Unlock() 157 158 return fn.Ok[any](nil) 159 } 160 161 // GetReceivedMsgs returns a copy of all messages received by this DLO. 162 func (b *deadLetterObserverBehavior) GetReceivedMsgs() []Message { 163 b.mu.Lock() 164 defer b.mu.Unlock() 165 166 msgs := make([]Message, len(b.receivedMsgs)) 167 copy(msgs, b.receivedMsgs) 168 169 return msgs 170 } 171 172 // actorTestHarness provides helper methods for setting up actors in tests. It 173 // manages a dedicated DLO for actors created through it. 174 type actorTestHarness struct { 175 t *testing.T 176 dlo *Actor[Message, any] 177 dloBeh *deadLetterObserverBehavior 178 } 179 180 // newActorTestHarness sets up a test harness with a dedicated DLO. The DLO is 181 // automatically stopped when the test cleans up. 182 func newActorTestHarness(t *testing.T) *actorTestHarness { 183 t.Helper() 184 185 dloBeh := newDeadLetterObserverBehavior() 186 dloCfg := ActorConfig[Message, any]{ 187 ID: "test-dlo-" + t.Name(), 188 Behavior: dloBeh, 189 DLO: nil, 190 MailboxSize: 10, 191 } 192 dloActor, err := NewActor[Message, any](dloCfg) 193 require.NoError(t, err) 194 dloActor.Start() 195 196 t.Cleanup(dloActor.Stop) 197 198 return &actorTestHarness{ 199 t: t, 200 dlo: dloActor, 201 dloBeh: dloBeh, 202 } 203 } 204 205 // newActor creates, starts, and registers a new actor for cleanup. The actor 206 // will use the harness's DLO. 207 func (h *actorTestHarness) newActor(id string, 208 beh ActorBehavior[*testMsg, string], 209 mailboxSize int) *Actor[*testMsg, string] { 210 211 h.t.Helper() 212 213 cfg := ActorConfig[*testMsg, string]{ 214 ID: id, 215 Behavior: beh, 216 DLO: h.dlo.Ref(), 217 MailboxSize: mailboxSize, 218 } 219 actor, err := NewActor(cfg) 220 require.NoError(h.t, err) 221 actor.Start() 222 223 h.t.Cleanup(actor.Stop) 224 225 return actor 226 } 227 228 // assertDLOMessage checks that the DLO eventually receives a specific message. 229 func (h *actorTestHarness) assertDLOMessage(expectedMsg Message) { 230 h.t.Helper() 231 require.Eventually(h.t, func() bool { 232 msgs := h.dloBeh.GetReceivedMsgs() 233 for _, m := range msgs { 234 if reflect.DeepEqual(m, expectedMsg) { 235 return true 236 } 237 } 238 return false 239 }, time.Second, 10*time.Millisecond, 240 "dLO did not receive expected message: %v", expectedMsg, 241 ) 242 } 243 244 // assertNoDLOMessages checks that the DLO has not received any messages. 245 func (h *actorTestHarness) assertNoDLOMessages() { 246 h.t.Helper() 247 248 // Allow a very brief moment for any async DLO sends to occur. 249 time.Sleep(20 * time.Millisecond) 250 251 msgs := h.dloBeh.GetReceivedMsgs() 252 253 require.Empty(h.t, msgs, "dLO received unexpected messages") 254 } 255 256 // TestActorNewActorIDAndRefs verifies that NewActor correctly initializes an 257 // actor's ID and provides functional ActorRef and TellOnlyRef instances. 258 func TestActorNewActorIDAndRefs(t *testing.T) { 259 t.Parallel() 260 261 h := newActorTestHarness(t) 262 actorID := "test-actor-1" 263 beh := newEchoBehavior(t, 0) 264 actor := h.newActor(actorID, beh, 1) 265 266 require.Equal(t, actorID, actor.Ref().ID(), "actorRef ID mismatch") 267 require.Equal( 268 t, actorID, actor.TellRef().ID(), "tellOnlyRef ID mismatch", 269 ) 270 require.NotNil(t, actor.Ref(), "actorRef should not be nil") 271 require.NotNil(t, actor.TellRef(), "tellOnlyRef should not be nil") 272 } 273 274 // TestActorStartStop verifies the basic lifecycle of an actor: starting, 275 // processing messages, and stopping. 276 func TestActorStartStop(t *testing.T) { 277 t.Parallel() 278 279 h := newActorTestHarness(t) 280 beh := newEchoBehavior(t, 0) 281 actor := h.newActor("test-actor-lifecycle", beh, 1) 282 283 // Actor should be running and process a message. 284 msgData := "hello" 285 replyChan := make(chan string, 1) 286 actor.Ref().Tell( 287 context.Background(), newTestMsgWithReply(msgData, replyChan), 288 ) 289 290 received, err := fn.RecvOrTimeout(replyChan, 100*time.Millisecond) 291 require.NoError(t, err, "timed out waiting for actor to process message") 292 require.Equal( 293 t, msgData, received, "actor did not process message before stop", 294 ) 295 296 actor.Stop() 297 time.Sleep(50 * time.Millisecond) 298 299 // Try sending another message; it should ideally not be processed or go 300 // to DLO. 301 msgDataAfterStop := "message-after-stop" 302 replyChanAfterStop := make(chan string, 1) 303 actor.Ref().Tell( 304 context.Background(), 305 newTestMsgWithReply(msgDataAfterStop, replyChanAfterStop), 306 ) 307 308 // We expect a timeout here, meaning the message was not processed by 309 // the echoBehavior's replyChan. 310 _, err = fn.RecvOrTimeout(replyChanAfterStop, 100*time.Millisecond) 311 // err == nil would mean a message was received, meaning the actor 312 // processed it after Stop(). 313 require.Error(t, err, "actor processed message after Stop()") 314 require.ErrorContains(t, err, "timeout hit") 315 316 h.assertDLOMessage( 317 &testMsg{data: msgDataAfterStop, replyChan: replyChanAfterStop}, 318 ) 319 } 320 321 // TestActorTellBasic verifies that a message sent via Tell is processed by the 322 // actor's behavior. 323 func TestActorTellBasic(t *testing.T) { 324 t.Parallel() 325 326 h := newActorTestHarness(t) 327 beh := newEchoBehavior(t, 0) 328 actor := h.newActor("test-actor-tell", beh, 1) 329 330 msgData := "tell-message" 331 replyChan := make(chan string, 1) 332 actor.Ref().Tell( 333 context.Background(), newTestMsgWithReply(msgData, replyChan), 334 ) 335 336 receivedTell, errTell := fn.RecvOrTimeout(replyChan, 100*time.Millisecond) 337 require.NoError(t, errTell, "timed out waiting for Tell message processing") 338 require.Equal( 339 t, msgData, receivedTell, "behavior did not receive Tell message data", 340 ) 341 342 lastData, ok := beh.GetLastMsgData() 343 require.True(t, ok, "last message data not set in behavior") 344 require.Equal(t, msgData, lastData, "last message data mismatch") 345 h.assertNoDLOMessages() 346 } 347 348 // TestActorAskSuccess verifies that a message sent via Ask is processed, and 349 // the returned Future is completed with the behavior's successful result. 350 func TestActorAskSuccess(t *testing.T) { 351 t.Parallel() 352 353 h := newActorTestHarness(t) 354 beh := newEchoBehavior(t, 0) 355 actor := h.newActor("test-actor-ask-success", beh, 1) 356 357 msgData := "ask-message" 358 future := actor.Ref().Ask(context.Background(), newTestMsg(msgData)) 359 360 result := future.Await(context.Background()) 361 require.False(t, result.IsErr(), "ask returned an error: %v", result.Err()) 362 363 result.WhenOk(func(val string) { 364 expectedReply := fmt.Sprintf("echo: %s", msgData) 365 require.Equal(t, expectedReply, val, "ask response mismatch") 366 }) 367 368 lastData, ok := beh.GetLastMsgData() 369 require.True(t, ok, "last message data not set in behavior") 370 require.Equal(t, msgData, lastData, "last message data mismatch") 371 h.assertNoDLOMessages() 372 } 373 374 // TestActorAskErrorBehavior verifies that if an actor's behavior returns an 375 // error, the Future from an Ask call is completed with that error. 376 func TestActorAskErrorBehavior(t *testing.T) { 377 t.Parallel() 378 379 h := newActorTestHarness(t) 380 expectedErr := errors.New("behavior error") 381 beh := newErrorBehavior(expectedErr) 382 actor := h.newActor("test-actor-ask-error", beh, 1) 383 384 future := actor.Ref().Ask( 385 context.Background(), newTestMsg("ask-error-test"), 386 ) 387 388 result := future.Await(context.Background()) 389 require.True(t, result.IsErr(), "ask should have returned an error") 390 require.ErrorIs(t, result.Err(), expectedErr, "ask error mismatch") 391 392 h.assertNoDLOMessages() 393 } 394 395 // TestFunctionBehaviorFromSimple verifies that FunctionBehaviorFromSimple 396 // correctly adapts a simple (msg) -> (result, error) function into an 397 // ActorBehavior, handling both success and error cases. 398 func TestFunctionBehaviorFromSimple(t *testing.T) { 399 t.Parallel() 400 401 t.Run("success", func(t *testing.T) { 402 t.Parallel() 403 404 h := newActorTestHarness(t) 405 406 beh := FunctionBehaviorFromSimple( 407 func(msg *testMsg) (string, error) { 408 return "simple: " + msg.data, nil 409 }, 410 ) 411 actor := h.newActor("test-simple-success", beh, 1) 412 413 future := actor.Ref().Ask( 414 context.Background(), newTestMsg("hello"), 415 ) 416 result := future.Await(context.Background()) 417 require.False( 418 t, result.IsErr(), 419 "expected success, got: %v", result.Err(), 420 ) 421 result.WhenOk(func(val string) { 422 require.Equal(t, "simple: hello", val) 423 }) 424 }) 425 426 t.Run("error", func(t *testing.T) { 427 t.Parallel() 428 429 h := newActorTestHarness(t) 430 431 expectedErr := errors.New("simple behavior error") 432 beh := FunctionBehaviorFromSimple( 433 func(msg *testMsg) (string, error) { 434 return "", expectedErr 435 }, 436 ) 437 actor := h.newActor("test-simple-error", beh, 1) 438 439 future := actor.Ref().Ask( 440 context.Background(), newTestMsg("hello"), 441 ) 442 result := future.Await(context.Background()) 443 require.True(t, result.IsErr()) 444 require.ErrorIs(t, result.Err(), expectedErr) 445 }) 446 }