/ utils / generators.ts
generators.ts
 1  const NO_VALUE = Symbol('NO_VALUE')
 2  
 3  export async function lastX<A>(as: AsyncGenerator<A>): Promise<A> {
 4    let lastValue: A | typeof NO_VALUE = NO_VALUE
 5    for await (const a of as) {
 6      lastValue = a
 7    }
 8    if (lastValue === NO_VALUE) {
 9      throw new Error('No items in generator')
10    }
11    return lastValue
12  }
13  
/**
 * Runs an async generator to completion, discarding everything it yields,
 * and resolves with the value it finally returns.
 *
 * @param as - The generator to drive via repeated `next()` calls.
 * @returns The generator's return value (the `value` of its `done` result).
 */
export async function returnValue<A>(
  as: AsyncGenerator<unknown, A>,
): Promise<A> {
  for (;;) {
    const step = await as.next()
    if (step.done) {
      // Only the terminal result carries the return value.
      return step.value
    }
  }
}
23  
// Bookkeeping record for one in-flight `next()` call: the settled iterator
// result, the generator it came from, and the promise itself so it can be
// removed from the in-flight set once it wins the race.
type QueuedGenerator<A> = {
  result: IteratorResult<A, void>
  generator: AsyncGenerator<A, void>
  promise: Promise<QueuedGenerator<A>>
}

/**
 * Runs all generators concurrently, up to a concurrency cap, yielding values
 * as they come in.
 *
 * At most `concurrencyCap` generators are active at once; when one finishes,
 * the next waiting generator (in input order) is started. Every yielded value
 * is forwarded, including `undefined`. The relative order of values coming
 * from different generators is not specified.
 *
 * A rejection from any generator's `next()` propagates out of this generator.
 * This function never calls `return()` on its inputs, so generators still in
 * flight are not closed if the consumer stops early or an error propagates.
 *
 * @param generators - The generators to merge. The array itself is not mutated.
 * @param concurrencyCap - Maximum number of generators driven at the same
 *   time. Defaults to `Infinity`. With a cap of 0 or less nothing is started
 *   and nothing is yielded.
 * @returns An async generator over the merged values.
 */
export async function* all<A>(
  generators: AsyncGenerator<A, void>[],
  concurrencyCap = Infinity,
): AsyncGenerator<A, void> {
  // Request the next result from `generator`, tagged with its origin so the
  // winner of the race can be identified and removed from the set.
  const next = (
    generator: AsyncGenerator<A, void>,
  ): Promise<QueuedGenerator<A>> => {
    const promise: Promise<QueuedGenerator<A>> = generator
      .next()
      .then(result => ({ result, generator, promise }))
    return promise
  }

  const waiting = [...generators]
  const promises = new Set<Promise<QueuedGenerator<A>>>()

  // Move one generator from the waiting queue into the in-flight set, if any.
  const startNextWaiting = (): void => {
    const generator = waiting.shift()
    if (generator !== undefined) {
      promises.add(next(generator))
    }
  }

  // Start initial batch up to concurrency cap
  while (promises.size < concurrencyCap && waiting.length > 0) {
    startNextWaiting()
  }

  while (promises.size > 0) {
    const { result, generator, promise } = await Promise.race(promises)
    promises.delete(promise)

    if (result.done) {
      // A slot freed up: start a new generator when one finishes.
      startNextWaiting()
      continue
    }

    // Ask the generator for its next value before handing the current one to
    // the consumer, so it keeps working while the consumer processes this one.
    promises.add(next(generator))
    // Narrowing on `done` (rather than comparing against `undefined`) keeps
    // legitimately yielded `undefined` values from being dropped.
    yield result.value
  }
}
73  
/**
 * Consumes an async generator to completion and gathers everything it yields.
 *
 * @param generator - The generator to drain.
 * @returns An array of the yielded values, in the order they were produced.
 */
export async function toArray<A>(
  generator: AsyncGenerator<A, void>,
): Promise<A[]> {
  const collected: A[] = []
  const iterator = generator[Symbol.asyncIterator]()
  let step = await iterator.next()
  while (!step.done) {
    collected.push(step.value)
    step = await iterator.next()
  }
  return collected
}
83  
/**
 * Wraps an array in an async generator that yields its elements one at a
 * time, in index order.
 *
 * @param values - The elements to emit.
 * @returns An async generator over `values`.
 */
export async function* fromArray<T>(values: T[]): AsyncGenerator<T, void> {
  for (const item of values) {
    yield item
  }
}