index.js
'use strict'
const EventEmitter = require('events').EventEmitter

const NOOP = function () {}

/**
 * Removes, in place, the first element of `list` for which `predicate` is truthy.
 *
 * @param {Array} list - array to search; mutated when a match is found
 * @param {Function} predicate - forwarded to `Array.prototype.findIndex`
 * @returns {*} the removed element, or `undefined` when nothing matched
 */
const removeWhere = (list, predicate) => {
  const i = list.findIndex(predicate)

  return i === -1 ? undefined : list.splice(i, 1)[0]
}

/**
 * Bookkeeping record for a client that is sitting idle in the pool.
 */
class IdleItem {
  /**
   * @param {object} client - the idle client
   * @param {Function} idleListener - the 'error' listener attached to the client while idle
   * @param {*} timeoutId - handle of the idle-timeout timer, or `undefined` when none was scheduled
   */
  constructor(client, idleListener, timeoutId) {
    this.client = client
    this.idleListener = idleListener
    this.timeoutId = timeoutId
  }
}

/**
 * Bookkeeping record for a caller waiting for a client.
 * A `timedOut` flag is set on the instance by `Pool#connect` when the wait times out.
 */
class PendingItem {
  /**
   * @param {Function} callback - invoked as `(err, client, release)`
   */
  constructor(callback) {
    this.callback = callback
  }
}

/**
 * @throws {Error} always; used when `release` is called twice on the same checkout
 */
function throwOnDoubleRelease() {
  throw new Error('Release called on client which has already been released to the pool.')
}

/**
 * Bridges callback-style and promise-style usage.
 *
 * @param {Function} Promise - promise constructor to use
 * @param {Function} [callback] - caller-supplied callback, if any
 * @returns {{callback: Function, result: (Promise|undefined)}} when `callback` is given it is
 *   returned unchanged and `result` is `undefined`; otherwise `callback` settles `result`
 */
function promisify(Promise, callback) {
  if (callback) {
    return { callback: callback, result: undefined }
  }
  let rej
  let res
  const cb = function (err, client) {
    err ? rej(err) : res(client)
  }
  const result = new Promise(function (resolve, reject) {
    res = resolve
    rej = reject
  }).catch((err) => {
    // replace the stack trace that leads to `TCP.onStreamRead` with one that leads back to the
    // application that created the query
    Error.captureStackTrace(err)
    throw err
  })
  return { callback: cb, result: result }
}

/**
 * Builds the 'error' listener that is attached to a client while the pool holds it.
 *
 * @param {Pool} pool - owning pool
 * @param {object} client - client the listener is for
 * @returns {Function} listener that removes the client from the pool and re-emits the error on the pool
 */
function makeIdleListener(pool, client) {
  return function idleListener(err) {
    err.client = client

    client.removeListener('error', idleListener)
    // keep a listener attached so later errors on the dead client are logged instead of unhandled
    client.on('error', () => {
      pool.log('additional client error after disconnection due to error', err)
    })
    pool._remove(client)
    // TODO - document that once the pool emits an error
    // the client has already been closed & purged and is unusable
    pool.emit('error', err, client)
  }
}

/**
 * A pool of clients. Emits 'connect', 'acquire', 'release', 'remove' and 'error'.
 */
class Pool extends EventEmitter {
  /**
   * @param {object} [options] - shallow-copied onto `this.options`. Defaults applied here:
   *   `max` (falls back to `poolSize`, then 10), `min` (0), `maxUses` (Infinity),
   *   `allowExitOnIdle` (false), `maxLifetimeSeconds` (0), `idleTimeoutMillis` (10000 when undefined).
   *   `log`, `Client` and `Promise` may also be supplied through options.
   * @param {Function} [Client] - client constructor; used when `options.Client` is not set,
   *   otherwise `require('pg').Client` is loaded
   */
  constructor(options, Client) {
    super()
    this.options = Object.assign({}, options)

    if (options != null && 'password' in options) {
      // "hiding" the password so it doesn't show up in stack traces
      // or if the client is console.logged
      Object.defineProperty(this.options, 'password', {
        configurable: true,
        enumerable: false,
        writable: true,
        value: options.password,
      })
    }
    if (options != null && options.ssl && options.ssl.key) {
      // "hiding" the ssl->key so it doesn't show up in stack traces
      // or if the client is console.logged
      Object.defineProperty(this.options.ssl, 'key', {
        enumerable: false,
      })
    }

    this.options.max = this.options.max || this.options.poolSize || 10
    this.options.min = this.options.min || 0
    this.options.maxUses = this.options.maxUses || Infinity
    this.options.allowExitOnIdle = this.options.allowExitOnIdle || false
    this.options.maxLifetimeSeconds = this.options.maxLifetimeSeconds || 0
    this.log = this.options.log || function () {}
    this.Client = this.options.Client || Client || require('pg').Client
    this.Promise = this.options.Promise || global.Promise

    if (typeof this.options.idleTimeoutMillis === 'undefined') {
      this.options.idleTimeoutMillis = 10000
    }

    this._clients = []
    this._idle = []
    this._expired = new WeakSet()
    this._pendingQueue = []
    this._endCallback = undefined
    this.ending = false
    this.ended = false
  }

