streamify.js
  1  var async = require('./async.js');
  2  
  3  // API
  4  module.exports = {
  5    iterator: wrapIterator,
  6    callback: wrapCallback
  7  };
  8  
  9  /**
 10   * Wraps iterators with long signature
 11   *
 12   * @this    ReadableAsyncKit#
 13   * @param   {function} iterator - function to wrap
 14   * @returns {function} - wrapped function
 15   */
 16  function wrapIterator(iterator)
 17  {
 18    var stream = this;
 19  
 20    return function(item, key, cb)
 21    {
 22      var aborter
 23        , wrappedCb = async(wrapIteratorCallback.call(stream, cb, key))
 24        ;
 25  
 26      stream.jobs[key] = wrappedCb;
 27  
 28      // it's either shortcut (item, cb)
 29      if (iterator.length == 2)
 30      {
 31        aborter = iterator(item, wrappedCb);
 32      }
 33      // or long format (item, key, cb)
 34      else
 35      {
 36        aborter = iterator(item, key, wrappedCb);
 37      }
 38  
 39      return aborter;
 40    };
 41  }
 42  
 43  /**
 44   * Wraps provided callback function
 45   * allowing to execute snitch function before
 46   * real callback
 47   *
 48   * @this    ReadableAsyncKit#
 49   * @param   {function} callback - function to wrap
 50   * @returns {function} - wrapped function
 51   */
 52  function wrapCallback(callback)
 53  {
 54    var stream = this;
 55  
 56    var wrapped = function(error, result)
 57    {
 58      return finisher.call(stream, error, result, callback);
 59    };
 60  
 61    return wrapped;
 62  }
 63  
 64  /**
 65   * Wraps provided iterator callback function
 66   * makes sure snitch only called once,
 67   * but passes secondary calls to the original callback
 68   *
 69   * @this    ReadableAsyncKit#
 70   * @param   {function} callback - callback to wrap
 71   * @param   {number|string} key - iteration key
 72   * @returns {function} wrapped callback
 73   */
 74  function wrapIteratorCallback(callback, key)
 75  {
 76    var stream = this;
 77  
 78    return function(error, output)
 79    {
 80      // don't repeat yourself
 81      if (!(key in stream.jobs))
 82      {
 83        callback(error, output);
 84        return;
 85      }
 86  
 87      // clean up jobs
 88      delete stream.jobs[key];
 89  
 90      return streamer.call(stream, error, {key: key, value: output}, callback);
 91    };
 92  }
 93  
 94  /**
 95   * Stream wrapper for iterator callback
 96   *
 97   * @this  ReadableAsyncKit#
 98   * @param {mixed} error - error response
 99   * @param {mixed} output - iterator output
100   * @param {function} callback - callback that expects iterator results
101   */
102  function streamer(error, output, callback)
103  {
104    if (error && !this.error)
105    {
106      this.error = error;
107      this.pause();
108      this.emit('error', error);
109      // send back value only, as expected
110      callback(error, output && output.value);
111      return;
112    }
113  
114    // stream stuff
115    this.push(output);
116  
117    // back to original track
118    // send back value only, as expected
119    callback(error, output && output.value);
120  }
121  
/**
 * Stream wrapper for the final callback.
 *
 * Ends the stream by pushing `null`, but only when no error was reported;
 * the original callback is invoked afterwards
 * with untouched arguments in both cases.
 *
 * @this  ReadableAsyncKit#
 * @param {mixed} error - error response
 * @param {mixed} output - final output
 * @param {function} callback - callback that expects final results
 */
function finisher(error, output, callback)
{
  // failed run: leave the stream open-ended, just report back
  if (error)
  {
    callback(error, output);
    return;
  }

  // successful run: signal end of the stream first
  this.push(null);

  // then get back to original track
  callback(error, output);
}