/ actor / actor_test.go
actor_test.go
  1  package actor
  2  
  3  import (
  4  	"context"
  5  	"errors"
  6  	"fmt"
  7  	"reflect"
  8  	"sync"
  9  	"sync/atomic"
 10  	"testing"
 11  	"time"
 12  
 13  	"github.com/lightningnetwork/lnd/fn/v2"
 14  	"github.com/stretchr/testify/require"
 15  )
 16  
 17  // testMsg is a simple message type for testing. It embeds BaseMessage to
 18  // satisfy the actor.Message interface.
 19  type testMsg struct {
 20  	BaseMessage
 21  	data string
 22  
 23  	replyChan chan string
 24  }
 25  
 26  // MessageType returns the type name of the message.
 27  func (m *testMsg) MessageType() string {
 28  	return "testMsg"
 29  }
 30  
 31  // newTestMsg creates a new test message.
 32  func newTestMsg(data string) *testMsg {
 33  	return &testMsg{data: data}
 34  }
 35  
 36  // newTestMsgWithReply creates a new test message that includes a reply channel.
 37  // This can be used by test behaviors to send data back to the test
 38  // synchronously, especially for Tell operations.
 39  func newTestMsgWithReply(data string, replyChan chan string) *testMsg {
 40  	return &testMsg{data: data, replyChan: replyChan}
 41  }
 42  
 43  // echoBehavior is a simple actor behavior that processes *testMsg messages. It
 44  // stores the last message's data and, for Ask, echoes it back. For Tell, if
 45  // replyChan is set in testMsg, it sends data back on it.
 46  type echoBehavior struct {
 47  	lastMsgData     atomic.Value
 48  	processingDelay time.Duration
 49  	t               *testing.T
 50  }
 51  
 52  // newEchoBehavior creates a new echoBehavior.
 53  func newEchoBehavior(t *testing.T, delay time.Duration) *echoBehavior {
 54  	return &echoBehavior{t: t, processingDelay: delay}
 55  }
 56  
 57  // Receive handles incoming messages. It simulates work if processingDelay is
 58  // set, stores the message data, and responds for Ask operations or via
 59  // replyChan for Tell.
 60  func (b *echoBehavior) Receive(_ context.Context,
 61  	msg *testMsg) fn.Result[string] {
 62  
 63  	if b.processingDelay > 0 {
 64  		time.Sleep(b.processingDelay)
 65  	}
 66  
 67  	b.lastMsgData.Store(msg.data)
 68  
 69  	if msg.replyChan != nil {
 70  		// Attempt to send the data on the reply channel, but quit if
 71  		// it takes longer than 1 second (e.g., channel unbuffered
 72  		// and no receiver).
 73  		select {
 74  		case msg.replyChan <- msg.data:
 75  		case <-time.After(time.Second):
 76  			b.t.Logf("warning: replyChan send timed out")
 77  		}
 78  	}
 79  
 80  	return fn.Ok(fmt.Sprintf("echo: %s", msg.data))
 81  }
 82  
 83  // GetLastMsgData retrieves the data from the last message processed.
 84  func (b *echoBehavior) GetLastMsgData() (string, bool) {
 85  	val := b.lastMsgData.Load()
 86  	if val == nil {
 87  		return "", false
 88  	}
 89  	data, ok := val.(string)
 90  	return data, ok
 91  }
 92  
 93  // errorBehavior is an actor behavior that always returns a predefined error
 94  // upon receiving a message.
 95  type errorBehavior struct {
 96  	err error
 97  }
 98  
 99  // newErrorBehavior creates a new errorBehavior.
