/ src / test / util / net.cpp
net.cpp
  1  // Copyright (c) 2020-present The Bitcoin Core developers
  2  // Distributed under the MIT software license, see the accompanying
  3  // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4  
#include <test/util/net.h>

#include <net.h>
#include <net_processing.h>
#include <netaddress.h>
#include <netmessagemaker.h>
#include <node/connection_types.h>
#include <node/eviction.h>
#include <protocol.h>
#include <random.h>
#include <serialize.h>
#include <span.h>
#include <sync.h>

#include <chrono>
#include <cstring>
#include <memory>
#include <optional>
#include <utility>
#include <vector>
 22  
 23  void ConnmanTestMsg::Handshake(CNode& node,
 24                                 bool successfully_connected,
 25                                 ServiceFlags remote_services,
 26                                 ServiceFlags local_services,
 27                                 int32_t version,
 28                                 bool relay_txs)
 29  {
 30      auto& peerman{static_cast<PeerManager&>(*m_msgproc)};
 31      auto& connman{*this};
 32  
 33      peerman.InitializeNode(node, local_services);
 34      peerman.SendMessages(node);
 35      FlushSendBuffer(node); // Drop the version message added by SendMessages.
 36  
 37      CSerializedNetMsg msg_version{
 38          NetMsg::Make(NetMsgType::VERSION,
 39                  version,                                        //
 40                  Using<CustomUintFormatter<8>>(remote_services), //
 41                  int64_t{},                                      // dummy time
 42                  int64_t{},                                      // ignored service bits
 43                  CNetAddr::V1(CService{}),                       // dummy
 44                  int64_t{},                                      // ignored service bits
 45                  CNetAddr::V1(CService{}),                       // ignored
 46                  uint64_t{1},                                    // dummy nonce
 47                  std::string{},                                  // dummy subver
 48                  int32_t{},                                      // dummy starting_height
 49                  relay_txs),
 50      };
 51  
 52      (void)connman.ReceiveMsgFrom(node, std::move(msg_version));
 53      node.fPauseSend = false;
 54      connman.ProcessMessagesOnce(node);
 55      peerman.SendMessages(node);
 56      FlushSendBuffer(node); // Drop the verack message added by SendMessages.
 57      if (node.fDisconnect) return;
 58      assert(node.nVersion == version);
 59      assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
 60      CNodeStateStats statestats;
 61      assert(peerman.GetNodeStateStats(node.GetId(), statestats));
 62      assert(statestats.m_relay_txs == (relay_txs && !node.IsBlockOnlyConn()));
 63      assert(statestats.their_services == remote_services);
 64      if (successfully_connected) {
 65          CSerializedNetMsg msg_verack{NetMsg::Make(NetMsgType::VERACK)};
 66          (void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
 67          node.fPauseSend = false;
 68          connman.ProcessMessagesOnce(node);
 69          peerman.SendMessages(node);
 70          assert(node.fSuccessfullyConnected == true);
 71      }
 72  }
 73  
 74  void ConnmanTestMsg::ResetAddrCache() { m_addr_response_caches = {}; }
 75  
 76  void ConnmanTestMsg::ResetMaxOutboundCycle()
 77  {
 78      LOCK(m_total_bytes_sent_mutex);
 79      nMaxOutboundCycleStartTime = 0s;
 80      nMaxOutboundTotalBytesSentInCycle = 0;
 81  }
 82  
 83  void ConnmanTestMsg::Reset()
 84  {
 85      ResetAddrCache();
 86      ResetMaxOutboundCycle();
 87      m_private_broadcast.m_outbound_tor_ok_at_least_once.store(false);
 88      m_private_broadcast.m_num_to_open.store(0);
 89  }
 90  
 91  void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, std::span<const uint8_t> msg_bytes, bool& complete) const
 92  {
 93      assert(node.ReceiveMsgBytes(msg_bytes, complete));
 94      if (complete) {
 95          node.MarkReceivedMsgsForProcessing();
 96      }
 97  }
 98  
 99  void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
