queueable.ts
import { StreamSearch, type Token } from "./search.js";

/** Sentinel placed in the chunk queue by `close()` to mark the end of input. */
const EOQ = Symbol("End of Queue");

/**
 * Queue-backed front end for a `StreamSearch`.
 *
 * Producers hand chunks over with `push()` and signal the end of input with
 * `close()`; a consumer reads the resulting tokens with `for await`.
 * Only one pending wake-up callback is stored, so the class is written for a
 * single consumer at a time.
 */
export class QueueableStreamSearch {
  private readonly _search: StreamSearch;
  private readonly _chunksQueue: Array<Uint8Array | typeof EOQ> = [];
  /** Resolver of the promise the iterator is currently awaiting, if any. */
  private _notify?: () => void;
  private _closed = false;
  /** Set once the iterator has consumed the end-of-queue sentinel. */
  private _drained = false;

  /**
   * @param needle - Passed unchanged to the `StreamSearch` constructor.
   */
  public constructor(needle: Uint8Array | string) {
    this._search = new StreamSearch(needle);
  }

  /**
   * Appends chunks to the queue and wakes the iterator if it is waiting.
   *
   * @param chunks - Chunks to enqueue, in order.
   * @throws Error if `close()` has already been called.
   */
  public push(...chunks: Uint8Array[]): void {
    if (this._closed) {
      throw new Error("cannot call push after close");
    }

    this._chunksQueue.push(...chunks);
    this._notify?.();
  }

  /**
   * Marks the end of input: enqueues the end-of-queue sentinel and wakes the
   * iterator if it is waiting.
   *
   * @throws Error if `close()` has already been called.
   */
  public close(): void {
    if (this._closed) {
      throw new Error("close was already called");
    }

    this._closed = true;
    this._chunksQueue.push(EOQ);
    this._notify?.();
  }

  /**
   * Feeds queued chunks to the underlying search in FIFO order and yields
   * everything `feed()` produces. When the end-of-queue sentinel is reached,
   * the value returned by `end()` is yielded too, provided its `length` is
   * non-zero.
   *
   * Iterating again after the sentinel has been consumed completes
   * immediately without yielding anything.
   */
  public async *[Symbol.asyncIterator](): AsyncIterableIterator<Token> {
    // After the sentinel is consumed nothing can ever be enqueued again
    // (push/close both throw once closed), so waiting here would never end.
    if (this._drained) {
      return;
    }

    while (true) {
      let chunk = this._chunksQueue.shift();

      // Queue entries are typed arrays or the sentinel symbol (both truthy),
      // so a falsy result means the queue is empty: wait for push()/close().
      while (!chunk) {
        await new Promise<void>((resolve) => {
          this._notify = resolve;
        });
        this._notify = undefined;
        chunk = this._chunksQueue.shift();
      }

      if (chunk === EOQ) {
        this._drained = true;
        break;
      }

      yield* this._search.feed(chunk);
    }

    const tail = this._search.end();
    if (tail.length) {
      yield tail;
    }
  }
}