/ src / rpc_connection.cpp
rpc_connection.cpp
  1  #include "rpc_connection.h"
  2  #include "serialization.h"
  3  
  4  #include <atomic>
  5  
  6  static const int RpcVersion = 1;
  7  static RpcConnection Instance;
  8  
  9  /*static*/ RpcConnection* RpcConnection::Create(const char* applicationId)
 10  {
 11      Instance.connection = BaseConnection::Create();
 12      StringCopy(Instance.appId, applicationId);
 13      return &Instance;
 14  }
 15  
 16  /*static*/ void RpcConnection::Destroy(RpcConnection*& c)
 17  {
 18      c->Close();
 19      BaseConnection::Destroy(c->connection);
 20      c = nullptr;
 21  }
 22  
 23  void RpcConnection::Open()
 24  {
 25      if (state == State::Connected) {
 26          return;
 27      }
 28  
 29      if (state == State::Disconnected && !connection->Open()) {
 30          return;
 31      }
 32  
 33      if (state == State::SentHandshake) {
 34          JsonDocument message;
 35          if (Read(message)) {
 36              auto cmd = GetStrMember(&message, "cmd");
 37              auto evt = GetStrMember(&message, "evt");
 38              if (cmd && evt && !strcmp(cmd, "DISPATCH") && !strcmp(evt, "READY")) {
 39                  state = State::Connected;
 40                  if (onConnect) {
 41                      onConnect(message);
 42                  }
 43              }
 44          }
 45      }
 46      else {
 47          sendFrame.opcode = Opcode::Handshake;
 48          sendFrame.length = (uint32_t)JsonWriteHandshakeObj(
 49            sendFrame.message, sizeof(sendFrame.message), RpcVersion, appId);
 50  
 51          if (connection->Write(&sendFrame, sizeof(MessageFrameHeader) + sendFrame.length)) {
 52              state = State::SentHandshake;
 53          }
 54          else {
 55              Close();
 56          }
 57      }
 58  }
 59  
 60  void RpcConnection::Close()
 61  {
 62      if (onDisconnect && (state == State::Connected || state == State::SentHandshake)) {
 63          onDisconnect(lastErrorCode, lastErrorMessage);
 64      }
 65      connection->Close();
 66      state = State::Disconnected;
 67  }
 68  
 69  bool RpcConnection::Write(const void* data, size_t length)
 70  {
 71      sendFrame.opcode = Opcode::Frame;
 72      memcpy(sendFrame.message, data, length);
 73      sendFrame.length = (uint32_t)length;
 74      if (!connection->Write(&sendFrame, sizeof(MessageFrameHeader) + length)) {
 75          Close();
 76          return false;
 77      }
 78      return true;
 79  }
 80  
 81  bool RpcConnection::Read(JsonDocument& message)
 82  {
 83      if (state != State::Connected && state != State::SentHandshake) {
 84          return false;
 85      }
 86      MessageFrame readFrame;
 87      for (;;) {
 88          bool didRead = connection->Read(&readFrame, sizeof(MessageFrameHeader));
 89          if (!didRead) {
 90              if (!connection->isOpen) {
 91                  lastErrorCode = (int)ErrorCode::PipeClosed;
 92                  StringCopy(lastErrorMessage, "Pipe closed");
 93                  Close();
 94              }
 95              return false;
 96          }
 97  
 98          if (readFrame.length > 0) {
 99              didRead = connection->Read(readFrame.message, readFrame.length);
100              if (!didRead) {
101                  lastErrorCode = (int)ErrorCode::ReadCorrupt;
102                  StringCopy(lastErrorMessage, "Partial data in frame");
103                  Close();
104                  return false;
105              }
106              readFrame.message[readFrame.length] = 0;
107          }
108  
109          switch (readFrame.opcode) {
110          case Opcode::Close: {
111              message.ParseInsitu(readFrame.message);
112              lastErrorCode = GetIntMember(&message, "code");
113              StringCopy(lastErrorMessage, GetStrMember(&message, "message", ""));
114              Close();
115              return false;
116          }
117          case Opcode::Frame:
118              message.ParseInsitu(readFrame.message);
119              return true;
120          case Opcode::Ping:
121              readFrame.opcode = Opcode::Pong;
122              if (!connection->Write(&readFrame, sizeof(MessageFrameHeader) + readFrame.length)) {
123                  Close();
124              }
125              break;
126          case Opcode::Pong:
127              break;
128          case Opcode::Handshake:
129          default:
130              // something bad happened
131              lastErrorCode = (int)ErrorCode::ReadCorrupt;
132              StringCopy(lastErrorMessage, "Bad ipc frame");
133              Close();
134              return false;
135          }
136      }
137  }