/ src / queueable.ts
queueable.ts
 1  import { StreamSearch, type Token } from "./search.js";
 2  
 3  const EOQ = Symbol("End of Queue");
 4  
 5  export class QueueableStreamSearch {
 6  	private _search: StreamSearch;
 7  	private _chunksQueue: Array<Uint8Array | typeof EOQ> = [];
 8  	private _notify?: () => void;
 9  	private _closed = false;
10  
11  	public constructor(needle: Uint8Array | string) {
12  		this._search = new StreamSearch(needle);
13  	}
14  
15  	public push(...chunks: Uint8Array[]): void {
16  		if (this._closed) {
17  			throw new Error("cannot call push after close");
18  		}
19  
20  		this._chunksQueue.push(...chunks);
21  		if (this._notify) {
22  			this._notify();
23  		}
24  	}
25  
26  	public close(): void {
27  		if (this._closed) {
28  			throw new Error("close was already called");
29  		}
30  
31  		this._closed = true;
32  		this._chunksQueue.push(EOQ);
33  		if (this._notify) {
34  			this._notify();
35  		}
36  	}
37  
38  	public async *[Symbol.asyncIterator](): AsyncIterableIterator<Token> {
39  		while (true) {
40  			let chunk: Uint8Array | typeof EOQ | undefined;
41  			while (!(chunk = this._chunksQueue.shift())) {
42  				await new Promise<void>((resolve) => (this._notify = resolve));
43  				this._notify = undefined;
44  			}
45  
46  			if (chunk === EOQ) {
47  				break;
48  			}
49  
50  			yield* this._search.feed(chunk);
51  		}
52  
53  		const tail = this._search.end();
54  		if (tail.length) {
55  			yield tail;
56  		}
57  	}
58  }