/ node_modules / pg-pool / index.js
index.js
  1  'use strict'
  2  const EventEmitter = require('events').EventEmitter
  3  
  4  const NOOP = function () {}
  5  
  6  const removeWhere = (list, predicate) => {
  7    const i = list.findIndex(predicate)
  8  
  9    return i === -1 ? undefined : list.splice(i, 1)[0]
 10  }
 11  
 12  class IdleItem {
 13    constructor(client, idleListener, timeoutId) {
 14      this.client = client
 15      this.idleListener = idleListener
 16      this.timeoutId = timeoutId
 17    }
 18  }
 19  
 20  class PendingItem {
 21    constructor(callback) {
 22      this.callback = callback
 23    }
 24  }
 25  
 26  function throwOnDoubleRelease() {
 27    throw new Error('Release called on client which has already been released to the pool.')
 28  }
 29  
 30  function promisify(Promise, callback) {
 31    if (callback) {
 32      return { callback: callback, result: undefined }
 33    }
 34    let rej
 35    let res
 36    const cb = function (err, client) {
 37      err ? rej(err) : res(client)
 38    }
 39    const result = new Promise(function (resolve, reject) {
 40      res = resolve
 41      rej = reject
 42    }).catch((err) => {
 43      // replace the stack trace that leads to `TCP.onStreamRead` with one that leads back to the
 44      // application that created the query
 45      Error.captureStackTrace(err)
 46      throw err
 47    })
 48    return { callback: cb, result: result }
 49  }
 50  
 51  function makeIdleListener(pool, client) {
 52    return function idleListener(err) {
 53      err.client = client
 54  
 55      client.removeListener('error', idleListener)
 56      client.on('error', () => {
 57        pool.log('additional client error after disconnection due to error', err)
 58      })
 59      pool._remove(client)
 60      // TODO - document that once the pool emits an error
 61      // the client has already been closed & purged and is unusable
 62      pool.emit('error', err, client)
 63    }
 64  }
 65  
 66  class Pool extends EventEmitter {
 67    constructor(options, Client) {
 68      super()
 69      this.options = Object.assign({}, options)
 70  
 71      if (options != null && 'password' in options) {
 72        // "hiding" the password so it doesn't show up in stack traces
 73        // or if the client is console.logged
 74        Object.defineProperty(this.options, 'password', {
 75          configurable: true,
 76          enumerable: false,
 77          writable: true,
 78          value: options.password,
 79        })
 80      }
 81      if (options != null && options.ssl && options.ssl.key) {
 82        // "hiding" the ssl->key so it doesn't show up in stack traces
 83        // or if the client is console.logged
 84        Object.defineProperty(this.options.ssl, 'key', {
 85          enumerable: false,
 86        })
 87      }
 88  
 89      this.options.max = this.options.max || this.options.poolSize || 10
 90      this.options.min = this.options.min || 0
 91      this.options.maxUses = this.options.maxUses || Infinity
 92      this.options.allowExitOnIdle = this.options.allowExitOnIdle || false
 93      this.options.maxLifetimeSeconds = this.options.maxLifetimeSeconds || 0
 94      this.log = this.options.log || function () {}
 95      this.Client = this.options.Client || Client || require('pg').Client
 96      this.Promise = this.options.Promise || global.Promise
 97  
 98      if (typeof this.options.idleTimeoutMillis === 'undefined') {
 99        this.options.idleTimeoutMillis = 10000
100      }
101  
102      this._clients = []
103      this._idle = []
104      this._expired = new WeakSet()
105      this._pendingQueue = []
106      this._endCallback = undefined
107      this.ending = false
108      this.ended = false
109    }
110  
111    _isFull() {
112      return this._clients.length >= this.options.max
113    }
114  
115    _isAboveMin() {
116      return this._clients.length > this.options.min
117    }
118  
119    _pulseQueue() {
120      this.log('pulse queue')
121      if (this.ended) {
122        this.log('pulse queue ended')
123        return
124      }
125      if (this.ending) {
126        this.log('pulse queue on ending')
127        if (this._idle.length) {
128          this._idle.slice().map((item) => {
129            this._remove(item.client)
130          })
131        }
132        if (!this._clients.length) {
133          this.ended = true
134          this._endCallback()
135        }
136        return
137      }
138  
139      // if we don't have any waiting, do nothing
140      if (!this._pendingQueue.length) {
141        this.log('no queued requests')
142        return
143      }
144      // if we don't have any idle clients and we have no more room do nothing
145      if (!this._idle.length && this._isFull()) {
146        return
147      }
148      const pendingItem = this._pendingQueue.shift()
149      if (this._idle.length) {
150        const idleItem = this._idle.pop()
151        clearTimeout(idleItem.timeoutId)
152        const client = idleItem.client
153        client.ref && client.ref()
154        const idleListener = idleItem.idleListener
155  
156        return this._acquireClient(client, pendingItem, idleListener, false)
157      }
158      if (!this._isFull()) {
159        return this.newClient(pendingItem)
160      }
161      throw new Error('unexpected condition')
162    }
163  
164    _remove(client, callback) {
165      const removed = removeWhere(this._idle, (item) => item.client === client)
166  
167      if (removed !== undefined) {
168        clearTimeout(removed.timeoutId)
169      }
170  
171      this._clients = this._clients.filter((c) => c !== client)
172      const context = this
173      client.end(() => {
174        context.emit('remove', client)
175  
176        if (typeof callback === 'function') {
177          callback()
178        }
179      })
180    }
181  
182    connect(cb) {
183      if (this.ending) {
184        const err = new Error('Cannot use a pool after calling end on the pool')
185        return cb ? cb(err) : this.Promise.reject(err)
186      }
187  
188      const response = promisify(this.Promise, cb)
189      const result = response.result
190  
191      // if we don't have to connect a new client, don't do so
192      if (this._isFull() || this._idle.length) {
193        // if we have idle clients schedule a pulse immediately
194        if (this._idle.length) {
195          process.nextTick(() => this._pulseQueue())
196        }
197  
198        if (!this.options.connectionTimeoutMillis) {
199          this._pendingQueue.push(new PendingItem(response.callback))
200          return result
201        }
202  
203        const queueCallback = (err, res, done) => {
204          clearTimeout(tid)
205          response.callback(err, res, done)
206        }
207  
208        const pendingItem = new PendingItem(queueCallback)
209  
210        // set connection timeout on checking out an existing client
211        const tid = setTimeout(() => {
212          // remove the callback from pending waiters because
213          // we're going to call it with a timeout error
214          removeWhere(this._pendingQueue, (i) => i.callback === queueCallback)
215          pendingItem.timedOut = true
216          response.callback(new Error('timeout exceeded when trying to connect'))
217        }, this.options.connectionTimeoutMillis)
218  
219        if (tid.unref) {
220          tid.unref()
221        }
222  
223        this._pendingQueue.push(pendingItem)
224        return result
225      }
226  
227      this.newClient(new PendingItem(response.callback))
228  
229      return result
230    }
231  
232    newClient(pendingItem) {
233      const client = new this.Client(this.options)
234      this._clients.push(client)
235      const idleListener = makeIdleListener(this, client)
236  
237      this.log('checking client timeout')
238  
239      // connection timeout logic
240      let tid
241      let timeoutHit = false
242      if (this.options.connectionTimeoutMillis) {
243        tid = setTimeout(() => {
244          this.log('ending client due to timeout')
245          timeoutHit = true
246          // force kill the node driver, and let libpq do its teardown
247          client.connection ? client.connection.stream.destroy() : client.end()
248        }, this.options.connectionTimeoutMillis)
249      }
250  
251      this.log('connecting new client')
252      client.connect((err) => {
253        if (tid) {
254          clearTimeout(tid)
255        }
256        client.on('error', idleListener)
257        if (err) {
258          this.log('client failed to connect', err)
259          // remove the dead client from our list of clients
260          this._clients = this._clients.filter((c) => c !== client)
261          if (timeoutHit) {
262            err = new Error('Connection terminated due to connection timeout', { cause: err })
263          }
264  
265          // this client won’t be released, so move on immediately
266          this._pulseQueue()
267  
268          if (!pendingItem.timedOut) {
269            pendingItem.callback(err, undefined, NOOP)
270          }
271        } else {
272          this.log('new client connected')
273  
274          if (this.options.maxLifetimeSeconds !== 0) {
275            const maxLifetimeTimeout = setTimeout(() => {
276              this.log('ending client due to expired lifetime')
277              this._expired.add(client)
278              const idleIndex = this._idle.findIndex((idleItem) => idleItem.client === client)
279              if (idleIndex !== -1) {
280                this._acquireClient(
281                  client,
282                  new PendingItem((err, client, clientRelease) => clientRelease()),
283                  idleListener,
284                  false
285                )
286              }
287            }, this.options.maxLifetimeSeconds * 1000)
288  
289            maxLifetimeTimeout.unref()
290            client.once('end', () => clearTimeout(maxLifetimeTimeout))
291          }
292  
293          return this._acquireClient(client, pendingItem, idleListener, true)
294        }
295      })
296    }
297  
298    // acquire a client for a pending work item
299    _acquireClient(client, pendingItem, idleListener, isNew) {
300      if (isNew) {
301        this.emit('connect', client)
302      }
303  
304      this.emit('acquire', client)
305  
306      client.release = this._releaseOnce(client, idleListener)
307  
308      client.removeListener('error', idleListener)
309  
310      if (!pendingItem.timedOut) {
311        if (isNew && this.options.verify) {
312          this.options.verify(client, (err) => {
313            if (err) {
314              client.release(err)
315              return pendingItem.callback(err, undefined, NOOP)
316            }
317  
318            pendingItem.callback(undefined, client, client.release)
319          })
320        } else {
321          pendingItem.callback(undefined, client, client.release)
322        }
323      } else {
324        if (isNew && this.options.verify) {
325          this.options.verify(client, client.release)
326        } else {
327          client.release()
328        }
329      }
330    }
331  
332    // returns a function that wraps _release and throws if called more than once
333    _releaseOnce(client, idleListener) {
334      let released = false
335  
336      return (err) => {
337        if (released) {
338          throwOnDoubleRelease()
339        }
340  
341        released = true
342        this._release(client, idleListener, err)
343      }
344    }
345  
346    // release a client back to the poll, include an error
347    // to remove it from the pool
348    _release(client, idleListener, err) {
349      client.on('error', idleListener)
350  
351      client._poolUseCount = (client._poolUseCount || 0) + 1
352  
353      this.emit('release', err, client)
354  
355      // TODO(bmc): expose a proper, public interface _queryable and _ending
356      if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
357        if (client._poolUseCount >= this.options.maxUses) {
358          this.log('remove expended client')
359        }
360  
361        return this._remove(client, this._pulseQueue.bind(this))
362      }
363  
364      const isExpired = this._expired.has(client)
365      if (isExpired) {
366        this.log('remove expired client')
367        this._expired.delete(client)
368        return this._remove(client, this._pulseQueue.bind(this))
369      }
370  
371      // idle timeout
372      let tid
373      if (this.options.idleTimeoutMillis && this._isAboveMin()) {
374        tid = setTimeout(() => {
375          this.log('remove idle client')
376          this._remove(client, this._pulseQueue.bind(this))
377        }, this.options.idleTimeoutMillis)
378  
379        if (this.options.allowExitOnIdle) {
380          // allow Node to exit if this is all that's left
381          tid.unref()
382        }
383      }
384  
385      if (this.options.allowExitOnIdle) {
386        client.unref()
387      }
388  
389      this._idle.push(new IdleItem(client, idleListener, tid))
390      this._pulseQueue()
391    }
392  
393    query(text, values, cb) {
394      // guard clause against passing a function as the first parameter
395      if (typeof text === 'function') {
396        const response = promisify(this.Promise, text)
397        setImmediate(function () {
398          return response.callback(new Error('Passing a function as the first parameter to pool.query is not supported'))
399        })
400        return response.result
401      }
402  
403      // allow plain text query without values
404      if (typeof values === 'function') {
405        cb = values
406        values = undefined
407      }
408      const response = promisify(this.Promise, cb)
409      cb = response.callback
410  
411      this.connect((err, client) => {
412        if (err) {
413          return cb(err)
414        }
415  
416        let clientReleased = false
417        const onError = (err) => {
418          if (clientReleased) {
419            return
420          }
421          clientReleased = true
422          client.release(err)
423          cb(err)
424        }
425  
426        client.once('error', onError)
427        this.log('dispatching query')
428        try {
429          client.query(text, values, (err, res) => {
430            this.log('query dispatched')
431            client.removeListener('error', onError)
432            if (clientReleased) {
433              return
434            }
435            clientReleased = true
436            client.release(err)
437            if (err) {
438              return cb(err)
439            }
440            return cb(undefined, res)
441          })
442        } catch (err) {
443          client.release(err)
444          return cb(err)
445        }
446      })
447      return response.result
448    }
449  
450    end(cb) {
451      this.log('ending')
452      if (this.ending) {
453        const err = new Error('Called end on pool more than once')
454        return cb ? cb(err) : this.Promise.reject(err)
455      }
456      this.ending = true
457      const promised = promisify(this.Promise, cb)
458      this._endCallback = promised.callback
459      this._pulseQueue()
460      return promised.result
461    }
462  
463    get waitingCount() {
464      return this._pendingQueue.length
465    }
466  
467    get idleCount() {
468      return this._idle.length
469    }
470  
471    get expiredCount() {
472      return this._clients.reduce((acc, client) => acc + (this._expired.has(client) ? 1 : 0), 0)
473    }
474  
475    get totalCount() {
476      return this._clients.length
477    }
478  }
479  module.exports = Pool