processLauncher.js
const child_process = require('child_process');
const constants = require('../../constants');
const path = require('path');
const ProcessLogsApi = require('../../modules/process_logs_api');

// Incremented for every child forked in debug mode so each one gets its own inspector port
let processCount = 1;

/**
 * Forks a module as a child process and dispatches the messages it sends back:
 * log messages go to the ProcessLogsApi log handler, event messages are forwarded
 * to `options.events`, and everything else is matched against key/value subscriptions
 * registered with `on` / `once`.
 */
class ProcessLauncher {

  /**
   * Constructor of ProcessLauncher. Forks the module and sets up the message handling
   * @param {Object} options Options to start the process
   *        * modulePath {String} Absolute path to the module to fork
   *        * name {String} [Optional] Process name; defaults to the basename of modulePath
   *        * logger {Object} Logger (its `error`, `warn` and `info` methods are used)
   *        * events {Object} Object whose methods are called by name for event messages
   *        * silent {Boolean} Passed through to ProcessLogsApi
   *        * exitCallback {Function} [Optional] Called with the exit code when the child exits
   *        * embark {Object} Passed through to ProcessLogsApi
   * @return {ProcessLauncher} The ProcessLauncher instance
   */
  constructor(options) {
    this.name = options.name || path.basename(options.modulePath);

    if (this._isDebug()) {
      // The parent is being debugged: start the child with its own inspector port
      const childOptions = {stdio: 'pipe', execArgv: [`--inspect-brk=${60000 + processCount}`]};
      processCount++;
      this.process = child_process.fork(options.modulePath, [], childOptions);
    } else {
      this.process = child_process.fork(options.modulePath);
    }
    this.logger = options.logger;
    this.events = options.events;
    this.silent = options.silent;
    this.exitCallback = options.exitCallback;
    this.embark = options.embark;
    this.logs = [];
    this.processLogsApi = new ProcessLogsApi({embark: this.embark, processName: this.name, silent: this.silent});

    // Map of message key -> array of {value, callback, once} subscriptions
    this.subscriptions = {};
    this._subscribeToMessages();
  }

  /**
   * Tells whether the current process was started with a debug/inspect flag
   * @return {Boolean} true if any execArgv entry contains `--debug` or `--inspect`
   */
  _isDebug() {
    return process.execArgv.some((arg) => arg.includes('--debug') || arg.includes('--inspect'));
  }

  /**
   * Subscribes to messages from the child process and delegates to the right methods
   * @return {void}
   */
  _subscribeToMessages() {
    this.process.on('message', (msg) => {
      // An error is logged, but the message is still dispatched below
      if (msg.error) {
        this.logger.error(msg.error);
      }
      if (msg.result === constants.process.log) {
        return this.processLogsApi.logHandler.handleLog(msg);
      }
      if (msg.event) {
        return this._handleEvent(msg);
      }
      this._checkSubscriptions(msg);
    });

    this.process.on('exit', (code) => {
      // A custom exit callback takes precedence over the default logging
      if (this.exitCallback) {
        return this.exitCallback(code);
      }
      if (code) {
        this.logger.info(`Child Process ${this.name} exited with code ${code}`);
      }
    });
  }

  /**
   * Handles event calls from the child process: calls `events[msg.event]` with
   * `msg.requestName`, the message arguments and a callback that sends the result
   * back to the child.
   * @param {Object} msg Message received from the child (event, requestName, args, eventId)
   * @return {void}
   */
  _handleEvent(msg) {
    if (!this.events[msg.event]) {
      this.logger.warn('Unknown event method called: ' + msg.event);
      return;
    }
    const args = Array.isArray(msg.args) ? msg.args : [];
    // Reply through send() so a result that arrives after the child disconnected
    // is dropped instead of being written to a closed channel
    const respond = (result) => {
      this.send({
        event: constants.process.events.response,
        result,
        eventId: msg.eventId
      });
    };
    this.events[msg.event](msg.requestName, ...args, respond);
  }

