/ node_modules / pg-protocol / src / serializer.ts
serializer.ts
  1  import { Writer } from './buffer-writer'
  2  
  3  const enum code {
  4    startup = 0x70,
  5    query = 0x51,
  6    parse = 0x50,
  7    bind = 0x42,
  8    execute = 0x45,
  9    flush = 0x48,
 10    sync = 0x53,
 11    end = 0x58,
 12    close = 0x43,
 13    describe = 0x44,
 14    copyFromChunk = 0x64,
 15    copyDone = 0x63,
 16    copyFail = 0x66,
 17  }
 18  
 19  const writer = new Writer()
 20  
 21  const startup = (opts: Record<string, string>): Buffer => {
 22    // protocol version
 23    writer.addInt16(3).addInt16(0)
 24    for (const key of Object.keys(opts)) {
 25      writer.addCString(key).addCString(opts[key])
 26    }
 27  
 28    writer.addCString('client_encoding').addCString('UTF8')
 29  
 30    const bodyBuffer = writer.addCString('').flush()
 31    // this message is sent without a code
 32  
 33    const length = bodyBuffer.length + 4
 34  
 35    return new Writer().addInt32(length).add(bodyBuffer).flush()
 36  }
 37  
 38  const requestSsl = (): Buffer => {
 39    const response = Buffer.allocUnsafe(8)
 40    response.writeInt32BE(8, 0)
 41    response.writeInt32BE(80877103, 4)
 42    return response
 43  }
 44  
 45  const password = (password: string): Buffer => {
 46    return writer.addCString(password).flush(code.startup)
 47  }
 48  
 49  const sendSASLInitialResponseMessage = function (mechanism: string, initialResponse: string): Buffer {
 50    // 0x70 = 'p'
 51    writer.addCString(mechanism).addInt32(Buffer.byteLength(initialResponse)).addString(initialResponse)
 52  
 53    return writer.flush(code.startup)
 54  }
 55  
 56  const sendSCRAMClientFinalMessage = function (additionalData: string): Buffer {
 57    return writer.addString(additionalData).flush(code.startup)
 58  }
 59  
 60  const query = (text: string): Buffer => {
 61    return writer.addCString(text).flush(code.query)
 62  }
 63  
 64  type ParseOpts = {
 65    name?: string
 66    types?: number[]
 67    text: string
 68  }
 69  
 70  const emptyArray: any[] = []
 71  
 72  const parse = (query: ParseOpts): Buffer => {
 73    // expect something like this:
 74    // { name: 'queryName',
 75    //   text: 'select * from blah',
 76    //   types: ['int8', 'bool'] }
 77  
 78    // normalize missing query names to allow for null
 79    const name = query.name || ''
 80    if (name.length > 63) {
 81      console.error('Warning! Postgres only supports 63 characters for query names.')
 82      console.error('You supplied %s (%s)', name, name.length)
 83      console.error('This can cause conflicts and silent errors executing queries')
 84    }
 85  
 86    const types = query.types || emptyArray
 87  
 88    const len = types.length
 89  
 90    const buffer = writer
 91      .addCString(name) // name of query
 92      .addCString(query.text) // actual query text
 93      .addInt16(len)
 94  
 95    for (let i = 0; i < len; i++) {
 96      buffer.addInt32(types[i])
 97    }
 98  
 99    return writer.flush(code.parse)
100  }
101  
/** Converts one bind parameter (with its position) before it is serialized. */
type ValueMapper = (param: any, index: number) => any

/** Input accepted by `bind`; every field is optional. */
type BindOpts = {
  /** destination portal name; defaults to the empty string */
  portal?: string
  /** source statement name; defaults to the empty string */
  statement?: string
  /** when true, the single result format code is written as binary */
  binary?: boolean
  /** parameter values, in order */
  values?: any[]
  /** optional per-parameter conversion applied before each value is written */
  valueMapper?: ValueMapper
}
112  
// Second writer that collects the encoded parameter values while the main
// writer collects their format codes; bind() flushes it into the main one.
const paramWriter: Writer = new Writer()
114  
// Format codes written for parameters and results. Declared as a const enum
// so that TypeScript inlines the numeric value at every use site.
const enum ParamType {
  // value is sent as text
  STRING = 0,
  // value is sent as raw bytes
  BINARY = 1,
}
120  
/**
 * Serializes bind parameters into the two module-level writers.
 *
 * For every value, a format code (Int16) goes to `writer` and the encoded
 * value goes to `paramWriter`:
 *  - null / undefined: text format code, length -1, no payload
 *  - Buffer: binary format code, byte length, raw bytes
 *  - anything else: text format code, UTF-8 byte length, string payload
 *
 * @param values parameter values in order
 * @param valueMapper optional conversion applied to each value first
 */
