/ src / queueable.test.ts
queueable.test.ts
  1  import t, { type Test } from "tap";
  2  import * as u8 from "uint8arrays";
  3  
  4  import { QueueableStreamSearch } from "./queueable.js";
  5  import { MATCH } from "./search.js";
  6  
  7  t.test("queueable error handling", async (t: Test): Promise<void> => {
  8  	t.test(
  9  		"should throw error when push is called after close",
 10  		async (t: Test): Promise<void> => {
 11  			const search = new QueueableStreamSearch("test");
 12  			search.close();
 13  
 14  			t.throws(() => search.push(u8.fromString("data")), {
 15  				message: "cannot call push after close",
 16  			});
 17  
 18  			t.end();
 19  		},
 20  	);
 21  
 22  	t.test(
 23  		"should throw error when close is called twice",
 24  		async (t: Test): Promise<void> => {
 25  			const search = new QueueableStreamSearch("test");
 26  			search.close();
 27  
 28  			t.throws(() => search.close(), { message: "close was already called" });
 29  
 30  			t.end();
 31  		},
 32  	);
 33  
 34  	t.end();
 35  });
 36  
 37  t.test("queueable async behavior", async (t: Test): Promise<void> => {
 38  	t.test(
 39  		"should handle push notification while iterating",
 40  		async (t: Test): Promise<void> => {
 41  			const search = new QueueableStreamSearch("||");
 42  			const results: string[] = [];
 43  
 44  			const iterPromise = (async () => {
 45  				for await (const token of search) {
 46  					if (token !== MATCH) {
 47  						results.push(u8.toString(token));
 48  					}
 49  				}
 50  			})();
 51  
 52  			await new Promise((resolve) => setTimeout(resolve, 10));
 53  			search.push(u8.fromString("hello"));
 54  
 55  			await new Promise((resolve) => setTimeout(resolve, 10));
 56  			search.push(u8.fromString("||"));
 57  
 58  			await new Promise((resolve) => setTimeout(resolve, 10));
 59  			search.push(u8.fromString("world"));
 60  
 61  			await new Promise((resolve) => setTimeout(resolve, 10));
 62  			search.close();
 63  
 64  			await iterPromise;
 65  
 66  			t.same(results, ["hello", "world"]);
 67  
 68  			t.end();
 69  		},
 70  	);
 71  
 72  	t.test(
 73  		"should handle close notification while waiting",
 74  		async (t: Test): Promise<void> => {
 75  			const search = new QueueableStreamSearch("test");
 76  			const results: string[] = [];
 77  
 78  			const iterPromise = (async () => {
 79  				for await (const token of search) {
 80  					if (token !== MATCH) {
 81  						results.push(u8.toString(token));
 82  					}
 83  				}
 84  			})();
 85  
 86  			await new Promise((resolve) => setTimeout(resolve, 10));
 87  			search.close();
 88  
 89  			await iterPromise;
 90  
 91  			t.same(results, []);
 92  
 93  			t.end();
 94  		},
 95  	);
 96  
 97  	t.test(
 98  		"should yield tail when it has length",
 99  		async (t: Test): Promise<void> => {
100  			const search = new QueueableStreamSearch("test");
101  			const results: string[] = [];
102  
103  			const iterPromise = (async () => {
104  				for await (const token of search) {
105  					if (token !== MATCH) {
106  						results.push(u8.toString(token));
107  					}
108  				}
109  			})();
110  
111  			await new Promise((resolve) => setTimeout(resolve, 10));
112  			search.push(u8.fromString("hello te"));
113  
114  			await new Promise((resolve) => setTimeout(resolve, 10));
115  			search.close();
116  
117  			await iterPromise;
118  
119  			t.same(results, ["hello ", "te"]);
120  
121  			t.end();
122  		},
123  	);
124  
125  	t.end();
126  });