/ lib / client / websocket-tracker.js
websocket-tracker.js
  1  import Debug from 'debug'
  2  import Peer from '@thaunknown/simple-peer/lite.js'
  3  import Socket from '@thaunknown/simple-websocket'
  4  import { arr2text, arr2hex, hex2bin, bin2hex, randomBytes } from 'uint8-util'
  5  
  6  import common from '../common.js'
  7  import Tracker from './tracker.js'
  8  
  9  const debug = Debug('bittorrent-tracker:websocket-tracker')
 10  
 11  // Use a socket pool, so tracker clients share WebSocket objects for the same server.
 12  // In practice, WebSockets are pretty slow to establish, so this gives a nice performance
 13  // boost, and saves browser resources.
 14  const socketPool = {}
 15  
 16  const RECONNECT_MINIMUM = 10 * 1000
 17  const RECONNECT_MAXIMUM = 60 * 60 * 1000
 18  const RECONNECT_VARIANCE = 5 * 60 * 1000
 19  const OFFER_TIMEOUT = 50 * 1000
 20  
 21  class WebSocketTracker extends Tracker {
 22    constructor (client, announceUrl) {
 23      super(client, announceUrl)
 24      debug('new websocket tracker %s', announceUrl)
 25  
 26      this.peers = {} // peers (offer id -> peer)
 27      this.socket = null
 28  
 29      this.reconnecting = false
 30      this.retries = 0
 31      this.reconnectTimer = null
 32  
 33      // Simple boolean flag to track whether the socket has received data from
 34      // the websocket server since the last time socket.send() was called.
 35      this.expectingResponse = false
 36  
 37      this._openSocket()
 38    }
 39  
 40    announce (opts) {
 41      if (this.destroyed || this.reconnecting) return
 42      if (!this.socket.connected) {
 43        this.socket.once('connect', () => {
 44          this.announce(opts)
 45        })
 46        return
 47      }
 48  
 49      const params = Object.assign({}, opts, {
 50        action: 'announce',
 51        info_hash: this.client._infoHashBinary,
 52        peer_id: this.client._peerIdBinary
 53      })
 54      if (this._trackerId) params.trackerid = this._trackerId
 55  
 56      if (opts.event === 'stopped' || opts.event === 'completed') {
 57        // Don't include offers with 'stopped' or 'completed' event
 58        this._send(params)
 59      } else {
 60        // Limit the number of offers that are generated, since it can be slow
 61        const numwant = Math.min(opts.numwant, 5)
 62  
 63        this._generateOffers(numwant, offers => {
 64          params.numwant = numwant
 65          params.offers = offers
 66          this._send(params)
 67        })
 68      }
 69    }
 70  
 71    scrape (opts) {
 72      if (this.destroyed || this.reconnecting) return
 73      if (!this.socket.connected) {
 74        this.socket.once('connect', () => {
 75          this.scrape(opts)
 76        })
 77        return
 78      }
 79  
 80      const infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0)
 81        ? opts.infoHash.map(infoHash => hex2bin(infoHash))
 82        : (opts.infoHash && hex2bin(opts.infoHash)) || this.client._infoHashBinary
 83      const params = {
 84        action: 'scrape',
 85        info_hash: infoHashes
 86      }
 87  
 88      this._send(params)
 89    }
 90  
 91    destroy (cb = noop) {
 92      if (this.destroyed) return cb(null)
 93  
 94      this.destroyed = true
 95  
 96      clearInterval(this.interval)
 97      clearTimeout(this.reconnectTimer)
 98  
 99      // Destroy peers
100      for (const peerId in this.peers) {
101        const peer = this.peers[peerId]
102        clearTimeout(peer.trackerTimeout)
103        peer.destroy()
104      }
105      this.peers = null
106  
107      if (this.socket) {
108        this.socket.removeListener('connect', this._onSocketConnectBound)
109        this.socket.removeListener('data', this._onSocketDataBound)
110        this.socket.removeListener('close', this._onSocketCloseBound)
111        this.socket.removeListener('error', this._onSocketErrorBound)
112        this.socket = null
113      }
114  
115      this._onSocketConnectBound = null
116      this._onSocketErrorBound = null
117      this._onSocketDataBound = null
118      this._onSocketCloseBound = null
119  
120      if (socketPool[this.announceUrl]) {
121        socketPool[this.announceUrl].consumers -= 1
122      }
123  
124      // Other instances are using the socket, so there's nothing left to do here
125      if (socketPool[this.announceUrl].consumers > 0) return cb()
126  
127      let socket = socketPool[this.announceUrl]
128      delete socketPool[this.announceUrl]
129      socket.on('error', noop) // ignore all future errors
130      socket.once('close', cb)
131  
132      let timeout
133  
134      // If there is no data response expected, destroy immediately.
135      if (!this.expectingResponse) return destroyCleanup()
136  
137      // Otherwise, wait a short time for potential responses to come in from the
138      // server, then force close the socket.
139      timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
140  
141      // But, if a response comes from the server before the timeout fires, do cleanup
142      // right away.
143      socket.once('data', destroyCleanup)
144  
145      function destroyCleanup () {
146        if (timeout) {
147          clearTimeout(timeout)
148          timeout = null
149        }
150        socket.removeListener('data', destroyCleanup)
151        socket.destroy()
152        socket = null
153      }
154    }
155  
156    _openSocket () {
157      this.destroyed = false
158  
159      if (!this.peers) this.peers = {}
160  
161      this._onSocketConnectBound = () => {
162        this._onSocketConnect()
163      }
164      this._onSocketErrorBound = err => {
165        this._onSocketError(err)
166      }
167      this._onSocketDataBound = data => {
168        this._onSocketData(data)
169      }
170      this._onSocketCloseBound = () => {
171        this._onSocketClose()
172      }
173  
174      this.socket = socketPool[this.announceUrl]
175      if (this.socket) {
176        socketPool[this.announceUrl].consumers += 1
177        if (this.socket.connected) {
178          this._onSocketConnectBound()
179        }
180      } else {
181        const parsedUrl = new URL(this.announceUrl)
182        let agent
183        if (this.client._proxyOpts) {
184          agent = parsedUrl.protocol === 'wss:' ? this.client._proxyOpts.httpsAgent : this.client._proxyOpts.httpAgent
185          if (!agent && this.client._proxyOpts.socksProxy) {
186            agent = this.client._proxyOpts.socksProxy
187          }
188        }
189        this.socket = socketPool[this.announceUrl] = new Socket({ url: this.announceUrl, agent })
190        this.socket.consumers = 1
191        this.socket.once('connect', this._onSocketConnectBound)
192      }
193  
194      this.socket.on('data', this._onSocketDataBound)
195      this.socket.once('close', this._onSocketCloseBound)
196      this.socket.once('error', this._onSocketErrorBound)
197    }
198  
199    _onSocketConnect () {
200      if (this.destroyed) return
201  
202      if (this.reconnecting) {
203        this.reconnecting = false
204        this.retries = 0
205        this.announce(this.client._defaultAnnounceOpts())
206      }
207    }
208  
209    _onSocketData (data) {
210      if (this.destroyed) return
211  
212      this.expectingResponse = false
213  
214      try {
215        data = JSON.parse(arr2text(data))
216      } catch (err) {
217        this.client.emit('warning', new Error('Invalid tracker response'))
218        return
219      }
220  
221      if (data.action === 'announce') {
222        this._onAnnounceResponse(data)
223      } else if (data.action === 'scrape') {
224        this._onScrapeResponse(data)
225      } else {
226        this._onSocketError(new Error(`invalid action in WS response: ${data.action}`))
227      }
228    }
229  
230    _onAnnounceResponse (data) {
231      if (data.info_hash !== this.client._infoHashBinary) {
232        debug(
233          'ignoring websocket data from %s for %s (looking for %s: reused socket)',
234          this.announceUrl, bin2hex(data.info_hash), this.client.infoHash
235        )
236        return
237      }
238  
239      if (data.peer_id && data.peer_id === this.client._peerIdBinary) {
240        // ignore offers/answers from this client
241        return
242      }
243  
244      debug(
245        'received %s from %s for %s',
246        JSON.stringify(data), this.announceUrl, this.client.infoHash
247      )
248  
249      const failure = data['failure reason']
250      if (failure) return this.client.emit('warning', new Error(failure))
251  
252      const warning = data['warning message']
253      if (warning) this.client.emit('warning', new Error(warning))
254  
255      const interval = data.interval || data['min interval']
256      if (interval) this.setInterval(interval * 1000)
257  
258      const trackerId = data['tracker id']
259      if (trackerId) {
260        // If absent, do not discard previous trackerId value
261        this._trackerId = trackerId
262      }
263  
264      if (data.complete != null) {
265        const response = Object.assign({}, data, {
266          announce: this.announceUrl,
267          infoHash: bin2hex(data.info_hash)
268        })
269        this.client.emit('update', response)
270      }
271  
272      let peer
273      if (data.offer && data.peer_id) {
274        debug('creating peer (from remote offer)')
275        peer = this._createPeer()
276        peer.id = bin2hex(data.peer_id)
277        peer.once('signal', answer => {
278          const params = {
279            action: 'announce',
280            info_hash: this.client._infoHashBinary,
281            peer_id: this.client._peerIdBinary,
282            to_peer_id: data.peer_id,
283            answer,
284            offer_id: data.offer_id
285          }
286          if (this._trackerId) params.trackerid = this._trackerId
287          this._send(params)
288        })
289        this.client.emit('peer', peer)
290        peer.signal(data.offer)
291      }
292  
293      if (data.answer && data.peer_id) {
294        const offerId = bin2hex(data.offer_id)
295        peer = this.peers[offerId]
296        if (peer) {
297          peer.id = bin2hex(data.peer_id)
298          this.client.emit('peer', peer)
299          peer.signal(data.answer)
300  
301          clearTimeout(peer.trackerTimeout)
302          peer.trackerTimeout = null
303          delete this.peers[offerId]
304        } else {
305          debug(`got unexpected answer: ${JSON.stringify(data.answer)}`)
306        }
307      }
308    }
309  
310    _onScrapeResponse (data) {
311      data = data.files || {}
312  
313      const keys = Object.keys(data)
314      if (keys.length === 0) {
315        this.client.emit('warning', new Error('invalid scrape response'))
316        return
317      }
318  
319      keys.forEach(infoHash => {
320        // TODO: optionally handle data.flags.min_request_interval
321        // (separate from announce interval)
322        const response = Object.assign(data[infoHash], {
323          announce: this.announceUrl,
324          infoHash: bin2hex(infoHash)
325        })
326        this.client.emit('scrape', response)
327      })
328    }
329  
330    _onSocketClose () {
331      if (this.destroyed) return
332      this.destroy()
333      this._startReconnectTimer()
334    }
335  
336    _onSocketError (err) {
337      if (this.destroyed) return
338      this.destroy()
339      // errors will often happen if a tracker is offline, so don't treat it as fatal
340      this.client.emit('warning', err)
341      this._startReconnectTimer()
342    }
343  
344    _startReconnectTimer () {
345      const ms = Math.floor(Math.random() * RECONNECT_VARIANCE) + Math.min(Math.pow(2, this.retries) * RECONNECT_MINIMUM, RECONNECT_MAXIMUM)
346  
347      this.reconnecting = true
348      clearTimeout(this.reconnectTimer)
349      this.reconnectTimer = setTimeout(() => {
350        this.retries++
351        this._openSocket()
352      }, ms)
353      if (this.reconnectTimer.unref) this.reconnectTimer.unref()
354  
355      debug('reconnecting socket in %s ms', ms)
356    }
357  
358    _send (params) {
359      if (this.destroyed) return
360      this.expectingResponse = true
361      const message = JSON.stringify(params)
362      debug('send %s', message)
363      this.socket.send(message)
364    }
365  
366    _generateOffers (numwant, cb) {
367      const self = this
368      const offers = []
369      debug('generating %s offers', numwant)
370  
371      for (let i = 0; i < numwant; ++i) {
372        generateOffer()
373      }
374      checkDone()
375  
376      function generateOffer () {
377        const offerId = arr2hex(randomBytes(20))
378        debug('creating peer (from _generateOffers)')
379        const peer = self.peers[offerId] = self._createPeer({ initiator: true })
380        peer.once('signal', offer => {
381          offers.push({
382            offer,
383            offer_id: hex2bin(offerId)
384          })
385          checkDone()
386        })
387        peer.trackerTimeout = setTimeout(() => {
388          debug('tracker timeout: destroying peer')
389          peer.trackerTimeout = null
390          delete self.peers[offerId]
391          peer.destroy()
392        }, OFFER_TIMEOUT)
393        if (peer.trackerTimeout.unref) peer.trackerTimeout.unref()
394      }
395  
396      function checkDone () {
397        if (offers.length === numwant) {
398          debug('generated %s offers', numwant)
399          cb(offers)
400        }
401      }
402    }
403  
404    _createPeer (opts) {
405      const self = this
406  
407      opts = Object.assign({
408        trickle: false,
409        config: self.client._rtcConfig,
410        wrtc: self.client._wrtc
411      }, opts)
412  
413      const peer = new Peer(opts)
414  
415      peer.once('error', onError)
416      peer.once('connect', onConnect)
417  
418      return peer
419  
420      // Handle peer 'error' events that are fired *before* the peer is emitted in
421      // a 'peer' event.
422      function onError (err) {
423        self.client.emit('warning', new Error(`Connection error: ${err.message}`))
424        peer.destroy()
425      }
426  
427      // Once the peer is emitted in a 'peer' event, then it's the consumer's
428      // responsibility to listen for errors, so the listeners are removed here.
429      function onConnect () {
430        peer.removeListener('error', onError)
431        peer.removeListener('connect', onConnect)
432      }
433    }
434  }
435  
436  WebSocketTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 1000 // 30 seconds
437  // Normally this shouldn't be accessed but is occasionally useful
438  WebSocketTracker._socketPool = socketPool
439  
440  function noop () {}
441  
442  export default WebSocketTracker