/ lib / client / udp-tracker.js
udp-tracker.js
  1  import arrayRemove from 'unordered-array-remove'
  2  import Debug from 'debug'
  3  import dgram from 'dgram'
  4  import Socks from 'socks'
  5  import { concat, hex2arr, randomBytes } from 'uint8-util'
  6  
  7  import common from '../common.js'
  8  import Tracker from './tracker.js'
  9  import compact2string from 'compact2string'
 10  
 11  const debug = Debug('bittorrent-tracker:udp-tracker')
 12  
 13  // this was done some many years ago to fix "prevent Socks instances concurrency", and used some bloated package, no clue if it's needed, but this is simpler, #356
 14  const clone = obj => JSON.parse(JSON.stringify(obj))
 15  
 16  /**
 17   * UDP torrent tracker client (for an individual tracker)
 18   *
 19   * @param {Client} client       parent bittorrent tracker client
 20   * @param {string} announceUrl  announce url of tracker
 21   * @param {Object} opts         options object
 22   */
 23  class UDPTracker extends Tracker {
 24    constructor (client, announceUrl) {
 25      super(client, announceUrl)
 26      debug('new udp tracker %s', announceUrl)
 27  
 28      this.cleanupFns = []
 29      this.maybeDestroyCleanup = null
 30    }
 31  
 32    announce (opts) {
 33      if (this.destroyed) return
 34      this._request(opts)
 35    }
 36  
 37    scrape (opts) {
 38      if (this.destroyed) return
 39      opts._scrape = true
 40      this._request(opts) // udp scrape uses same announce url
 41    }
 42  
 43    destroy (cb) {
 44      const self = this
 45      if (this.destroyed) return cb(null)
 46      this.destroyed = true
 47      clearInterval(this.interval)
 48  
 49      let timeout
 50  
 51      // If there are no pending requests, destroy immediately.
 52      if (this.cleanupFns.length === 0) return destroyCleanup()
 53  
 54      // Otherwise, wait a short time for pending requests to complete, then force
 55      // destroy them.
 56      timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
 57  
 58      // But, if all pending requests complete before the timeout fires, do cleanup
 59      // right away.
 60      this.maybeDestroyCleanup = () => {
 61        if (this.cleanupFns.length === 0) destroyCleanup()
 62      }
 63  
 64      function destroyCleanup () {
 65        if (timeout) {
 66          clearTimeout(timeout)
 67          timeout = null
 68        }
 69        self.maybeDestroyCleanup = null
 70        self.cleanupFns.slice(0).forEach(cleanup => {
 71          cleanup()
 72        })
 73        self.cleanupFns = []
 74        cb(null)
 75      }
 76    }
 77  
 78    _request (opts) {
 79      const self = this
 80      if (!opts) opts = {}
 81  
 82      let { hostname, port } = common.parseUrl(this.announceUrl)
 83      if (port === '') port = 80
 84  
 85      let timeout
 86      // Socket used to connect to the socks server to create a relay, null if socks is disabled
 87      let proxySocket
 88      // Socket used to connect to the tracker or to the socks relay if socks is enabled
 89      let socket
 90      // Contains the host/port of the socks relay
 91      let relay
 92  
 93      let transactionId = genTransactionId()
 94  
 95      const proxyOpts = this.client._proxyOpts && clone(this.client._proxyOpts.socksProxy)
 96      if (proxyOpts) {
 97        if (!proxyOpts.proxy) proxyOpts.proxy = {}
 98        // UDP requests uses the associate command
 99        proxyOpts.proxy.command = 'associate'
100        if (!proxyOpts.target) {
101          // This should contain client IP and port but can be set to 0 if we don't have this information
102          proxyOpts.target = {
103            host: '0.0.0.0',
104            port: 0
105          }
106        }
107  
108        if (proxyOpts.proxy.type === 5) {
109          Socks.createConnection(proxyOpts, onGotConnection)
110        } else {
111          debug('Ignoring Socks proxy for UDP request because type 5 is required')
112          onGotConnection(null)
113        }
114      } else {
115        onGotConnection(null)
116      }
117  
118      this.cleanupFns.push(cleanup)
119  
120      function onGotConnection (err, s, info) {
121        if (err) return onError(err)
122  
123        proxySocket = s
124        socket = dgram.createSocket('udp4')
125        relay = info
126  
127        timeout = setTimeout(() => {
128          // does not matter if `stopped` event arrives, so supress errors
129          if (opts.event === 'stopped') cleanup()
130          else onError(new Error(`tracker request timed out (${opts.event})`))
131          timeout = null
132        }, common.REQUEST_TIMEOUT)
133        if (timeout.unref) timeout.unref()
134  
135        send(concat([
136          common.CONNECTION_ID,
137          common.toUInt32(common.ACTIONS.CONNECT),
138          transactionId
139        ]), relay)
140  
141        socket.once('error', onError)
142        socket.on('message', onSocketMessage)
143      }
144  
145      function cleanup () {
146        if (timeout) {
147          clearTimeout(timeout)
148          timeout = null
149        }
150        if (socket) {
151          arrayRemove(self.cleanupFns, self.cleanupFns.indexOf(cleanup))
152          socket.removeListener('error', onError)
153          socket.removeListener('message', onSocketMessage)
154          socket.on('error', noop) // ignore all future errors
155          try { socket.close() } catch (err) {}
156          socket = null
157          if (proxySocket) {
158            try { proxySocket.close() } catch (err) {}
159            proxySocket = null
160          }
161        }
162        if (self.maybeDestroyCleanup) self.maybeDestroyCleanup()
163      }
164  
165      function onError (err) {
166        cleanup()
167        if (self.destroyed) return
168  
169        try {
170          // Error.message is readonly on some platforms.
171          if (err.message) err.message += ` (${self.announceUrl})`
172        } catch (ignoredErr) {}
173        // errors will often happen if a tracker is offline, so don't treat it as fatal
174        self.client.emit('warning', err)
175      }
176  
177      function onSocketMessage (msg) {
178        if (proxySocket) msg = msg.slice(10)
179        const view = new DataView(transactionId.buffer)
180        if (msg.length < 8 || msg.readUInt32BE(4) !== view.getUint32(0)) {
181          return onError(new Error('tracker sent invalid transaction id'))
182        }
183  
184        const action = msg.readUInt32BE(0)
185        debug('UDP response %s, action %s', self.announceUrl, action)
186        switch (action) {
187          case 0: { // handshake
188            // Note: no check for `self.destroyed` so that pending messages to the
189            // tracker can still be sent/received even after destroy() is called
190  
191            if (msg.length < 16) return onError(new Error('invalid udp handshake'))
192  
193            if (opts._scrape) scrape(msg.slice(8, 16))
194            else announce(msg.slice(8, 16), opts)
195  
196            break
197          }
198          case 1: { // announce
199            cleanup()
200            if (self.destroyed) return
201  
202            if (msg.length < 20) return onError(new Error('invalid announce message'))
203  
204            const interval = msg.readUInt32BE(8)
205            if (interval) self.setInterval(interval * 1000)
206  
207            self.client.emit('update', {
208              announce: self.announceUrl,
209              complete: msg.readUInt32BE(16),
210              incomplete: msg.readUInt32BE(12)
211            })
212  
213            let addrs
214            try {
215              addrs = compact2string.multi(msg.slice(20))
216            } catch (err) {
217              return self.client.emit('warning', err)
218            }
219            addrs.forEach(addr => {
220              self.client.emit('peer', addr)
221            })
222  
223            break
224          }
225          case 2: { // scrape
226            cleanup()
227            if (self.destroyed) return
228  
229            if (msg.length < 20 || (msg.length - 8) % 12 !== 0) {
230              return onError(new Error('invalid scrape message'))
231            }
232            const infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0)
233              ? opts.infoHash.map(infoHash => infoHash.toString('hex'))
234              : [(opts.infoHash && opts.infoHash.toString('hex')) || self.client.infoHash]
235  
236            for (let i = 0, len = (msg.length - 8) / 12; i < len; i += 1) {
237              self.client.emit('scrape', {
238                announce: self.announceUrl,
239                infoHash: infoHashes[i],
240                complete: msg.readUInt32BE(8 + (i * 12)),
241                downloaded: msg.readUInt32BE(12 + (i * 12)),
242                incomplete: msg.readUInt32BE(16 + (i * 12))
243              })
244            }
245  
246            break
247          }
248          case 3: { // error
249            cleanup()
250            if (self.destroyed) return
251  
252            if (msg.length < 8) return onError(new Error('invalid error message'))
253            self.client.emit('warning', new Error(msg.slice(8).toString()))
254  
255            break
256          }
257          default:
258            onError(new Error('tracker sent invalid action'))
259            break
260        }
261      }
262  
263      function send (message, proxyInfo) {
264        if (proxyInfo) {
265          const pack = Socks.createUDPFrame({ host: hostname, port }, message)
266          socket.send(pack, 0, pack.length, proxyInfo.port, proxyInfo.host)
267        } else {
268          socket.send(message, 0, message.length, port, hostname)
269        }
270      }
271  
272      function announce (connectionId, opts) {
273        transactionId = genTransactionId()
274  
275        send(concat([
276          connectionId,
277          common.toUInt32(common.ACTIONS.ANNOUNCE),
278          transactionId,
279          self.client._infoHashBuffer,
280          self.client._peerIdBuffer,
281          toUInt64(opts.downloaded),
282          opts.left != null ? toUInt64(opts.left) : hex2arr('ffffffffffffffff'),
283          toUInt64(opts.uploaded),
284          common.toUInt32(common.EVENTS[opts.event] || 0),
285          common.toUInt32(0), // ip address (optional)
286          common.toUInt32(0), // key (optional)
287          common.toUInt32(opts.numwant),
288          toUInt16(self.client._port)
289        ]), relay)
290      }
291  
292      function scrape (connectionId) {
293        transactionId = genTransactionId()
294  
295        const infoHash = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0)
296          ? concat(opts.infoHash)
297          : (opts.infoHash || self.client._infoHashBuffer)
298  
299        send(concat([
300          connectionId,
301          common.toUInt32(common.ACTIONS.SCRAPE),
302          transactionId,
303          infoHash
304        ]), relay)
305      }
306    }
307  }
308  
309  UDPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes
310  
/**
 * Create a new random transaction id for a tracker request.
 *
 * @returns {*} whatever `randomBytes` yields for a length of 4 bytes
 */
