stream.ts
import { getWebSocketUrl, type NetworkType } from '../../lib/config'
import { getLatestBlockHeight } from '../api/discovery'

/** A block as delivered to stream subscribers. */
export interface StreamingBlock {
  height: number;
  transactions: number;
  timestamp: number;
  size: number;
  hash?: string;
}

/** Rolling statistics derived from the blocks received so far. */
export interface StreamingStats {
  /** Height of the most recently accepted block, or 0 when none has been received. */
  latestBlock: number;
  /**
   * NOTE: despite the name, this carries the rounded *mean* number of
   * transactions per block across `recentBlocks`, not a running total.
   */
  totalTransactions: number;
  /** Rounded mean of the collected block-time samples, or 0 when there are none. */
  avgBlockTime: number;
  isConnected: boolean;
  /** Most recent blocks, newest first. */
  recentBlocks: StreamingBlock[];
}

/** The fields of an incoming stream message that this module reads. */
interface RawBlockMessage {
  header?: { fields?: { height?: number; timestamp?: number } };
  num_transactions?: number;
  size?: number;
  hash?: string;
}

/** WebSocket close code for a normal, intentional closure. */
const NORMAL_CLOSURE = 1000;
/** Maximum number of blocks retained in the rolling window. */
const MAX_RECENT_BLOCKS = 1000;
/** Maximum number of block-time samples retained. */
const MAX_BLOCK_TIME_SAMPLES = 500;
/**
 * Block-time sampling only starts once this many blocks are buffered.
 * NOTE(review): only the two newest blocks are actually used; the threshold
 * looks like a warm-up period - confirm before lowering it.
 */
const MIN_BLOCKS_FOR_TIMING = 5;
/**
 * Accepted range for a single block-time sample, in the unit of the block
 * timestamp (presumably seconds - TODO confirm against the API).
 */
const MIN_BLOCK_TIME = 1;
const MAX_BLOCK_TIME = 300;
/** Upper bound for the exponential reconnect back-off, in milliseconds. */
const MAX_RECONNECT_DELAY_MS = 10000;
/** Pause before the retry budget is reset once it has been exhausted, in milliseconds. */
const RECONNECT_COOLDOWN_MS = 30000;

/**
 * Streams blocks over a WebSocket, keeps a rolling window of recent blocks
 * and block times, and notifies subscribers through the `onBlock`, `onStats`
 * and `onError` callbacks. Unexpected closes are retried with exponential
 * back-off.
 */
export class EspressoBlockStream {
  private ws: WebSocket | null = null;
  private currentNetwork: NetworkType = 'mainnet';
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectDelay = 1000;
  private onBlockCallback: ((block: StreamingBlock) => void) | null = null;
  private onStatsCallback: ((stats: StreamingStats) => void) | null = null;
  private onErrorCallback: ((error: Error) => void) | null = null;
  private startHeight: number | null = null;
  private recentBlocks: StreamingBlock[] = [];
  private blockTimes: number[] = [];
  /** Handle of the pending reconnect timer, so `disconnect()` can cancel it. */
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  /** Bumped on every connect/disconnect so a superseded `connect()` can detect it is stale. */
  private connectGeneration = 0;

  constructor() {}

  /**
   * Opens the block stream, starting at the latest known block height.
   *
   * @param network - Network to connect to; when omitted the current network is kept.
   * @returns A promise that resolves once the socket has been created (not
   *   necessarily opened). It rejects if the latest height cannot be fetched
   *   or the socket cannot be constructed.
   */
  async connect(network?: NetworkType): Promise<void> {
    if (network) {
      this.currentNetwork = network;
    }

    if (typeof window === 'undefined' || typeof WebSocket === 'undefined') {
      this.reportError(new Error('WebSocket not supported'));
      return;
    }

    // This explicit attempt replaces any scheduled retry and any in-flight connect().
    const generation = ++this.connectGeneration;
    this.clearReconnectTimer();

    try {
      // NOTE(review): the height is requested without passing the network;
      // confirm getLatestBlockHeight() resolves it for `currentNetwork`.
      const latestHeight = await getLatestBlockHeight();

      // disconnect()/switchNetwork()/connect() ran while we were waiting.
      if (generation !== this.connectGeneration) return;

      this.startHeight = latestHeight;
      const wsUrl = getWebSocketUrl(this.currentNetwork, `/availability/stream/blocks/${latestHeight}`);

      // Never leave a previous socket running behind the new one.
      this.closeSocket();

      const socket = new WebSocket(wsUrl);
      this.ws = socket;
      this.attachHandlers(socket);
    } catch (error) {
      // A superseded attempt has nobody left to report to.
      if (generation !== this.connectGeneration) return;
      throw error;
    }
  }

