// net.cpp — test networking utilities (mocked sockets and a test-only CConnman).
1 // Copyright (c) 2020-present The Bitcoin Core developers 2 // Distributed under the MIT software license, see the accompanying 3 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 5 #include <test/util/net.h> 6 7 #include <net.h> 8 #include <net_processing.h> 9 #include <netaddress.h> 10 #include <netmessagemaker.h> 11 #include <node/connection_types.h> 12 #include <node/eviction.h> 13 #include <protocol.h> 14 #include <random.h> 15 #include <serialize.h> 16 #include <span.h> 17 #include <sync.h> 18 19 #include <chrono> 20 #include <optional> 21 #include <vector> 22 23 void ConnmanTestMsg::Handshake(CNode& node, 24 bool successfully_connected, 25 ServiceFlags remote_services, 26 ServiceFlags local_services, 27 int32_t version, 28 bool relay_txs) 29 { 30 auto& peerman{static_cast<PeerManager&>(*m_msgproc)}; 31 auto& connman{*this}; 32 33 peerman.InitializeNode(node, local_services); 34 peerman.SendMessages(node); 35 FlushSendBuffer(node); // Drop the version message added by SendMessages. 36 37 CSerializedNetMsg msg_version{ 38 NetMsg::Make(NetMsgType::VERSION, 39 version, // 40 Using<CustomUintFormatter<8>>(remote_services), // 41 int64_t{}, // dummy time 42 int64_t{}, // ignored service bits 43 CNetAddr::V1(CService{}), // dummy 44 int64_t{}, // ignored service bits 45 CNetAddr::V1(CService{}), // ignored 46 uint64_t{1}, // dummy nonce 47 std::string{}, // dummy subver 48 int32_t{}, // dummy starting_height 49 relay_txs), 50 }; 51 52 (void)connman.ReceiveMsgFrom(node, std::move(msg_version)); 53 node.fPauseSend = false; 54 connman.ProcessMessagesOnce(node); 55 peerman.SendMessages(node); 56 FlushSendBuffer(node); // Drop the verack message added by SendMessages. 
57 if (node.fDisconnect) return; 58 assert(node.nVersion == version); 59 assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION)); 60 CNodeStateStats statestats; 61 assert(peerman.GetNodeStateStats(node.GetId(), statestats)); 62 assert(statestats.m_relay_txs == (relay_txs && !node.IsBlockOnlyConn())); 63 assert(statestats.their_services == remote_services); 64 if (successfully_connected) { 65 CSerializedNetMsg msg_verack{NetMsg::Make(NetMsgType::VERACK)}; 66 (void)connman.ReceiveMsgFrom(node, std::move(msg_verack)); 67 node.fPauseSend = false; 68 connman.ProcessMessagesOnce(node); 69 peerman.SendMessages(node); 70 assert(node.fSuccessfullyConnected == true); 71 } 72 } 73 74 void ConnmanTestMsg::ResetAddrCache() { m_addr_response_caches = {}; } 75 76 void ConnmanTestMsg::ResetMaxOutboundCycle() 77 { 78 LOCK(m_total_bytes_sent_mutex); 79 nMaxOutboundCycleStartTime = 0s; 80 nMaxOutboundTotalBytesSentInCycle = 0; 81 } 82 83 void ConnmanTestMsg::Reset() 84 { 85 ResetAddrCache(); 86 ResetMaxOutboundCycle(); 87 m_private_broadcast.m_outbound_tor_ok_at_least_once.store(false); 88 m_private_broadcast.m_num_to_open.store(0); 89 } 90 91 void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, std::span<const uint8_t> msg_bytes, bool& complete) const 92 { 93 assert(node.ReceiveMsgBytes(msg_bytes, complete)); 94 if (complete) { 95 node.MarkReceivedMsgsForProcessing(); 96 } 97 } 98 99 void ConnmanTestMsg::FlushSendBuffer(CNode& node) const 100 { 101 LOCK(node.cs_vSend); 102 node.vSendMsg.clear(); 103 node.m_send_memusage = 0; 104 while (true) { 105 const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false); 106 if (to_send.empty()) break; 107 node.m_transport->MarkBytesSent(to_send.size()); 108 } 109 } 110 111 bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const 112 { 113 bool queued = node.m_transport->SetMessageToSend(ser_msg); 114 assert(queued); 115 bool complete{false}; 116 while (true) { 117 const auto& 
[to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false); 118 if (to_send.empty()) break; 119 NodeReceiveMsgBytes(node, to_send, complete); 120 node.m_transport->MarkBytesSent(to_send.size()); 121 } 122 return complete; 123 } 124 125 CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type) 126 { 127 CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true, /*proxy_override=*/std::nullopt); 128 if (!node) return nullptr; 129 node->SetCommonVersion(PROTOCOL_VERSION); 130 peerman.InitializeNode(*node, ServiceFlags(NODE_NETWORK | NODE_WITNESS)); 131 node->fSuccessfullyConnected = true; 132 AddTestNode(*node); 133 return node; 134 } 135 136 std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context) 137 { 138 std::vector<NodeEvictionCandidate> candidates; 139 candidates.reserve(n_candidates); 140 for (int id = 0; id < n_candidates; ++id) { 141 candidates.push_back({ 142 .id=id, 143 .m_connected=NodeSeconds{std::chrono::seconds{random_context.randrange(100)}}, 144 .m_min_ping_time=std::chrono::microseconds{random_context.randrange(100)}, 145 .m_last_block_time=std::chrono::seconds{random_context.randrange(100)}, 146 .m_last_tx_time=std::chrono::seconds{random_context.randrange(100)}, 147 .fRelevantServices=random_context.randbool(), 148 .m_relay_txs=random_context.randbool(), 149 .fBloomFilter=random_context.randbool(), 150 .nKeyedNetGroup=random_context.randrange(100u), 151 .prefer_evict=random_context.randbool(), 152 .m_is_local=random_context.randbool(), 153 .m_network=ALL_NETWORKS[random_context.randrange(ALL_NETWORKS.size())], 154 .m_noban=false, 155 .m_conn_type=ConnectionType::INBOUND, 156 }); 157 } 158 return candidates; 159 } 160 161 // Have different ZeroSock (or others that inherit from it) objects have different 162 // m_socket because EqualSharedPtrSock compares m_socket and we want 
to avoid two 163 // different objects comparing as equal. 164 static std::atomic<SOCKET> g_mocked_sock_fd{0}; 165 166 ZeroSock::ZeroSock() : Sock{g_mocked_sock_fd++} {} 167 168 // Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that. 169 ZeroSock::~ZeroSock() { m_socket = INVALID_SOCKET; } 170 171 ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; } 172 173 ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const 174 { 175 memset(buf, 0x0, len); 176 return len; 177 } 178 179 int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; } 180 181 int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; } 182 183 int ZeroSock::Listen(int) const { return 0; } 184 185 std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const 186 { 187 if (addr != nullptr) { 188 // Pretend all connections come from 5.5.5.5:6789 189 memset(addr, 0x00, *addr_len); 190 const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in)); 191 if (*addr_len >= write_len) { 192 *addr_len = write_len; 193 sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr); 194 addr_in->sin_family = AF_INET; 195 memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr)); 196 addr_in->sin_port = htons(6789); 197 } 198 } 199 return std::make_unique<ZeroSock>(); 200 } 201 202 int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const 203 { 204 std::memset(opt_val, 0x0, *opt_len); 205 return 0; 206 } 207 208 int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; } 209 210 int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const 211 { 212 std::memset(name, 0x0, *name_len); 213 return 0; 214 } 215 216 bool ZeroSock::SetNonBlocking() const { return true; } 217 218 bool ZeroSock::IsSelectable() const { return true; } 219 220 bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const 221 { 222 if (occurred != 
nullptr) { 223 *occurred = requested; 224 } 225 return true; 226 } 227 228 bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const 229 { 230 for (auto& [sock, events] : events_per_sock) { 231 (void)sock; 232 events.occurred = events.requested; 233 } 234 return true; 235 } 236 237 ZeroSock& ZeroSock::operator=(Sock&& other) 238 { 239 assert(false && "Move of Sock into ZeroSock not allowed."); 240 return *this; 241 } 242 243 StaticContentsSock::StaticContentsSock(const std::string& contents) 244 : m_contents{contents} 245 { 246 } 247 248 ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const 249 { 250 const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)}; 251 std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes); 252 if ((flags & MSG_PEEK) == 0) { 253 m_consumed += consume_bytes; 254 } 255 return consume_bytes; 256 } 257 258 StaticContentsSock& StaticContentsSock::operator=(Sock&& other) 259 { 260 assert(false && "Move of Sock into StaticContentsSock not allowed."); 261 return *this; 262 } 263 264 ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags) 265 { 266 WAIT_LOCK(m_mutex, lock); 267 268 if (m_data.empty()) { 269 if (m_eof) { 270 return 0; 271 } 272 errno = EAGAIN; // Same as recv(2) on a non-blocking socket. 
273 return -1; 274 } 275 276 const size_t read_bytes{std::min(len, m_data.size())}; 277 278 std::memcpy(buf, m_data.data(), read_bytes); 279 if ((flags & MSG_PEEK) == 0) { 280 m_data.erase(m_data.begin(), m_data.begin() + read_bytes); 281 } 282 283 return read_bytes; 284 } 285 286 std::optional<CNetMessage> DynSock::Pipe::GetNetMsg() 287 { 288 V1Transport transport{NodeId{0}}; 289 290 { 291 WAIT_LOCK(m_mutex, lock); 292 293 WaitForDataOrEof(lock); 294 if (m_eof && m_data.empty()) { 295 return std::nullopt; 296 } 297 298 for (;;) { 299 std::span<const uint8_t> s{m_data}; 300 if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s. 301 return std::nullopt; 302 } 303 m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size()); 304 if (transport.ReceivedMessageComplete()) { 305 break; 306 } 307 if (m_data.empty()) { 308 WaitForDataOrEof(lock); 309 if (m_eof && m_data.empty()) { 310 return std::nullopt; 311 } 312 } 313 } 314 } 315 316 bool reject{false}; 317 CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)}; 318 if (reject) { 319 return std::nullopt; 320 } 321 return std::make_optional<CNetMessage>(std::move(msg)); 322 } 323 324 void DynSock::Pipe::PushBytes(const void* buf, size_t len) 325 { 326 LOCK(m_mutex); 327 const uint8_t* b = static_cast<const uint8_t*>(buf); 328 m_data.insert(m_data.end(), b, b + len); 329 m_cond.notify_all(); 330 } 331 332 void DynSock::Pipe::Eof() 333 { 334 LOCK(m_mutex); 335 m_eof = true; 336 m_cond.notify_all(); 337 } 338 339 void DynSock::Pipe::WaitForDataOrEof(UniqueLock<Mutex>& lock) 340 { 341 Assert(lock.mutex() == &m_mutex); 342 343 m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { 344 AssertLockHeld(m_mutex); 345 return !m_data.empty() || m_eof; 346 }); 347 } 348 349 DynSock::DynSock(std::shared_ptr<Pipes> pipes, Queue* accept_sockets) 350 : m_pipes{pipes}, m_accept_sockets{accept_sockets} 351 { 352 } 353 354 DynSock::DynSock(std::shared_ptr<Pipes> pipes) 355 : 
m_pipes{pipes}, m_accept_sockets{} 356 { 357 } 358 359 DynSock::~DynSock() 360 { 361 m_pipes->send.Eof(); 362 } 363 364 ssize_t DynSock::Recv(void* buf, size_t len, int flags) const 365 { 366 return m_pipes->recv.GetBytes(buf, len, flags); 367 } 368 369 ssize_t DynSock::Send(const void* buf, size_t len, int) const 370 { 371 m_pipes->send.PushBytes(buf, len); 372 return len; 373 } 374 375 std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const 376 { 377 assert(m_accept_sockets && "Accept() called on non-listening DynSock"); 378 ZeroSock::Accept(addr, addr_len); 379 return m_accept_sockets->Pop().value_or(nullptr); 380 } 381 382 bool DynSock::Wait(std::chrono::milliseconds timeout, 383 Event requested, 384 Event* occurred) const 385 { 386 EventsPerSock ev; 387 ev.emplace(this, Events{requested}); 388 const bool ret{WaitMany(timeout, ev)}; 389 if (occurred != nullptr) { 390 *occurred = ev.begin()->second.occurred; 391 } 392 return ret; 393 } 394 395 bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const 396 { 397 const auto deadline = std::chrono::steady_clock::now() + timeout; 398 bool at_least_one_event_occurred{false}; 399 400 for (;;) { 401 // Check all sockets for readiness without waiting. 402 for (auto& [sock, events] : events_per_sock) { 403 if ((events.requested & Sock::SEND) != 0) { 404 // Always ready for Send(). 
405 events.occurred |= Sock::SEND; 406 at_least_one_event_occurred = true; 407 } 408 409 if ((events.requested & Sock::RECV) != 0) { 410 auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get()); 411 uint8_t b; 412 if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || (dyn_sock->m_accept_sockets && !dyn_sock->m_accept_sockets->Empty())) { 413 events.occurred |= Sock::RECV; 414 at_least_one_event_occurred = true; 415 } 416 } 417 } 418 419 if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) { 420 break; 421 } 422 423 std::this_thread::sleep_for(10ms); 424 } 425 426 return true; 427 } 428 429 DynSock& DynSock::operator=(Sock&&) 430 { 431 assert(false && "Move of Sock into DynSock not allowed."); 432 return *this; 433 }