/ queue / queue_test.go
queue_test.go
 1  package queue_test
 2  
 3  import (
 4  	"testing"
 5  
 6  	"github.com/lightningnetwork/lnd/queue"
 7  )
 8  
 9  func testQueueAddDrain(t *testing.T, size, numStart, numStop, numAdd, numDrain int) {
10  	t.Helper()
11  
12  	queue := queue.NewConcurrentQueue(size)
13  	for i := 0; i < numStart; i++ {
14  		queue.Start()
15  	}
16  	for i := 0; i < numStop; i++ {
17  		t.Cleanup(queue.Stop)
18  	}
19  
20  	// Pushes should never block for long.
21  	for i := 0; i < numAdd; i++ {
22  		queue.ChanIn() <- i
23  	}
24  
25  	// Pops also should not block for long. Expect elements in FIFO order.
26  	for i := 0; i < numDrain; i++ {
27  		item := <-queue.ChanOut()
28  		if i != item.(int) {
29  			t.Fatalf("Dequeued wrong value: expected %d, got %d",
30  				i, item.(int))
31  		}
32  	}
33  }
34  
35  // TestConcurrentQueue tests that the queue properly adds 1000 items, drain all
36  // of them, and exit cleanly.
37  func TestConcurrentQueue(t *testing.T) {
38  	t.Parallel()
39  
40  	testQueueAddDrain(t, 100, 1, 1, 1000, 1000)
41  }
42  
43  // TestConcurrentQueueEarlyStop tests that the queue properly adds 1000 items,
44  // drain half of them, and still exit cleanly.
45  func TestConcurrentQueueEarlyStop(t *testing.T) {
46  	t.Parallel()
47  
48  	testQueueAddDrain(t, 100, 1, 1, 1000, 500)
49  }
50  
51  // TestConcurrentQueueIdempotentStart asserts that calling Start multiple times
52  // doesn't fail, and that the queue can still exit cleanly.
53  func TestConcurrentQueueIdempotentStart(t *testing.T) {
54  	t.Parallel()
55  
56  	testQueueAddDrain(t, 100, 10, 1, 1000, 1000)
57  }
58  
59  // TestConcurrentQueueIdempotentStop asserts that calling Stop multiple times
60  // doesn't fail, and that exiting doesn't block on subsequent Stops.
61  func TestConcurrentQueueIdempotentStop(t *testing.T) {
62  	t.Parallel()
63  
64  	testQueueAddDrain(t, 100, 1, 10, 1000, 1000)
65  }
66  
67  // TestQueueCloseIncoming tests that the queue properly handles an incoming
68  // channel that is closed.
69  func TestQueueCloseIncoming(t *testing.T) {
70  	t.Parallel()
71  
72  	queue := queue.NewConcurrentQueue(10)
73  	queue.Start()
74  
75  	queue.ChanIn() <- 1
76  	close(queue.ChanIn())
77  
78  	item := <-queue.ChanOut()
79  	if item.(int) != 1 {
80  		t.Fatalf("unexpected item")
81  	}
82  
83  	_, ok := <-queue.ChanOut()
84  	if ok {
85  		t.Fatalf("expected outgoing channel being closed")
86  	}
87  }