  /** Wires the socket events; every handler ignores events from a socket that is no longer current. */
  private attachHandlers(socket: WebSocket): void {
    socket.onopen = () => {
      if (this.ws !== socket) return;
      this.reconnectAttempts = 0;
    };

    socket.onmessage = (event) => {
      if (this.ws !== socket) return;
      this.handleMessage(event.data);
    };

    socket.onerror = () => {
      if (this.ws !== socket) return;
      this.reportError(new Error(`WebSocket connection failed for ${this.currentNetwork}`));
    };

    socket.onclose = (event) => {
      if (this.ws !== socket) return;
      this.ws = null;
      if (event.code === NORMAL_CLOSURE) return;
      this.retryOrGiveUp();
    };
  }

  /** Parses one stream message, updates the rolling window and notifies subscribers. */
  private handleMessage(raw: unknown): void {
    let blockData: RawBlockMessage | null;
    try {
      blockData = JSON.parse(String(raw));
    } catch (error) {
      // One malformed frame must not take the stream down; log it and move on.
      console.warn('Ignoring malformed block stream message', error);
      return;
    }

    const streamingBlock: StreamingBlock = {
      height: blockData?.header?.fields?.height || 0,
      transactions: blockData?.num_transactions || 0,
      timestamp: blockData?.header?.fields?.timestamp || 0,
      size: blockData?.size || 0,
      hash: blockData?.hash,
    };

    // Messages without a usable height are ignored.
    if (!(streamingBlock.height > 0)) return;

    // Drop duplicates and out-of-order blocks.
    const newestHeight = this.recentBlocks[0]?.height ?? 0;
    if (streamingBlock.height <= newestHeight) return;

    this.recentBlocks = [streamingBlock, ...this.recentBlocks].slice(0, MAX_RECENT_BLOCKS);
    this.recordBlockTime();

    // Snapshot the stats before any callback runs.
    const stats = this.getStats();
    this.onBlockCallback?.(streamingBlock);
    this.onStatsCallback?.(stats);
  }

  /** Adds a block-time sample when the two newest blocks are consecutive and plausibly timed. */
  private recordBlockTime(): void {
    if (this.recentBlocks.length < MIN_BLOCKS_FOR_TIMING) return;

    const [currentBlock, previousBlock] = this.recentBlocks;
    if (!currentBlock || !previousBlock) return;

    const currentTime = currentBlock.timestamp;
    const previousTime = previousBlock.timestamp;
    if (!(currentTime > previousTime && currentTime > 0 && previousTime > 0)) return;

    // A gap in heights would inflate the sample, so only adjacent blocks count.
    if (currentBlock.height - previousBlock.height !== 1) return;

    const blockTime = currentTime - previousTime;
    if (blockTime >= MIN_BLOCK_TIME && blockTime <= MAX_BLOCK_TIME) {
      this.blockTimes = [blockTime, ...this.blockTimes].slice(0, MAX_BLOCK_TIME_SAMPLES);
    }
  }