const writeValues = function (values: any[], valueMapper?: ValueMapper): void {
  for (let position = 0; position < values.length; position += 1) {
    const rawValue = values[position]
    const prepared = valueMapper ? valueMapper(rawValue, position) : rawValue

    if (prepared == null) {
      // null is flagged as a text parameter whose length is -1
      writer.addInt16(ParamType.STRING)
      paramWriter.addInt32(-1)
      continue
    }

    if (prepared instanceof Buffer) {
      // buffers are passed through untouched in binary format
      writer.addInt16(ParamType.BINARY)
      paramWriter.addInt32(prepared.length)
      paramWriter.add(prepared)
      continue
    }

    // everything else is written as text, prefixed with its byte length
    writer.addInt16(ParamType.STRING)
    paramWriter.addInt32(Buffer.byteLength(prepared))
    paramWriter.addString(prepared)
  }
}
143  
/**
 * Builds a bind message.
 *
 * Body layout: portal name and statement name (C strings), parameter count
 * (Int16), one format code per parameter, parameter count again (Int16),
 * the encoded parameter values, then a result-format section consisting of
 * the count 1 and a single format code applied to all result columns.
 * The message is flushed with the 'B' (0x42) code.
 *
 * @param config portal, statement, values and format options; all optional
 */
const bind = (config: BindOpts = {}): Buffer => {
  // fill in defaults for anything the caller left out
  const portalName = config.portal || ''
  const statementName = config.statement || ''
  const binaryResults = config.binary || false
  const params = config.values || emptyArray
  const paramCount = params.length

  writer.addCString(portalName)
  writer.addCString(statementName)

  // number of parameter format codes; the codes themselves are added by writeValues
  writer.addInt16(paramCount)
  writeValues(params, config.valueMapper)

  // number of parameter values, followed by the values collected in paramWriter
  writer.addInt16(paramCount)
  const encodedParams = paramWriter.flush()
  writer.add(encodedParams)

  // a single result format code that applies to every result column
  writer.addInt16(1)
  const resultFormat = binaryResults ? ParamType.BINARY : ParamType.STRING
  writer.addInt16(resultFormat)

  return writer.flush(code.bind)
}
166  
/** Input accepted by `execute`; both fields are optional. */
type ExecOpts = {
  /** portal name; defaults to the empty string */
  portal?: string
  /** row count written into the message; defaults to 0 */
  rows?: number
}
171  
// Prebuilt execute message for the unnamed portal with a row count of 0:
// code byte, Int32 length 9, empty C string, Int32 0.
const emptyExecute = Buffer.from([code.execute, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00])

/**
 * Builds an execute message.
 *
 * When neither a portal name nor a row count is given, the shared prebuilt
 * `emptyExecute` buffer is returned (the same instance on every call).
 * Otherwise a new buffer is produced: code byte 'E' (0x45), Int32 length,
 * portal name as a C string, and the row count as an unsigned Int32.
 *
 * @param config optional portal name and row count
 */
const execute = (config?: ExecOpts): Buffer => {
  const portalName = (config && config.portal) || ''
  const rowCount = (config && config.rows) || 0

  // this is the common case for most queries
  if (!portalName && !rowCount) {
    return emptyExecute
  }

  const portalBytes = Buffer.byteLength(portalName)
  // length field (4) + portal + terminator (1) + row count (4)
  const bodyLength = 4 + portalBytes + 1 + 4
  // one extra byte in front for the message code
  const message = Buffer.allocUnsafe(bodyLength + 1)

  message[0] = code.execute
  // writeInt32BE returns the offset just past the length field
  let cursor = message.writeInt32BE(bodyLength, 1)
  // write() returns the number of bytes it stored
  cursor += message.write(portalName, cursor, 'utf-8')
  message[cursor] = 0 // null terminator of the portal C string
  message.writeUInt32BE(rowCount, cursor + 1)

  return message
}
194  
/**
 * Builds the fixed 16-byte cancel request packet: Int32 length 16, the
 * Int16 pair 1234 / 5678, then the process id and secret key as Int32
 * values. No message code byte is written.
 *
 * @param processID backend process id to cancel
 * @param secretKey secret key belonging to that backend
 */