  /** @returns {boolean} true when the number of clients has reached `options.max` */
  _isFull() {
    return this._clients.length >= this.options.max
  }

  /** @returns {boolean} true when the number of clients exceeds `options.min` */
  _isAboveMin() {
    return this._clients.length > this.options.min
  }

  /**
   * Advances the pool: while ending, tears down idle clients and fires the end callback once
   * no clients remain; otherwise hands an idle or new client to the oldest pending waiter.
   */
  _pulseQueue() {
    this.log('pulse queue')
    if (this.ended) {
      this.log('pulse queue ended')
      return
    }
    if (this.ending) {
      this.log('pulse queue on ending')
      if (this._idle.length) {
        // iterate over a copy: _remove mutates this._idle
        this._idle.slice().forEach((item) => {
          this._remove(item.client)
        })
      }
      if (!this._clients.length) {
        this.ended = true
        this._endCallback()
      }
      return
    }

    // if we don't have any waiting, do nothing
    if (!this._pendingQueue.length) {
      this.log('no queued requests')
      return
    }
    // if we don't have any idle clients and we have no more room do nothing
    if (!this._idle.length && this._isFull()) {
      return
    }
    const pendingItem = this._pendingQueue.shift()
    if (this._idle.length) {
      const idleItem = this._idle.pop()
      clearTimeout(idleItem.timeoutId)
      const client = idleItem.client
      client.ref && client.ref()
      const idleListener = idleItem.idleListener

      return this._acquireClient(client, pendingItem, idleListener, false)
    }
    if (!this._isFull()) {
      return this.newClient(pendingItem)
    }
    throw new Error('unexpected condition')
  }

  /**
   * Drops a client from the idle list and the client list, then ends it.
   *
   * @param {object} client - client to remove
   * @param {Function} [callback] - called after the client's `end` callback fires and
   *   'remove' has been emitted
   */
  _remove(client, callback) {
    const removed = removeWhere(this._idle, (item) => item.client === client)

    if (removed !== undefined) {
      clearTimeout(removed.timeoutId)
    }

    this._clients = this._clients.filter((c) => c !== client)
    const context = this
    client.end(() => {
      context.emit('remove', client)

      if (typeof callback === 'function') {
        callback()
      }
    })
  }

  /**
   * Checks out a client.
   *
   * @param {Function} [cb] - called as `(err, client, release)`; when omitted a promise is returned
   * @returns {Promise|undefined} promise for the client when no callback was given
   */
  connect(cb) {
    if (this.ending) {
      const err = new Error('Cannot use a pool after calling end on the pool')
      return cb ? cb(err) : this.Promise.reject(err)
    }

    const response = promisify(this.Promise, cb)
    const result = response.result

    // if we don't have to connect a new client, don't do so
    if (this._isFull() || this._idle.length) {
      // if we have idle clients schedule a pulse immediately
      if (this._idle.length) {
        process.nextTick(() => this._pulseQueue())
      }

      if (!this.options.connectionTimeoutMillis) {
        this._pendingQueue.push(new PendingItem(response.callback))
        return result
      }

      const queueCallback = (err, res, done) => {
        clearTimeout(tid)
        response.callback(err, res, done)
      }

      const pendingItem = new PendingItem(queueCallback)

      // set connection timeout on checking out an existing client
      const tid = setTimeout(() => {
        // remove the callback from pending waiters because
        // we're going to call it with a timeout error
        removeWhere(this._pendingQueue, (i) => i.callback === queueCallback)
        pendingItem.timedOut = true
        response.callback(new Error('timeout exceeded when trying to connect'))
      }, this.options.connectionTimeoutMillis)

      if (tid.unref) {
        tid.unref()
      }

      this._pendingQueue.push(pendingItem)
      return result
    }

    this.newClient(new PendingItem(response.callback))

    return result
  }

