parser.ts
import { TransformOptions } from 'stream'
import {
  Mode,
  bindComplete,
  parseComplete,
  closeComplete,
  noData,
  portalSuspended,
  copyDone,
  replicationStart,
  emptyQuery,
  ReadyForQueryMessage,
  CommandCompleteMessage,
  CopyDataMessage,
  CopyResponse,
  NotificationResponseMessage,
  RowDescriptionMessage,
  ParameterDescriptionMessage,
  Field,
  DataRowMessage,
  ParameterStatusMessage,
  BackendKeyDataMessage,
  DatabaseError,
  BackendMessage,
  MessageName,
  AuthenticationMD5Password,
  NoticeMessage,
} from './messages'
import { BufferReader } from './buffer-reader'

// every message is prefixed with a single byte
const CODE_LENGTH = 1
// every message has an int32 length which includes itself but does
// NOT include the code in the length
const LEN_LENGTH = 4

// number of bytes that must be available before a message's length can be read
const HEADER_LENGTH = CODE_LENGTH + LEN_LENGTH

export type Packet = {
  code: number
  packet: Buffer
}

// shared zero-length placeholder used while the parser holds no pending bytes
const emptyBuffer = Buffer.allocUnsafe(0)

type StreamOptions = TransformOptions & {
  mode: Mode
}

// One-byte message type codes; the trailing comment is the ASCII character of each code.
const enum MessageCodes {
  DataRow = 0x44, // D
  ParseComplete = 0x31, // 1
  BindComplete = 0x32, // 2
  CloseComplete = 0x33, // 3
  CommandComplete = 0x43, // C
  ReadyForQuery = 0x5a, // Z
  NoData = 0x6e, // n
  NotificationResponse = 0x41, // A
  AuthenticationResponse = 0x52, // R
  ParameterStatus = 0x53, // S
  BackendKeyData = 0x4b, // K
  ErrorMessage = 0x45, // E
  NoticeMessage = 0x4e, // N
  RowDescriptionMessage = 0x54, // T
  ParameterDescriptionMessage = 0x74, // t
  PortalSuspended = 0x73, // s
  ReplicationStart = 0x57, // W
  EmptyQuery = 0x49, // I
  CopyIn = 0x47, // G
  CopyOut = 0x48, // H
  CopyDone = 0x63, // c
  CopyData = 0x64, // d
}

export type MessageCallback = (msg: BackendMessage) => void

/**
 * Incremental parser for backend protocol messages.
 *
 * Bytes are fed in through {@link Parser.parse}; every complete message found is
 * decoded and handed to the supplied callback, while a trailing partial message
 * is retained and completed by a later call.
 */
export class Parser {
  // Pending bytes live in buffer[bufferOffset, bufferOffset + bufferLength).
  private buffer: Buffer = emptyBuffer
  private bufferLength: number = 0
  private bufferOffset: number = 0
  private reader = new BufferReader()
  private mode: Mode

  /**
   * @param opts - optional stream options; only `mode` is read here.
   * @throws Error when `opts.mode` is `'binary'`.
   */
  constructor(opts?: StreamOptions) {
    if (opts?.mode === 'binary') {
      throw new Error('Binary mode not supported yet')
    }
    this.mode = opts?.mode || 'text'
  }

