rpc_connection.cpp
1 #include "rpc_connection.h" 2 #include "serialization.h" 3 4 #include <atomic> 5 6 static const int RpcVersion = 1; 7 static RpcConnection Instance; 8 9 /*static*/ RpcConnection* RpcConnection::Create(const char* applicationId) 10 { 11 Instance.connection = BaseConnection::Create(); 12 StringCopy(Instance.appId, applicationId); 13 return &Instance; 14 } 15 16 /*static*/ void RpcConnection::Destroy(RpcConnection*& c) 17 { 18 c->Close(); 19 BaseConnection::Destroy(c->connection); 20 c = nullptr; 21 } 22 23 void RpcConnection::Open() 24 { 25 if (state == State::Connected) { 26 return; 27 } 28 29 if (state == State::Disconnected && !connection->Open()) { 30 return; 31 } 32 33 if (state == State::SentHandshake) { 34 JsonDocument message; 35 if (Read(message)) { 36 auto cmd = GetStrMember(&message, "cmd"); 37 auto evt = GetStrMember(&message, "evt"); 38 if (cmd && evt && !strcmp(cmd, "DISPATCH") && !strcmp(evt, "READY")) { 39 state = State::Connected; 40 if (onConnect) { 41 onConnect(message); 42 } 43 } 44 } 45 } 46 else { 47 sendFrame.opcode = Opcode::Handshake; 48 sendFrame.length = (uint32_t)JsonWriteHandshakeObj( 49 sendFrame.message, sizeof(sendFrame.message), RpcVersion, appId); 50 51 if (connection->Write(&sendFrame, sizeof(MessageFrameHeader) + sendFrame.length)) { 52 state = State::SentHandshake; 53 } 54 else { 55 Close(); 56 } 57 } 58 } 59 60 void RpcConnection::Close() 61 { 62 if (onDisconnect && (state == State::Connected || state == State::SentHandshake)) { 63 onDisconnect(lastErrorCode, lastErrorMessage); 64 } 65 connection->Close(); 66 state = State::Disconnected; 67 } 68 69 bool RpcConnection::Write(const void* data, size_t length) 70 { 71 sendFrame.opcode = Opcode::Frame; 72 memcpy(sendFrame.message, data, length); 73 sendFrame.length = (uint32_t)length; 74 if (!connection->Write(&sendFrame, sizeof(MessageFrameHeader) + length)) { 75 Close(); 76 return false; 77 } 78 return true; 79 } 80 81 bool RpcConnection::Read(JsonDocument& message) 82 { 83 if (state != State::Connected && state != State::SentHandshake) { 84 return false; 85 } 86 MessageFrame readFrame; 87 for (;;) { 88 bool didRead = connection->Read(&readFrame, sizeof(MessageFrameHeader)); 89 if (!didRead) { 90 if (!connection->isOpen) { 91 lastErrorCode = (int)ErrorCode::PipeClosed; 92 StringCopy(lastErrorMessage, "Pipe closed"); 93 Close(); 94 } 95 return false; 96 } 97 98 if (readFrame.length > 0) { 99 didRead = connection->Read(readFrame.message, readFrame.length); 100 if (!didRead) { 101 lastErrorCode = (int)ErrorCode::ReadCorrupt; 102 StringCopy(lastErrorMessage, "Partial data in frame"); 103 Close(); 104 return false; 105 } 106 readFrame.message[readFrame.length] = 0; 107 } 108 109 switch (readFrame.opcode) { 110 case Opcode::Close: { 111 message.ParseInsitu(readFrame.message); 112 lastErrorCode = GetIntMember(&message, "code"); 113 StringCopy(lastErrorMessage, GetStrMember(&message, "message", "")); 114 Close(); 115 return false; 116 } 117 case Opcode::Frame: 118 message.ParseInsitu(readFrame.message); 119 return true; 120 case Opcode::Ping: 121 readFrame.opcode = Opcode::Pong; 122 if (!connection->Write(&readFrame, sizeof(MessageFrameHeader) + readFrame.length)) { 123 Close(); 124 } 125 break; 126 case Opcode::Pong: 127 break; 128 case Opcode::Handshake: 129 default: 130 // something bad happened 131 lastErrorCode = (int)ErrorCode::ReadCorrupt; 132 StringCopy(lastErrorMessage, "Bad ipc frame"); 133 Close(); 134 return false; 135 } 136 } 137 }