serializer.ts
1 import { Writer } from './buffer-writer' 2 3 const enum code { 4 startup = 0x70, 5 query = 0x51, 6 parse = 0x50, 7 bind = 0x42, 8 execute = 0x45, 9 flush = 0x48, 10 sync = 0x53, 11 end = 0x58, 12 close = 0x43, 13 describe = 0x44, 14 copyFromChunk = 0x64, 15 copyDone = 0x63, 16 copyFail = 0x66, 17 } 18 19 const writer = new Writer() 20 21 const startup = (opts: Record<string, string>): Buffer => { 22 // protocol version 23 writer.addInt16(3).addInt16(0) 24 for (const key of Object.keys(opts)) { 25 writer.addCString(key).addCString(opts[key]) 26 } 27 28 writer.addCString('client_encoding').addCString('UTF8') 29 30 const bodyBuffer = writer.addCString('').flush() 31 // this message is sent without a code 32 33 const length = bodyBuffer.length + 4 34 35 return new Writer().addInt32(length).add(bodyBuffer).flush() 36 } 37 38 const requestSsl = (): Buffer => { 39 const response = Buffer.allocUnsafe(8) 40 response.writeInt32BE(8, 0) 41 response.writeInt32BE(80877103, 4) 42 return response 43 } 44 45 const password = (password: string): Buffer => { 46 return writer.addCString(password).flush(code.startup) 47 } 48 49 const sendSASLInitialResponseMessage = function (mechanism: string, initialResponse: string): Buffer { 50 // 0x70 = 'p' 51 writer.addCString(mechanism).addInt32(Buffer.byteLength(initialResponse)).addString(initialResponse) 52 53 return writer.flush(code.startup) 54 } 55 56 const sendSCRAMClientFinalMessage = function (additionalData: string): Buffer { 57 return writer.addString(additionalData).flush(code.startup) 58 } 59 60 const query = (text: string): Buffer => { 61 return writer.addCString(text).flush(code.query) 62 } 63 64 type ParseOpts = { 65 name?: string 66 types?: number[] 67 text: string 68 } 69 70 const emptyArray: any[] = [] 71 72 const parse = (query: ParseOpts): Buffer => { 73 // expect something like this: 74 // { name: 'queryName', 75 // text: 'select * from blah', 76 // types: ['int8', 'bool'] } 77 78 // normalize missing query names to allow for null 79 const name = query.name || '' 80 if (name.length > 63) { 81 console.error('Warning! Postgres only supports 63 characters for query names.') 82 console.error('You supplied %s (%s)', name, name.length) 83 console.error('This can cause conflicts and silent errors executing queries') 84 } 85 86 const types = query.types || emptyArray 87 88 const len = types.length 89 90 const buffer = writer 91 .addCString(name) // name of query 92 .addCString(query.text) // actual query text 93 .addInt16(len) 94 95 for (let i = 0; i < len; i++) { 96 buffer.addInt32(types[i]) 97 } 98 99 return writer.flush(code.parse) 100 } 101 102 type ValueMapper = (param: any, index: number) => any 103 104 type BindOpts = { 105 portal?: string 106 binary?: boolean 107 statement?: string 108 values?: any[] 109 // optional map from JS value to postgres value per parameter 110 valueMapper?: ValueMapper 111 } 112 113 const paramWriter = new Writer() 114 115 // make this a const enum so typescript will inline the value 116 const enum ParamType { 117 STRING = 0, 118 BINARY = 1, 119 } 120 121 const writeValues = function (values: any[], valueMapper?: ValueMapper): void { 122 for (let i = 0; i < values.length; i++) { 123 const mappedVal = valueMapper ? valueMapper(values[i], i) : values[i] 124 if (mappedVal == null) { 125 // add the param type (string) to the writer 126 writer.addInt16(ParamType.STRING) 127 // write -1 to the param writer to indicate null 128 paramWriter.addInt32(-1) 129 } else if (mappedVal instanceof Buffer) { 130 // add the param type (binary) to the writer 131 writer.addInt16(ParamType.BINARY) 132 // add the buffer to the param writer 133 paramWriter.addInt32(mappedVal.length) 134 paramWriter.add(mappedVal) 135 } else { 136 // add the param type (string) to the writer 137 writer.addInt16(ParamType.STRING) 138 paramWriter.addInt32(Buffer.byteLength(mappedVal)) 139 paramWriter.addString(mappedVal) 140 } 141 } 142 } 143 144 const bind = (config: BindOpts = {}): Buffer => { 145 // normalize config 146 const portal = config.portal || '' 147 const statement = config.statement || '' 148 const binary = config.binary || false 149 const values = config.values || emptyArray 150 const len = values.length 151 152 writer.addCString(portal).addCString(statement) 153 writer.addInt16(len) 154 155 writeValues(values, config.valueMapper) 156 157 writer.addInt16(len) 158 writer.add(paramWriter.flush()) 159 160 // all results use the same format code 161 writer.addInt16(1) 162 // format code 163 writer.addInt16(binary ? ParamType.BINARY : ParamType.STRING) 164 return writer.flush(code.bind) 165 } 166 167 type ExecOpts = { 168 portal?: string 169 rows?: number 170 } 171 172 const emptyExecute = Buffer.from([code.execute, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00]) 173 174 const execute = (config?: ExecOpts): Buffer => { 175 // this is the happy path for most queries 176 if (!config || (!config.portal && !config.rows)) { 177 return emptyExecute 178 } 179 180 const portal = config.portal || '' 181 const rows = config.rows || 0 182 183 const portalLength = Buffer.byteLength(portal) 184 const len = 4 + portalLength + 1 + 4 185 // one extra bit for code 186 const buff = Buffer.allocUnsafe(1 + len) 187 buff[0] = code.execute 188 buff.writeInt32BE(len, 1) 189 buff.write(portal, 5, 'utf-8') 190 buff[portalLength + 5] = 0 // null terminate portal cString 191 buff.writeUInt32BE(rows, buff.length - 4) 192 return buff 193 } 194 195 const cancel = (processID: number, secretKey: number): Buffer => { 196 const buffer = Buffer.allocUnsafe(16) 197 buffer.writeInt32BE(16, 0) 198 buffer.writeInt16BE(1234, 4) 199 buffer.writeInt16BE(5678, 6) 200 buffer.writeInt32BE(processID, 8) 201 buffer.writeInt32BE(secretKey, 12) 202 return buffer 203 } 204 205 type PortalOpts = { 206 type: 'S' | 'P' 207 name?: string 208 } 209 210 const cstringMessage = (code: code, string: string): Buffer => { 211 const stringLen = Buffer.byteLength(string) 212 const len = 4 + stringLen + 1 213 // one extra bit for code 214 const buffer = Buffer.allocUnsafe(1 + len) 215 buffer[0] = code 216 buffer.writeInt32BE(len, 1) 217 buffer.write(string, 5, 'utf-8') 218 buffer[len] = 0 // null terminate cString 219 return buffer 220 } 221 222 const emptyDescribePortal = writer.addCString('P').flush(code.describe) 223 const emptyDescribeStatement = writer.addCString('S').flush(code.describe) 224 225 const describe = (msg: PortalOpts): Buffer => { 226 return msg.name 227 ? cstringMessage(code.describe, `${msg.type}${msg.name || ''}`) 228 : msg.type === 'P' 229 ? emptyDescribePortal 230 : emptyDescribeStatement 231 } 232 233 const close = (msg: PortalOpts): Buffer => { 234 const text = `${msg.type}${msg.name || ''}` 235 return cstringMessage(code.close, text) 236 } 237 238 const copyData = (chunk: Buffer): Buffer => { 239 return writer.add(chunk).flush(code.copyFromChunk) 240 } 241 242 const copyFail = (message: string): Buffer => { 243 return cstringMessage(code.copyFail, message) 244 } 245 246 const codeOnlyBuffer = (code: code): Buffer => Buffer.from([code, 0x00, 0x00, 0x00, 0x04]) 247 248 const flushBuffer = codeOnlyBuffer(code.flush) 249 const syncBuffer = codeOnlyBuffer(code.sync) 250 const endBuffer = codeOnlyBuffer(code.end) 251 const copyDoneBuffer = codeOnlyBuffer(code.copyDone) 252 253 const serialize = { 254 startup, 255 password, 256 requestSsl, 257 sendSASLInitialResponseMessage, 258 sendSCRAMClientFinalMessage, 259 query, 260 parse, 261 bind, 262 execute, 263 describe, 264 close, 265 flush: () => flushBuffer, 266 sync: () => syncBuffer, 267 end: () => endBuffer, 268 copyData, 269 copyDone: () => copyDoneBuffer, 270 copyFail, 271 cancel, 272 } 273 274 export { serialize }