/ utils / stream.ts
stream.ts
 1  export class Stream<T> implements AsyncIterator<T> {
 2    private readonly queue: T[] = []
 3    private readResolve?: (value: IteratorResult<T>) => void
 4    private readReject?: (error: unknown) => void
 5    private isDone: boolean = false
 6    private hasError: unknown | undefined
 7    private started = false
 8  
 9    constructor(private readonly returned?: () => void) {}
10  
11    [Symbol.asyncIterator](): AsyncIterableIterator<T> {
12      if (this.started) {
13        throw new Error('Stream can only be iterated once')
14      }
15      this.started = true
16      return this
17    }
18  
19    next(): Promise<IteratorResult<T, unknown>> {
20      if (this.queue.length > 0) {
21        return Promise.resolve({
22          done: false,
23          value: this.queue.shift()!,
24        })
25      }
26      if (this.isDone) {
27        return Promise.resolve({ done: true, value: undefined })
28      }
29      if (this.hasError) {
30        return Promise.reject(this.hasError)
31      }
32      return new Promise<IteratorResult<T>>((resolve, reject) => {
33        this.readResolve = resolve
34        this.readReject = reject
35      })
36    }
37  
38    enqueue(value: T): void {
39      if (this.readResolve) {
40        const resolve = this.readResolve
41        this.readResolve = undefined
42        this.readReject = undefined
43        resolve({ done: false, value })
44      } else {
45        this.queue.push(value)
46      }
47    }
48  
49    done() {
50      this.isDone = true
51      if (this.readResolve) {
52        const resolve = this.readResolve
53        this.readResolve = undefined
54        this.readReject = undefined
55        resolve({ done: true, value: undefined })
56      }
57    }
58  
59    error(error: unknown) {
60      this.hasError = error
61      if (this.readReject) {
62        const reject = this.readReject
63        this.readResolve = undefined
64        this.readReject = undefined
65        reject(error)
66      }
67    }
68  
69    return(): Promise<IteratorResult<T, unknown>> {
70      this.isDone = true
71      if (this.returned) {
72        this.returned()
73      }
74      return Promise.resolve({ done: true, value: undefined })
75    }
76  }