  /**
   * Creates and connects a new client on behalf of a waiter.
   *
   * @param {PendingItem} pendingItem - waiter that receives the client or the connection error
   */
  newClient(pendingItem) {
    const client = new this.Client(this.options)
    this._clients.push(client)
    const idleListener = makeIdleListener(this, client)

    this.log('checking client timeout')

    // connection timeout logic
    let tid
    let timeoutHit = false
    if (this.options.connectionTimeoutMillis) {
      tid = setTimeout(() => {
        this.log('ending client due to timeout')
        timeoutHit = true
        // force kill the node driver, and let libpq do its teardown
        client.connection ? client.connection.stream.destroy() : client.end()
      }, this.options.connectionTimeoutMillis)
    }

    this.log('connecting new client')
    client.connect((err) => {
      if (tid) {
        clearTimeout(tid)
      }
      client.on('error', idleListener)
      if (err) {
        this.log('client failed to connect', err)
        // remove the dead client from our list of clients
        this._clients = this._clients.filter((c) => c !== client)
        if (timeoutHit) {
          err = new Error('Connection terminated due to connection timeout', { cause: err })
        }

        // this client won't be released, so move on immediately
        this._pulseQueue()

        if (!pendingItem.timedOut) {
          pendingItem.callback(err, undefined, NOOP)
        }
      } else {
        this.log('new client connected')

        if (this.options.maxLifetimeSeconds !== 0) {
          const maxLifetimeTimeout = setTimeout(() => {
            this.log('ending client due to expired lifetime')
            this._expired.add(client)
            const idleIndex = this._idle.findIndex((idleItem) => idleItem.client === client)
            if (idleIndex !== -1) {
              // the client is idle: check it out and release it at once so that
              // _release sees the expired mark and removes it
              this._acquireClient(
                client,
                new PendingItem((err, client, clientRelease) => clientRelease()),
                idleListener,
                false
              )
            }
          }, this.options.maxLifetimeSeconds * 1000)

          maxLifetimeTimeout.unref()
          client.once('end', () => clearTimeout(maxLifetimeTimeout))
        }

        return this._acquireClient(client, pendingItem, idleListener, true)
      }
    })
  }

  /**
   * Acquires a client for a pending work item.
   *
   * @param {object} client - client being handed out
   * @param {PendingItem} pendingItem - waiter; when it has timed out the client is released again
   * @param {Function} idleListener - the client's idle 'error' listener, detached while checked out
   * @param {boolean} isNew - true for a freshly connected client ('connect' is emitted and
   *   `options.verify`, when set, is run)
   */
  _acquireClient(client, pendingItem, idleListener, isNew) {
    if (isNew) {
      this.emit('connect', client)
    }

    this.emit('acquire', client)

    client.release = this._releaseOnce(client, idleListener)

    client.removeListener('error', idleListener)

    if (!pendingItem.timedOut) {
      if (isNew && this.options.verify) {
        this.options.verify(client, (err) => {
          if (err) {
            client.release(err)
            return pendingItem.callback(err, undefined, NOOP)
          }

          pendingItem.callback(undefined, client, client.release)
        })
      } else {
        pendingItem.callback(undefined, client, client.release)
      }
    } else {
      if (isNew && this.options.verify) {
        this.options.verify(client, client.release)
      } else {
        client.release()
      }
    }
  }

  /**
   * Returns a function that wraps _release and throws if called more than once.
   *
   * @param {object} client - checked-out client
   * @param {Function} idleListener - listener to re-attach on release
   * @returns {Function} `release(err)` for this checkout
   */
  _releaseOnce(client, idleListener) {
    let released = false

    return (err) => {
      if (released) {
        throwOnDoubleRelease()
      }

      released = true
      this._release(client, idleListener, err)
    }
  }