  /**
   * Looks at the subscriptions to see if there is a callback to call.
   * Only the first message key that has a subscription entry is considered.
   * @param {Object} msg Message received from the child
   * @return {void}
   */
  _checkSubscriptions(msg) {
    // Find if the message contains a key that we are subscribed to
    const messageKey = Object.keys(msg).find((key) => {
      return Object.prototype.hasOwnProperty.call(this.subscriptions, key);
    });
    if (messageKey === undefined) {
      return;
    }

    // Snapshot the matching subscriptions so that callbacks which subscribe or
    // unsubscribe do not disturb the iteration
    const matching = this.subscriptions[messageKey].filter((subscription) => {
      return msg[messageKey] === subscription.value;
    });

    matching.forEach((subscription) => {
      subscription.callback(msg);

      if (subscription.once) {
        // Called only once, we can remove it
        this._removeSubscription(messageKey, subscription);
      }
    });
  }

  /**
   * Removes one subscription object from the list of a key, by identity.
   * Removing by identity (not by a previously computed index) stays correct when
   * earlier removals have shifted the list, and is a no-op if the subscription
   * or the whole key has already been removed.
   * @param {String} key Message key the subscription was registered under
   * @param {Object} subscription The subscription object to remove
   * @return {void}
   */
  _removeSubscription(key, subscription) {
    const current = this.subscriptions[key];
    if (!current) {
      return;
    }
    const position = current.indexOf(subscription);
    if (position !== -1) {
      current.splice(position, 1);
    }
  }

  /**
   * Appends a subscription object to the list of a key, creating the list if needed
   * @param {String} key Message key to subscribe to
   * @param {Object} subscription {value, callback[, once]}
   * @return {void}
   */
  _addSubscription(key, subscription) {
    if (this.subscriptions[key]) {
      this.subscriptions[key].push(subscription);
      return;
    }
    this.subscriptions[key] = [subscription];
  }

  /**
   * Subscribe to a message using a key-value pair
   * @param {String} key Message key to subscribe to
   * @param {String} value Value that the above key must have for the callback to be called
   * @param {Function} callback callback(response)
   * @return {void}
   */
  on(key, value, callback) {
    this._addSubscription(key, {value, callback});
  }

  /**
   * Same as .on, but only triggers once
   * @param {String} key Message key to subscribe to
   * @param {String} value Value that the above key must have for the callback to be called
   * @param {Function} callback callback(response)
   * @return {void}
   */
  once(key, value, callback) {
    this._addSubscription(key, {value, callback, once: true});
  }

  /**
   * Unsubscribes from a previously subscribed key-value pair (or key if no value)
   * @param {String} key Message key to unsubscribe
   * @param {String} value [Optional] Value of the key to unsubscribe
   *                       If the value is missing (or falsy), unsubscribes from all the values of that key
   * @return {void}
   */
  unsubscribeTo(key, value) {
    if (!value) {
      this.subscriptions[key] = [];
      return;
    }
    const current = this.subscriptions[key];
    if (current) {
      // Build a new list instead of splicing while iterating, which would skip
      // the element following each removed one
      this.subscriptions[key] = current.filter((subscription) => subscription.value !== value);
    }
  }

  /**
   * Unsubscribes from all subscriptions
   * @return {void}
   */
  unsubscribeToAll() {
    this.subscriptions = {};
  }

  /**
   * Sends a message to the child process. Same as ChildProcess.send(), except that
   * nothing is sent when the child is no longer connected.
   * @params {Object} message Message to send
   * For other parameters, see:
   *  https://nodejs.org/api/child_process.html#child_process_subprocess_send_message_sendhandle_options_callback
   * @return {Boolean} false if the child is not connected, otherwise the result of ChildProcess.send()
   */
  send(...args) {
    if (!this.process.connected) {
      return false;
    }
    return this.process.send(...args);
  }

  /**
   * Disconnects the child process. It will exit on its own
   * @return {void}
   */
  disconnect() {
    this.process.disconnect();
  }

  /**
   * Kills the child process. Arguments are forwarded to ChildProcess.kill()
   *  https://nodejs.org/api/child_process.html#child_process_subprocess_kill_signal
   * @return {void}
   */
  kill(...args) {
    this.process.kill(...args);
  }
}

module.exports = ProcessLauncher;