queueable.test.ts
1 import t, { type Test } from "tap"; 2 import * as u8 from "uint8arrays"; 3 4 import { QueueableStreamSearch } from "./queueable.js"; 5 import { MATCH } from "./search.js"; 6 7 t.test("queueable error handling", async (t: Test): Promise<void> => { 8 t.test( 9 "should throw error when push is called after close", 10 async (t: Test): Promise<void> => { 11 const search = new QueueableStreamSearch("test"); 12 search.close(); 13 14 t.throws(() => search.push(u8.fromString("data")), { 15 message: "cannot call push after close", 16 }); 17 18 t.end(); 19 }, 20 ); 21 22 t.test( 23 "should throw error when close is called twice", 24 async (t: Test): Promise<void> => { 25 const search = new QueueableStreamSearch("test"); 26 search.close(); 27 28 t.throws(() => search.close(), { message: "close was already called" }); 29 30 t.end(); 31 }, 32 ); 33 34 t.end(); 35 }); 36 37 t.test("queueable async behavior", async (t: Test): Promise<void> => { 38 t.test( 39 "should handle push notification while iterating", 40 async (t: Test): Promise<void> => { 41 const search = new QueueableStreamSearch("||"); 42 const results: string[] = []; 43 44 const iterPromise = (async () => { 45 for await (const token of search) { 46 if (token !== MATCH) { 47 results.push(u8.toString(token)); 48 } 49 } 50 })(); 51 52 await new Promise((resolve) => setTimeout(resolve, 10)); 53 search.push(u8.fromString("hello")); 54 55 await new Promise((resolve) => setTimeout(resolve, 10)); 56 search.push(u8.fromString("||")); 57 58 await new Promise((resolve) => setTimeout(resolve, 10)); 59 search.push(u8.fromString("world")); 60 61 await new Promise((resolve) => setTimeout(resolve, 10)); 62 search.close(); 63 64 await iterPromise; 65 66 t.same(results, ["hello", "world"]); 67 68 t.end(); 69 }, 70 ); 71 72 t.test( 73 "should handle close notification while waiting", 74 async (t: Test): Promise<void> => { 75 const search = new QueueableStreamSearch("test"); 76 const results: string[] = []; 77 78 const iterPromise = (async () => { 79 for await (const token of search) { 80 if (token !== MATCH) { 81 results.push(u8.toString(token)); 82 } 83 } 84 })(); 85 86 await new Promise((resolve) => setTimeout(resolve, 10)); 87 search.close(); 88 89 await iterPromise; 90 91 t.same(results, []); 92 93 t.end(); 94 }, 95 ); 96 97 t.test( 98 "should yield tail when it has length", 99 async (t: Test): Promise<void> => { 100 const search = new QueueableStreamSearch("test"); 101 const results: string[] = []; 102 103 const iterPromise = (async () => { 104 for await (const token of search) { 105 if (token !== MATCH) { 106 results.push(u8.toString(token)); 107 } 108 } 109 })(); 110 111 await new Promise((resolve) => setTimeout(resolve, 10)); 112 search.push(u8.fromString("hello te")); 113 114 await new Promise((resolve) => setTimeout(resolve, 10)); 115 search.close(); 116 117 await iterPromise; 118 119 t.same(results, ["hello ", "te"]); 120 121 t.end(); 122 }, 123 ); 124 125 t.end(); 126 });