  /**
   * Releases a client back to the pool; include an error to remove it from the pool.
   *
   * @param {object} client - client being released
   * @param {Function} idleListener - 'error' listener to attach while the client is held by the pool
   * @param {*} [err] - truthy to have the client removed instead of returned to the idle list
   */
  _release(client, idleListener, err) {
    client.on('error', idleListener)

    client._poolUseCount = (client._poolUseCount || 0) + 1

    this.emit('release', err, client)

    // TODO(bmc): expose a proper, public interface _queryable and _ending
    if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
      if (client._poolUseCount >= this.options.maxUses) {
        this.log('remove expended client')
      }

      return this._remove(client, this._pulseQueue.bind(this))
    }

    const isExpired = this._expired.has(client)
    if (isExpired) {
      this.log('remove expired client')
      this._expired.delete(client)
      return this._remove(client, this._pulseQueue.bind(this))
    }

    // idle timeout
    let tid
    if (this.options.idleTimeoutMillis && this._isAboveMin()) {
      tid = setTimeout(() => {
        this.log('remove idle client')
        this._remove(client, this._pulseQueue.bind(this))
      }, this.options.idleTimeoutMillis)

      if (this.options.allowExitOnIdle) {
        // allow Node to exit if this is all that's left
        tid.unref()
      }
    }

    if (this.options.allowExitOnIdle) {
      // guarded like the matching client.ref call in _pulseQueue: not every client has unref
      client.unref && client.unref()
    }

    this._idle.push(new IdleItem(client, idleListener, tid))
    this._pulseQueue()
  }

  /**
   * Checks out a client, runs one query on it and releases it.
   *
   * @param {*} text - query, forwarded to `client.query`; passing a function here is rejected
   * @param {*} [values] - query values; may instead be the callback
   * @param {Function} [cb] - called as `(err, res)`; when omitted a promise is returned
   * @returns {Promise|undefined} promise for the query result when no callback was given
   */
  query(text, values, cb) {
    // guard clause against passing a function as the first parameter
    if (typeof text === 'function') {
      const response = promisify(this.Promise, text)
      setImmediate(function () {
        return response.callback(new Error('Passing a function as the first parameter to pool.query is not supported'))
      })
      return response.result
    }

    // allow plain text query without values
    if (typeof values === 'function') {
      cb = values
      values = undefined
    }
    const response = promisify(this.Promise, cb)
    cb = response.callback

    this.connect((err, client) => {
      if (err) {
        return cb(err)
      }

      let clientReleased = false
      const onError = (err) => {
        if (clientReleased) {
          return
        }
        clientReleased = true
        client.release(err)
        cb(err)
      }

      client.once('error', onError)
      this.log('dispatching query')
      try {
        client.query(text, values, (err, res) => {
          this.log('query dispatched')
          client.removeListener('error', onError)
          if (clientReleased) {
            return
          }
          clientReleased = true
          client.release(err)
          if (err) {
            return cb(err)
          }
          return cb(undefined, res)
        })
      } catch (err) {
        // the query callback will never run, so detach the one-shot listener here;
        // otherwise a later 'error' event would release the client a second time
        client.removeListener('error', onError)
        if (clientReleased) {
          // already released and reported: surface the error instead of double-releasing
          throw err
        }
        clientReleased = true
        client.release(err)
        return cb(err)
      }
    })
    return response.result
  }

  /**
   * Starts shutting the pool down; the callback/promise settles once no clients remain.
   *
   * @param {Function} [cb] - completion callback; when omitted a promise is returned
   * @returns {Promise|undefined} promise when no callback was given; calling `end` a second
   *   time yields an error
   */
  end(cb) {
    this.log('ending')
    if (this.ending) {
      const err = new Error('Called end on pool more than once')
      return cb ? cb(err) : this.Promise.reject(err)
    }
    this.ending = true
    const promised = promisify(this.Promise, cb)
    this._endCallback = promised.callback
    this._pulseQueue()
    return promised.result
  }

  /** @returns {number} number of queued waiters */
  get waitingCount() {
    return this._pendingQueue.length
  }

  /** @returns {number} number of idle clients */
  get idleCount() {
    return this._idle.length
  }

  /** @returns {number} number of current clients marked as expired */
  get expiredCount() {
    return this._clients.reduce((acc, client) => acc + (this._expired.has(client) ? 1 : 0), 0)
  }

  /** @returns {number} number of clients currently tracked by the pool */
  get totalCount() {
    return this._clients.length
  }
}
module.exports = Pool