const cancel = (processID: number, secretKey: number): Buffer => {
  const packetLength = 16
  const packet = Buffer.allocUnsafe(packetLength)

  // each write returns the offset just past what it wrote
  let cursor = packet.writeInt32BE(packetLength, 0)
  cursor = packet.writeInt16BE(1234, cursor)
  cursor = packet.writeInt16BE(5678, cursor)
  cursor = packet.writeInt32BE(processID, cursor)
  packet.writeInt32BE(secretKey, cursor)

  return packet
}
204  
/** Target of a describe or close message. */
type PortalOpts = {
  /** 'S' or 'P'; written as the first character of the message text */
  type: 'S' | 'P'
  /** target name; when omitted, nothing follows the type character */
  name?: string
}
209  
/**
 * Builds a message whose whole body is a single C string.
 *
 * Layout: code byte, Int32 length (4 + string bytes + 1 terminator), the
 * string encoded as UTF-8, and a trailing zero byte.
 *
 * @param code message code stored in the first byte
 * @param string text to send
 */
const cstringMessage = (code: code, string: string): Buffer => {
  const textBytes = Buffer.byteLength(string)
  const bodyLength = 4 + textBytes + 1
  // one extra byte in front for the message code
  const message = Buffer.allocUnsafe(bodyLength + 1)

  message[0] = code
  // writeInt32BE returns the offset where the text starts
  const textStart = message.writeInt32BE(bodyLength, 1)
  // write() returns the number of bytes stored, which locates the terminator
  const terminatorAt = textStart + message.write(string, textStart, 'utf-8')
  message[terminatorAt] = 0 // null terminator of the C string

  return message
}
221  
// Prebuilt describe messages for targets without a name.
const emptyDescribePortal = writer.addCString('P').flush(code.describe)
const emptyDescribeStatement = writer.addCString('S').flush(code.describe)

/**
 * Builds a describe message for a portal ('P') or a statement ('S').
 *
 * With a name, a new message is built whose text is the type character
 * followed by the name. Without one, the matching prebuilt buffer is
 * returned (the same instance on every call).
 *
 * @param msg target type and optional name
 */
const describe = (msg: PortalOpts): Buffer => {
  if (msg.name) {
    return cstringMessage(code.describe, `${msg.type}${msg.name}`)
  }

  if (msg.type === 'P') {
    return emptyDescribePortal
  }

  return emptyDescribeStatement
}
232  
/**
 * Builds a close message for a portal ('P') or a statement ('S'). The text
 * is the type character followed by the name, or by nothing if no name is
 * given.
 */
const close = (msg: PortalOpts): Buffer => {
  const targetName = msg.name || ''
  return cstringMessage(code.close, `${msg.type}${targetName}`)
}
237  
/**
 * Wraps a chunk of copy data in a message flushed with the 'd' (0x64) code.
 */
const copyData = (chunk: Buffer): Buffer => {
  writer.add(chunk)
  return writer.flush(code.copyFromChunk)
}
241  
/**
 * Builds a copy-fail message carrying the given text as a C string, with
 * the 'f' (0x66) code.
 */
const copyFail = (message: string): Buffer => {
  const failure = cstringMessage(code.copyFail, message)
  return failure
}
245  
/**
 * Builds a 5-byte message with no body: the code byte followed by an Int32
 * length of 4 (the length field counts only itself).
 */
const codeOnlyBuffer = (code: code): Buffer => {
  const message = Buffer.alloc(5) // zero-filled
  message[0] = code
  message.writeInt32BE(4, 1)
  return message
}

// Body-less messages are built once and reused.
const flushBuffer = codeOnlyBuffer(code.flush)
const syncBuffer = codeOnlyBuffer(code.sync)
const endBuffer = codeOnlyBuffer(code.end)
const copyDoneBuffer = codeOnlyBuffer(code.copyDone)
252  
// Public surface of this module: one builder per frontend message.
const serialize = {
  // connection setup and authentication
  startup,
  password,
  requestSsl,
  sendSASLInitialResponseMessage,
  sendSCRAMClientFinalMessage,
  // simple and extended query flow
  query,
  parse,
  bind,
  execute,
  describe,
  close,
  // body-less messages: each call returns the same prebuilt buffer
  flush: (): Buffer => flushBuffer,
  sync: (): Buffer => syncBuffer,
  end: (): Buffer => endBuffer,
  // copy sub-protocol
  copyData,
  copyDone: (): Buffer => copyDoneBuffer,
  copyFail,
  // out-of-band cancellation
  cancel,
}

export { serialize }