/ lib / core / processes / processLauncher.js
processLauncher.js
  1  const child_process = require('child_process');
  2  const constants = require('../../constants');
  3  const path = require('path');
  4  const ProcessLogsApi = require('../../modules/process_logs_api');
  5  
  6  let processCount = 1;
  7  class ProcessLauncher {
  8  
  9    /**
 10     * Constructor of ProcessLauncher. Forks the module and sets up the message handling
 11     * @param {Object}    options   Options tp start the process
 12     *        * modulePath      {String}    Absolute path to the module to fork
 13     *        * logger          {Object}    Logger
 14     *        * events          {Function}  Events Emitter instance
 15     * @return {ProcessLauncher}    The ProcessLauncher instance
 16     */
 17    constructor(options) {
 18      this.name = options.name || path.basename(options.modulePath);
 19  
 20      if (this._isDebug()) {
 21        const childOptions = {stdio: 'pipe', execArgv: ['--inspect-brk=' + (60000 + processCount)]};
 22        processCount++;
 23        this.process = child_process.fork(options.modulePath, [], childOptions);
 24      } else {
 25        this.process = child_process.fork(options.modulePath);
 26      }
 27      this.logger = options.logger;
 28      this.events = options.events;
 29      this.silent = options.silent;
 30      this.exitCallback = options.exitCallback;
 31      this.embark = options.embark;
 32      this.logs = [];
 33      this.processLogsApi = new ProcessLogsApi({embark: this.embark, processName: this.name, silent: this.silent});
 34  
 35      this.subscriptions = {};
 36      this._subscribeToMessages();
 37    }
 38  
 39    _isDebug() {
 40      const argvString= process.execArgv.join();
 41      return argvString.includes('--debug') || argvString.includes('--inspect');
 42    }
 43  
 44    // Subscribes to messages from the child process and delegates to the right methods
 45    _subscribeToMessages() {
 46      const self = this;
 47      this.process.on('message', (msg) => {
 48        if (msg.error) {
 49          self.logger.error(msg.error);
 50        }
 51        if (msg.result === constants.process.log) {
 52          return self.processLogsApi.logHandler.handleLog(msg);
 53        }
 54        if (msg.event) {
 55          return self._handleEvent(msg);
 56        }
 57        self._checkSubscriptions(msg);
 58      });
 59  
 60      this.process.on('exit', (code) => {
 61        if (self.exitCallback) {
 62          return self.exitCallback(code);
 63        }
 64        if (code) {
 65          this.logger.info(`Child Process ${this.name} exited with code ${code}`);
 66        }
 67      });
 68    }
 69  
 70    // Handle event calls from the child process
 71    _handleEvent(msg) {
 72      const self = this;
 73      if (!self.events[msg.event]) {
 74        self.logger.warn('Unknown event method called: ' + msg.event);
 75        return;
 76      }
 77      if (!msg.args || !Array.isArray(msg.args)) {
 78        msg.args = [];
 79      }
 80      // Add callback in the args
 81      msg.args.push((result) => {
 82        self.process.send({
 83          event: constants.process.events.response,
 84          result,
 85          eventId: msg.eventId
 86        });
 87      });
 88      self.events[msg.event](msg.requestName, ...msg.args);
 89    }
 90  
 91    // Looks at the subscriptions to see if there is a callback to call
 92    _checkSubscriptions(msg) {
 93      const messageKeys = Object.keys(msg);
 94      const subscriptionsKeys = Object.keys(this.subscriptions);
 95      let subscriptionsForKey;
 96      let messageKey;
 97      // Find if the message contains a key that we are subscribed to
 98      messageKeys.some(_messageKey => {
 99        return subscriptionsKeys.some(subscriptionKey => {
100          if (_messageKey === subscriptionKey) {
101            subscriptionsForKey = this.subscriptions[subscriptionKey];
102            messageKey = _messageKey;
103            return true;
104          }
105          return false;
106        });
107      });
108  
109      if (subscriptionsForKey) {
110        // Find if we are subscribed to one of the values
111        let subsIndex = [];
112        const subscriptionsForValue = subscriptionsForKey.filter((sub, index) => {
113          if (msg[messageKey] === sub.value) {
114            subsIndex.push(index);
115            return true;
116          }
117          return false;
118        });
119  
120        if (subscriptionsForValue.length) {
121          // We are subscribed to that message, call the callback
122          subscriptionsForValue.forEach((subscription, index) => {
123            subscription.callback(msg);
124  
125            if (subscription.once) {
126              // Called only once, we can remove it
127              subscription = null;
128              this.subscriptions[messageKey].splice(subsIndex[index], 1);
129            }
130          });
131        }
132      }
133    }
134  
135    /**
136     * Subscribe to a message using a key-value pair
137     * @param {String}    key       Message key to subscribe to
138     * @param {String}    value     Value that the above key must have for the callback to be called
139     * @param {Function}  callback  callback(response)
140     * @return {void}
141     */
142    on(key, value, callback) {
143      if (this.subscriptions[key]) {
144        this.subscriptions[key].push({value, callback});
145        return;
146      }
147      this.subscriptions[key] = [{value, callback}];
148    }
149  
150    /**
151     * Same as .on, but only triggers once
152     * @param {String}    key       Message key to subscribe to
153     * @param {String}    value     Value that the above key must have for the callback to be called
154     * @param {Function}  callback  callback(response)
155     * @return {void}
156     */
157    once(key, value, callback) {
158      const obj = {value, callback, once: true};
159      if (this.subscriptions[key]) {
160        this.subscriptions[key].push(obj);
161        return;
162      }
163      this.subscriptions[key] = [obj];
164    }
165  
166    /**
167     * Unsubscribes from a previously subscribed key-value pair (or key if no value)
168     * @param {String}  key     Message key to unsubscribe
169     * @param {String}  value   [Optional] Value of the key to unsubscribe
170     *                            If there is no value, unsubscribes from all the values of that key
171     * @return {void}
172     */
173    unsubscribeTo(key, value) {
174      if (!value) {
175        this.subscriptions[key] = [];
176      }
177      if (this.subscriptions[key]) {
178        this.subscriptions[key].filter((val, index) => {
179          if (val.value === value) {
180            this.subscriptions[key].splice(index, 1);
181          }
182        });
183      }
184    }
185  
186    /**
187     * Unsubscribes from all subscriptions
188     * @return {void}
189     */
190    unsubscribeToAll() {
191      this.subscriptions = {};
192    }
193  
194    /**
195     * Sends a message to the child process. Same as ChildProcess.send()
196     * @params {Object}   message     Message to send
197     * For other parameters, see:
198     *  https://nodejs.org/api/child_process.html#child_process_subprocess_send_message_sendhandle_options_callback
199     * @return {void}
200     */
201    send() {
202      if (!this.process.connected) {
203        return false;
204      }
205      return this.process.send(...arguments);
206    }
207  
208    /**
209     * Disconnects the child process. It will exit on its own
210     * @return {void}
211     */
212    disconnect() {
213      this.process.disconnect();
214    }
215  
216    /**
217     * Kills the child process
218     *  https://nodejs.org/api/child_process.html#child_process_subprocess_kill_signal
219     * @return {void}
220     */
221    kill() {
222      this.process.kill(...arguments);
223    }
224  }
225  
226  module.exports = ProcessLauncher;