100  {
101      LOCK(node.cs_vSend);
102      node.vSendMsg.clear();
103      node.m_send_memusage = 0;
104      while (true) {
105          const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
106          if (to_send.empty()) break;
107          node.m_transport->MarkBytesSent(to_send.size());
108      }
109  }
110  
111  bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
112  {
113      bool queued = node.m_transport->SetMessageToSend(ser_msg);
114      assert(queued);
115      bool complete{false};
116      while (true) {
117          const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
118          if (to_send.empty()) break;
119          NodeReceiveMsgBytes(node, to_send, complete);
120          node.m_transport->MarkBytesSent(to_send.size());
121      }
122      return complete;
123  }
124  
125  CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
126  {
127      CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true, /*proxy_override=*/std::nullopt);
128      if (!node) return nullptr;
129      node->SetCommonVersion(PROTOCOL_VERSION);
130      peerman.InitializeNode(*node, ServiceFlags(NODE_NETWORK | NODE_WITNESS));
131      node->fSuccessfullyConnected = true;
132      AddTestNode(*node);
133      return node;
134  }
135  
136  std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context)
137  {
138      std::vector<NodeEvictionCandidate> candidates;
139      candidates.reserve(n_candidates);
140      for (int id = 0; id < n_candidates; ++id) {
141          candidates.push_back({
142              .id=id,
143              .m_connected=std::chrono::seconds{random_context.randrange(100)},
144              .m_min_ping_time=std::chrono::microseconds{random_context.randrange(100)},
145              .m_last_block_time=std::chrono::seconds{random_context.randrange(100)},
146              .m_last_tx_time=std::chrono::seconds{random_context.randrange(100)},
147              .fRelevantServices=random_context.randbool(),
148              .m_relay_txs=random_context.randbool(),
149              .fBloomFilter=random_context.randbool(),
150              .nKeyedNetGroup=random_context.randrange(100u),
151              .prefer_evict=random_context.randbool(),
152              .m_is_local=random_context.randbool(),
153              .m_network=ALL_NETWORKS[random_context.randrange(ALL_NETWORKS.size())],
154              .m_noban=false,
155              .m_conn_type=ConnectionType::INBOUND,
156          });
157      }
158      return candidates;
159  }
160  
161  // Have different ZeroSock (or others that inherit from it) objects have different
162  // m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two
163  // different objects comparing as equal.
164  static std::atomic<SOCKET> g_mocked_sock_fd{0};
165  
166  ZeroSock::ZeroSock() : Sock{g_mocked_sock_fd++} {}
167  
168  // Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that.
169  ZeroSock::~ZeroSock() { m_socket = INVALID_SOCKET; }
170  
171  ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; }
172  
173  ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const
174  {
175      memset(buf, 0x0, len);
176      return len;
177  }
178  
179  int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; }
180  
181  int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; }
182  
183  int ZeroSock::Listen(int) const { return 0; }
184  
185  std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const
186  {
187      if (addr != nullptr) {
188          // Pretend all connections come from 5.5.5.5:6789
189          memset(addr, 0x00, *addr_len);
190          const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in));
191          if (*addr_len >= write_len) {
192              *addr_len = write_len;
193              sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr);
194              addr_in->sin_family = AF_INET;
195              memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr));
196              addr_in->sin_port = htons(6789);
197          }
198      }
199      return std::make_unique<ZeroSock>();
200  }
201  
202  int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const
203  {
204      std::memset(opt_val, 0x0, *opt_len);
205      return 0;
206  }
207  
208  int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; }
209  
210  int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const
211  {
212      std::memset(name, 0x0, *name_len);
213      return 0;
214  }
215  
216  bool ZeroSock::SetNonBlocking() const { return true; }
217  
218  bool ZeroSock::IsSelectable() const { return true; }
219  
220  bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
221  {
222      if (occurred != nullptr) {
223          *occurred = requested;
224      }
225      return true;
226  }
227  
228  bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
229  {
230      for (auto& [sock, events] : events_per_sock) {
231          (void)sock;
232          events.occurred = events.requested;
233      }
234      return true;
235  }
236  
237  ZeroSock& ZeroSock::operator=(Sock&& other)
238  {
239      assert(false && "Move of Sock into ZeroSock not allowed.");
240      return *this;
241  }
242  
243  StaticContentsSock::StaticContentsSock(const std::string& contents)
244      : m_contents{contents}
245  {
246  }
247  
248  ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const
249  {
250      const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)};
251      std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes);
252      if ((flags & MSG_PEEK) == 0) {
253          m_consumed += consume_bytes;
254      }
255      return consume_bytes;
256  }
257  
258  StaticContentsSock& StaticContentsSock::operator=(Sock&& other)
259  {
260      assert(false && "Move of Sock into StaticContentsSock not allowed.");
261      return *this;
262  }
263  
264  ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
265  {
266      WAIT_LOCK(m_mutex, lock);
267  
268      if (m_data.empty()) {
269          if (m_eof) {
270              return 0;
271          }
272          errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
273          return -1;
274      }
275  
276      const size_t read_bytes{std::min(len, m_data.size())};
277  
278      std::memcpy(buf, m_data.data(), read_bytes);
279      if ((flags & MSG_PEEK) == 0) {
280          m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
281      }
282  
283      return read_bytes;
284  }
285  
286  std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
287  {
288      V1Transport transport{NodeId{0}};
289  
290      {
291          WAIT_LOCK(m_mutex, lock);
292  
293          WaitForDataOrEof(lock);
294          if (m_eof && m_data.empty()) {
295              return std::nullopt;
296          }
297  
298          for (;;) {
299              std::span<const uint8_t> s{m_data};
300              if (!transport.ReceivedBytes(s)) {  // Consumed bytes are removed from the front of s.
301                  return std::nullopt;
302              }
303              m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
304              if (transport.ReceivedMessageComplete()) {
305                  break;
306              }
307              if (m_data.empty()) {
308                  WaitForDataOrEof(lock);
309                  if (m_eof && m_data.empty()) {
310                      return std::nullopt;
311                  }
312              }
313          }
314      }
315  
316      bool reject{false};
317      CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
318      if (reject) {
319          return std::nullopt;
320      }
321      return std::make_optional<CNetMessage>(std::move(msg));
322  }
323  
324  void DynSock::Pipe::PushBytes(const void* buf, size_t len)
325  {
326      LOCK(m_mutex);
327      const uint8_t* b = static_cast<const uint8_t*>(buf);
328      m_data.insert(m_data.end(), b, b + len);
329      m_cond.notify_all();
330  }
331  
332  void DynSock::Pipe::Eof()
333  {
334      LOCK(m_mutex);
335      m_eof = true;
336      m_cond.notify_all();
337  }
338  
339  void DynSock::Pipe::WaitForDataOrEof(UniqueLock<Mutex>& lock)
340  {
341      Assert(lock.mutex() == &m_mutex);
342  
343      m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
344          AssertLockHeld(m_mutex);
345          return !m_data.empty() || m_eof;
346      });
347  }
348  
349  DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets)
350      : m_pipes{pipes}, m_accept_sockets{accept_sockets}
351  {
352  }
353  
354  DynSock::~DynSock()
355  {
356      m_pipes->send.Eof();
357  }
358  
359  ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
360  {
361      return m_pipes->recv.GetBytes(buf, len, flags);
362  }
363  
364  ssize_t DynSock::Send(const void* buf, size_t len, int) const
365  {
366      m_pipes->send.PushBytes(buf, len);
367      return len;
368  }
369  
370  std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
371  {
372      ZeroSock::Accept(addr, addr_len);
373      return m_accept_sockets->Pop().value_or(nullptr);
374  }
375  
376  bool DynSock::Wait(std::chrono::milliseconds timeout,
377                     Event requested,
378                     Event* occurred) const
379  {
380      EventsPerSock ev;
381      ev.emplace(this, Events{requested});
382      const bool ret{WaitMany(timeout, ev)};
383      if (occurred != nullptr) {
384          *occurred = ev.begin()->second.occurred;
385      }
386      return ret;
387  }
388  
389  bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
390  {
391      const auto deadline = std::chrono::steady_clock::now() + timeout;
392      bool at_least_one_event_occurred{false};
393  
394      for (;;) {
395          // Check all sockets for readiness without waiting.
396          for (auto& [sock, events] : events_per_sock) {
397              if ((events.requested & Sock::SEND) != 0) {
398                  // Always ready for Send().
399                  events.occurred |= Sock::SEND;
400                  at_least_one_event_occurred = true;
401              }
402  
403              if ((events.requested & Sock::RECV) != 0) {
404                  auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
405                  uint8_t b;
406                  if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
407                      events.occurred |= Sock::RECV;
408                      at_least_one_event_occurred = true;
409                  }
410              }
411          }
412  
413          if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
414              break;
415          }
416  
417          std::this_thread::sleep_for(10ms);
418      }
419  
420      return true;
421  }
422  
423  DynSock& DynSock::operator=(Sock&&)
424  {
425      assert(false && "Move of Sock into DynSock not allowed.");
426      return *this;
427  }