index.js
1 /*! 2 * on-finished 3 * Copyright(c) 2013 Jonathan Ong 4 * Copyright(c) 2014 Douglas Christopher Wilson 5 * MIT Licensed 6 */ 7 8 'use strict' 9 10 /** 11 * Module exports. 12 * @public 13 */ 14 15 module.exports = onFinished 16 module.exports.isFinished = isFinished 17 18 /** 19 * Module dependencies. 20 * @private 21 */ 22 23 var asyncHooks = tryRequireAsyncHooks() 24 var first = require('ee-first') 25 26 /** 27 * Variables. 28 * @private 29 */ 30 31 /* istanbul ignore next */ 32 var defer = typeof setImmediate === 'function' 33 ? setImmediate 34 : function (fn) { process.nextTick(fn.bind.apply(fn, arguments)) } 35 36 /** 37 * Invoke callback when the response has finished, useful for 38 * cleaning up resources afterwards. 39 * 40 * @param {object} msg 41 * @param {function} listener 42 * @return {object} 43 * @public 44 */ 45 46 function onFinished (msg, listener) { 47 if (isFinished(msg) !== false) { 48 defer(listener, null, msg) 49 return msg 50 } 51 52 // attach the listener to the message 53 attachListener(msg, wrap(listener)) 54 55 return msg 56 } 57 58 /** 59 * Determine if message is already finished. 60 * 61 * @param {object} msg 62 * @return {boolean} 63 * @public 64 */ 65 66 function isFinished (msg) { 67 var socket = msg.socket 68 69 if (typeof msg.finished === 'boolean') { 70 // OutgoingMessage 71 return Boolean(msg.finished || (socket && !socket.writable)) 72 } 73 74 if (typeof msg.complete === 'boolean') { 75 // IncomingMessage 76 return Boolean(msg.upgrade || !socket || !socket.readable || (msg.complete && !msg.readable)) 77 } 78 79 // don't know 80 return undefined 81 } 82 83 /** 84 * Attach a finished listener to the message. 85 * 86 * @param {object} msg 87 * @param {function} callback 88 * @private 89 */ 90 91 function attachFinishedListener (msg, callback) { 92 var eeMsg 93 var eeSocket 94 var finished = false 95 96 function onFinish (error) { 97 eeMsg.cancel() 98 eeSocket.cancel() 99 100 finished = true 101 callback(error) 102 } 103 104 // finished on first message event 105 eeMsg = eeSocket = first([[msg, 'end', 'finish']], onFinish) 106 107 function onSocket (socket) { 108 // remove listener 109 msg.removeListener('socket', onSocket) 110 111 if (finished) return 112 if (eeMsg !== eeSocket) return 113 114 // finished on first socket event 115 eeSocket = first([[socket, 'error', 'close']], onFinish) 116 } 117 118 if (msg.socket) { 119 // socket already assigned 120 onSocket(msg.socket) 121 return 122 } 123 124 // wait for socket to be assigned 125 msg.on('socket', onSocket) 126 127 if (msg.socket === undefined) { 128 // istanbul ignore next: node.js 0.8 patch 129 patchAssignSocket(msg, onSocket) 130 } 131 } 132 133 /** 134 * Attach the listener to the message. 135 * 136 * @param {object} msg 137 * @return {function} 138 * @private 139 */ 140 141 function attachListener (msg, listener) { 142 var attached = msg.__onFinished 143 144 // create a private single listener with queue 145 if (!attached || !attached.queue) { 146 attached = msg.__onFinished = createListener(msg) 147 attachFinishedListener(msg, attached) 148 } 149 150 attached.queue.push(listener) 151 } 152 153 /** 154 * Create listener on message. 155 * 156 * @param {object} msg 157 * @return {function} 158 * @private 159 */ 160 161 function createListener (msg) { 162 function listener (err) { 163 if (msg.__onFinished === listener) msg.__onFinished = null 164 if (!listener.queue) return 165 166 var queue = listener.queue 167 listener.queue = null 168 169 for (var i = 0; i < queue.length; i++) { 170 queue[i](err, msg) 171 } 172 } 173 174 listener.queue = [] 175 176 return listener 177 } 178 179 /** 180 * Patch ServerResponse.prototype.assignSocket for node.js 0.8. 181 * 182 * @param {ServerResponse} res 183 * @param {function} callback 184 * @private 185 */ 186 187 // istanbul ignore next: node.js 0.8 patch 188 function patchAssignSocket (res, callback) { 189 var assignSocket = res.assignSocket 190 191 if (typeof assignSocket !== 'function') return 192 193 // res.on('socket', callback) is broken in 0.8 194 res.assignSocket = function _assignSocket (socket) { 195 assignSocket.call(this, socket) 196 callback(socket) 197 } 198 } 199 200 /** 201 * Try to require async_hooks 202 * @private 203 */ 204 205 function tryRequireAsyncHooks () { 206 try { 207 return require('async_hooks') 208 } catch (e) { 209 return {} 210 } 211 } 212 213 /** 214 * Wrap function with async resource, if possible. 215 * AsyncResource.bind static method backported. 216 * @private 217 */ 218 219 function wrap (fn) { 220 var res 221 222 // create anonymous resource 223 if (asyncHooks.AsyncResource) { 224 res = new asyncHooks.AsyncResource(fn.name || 'bound-anonymous-fn') 225 } 226 227 // incompatible node.js 228 if (!res || !res.runInAsyncScope) { 229 return fn 230 } 231 232 // return bound function 233 return res.runInAsyncScope.bind(res, fn, null) 234 }