mailbox_test.go
1 package actor 2 3 import ( 4 "context" 5 "sync" 6 "testing" 7 8 "github.com/stretchr/testify/require" 9 ) 10 11 // TestMessage is a test message type that embeds BaseMessage. 12 type TestMessage struct { 13 BaseMessage 14 Value int 15 } 16 17 // MessageType returns the type name of the message for routing/filtering. 18 func (tm TestMessage) MessageType() string { 19 return "TestMessage" 20 } 21 22 // TestChannelMailboxSend tests the Send method of ChannelMailbox. 23 func TestChannelMailboxSend(t *testing.T) { 24 t.Run("successful send", func(t *testing.T) { 25 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) 26 ctx := context.Background() 27 env := envelope[TestMessage, int]{ 28 message: TestMessage{Value: 42}, 29 promise: nil, 30 } 31 32 sent := mailbox.Send(ctx, env) 33 require.True(t, sent, "Send should succeed") 34 }) 35 36 t.Run("send with cancelled context", func(t *testing.T) { 37 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 1) 38 // Fill the mailbox first. 39 env := envelope[TestMessage, int]{ 40 message: TestMessage{Value: 42}, 41 promise: nil, 42 } 43 mailbox.TrySend(env) 44 45 ctx, cancel := context.WithCancel(context.Background()) 46 // Cancel immediately. 47 cancel() 48 49 env2 := envelope[TestMessage, int]{ 50 message: TestMessage{Value: 43}, 51 promise: nil, 52 } 53 54 sent := mailbox.Send(ctx, env2) 55 require.False(t, sent, "Send should fail with cancelled context") 56 }) 57 58 t.Run("send to closed mailbox", func(t *testing.T) { 59 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) 60 mailbox.Close() 61 62 ctx := context.Background() 63 env := envelope[TestMessage, int]{ 64 message: TestMessage{Value: 42}, 65 promise: nil, 66 } 67 68 sent := mailbox.Send(ctx, env) 69 require.False(t, sent, "Send should fail on closed mailbox") 70 }) 71 } 72 73 // TestChannelMailboxTrySend tests the TrySend method of ChannelMailbox. 74 func TestChannelMailboxTrySend(t *testing.T) { 75 t.Run("successful try send", func(t *testing.T) { 76 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) 77 env := envelope[TestMessage, int]{ 78 message: TestMessage{Value: 42}, 79 promise: nil, 80 } 81 82 sent := mailbox.TrySend(env) 83 require.True(t, sent, "TrySend should succeed") 84 }) 85 86 t.Run("try send to full mailbox", func(t *testing.T) { 87 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 1) 88 env := envelope[TestMessage, int]{ 89 message: TestMessage{Value: 42}, 90 promise: nil, 91 } 92 93 // Fill the mailbox. 94 sent := mailbox.TrySend(env) 95 require.True(t, sent, "First TrySend should succeed") 96 97 // Try to send again - should fail. 98 sent = mailbox.TrySend(env) 99 require.False(t, sent, "TrySend should fail on full mailbox") 100 }) 101 102 t.Run("try send to closed mailbox", func(t *testing.T) { 103 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) 104 mailbox.Close() 105 106 env := envelope[TestMessage, int]{ 107 message: TestMessage{Value: 42}, 108 promise: nil, 109 } 110 111 sent := mailbox.TrySend(env) 112 require.False(t, sent, "TrySend should fail on closed mailbox") 113 }) 114 } 115 116 // TestChannelMailboxReceive tests the Receive method of ChannelMailbox. 117 func TestChannelMailboxReceive(t *testing.T) { 118 t.Run("receive messages", func(t *testing.T) { 119 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) 120 ctx := context.Background() 121 122 // Send some messages. 123 for i := 0; i < 3; i++ { 124 env := envelope[TestMessage, int]{ 125 message: TestMessage{Value: i}, 126 promise: nil, 127 } 128 mailbox.Send(ctx, env) 129 } 130 131 // Start receiving in a goroutine. 132 var received []int 133 var wg sync.WaitGroup 134 wg.Add(1) 135 go func() { 136 defer wg.Done() 137 for env := range mailbox.Receive(ctx) { 138 received = append(received, env.message.Value) 139 } 140 }() 141 142 // Close the mailbox after sending all messages. 143 mailbox.Close() 144 wg.Wait() 145 146 require.Len(t, received, 3, "Should receive 3 messages") 147 require.Equal(t, []int{0, 1, 2}, received, "Should receive messages in order") 148 }) 149 150 t.Run("receive with cancelled context", func(t *testing.T) { 151 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) 152 ctx, cancel := context.WithCancel(context.Background()) 153 154 // Send a message. 155 env := envelope[TestMessage, int]{ 156 message: TestMessage{Value: 42}, 157 promise: nil, 158 } 159 mailbox.Send(context.Background(), env) 160 161 // Start receiving. 162 var received int 163 var wg sync.WaitGroup 164 wg.Add(1) 165 go func() { 166 defer wg.Done() 167 for env := range mailbox.Receive(ctx) { 168 received++ 169 _ = env 170 } 171 }() 172 173 // Cancel the context. 174 cancel() 175 wg.Wait() 176 177 // Might receive 0 or 1 message depending on timing. 178 require.LessOrEqual(t, received, 1, 179 "Should stop receiving after context cancel") 180 }) 181 } 182 183 // TestChannelMailboxClose tests the Close and IsClosed methods. 184 func TestChannelMailboxClose(t *testing.T) { 185 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) 186 187 require.False(t, mailbox.IsClosed(), "Mailbox should not be closed initially") 188 189 mailbox.Close() 190 require.True(t, mailbox.IsClosed(), "Mailbox should be closed after Close()") 191 192 // Closing again should be safe. 193 mailbox.Close() 194 require.True(t, mailbox.IsClosed(), "Mailbox should remain closed") 195 } 196 197 // TestChannelMailboxDrain tests the Drain method of ChannelMailbox. 198 func TestChannelMailboxDrain(t *testing.T) { 199 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) 200 ctx := context.Background() 201 202 // Send some messages. 203 for i := 0; i < 3; i++ { 204 env := envelope[TestMessage, int]{ 205 message: TestMessage{Value: i}, 206 promise: nil, 207 } 208 mailbox.Send(ctx, env) 209 } 210 211 // Close the mailbox. 212 mailbox.Close() 213 214 // Drain messages. 215 var drained []int 216 for env := range mailbox.Drain() { 217 drained = append(drained, env.message.Value) 218 } 219 220 require.Len(t, drained, 3, "Should drain 3 messages") 221 require.Equal(t, []int{0, 1, 2}, drained, "Should drain messages in order") 222 } 223 224 // TestChannelMailboxConcurrent tests concurrent operations on ChannelMailbox. 225 func TestChannelMailboxConcurrent(t *testing.T) { 226 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 100) 227 ctx := context.Background() 228 229 const numSenders = 10 230 const messagesPerSender = 100 231 232 var wg sync.WaitGroup 233 234 // Start multiple senders. 235 for i := 0; i < numSenders; i++ { 236 wg.Add(1) 237 go func(senderID int) { 238 defer wg.Done() 239 for j := 0; j < messagesPerSender; j++ { 240 env := envelope[TestMessage, int]{ 241 message: TestMessage{Value: senderID*1000 + j}, 242 promise: nil, 243 } 244 mailbox.Send(ctx, env) 245 } 246 }(i) 247 } 248 249 // Start receiver. 250 received := make([]int, 0, numSenders*messagesPerSender) 251 var receiverWg sync.WaitGroup 252 receiverWg.Add(1) 253 go func() { 254 defer receiverWg.Done() 255 for env := range mailbox.Receive(ctx) { 256 received = append(received, env.message.Value) 257 } 258 }() 259 260 // Wait for all senders to complete. 261 wg.Wait() 262 263 // Close the mailbox now that all sends are complete. 264 mailbox.Close() 265 receiverWg.Wait() 266 267 require.Len(t, received, numSenders*messagesPerSender, 268 "Should receive all messages") 269 } 270 271 // TestChannelMailboxZeroCapacity tests that zero capacity defaults to 1. 272 func TestChannelMailboxZeroCapacity(t *testing.T) { 273 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 0) 274 275 // Should default to capacity of 1. 276 env := envelope[TestMessage, int]{ 277 message: TestMessage{Value: 42}, 278 promise: nil, 279 } 280 281 sent := mailbox.TrySend(env) 282 require.True(t, sent, "Should be able to send one message") 283 284 // Second send should fail (mailbox full). 285 sent = mailbox.TrySend(env) 286 require.False(t, sent, "Second send should fail on full mailbox") 287 } 288 289 // TestChannelMailboxActorContext tests that the mailbox respects the actor's 290 // context for cancellation. 291 func TestChannelMailboxActorContext(t *testing.T) { 292 t.Run("send respects actor context", func(t *testing.T) { 293 actorCtx, actorCancel := context.WithCancel(context.Background()) 294 mailbox := NewChannelMailbox[TestMessage, int](actorCtx, 1) 295 296 // Fill the mailbox. 297 env := envelope[TestMessage, int]{ 298 message: TestMessage{Value: 42}, 299 promise: nil, 300 } 301 mailbox.TrySend(env) 302 303 // Cancel the actor context. 304 actorCancel() 305 306 // Try to send with a fresh caller context - should fail due to 307 // actor context cancellation. 308 callerCtx := context.Background() 309 env2 := envelope[TestMessage, int]{ 310 message: TestMessage{Value: 43}, 311 promise: nil, 312 } 313 314 sent := mailbox.Send(callerCtx, env2) 315 require.False(t, sent, "Send should fail when actor context is cancelled") 316 }) 317 318 t.Run("receive respects actor context", func(t *testing.T) { 319 actorCtx, actorCancel := context.WithCancel(context.Background()) 320 mailbox := NewChannelMailbox[TestMessage, int](actorCtx, 10) 321 322 // Send a message. 323 env := envelope[TestMessage, int]{ 324 message: TestMessage{Value: 42}, 325 promise: nil, 326 } 327 mailbox.Send(context.Background(), env) 328 329 // Start receiving with a fresh context. 330 callerCtx := context.Background() 331 var received int 332 var wg sync.WaitGroup 333 wg.Add(1) 334 go func() { 335 defer wg.Done() 336 for env := range mailbox.Receive(callerCtx) { 337 received++ 338 _ = env 339 } 340 }() 341 342 // Cancel the actor context. 343 actorCancel() 344 wg.Wait() 345 346 // Should have stopped receiving due to actor context cancellation. 347 require.LessOrEqual(t, received, 1, 348 "Should stop receiving when actor context is cancelled") 349 }) 350 } 351 352 // TestMailboxConcurrentSendAndClose tests concurrent Send and Close operations 353 // to ensure no race conditions or panics occur. 354 func TestMailboxConcurrentSendAndClose(t *testing.T) { 355 const numSenders = 20 356 const sendsPerSender = 100 357 358 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 100) 359 ctx := context.Background() 360 361 var wg sync.WaitGroup 362 363 // Start receiver to drain messages. 364 var recvWg sync.WaitGroup 365 recvWg.Add(1) 366 go func() { 367 defer recvWg.Done() 368 for range mailbox.Receive(ctx) { 369 // Just drain. 370 } 371 }() 372 373 // Start multiple senders. 374 for i := 0; i < numSenders; i++ { 375 wg.Add(1) 376 go func(senderID int) { 377 defer wg.Done() 378 for j := 0; j < sendsPerSender; j++ { 379 env := envelope[TestMessage, int]{ 380 message: TestMessage{Value: senderID*1000 + j}, 381 promise: nil, 382 } 383 // Send may fail if mailbox closes, that's ok. 384 mailbox.Send(ctx, env) 385 } 386 }(i) 387 } 388 389 // Concurrently close the mailbox multiple times from different 390 // goroutines. 391 for i := 0; i < 5; i++ { 392 wg.Add(1) 393 go func() { 394 defer wg.Done() 395 mailbox.Close() 396 }() 397 } 398 399 wg.Wait() 400 recvWg.Wait() 401 402 // Mailbox should be closed. 403 require.True(t, mailbox.IsClosed(), "Mailbox should be closed") 404 405 // Further sends should fail without panic. 406 env := envelope[TestMessage, int]{ 407 message: TestMessage{Value: 999}, 408 promise: nil, 409 } 410 sent := mailbox.Send(ctx, env) 411 require.False(t, sent, "Send should fail on closed mailbox") 412 } 413 414 // TestMailboxConcurrentTrySendAndClose tests concurrent TrySend and Close 415 // operations to ensure no race conditions or panics occur. 416 func TestMailboxConcurrentTrySendAndClose(t *testing.T) { 417 const numSenders = 20 418 const sendsPerSender = 100 419 420 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) 421 422 var wg sync.WaitGroup 423 424 // Start multiple senders using TrySend. 425 for i := 0; i < numSenders; i++ { 426 wg.Add(1) 427 go func(senderID int) { 428 defer wg.Done() 429 for j := 0; j < sendsPerSender; j++ { 430 env := envelope[TestMessage, int]{ 431 message: TestMessage{Value: senderID*1000 + j}, 432 promise: nil, 433 } 434 // TrySend may fail if mailbox is full or closed. 435 mailbox.TrySend(env) 436 } 437 }(i) 438 } 439 440 // Concurrently close the mailbox. 441 for i := 0; i < 5; i++ { 442 wg.Add(1) 443 go func() { 444 defer wg.Done() 445 mailbox.Close() 446 }() 447 } 448 449 wg.Wait() 450 451 // Mailbox should be closed. 452 require.True(t, mailbox.IsClosed(), "Mailbox should be closed") 453 454 // Further sends should fail without panic. 455 env := envelope[TestMessage, int]{ 456 message: TestMessage{Value: 999}, 457 promise: nil, 458 } 459 sent := mailbox.TrySend(env) 460 require.False(t, sent, "TrySend should fail on closed mailbox") 461 } 462 463 // TestMailboxMultipleCloseCallers tests that multiple goroutines calling 464 // Close() simultaneously don't cause panics or issues. 465 func TestMailboxMultipleCloseCallers(t *testing.T) { 466 const numClosers = 100 467 468 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10) 469 470 var wg sync.WaitGroup 471 472 // Start many goroutines all trying to close the mailbox. 473 for i := 0; i < numClosers; i++ { 474 wg.Add(1) 475 go func() { 476 defer wg.Done() 477 mailbox.Close() 478 }() 479 } 480 481 wg.Wait() 482 483 // Mailbox should be closed exactly once. 484 require.True(t, mailbox.IsClosed(), "Mailbox should be closed") 485 486 // Calling Close again should be safe. 487 mailbox.Close() 488 require.True(t, mailbox.IsClosed(), "Mailbox should remain closed") 489 } 490 491 // TestMailboxCloseWhileSending tests closing the mailbox while multiple 492 // senders are actively sending messages. 493 func TestMailboxCloseWhileSending(t *testing.T) { 494 const numSenders = 10 495 const sendsPerSender = 1000 496 497 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 100) 498 ctx := context.Background() 499 500 var sendWg sync.WaitGroup 501 502 // Start multiple senders. 503 for i := 0; i < numSenders; i++ { 504 sendWg.Add(1) 505 go func(senderID int) { 506 defer sendWg.Done() 507 for j := 0; j < sendsPerSender; j++ { 508 env := envelope[TestMessage, int]{ 509 message: TestMessage{Value: senderID*1000 + j}, 510 promise: nil, 511 } 512 // Send may fail after close, that's expected. 513 mailbox.Send(ctx, env) 514 } 515 }(i) 516 } 517 518 // Start receiver to drain messages. 519 var recvWg sync.WaitGroup 520 recvWg.Add(1) 521 receivedCount := 0 522 go func() { 523 defer recvWg.Done() 524 for range mailbox.Receive(ctx) { 525 receivedCount++ 526 } 527 }() 528 529 // Close mailbox while sends are happening. 530 mailbox.Close() 531 532 sendWg.Wait() 533 recvWg.Wait() 534 535 // Should have received at least some messages (exact count depends on 536 // timing). 537 t.Logf("Received %d messages before close", receivedCount) 538 539 // Mailbox should be closed. 540 require.True(t, mailbox.IsClosed(), "Mailbox should be closed") 541 } 542 543 // TestMailboxStressTest performs a high-concurrency stress test with multiple 544 // senders, receivers, and close operations. 545 func TestMailboxStressTest(t *testing.T) { 546 const numSenders = 50 547 const numReceivers = 5 548 const sendsPerSender = 200 549 550 mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 200) 551 ctx := context.Background() 552 553 var sendWg sync.WaitGroup 554 555 // Start multiple senders. 556 for i := 0; i < numSenders; i++ { 557 sendWg.Add(1) 558 go func(senderID int) { 559 defer sendWg.Done() 560 for j := 0; j < sendsPerSender; j++ { 561 env := envelope[TestMessage, int]{ 562 message: TestMessage{Value: senderID*1000 + j}, 563 promise: nil, 564 } 565 mailbox.Send(ctx, env) 566 } 567 }(i) 568 } 569 570 // Start multiple receivers. 571 var recvWg sync.WaitGroup 572 for i := 0; i < numReceivers; i++ { 573 recvWg.Add(1) 574 go func() { 575 defer recvWg.Done() 576 for range mailbox.Receive(ctx) { 577 // Just drain messages. 578 } 579 }() 580 } 581 582 // Wait for all sends to complete. 583 sendWg.Wait() 584 585 // Close mailbox. 586 mailbox.Close() 587 588 // Wait for all receivers to finish. 589 recvWg.Wait() 590 591 // Mailbox should be closed. 592 require.True(t, mailbox.IsClosed(), "Mailbox should be closed") 593 }