SSU2.cpp
1 /* 2 * Copyright (c) 2022-2026, The PurpleI2P Project 3 * 4 * This file is part of Purple i2pd project and licensed under BSD3 5 * 6 * See full license text in LICENSE file at top of project tree 7 */ 8 9 #include <random> 10 #include "Log.h" 11 #include "RouterContext.h" 12 #include "Transports.h" 13 #include "NetDb.hpp" 14 #include "Config.h" 15 #include "SSU2.h" 16 17 namespace i2p 18 { 19 namespace transport 20 { 21 SSU2Server::SSU2Server (): 22 RunnableServiceWithWork ("SSU2"), m_ReceiveService ("SSU2r"), 23 m_SocketV4 (m_ReceiveService.GetService ()), m_SocketV6 (m_ReceiveService.GetService ()), 24 m_AddressV4 (boost::asio::ip::address_v4()), m_AddressV6 (boost::asio::ip::address_v6()), 25 m_TerminationTimer (GetService ()), m_CleanupTimer (GetService ()), m_ResendTimer (GetService ()), 26 m_IntroducersUpdateTimer (GetService ()), m_IntroducersUpdateTimerV6 (GetService ()), 27 m_IsPublished (true), m_IsSyncClockFromPeers (true), m_PendingTimeOffset (0), 28 m_Rng(i2p::util::GetMonotonicMicroseconds ()%1000000LL), m_IsForcedFirewalled4 (false), 29 m_IsForcedFirewalled6 (false), m_IsThroughProxy (false) 30 { 31 } 32 33 void SSU2Server::Start () 34 { 35 if (!IsRunning ()) 36 { 37 StartIOService (); 38 i2p::config::GetOption ("ssu2.published", m_IsPublished); 39 i2p::config::GetOption("nettime.frompeers", m_IsSyncClockFromPeers); 40 bool found = false; 41 auto addresses = i2p::context.GetRouterInfo ().GetAddresses (); 42 if (!addresses) return; 43 for (const auto& address: *addresses) 44 { 45 if (!address) continue; 46 if (address->transportStyle == i2p::data::RouterInfo::eTransportSSU2) 47 { 48 if (m_IsThroughProxy) 49 { 50 found = true; 51 if (address->IsV6 ()) 52 { 53 uint16_t mtu; i2p::config::GetOption ("ssu2.mtu6", mtu); 54 if (!mtu || mtu > SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE) 55 mtu = SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE; 56 i2p::context.SetMTU (mtu, false); 57 } 58 else 59 { 60 uint16_t mtu; i2p::config::GetOption ("ssu2.mtu4", mtu); 61 if (!mtu || mtu > SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE) 62 mtu = SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE; 63 i2p::context.SetMTU (mtu, true); 64 } 65 continue; // we don't need port for proxy 66 } 67 auto port = address->port; 68 if (!port) 69 { 70 uint16_t ssu2Port; i2p::config::GetOption ("ssu2.port", ssu2Port); 71 if (ssu2Port) port = ssu2Port; 72 else 73 { 74 uint16_t p; i2p::config::GetOption ("port", p); 75 if (p) port = p; 76 } 77 } 78 if (port) 79 { 80 if (address->IsV4 ()) 81 { 82 found = true; 83 i2p::config::GetOption ("ssu2.firewalled4", m_IsForcedFirewalled4); 84 LogPrint (eLogDebug, "SSU2: Opening IPv4 socket at Start"); 85 OpenSocket (boost::asio::ip::udp::endpoint (m_AddressV4, port)); 86 boost::asio::post (m_ReceiveService.GetService (), 87 [this]() 88 { 89 Receive (m_SocketV4); 90 }); 91 ScheduleIntroducersUpdateTimer (); // wait for 30 seconds and decide if we need introducers 92 } 93 if (address->IsV6 ()) 94 { 95 found = true; 96 i2p::config::GetOption ("ssu2.firewalled6", m_IsForcedFirewalled6); 97 LogPrint (eLogDebug, "SSU2: Opening IPv6 socket at Start"); 98 OpenSocket (boost::asio::ip::udp::endpoint (m_AddressV6, port)); 99 boost::asio::post (m_ReceiveService.GetService (), 100 [this]() 101 { 102 Receive (m_SocketV6); 103 }); 104 ScheduleIntroducersUpdateTimerV6 (); // wait for 30 seconds and decide if we need introducers 105 } 106 } 107 else 108 LogPrint (eLogCritical, "SSU2: Can't start server because port not specified"); 109 } 110 } 111 if (found) 112 { 113 if (m_IsThroughProxy) 114 ConnectToProxy (); 115 m_ReceiveService.Start (); 116 } 117 ScheduleTermination (); 118 ScheduleCleanup (); 119 ScheduleResend (false); 120 } 121 } 122 123 void SSU2Server::Stop () 124 { 125 if (IsRunning ()) 126 { 127 m_TerminationTimer.cancel (); 128 m_CleanupTimer.cancel (); 129 m_ResendTimer.cancel (); 130 m_IntroducersUpdateTimer.cancel (); 131 m_IntroducersUpdateTimerV6.cancel (); 132 } 133 134 auto sessions = m_Sessions; 135 for (auto& it: sessions) 136 { 137 it.second->RequestTermination (eSSU2TerminationReasonRouterShutdown); 138 it.second->Done (); 139 } 140 141 if (context.SupportsV4 () || context.SupportsV6 ()) 142 m_ReceiveService.Stop (); 143 m_SocketV4.close (); 144 m_SocketV6.close (); 145 146 if (m_UDPAssociateSocket) 147 { 148 m_UDPAssociateSocket->close (); 149 m_UDPAssociateSocket.reset (nullptr); 150 } 151 152 StopIOService (); 153 154 m_Sessions.clear (); 155 m_SessionsByRouterHash.clear (); 156 m_PendingOutgoingSessions.clear (); 157 m_Relays.clear (); 158 m_PeerTests.clear (); 159 m_Introducers.clear (); 160 m_IntroducersV6.clear (); 161 m_ConnectedRecently.clear (); 162 m_RequestedPeerTests.clear (); 163 164 m_PacketsPool.ReleaseMt (m_ReceivedPacketsQueue); 165 m_ReceivedPacketsQueue.clear (); 166 } 167 168 void SSU2Server::SetLocalAddress (const boost::asio::ip::address& localAddress) 169 { 170 if (localAddress.is_unspecified ()) return; 171 if (localAddress.is_v4 ()) 172 { 173 m_AddressV4 = localAddress; 174 uint16_t mtu; i2p::config::GetOption ("ssu2.mtu4", mtu); 175 if (!mtu) mtu = i2p::util::net::GetMTU (localAddress); 176 if (mtu < (int)SSU2_MIN_PACKET_SIZE) mtu = SSU2_MIN_PACKET_SIZE; 177 if (mtu > (int)SSU2_MAX_PACKET_SIZE) mtu = SSU2_MAX_PACKET_SIZE; 178 i2p::context.SetMTU (mtu, true); 179 } 180 else if (localAddress.is_v6 ()) 181 { 182 m_AddressV6 = localAddress; 183 uint16_t mtu; i2p::config::GetOption ("ssu2.mtu6", mtu); 184 if (!mtu) 185 { 186 int maxMTU = i2p::util::net::GetMaxMTU (localAddress.to_v6 ()); 187 mtu = i2p::util::net::GetMTU (localAddress); 188 if (mtu > maxMTU) mtu = maxMTU; 189 } 190 else 191 if (mtu > (int)SSU2_MAX_PACKET_SIZE) mtu = SSU2_MAX_PACKET_SIZE; 192 if (mtu < (int)SSU2_MIN_PACKET_SIZE) mtu = SSU2_MIN_PACKET_SIZE; 193 i2p::context.SetMTU (mtu, false); 194 } 195 } 196 197 bool SSU2Server::IsSupported (const boost::asio::ip::address& addr) const 198 { 199 if (m_IsThroughProxy) 200 return m_SocketV4.is_open (); 201 if (addr.is_v4 ()) 202 { 203 if (m_SocketV4.is_open ()) 204 return true; 205 } 206 else if (addr.is_v6 ()) 207 { 208 if (m_SocketV6.is_open ()) 209 return true; 210 } 211 return false; 212 } 213 214 uint16_t SSU2Server::GetPort (bool v4) const 215 { 216 boost::system::error_code ec; 217 boost::asio::ip::udp::endpoint ep = (v4 || m_IsThroughProxy) ? m_SocketV4.local_endpoint (ec) : m_SocketV6.local_endpoint (ec); 218 if (ec) return 0; 219 return ep.port (); 220 } 221 222 bool SSU2Server::IsConnectedRecently (const boost::asio::ip::udp::endpoint& ep, bool max) 223 { 224 if (!ep.port () || ep.address ().is_unspecified ()) return false; 225 std::lock_guard<std::mutex> l(m_ConnectedRecentlyMutex); 226 auto it = m_ConnectedRecently.find (ep); 227 if (it != m_ConnectedRecently.end ()) 228 { 229 if (i2p::util::GetSecondsSinceEpoch () <= it->second + (max ? SSU2_MAX_HOLE_PUNCH_EXPIRATION : SSU2_MIN_HOLE_PUNCH_EXPIRATION)) 230 return true; 231 else if (max) 232 m_ConnectedRecently.erase (it); 233 } 234 return false; 235 } 236 237 void SSU2Server::AddConnectedRecently (const boost::asio::ip::udp::endpoint& ep, uint64_t ts) 238 { 239 if (!ep.port () || ep.address ().is_unspecified () || 240 i2p::util::GetSecondsSinceEpoch () > ts + SSU2_MAX_HOLE_PUNCH_EXPIRATION) return; 241 std::lock_guard<std::mutex> l(m_ConnectedRecentlyMutex); 242 auto [it, added] = m_ConnectedRecently.try_emplace (ep, ts); 243 if (!added && ts > it->second) 244 it->second = ts; // renew timestamp of existing endpoint 245 } 246 247 void SSU2Server::AdjustTimeOffset (int64_t offset, std::shared_ptr<const i2p::data::IdentityEx> from) 248 { 249 if (offset) 250 { 251 if (m_PendingTimeOffset) // one more 252 { 253 if (m_PendingTimeOffsetFrom && from && 254 m_PendingTimeOffsetFrom->GetIdentHash ().GetLL()[0] != from->GetIdentHash ().GetLL()[0]) // from different routers 255 { 256 if (std::abs (m_PendingTimeOffset - offset) < SSU2_CLOCK_SKEW) 257 { 258 offset = (m_PendingTimeOffset + offset)/2; // average 259 LogPrint (eLogWarning, "SSU2: Clock adjusted by ", offset, " seconds"); 260 i2p::util::AdjustTimeOffset (offset); 261 } 262 else 263 LogPrint (eLogWarning, "SSU2: Time offsets are too different. Clock not adjusted"); 264 m_PendingTimeOffset = 0; 265 m_PendingTimeOffsetFrom = nullptr; 266 } 267 else 268 LogPrint (eLogWarning, "SSU2: Time offsets from same router. Clock not adjusted"); 269 } 270 else 271 { 272 m_PendingTimeOffset = offset; // first 273 m_PendingTimeOffsetFrom = from; 274 } 275 } 276 else 277 { 278 m_PendingTimeOffset = 0; // reset 279 m_PendingTimeOffsetFrom = nullptr; 280 } 281 } 282 283 boost::asio::ip::udp::socket& SSU2Server::OpenSocket (const boost::asio::ip::udp::endpoint& localEndpoint) 284 { 285 boost::asio::ip::udp::socket& socket = localEndpoint.address ().is_v6 () ? m_SocketV6 : m_SocketV4; 286 try 287 { 288 if (socket.is_open ()) 289 socket.close (); 290 socket.open (localEndpoint.protocol ()); 291 if (localEndpoint.address ().is_v6 ()) 292 #if !defined(__HAIKU__) 293 socket.set_option (boost::asio::ip::v6_only (true)); 294 #else 295 { 296 LogPrint (eLogWarning, "SSU2: Socket option IPV6_V6ONLY is not supported"); 297 m_IsForcedFirewalled6 = true; // IPV6_V6ONLY is not supported, always Firewalled for ipv6 298 } 299 #endif 300 301 uint64_t bufferSize = i2p::context.GetBandwidthLimit() * 1024 / 5; // max lag = 200ms 302 bufferSize = std::max(SSU2_SOCKET_MIN_BUFFER_SIZE, std::min(bufferSize, SSU2_SOCKET_MAX_BUFFER_SIZE)); 303 304 boost::asio::socket_base::receive_buffer_size receiveBufferSizeSet (bufferSize); 305 boost::asio::socket_base::send_buffer_size sendBufferSizeSet (bufferSize); 306 socket.set_option (receiveBufferSizeSet); 307 socket.set_option (sendBufferSizeSet); 308 boost::asio::socket_base::receive_buffer_size receiveBufferSizeGet; 309 boost::asio::socket_base::send_buffer_size sendBufferSizeGet; 310 socket.get_option (receiveBufferSizeGet); 311 socket.get_option (sendBufferSizeGet); 312 if (receiveBufferSizeGet.value () != receiveBufferSizeSet.value () || 313 sendBufferSizeGet.value () != sendBufferSizeSet.value ()) 314 { 315 LogPrint (eLogWarning, "SSU2: Socket receive buffer size: requested = ", 316 receiveBufferSizeSet.value (), ", got = ", receiveBufferSizeGet.value ()); 317 LogPrint (eLogWarning, "SSU2: Socket send buffer size: requested = ", 318 sendBufferSizeSet.value (), ", got = ", sendBufferSizeGet.value ()); 319 } 320 else 321 { 322 LogPrint (eLogInfo, "SSU2: Socket receive buffer size: ", receiveBufferSizeGet.value ()); 323 LogPrint (eLogInfo, "SSU2: Socket send buffer size: ", sendBufferSizeGet.value ()); 324 } 325 326 socket.non_blocking (true); 327 } 328 catch (std::exception& ex ) 329 { 330 LogPrint (eLogCritical, "SSU2: Failed to open socket on ", localEndpoint.address (), ": ", ex.what()); 331 ThrowFatal ("Unable to start SSU2 transport on ", localEndpoint.address (), ": ", ex.what ()); 332 return socket; 333 } 334 try 335 { 336 socket.bind (localEndpoint); 337 LogPrint (eLogInfo, "SSU2: Start listening on ", localEndpoint); 338 } 339 catch (std::exception& ex ) 340 { 341 LogPrint (eLogWarning, "SSU2: Failed to bind to ", localEndpoint, ": ", ex.what(), ". Actual endpoint is ", socket.local_endpoint ()); 342 // we can continue without binding being firewalled 343 } 344 return socket; 345 } 346 347 void SSU2Server::Receive (boost::asio::ip::udp::socket& socket) 348 { 349 Packet * packet = m_PacketsPool.AcquireMt (); 350 socket.async_receive_from (boost::asio::buffer (packet->buf, SSU2_MAX_PACKET_SIZE), packet->from, 351 std::bind (&SSU2Server::HandleReceivedFrom, this, std::placeholders::_1, std::placeholders::_2, packet, std::ref (socket))); 352 } 353 354 void SSU2Server::HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred, 355 Packet * packet, boost::asio::ip::udp::socket& socket) 356 { 357 if (!ecode 358 || ecode == boost::asio::error::connection_refused 359 || ecode == boost::asio::error::connection_reset 360 || ecode == boost::asio::error::network_reset 361 || ecode == boost::asio::error::network_unreachable 362 || ecode == boost::asio::error::host_unreachable 363 #ifdef _WIN32 // windows can throw WinAPI error, which is not handled by ASIO 364 || ecode.value() == boost::winapi::ERROR_CONNECTION_REFUSED_ 365 || ecode.value() == boost::winapi::WSAENETRESET_ // 10052 366 || ecode.value() == boost::winapi::ERROR_NETWORK_UNREACHABLE_ 367 || ecode.value() == boost::winapi::ERROR_HOST_UNREACHABLE_ 368 #endif 369 ) 370 // just try continue reading when received ICMP response otherwise socket can crash, 371 // but better to find out which host were sent it and mark that router as unreachable 372 { 373 i2p::transport::transports.UpdateReceivedBytes (bytes_transferred); 374 if (bytes_transferred < SSU2_MIN_RECEIVED_PACKET_SIZE) 375 { 376 // drop too short packets 377 m_PacketsPool.ReleaseMt (packet); 378 Receive (socket); 379 return; 380 } 381 packet->len = bytes_transferred; 382 383 boost::system::error_code ec; 384 size_t moreBytes = socket.available (ec); 385 if (!ec && moreBytes) 386 { 387 std::list<Packet *> packets; 388 packets.push_back (packet); 389 while (moreBytes && packets.size () < SSU2_MAX_NUM_PACKETS_PER_BATCH) 390 { 391 packet = m_PacketsPool.AcquireMt (); 392 packet->len = socket.receive_from (boost::asio::buffer (packet->buf, SSU2_MAX_PACKET_SIZE), packet->from, 0, ec); 393 if (!ec) 394 { 395 i2p::transport::transports.UpdateReceivedBytes (packet->len); 396 if (packet->len >= SSU2_MIN_RECEIVED_PACKET_SIZE) 397 packets.push_back (packet); 398 else // drop too short packets 399 m_PacketsPool.ReleaseMt (packet); 400 moreBytes = socket.available(ec); 401 if (ec) break; 402 } 403 else 404 { 405 LogPrint (eLogError, "SSU2: receive_from error: code ", ec.value(), ": ", ec.message ()); 406 m_PacketsPool.ReleaseMt (packet); 407 break; 408 } 409 } 410 InsertToReceivedPacketsQueue (packets); 411 } 412 else 413 InsertToReceivedPacketsQueue (packet); 414 Receive (socket); 415 } 416 else 417 { 418 m_PacketsPool.ReleaseMt (packet); 419 if (ecode != boost::asio::error::operation_aborted) 420 { 421 LogPrint (eLogError, "SSU2: Receive error: code ", ecode.value(), ": ", ecode.message ()); 422 if (m_IsThroughProxy) 423 { 424 m_UDPAssociateSocket.reset (nullptr); 425 m_ProxyRelayEndpoint.reset (nullptr); 426 m_SocketV4.close (); 427 ConnectToProxy (); 428 } 429 else 430 { 431 auto ep = socket.local_endpoint (); 432 LogPrint (eLogCritical, "SSU2: Reopening socket in HandleReceivedFrom: code ", ecode.value(), ": ", ecode.message ()); 433 OpenSocket (ep); 434 Receive (socket); 435 } 436 } 437 } 438 } 439 440 void SSU2Server::HandleReceivedPackets (std::list<Packet *>&& packets) 441 { 442 if (packets.empty ()) return; 443 if (m_IsThroughProxy) 444 for (auto it: packets) 445 ProcessNextPacketFromProxy (it->buf, it->len); 446 else 447 for (auto it: packets) 448 ProcessNextPacket (it->buf, it->len, it->from); 449 m_PacketsPool.ReleaseMt (packets); 450 if (m_LastSession && m_LastSession->GetState () != eSSU2SessionStateTerminated) 451 m_LastSession->FlushData (); 452 } 453 454 void SSU2Server::InsertToReceivedPacketsQueue (Packet * packet) 455 { 456 if (!packet) return; 457 bool empty = false; 458 { 459 std::lock_guard<std::mutex> l(m_ReceivedPacketsQueueMutex); 460 empty = m_ReceivedPacketsQueue.empty (); 461 m_ReceivedPacketsQueue.push_back (packet); 462 } 463 if (empty) 464 boost::asio::post (GetService (), [this]() { HandleReceivedPacketsQueue (); }); 465 } 466 467 void SSU2Server::InsertToReceivedPacketsQueue (std::list<Packet *>& packets) 468 { 469 if (packets.empty ()) return; 470 size_t queueSize = 0; 471 { 472 std::lock_guard<std::mutex> l(m_ReceivedPacketsQueueMutex); 473 queueSize = m_ReceivedPacketsQueue.size (); 474 if (queueSize < SSU2_MAX_RECEIVED_QUEUE_SIZE) 475 m_ReceivedPacketsQueue.splice (m_ReceivedPacketsQueue.end (), packets); 476 else 477 { 478 LogPrint (eLogError, "SSU2: Received queue size ", queueSize, " exceeds max size ", SSU2_MAX_RECEIVED_QUEUE_SIZE); 479 m_PacketsPool.ReleaseMt (packets); 480 queueSize = 0; // invoke processing just in case 481 } 482 } 483 if (!queueSize) 484 boost::asio::post (GetService (), [this]() { HandleReceivedPacketsQueue (); }); 485 } 486 487 void SSU2Server::HandleReceivedPacketsQueue () 488 { 489 std::list<Packet *> receivedPackets; 490 { 491 std::lock_guard<std::mutex> l(m_ReceivedPacketsQueueMutex); 492 m_ReceivedPacketsQueue.swap (receivedPackets); 493 } 494 HandleReceivedPackets (std::move (receivedPackets)); 495 } 496 497 bool SSU2Server::AddSession (std::shared_ptr<SSU2Session> session) 498 { 499 if (session) 500 { 501 if (m_Sessions.emplace (session->GetConnID (), session).second) 502 { 503 if (session->GetState () != eSSU2SessionStatePeerTest) 504 AddSessionByRouterHash (session); 505 return true; 506 } 507 } 508 return false; 509 } 510 511 void SSU2Server::RemoveSession (uint64_t connID) 512 { 513 auto it = m_Sessions.find (connID); 514 if (it != m_Sessions.end ()) 515 { 516 if (it->second->GetState () != eSSU2SessionStatePeerTest) 517 { 518 auto ident = it->second->GetRemoteIdentity (); 519 if (ident) 520 { 521 std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex); 522 auto it1 = m_SessionsByRouterHash.find (ident->GetIdentHash ()); 523 if (it1 != m_SessionsByRouterHash.end () && it->second == it1->second.lock ()) 524 m_SessionsByRouterHash.erase (it1); 525 } 526 } 527 if (m_LastSession == it->second) 528 m_LastSession = nullptr; 529 m_Sessions.erase (it); 530 } 531 } 532 533 void SSU2Server::RequestRemoveSession (uint64_t connID) 534 { 535 boost::asio::post (GetService (), [connID, this]() { RemoveSession (connID); }); 536 } 537 538 void SSU2Server::AddSessionByRouterHash (std::shared_ptr<SSU2Session> session) 539 { 540 if (session) 541 { 542 auto ident = session->GetRemoteIdentity (); 543 if (ident) 544 { 545 std::shared_ptr<SSU2Session> oldSession; 546 { 547 std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex); 548 auto ret = m_SessionsByRouterHash.emplace (ident->GetIdentHash (), session); 549 if (!ret.second) 550 { 551 oldSession = ret.first->second.lock (); 552 // update session 553 ret.first->second = session; 554 } 555 } 556 if (oldSession && oldSession != session) 557 { 558 // session already exists 559 LogPrint (eLogWarning, "SSU2: Session to ", ident->GetIdentHash ().ToBase64 (), " already exists"); 560 // move unsent msgs to new session 561 oldSession->MoveSendQueue (session); 562 // terminate existing 563 boost::asio::post (GetService (), std::bind (&SSU2Session::RequestTermination, oldSession, eSSU2TerminationReasonReplacedByNewSession)); 564 } 565 } 566 } 567 } 568 569 bool SSU2Server::AddPendingOutgoingSession (std::shared_ptr<SSU2Session> session) 570 { 571 if (!session) return false; 572 std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex); 573 return m_PendingOutgoingSessions.emplace (session->GetRemoteEndpoint (), session).second; 574 } 575 576 std::shared_ptr<SSU2Session> SSU2Server::FindSession (const i2p::data::IdentHash& ident) 577 { 578 std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex); 579 auto it = m_SessionsByRouterHash.find (ident); 580 if (it != m_SessionsByRouterHash.end ()) 581 { 582 if (!it->second.expired ()) 583 { 584 auto s = it->second.lock (); 585 if (s && s->GetState () != eSSU2SessionStateTerminated) 586 return s; 587 } 588 m_SessionsByRouterHash.erase (it); 589 } 590 return nullptr; 591 } 592 593 std::shared_ptr<SSU2Session> SSU2Server::FindPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) const 594 { 595 std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex); 596 auto it = m_PendingOutgoingSessions.find (ep); 597 if (it != m_PendingOutgoingSessions.end ()) 598 return it->second; 599 return nullptr; 600 } 601 602 void SSU2Server::RemovePendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) 603 { 604 std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex); 605 m_PendingOutgoingSessions.erase (ep); 606 } 607 608 std::shared_ptr<SSU2Session> SSU2Server::GetRandomPeerTestSession ( 609 i2p::data::RouterInfo::CompatibleTransports remoteTransports, const i2p::data::IdentHash& excluded) 610 { 611 if (m_Sessions.empty ()) return nullptr; 612 int ind = m_Rng () % m_Sessions.size (); 613 auto it = m_Sessions.begin (); 614 std::advance (it, ind); 615 while (it != m_Sessions.end ()) 616 { 617 if (it->second->IsEstablished () && (it->second->GetRemotePeerTestTransports () & remoteTransports) && 618 it->second->GetRemoteVersion () >= i2p::data::NETDB_MIN_PEER_TEST_VERSION && 619 it->second->GetRemoteIdentity ()->GetIdentHash () != excluded) 620 return it->second; 621 it++; 622 } 623 // not found, try from beginning 624 it = m_Sessions.begin (); 625 while (it != m_Sessions.end () && ind) 626 { 627 if (it->second->IsEstablished () && (it->second->GetRemotePeerTestTransports () & remoteTransports) && 628 it->second->GetRemoteVersion () >= i2p::data::NETDB_MIN_PEER_TEST_VERSION && 629 it->second->GetRemoteIdentity ()->GetIdentHash () != excluded) 630 return it->second; 631 it++; ind--; 632 } 633 return nullptr; 634 } 635 636 void SSU2Server::AddRelay (uint32_t tag, std::shared_ptr<SSU2Session> relay) 637 { 638 m_Relays.emplace (tag, relay); 639 } 640 641 void SSU2Server::RemoveRelay (uint32_t tag) 642 { 643 m_Relays.erase (tag); 644 } 645 646 std::shared_ptr<SSU2Session> SSU2Server::FindRelaySession (uint32_t tag) 647 { 648 auto it = m_Relays.find (tag); 649 if (it != m_Relays.end ()) 650 { 651 if (!it->second.expired ()) 652 { 653 auto s = it->second.lock (); 654 if (s && s->IsEstablished ()) 655 return s; 656 } 657 m_Relays.erase (it); 658 } 659 return nullptr; 660 } 661 662 bool SSU2Server::AddPeerTest (uint32_t nonce, std::shared_ptr<SSU2Session> aliceSession, uint64_t ts) 663 { 664 return m_PeerTests.emplace (nonce, std::pair{ aliceSession, ts }).second; 665 } 666 667 std::shared_ptr<SSU2Session> SSU2Server::GetPeerTest (uint32_t nonce) 668 { 669 auto it = m_PeerTests.find (nonce); 670 if (it != m_PeerTests.end ()) 671 { 672 auto s = it->second.first.lock (); 673 m_PeerTests.erase (it); 674 return s; 675 } 676 return nullptr; 677 } 678 679 bool SSU2Server::AddRequestedPeerTest (uint32_t nonce, std::shared_ptr<SSU2PeerTestSession> session, uint64_t ts) 680 { 681 return m_RequestedPeerTests.emplace (nonce, std::pair{ session, ts }).second; 682 } 683 684 std::shared_ptr<SSU2PeerTestSession> SSU2Server::GetRequestedPeerTest (uint32_t nonce) 685 { 686 auto it = m_RequestedPeerTests.find (nonce); 687 if (it != m_RequestedPeerTests.end ()) 688 { 689 auto s = it->second.first.lock (); 690 m_RequestedPeerTests.erase (it); 691 return s; 692 } 693 return nullptr; 694 } 695 696 void SSU2Server::ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint) 697 { 698 if (len < 24) return; 699 uint64_t connID; 700 memcpy (&connID, buf, 8); 701 connID ^= CreateHeaderMask (i2p::context.GetSSU2IntroKey (), buf + (len - 24)); 702 if (!m_LastSession || m_LastSession->GetConnID () != connID) 703 { 704 if (m_LastSession) m_LastSession->FlushData (); 705 auto it = m_Sessions.find (connID); 706 if (it != m_Sessions.end ()) 707 m_LastSession = it->second; 708 else 709 m_LastSession = nullptr; 710 } 711 if (m_LastSession) 712 { 713 switch (m_LastSession->GetState ()) 714 { 715 case eSSU2SessionStateEstablished: 716 case eSSU2SessionStateSessionConfirmedSent: 717 m_LastSession->ProcessData (buf, len, senderEndpoint); 718 break; 719 case eSSU2SessionStateSessionCreatedSent: 720 if (!m_LastSession->ProcessSessionConfirmed (buf, len)) 721 { 722 m_LastSession->Done (); 723 m_LastSession = nullptr; 724 } 725 break; 726 case eSSU2SessionStateIntroduced: 727 if (m_LastSession->GetRemoteEndpoint ().address ().is_unspecified ()) 728 m_LastSession->SetRemoteEndpoint (senderEndpoint); 729 if (m_LastSession->GetRemoteEndpoint ().address () == senderEndpoint.address ()) // port might be different 730 m_LastSession->ProcessHolePunch (buf, len); 731 else 732 { 733 LogPrint (eLogWarning, "SSU2: HolePunch address ", senderEndpoint.address (), 734 " doesn't match RelayResponse ", m_LastSession->GetRemoteEndpoint ().address ()); 735 m_LastSession->Done (); 736 m_LastSession = nullptr; 737 } 738 break; 739 case eSSU2SessionStatePeerTest: 740 m_LastSession->SetRemoteEndpoint (senderEndpoint); 741 m_LastSession->ProcessPeerTest (buf, len); 742 break; 743 case eSSU2SessionStateHolePunch: 744 m_LastSession->ProcessFirstIncomingMessage (connID, buf, len); // SessionRequest 745 break; 746 case eSSU2SessionStateClosing: 747 m_LastSession->ProcessData (buf, len, senderEndpoint); // we might receive termintaion block 748 if (m_LastSession && m_LastSession->GetState () == eSSU2SessionStateClosing) 749 m_LastSession->RequestTermination (eSSU2TerminationReasonIdleTimeout); // send termination again 750 break; 751 case eSSU2SessionStateClosingConfirmed: 752 case eSSU2SessionStateTerminated: 753 m_LastSession = nullptr; 754 break; 755 default: 756 LogPrint (eLogWarning, "SSU2: Invalid session state ", (int)m_LastSession->GetState ()); 757 } 758 } 759 else 760 { 761 // check pending sessions if it's SessionCreated or Retry 762 auto it1 = m_PendingOutgoingSessions.find (senderEndpoint); 763 if (it1 != m_PendingOutgoingSessions.end ()) 764 { 765 if (it1->second->GetState () == eSSU2SessionStateSessionRequestSent && 766 it1->second->ProcessSessionCreated (buf, len)) 767 { 768 std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex); 769 m_PendingOutgoingSessions.erase (it1); // we are done with that endpoint 770 } 771 else 772 it1->second->ProcessRetry (buf, len); 773 } 774 else if (!i2p::transport::transports.IsInReservedRange(senderEndpoint.address ()) && senderEndpoint.port () && 775 !i2p::transport::transports.IsBanned (senderEndpoint.address ())) 776 { 777 // assume new incoming session 778 auto queueSize = m_ReceivedPacketsQueue.size (); 779 if (queueSize < SSU2_STOP_ACCEPTING_NEW_SESSIONS_QUEUE_SIZE) 780 { 781 auto session = std::make_shared<SSU2Session> (*this); 782 session->SetRemoteEndpoint (senderEndpoint); 783 session->ProcessFirstIncomingMessage (connID, buf, len); 784 } 785 else 786 LogPrint (eLogWarning, "SSU2: Incoming session dropped from ", senderEndpoint, ". Queue size ", queueSize, " exceeds ", SSU2_STOP_ACCEPTING_NEW_SESSIONS_QUEUE_SIZE); 787 } 788 else 789 LogPrint (eLogWarning, "SSU2: Incoming packet received from invalid or banned endpoint ", senderEndpoint); 790 } 791 } 792 793 void SSU2Server::Send (const uint8_t * header, size_t headerLen, const uint8_t * payload, size_t payloadLen, 794 const boost::asio::ip::udp::endpoint& to) 795 { 796 if (m_IsThroughProxy) 797 { 798 SendThroughProxy (header, headerLen, nullptr, 0, payload, payloadLen, to); 799 return; 800 } 801 802 std::vector<boost::asio::const_buffer> bufs 803 { 804 boost::asio::buffer (header, headerLen), 805 boost::asio::buffer (payload, payloadLen) 806 }; 807 808 boost::system::error_code ec; 809 if (to.address ().is_v6 ()) 810 { 811 if (!m_SocketV6.is_open ()) return; 812 m_SocketV6.send_to (bufs, to, 0, ec); 813 } 814 else 815 { 816 if (!m_SocketV4.is_open ()) return; 817 m_SocketV4.send_to (bufs, to, 0, ec); 818 } 819 820 if (!ec) 821 i2p::transport::transports.UpdateSentBytes (headerLen + payloadLen); 822 else 823 { 824 LogPrint (ec == boost::asio::error::would_block ? eLogInfo : eLogError, 825 "SSU2: Send exception: ", ec.message (), " to ", to); 826 } 827 } 828 829 void SSU2Server::Send (const uint8_t * header, size_t headerLen, const uint8_t * headerX, size_t headerXLen, 830 const uint8_t * payload, size_t payloadLen, const boost::asio::ip::udp::endpoint& to) 831 { 832 if (m_IsThroughProxy) 833 { 834 SendThroughProxy (header, headerLen, headerX, headerXLen, payload, payloadLen, to); 835 return; 836 } 837 838 std::vector<boost::asio::const_buffer> bufs 839 { 840 boost::asio::buffer (header, headerLen), 841 boost::asio::buffer (headerX, headerXLen), 842 boost::asio::buffer (payload, payloadLen) 843 }; 844 845 boost::system::error_code ec; 846 if (to.address ().is_v6 ()) 847 { 848 if (!m_SocketV6.is_open ()) return; 849 m_SocketV6.send_to (bufs, to, 0, ec); 850 } 851 else 852 { 853 if (!m_SocketV4.is_open ()) return; 854 m_SocketV4.send_to (bufs, to, 0, ec); 855 } 856 857 if (!ec) 858 i2p::transport::transports.UpdateSentBytes (headerLen + headerXLen + payloadLen); 859 else 860 { 861 LogPrint (ec == boost::asio::error::would_block ? eLogInfo : eLogError, 862 "SSU2: Send exception: ", ec.message (), " to ", to); 863 } 864 } 865 866 bool SSU2Server::CheckPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep, bool peerTest) 867 { 868 auto s = FindPendingOutgoingSession (ep); 869 if (s) 870 { 871 if (peerTest) 872 { 873 // if peer test requested add it to the list for pending session 874 auto onEstablished = s->GetOnEstablished (); 875 if (onEstablished) 876 s->SetOnEstablished ([s, onEstablished]() 877 { 878 onEstablished (); 879 s->SendPeerTest (); 880 }); 881 else 882 s->SetOnEstablished ([s]() { s->SendPeerTest (); }); 883 } 884 return true; 885 } 886 return false; 887 } 888 889 bool SSU2Server::CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router, 890 std::shared_ptr<const i2p::data::RouterInfo::Address> address, bool peerTest) 891 { 892 if (router && address) 893 { 894 // check if no session 895 auto existingSession = FindSession (router->GetIdentHash ()); 896 if (existingSession) 897 { 898 // session with router found, trying to send peer test if requested 899 if (peerTest && existingSession->IsEstablished ()) 900 boost::asio::post (GetService (), [existingSession]() { existingSession->SendPeerTest (); }); 901 return false; 902 } 903 // check is no pending session 904 bool isValidEndpoint = !address->host.is_unspecified () && address->port; 905 if (isValidEndpoint) 906 { 907 if (i2p::transport::transports.IsInReservedRange(address->host)) return false; 908 if (CheckPendingOutgoingSession (boost::asio::ip::udp::endpoint (address->host, address->port), peerTest)) return false; 909 } 910 911 auto session = std::make_shared<SSU2Session> (*this, router, address); 912 if (!isValidEndpoint && router->HasProfile () && router->GetProfile ()->HasLastEndpoint (address->IsV4 ())) 913 { 914 // router doesn't publish endpoint, but we connected before and hole punch might be alive 915 auto ep = router->GetProfile ()->GetLastEndpoint (); 916 if (IsConnectedRecently (ep, false)) 917 { 918 if (CheckPendingOutgoingSession (ep, peerTest)) return false; 919 session->SetRemoteEndpoint (ep); 920 isValidEndpoint = true; 921 } 922 } 923 if (peerTest) 924 session->SetOnEstablished ([session]() {session->SendPeerTest (); }); 925 926 if (isValidEndpoint) // we know endpoint 927 boost::asio::post (GetService (), [session]() { session->Connect (); }); 928 else if (address->UsesIntroducer ()) // we don't know endpoint yet 929 boost::asio::post (GetService (), std::bind (&SSU2Server::ConnectThroughIntroducer, this, session)); 930 else 931 return false; 932 } 933 else 934 return false; 935 return true; 936 } 937 938 void SSU2Server::ConnectThroughIntroducer (std::shared_ptr<SSU2Session> session) 939 { 940 if (!session) return; 941 auto address = session->GetAddress (); 942 if (!address) return; 943 session->WaitForIntroduction (); 944 auto ts = i2p::util::GetSecondsSinceEpoch (); 945 std::vector<int> indices; int i = 0; 946 // try to find existing session first 947 for (auto& it: address->ssu->introducers) 948 { 949 if (it.iTag && ts < it.iExp) 950 { 951 auto s = FindSession (it.iH); 952 if (s) 953 { 954 auto addr = s->GetAddress (); 955 if (addr && addr->IsIntroducer ()) 956 { 957 s->Introduce (session, it.iTag); 958 return; 959 } 960 } 961 else 962 indices.push_back(i); 963 } 964 i++; 965 } 966 // we have to start a new session to an introducer 967 std::vector<i2p::data::IdentHash> newRouters; 968 std::shared_ptr<i2p::data::RouterInfo> r; 969 std::shared_ptr<const i2p::data::RouterInfo::Address> addr; 970 uint32_t relayTag = 0; 971 if (!indices.empty ()) 972 { 973 if (indices.size () > 1) 974 std::shuffle (indices.begin(), indices.end(), m_Rng); 975 976 for (auto ind: indices) 977 { 978 const auto& introducer = address->ssu->introducers[ind]; 979 // introducer is not expired, because in indices 980 r = i2p::data::netdb.FindRouter (introducer.iH); 981 if (r) 982 { 983 if (r->IsPublishedOn (i2p::context.GetRouterInfo ().GetCompatibleTransports (false) & // outgoing 984 (i2p::data::RouterInfo::eSSU2V4 | i2p::data::RouterInfo::eSSU2V6))) 985 { 986 relayTag = introducer.iTag; 987 addr = address->IsV6 () ? r->GetSSU2V6Address () : r->GetSSU2V4Address (); 988 if (addr && addr->IsIntroducer () && !addr->host.is_unspecified () && addr->port && 989 !i2p::transport::transports.IsInReservedRange(addr->host)) 990 break; 991 else 992 { 993 // address is invalid or not intrudcer, try another SSU2 address if exists 994 if (address->IsV4 ()) 995 { 996 if (i2p::context.SupportsV6 ()) 997 addr = r->GetSSU2V6Address (); 998 } 999 else 1000 { 1001 if (i2p::context.SupportsV4 ()) 1002 addr = r->GetSSU2V4Address (); 1003 } 1004 if (addr && addr->IsIntroducer () && !addr->host.is_unspecified () && addr->port && 1005 !i2p::transport::transports.IsInReservedRange(addr->host)) 1006 break; 1007 else 1008 { 1009 // all addresses are invalid, try next introducer 1010 relayTag = 0; 1011 addr = nullptr; 1012 r = nullptr; 1013 } 1014 } 1015 } 1016 else 1017 r = nullptr; 1018 } 1019 else if (!i2p::data::IsRouterBanned (introducer.iH)) 1020 newRouters.push_back (introducer.iH); 1021 } 1022 } 1023 if (r) 1024 { 1025 if (relayTag && addr) 1026 { 1027 // introducer and tag found connect to it through SSU2 1028 auto s = FindPendingOutgoingSession (boost::asio::ip::udp::endpoint (addr->host, addr->port)); 1029 if (!s) 1030 { 1031 s = std::make_shared<SSU2Session> (*this, r, addr); 1032 s->SetOnEstablished ([session, s, relayTag]() { s->Introduce (session, relayTag); }); 1033 s->Connect (); 1034 } 1035 else 1036 { 1037 auto onEstablished = s->GetOnEstablished (); 1038 if (onEstablished) 1039 s->SetOnEstablished ([session, s, relayTag, onEstablished]() 1040 { 1041 onEstablished (); 1042 s->Introduce (session, relayTag); 1043 }); 1044 else 1045 s->SetOnEstablished ([session, s, relayTag]() {s->Introduce (session, relayTag); }); 1046 } 1047 } 1048 else 1049 session->Done (); 1050 } 1051 else 1052 { 1053 // introducers not found, try to request them 1054 for (auto& it: newRouters) 1055 i2p::data::netdb.RequestDestination (it); 1056 session->Done (); // don't wait for connect timeout 1057 } 1058 } 1059 1060 bool SSU2Server::StartPeerTest (std::shared_ptr<const i2p::data::RouterInfo> router, bool v4) 1061 { 1062 if (!router) return false; 1063 auto addr = v4 ? router->GetSSU2V4Address () : router->GetSSU2V6Address (); 1064 if (!addr) return false; 1065 auto session = FindSession (router->GetIdentHash ()); 1066 if (session) 1067 { 1068 auto remoteAddr = session->GetAddress (); 1069 if (!remoteAddr || !remoteAddr->IsPeerTesting () || 1070 (v4 && !remoteAddr->IsV4 ()) || (!v4 && !remoteAddr->IsV6 ())) return false; 1071 if (session->IsEstablished ()) 1072 boost::asio::post (GetService (), [session]() { session->SendPeerTest (); }); 1073 else 1074 session->SetOnEstablished ([session]() { session->SendPeerTest (); }); 1075 return true; 1076 } 1077 else 1078 CreateSession (router, addr, true); 1079 return true; 1080 } 1081 1082 void SSU2Server::ScheduleTermination () 1083 { 1084 m_TerminationTimer.expires_from_now (boost::posix_time::seconds( 1085 SSU2_TERMINATION_CHECK_TIMEOUT + m_Rng () % SSU2_TERMINATION_CHECK_TIMEOUT_VARIANCE)); 1086 m_TerminationTimer.async_wait (std::bind (&SSU2Server::HandleTerminationTimer, 1087 this, std::placeholders::_1)); 1088 } 1089 1090 void SSU2Server::HandleTerminationTimer (const boost::system::error_code& ecode) 1091 { 1092 if (ecode != boost::asio::error::operation_aborted) 1093 { 1094 auto ts = i2p::util::GetSecondsSinceEpoch (); 1095 1096 { 1097 std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex); 1098 for (auto it = m_PendingOutgoingSessions.begin (); it != m_PendingOutgoingSessions.end ();) 1099 { 1100 if (it->second->IsTerminationTimeoutExpired (ts)) 1101 { 1102 //it->second->Terminate (); 1103 it = m_PendingOutgoingSessions.erase (it); 1104 } 1105 else 1106 it++; 1107 } 1108 } 1109 1110 for (auto it: m_Sessions) 1111 { 1112 auto state = it.second->GetState (); 1113 if (state == eSSU2SessionStateTerminated || state == eSSU2SessionStateClosing) 1114 it.second->Done (); 1115 else if (it.second->IsTerminationTimeoutExpired (ts)) 1116 { 1117 if (it.second->IsEstablished ()) 1118 it.second->RequestTermination (eSSU2TerminationReasonIdleTimeout); 1119 else 1120 it.second->Done (); 1121 } 1122 else 1123 it.second->CleanUp (ts); 1124 } 1125 1126 ScheduleTermination (); 1127 } 1128 } 1129 1130 void SSU2Server::ScheduleCleanup () 1131 { 1132 m_CleanupTimer.expires_from_now (boost::posix_time::seconds(SSU2_CLEANUP_INTERVAL)); 1133 m_CleanupTimer.async_wait (std::bind (&SSU2Server::HandleCleanupTimer, 1134 this, std::placeholders::_1)); 1135 } 1136 1137 void SSU2Server::HandleCleanupTimer (const boost::system::error_code& ecode) 1138 { 1139 if (ecode != boost::asio::error::operation_aborted) 1140 { 1141 auto ts = i2p::util::GetSecondsSinceEpoch (); 1142 for (auto it = m_Relays.begin (); it != m_Relays.begin ();) 1143 { 1144 if (it->second.expired ()) 1145 it = m_Relays.erase (it); 1146 else 1147 it++; 1148 } 1149 1150 for (auto it = m_PeerTests.begin (); it != m_PeerTests.end ();) 1151 { 1152 if (ts > it->second.second + SSU2_PEER_TEST_EXPIRATION_TIMEOUT || it->second.first.expired ()) 1153 { 1154 LogPrint (eLogInfo, "SSU2: Peer test nonce ", it->first, " was not responded in ", SSU2_PEER_TEST_EXPIRATION_TIMEOUT, " seconds or session invalid. Deleted"); 1155 it = m_PeerTests.erase (it); 1156 } 1157 else 1158 it++; 1159 } 1160 1161 for (auto it = m_IncomingTokens.begin (); it != m_IncomingTokens.end (); ) 1162 { 1163 if (ts > it->second.second) 1164 it = m_IncomingTokens.erase (it); 1165 else 1166 it++; 1167 } 1168 1169 for (auto it = m_OutgoingTokens.begin (); it != m_OutgoingTokens.end (); ) 1170 { 1171 if (ts > it->second.second) 1172 it = m_OutgoingTokens.erase (it); 1173 else 1174 it++; 1175 } 1176 1177 for (auto it = m_ConnectedRecently.begin (); it != m_ConnectedRecently.end (); ) 1178 { 1179 if (ts > it->second + SSU2_MAX_HOLE_PUNCH_EXPIRATION) 1180 it = m_ConnectedRecently.erase (it); 1181 else 1182 it++; 1183 } 1184 1185 for (auto it = m_RequestedPeerTests.begin (); it != m_RequestedPeerTests.end ();) 1186 { 1187 if (ts > it->second.second + SSU2_PEER_TEST_EXPIRATION_TIMEOUT) 1188 it = m_RequestedPeerTests.erase (it); 1189 else 1190 it++; 1191 } 1192 1193 { 1194 std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex); 1195 for (auto it = m_SessionsByRouterHash.begin (); it != m_SessionsByRouterHash.begin ();) 1196 { 1197 if (it->second.expired ()) 1198 it = m_SessionsByRouterHash.erase (it); 1199 else 1200 it++; 1201 } 1202 } 1203 1204 m_PacketsPool.CleanUpMt (); 1205 m_SentPacketsPool.CleanUp (); 1206 m_IncompleteMessagesPool.CleanUp (); 1207 m_FragmentsPool.CleanUp (); 1208 ScheduleCleanup (); 1209 } 1210 } 1211 1212 void SSU2Server::ScheduleResend (bool more) 1213 { 1214 m_ResendTimer.expires_from_now (boost::posix_time::milliseconds (more ? 1215 (SSU2_RESEND_CHECK_MORE_TIMEOUT + m_Rng () % SSU2_RESEND_CHECK_MORE_TIMEOUT_VARIANCE): 1216 (SSU2_RESEND_CHECK_TIMEOUT + m_Rng () % SSU2_RESEND_CHECK_TIMEOUT_VARIANCE))); 1217 m_ResendTimer.async_wait (std::bind (&SSU2Server::HandleResendTimer, 1218 this, std::placeholders::_1)); 1219 } 1220 1221 void SSU2Server::HandleResendTimer (const boost::system::error_code& ecode) 1222 { 1223 if (ecode != boost::asio::error::operation_aborted) 1224 { 1225 size_t resentPacketsNum = 0; 1226 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 1227 for (auto it: m_Sessions) 1228 { 1229 if (ts >= it.second->GetLastResendTime () + SSU2_RESEND_CHECK_TIMEOUT) 1230 resentPacketsNum += it.second->Resend (ts); 1231 if (resentPacketsNum > SSU2_MAX_RESEND_PACKETS) break; 1232 } 1233 for (auto it: m_PendingOutgoingSessions) 1234 it.second->Resend (ts); 1235 ScheduleResend (resentPacketsNum > SSU2_MAX_RESEND_PACKETS); 1236 } 1237 } 1238 1239 void SSU2Server::UpdateOutgoingToken (const boost::asio::ip::udp::endpoint& ep, uint64_t token, uint32_t exp) 1240 { 1241 m_OutgoingTokens[ep] = {token, exp}; 1242 } 1243 1244 uint64_t SSU2Server::FindOutgoingToken (const boost::asio::ip::udp::endpoint& ep) 1245 { 1246 auto it = m_OutgoingTokens.find (ep); 1247 if (it != m_OutgoingTokens.end ()) 1248 { 1249 if (i2p::util::GetSecondsSinceEpoch () + SSU2_TOKEN_EXPIRATION_THRESHOLD > it->second.second) 1250 { 1251 // token expired 1252 m_OutgoingTokens.erase (it); 1253 return 0; 1254 } 1255 return it->second.first; 1256 } 1257 return 0; 1258 } 1259 1260 uint64_t SSU2Server::GetIncomingToken (const boost::asio::ip::udp::endpoint& ep) 1261 { 1262 auto ts = i2p::util::GetSecondsSinceEpoch (); 1263 auto it = m_IncomingTokens.find (ep); 1264 if (it != m_IncomingTokens.end ()) 1265 { 1266 if (ts + SSU2_TOKEN_EXPIRATION_THRESHOLD <= it->second.second) 1267 return it->second.first; 1268 else // token expired 1269 m_IncomingTokens.erase (it); 1270 } 1271 uint64_t token; 1272 RAND_bytes ((uint8_t *)&token, 8); 1273 if (!token) token = 1; // token can't be zero 1274 m_IncomingTokens.try_emplace (ep, token, uint32_t(ts + SSU2_TOKEN_EXPIRATION_TIMEOUT)); 1275 return token; 1276 } 1277 1278 std::pair<uint64_t, uint32_t> SSU2Server::NewIncomingToken (const boost::asio::ip::udp::endpoint& ep) 1279 { 1280 uint64_t token; 1281 RAND_bytes ((uint8_t *)&token, 8); 1282 if (!token) token = 1; // token can't be zero 1283 uint32_t expires = i2p::util::GetSecondsSinceEpoch () + SSU2_NEXT_TOKEN_EXPIRATION_TIMEOUT; 1284 auto [it, inserted] = m_IncomingTokens.try_emplace (ep, token, expires); 1285 if (!inserted) 1286 it->second = { token, expires }; // override 1287 return it->second; 1288 } 1289 1290 std::vector<std::shared_ptr<SSU2Session> > SSU2Server::FindIntroducers (int maxNumIntroducers, 1291 bool v4, const std::unordered_set<i2p::data::IdentHash>& excluded) 1292 { 1293 std::vector<std::shared_ptr<SSU2Session> > ret; 1294 if (maxNumIntroducers <= 0 || m_Sessions.empty ()) return ret; 1295 1296 std::vector<std::shared_ptr<SSU2Session> > eligible; 1297 eligible.reserve (m_Sessions.size ()/2); 1298 auto ts = i2p::util::GetSecondsSinceEpoch (); 1299 for (const auto& s : m_Sessions) 1300 { 1301 if (s.second->IsEstablished () && (s.second->GetRelayTag () && s.second->IsOutgoing ()) && 1302 ts < s.second->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION/2 && 1303 !excluded.count (s.second->GetRemoteIdentity ()->GetIdentHash ()) && 1304 ((v4 && (s.second->GetRemoteTransports () & i2p::data::RouterInfo::eSSU2V4)) || 1305 (!v4 && (s.second->GetRemoteTransports () & i2p::data::RouterInfo::eSSU2V6)))) 1306 eligible.push_back (s.second); 1307 } 1308 1309 if (eligible.size () <= (size_t)maxNumIntroducers) 1310 return eligible; 1311 else 1312 std::sample (eligible.begin(), eligible.end(), std::back_inserter(ret), maxNumIntroducers, m_Rng); 1313 return ret; 1314 } 1315 1316 void SSU2Server::UpdateIntroducers (bool v4) 1317 { 1318 uint32_t ts = i2p::util::GetSecondsSinceEpoch (); 1319 std::list<std::pair<i2p::data::IdentHash, uint32_t> > newList, impliedList; 1320 auto& introducers = v4 ? m_Introducers : m_IntroducersV6; 1321 std::unordered_set<i2p::data::IdentHash> excluded; 1322 for (const auto& [ident, tag] : introducers) 1323 { 1324 std::shared_ptr<SSU2Session> session = FindSession (ident); 1325 if (session) 1326 excluded.insert (ident); 1327 if (session) 1328 { 1329 if (session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing () && // still session with introducer? 1330 ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION) 1331 { 1332 session->SendKeepAlive (); 1333 if (ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION) 1334 { 1335 newList.push_back ({ident, session->GetRelayTag ()}); 1336 if (tag != session->GetRelayTag ()) 1337 { 1338 LogPrint (eLogDebug, "SSU2: Introducer session to ", session->GetIdentHashBase64() , " was replaced. iTag ", tag, "->", session->GetRelayTag ()); 1339 i2p::context.UpdateSSU2Introducer (ident, v4, session->GetRelayTag (), 1340 session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION); 1341 } 1342 } 1343 else 1344 { 1345 impliedList.push_back ({ident, session->GetRelayTag ()}); // keep in introducers list, but not publish 1346 session = nullptr; 1347 } 1348 } 1349 else 1350 session = nullptr; 1351 } 1352 1353 if (!session) 1354 i2p::context.RemoveSSU2Introducer (ident, v4); 1355 } 1356 int numOldSessions = 0; 1357 if (newList.size () < SSU2_MAX_NUM_INTRODUCERS) 1358 { 1359 auto sessions = FindIntroducers (SSU2_MAX_NUM_INTRODUCERS - newList.size (), v4, excluded); 1360 if (sessions.empty () && !impliedList.empty ()) 1361 { 1362 LogPrint (eLogDebug, "SSU2: No new introducers found. Trying to reuse existing"); 1363 for (const auto& it : impliedList) 1364 { 1365 auto session = FindSession (it.first); 1366 if (session) 1367 { 1368 if (std::find_if (newList.begin (), newList.end (), 1369 [&ident = it.first](const auto& s){ return ident == s.first; }) == newList.end ()) 1370 { 1371 sessions.push_back (session); 1372 numOldSessions++; 1373 } 1374 } 1375 } 1376 impliedList.clear (); 1377 } 1378 1379 for (const auto& it : sessions) 1380 { 1381 uint32_t tag = it->GetRelayTag (); 1382 uint32_t exp = it->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION; 1383 if (!tag && ts >= exp) 1384 continue; // don't publish expired introducer 1385 i2p::data::RouterInfo::Introducer introducer; 1386 introducer.iTag = tag; 1387 introducer.iH = it->GetRemoteIdentity ()->GetIdentHash (); 1388 introducer.iExp = exp; 1389 excluded.insert (it->GetRemoteIdentity ()->GetIdentHash ()); 1390 if (i2p::context.AddSSU2Introducer (introducer, v4)) 1391 { 1392 LogPrint (eLogDebug, "SSU2: Introducer added ", it->GetRelayTag (), " at ", 1393 i2p::data::GetIdentHashAbbreviation (it->GetRemoteIdentity ()->GetIdentHash ())); 1394 newList.push_back ({ it->GetRemoteIdentity ()->GetIdentHash (), tag }); 1395 it->SendKeepAlive (); 1396 if (newList.size () >= SSU2_MAX_NUM_INTRODUCERS) break; 1397 } 1398 } 1399 } 1400 introducers = newList; 1401 1402 if (introducers.size () < SSU2_MAX_NUM_INTRODUCERS || numOldSessions) 1403 { 1404 // we need to create more sessions with relay tag 1405 1406 // exclude all existing sessions 1407 excluded.clear (); 1408 { 1409 std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex); 1410 for (const auto& [ident, s] : m_SessionsByRouterHash) 1411 excluded.insert (ident); 1412 } 1413 1414 // session about to expire are not counted 1415 for (auto i = introducers.size (); i < SSU2_MAX_NUM_INTRODUCERS + numOldSessions; i++) 1416 { 1417 auto introducer = i2p::data::netdb.GetRandomSSU2Introducer (v4, excluded); 1418 if (introducer) 1419 { 1420 auto address = v4 ? introducer->GetSSU2V4Address () : introducer->GetSSU2V6Address (); 1421 if (address) 1422 { 1423 CreateSession (introducer, address); 1424 excluded.insert (introducer->GetIdentHash ()); 1425 } 1426 } 1427 else 1428 { 1429 LogPrint (eLogDebug, "SSU2: Can't find more introducers"); 1430 break; 1431 } 1432 } 1433 } 1434 introducers.splice (introducers.end (), impliedList); // insert non-published, but non-expired introducers back 1435 } 1436 1437 void SSU2Server::ScheduleIntroducersUpdateTimer () 1438 { 1439 if (m_IsPublished) 1440 { 1441 m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds( 1442 SSU2_KEEP_ALIVE_INTERVAL + m_Rng () % SSU2_KEEP_ALIVE_INTERVAL_VARIANCE)); 1443 m_IntroducersUpdateTimer.async_wait (std::bind (&SSU2Server::HandleIntroducersUpdateTimer, 1444 this, std::placeholders::_1, true)); 1445 } 1446 } 1447 1448 void SSU2Server::RescheduleIntroducersUpdateTimer () 1449 { 1450 if (m_IsPublished) 1451 { 1452 m_IntroducersUpdateTimer.cancel (); 1453 i2p::context.ClearSSU2Introducers (true); 1454 m_Introducers.clear (); 1455 m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds( 1456 (SSU2_KEEP_ALIVE_INTERVAL + m_Rng () % SSU2_KEEP_ALIVE_INTERVAL_VARIANCE)/2)); 1457 m_IntroducersUpdateTimer.async_wait (std::bind (&SSU2Server::HandleIntroducersUpdateTimer, 1458 this, std::placeholders::_1, true)); 1459 } 1460 } 1461 1462 void SSU2Server::ScheduleIntroducersUpdateTimerV6 () 1463 { 1464 if (m_IsPublished) 1465 { 1466 m_IntroducersUpdateTimerV6.expires_from_now (boost::posix_time::seconds( 1467 SSU2_KEEP_ALIVE_INTERVAL + m_Rng () % SSU2_KEEP_ALIVE_INTERVAL_VARIANCE)); 1468 m_IntroducersUpdateTimerV6.async_wait (std::bind (&SSU2Server::HandleIntroducersUpdateTimer, 1469 this, std::placeholders::_1, false)); 1470 } 1471 } 1472 1473 void SSU2Server::RescheduleIntroducersUpdateTimerV6 () 1474 { 1475 if (m_IsPublished) 1476 { 1477 m_IntroducersUpdateTimerV6.cancel (); 1478 i2p::context.ClearSSU2Introducers (false); 1479 m_IntroducersV6.clear (); 1480 m_IntroducersUpdateTimerV6.expires_from_now (boost::posix_time::seconds( 1481 (SSU2_KEEP_ALIVE_INTERVAL + m_Rng () % SSU2_KEEP_ALIVE_INTERVAL_VARIANCE)/2)); 1482 m_IntroducersUpdateTimerV6.async_wait (std::bind (&SSU2Server::HandleIntroducersUpdateTimer, 1483 this, std::placeholders::_1, false)); 1484 } 1485 } 1486 1487 void SSU2Server::HandleIntroducersUpdateTimer (const boost::system::error_code& ecode, bool v4) 1488 { 1489 if (ecode != boost::asio::error::operation_aborted) 1490 { 1491 // timeout expired 1492 if (v4) 1493 { 1494 if (i2p::context.GetTesting ()) 1495 { 1496 // we still don't know if we need introducers 1497 ScheduleIntroducersUpdateTimer (); 1498 return; 1499 } 1500 if (i2p::context.GetStatus () != eRouterStatusFirewalled) 1501 { 1502 // we don't need introducers 1503 i2p::context.ClearSSU2Introducers (true); 1504 m_Introducers.clear (); 1505 return; 1506 } 1507 // we are firewalled 1508 auto addr = i2p::context.GetRouterInfo ().GetSSU2V4Address (); 1509 if (addr && addr->ssu && addr->ssu->introducers.empty ()) 1510 i2p::context.SetUnreachable (true, false); // v4 1511 1512 UpdateIntroducers (true); 1513 ScheduleIntroducersUpdateTimer (); 1514 } 1515 else 1516 { 1517 if (i2p::context.GetTestingV6 ()) 1518 { 1519 // we still don't know if we need introducers 1520 ScheduleIntroducersUpdateTimerV6 (); 1521 return; 1522 } 1523 if (i2p::context.GetStatusV6 () != eRouterStatusFirewalled) 1524 { 1525 // we don't need introducers 1526 i2p::context.ClearSSU2Introducers (false); 1527 m_IntroducersV6.clear (); 1528 return; 1529 } 1530 // we are firewalled 1531 auto addr = i2p::context.GetRouterInfo ().GetSSU2V6Address (); 1532 if (addr && addr->ssu && addr->ssu->introducers.empty ()) 1533 i2p::context.SetUnreachable (false, true); // v6 1534 1535 UpdateIntroducers (false); 1536 ScheduleIntroducersUpdateTimerV6 (); 1537 } 1538 } 1539 } 1540 1541 bool SSU2Server::AEADChaCha20Poly1305Encrypt (const uint8_t * msg, size_t msgLen, 1542 const uint8_t * ad, size_t adLen, const uint8_t * key, const uint8_t * nonce, uint8_t * buf, size_t len) 1543 { 1544 return m_Encryptor.Encrypt (msg, msgLen, ad, adLen, key, nonce, buf, len); 1545 } 1546 1547 bool SSU2Server::AEADChaCha20Poly1305Decrypt (const uint8_t * msg, size_t msgLen, 1548 const uint8_t * ad, size_t adLen, const uint8_t * key, const uint8_t * nonce, uint8_t * buf, size_t len) 1549 { 1550 return m_Decryptor.Decrypt (msg, msgLen, ad, adLen, key, nonce, buf, len); 1551 } 1552 1553 void SSU2Server::ChaCha20 (const uint8_t * msg, size_t msgLen, const uint8_t * key, const uint8_t * nonce, uint8_t * out) 1554 { 1555 m_ChaCha20 (msg, msgLen, key, nonce, out); 1556 } 1557 1558 void SSU2Server::SendThroughProxy (const uint8_t * header, size_t headerLen, const uint8_t * headerX, size_t headerXLen, 1559 const uint8_t * payload, size_t payloadLen, const boost::asio::ip::udp::endpoint& to) 1560 { 1561 if (!m_ProxyRelayEndpoint) return; 1562 size_t requestHeaderSize = 0; 1563 memset (m_UDPRequestHeader, 0, 3); 1564 if (to.address ().is_v6 ()) 1565 { 1566 m_UDPRequestHeader[3] = SOCKS5_ATYP_IPV6; 1567 memcpy (m_UDPRequestHeader + 4, to.address ().to_v6().to_bytes().data(), 16); 1568 requestHeaderSize = SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE; 1569 } 1570 else 1571 { 1572 m_UDPRequestHeader[3] = SOCKS5_ATYP_IPV4; 1573 memcpy (m_UDPRequestHeader + 4, to.address ().to_v4().to_bytes().data(), 4); 1574 requestHeaderSize = SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE; 1575 } 1576 htobe16buf (m_UDPRequestHeader + requestHeaderSize - 2, to.port ()); 1577 1578 std::vector<boost::asio::const_buffer> bufs; 1579 bufs.push_back (boost::asio::buffer (m_UDPRequestHeader, requestHeaderSize)); 1580 bufs.push_back (boost::asio::buffer (header, headerLen)); 1581 if (headerX) bufs.push_back (boost::asio::buffer (headerX, headerXLen)); 1582 bufs.push_back (boost::asio::buffer (payload, payloadLen)); 1583 1584 boost::system::error_code ec; 1585 m_SocketV4.send_to (bufs, *m_ProxyRelayEndpoint, 0, ec); // TODO: implement ipv6 proxy 1586 if (!ec) 1587 i2p::transport::transports.UpdateSentBytes (headerLen + payloadLen); 1588 else 1589 LogPrint (eLogError, "SSU2: Send exception: ", ec.message (), " to ", to); 1590 } 1591 1592 void SSU2Server::ProcessNextPacketFromProxy (uint8_t * buf, size_t len) 1593 { 1594 if (buf[2]) // FRAG 1595 { 1596 LogPrint (eLogWarning, "SSU2: Proxy packet fragmentation is not supported"); 1597 return; 1598 } 1599 size_t offset = 0; 1600 boost::asio::ip::udp::endpoint ep; 1601 switch (buf[3]) // ATYP 1602 { 1603 case SOCKS5_ATYP_IPV4: 1604 { 1605 offset = SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE; 1606 if (offset > len) return; 1607 boost::asio::ip::address_v4::bytes_type bytes; 1608 memcpy (bytes.data (), buf + 4, 4); 1609 uint16_t port = bufbe16toh (buf + 8); 1610 ep = boost::asio::ip::udp::endpoint (boost::asio::ip::address_v4 (bytes), port); 1611 break; 1612 } 1613 case SOCKS5_ATYP_IPV6: 1614 { 1615 offset = SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE; 1616 if (offset > len) return; 1617 boost::asio::ip::address_v6::bytes_type bytes; 1618 memcpy (bytes.data (), buf + 4, 16); 1619 uint16_t port = bufbe16toh (buf + 20); 1620 ep = boost::asio::ip::udp::endpoint (boost::asio::ip::address_v6 (bytes), port); 1621 break; 1622 } 1623 default: 1624 { 1625 LogPrint (eLogWarning, "SSU2: Unknown ATYP ", (int)buf[3], " from proxy relay"); 1626 return; 1627 } 1628 } 1629 ProcessNextPacket (buf + offset, len - offset, ep); 1630 } 1631 1632 void SSU2Server::ConnectToProxy () 1633 { 1634 if (!m_ProxyEndpoint) return; 1635 m_UDPAssociateSocket.reset (new boost::asio::ip::tcp::socket (m_ReceiveService.GetService ())); 1636 m_UDPAssociateSocket->async_connect (*m_ProxyEndpoint, 1637 [this] (const boost::system::error_code& ecode) 1638 { 1639 if (ecode) 1640 { 1641 LogPrint (eLogError, "SSU2: Can't connect to proxy ", *m_ProxyEndpoint, " ", ecode.message ()); 1642 m_UDPAssociateSocket.reset (nullptr); 1643 ReconnectToProxy (); 1644 } 1645 else 1646 HandshakeWithProxy (); 1647 }); 1648 } 1649 1650 void SSU2Server::HandshakeWithProxy () 1651 { 1652 if (!m_UDPAssociateSocket) return; 1653 m_UDPRequestHeader[0] = SOCKS5_VER; 1654 m_UDPRequestHeader[1] = 1; // 1 method 1655 m_UDPRequestHeader[2] = 0; // no authentication 1656 boost::asio::async_write (*m_UDPAssociateSocket, boost::asio::buffer (m_UDPRequestHeader, 3), boost::asio::transfer_all(), 1657 [this] (const boost::system::error_code& ecode, std::size_t bytes_transferred) 1658 { 1659 (void) bytes_transferred; 1660 if (ecode) 1661 { 1662 LogPrint(eLogError, "SSU2: Proxy write error ", ecode.message()); 1663 m_UDPAssociateSocket.reset (nullptr); 1664 ReconnectToProxy (); 1665 } 1666 else 1667 ReadHandshakeWithProxyReply (); 1668 }); 1669 } 1670 1671 void SSU2Server::ReadHandshakeWithProxyReply () 1672 { 1673 if (!m_UDPAssociateSocket) return; 1674 boost::asio::async_read (*m_UDPAssociateSocket, boost::asio::buffer (m_UDPRequestHeader, 2), boost::asio::transfer_all(), 1675 [this] (const boost::system::error_code& ecode, std::size_t bytes_transferred) 1676 { 1677 (void) bytes_transferred; 1678 if (ecode) 1679 { 1680 LogPrint(eLogError, "SSU2: Proxy read error ", ecode.message()); 1681 m_UDPAssociateSocket.reset (nullptr); 1682 ReconnectToProxy (); 1683 } 1684 else 1685 { 1686 if (m_UDPRequestHeader[0] == SOCKS5_VER && !m_UDPRequestHeader[1]) 1687 SendUDPAssociateRequest (); 1688 else 1689 { 1690 LogPrint(eLogError, "SSU2: Invalid proxy reply"); 1691 m_UDPAssociateSocket.reset (nullptr); 1692 } 1693 } 1694 }); 1695 } 1696 1697 void SSU2Server::SendUDPAssociateRequest () 1698 { 1699 if (!m_UDPAssociateSocket) return; 1700 m_UDPRequestHeader[0] = SOCKS5_VER; 1701 m_UDPRequestHeader[1] = SOCKS5_CMD_UDP_ASSOCIATE; 1702 m_UDPRequestHeader[2] = 0; // RSV 1703 m_UDPRequestHeader[3] = SOCKS5_ATYP_IPV4; // TODO: implement ipv6 proxy 1704 memset (m_UDPRequestHeader + 4, 0, 6); // address and port all zeros 1705 boost::asio::async_write (*m_UDPAssociateSocket, boost::asio::buffer (m_UDPRequestHeader, SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE), boost::asio::transfer_all(), 1706 [this] (const boost::system::error_code& ecode, std::size_t bytes_transferred) 1707 { 1708 (void) bytes_transferred; 1709 if (ecode) 1710 { 1711 LogPrint(eLogError, "SSU2: Proxy write error ", ecode.message()); 1712 m_UDPAssociateSocket.reset (nullptr); 1713 ReconnectToProxy (); 1714 } 1715 else 1716 ReadUDPAssociateReply (); 1717 }); 1718 } 1719 1720 void SSU2Server::ReadUDPAssociateReply () 1721 { 1722 if (!m_UDPAssociateSocket) return; 1723 boost::asio::async_read (*m_UDPAssociateSocket, boost::asio::buffer (m_UDPRequestHeader, SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE), boost::asio::transfer_all(), 1724 [this] (const boost::system::error_code& ecode, std::size_t bytes_transferred) 1725 { 1726 (void) bytes_transferred; 1727 if (ecode) 1728 { 1729 LogPrint(eLogError, "SSU2: Proxy read error ", ecode.message()); 1730 m_UDPAssociateSocket.reset (nullptr); 1731 ReconnectToProxy (); 1732 } 1733 else 1734 { 1735 if (m_UDPRequestHeader[0] == SOCKS5_VER && !m_UDPRequestHeader[1]) 1736 { 1737 if (m_UDPRequestHeader[3] == SOCKS5_ATYP_IPV4) 1738 { 1739 boost::asio::ip::address_v4::bytes_type bytes; 1740 memcpy (bytes.data (), m_UDPRequestHeader + 4, 4); 1741 uint16_t port = bufbe16toh (m_UDPRequestHeader + 8); 1742 m_ProxyRelayEndpoint.reset (new boost::asio::ip::udp::endpoint (boost::asio::ip::address_v4 (bytes), port)); 1743 m_SocketV4.open (boost::asio::ip::udp::v4 ()); 1744 Receive (m_SocketV4); 1745 ReadUDPAssociateSocket (); 1746 } 1747 else 1748 { 1749 LogPrint(eLogError, "SSU2: Proxy UDP associate unsupported ATYP ", (int)m_UDPRequestHeader[3]); 1750 m_UDPAssociateSocket.reset (nullptr); 1751 } 1752 } 1753 else 1754 { 1755 LogPrint(eLogError, "SSU2: Proxy UDP associate error ", (int)m_UDPRequestHeader[1]); 1756 m_UDPAssociateSocket.reset (nullptr); 1757 } 1758 } 1759 }); 1760 } 1761 1762 void SSU2Server::ReadUDPAssociateSocket () 1763 { 1764 if (!m_UDPAssociateSocket) return; 1765 m_UDPAssociateSocket->async_read_some (boost::asio::buffer (m_UDPRequestHeader, 1), 1766 [this] (const boost::system::error_code& ecode, std::size_t bytes_transferred) 1767 { 1768 (void) bytes_transferred; 1769 if (ecode) 1770 { 1771 LogPrint(eLogWarning, "SSU2: Proxy UDP Associate socket error ", ecode.message()); 1772 m_UDPAssociateSocket.reset (nullptr); 1773 m_ProxyRelayEndpoint.reset (nullptr); 1774 m_SocketV4.close (); 1775 ConnectToProxy (); // try to reconnect immediately 1776 } 1777 else 1778 ReadUDPAssociateSocket (); 1779 }); 1780 } 1781 1782 void SSU2Server::ReconnectToProxy () 1783 { 1784 LogPrint(eLogInfo, "SSU2: Reconnect to proxy after ", SSU2_PROXY_CONNECT_RETRY_TIMEOUT, " seconds"); 1785 if (m_ProxyConnectRetryTimer) 1786 m_ProxyConnectRetryTimer->cancel (); 1787 else 1788 m_ProxyConnectRetryTimer.reset (new boost::asio::deadline_timer (m_ReceiveService.GetService ())); 1789 m_ProxyConnectRetryTimer->expires_from_now (boost::posix_time::seconds (SSU2_PROXY_CONNECT_RETRY_TIMEOUT)); 1790 m_ProxyConnectRetryTimer->async_wait ( 1791 [this](const boost::system::error_code& ecode) 1792 { 1793 if (ecode != boost::asio::error::operation_aborted) 1794 { 1795 m_UDPAssociateSocket.reset (nullptr); 1796 m_ProxyRelayEndpoint.reset (nullptr); 1797 LogPrint(eLogInfo, "SSU2: Reconnecting to proxy"); 1798 ConnectToProxy (); 1799 } 1800 }); 1801 } 1802 1803 bool SSU2Server::SetProxy (const std::string& address, uint16_t port) 1804 { 1805 boost::system::error_code ecode; 1806 auto addr = boost::asio::ip::make_address (address, ecode); 1807 if (!ecode && !addr.is_unspecified () && port) 1808 { 1809 m_IsThroughProxy = true; 1810 m_ProxyEndpoint.reset (new boost::asio::ip::tcp::endpoint (addr, port)); 1811 } 1812 else 1813 { 1814 if (ecode) 1815 LogPrint (eLogError, "SSU2: Invalid proxy address ", address, " ", ecode.message()); 1816 return false; 1817 } 1818 return true; 1819 } 1820 } 1821 }