100  func newErrorBehavior(err error) *errorBehavior {
101  	return &errorBehavior{err: err}
102  }
103  
104  // Receive always returns the configured error.
105  func (b *errorBehavior) Receive(_ context.Context,
106  	_ *testMsg) fn.Result[string] {
107  
108  	return fn.Err[string](b.err)
109  }
110  
111  // blockingBehavior is an actor behavior that blocks until its actorCtx is done.
112  type blockingBehavior struct{}
113  
114  // Receive blocks until the actor's context is cancelled, then returns the
115  // context's error.
116  func (b *blockingBehavior) Receive(actorCtx context.Context,
117  	_ *testMsg) fn.Result[string] {
118  
119  	<-actorCtx.Done()
120  	return fn.Err[string](actorCtx.Err())
121  }
122  
123  // deadLetterTestMsg is a distinct message type used for testing DLO
124  // interactions.
125  type deadLetterTestMsg struct {
126  	BaseMessage
127  	id string
128  }
129  
130  // MessageType returns the type name of the message.
131  func (m *deadLetterTestMsg) MessageType() string {
132  	return "deadLetterTestMsg"
133  }
134  
135  // deadLetterObserverBehavior is a behavior for a test Dead Letter Office actor.
136  // It records all messages sent to it, allowing tests to verify DLO
137  // interactions.
138  type deadLetterObserverBehavior struct {
139  	mu           sync.Mutex
140  	receivedMsgs []Message
141  }
142  
143  // newDeadLetterObserverBehavior creates a new deadLetterObserverBehavior.
144  func newDeadLetterObserverBehavior() *deadLetterObserverBehavior {
145  	return &deadLetterObserverBehavior{
146  		receivedMsgs: make([]Message, 0),
147  	}
148  }
149  
150  // Receive records the incoming message and returns a successful result.
151  func (b *deadLetterObserverBehavior) Receive(_ context.Context,
152  	msg Message) fn.Result[any] {
153  
154  	b.mu.Lock()
155  	b.receivedMsgs = append(b.receivedMsgs, msg)
156  	b.mu.Unlock()
157  
158  	return fn.Ok[any](nil)
159  }
160  
161  // GetReceivedMsgs returns a copy of all messages received by this DLO.
162  func (b *deadLetterObserverBehavior) GetReceivedMsgs() []Message {
163  	b.mu.Lock()
164  	defer b.mu.Unlock()
165  
166  	msgs := make([]Message, len(b.receivedMsgs))
167  	copy(msgs, b.receivedMsgs)
168  
169  	return msgs
170  }
171  
172  // actorTestHarness provides helper methods for setting up actors in tests. It
173  // manages a dedicated DLO for actors created through it.
174  type actorTestHarness struct {
175  	t      *testing.T
176  	dlo    *Actor[Message, any]
177  	dloBeh *deadLetterObserverBehavior
178  }
179  
180  // newActorTestHarness sets up a test harness with a dedicated DLO. The DLO is
181  // automatically stopped when the test cleans up.
182  func newActorTestHarness(t *testing.T) *actorTestHarness {
183  	t.Helper()
184  
185  	dloBeh := newDeadLetterObserverBehavior()
186  	dloCfg := ActorConfig[Message, any]{
187  		ID:          "test-dlo-" + t.Name(),
188  		Behavior:    dloBeh,
189  		DLO:         nil,
190  		MailboxSize: 10,
191  	}
192  	dloActor, err := NewActor[Message, any](dloCfg)
193  	require.NoError(t, err)
194  	dloActor.Start()
195  
196  	t.Cleanup(dloActor.Stop)
197  
198  	return &actorTestHarness{
199  		t:      t,
200  		dlo:    dloActor,
201  		dloBeh: dloBeh,
202  	}
203  }
204  
205  // newActor creates, starts, and registers a new actor for cleanup. The actor
206  // will use the harness's DLO.
207  func (h *actorTestHarness) newActor(id string,
208  	beh ActorBehavior[*testMsg, string],
209  	mailboxSize int) *Actor[*testMsg, string] {
210  
211  	h.t.Helper()
212  
213  	cfg := ActorConfig[*testMsg, string]{
214  		ID:          id,
215  		Behavior:    beh,
216  		DLO:         h.dlo.Ref(),
217  		MailboxSize: mailboxSize,
218  	}
219  	actor, err := NewActor(cfg)
220  	require.NoError(h.t, err)
221  	actor.Start()
222  
223  	h.t.Cleanup(actor.Stop)
224  
225  	return actor
226  }
227  
228  // assertDLOMessage checks that the DLO eventually receives a specific message.
229  func (h *actorTestHarness) assertDLOMessage(expectedMsg Message) {
230  	h.t.Helper()
231  	require.Eventually(h.t, func() bool {
232  		msgs := h.dloBeh.GetReceivedMsgs()
233  		for _, m := range msgs {
234  			if reflect.DeepEqual(m, expectedMsg) {
235  				return true
236  			}
237  		}
238  		return false
239  	}, time.Second, 10*time.Millisecond,
240  		"dLO did not receive expected message: %v", expectedMsg,
241  	)
242  }
243  
244  // assertNoDLOMessages checks that the DLO has not received any messages.
245  func (h *actorTestHarness) assertNoDLOMessages() {
246  	h.t.Helper()
247  
248  	// Allow a very brief moment for any async DLO sends to occur.
249  	time.Sleep(20 * time.Millisecond)
250  
251  	msgs := h.dloBeh.GetReceivedMsgs()
252  
253  	require.Empty(h.t, msgs, "dLO received unexpected messages")
254  }
255  
256  // TestActorNewActorIDAndRefs verifies that NewActor correctly initializes an
257  // actor's ID and provides functional ActorRef and TellOnlyRef instances.
258  func TestActorNewActorIDAndRefs(t *testing.T) {
259  	t.Parallel()
260  
261  	h := newActorTestHarness(t)
262  	actorID := "test-actor-1"
263  	beh := newEchoBehavior(t, 0)
264  	actor := h.newActor(actorID, beh, 1)
265  
266  	require.Equal(t, actorID, actor.Ref().ID(), "actorRef ID mismatch")
267  	require.Equal(
268  		t, actorID, actor.TellRef().ID(), "tellOnlyRef ID mismatch",
269  	)
270  	require.NotNil(t, actor.Ref(), "actorRef should not be nil")
271  	require.NotNil(t, actor.TellRef(), "tellOnlyRef should not be nil")
272  }
273  
274  // TestActorStartStop verifies the basic lifecycle of an actor: starting,
275  // processing messages, and stopping.
276  func TestActorStartStop(t *testing.T) {
277  	t.Parallel()
278  
279  	h := newActorTestHarness(t)
280  	beh := newEchoBehavior(t, 0)
281  	actor := h.newActor("test-actor-lifecycle", beh, 1)
282  
283  	// Actor should be running and process a message.
284  	msgData := "hello"
285  	replyChan := make(chan string, 1)
286  	actor.Ref().Tell(
287  		context.Background(), newTestMsgWithReply(msgData, replyChan),
288  	)
289  
290  	received, err := fn.RecvOrTimeout(replyChan, 100*time.Millisecond)
291  	require.NoError(t, err, "timed out waiting for actor to process message")
292  	require.Equal(
293  		t, msgData, received, "actor did not process message before stop",
294  	)
295  
296  	actor.Stop()
297  	time.Sleep(50 * time.Millisecond)
298  
299  	// Try sending another message; it should ideally not be processed or go
300  	// to DLO.
301  	msgDataAfterStop := "message-after-stop"
302  	replyChanAfterStop := make(chan string, 1)
303  	actor.Ref().Tell(
304  		context.Background(),
305  		newTestMsgWithReply(msgDataAfterStop, replyChanAfterStop),
306  	)
307  
308  	// We expect a timeout here, meaning the message was not processed by
309  	// the echoBehavior's replyChan.
310  	_, err = fn.RecvOrTimeout(replyChanAfterStop, 100*time.Millisecond)
311  	// err == nil would mean a message was received, meaning the actor
312  	// processed it after Stop().
313  	require.Error(t, err, "actor processed message after Stop()")
314  	require.ErrorContains(t, err, "timeout hit")
315  
316  	h.assertDLOMessage(
317  		&testMsg{data: msgDataAfterStop, replyChan: replyChanAfterStop},
318  	)
319  }
320  
321  // TestActorTellBasic verifies that a message sent via Tell is processed by the
322  // actor's behavior.
323  func TestActorTellBasic(t *testing.T) {
324  	t.Parallel()
325  
326  	h := newActorTestHarness(t)
327  	beh := newEchoBehavior(t, 0)
328  	actor := h.newActor("test-actor-tell", beh, 1)
329  
330  	msgData := "tell-message"
331  	replyChan := make(chan string, 1)
332  	actor.Ref().Tell(
333  		context.Background(), newTestMsgWithReply(msgData, replyChan),
334  	)
335  
336  	receivedTell, errTell := fn.RecvOrTimeout(replyChan, 100*time.Millisecond)
337  	require.NoError(t, errTell, "timed out waiting for Tell message processing")
338  	require.Equal(
339  		t, msgData, receivedTell, "behavior did not receive Tell message data",
340  	)
341  
342  	lastData, ok := beh.GetLastMsgData()
343  	require.True(t, ok, "last message data not set in behavior")
344  	require.Equal(t, msgData, lastData, "last message data mismatch")
345  	h.assertNoDLOMessages()
346  }
347  
348  // TestActorAskSuccess verifies that a message sent via Ask is processed, and
349  // the returned Future is completed with the behavior's successful result.
350  func TestActorAskSuccess(t *testing.T) {
351  	t.Parallel()
352  
353  	h := newActorTestHarness(t)
354  	beh := newEchoBehavior(t, 0)
355  	actor := h.newActor("test-actor-ask-success", beh, 1)
356  
357  	msgData := "ask-message"
358  	future := actor.Ref().Ask(context.Background(), newTestMsg(msgData))
359  
360  	result := future.Await(context.Background())
361  	require.False(t, result.IsErr(), "ask returned an error: %v", result.Err())
362  
363  	result.WhenOk(func(val string) {
364  		expectedReply := fmt.Sprintf("echo: %s", msgData)
365  		require.Equal(t, expectedReply, val, "ask response mismatch")
366  	})
367  
368  	lastData, ok := beh.GetLastMsgData()
369  	require.True(t, ok, "last message data not set in behavior")
370  	require.Equal(t, msgData, lastData, "last message data mismatch")
371  	h.assertNoDLOMessages()
372  }
373  
374  // TestActorAskErrorBehavior verifies that if an actor's behavior returns an
375  // error, the Future from an Ask call is completed with that error.
376  func TestActorAskErrorBehavior(t *testing.T) {
377  	t.Parallel()
378  
379  	h := newActorTestHarness(t)
380  	expectedErr := errors.New("behavior error")
381  	beh := newErrorBehavior(expectedErr)
382  	actor := h.newActor("test-actor-ask-error", beh, 1)
383  
384  	future := actor.Ref().Ask(
385  		context.Background(), newTestMsg("ask-error-test"),
386  	)
387  
388  	result := future.Await(context.Background())
389  	require.True(t, result.IsErr(), "ask should have returned an error")
390  	require.ErrorIs(t, result.Err(), expectedErr, "ask error mismatch")
391  
392  	h.assertNoDLOMessages()
393  }
394  
395  // TestFunctionBehaviorFromSimple verifies that FunctionBehaviorFromSimple
396  // correctly adapts a simple (msg) -> (result, error) function into an
397  // ActorBehavior, handling both success and error cases.
398  func TestFunctionBehaviorFromSimple(t *testing.T) {
399  	t.Parallel()
400  
401  	t.Run("success", func(t *testing.T) {
402  		t.Parallel()
403  
404  		h := newActorTestHarness(t)
405  
406  		beh := FunctionBehaviorFromSimple(
407  			func(msg *testMsg) (string, error) {
408  				return "simple: " + msg.data, nil
409  			},
410  		)
411  		actor := h.newActor("test-simple-success", beh, 1)
412  
413  		future := actor.Ref().Ask(
414  			context.Background(), newTestMsg("hello"),
415  		)
416  		result := future.Await(context.Background())
417  		require.False(
418  			t, result.IsErr(),
419  			"expected success, got: %v", result.Err(),
420  		)
421  		result.WhenOk(func(val string) {
422  			require.Equal(t, "simple: hello", val)
423  		})
424  	})
425  
426  	t.Run("error", func(t *testing.T) {
427  		t.Parallel()
428  
429  		h := newActorTestHarness(t)
430  
431  		expectedErr := errors.New("simple behavior error")
432  		beh := FunctionBehaviorFromSimple(
433  			func(msg *testMsg) (string, error) {
434  				return "", expectedErr
435  			},
436  		)
437  		actor := h.newActor("test-simple-error", beh, 1)
438  
439  		future := actor.Ref().Ask(
440  			context.Background(), newTestMsg("hello"),
441  		)
442  		result := future.Await(context.Background())
443  		require.True(t, result.IsErr())
444  		require.ErrorIs(t, result.Err(), expectedErr)
445  	})
446  }