websocket-tracker.js
1 import Debug from 'debug' 2 import Peer from '@thaunknown/simple-peer/lite.js' 3 import Socket from '@thaunknown/simple-websocket' 4 import { arr2text, arr2hex, hex2bin, bin2hex, randomBytes } from 'uint8-util' 5 6 import common from '../common.js' 7 import Tracker from './tracker.js' 8 9 const debug = Debug('bittorrent-tracker:websocket-tracker') 10 11 // Use a socket pool, so tracker clients share WebSocket objects for the same server. 12 // In practice, WebSockets are pretty slow to establish, so this gives a nice performance 13 // boost, and saves browser resources. 14 const socketPool = {} 15 16 const RECONNECT_MINIMUM = 10 * 1000 17 const RECONNECT_MAXIMUM = 60 * 60 * 1000 18 const RECONNECT_VARIANCE = 5 * 60 * 1000 19 const OFFER_TIMEOUT = 50 * 1000 20 21 class WebSocketTracker extends Tracker { 22 constructor (client, announceUrl) { 23 super(client, announceUrl) 24 debug('new websocket tracker %s', announceUrl) 25 26 this.peers = {} // peers (offer id -> peer) 27 this.socket = null 28 29 this.reconnecting = false 30 this.retries = 0 31 this.reconnectTimer = null 32 33 // Simple boolean flag to track whether the socket has received data from 34 // the websocket server since the last time socket.send() was called. 35 this.expectingResponse = false 36 37 this._openSocket() 38 } 39 40 announce (opts) { 41 if (this.destroyed || this.reconnecting) return 42 if (!this.socket.connected) { 43 this.socket.once('connect', () => { 44 this.announce(opts) 45 }) 46 return 47 } 48 49 const params = Object.assign({}, opts, { 50 action: 'announce', 51 info_hash: this.client._infoHashBinary, 52 peer_id: this.client._peerIdBinary 53 }) 54 if (this._trackerId) params.trackerid = this._trackerId 55 56 if (opts.event === 'stopped' || opts.event === 'completed') { 57 // Don't include offers with 'stopped' or 'completed' event 58 this._send(params) 59 } else { 60 // Limit the number of offers that are generated, since it can be slow 61 const numwant = Math.min(opts.numwant, 5) 62 63 this._generateOffers(numwant, offers => { 64 params.numwant = numwant 65 params.offers = offers 66 this._send(params) 67 }) 68 } 69 } 70 71 scrape (opts) { 72 if (this.destroyed || this.reconnecting) return 73 if (!this.socket.connected) { 74 this.socket.once('connect', () => { 75 this.scrape(opts) 76 }) 77 return 78 } 79 80 const infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) 81 ? opts.infoHash.map(infoHash => hex2bin(infoHash)) 82 : (opts.infoHash && hex2bin(opts.infoHash)) || this.client._infoHashBinary 83 const params = { 84 action: 'scrape', 85 info_hash: infoHashes 86 } 87 88 this._send(params) 89 } 90 91 destroy (cb = noop) { 92 if (this.destroyed) return cb(null) 93 94 this.destroyed = true 95 96 clearInterval(this.interval) 97 clearTimeout(this.reconnectTimer) 98 99 // Destroy peers 100 for (const peerId in this.peers) { 101 const peer = this.peers[peerId] 102 clearTimeout(peer.trackerTimeout) 103 peer.destroy() 104 } 105 this.peers = null 106 107 if (this.socket) { 108 this.socket.removeListener('connect', this._onSocketConnectBound) 109 this.socket.removeListener('data', this._onSocketDataBound) 110 this.socket.removeListener('close', this._onSocketCloseBound) 111 this.socket.removeListener('error', this._onSocketErrorBound) 112 this.socket = null 113 } 114 115 this._onSocketConnectBound = null 116 this._onSocketErrorBound = null 117 this._onSocketDataBound = null 118 this._onSocketCloseBound = null 119 120 if (socketPool[this.announceUrl]) { 121 socketPool[this.announceUrl].consumers -= 1 122 } 123 124 // Other instances are using the socket, so there's nothing left to do here 125 if (socketPool[this.announceUrl].consumers > 0) return cb() 126 127 let socket = socketPool[this.announceUrl] 128 delete socketPool[this.announceUrl] 129 socket.on('error', noop) // ignore all future errors 130 socket.once('close', cb) 131 132 let timeout 133 134 // If there is no data response expected, destroy immediately. 135 if (!this.expectingResponse) return destroyCleanup() 136 137 // Otherwise, wait a short time for potential responses to come in from the 138 // server, then force close the socket. 139 timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) 140 141 // But, if a response comes from the server before the timeout fires, do cleanup 142 // right away. 143 socket.once('data', destroyCleanup) 144 145 function destroyCleanup () { 146 if (timeout) { 147 clearTimeout(timeout) 148 timeout = null 149 } 150 socket.removeListener('data', destroyCleanup) 151 socket.destroy() 152 socket = null 153 } 154 } 155 156 _openSocket () { 157 this.destroyed = false 158 159 if (!this.peers) this.peers = {} 160 161 this._onSocketConnectBound = () => { 162 this._onSocketConnect() 163 } 164 this._onSocketErrorBound = err => { 165 this._onSocketError(err) 166 } 167 this._onSocketDataBound = data => { 168 this._onSocketData(data) 169 } 170 this._onSocketCloseBound = () => { 171 this._onSocketClose() 172 } 173 174 this.socket = socketPool[this.announceUrl] 175 if (this.socket) { 176 socketPool[this.announceUrl].consumers += 1 177 if (this.socket.connected) { 178 this._onSocketConnectBound() 179 } 180 } else { 181 const parsedUrl = new URL(this.announceUrl) 182 let agent 183 if (this.client._proxyOpts) { 184 agent = parsedUrl.protocol === 'wss:' ? this.client._proxyOpts.httpsAgent : this.client._proxyOpts.httpAgent 185 if (!agent && this.client._proxyOpts.socksProxy) { 186 agent = this.client._proxyOpts.socksProxy 187 } 188 } 189 this.socket = socketPool[this.announceUrl] = new Socket({ url: this.announceUrl, agent }) 190 this.socket.consumers = 1 191 this.socket.once('connect', this._onSocketConnectBound) 192 } 193 194 this.socket.on('data', this._onSocketDataBound) 195 this.socket.once('close', this._onSocketCloseBound) 196 this.socket.once('error', this._onSocketErrorBound) 197 } 198 199 _onSocketConnect () { 200 if (this.destroyed) return 201 202 if (this.reconnecting) { 203 this.reconnecting = false 204 this.retries = 0 205 this.announce(this.client._defaultAnnounceOpts()) 206 } 207 } 208 209 _onSocketData (data) { 210 if (this.destroyed) return 211 212 this.expectingResponse = false 213 214 try { 215 data = JSON.parse(arr2text(data)) 216 } catch (err) { 217 this.client.emit('warning', new Error('Invalid tracker response')) 218 return 219 } 220 221 if (data.action === 'announce') { 222 this._onAnnounceResponse(data) 223 } else if (data.action === 'scrape') { 224 this._onScrapeResponse(data) 225 } else { 226 this._onSocketError(new Error(`invalid action in WS response: ${data.action}`)) 227 } 228 } 229 230 _onAnnounceResponse (data) { 231 if (data.info_hash !== this.client._infoHashBinary) { 232 debug( 233 'ignoring websocket data from %s for %s (looking for %s: reused socket)', 234 this.announceUrl, bin2hex(data.info_hash), this.client.infoHash 235 ) 236 return 237 } 238 239 if (data.peer_id && data.peer_id === this.client._peerIdBinary) { 240 // ignore offers/answers from this client 241 return 242 } 243 244 debug( 245 'received %s from %s for %s', 246 JSON.stringify(data), this.announceUrl, this.client.infoHash 247 ) 248 249 const failure = data['failure reason'] 250 if (failure) return this.client.emit('warning', new Error(failure)) 251 252 const warning = data['warning message'] 253 if (warning) this.client.emit('warning', new Error(warning)) 254 255 const interval = data.interval || data['min interval'] 256 if (interval) this.setInterval(interval * 1000) 257 258 const trackerId = data['tracker id'] 259 if (trackerId) { 260 // If absent, do not discard previous trackerId value 261 this._trackerId = trackerId 262 } 263 264 if (data.complete != null) { 265 const response = Object.assign({}, data, { 266 announce: this.announceUrl, 267 infoHash: bin2hex(data.info_hash) 268 }) 269 this.client.emit('update', response) 270 } 271 272 let peer 273 if (data.offer && data.peer_id) { 274 debug('creating peer (from remote offer)') 275 peer = this._createPeer() 276 peer.id = bin2hex(data.peer_id) 277 peer.once('signal', answer => { 278 const params = { 279 action: 'announce', 280 info_hash: this.client._infoHashBinary, 281 peer_id: this.client._peerIdBinary, 282 to_peer_id: data.peer_id, 283 answer, 284 offer_id: data.offer_id 285 } 286 if (this._trackerId) params.trackerid = this._trackerId 287 this._send(params) 288 }) 289 this.client.emit('peer', peer) 290 peer.signal(data.offer) 291 } 292 293 if (data.answer && data.peer_id) { 294 const offerId = bin2hex(data.offer_id) 295 peer = this.peers[offerId] 296 if (peer) { 297 peer.id = bin2hex(data.peer_id) 298 this.client.emit('peer', peer) 299 peer.signal(data.answer) 300 301 clearTimeout(peer.trackerTimeout) 302 peer.trackerTimeout = null 303 delete this.peers[offerId] 304 } else { 305 debug(`got unexpected answer: ${JSON.stringify(data.answer)}`) 306 } 307 } 308 } 309 310 _onScrapeResponse (data) { 311 data = data.files || {} 312 313 const keys = Object.keys(data) 314 if (keys.length === 0) { 315 this.client.emit('warning', new Error('invalid scrape response')) 316 return 317 } 318 319 keys.forEach(infoHash => { 320 // TODO: optionally handle data.flags.min_request_interval 321 // (separate from announce interval) 322 const response = Object.assign(data[infoHash], { 323 announce: this.announceUrl, 324 infoHash: bin2hex(infoHash) 325 }) 326 this.client.emit('scrape', response) 327 }) 328 } 329 330 _onSocketClose () { 331 if (this.destroyed) return 332 this.destroy() 333 this._startReconnectTimer() 334 } 335 336 _onSocketError (err) { 337 if (this.destroyed) return 338 this.destroy() 339 // errors will often happen if a tracker is offline, so don't treat it as fatal 340 this.client.emit('warning', err) 341 this._startReconnectTimer() 342 } 343 344 _startReconnectTimer () { 345 const ms = Math.floor(Math.random() * RECONNECT_VARIANCE) + Math.min(Math.pow(2, this.retries) * RECONNECT_MINIMUM, RECONNECT_MAXIMUM) 346 347 this.reconnecting = true 348 clearTimeout(this.reconnectTimer) 349 this.reconnectTimer = setTimeout(() => { 350 this.retries++ 351 this._openSocket() 352 }, ms) 353 if (this.reconnectTimer.unref) this.reconnectTimer.unref() 354 355 debug('reconnecting socket in %s ms', ms) 356 } 357 358 _send (params) { 359 if (this.destroyed) return 360 this.expectingResponse = true 361 const message = JSON.stringify(params) 362 debug('send %s', message) 363 this.socket.send(message) 364 } 365 366 _generateOffers (numwant, cb) { 367 const self = this 368 const offers = [] 369 debug('generating %s offers', numwant) 370 371 for (let i = 0; i < numwant; ++i) { 372 generateOffer() 373 } 374 checkDone() 375 376 function generateOffer () { 377 const offerId = arr2hex(randomBytes(20)) 378 debug('creating peer (from _generateOffers)') 379 const peer = self.peers[offerId] = self._createPeer({ initiator: true }) 380 peer.once('signal', offer => { 381 offers.push({ 382 offer, 383 offer_id: hex2bin(offerId) 384 }) 385 checkDone() 386 }) 387 peer.trackerTimeout = setTimeout(() => { 388 debug('tracker timeout: destroying peer') 389 peer.trackerTimeout = null 390 delete self.peers[offerId] 391 peer.destroy() 392 }, OFFER_TIMEOUT) 393 if (peer.trackerTimeout.unref) peer.trackerTimeout.unref() 394 } 395 396 function checkDone () { 397 if (offers.length === numwant) { 398 debug('generated %s offers', numwant) 399 cb(offers) 400 } 401 } 402 } 403 404 _createPeer (opts) { 405 const self = this 406 407 opts = Object.assign({ 408 trickle: false, 409 config: self.client._rtcConfig, 410 wrtc: self.client._wrtc 411 }, opts) 412 413 const peer = new Peer(opts) 414 415 peer.once('error', onError) 416 peer.once('connect', onConnect) 417 418 return peer 419 420 // Handle peer 'error' events that are fired *before* the peer is emitted in 421 // a 'peer' event. 422 function onError (err) { 423 self.client.emit('warning', new Error(`Connection error: ${err.message}`)) 424 peer.destroy() 425 } 426 427 // Once the peer is emitted in a 'peer' event, then it's the consumer's 428 // responsibility to listen for errors, so the listeners are removed here. 429 function onConnect () { 430 peer.removeListener('error', onError) 431 peer.removeListener('connect', onConnect) 432 } 433 } 434 } 435 436 WebSocketTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 1000 // 30 seconds 437 // Normally this shouldn't be accessed but is occasionally useful 438 WebSocketTracker._socketPool = socketPool 439 440 function noop () {} 441 442 export default WebSocketTracker