  /**
   * Appends `buffer` to any bytes left over from a previous call and invokes
   * `callback` synchronously, once per complete message, in wire order.
   *
   * @param buffer - newly received bytes.
   * @param callback - receives each decoded message.
   */
  public parse(buffer: Buffer, callback: MessageCallback) {
    this.mergeBuffer(buffer)
    const bufferFullLength = this.bufferOffset + this.bufferLength
    let offset = this.bufferOffset
    while (offset + HEADER_LENGTH <= bufferFullLength) {
      // code is 1 byte long - it identifies the message type
      const code = this.buffer[offset]
      // length is 1 Uint32BE - it is the length of the message EXCLUDING the code
      const length = this.buffer.readUInt32BE(offset + CODE_LENGTH)
      const fullMessageLength = CODE_LENGTH + length
      if (fullMessageLength + offset <= bufferFullLength) {
        const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer)
        callback(message)
        offset += fullMessageLength
      } else {
        // the message body has not fully arrived yet; wait for more data
        break
      }
    }
    if (offset === bufferFullLength) {
      // No more use for the buffer
      this.buffer = emptyBuffer
      this.bufferLength = 0
      this.bufferOffset = 0
    } else {
      // Adjust the cursors of remainingBuffer
      this.bufferLength = bufferFullLength - offset
      this.bufferOffset = offset
    }
  }

  /**
   * Makes `this.buffer` hold the pending bytes followed by `buffer`.
   *
   * With nothing pending, the incoming buffer is adopted by reference (no copy).
   * Otherwise the new bytes are appended after the pending ones, first compacting
   * the pending bytes to the front of the current buffer or moving them into a
   * larger allocation when there is not enough room behind them.
   *
   * NOTE(review): because the incoming buffer is adopted by reference, the
   * compaction path can later write into a buffer the caller passed to `parse` —
   * confirm callers do not reuse those buffers.
   */
  private mergeBuffer(buffer: Buffer): void {
    if (this.bufferLength > 0) {
      const newLength = this.bufferLength + buffer.byteLength
      const newFullLength = newLength + this.bufferOffset
      if (newFullLength > this.buffer.byteLength) {
        // We can't concat the new buffer with the remaining one
        let newBuffer: Buffer
        if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) {
          // We can move the relevant part to the beginning of the buffer instead of allocating a new buffer
          newBuffer = this.buffer
        } else {
          // Allocate a new larger buffer
          let newBufferLength = this.buffer.byteLength * 2
          while (newLength >= newBufferLength) {
            newBufferLength *= 2
          }
          newBuffer = Buffer.allocUnsafe(newBufferLength)
        }
        // Move the remaining buffer to the new one
        this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength)
        this.buffer = newBuffer
        this.bufferOffset = 0
      }
      // Concat the new buffer with the remaining one
      buffer.copy(this.buffer, this.bufferOffset + this.bufferLength)
      this.bufferLength = newLength
    } else {
      this.buffer = buffer
      this.bufferOffset = 0
      this.bufferLength = buffer.byteLength
    }
  }

  /**
   * Decodes one complete message.
   *
   * @param offset - index in `bytes` of the first byte after the 5-byte header.
   * @param code - the message type byte.
   * @param length - the length field read from the header (excludes the code byte).
   * @param bytes - buffer containing the message.
   * @returns the decoded message; an unrecognised code yields a `DatabaseError`
   *   describing the code rather than throwing.
   */
  private handlePacket(offset: number, code: number, length: number, bytes: Buffer): BackendMessage {
    switch (code) {
      // messages without a parsed body map to shared constant instances
      case MessageCodes.BindComplete:
        return bindComplete
      case MessageCodes.ParseComplete:
        return parseComplete
      case MessageCodes.CloseComplete:
        return closeComplete
      case MessageCodes.NoData:
        return noData
      case MessageCodes.PortalSuspended:
        return portalSuspended
      case MessageCodes.CopyDone:
        return copyDone
      case MessageCodes.ReplicationStart:
        return replicationStart
      case MessageCodes.EmptyQuery:
        return emptyQuery
      case MessageCodes.DataRow:
        return this.parseDataRowMessage(offset, length, bytes)
      case MessageCodes.CommandComplete:
        return this.parseCommandCompleteMessage(offset, length, bytes)
      case MessageCodes.ReadyForQuery:
        return this.parseReadyForQueryMessage(offset, length, bytes)
      case MessageCodes.NotificationResponse:
        return this.parseNotificationMessage(offset, length, bytes)
      case MessageCodes.AuthenticationResponse:
        return this.parseAuthenticationResponse(offset, length, bytes)
      case MessageCodes.ParameterStatus:
        return this.parseParameterStatusMessage(offset, length, bytes)
      case MessageCodes.BackendKeyData:
        return this.parseBackendKeyData(offset, length, bytes)
      case MessageCodes.ErrorMessage:
        return this.parseErrorMessage(offset, length, bytes, 'error')
      case MessageCodes.NoticeMessage:
        return this.parseErrorMessage(offset, length, bytes, 'notice')
      case MessageCodes.RowDescriptionMessage:
        return this.parseRowDescriptionMessage(offset, length, bytes)
      case MessageCodes.ParameterDescriptionMessage:
        return this.parseParameterDescriptionMessage(offset, length, bytes)
      case MessageCodes.CopyIn:
        return this.parseCopyInMessage(offset, length, bytes)
      case MessageCodes.CopyOut:
        return this.parseCopyOutMessage(offset, length, bytes)
      case MessageCodes.CopyData:
        return this.parseCopyData(offset, length, bytes)
      default:
        return new DatabaseError('received invalid response: ' + code.toString(16), length, 'error')
    }
  }

  /** Reads the one-character status that forms the ReadyForQuery body. */
  private parseReadyForQueryMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const status = this.reader.string(1)
    return new ReadyForQueryMessage(length, status)
  }

  /** Reads the C-string text of a CommandComplete message. */
  private parseCommandCompleteMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const text = this.reader.cstring()
    return new CommandCompleteMessage(length, text)
  }

  /**
   * Extracts the CopyData payload: everything after the 4-byte length field.
   *
   * NOTE(review): the chunk is sliced from `bytes` rather than copied, so it
   * appears to share memory with the parser's buffer — confirm consumers do not
   * retain it across later `parse` calls.
   */
  private parseCopyData(offset: number, length: number, bytes: Buffer) {
    const chunk = bytes.slice(offset, offset + (length - 4))
    return new CopyDataMessage(length, chunk)
  }

  /** Parses a CopyIn response (message name `copyInResponse`). */
  private parseCopyInMessage(offset: number, length: number, bytes: Buffer) {
    return this.parseCopyMessage(offset, length, bytes, 'copyInResponse')
  }

  /** Parses a CopyOut response (message name `copyOutResponse`). */
  private parseCopyOutMessage(offset: number, length: number, bytes: Buffer) {
    return this.parseCopyMessage(offset, length, bytes, 'copyOutResponse')
  }

  /**
   * Shared body parser for the copy responses: a format byte (non-zero means
   * binary), an int16 column count, then one int16 per column.
   */
  private parseCopyMessage(offset: number, length: number, bytes: Buffer, messageName: MessageName) {
    this.reader.setBuffer(offset, bytes)
    const isBinary = this.reader.byte() !== 0
    const columnCount = this.reader.int16()
    const message = new CopyResponse(length, messageName, isBinary, columnCount)
    for (let i = 0; i < columnCount; i++) {
      message.columnTypes[i] = this.reader.int16()
    }
    return message
  }

  /** Reads the process id, channel name and payload of a notification. */
  private parseNotificationMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const processId = this.reader.int32()
    const channel = this.reader.cstring()
    const payload = this.reader.cstring()
    return new NotificationResponseMessage(length, processId, channel, payload)
  }

  /** Reads an int16 field count followed by that many field descriptors. */
  private parseRowDescriptionMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const fieldCount = this.reader.int16()
    const message = new RowDescriptionMessage(length, fieldCount)
    for (let i = 0; i < fieldCount; i++) {
      message.fields[i] = this.parseField()
    }
    return message
  }

  /**
   * Reads one field descriptor from the reader's current position. The reader
   * must already be positioned by the caller; this method does not call
   * `setBuffer`.
   */
  private parseField(): Field {
    const name = this.reader.cstring()
    const tableID = this.reader.uint32()
    const columnID = this.reader.int16()
    const dataTypeID = this.reader.uint32()
    const dataTypeSize = this.reader.int16()
    const dataTypeModifier = this.reader.int32()
    // format code 0 is text; any other value is reported as binary
    const mode = this.reader.int16() === 0 ? 'text' : 'binary'
    return new Field(name, tableID, columnID, dataTypeID, dataTypeSize, dataTypeModifier, mode)
  }

  /** Reads an int16 parameter count followed by one type OID per parameter. */
  private parseParameterDescriptionMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const parameterCount = this.reader.int16()
    const message = new ParameterDescriptionMessage(length, parameterCount)
    for (let i = 0; i < parameterCount; i++) {
      // Type OIDs are unsigned 32-bit values; read them unsigned, exactly as
      // parseField does for dataTypeID, so OIDs >= 2^31 do not come out negative.
      message.dataTypeIDs[i] = this.reader.uint32()
    }
    return message
  }

  /**
   * Reads an int16 field count, then for each field an int32 byte length
   * followed by that many bytes decoded as a string.
   */
  private parseDataRowMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const fieldCount = this.reader.int16()
    const fields: any[] = new Array(fieldCount)
    for (let i = 0; i < fieldCount; i++) {
      const len = this.reader.int32()
      // a -1 for length means the value of the field is null
      fields[i] = len === -1 ? null : this.reader.string(len)
    }
    return new DataRowMessage(length, fields)
  }

  /** Reads the name/value C-string pair of a ParameterStatus message. */
  private parseParameterStatusMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const name = this.reader.cstring()
    const value = this.reader.cstring()
    return new ParameterStatusMessage(length, name, value)
  }

  /** Reads the process id and secret key of a BackendKeyData message. */
  private parseBackendKeyData(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const processID = this.reader.int32()
    const secretKey = this.reader.int32()
    return new BackendKeyDataMessage(length, processID, secretKey)
  }

  /**
   * Decodes an authentication message, dispatching on the int32 code at the
   * start of the body.
   *
   * NOTE(review): for codes 3 and 5, a `length` other than the expected 8 / 12
   * leaves the result named `authenticationOk` — confirm this fallback is intended.
   *
   * @returns an `AuthenticationMD5Password` for a well-formed code 5 message,
   *   otherwise a plain object carrying `name`, `length` and any code-specific
   *   fields (`mechanisms` or `data`).
   * @throws Error when the code is not one of 0, 3, 5, 10, 11 or 12.
   */
  public parseAuthenticationResponse(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const code = this.reader.int32()
    // TODO(bmc): maybe better types here
    const message: BackendMessage & any = {
      name: 'authenticationOk',
      length,
    }

    switch (code) {
      case 0: // AuthenticationOk
        break
      case 3: // AuthenticationCleartextPassword
        if (message.length === 8) {
          message.name = 'authenticationCleartextPassword'
        }
        break
      case 5: // AuthenticationMD5Password
        if (message.length === 12) {
          message.name = 'authenticationMD5Password'
          const salt = this.reader.bytes(4)
          return new AuthenticationMD5Password(length, salt)
        }
        break
      case 10: // AuthenticationSASL
        {
          message.name = 'authenticationSASL'
          message.mechanisms = []
          let mechanism: string
          // mechanism names are C-strings; an empty one ends the list
          do {
            mechanism = this.reader.cstring()
            if (mechanism) {
              message.mechanisms.push(mechanism)
            }
          } while (mechanism)
        }
        break
      case 11: // AuthenticationSASLContinue
        message.name = 'authenticationSASLContinue'
        // body size = length minus the 4-byte length field and the 4-byte code
        message.data = this.reader.string(length - 8)
        break
      case 12: // AuthenticationSASLFinal
        message.name = 'authenticationSASLFinal'
        message.data = this.reader.string(length - 8)
        break
      default:
        throw new Error('Unknown authenticationOk message type ' + code)
    }
    return message
  }

  /**
   * Decodes an error or notice body: a sequence of one-character field type
   * codes, each followed by a C-string value, ended by a `'\0'` type code.
   *
   * @param name - `'notice'` produces a `NoticeMessage`; any other name produces
   *   a `DatabaseError` created with that name.
   */
  private parseErrorMessage(offset: number, length: number, bytes: Buffer, name: MessageName) {
    this.reader.setBuffer(offset, bytes)
    const fields: Record<string, string> = {}
    let fieldType = this.reader.string(1)
    while (fieldType !== '\0') {
      fields[fieldType] = this.reader.cstring()
      fieldType = this.reader.string(1)
    }

    const messageValue = fields.M

    const message =
      name === 'notice' ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name)

    // copy the single-letter wire fields onto the named properties
    message.severity = fields.S
    message.code = fields.C
    message.detail = fields.D
    message.hint = fields.H
    message.position = fields.P
    message.internalPosition = fields.p
    message.internalQuery = fields.q
    message.where = fields.W
    message.schema = fields.s
    message.table = fields.t
    message.column = fields.c
    message.dataType = fields.d
    message.constraint = fields.n
    message.file = fields.F
    message.line = fields.L
    message.routine = fields.R
    return message
  }
}