udp-tracker.js
1 import arrayRemove from 'unordered-array-remove' 2 import Debug from 'debug' 3 import dgram from 'dgram' 4 import Socks from 'socks' 5 import { concat, hex2arr, randomBytes } from 'uint8-util' 6 7 import common from '../common.js' 8 import Tracker from './tracker.js' 9 import compact2string from 'compact2string' 10 11 const debug = Debug('bittorrent-tracker:udp-tracker') 12 13 // this was done some many years ago to fix "prevent Socks instances concurrency", and used some bloated package, no clue if it's needed, but this is simpler, #356 14 const clone = obj => JSON.parse(JSON.stringify(obj)) 15 16 /** 17 * UDP torrent tracker client (for an individual tracker) 18 * 19 * @param {Client} client parent bittorrent tracker client 20 * @param {string} announceUrl announce url of tracker 21 * @param {Object} opts options object 22 */ 23 class UDPTracker extends Tracker { 24 constructor (client, announceUrl) { 25 super(client, announceUrl) 26 debug('new udp tracker %s', announceUrl) 27 28 this.cleanupFns = [] 29 this.maybeDestroyCleanup = null 30 } 31 32 announce (opts) { 33 if (this.destroyed) return 34 this._request(opts) 35 } 36 37 scrape (opts) { 38 if (this.destroyed) return 39 opts._scrape = true 40 this._request(opts) // udp scrape uses same announce url 41 } 42 43 destroy (cb) { 44 const self = this 45 if (this.destroyed) return cb(null) 46 this.destroyed = true 47 clearInterval(this.interval) 48 49 let timeout 50 51 // If there are no pending requests, destroy immediately. 52 if (this.cleanupFns.length === 0) return destroyCleanup() 53 54 // Otherwise, wait a short time for pending requests to complete, then force 55 // destroy them. 56 timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) 57 58 // But, if all pending requests complete before the timeout fires, do cleanup 59 // right away. 60 this.maybeDestroyCleanup = () => { 61 if (this.cleanupFns.length === 0) destroyCleanup() 62 } 63 64 function destroyCleanup () { 65 if (timeout) { 66 clearTimeout(timeout) 67 timeout = null 68 } 69 self.maybeDestroyCleanup = null 70 self.cleanupFns.slice(0).forEach(cleanup => { 71 cleanup() 72 }) 73 self.cleanupFns = [] 74 cb(null) 75 } 76 } 77 78 _request (opts) { 79 const self = this 80 if (!opts) opts = {} 81 82 let { hostname, port } = common.parseUrl(this.announceUrl) 83 if (port === '') port = 80 84 85 let timeout 86 // Socket used to connect to the socks server to create a relay, null if socks is disabled 87 let proxySocket 88 // Socket used to connect to the tracker or to the socks relay if socks is enabled 89 let socket 90 // Contains the host/port of the socks relay 91 let relay 92 93 let transactionId = genTransactionId() 94 95 const proxyOpts = this.client._proxyOpts && clone(this.client._proxyOpts.socksProxy) 96 if (proxyOpts) { 97 if (!proxyOpts.proxy) proxyOpts.proxy = {} 98 // UDP requests uses the associate command 99 proxyOpts.proxy.command = 'associate' 100 if (!proxyOpts.target) { 101 // This should contain client IP and port but can be set to 0 if we don't have this information 102 proxyOpts.target = { 103 host: '0.0.0.0', 104 port: 0 105 } 106 } 107 108 if (proxyOpts.proxy.type === 5) { 109 Socks.createConnection(proxyOpts, onGotConnection) 110 } else { 111 debug('Ignoring Socks proxy for UDP request because type 5 is required') 112 onGotConnection(null) 113 } 114 } else { 115 onGotConnection(null) 116 } 117 118 this.cleanupFns.push(cleanup) 119 120 function onGotConnection (err, s, info) { 121 if (err) return onError(err) 122 123 proxySocket = s 124 socket = dgram.createSocket('udp4') 125 relay = info 126 127 timeout = setTimeout(() => { 128 // does not matter if `stopped` event arrives, so supress errors 129 if (opts.event === 'stopped') cleanup() 130 else onError(new Error(`tracker request timed out (${opts.event})`)) 131 timeout = null 132 }, common.REQUEST_TIMEOUT) 133 if (timeout.unref) timeout.unref() 134 135 send(concat([ 136 common.CONNECTION_ID, 137 common.toUInt32(common.ACTIONS.CONNECT), 138 transactionId 139 ]), relay) 140 141 socket.once('error', onError) 142 socket.on('message', onSocketMessage) 143 } 144 145 function cleanup () { 146 if (timeout) { 147 clearTimeout(timeout) 148 timeout = null 149 } 150 if (socket) { 151 arrayRemove(self.cleanupFns, self.cleanupFns.indexOf(cleanup)) 152 socket.removeListener('error', onError) 153 socket.removeListener('message', onSocketMessage) 154 socket.on('error', noop) // ignore all future errors 155 try { socket.close() } catch (err) {} 156 socket = null 157 if (proxySocket) { 158 try { proxySocket.close() } catch (err) {} 159 proxySocket = null 160 } 161 } 162 if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() 163 } 164 165 function onError (err) { 166 cleanup() 167 if (self.destroyed) return 168 169 try { 170 // Error.message is readonly on some platforms. 171 if (err.message) err.message += ` (${self.announceUrl})` 172 } catch (ignoredErr) {} 173 // errors will often happen if a tracker is offline, so don't treat it as fatal 174 self.client.emit('warning', err) 175 } 176 177 function onSocketMessage (msg) { 178 if (proxySocket) msg = msg.slice(10) 179 const view = new DataView(transactionId.buffer) 180 if (msg.length < 8 || msg.readUInt32BE(4) !== view.getUint32(0)) { 181 return onError(new Error('tracker sent invalid transaction id')) 182 } 183 184 const action = msg.readUInt32BE(0) 185 debug('UDP response %s, action %s', self.announceUrl, action) 186 switch (action) { 187 case 0: { // handshake 188 // Note: no check for `self.destroyed` so that pending messages to the 189 // tracker can still be sent/received even after destroy() is called 190 191 if (msg.length < 16) return onError(new Error('invalid udp handshake')) 192 193 if (opts._scrape) scrape(msg.slice(8, 16)) 194 else announce(msg.slice(8, 16), opts) 195 196 break 197 } 198 case 1: { // announce 199 cleanup() 200 if (self.destroyed) return 201 202 if (msg.length < 20) return onError(new Error('invalid announce message')) 203 204 const interval = msg.readUInt32BE(8) 205 if (interval) self.setInterval(interval * 1000) 206 207 self.client.emit('update', { 208 announce: self.announceUrl, 209 complete: msg.readUInt32BE(16), 210 incomplete: msg.readUInt32BE(12) 211 }) 212 213 let addrs 214 try { 215 addrs = compact2string.multi(msg.slice(20)) 216 } catch (err) { 217 return self.client.emit('warning', err) 218 } 219 addrs.forEach(addr => { 220 self.client.emit('peer', addr) 221 }) 222 223 break 224 } 225 case 2: { // scrape 226 cleanup() 227 if (self.destroyed) return 228 229 if (msg.length < 20 || (msg.length - 8) % 12 !== 0) { 230 return onError(new Error('invalid scrape message')) 231 } 232 const infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) 233 ? opts.infoHash.map(infoHash => infoHash.toString('hex')) 234 : [(opts.infoHash && opts.infoHash.toString('hex')) || self.client.infoHash] 235 236 for (let i = 0, len = (msg.length - 8) / 12; i < len; i += 1) { 237 self.client.emit('scrape', { 238 announce: self.announceUrl, 239 infoHash: infoHashes[i], 240 complete: msg.readUInt32BE(8 + (i * 12)), 241 downloaded: msg.readUInt32BE(12 + (i * 12)), 242 incomplete: msg.readUInt32BE(16 + (i * 12)) 243 }) 244 } 245 246 break 247 } 248 case 3: { // error 249 cleanup() 250 if (self.destroyed) return 251 252 if (msg.length < 8) return onError(new Error('invalid error message')) 253 self.client.emit('warning', new Error(msg.slice(8).toString())) 254 255 break 256 } 257 default: 258 onError(new Error('tracker sent invalid action')) 259 break 260 } 261 } 262 263 function send (message, proxyInfo) { 264 if (proxyInfo) { 265 const pack = Socks.createUDPFrame({ host: hostname, port }, message) 266 socket.send(pack, 0, pack.length, proxyInfo.port, proxyInfo.host) 267 } else { 268 socket.send(message, 0, message.length, port, hostname) 269 } 270 } 271 272 function announce (connectionId, opts) { 273 transactionId = genTransactionId() 274 275 send(concat([ 276 connectionId, 277 common.toUInt32(common.ACTIONS.ANNOUNCE), 278 transactionId, 279 self.client._infoHashBuffer, 280 self.client._peerIdBuffer, 281 toUInt64(opts.downloaded), 282 opts.left != null ? toUInt64(opts.left) : hex2arr('ffffffffffffffff'), 283 toUInt64(opts.uploaded), 284 common.toUInt32(common.EVENTS[opts.event] || 0), 285 common.toUInt32(0), // ip address (optional) 286 common.toUInt32(0), // key (optional) 287 common.toUInt32(opts.numwant), 288 toUInt16(self.client._port) 289 ]), relay) 290 } 291 292 function scrape (connectionId) { 293 transactionId = genTransactionId() 294 295 const infoHash = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) 296 ? concat(opts.infoHash) 297 : (opts.infoHash || self.client._infoHashBuffer) 298 299 send(concat([ 300 connectionId, 301 common.toUInt32(common.ACTIONS.SCRAPE), 302 transactionId, 303 infoHash 304 ]), relay) 305 } 306 } 307 } 308 309 UDPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes 310 311 function genTransactionId () { 312 return randomBytes(4) 313 } 314 315 function toUInt16 (n) { 316 const buf = new Uint8Array(2) 317 const view = new DataView(buf.buffer) 318 view.setUint16(0, n) 319 return buf 320 } 321 322 const MAX_UINT = 4294967295 323 324 function toUInt64 (n) { 325 if (n > MAX_UINT || typeof n === 'string') { 326 const buf = new Uint8Array(8) 327 const view = new DataView(buf.buffer) 328 view.setBigUint64(0, BigInt(n)) 329 return buf 330 } 331 return concat([new Uint8Array(4), common.toUInt32(n)]) 332 } 333 334 function noop () {} 335 336 export default UDPTracker