// bridge/flushGate.ts
 1  /**
 2   * State machine for gating message writes during an initial flush.
 3   *
 4   * When a bridge session starts, historical messages are flushed to the
 5   * server via a single HTTP POST. During that flush, new messages must
 6   * be queued to prevent them from arriving at the server interleaved
 7   * with the historical messages.
 8   *
 9   * Lifecycle:
10   *   start() → enqueue() returns true, items are queued
11   *   end()   → returns queued items for draining, enqueue() returns false
12   *   drop()  → discards queued items (permanent transport close)
13   *   deactivate() → clears active flag without dropping items
14   *                   (transport replacement — new transport will drain)
15   */
16  export class FlushGate<T> {
17    private _active = false
18    private _pending: T[] = []
19  
20    get active(): boolean {
21      return this._active
22    }
23  
24    get pendingCount(): number {
25      return this._pending.length
26    }
27  
28    /** Mark flush as in-progress. enqueue() will start queuing items. */
29    start(): void {
30      this._active = true
31    }
32  
33    /**
34     * End the flush and return any queued items for draining.
35     * Caller is responsible for sending the returned items.
36     */
37    end(): T[] {
38      this._active = false
39      return this._pending.splice(0)
40    }
41  
42    /**
43     * If flush is active, queue the items and return true.
44     * If flush is not active, return false (caller should send directly).
45     */
46    enqueue(...items: T[]): boolean {
47      if (!this._active) return false
48      this._pending.push(...items)
49      return true
50    }
51  
52    /**
53     * Discard all queued items (permanent transport close).
54     * Returns the number of items dropped.
55     */
56    drop(): number {
57      this._active = false
58      const count = this._pending.length
59      this._pending.length = 0
60      return count
61    }
62  
63    /**
64     * Clear the active flag without dropping queued items.
65     * Used when the transport is replaced (onWorkReceived) — the new
66     * transport's flush will drain the pending items.
67     */
68    deactivate(): void {
69      this._active = false
70    }
71  }