  /** Schedules another attempt while the retry budget lasts, otherwise reports a permanent failure. */
  private retryOrGiveUp(): void {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.attemptReconnect();
    } else {
      this.reportError(new Error(`WebSocket connection permanently failed for ${this.currentNetwork}`));
    }
  }

  /** Reports a failed internally-triggered `connect()` and keeps the retry chain alive. */
  private handleConnectFailure(error: unknown): void {
    this.reportError(error instanceof Error ? error : new Error(String(error)));
    this.retryOrGiveUp();
  }

  /**
   * Schedules the next reconnect using exponential back-off
   * (`reconnectDelay * 2^(attempt - 1)`, capped at MAX_RECONNECT_DELAY_MS).
   */
  private attemptReconnect(): void {
    this.clearReconnectTimer();

    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      // Budget exhausted: wait for the cool-down, then start a fresh cycle.
      // NOTE(review): no call site in this class currently reaches this branch.
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        this.reconnectAttempts = 0;
        this.attemptReconnect();
      }, RECONNECT_COOLDOWN_MS);
      return;
    }

    this.reconnectAttempts++;

    const delay = Math.min(
      this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1),
      MAX_RECONNECT_DELAY_MS,
    );

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      void this.connect().catch((error: unknown) => this.handleConnectFailure(error));
    }, delay);
  }

  /** Cancels a scheduled reconnect, if any. */
  private clearReconnectTimer(): void {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Closes the current socket, if any, without triggering reconnect logic.
   * Handlers are detached first so the resulting close event cannot be
   * mistaken for an unexpected drop.
   */
  private closeSocket(): void {
    const socket = this.ws;
    if (!socket) return;

    this.ws = null;
    socket.onopen = null;
    socket.onmessage = null;
    socket.onerror = null;
    socket.onclose = null;
    socket.close(NORMAL_CLOSURE);
  }

  /** Forwards an error to the registered error callback, if there is one. */
  private reportError(error: Error): void {
    this.onErrorCallback?.(error);
  }

  /** Registers the callback invoked for every newly accepted block (replaces any previous one). */
  onBlock(callback: (block: StreamingBlock) => void) {
    this.onBlockCallback = callback;
  }

  /** Registers the callback invoked with fresh stats after every accepted block (replaces any previous one). */
  onStats(callback: (stats: StreamingStats) => void) {
    this.onStatsCallback = callback;
  }

  /** Registers the callback invoked on connection errors (replaces any previous one). */
  onError(callback: (error: Error) => void) {
    this.onErrorCallback = callback;
  }

  /**
   * Computes the current statistics from the rolling window.
   *
   * @returns A snapshot; `recentBlocks` is the internal array, which is
   *   replaced rather than mutated when a new block arrives.
   */
  getStats(): StreamingStats {
    const blockCount = this.recentBlocks.length;
    const avgTxnsPerBlock = blockCount > 0
      ? Math.round(this.recentBlocks.reduce((sum, block) => sum + block.transactions, 0) / blockCount)
      : 0;

    const sampleCount = this.blockTimes.length;
    const avgBlockTime = sampleCount > 0
      ? Math.round(this.blockTimes.reduce((sum, time) => sum + time, 0) / sampleCount)
      : 0;

    return {
      latestBlock: this.recentBlocks[0]?.height || 0,
      totalTransactions: avgTxnsPerBlock,
      avgBlockTime,
      isConnected: this.isConnected(),
      recentBlocks: this.recentBlocks,
    };
  }

  /**
   * Closes the stream and cancels any in-flight or scheduled connection
   * attempt. No error callback fires for this deliberate close.
   */
  disconnect() {
    // Invalidate a connect() that is still awaiting the latest height.
    this.connectGeneration++;
    this.clearReconnectTimer();
    this.closeSocket();

    // A later connect() starts with a full retry budget.
    this.reconnectAttempts = 0;
  }

  /** @returns `true` when a socket exists and is in the OPEN state. */
  isConnected(): boolean {
    return this.ws !== null && this.ws.readyState === WebSocket.OPEN;
  }

  /**
   * Tears down the current stream, clears the collected data and connects to
   * `network`. Does nothing when `network` is already the current network.
   */
  switchNetwork(network: NetworkType) {
    if (this.currentNetwork === network) return;

    console.log(`Switching WebSocket from ${this.currentNetwork} to ${network}`);
    this.disconnect();

    // Clear old network's data
    this.recentBlocks = [];
    this.blockTimes = [];
    this.startHeight = null;

    this.currentNetwork = network;
    // Fire-and-forget: failures are reported through onError and retried.
    void this.connect(network).catch((error: unknown) => this.handleConnectFailure(error));
  }
}

/** Lazily created instance shared by all callers of `getBlockStream()`. */
let globalStream: EspressoBlockStream | null = null;

/** @returns The shared `EspressoBlockStream`, created on first use. */
export function getBlockStream(): EspressoBlockStream {
  if (!globalStream) {
    globalStream = new EspressoBlockStream();
  }
  return globalStream;
}