index.js
  1  /*!
  2   * raw-body
  3   * Copyright(c) 2013-2014 Jonathan Ong
  4   * Copyright(c) 2014-2015 Douglas Christopher Wilson
  5   * MIT Licensed
  6   */
  7  
  8  'use strict'
  9  
 10  /**
 11   * Module dependencies.
 12   * @private
 13   */
 14  
 15  var bytes = require('bytes')
 16  var createError = require('http-errors')
 17  var iconv = require('iconv-lite')
 18  var unpipe = require('unpipe')
 19  
 20  /**
 21   * Module exports.
 22   * @public
 23   */
 24  
 25  module.exports = getRawBody
 26  
 27  /**
 28   * Module variables.
 29   * @private
 30   */
 31  
 32  var ICONV_ENCODING_MESSAGE_REGEXP = /^Encoding not recognized: /
 33  
 34  /**
 35   * Get the decoder for a given encoding.
 36   *
 37   * @param {string} encoding
 38   * @private
 39   */
 40  
 41  function getDecoder (encoding) {
 42    if (!encoding) return null
 43  
 44    try {
 45      return iconv.getDecoder(encoding)
 46    } catch (e) {
 47      // error getting decoder
 48      if (!ICONV_ENCODING_MESSAGE_REGEXP.test(e.message)) throw e
 49  
 50      // the encoding was not found
 51      throw createError(415, 'specified encoding unsupported', {
 52        encoding: encoding,
 53        type: 'encoding.unsupported'
 54      })
 55    }
 56  }
 57  
 58  /**
 59   * Get the raw body of a stream (typically HTTP).
 60   *
 61   * @param {object} stream
 62   * @param {object|string|function} [options]
 63   * @param {function} [callback]
 64   * @public
 65   */
 66  
 67  function getRawBody (stream, options, callback) {
 68    var done = callback
 69    var opts = options || {}
 70  
 71    if (options === true || typeof options === 'string') {
 72      // short cut for encoding
 73      opts = {
 74        encoding: options
 75      }
 76    }
 77  
 78    if (typeof options === 'function') {
 79      done = options
 80      opts = {}
 81    }
 82  
 83    // validate callback is a function, if provided
 84    if (done !== undefined && typeof done !== 'function') {
 85      throw new TypeError('argument callback must be a function')
 86    }
 87  
 88    // require the callback without promises
 89    if (!done && !global.Promise) {
 90      throw new TypeError('argument callback is required')
 91    }
 92  
 93    // get encoding
 94    var encoding = opts.encoding !== true
 95      ? opts.encoding
 96      : 'utf-8'
 97  
 98    // convert the limit to an integer
 99    var limit = bytes.parse(opts.limit)
100  
101    // convert the expected length to an integer
102    var length = opts.length != null && !isNaN(opts.length)
103      ? parseInt(opts.length, 10)
104      : null
105  
106    if (done) {
107      // classic callback style
108      return readStream(stream, encoding, length, limit, done)
109    }
110  
111    return new Promise(function executor (resolve, reject) {
112      readStream(stream, encoding, length, limit, function onRead (err, buf) {
113        if (err) return reject(err)
114        resolve(buf)
115      })
116    })
117  }
118  
119  /**
120   * Halt a stream.
121   *
122   * @param {Object} stream
123   * @private
124   */
125  
126  function halt (stream) {
127    // unpipe everything from the stream
128    unpipe(stream)
129  
130    // pause stream
131    if (typeof stream.pause === 'function') {
132      stream.pause()
133    }
134  }
135  
136  /**
137   * Read the data from the stream.
138   *
139   * @param {object} stream
140   * @param {string} encoding
141   * @param {number} length
142   * @param {number} limit
143   * @param {function} callback
144   * @public
145   */
146  
147  function readStream (stream, encoding, length, limit, callback) {
148    var complete = false
149    var sync = true
150  
151    // check the length and limit options.
152    // note: we intentionally leave the stream paused,
153    // so users should handle the stream themselves.
154    if (limit !== null && length !== null && length > limit) {
155      return done(createError(413, 'request entity too large', {
156        expected: length,
157        length: length,
158        limit: limit,
159        type: 'entity.too.large'
160      }))
161    }
162  
163    // streams1: assert request encoding is buffer.
164    // streams2+: assert the stream encoding is buffer.
165    //   stream._decoder: streams1
166    //   state.encoding: streams2
167    //   state.decoder: streams2, specifically < 0.10.6
168    var state = stream._readableState
169    if (stream._decoder || (state && (state.encoding || state.decoder))) {
170      // developer error
171      return done(createError(500, 'stream encoding should not be set', {
172        type: 'stream.encoding.set'
173      }))
174    }
175  
176    var received = 0
177    var decoder
178  
179    try {
180      decoder = getDecoder(encoding)
181    } catch (err) {
182      return done(err)
183    }
184  
185    var buffer = decoder
186      ? ''
187      : []
188  
189    // attach listeners
190    stream.on('aborted', onAborted)
191    stream.on('close', cleanup)
192    stream.on('data', onData)
193    stream.on('end', onEnd)
194    stream.on('error', onEnd)
195  
196    // mark sync section complete
197    sync = false
198  
199    function done () {
200      var args = new Array(arguments.length)
201  
202      // copy arguments
203      for (var i = 0; i < args.length; i++) {
204        args[i] = arguments[i]
205      }
206  
207      // mark complete
208      complete = true
209  
210      if (sync) {
211        process.nextTick(invokeCallback)
212      } else {
213        invokeCallback()
214      }
215  
216      function invokeCallback () {
217        cleanup()
218  
219        if (args[0]) {
220          // halt the stream on error
221          halt(stream)
222        }
223  
224        callback.apply(null, args)
225      }
226    }
227  
228    function onAborted () {
229      if (complete) return
230  
231      done(createError(400, 'request aborted', {
232        code: 'ECONNABORTED',
233        expected: length,
234        length: length,
235        received: received,
236        type: 'request.aborted'
237      }))
238    }
239  
240    function onData (chunk) {
241      if (complete) return
242  
243      received += chunk.length
244  
245      if (limit !== null && received > limit) {
246        done(createError(413, 'request entity too large', {
247          limit: limit,
248          received: received,
249          type: 'entity.too.large'
250        }))
251      } else if (decoder) {
252        buffer += decoder.write(chunk)
253      } else {
254        buffer.push(chunk)
255      }
256    }
257  
258    function onEnd (err) {
259      if (complete) return
260      if (err) return done(err)
261  
262      if (length !== null && received !== length) {
263        done(createError(400, 'request size did not match content length', {
264          expected: length,
265          length: length,
266          received: received,
267          type: 'request.size.invalid'
268        }))
269      } else {
270        var string = decoder
271          ? buffer + (decoder.end() || '')
272          : Buffer.concat(buffer)
273        done(null, string)
274      }
275    }
276  
277    function cleanup () {
278      buffer = null
279  
280      stream.removeListener('aborted', onAborted)
281      stream.removeListener('data', onData)
282      stream.removeListener('end', onEnd)
283      stream.removeListener('error', onEnd)
284      stream.removeListener('close', cleanup)
285    }
286  }