/ src / services / ws / stream.ts
stream.ts
  1  
  2  import { getWebSocketUrl, type NetworkType } from '../../lib/config'
  3  import { getLatestBlockHeight } from '../api/discovery'
/**
 * Condensed view of one block received from the block stream.
 *
 * `EspressoBlockStream` builds these from the raw stream message; numeric
 * fields fall back to `0` when the message does not carry them.
 */
export interface StreamingBlock {
  /** Block height, read from `header.fields.height` of the stream message. */
  height: number;
  /** Number of transactions in the block, read from `num_transactions`. */
  transactions: number;
  /**
   * Block timestamp, read from `header.fields.timestamp`.
   * NOTE(review): unit is presumably seconds — confirm against the API.
   */
  timestamp: number;
  /** Block size as reported in the message's `size` field (unit not shown here). */
  size: number;
  /** Block hash from the message's `hash` field, when present. */
  hash?: string;
}
 11  
/** Aggregate snapshot published by `EspressoBlockStream` after each accepted block. */
export interface StreamingStats {
  /** Height of the newest block seen so far, or `0` before any block arrived. */
  latestBlock: number;
  /**
   * Despite the name, `EspressoBlockStream` fills this with the rounded
   * average number of transactions per block across `recentBlocks`.
   */
  totalTransactions: number;
  /** Rounded mean of the recorded block-time samples; `0` when there are none. */
  avgBlockTime: number;
  /** Whether the underlying WebSocket was open when the snapshot was taken. */
  isConnected: boolean;
  /** Recently received blocks, newest first. */
  recentBlocks: StreamingBlock[];
}
 19  
 20  export class EspressoBlockStream {
 21    private ws: WebSocket | null = null;
 22    private currentNetwork: NetworkType = 'mainnet';
 23    private reconnectAttempts = 0;
 24    private maxReconnectAttempts = 5;
 25    private reconnectDelay = 1000;
 26    private onBlockCallback: ((block: StreamingBlock) => void) | null = null;
 27    private onStatsCallback: ((stats: StreamingStats) => void) | null = null;
 28    private onErrorCallback: ((error: Error) => void) | null = null;
 29    private startHeight: number | null = null;
 30    private recentBlocks: StreamingBlock[] = [];
 31    private blockTimes: number[] = [];
 32  
 33    constructor() {
 34  
 35    }
 36  
 37    async connect(network?: NetworkType) {
 38      if (network) {
 39        this.currentNetwork = network;
 40      }
 41      
 42      if (typeof window === 'undefined' || typeof WebSocket === 'undefined') {
 43        if (this.onErrorCallback) {
 44          this.onErrorCallback(new Error('WebSocket not supported'));
 45        }
 46        return;
 47      }
 48      
 49      const latestHeight = await getLatestBlockHeight();
 50      this.startHeight = latestHeight;
 51      
 52      const wsUrl = getWebSocketUrl(this.currentNetwork, `/availability/stream/blocks/${latestHeight}`);
 53  
 54      this.ws = new WebSocket(wsUrl);
 55  
 56      this.ws.onopen = () => {
 57  
 58        this.reconnectAttempts = 0;
 59      };
 60  
 61      this.ws.onmessage = (event) => {
 62        const blockData = JSON.parse(event.data);
 63            
 64  
 65            const streamingBlock: StreamingBlock = {
 66              height: blockData.header?.fields?.height || 0,
 67              transactions: blockData.num_transactions || 0,
 68              timestamp: blockData.header?.fields?.timestamp || 0,
 69              size: blockData.size || 0,
 70              hash: blockData.hash
 71            };
 72  
 73            if (streamingBlock.height > 0) {
 74  
 75              const latestHeight = this.recentBlocks.length > 0 ? this.recentBlocks[0].height : 0;
 76              if (streamingBlock.height <= latestHeight) {
 77  
 78                return;
 79              }
 80              
 81  
 82              this.recentBlocks = [streamingBlock, ...this.recentBlocks].slice(0, 1000);
 83              
 84  
 85              if (this.recentBlocks.length >= 5) {
 86                const currentBlock = this.recentBlocks[0];
 87                const previousBlock = this.recentBlocks[1];
 88                
 89                const currentTime = currentBlock.timestamp;
 90                const previousTime = previousBlock.timestamp;
 91                
 92  
 93                if (currentTime > previousTime && currentTime > 0 && previousTime > 0) {
 94                  const blockTime = currentTime - previousTime;
 95                  const heightDiff = currentBlock.height - previousBlock.height;
 96                  
 97                  if (heightDiff === 1) {
 98  
 99                    if (blockTime >= 1 && blockTime <= 300) {
100                      this.blockTimes = [blockTime, ...this.blockTimes].slice(0, 500);
101                    } else {
102  
103                    }
104                  } else if (heightDiff > 1) {
105  
106                  } else {
107  
108                  }
109                } else {
110  
111                }
112              }
113  
114  
115              const avgTxnsPerBlock = this.recentBlocks.length > 0 
116                ? Math.round(this.recentBlocks.reduce((sum, block) => sum + block.transactions, 0) / this.recentBlocks.length)
117                : 0;
118              
119              const avgBlockTime = this.blockTimes.length > 0 
120                ? Math.round(this.blockTimes.reduce((sum, time) => sum + time, 0) / this.blockTimes.length)
121                : 0;
122              
123              
124              const stats: StreamingStats = {
125                latestBlock: streamingBlock.height,
126                totalTransactions: avgTxnsPerBlock,
127                avgBlockTime: avgBlockTime,
128                isConnected: this.isConnected(),
129                recentBlocks: this.recentBlocks
130              };
131  
132              if (this.onBlockCallback) {
133                this.onBlockCallback(streamingBlock);
134              }
135  
136              if (this.onStatsCallback) {
137                this.onStatsCallback(stats);
138              }
139            } else {
140  
141            }
142        };
143  
144        this.ws.onerror = (error) => {
145  
146  
147          if (this.onErrorCallback) {
148            this.onErrorCallback(new Error(`WebSocket connection failed for ${this.currentNetwork}`));
149          }
150        };
151  
152        this.ws.onclose = (event) => {
153  
154  
155          if (event.code !== 1000 && this.reconnectAttempts < this.maxReconnectAttempts) {
156            this.attemptReconnect();
157          } else if (event.code !== 1000) {
158  
159            if (this.onErrorCallback) {
160              this.onErrorCallback(new Error(`WebSocket connection permanently failed for ${this.currentNetwork}`));
161            }
162          }
163        };
164  
165    }
166  
167    private attemptReconnect() {
168      if (this.reconnectAttempts >= this.maxReconnectAttempts) {
169  
170  
171        setTimeout(() => {
172          this.reconnectAttempts = 0;
173          this.attemptReconnect();
174        }, 30000);
175        return;
176      }
177  
178      this.reconnectAttempts++;
179  
180      const delay = Math.min(this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1), 10000);
181      
182  
183      
184      setTimeout(async () => {
185        await this.connect();
186      }, delay);
187    }
188  
189    onBlock(callback: (block: StreamingBlock) => void) {
190      this.onBlockCallback = callback;
191    }
192  
193    onStats(callback: (stats: StreamingStats) => void) {
194      this.onStatsCallback = callback;
195    }
196  
197    onError(callback: (error: Error) => void) {
198      this.onErrorCallback = callback;
199    }
200  
201    getStats(): StreamingStats {
202  
203      const avgTxnsPerBlock = this.recentBlocks.length > 0 
204        ? Math.round(this.recentBlocks.reduce((sum, block) => sum + block.transactions, 0) / this.recentBlocks.length)
205        : 0;
206      
207      const avgBlockTime = this.blockTimes.length > 0 
208        ? Math.round(this.blockTimes.reduce((sum, time) => sum + time, 0) / this.blockTimes.length)
209        : 0;
210      
211      return {
212        latestBlock: this.recentBlocks[0]?.height || 0,
213        totalTransactions: avgTxnsPerBlock,
214        avgBlockTime: avgBlockTime,
215        isConnected: this.isConnected(),
216        recentBlocks: this.recentBlocks
217      };
218    }
219  
220    disconnect() {
221      if (this.ws) {
222        this.ws.close();
223        this.ws = null;
224      }
225  
226      this.reconnectAttempts = this.maxReconnectAttempts;
227    }
228  
229    isConnected(): boolean {
230      return this.ws !== null && this.ws.readyState === WebSocket.OPEN;
231    }
232    
233    switchNetwork(network: NetworkType) {
234      if (this.currentNetwork === network) return;
235      
236      console.log(`Switching WebSocket from ${this.currentNetwork} to ${network}`);
237      this.disconnect();
238      
239      // Clear old network's data
240      this.recentBlocks = [];
241      this.blockTimes = [];
242      this.startHeight = null;
243      
244      this.currentNetwork = network;
245      this.connect(network);
246    }
247  }
248  
249  
/** Lazily created stream instance shared by every caller of `getBlockStream()`. */
let globalStream: EspressoBlockStream | null = null;

/**
 * Returns the shared `EspressoBlockStream`, creating it on first use.
 *
 * @returns The same instance on every call.
 */
export function getBlockStream(): EspressoBlockStream {
  globalStream = globalStream ?? new EspressoBlockStream();
  return globalStream;
}