index.js
  1  /*!
  2   * on-finished
  3   * Copyright(c) 2013 Jonathan Ong
  4   * Copyright(c) 2014 Douglas Christopher Wilson
  5   * MIT Licensed
  6   */
  7  
  8  'use strict'
  9  
 10  /**
 11   * Module exports.
 12   * @public
 13   */
 14  
 15  module.exports = onFinished
 16  module.exports.isFinished = isFinished
 17  
 18  /**
 19   * Module dependencies.
 20   * @private
 21   */
 22  
 23  var asyncHooks = tryRequireAsyncHooks()
 24  var first = require('ee-first')
 25  
 26  /**
 27   * Variables.
 28   * @private
 29   */
 30  
 31  /* istanbul ignore next */
 32  var defer = typeof setImmediate === 'function'
 33    ? setImmediate
 34    : function (fn) { process.nextTick(fn.bind.apply(fn, arguments)) }
 35  
 36  /**
 37   * Invoke callback when the response has finished, useful for
 38   * cleaning up resources afterwards.
 39   *
 40   * @param {object} msg
 41   * @param {function} listener
 42   * @return {object}
 43   * @public
 44   */
 45  
 46  function onFinished (msg, listener) {
 47    if (isFinished(msg) !== false) {
 48      defer(listener, null, msg)
 49      return msg
 50    }
 51  
 52    // attach the listener to the message
 53    attachListener(msg, wrap(listener))
 54  
 55    return msg
 56  }
 57  
 58  /**
 59   * Determine if message is already finished.
 60   *
 61   * @param {object} msg
 62   * @return {boolean}
 63   * @public
 64   */
 65  
 66  function isFinished (msg) {
 67    var socket = msg.socket
 68  
 69    if (typeof msg.finished === 'boolean') {
 70      // OutgoingMessage
 71      return Boolean(msg.finished || (socket && !socket.writable))
 72    }
 73  
 74    if (typeof msg.complete === 'boolean') {
 75      // IncomingMessage
 76      return Boolean(msg.upgrade || !socket || !socket.readable || (msg.complete && !msg.readable))
 77    }
 78  
 79    // don't know
 80    return undefined
 81  }
 82  
 83  /**
 84   * Attach a finished listener to the message.
 85   *
 86   * @param {object} msg
 87   * @param {function} callback
 88   * @private
 89   */
 90  
 91  function attachFinishedListener (msg, callback) {
 92    var eeMsg
 93    var eeSocket
 94    var finished = false
 95  
 96    function onFinish (error) {
 97      eeMsg.cancel()
 98      eeSocket.cancel()
 99  
100      finished = true
101      callback(error)
102    }
103  
104    // finished on first message event
105    eeMsg = eeSocket = first([[msg, 'end', 'finish']], onFinish)
106  
107    function onSocket (socket) {
108      // remove listener
109      msg.removeListener('socket', onSocket)
110  
111      if (finished) return
112      if (eeMsg !== eeSocket) return
113  
114      // finished on first socket event
115      eeSocket = first([[socket, 'error', 'close']], onFinish)
116    }
117  
118    if (msg.socket) {
119      // socket already assigned
120      onSocket(msg.socket)
121      return
122    }
123  
124    // wait for socket to be assigned
125    msg.on('socket', onSocket)
126  
127    if (msg.socket === undefined) {
128      // istanbul ignore next: node.js 0.8 patch
129      patchAssignSocket(msg, onSocket)
130    }
131  }
132  
/**
 * Queue a listener on the message, creating and wiring up the shared
 * `__onFinished` listener first when the message has none with a live
 * queue.
 *
 * @param {object} msg
 * @param {function} listener
 * @private
 */

function attachListener (msg, listener) {
  var current = msg.__onFinished
  var usable = Boolean(current && current.queue)

  // create a private single listener with queue
  if (!usable) {
    current = createListener(msg)
    msg.__onFinished = current
    attachFinishedListener(msg, current)
  }

  current.queue.push(listener)
}
152  
/**
 * Build the shared listener for a message. The returned function
 * carries a `queue` array; calling it empties that queue and invokes
 * every queued function with `(err, msg)`.
 *
 * @param {object} msg
 * @return {function}
 * @private
 */

function createListener (msg) {
  function listener (err) {
    // detach from the message, unless another listener has taken this one's place
    if (msg.__onFinished === listener) {
      msg.__onFinished = null
    }

    var pending = listener.queue

    // queue already drained by an earlier call
    if (!pending) {
      return
    }

    // drop the queue before running it so a repeated call does nothing
    listener.queue = null

    var idx = 0

    while (idx < pending.length) {
      pending[idx](err, msg)
      idx++
    }
  }

  listener.queue = []

  return listener
}
178  
/**
 * Replace `res.assignSocket` with a wrapper that calls the original
 * and then hands the socket to `callback` (node.js 0.8 workaround).
 * Does nothing when `res.assignSocket` is not a function.
 *
 * @param {ServerResponse} res
 * @param {function} callback
 * @private
 */

// istanbul ignore next: node.js 0.8 patch
function patchAssignSocket (res, callback) {
  var original = res.assignSocket

  if (typeof original === 'function') {
    // res.on('socket', callback) is broken in 0.8
    res.assignSocket = function _assignSocket (socket) {
      original.call(this, socket)
      callback(socket)
    }
  }
}
199  
/**
 * Load the async_hooks module, falling back to an empty object when
 * requiring it throws.
 *
 * @return {object}
 * @private
 */

function tryRequireAsyncHooks () {
  var hooks = {}

  try {
    hooks = require('async_hooks')
  } catch (e) {
    // require failed: keep the empty placeholder
  }

  return hooks
}
212  
/**
 * Bind a function to a fresh AsyncResource so it later runs in that
 * resource's async scope. Returns the function untouched when
 * AsyncResource or its runInAsyncScope method is not available.
 * AsyncResource.bind static method backported.
 *
 * @param {function} fn
 * @return {function}
 * @private
 */

function wrap (fn) {
  var AsyncResource = asyncHooks.AsyncResource

  // incompatible node.js: no AsyncResource at all
  if (!AsyncResource) {
    return fn
  }

  // create anonymous resource
  var resource = new AsyncResource(fn.name || 'bound-anonymous-fn')

  // incompatible node.js: AsyncResource without runInAsyncScope
  if (!resource.runInAsyncScope) {
    return fn
  }

  // return bound function
  return resource.runInAsyncScope.bind(resource, fn, null)
}