/ actor / mailbox_test.go
mailbox_test.go
  1  package actor
  2  
  3  import (
  4  	"context"
  5  	"sync"
  6  	"testing"
  7  
  8  	"github.com/stretchr/testify/require"
  9  )
 10  
// TestMessage is a test message type that embeds BaseMessage. The tests in
// this file use it as the message type parameter of the mailbox under test.
type TestMessage struct {
	BaseMessage

	// Value is an integer payload. The tests set it to tell messages apart
	// and to check the order in which they are delivered.
	Value int
}
 16  
 17  // MessageType returns the type name of the message for routing/filtering.
 18  func (tm TestMessage) MessageType() string {
 19  	return "TestMessage"
 20  }
 21  
 22  // TestChannelMailboxSend tests the Send method of ChannelMailbox.
 23  func TestChannelMailboxSend(t *testing.T) {
 24  	t.Run("successful send", func(t *testing.T) {
 25  		mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10)
 26  		ctx := context.Background()
 27  		env := envelope[TestMessage, int]{
 28  			message: TestMessage{Value: 42},
 29  			promise: nil,
 30  		}
 31  
 32  		sent := mailbox.Send(ctx, env)
 33  		require.True(t, sent, "Send should succeed")
 34  	})
 35  
 36  	t.Run("send with cancelled context", func(t *testing.T) {
 37  		mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 1)
 38  		// Fill the mailbox first.
 39  		env := envelope[TestMessage, int]{
 40  			message: TestMessage{Value: 42},
 41  			promise: nil,
 42  		}
 43  		mailbox.TrySend(env)
 44  
 45  		ctx, cancel := context.WithCancel(context.Background())
 46  		// Cancel immediately.
 47  		cancel()
 48  
 49  		env2 := envelope[TestMessage, int]{
 50  			message: TestMessage{Value: 43},
 51  			promise: nil,
 52  		}
 53  
 54  		sent := mailbox.Send(ctx, env2)
 55  		require.False(t, sent, "Send should fail with cancelled context")
 56  	})
 57  
 58  	t.Run("send to closed mailbox", func(t *testing.T) {
 59  		mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10)
 60  		mailbox.Close()
 61  
 62  		ctx := context.Background()
 63  		env := envelope[TestMessage, int]{
 64  			message: TestMessage{Value: 42},
 65  			promise: nil,
 66  		}
 67  
 68  		sent := mailbox.Send(ctx, env)
 69  		require.False(t, sent, "Send should fail on closed mailbox")
 70  	})
 71  }
 72  
 73  // TestChannelMailboxTrySend tests the TrySend method of ChannelMailbox.
 74  func TestChannelMailboxTrySend(t *testing.T) {
 75  	t.Run("successful try send", func(t *testing.T) {
 76  		mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10)
 77  		env := envelope[TestMessage, int]{
 78  			message: TestMessage{Value: 42},
 79  			promise: nil,
 80  		}
 81  
 82  		sent := mailbox.TrySend(env)
 83  		require.True(t, sent, "TrySend should succeed")
 84  	})
 85  
 86  	t.Run("try send to full mailbox", func(t *testing.T) {
 87  		mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 1)
 88  		env := envelope[TestMessage, int]{
 89  			message: TestMessage{Value: 42},
 90  			promise: nil,
 91  		}
 92  
 93  		// Fill the mailbox.
 94  		sent := mailbox.TrySend(env)
 95  		require.True(t, sent, "First TrySend should succeed")
 96  
 97  		// Try to send again - should fail.
 98  		sent = mailbox.TrySend(env)
 99  		require.False(t, sent, "TrySend should fail on full mailbox")
100  	})
101  
102  	t.Run("try send to closed mailbox", func(t *testing.T) {
103  		mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10)
104  		mailbox.Close()
105  
106  		env := envelope[TestMessage, int]{
107  			message: TestMessage{Value: 42},
108  			promise: nil,
109  		}
110  
111  		sent := mailbox.TrySend(env)
112  		require.False(t, sent, "TrySend should fail on closed mailbox")
113  	})
114  }
115  
116  // TestChannelMailboxReceive tests the Receive method of ChannelMailbox.
117  func TestChannelMailboxReceive(t *testing.T) {
118  	t.Run("receive messages", func(t *testing.T) {
119  		mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10)
120  		ctx := context.Background()
121  
122  		// Send some messages.
123  		for i := 0; i < 3; i++ {
124  			env := envelope[TestMessage, int]{
125  				message: TestMessage{Value: i},
126  				promise: nil,
127  			}
128  			mailbox.Send(ctx, env)
129  		}
130  
131  		// Start receiving in a goroutine.
132  		var received []int
133  		var wg sync.WaitGroup
134  		wg.Add(1)
135  		go func() {
136  			defer wg.Done()
137  			for env := range mailbox.Receive(ctx) {
138  				received = append(received, env.message.Value)
139  			}
140  		}()
141  
142  		// Close the mailbox after sending all messages.
143  		mailbox.Close()
144  		wg.Wait()
145  
146  		require.Len(t, received, 3, "Should receive 3 messages")
147  		require.Equal(t, []int{0, 1, 2}, received, "Should receive messages in order")
148  	})
149  
150  	t.Run("receive with cancelled context", func(t *testing.T) {
151  		mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10)
152  		ctx, cancel := context.WithCancel(context.Background())
153  
154  		// Send a message.
155  		env := envelope[TestMessage, int]{
156  			message: TestMessage{Value: 42},
157  			promise: nil,
158  		}
159  		mailbox.Send(context.Background(), env)
160  
161  		// Start receiving.
162  		var received int
163  		var wg sync.WaitGroup
164  		wg.Add(1)
165  		go func() {
166  			defer wg.Done()
167  			for env := range mailbox.Receive(ctx) {
168  				received++
169  				_ = env
170  			}
171  		}()
172  
173  		// Cancel the context.
174  		cancel()
175  		wg.Wait()
176  
177  		// Might receive 0 or 1 message depending on timing.
178  		require.LessOrEqual(t, received, 1,
179  			"Should stop receiving after context cancel")
180  	})
181  }
182  
183  // TestChannelMailboxClose tests the Close and IsClosed methods.
184  func TestChannelMailboxClose(t *testing.T) {
185  	mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10)
186  
187  	require.False(t, mailbox.IsClosed(), "Mailbox should not be closed initially")
188  
189  	mailbox.Close()
190  	require.True(t, mailbox.IsClosed(), "Mailbox should be closed after Close()")
191  
192  	// Closing again should be safe.
193  	mailbox.Close()
194  	require.True(t, mailbox.IsClosed(), "Mailbox should remain closed")
195  }
196  
197  // TestChannelMailboxDrain tests the Drain method of ChannelMailbox.
198  func TestChannelMailboxDrain(t *testing.T) {
199  	mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10)
200  	ctx := context.Background()
201  
202  	// Send some messages.
203  	for i := 0; i < 3; i++ {
204  		env := envelope[TestMessage, int]{
205  			message: TestMessage{Value: i},
206  			promise: nil,
207  		}
208  		mailbox.Send(ctx, env)
209  	}
210  
211  	// Close the mailbox.
212  	mailbox.Close()
213  
214  	// Drain messages.
215  	var drained []int
216  	for env := range mailbox.Drain() {
217  		drained = append(drained, env.message.Value)
218  	}
219  
220  	require.Len(t, drained, 3, "Should drain 3 messages")
221  	require.Equal(t, []int{0, 1, 2}, drained, "Should drain messages in order")
222  }
223  
224  // TestChannelMailboxConcurrent tests concurrent operations on ChannelMailbox.
225  func TestChannelMailboxConcurrent(t *testing.T) {
226  	mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 100)
227  	ctx := context.Background()
228  
229  	const numSenders = 10
230  	const messagesPerSender = 100
231  
232  	var wg sync.WaitGroup
233  
234  	// Start multiple senders.
235  	for i := 0; i < numSenders; i++ {
236  		wg.Add(1)
237  		go func(senderID int) {
238  			defer wg.Done()
239  			for j := 0; j < messagesPerSender; j++ {
240  				env := envelope[TestMessage, int]{
241  					message: TestMessage{Value: senderID*1000 + j},
242  					promise: nil,
243  				}
244  				mailbox.Send(ctx, env)
245  			}
246  		}(i)
247  	}
248  
249  	// Start receiver.
250  	received := make([]int, 0, numSenders*messagesPerSender)
251  	var receiverWg sync.WaitGroup
252  	receiverWg.Add(1)
253  	go func() {
254  		defer receiverWg.Done()
255  		for env := range mailbox.Receive(ctx) {
256  			received = append(received, env.message.Value)
257  		}
258  	}()
259  
260  	// Wait for all senders to complete.
261  	wg.Wait()
262  
263  	// Close the mailbox now that all sends are complete.
264  	mailbox.Close()
265  	receiverWg.Wait()
266  
267  	require.Len(t, received, numSenders*messagesPerSender,
268  		"Should receive all messages")
269  }
270  
271  // TestChannelMailboxZeroCapacity tests that zero capacity defaults to 1.
272  func TestChannelMailboxZeroCapacity(t *testing.T) {
273  	mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 0)
274  
275  	// Should default to capacity of 1.
276  	env := envelope[TestMessage, int]{
277  		message: TestMessage{Value: 42},
278  		promise: nil,
279  	}
280  
281  	sent := mailbox.TrySend(env)
282  	require.True(t, sent, "Should be able to send one message")
283  
284  	// Second send should fail (mailbox full).
285  	sent = mailbox.TrySend(env)
286  	require.False(t, sent, "Second send should fail on full mailbox")
287  }
288  
289  // TestChannelMailboxActorContext tests that the mailbox respects the actor's
290  // context for cancellation.
291  func TestChannelMailboxActorContext(t *testing.T) {
292  	t.Run("send respects actor context", func(t *testing.T) {
293  		actorCtx, actorCancel := context.WithCancel(context.Background())
294  		mailbox := NewChannelMailbox[TestMessage, int](actorCtx, 1)
295  
296  		// Fill the mailbox.
297  		env := envelope[TestMessage, int]{
298  			message: TestMessage{Value: 42},
299  			promise: nil,
300  		}
301  		mailbox.TrySend(env)
302  
303  		// Cancel the actor context.
304  		actorCancel()
305  
306  		// Try to send with a fresh caller context - should fail due to
307  		// actor context cancellation.
308  		callerCtx := context.Background()
309  		env2 := envelope[TestMessage, int]{
310  			message: TestMessage{Value: 43},
311  			promise: nil,
312  		}
313  
314  		sent := mailbox.Send(callerCtx, env2)
315  		require.False(t, sent, "Send should fail when actor context is cancelled")
316  	})
317  
318  	t.Run("receive respects actor context", func(t *testing.T) {
319  		actorCtx, actorCancel := context.WithCancel(context.Background())
320  		mailbox := NewChannelMailbox[TestMessage, int](actorCtx, 10)
321  
322  		// Send a message.
323  		env := envelope[TestMessage, int]{
324  			message: TestMessage{Value: 42},
325  			promise: nil,
326  		}
327  		mailbox.Send(context.Background(), env)
328  
329  		// Start receiving with a fresh context.
330  		callerCtx := context.Background()
331  		var received int
332  		var wg sync.WaitGroup
333  		wg.Add(1)
334  		go func() {
335  			defer wg.Done()
336  			for env := range mailbox.Receive(callerCtx) {
337  				received++
338  				_ = env
339  			}
340  		}()
341  
342  		// Cancel the actor context.
343  		actorCancel()
344  		wg.Wait()
345  
346  		// Should have stopped receiving due to actor context cancellation.
347  		require.LessOrEqual(t, received, 1,
348  			"Should stop receiving when actor context is cancelled")
349  	})
350  }
351  
352  // TestMailboxConcurrentSendAndClose tests concurrent Send and Close operations
353  // to ensure no race conditions or panics occur.
354  func TestMailboxConcurrentSendAndClose(t *testing.T) {
355  	const numSenders = 20
356  	const sendsPerSender = 100
357  
358  	mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 100)
359  	ctx := context.Background()
360  
361  	var wg sync.WaitGroup
362  
363  	// Start receiver to drain messages.
364  	var recvWg sync.WaitGroup
365  	recvWg.Add(1)
366  	go func() {
367  		defer recvWg.Done()
368  		for range mailbox.Receive(ctx) {
369  			// Just drain.
370  		}
371  	}()
372  
373  	// Start multiple senders.
374  	for i := 0; i < numSenders; i++ {
375  		wg.Add(1)
376  		go func(senderID int) {
377  			defer wg.Done()
378  			for j := 0; j < sendsPerSender; j++ {
379  				env := envelope[TestMessage, int]{
380  					message: TestMessage{Value: senderID*1000 + j},
381  					promise: nil,
382  				}
383  				// Send may fail if mailbox closes, that's ok.
384  				mailbox.Send(ctx, env)
385  			}
386  		}(i)
387  	}
388  
389  	// Concurrently close the mailbox multiple times from different
390  	// goroutines.
391  	for i := 0; i < 5; i++ {
392  		wg.Add(1)
393  		go func() {
394  			defer wg.Done()
395  			mailbox.Close()
396  		}()
397  	}
398  
399  	wg.Wait()
400  	recvWg.Wait()
401  
402  	// Mailbox should be closed.
403  	require.True(t, mailbox.IsClosed(), "Mailbox should be closed")
404  
405  	// Further sends should fail without panic.
406  	env := envelope[TestMessage, int]{
407  		message: TestMessage{Value: 999},
408  		promise: nil,
409  	}
410  	sent := mailbox.Send(ctx, env)
411  	require.False(t, sent, "Send should fail on closed mailbox")
412  }
413  
414  // TestMailboxConcurrentTrySendAndClose tests concurrent TrySend and Close
415  // operations to ensure no race conditions or panics occur.
416  func TestMailboxConcurrentTrySendAndClose(t *testing.T) {
417  	const numSenders = 20
418  	const sendsPerSender = 100
419  
420  	mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10)
421  
422  	var wg sync.WaitGroup
423  
424  	// Start multiple senders using TrySend.
425  	for i := 0; i < numSenders; i++ {
426  		wg.Add(1)
427  		go func(senderID int) {
428  			defer wg.Done()
429  			for j := 0; j < sendsPerSender; j++ {
430  				env := envelope[TestMessage, int]{
431  					message: TestMessage{Value: senderID*1000 + j},
432  					promise: nil,
433  				}
434  				// TrySend may fail if mailbox is full or closed.
435  				mailbox.TrySend(env)
436  			}
437  		}(i)
438  	}
439  
440  	// Concurrently close the mailbox.
441  	for i := 0; i < 5; i++ {
442  		wg.Add(1)
443  		go func() {
444  			defer wg.Done()
445  			mailbox.Close()
446  		}()
447  	}
448  
449  	wg.Wait()
450  
451  	// Mailbox should be closed.
452  	require.True(t, mailbox.IsClosed(), "Mailbox should be closed")
453  
454  	// Further sends should fail without panic.
455  	env := envelope[TestMessage, int]{
456  		message: TestMessage{Value: 999},
457  		promise: nil,
458  	}
459  	sent := mailbox.TrySend(env)
460  	require.False(t, sent, "TrySend should fail on closed mailbox")
461  }
462  
463  // TestMailboxMultipleCloseCallers tests that multiple goroutines calling
464  // Close() simultaneously don't cause panics or issues.
465  func TestMailboxMultipleCloseCallers(t *testing.T) {
466  	const numClosers = 100
467  
468  	mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 10)
469  
470  	var wg sync.WaitGroup
471  
472  	// Start many goroutines all trying to close the mailbox.
473  	for i := 0; i < numClosers; i++ {
474  		wg.Add(1)
475  		go func() {
476  			defer wg.Done()
477  			mailbox.Close()
478  		}()
479  	}
480  
481  	wg.Wait()
482  
483  	// Mailbox should be closed exactly once.
484  	require.True(t, mailbox.IsClosed(), "Mailbox should be closed")
485  
486  	// Calling Close again should be safe.
487  	mailbox.Close()
488  	require.True(t, mailbox.IsClosed(), "Mailbox should remain closed")
489  }
490  
491  // TestMailboxCloseWhileSending tests closing the mailbox while multiple
492  // senders are actively sending messages.
493  func TestMailboxCloseWhileSending(t *testing.T) {
494  	const numSenders = 10
495  	const sendsPerSender = 1000
496  
497  	mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 100)
498  	ctx := context.Background()
499  
500  	var sendWg sync.WaitGroup
501  
502  	// Start multiple senders.
503  	for i := 0; i < numSenders; i++ {
504  		sendWg.Add(1)
505  		go func(senderID int) {
506  			defer sendWg.Done()
507  			for j := 0; j < sendsPerSender; j++ {
508  				env := envelope[TestMessage, int]{
509  					message: TestMessage{Value: senderID*1000 + j},
510  					promise: nil,
511  				}
512  				// Send may fail after close, that's expected.
513  				mailbox.Send(ctx, env)
514  			}
515  		}(i)
516  	}
517  
518  	// Start receiver to drain messages.
519  	var recvWg sync.WaitGroup
520  	recvWg.Add(1)
521  	receivedCount := 0
522  	go func() {
523  		defer recvWg.Done()
524  		for range mailbox.Receive(ctx) {
525  			receivedCount++
526  		}
527  	}()
528  
529  	// Close mailbox while sends are happening.
530  	mailbox.Close()
531  
532  	sendWg.Wait()
533  	recvWg.Wait()
534  
535  	// Should have received at least some messages (exact count depends on
536  	// timing).
537  	t.Logf("Received %d messages before close", receivedCount)
538  
539  	// Mailbox should be closed.
540  	require.True(t, mailbox.IsClosed(), "Mailbox should be closed")
541  }
542  
543  // TestMailboxStressTest performs a high-concurrency stress test with multiple
544  // senders, receivers, and close operations.
545  func TestMailboxStressTest(t *testing.T) {
546  	const numSenders = 50
547  	const numReceivers = 5
548  	const sendsPerSender = 200
549  
550  	mailbox := NewChannelMailbox[TestMessage, int](context.Background(), 200)
551  	ctx := context.Background()
552  
553  	var sendWg sync.WaitGroup
554  
555  	// Start multiple senders.
556  	for i := 0; i < numSenders; i++ {
557  		sendWg.Add(1)
558  		go func(senderID int) {
559  			defer sendWg.Done()
560  			for j := 0; j < sendsPerSender; j++ {
561  				env := envelope[TestMessage, int]{
562  					message: TestMessage{Value: senderID*1000 + j},
563  					promise: nil,
564  				}
565  				mailbox.Send(ctx, env)
566  			}
567  		}(i)
568  	}
569  
570  	// Start multiple receivers.
571  	var recvWg sync.WaitGroup
572  	for i := 0; i < numReceivers; i++ {
573  		recvWg.Add(1)
574  		go func() {
575  			defer recvWg.Done()
576  			for range mailbox.Receive(ctx) {
577  				// Just drain messages.
578  			}
579  		}()
580  	}
581  
582  	// Wait for all sends to complete.
583  	sendWg.Wait()
584  
585  	// Close mailbox.
586  	mailbox.Close()
587  
588  	// Wait for all receivers to finish.
589  	recvWg.Wait()
590  
591  	// Mailbox should be closed.
592  	require.True(t, mailbox.IsClosed(), "Mailbox should be closed")
593  }