function genTransactionId () {
  const byteLength = 4
  return randomBytes(byteLength)
}
314  
/**
 * Encode a number as an unsigned 16 bit big-endian integer.
 *
 * @param {number} n  value to encode
 * @returns {Uint8Array} 2 bytes, most significant byte first
 */
function toUInt16 (n) {
  const bytes = new Uint8Array(2)
  // DataView writes big-endian unless told otherwise
  new DataView(bytes.buffer).setUint16(0, n)
  return bytes
}
321  
// Largest value that fits into an unsigned 32 bit integer (2^32 - 1)
const MAX_UINT = 4294967295

/**
 * Encode a value as an unsigned 64 bit big-endian integer.
 *
 * Strings and numbers above MAX_UINT go through BigInt; everything else is
 * written as four zero bytes followed by the 32 bit encoding of `n`.
 *
 * @param {number|string} n  value to encode
 * @returns {Uint8Array} 8 bytes, most significant byte first
 */
function toUInt64 (n) {
  const needsBigInt = typeof n === 'string' || n > MAX_UINT
  if (!needsBigInt) {
    const highWord = new Uint8Array(4)
    return concat([highWord, common.toUInt32(n)])
  }

  const bytes = new Uint8Array(8)
  new DataView(bytes.buffer).setBigUint64(0, BigInt(n))
  return bytes
}
333  
/**
 * Do nothing. Used as an event listener that swallows whatever it receives.
 */
function noop () {
  // intentionally empty
}
335  
336  export default UDPTracker