// libi2pd_client/UDPTunnel.cpp
1 /* 2 * Copyright (c) 2013-2026, The PurpleI2P Project 3 * 4 * This file is part of Purple i2pd project and licensed under BSD3 5 * 6 * See full license text in LICENSE file at top of project tree 7 */ 8 9 #include <string_view> 10 #include "Log.h" 11 #include "util.h" 12 #include "ClientContext.h" 13 #include "I2PTunnel.h" // for GetLoopbackAddressFor 14 #include "UDPTunnel.h" 15 16 namespace i2p 17 { 18 namespace client 19 { 20 constexpr std::string_view UDP_SESSION_SEQN { "seqn" }; 21 constexpr std::string_view UDP_SESSION_ACKED { "acked" }; 22 constexpr std::string_view UDP_SESSION_FLAGS { "flags" }; 23 24 constexpr uint8_t UDP_SESSION_FLAG_RESET_PATH = 0x01; 25 constexpr uint8_t UDP_SESSION_FLAG_ACK_REQUESTED = 0x02; 26 27 void I2PUDPServerTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, 28 const uint8_t * buf, size_t len, const i2p::util::Mapping * options) 29 { 30 if (!m_LastSession || m_LastSession->Identity.GetLL()[0] != from.GetIdentHash ().GetLL()[0] || (fromPort && fromPort != m_LastSession->RemotePort)) 31 m_LastSession = ObtainUDPSession(from, toPort, fromPort); 32 boost::system::error_code ec; 33 if (len > 0) 34 m_LastSession->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint, 0, ec); 35 if (!ec) 36 m_LastSession->LastActivity = i2p::util::GetMillisecondsSinceEpoch(); 37 else 38 LogPrint (eLogInfo, "UDP Server: Send exception: ", ec.message (), " to ", m_RemoteEndpoint); 39 if (options) 40 { 41 uint32_t seqn = 0; 42 if (options->Get (UDP_SESSION_SEQN, seqn) && seqn > m_LastSession->m_LastReceivedPacketNum) 43 m_LastSession->m_LastReceivedPacketNum = seqn; 44 uint8_t flags = 0; 45 if (options->Get (UDP_SESSION_FLAGS, flags)) 46 { 47 if (flags & UDP_SESSION_FLAG_RESET_PATH) 48 m_LastSession->GetDatagramSession ()->DropSharedRoutingPath (); 49 if (flags & UDP_SESSION_FLAG_ACK_REQUESTED) 50 { 51 i2p::util::Mapping replyOptions; 52 replyOptions.Put (UDP_SESSION_ACKED, 
m_LastSession->m_LastReceivedPacketNum); 53 m_LastSession->m_Destination->SendDatagram(m_LastSession->GetDatagramSession (), 54 nullptr, 0, m_LastSession->LocalPort, m_LastSession->RemotePort, &replyOptions); // Ack only, no payload 55 m_LastSession->LastRepliableDatagramTime = i2p::util::GetMillisecondsSinceEpoch (); 56 } 57 } 58 if (options->Get (UDP_SESSION_ACKED, seqn)) 59 m_LastSession->Acked (seqn); 60 } 61 } 62 63 void I2PUDPServerTunnel::HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) 64 { 65 if (m_LastSession && (fromPort != m_LastSession->RemotePort || toPort != m_LastSession->LocalPort)) 66 { 67 std::lock_guard<std::mutex> lock(m_SessionsMutex); 68 auto it = m_Sessions.find (GetSessionIndex (fromPort, toPort)); 69 if (it != m_Sessions.end ()) 70 m_LastSession = it->second; 71 else 72 m_LastSession = nullptr; 73 } 74 if (m_LastSession) 75 { 76 boost::system::error_code ec; 77 m_LastSession->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint, 0, ec); 78 if (!ec) 79 m_LastSession->LastActivity = i2p::util::GetMillisecondsSinceEpoch(); 80 else 81 LogPrint (eLogInfo, "UDP Server: Send exception: ", ec.message (), " to ", m_RemoteEndpoint); 82 } 83 } 84 85 void I2PUDPServerTunnel::ExpireStale(const uint64_t delta) 86 { 87 std::lock_guard<std::mutex> lock(m_SessionsMutex); 88 uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); 89 auto itr = m_Sessions.begin(); 90 while(itr != m_Sessions.end()) 91 { 92 if(now - itr->second->LastActivity >= delta ) 93 itr = m_Sessions.erase(itr); 94 else 95 itr++; 96 } 97 } 98 99 void I2PUDPClientTunnel::ExpireStale(const uint64_t delta) 100 { 101 std::lock_guard<std::mutex> lock(m_SessionsMutex); 102 uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); 103 std::vector<uint16_t> removePorts; 104 for (const auto & s : m_Sessions) { 105 if (now - s.second->second >= delta) 106 removePorts.push_back(s.first); 107 } 108 for(auto port : removePorts) { 109 
m_Sessions.erase(port); 110 } 111 } 112 113 UDPSessionPtr I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) 114 { 115 auto ih = from.GetIdentHash(); 116 auto idx = GetSessionIndex (remotePort, localPort); 117 { 118 std::lock_guard<std::mutex> lock(m_SessionsMutex); 119 auto it = m_Sessions.find (idx); 120 if (it != m_Sessions.end ()) 121 { 122 if (it->second->Identity.GetLL()[0] == ih.GetLL()[0]) 123 { 124 LogPrint(eLogDebug, "UDPServer: Found session ", it->second->IPSocket.local_endpoint(), " ", ih.ToBase32()); 125 return it->second; 126 } 127 else 128 { 129 LogPrint(eLogWarning, "UDPServer: Session with from ", remotePort, " and to ", localPort, " ports already exists. But from different address. Removed"); 130 m_Sessions.erase (it); 131 } 132 } 133 } 134 135 boost::asio::ip::address addr; 136 /** create new udp session */ 137 if(m_IsUniqueLocal && m_LocalAddress.is_loopback()) 138 { 139 auto ident = from.GetIdentHash(); 140 addr = GetLoopbackAddressFor(ident); 141 } 142 else 143 addr = m_LocalAddress; 144 145 auto s = std::make_shared<UDPSession>(boost::asio::ip::udp::endpoint(addr, 0), 146 m_LocalDest, m_RemoteEndpoint, ih, localPort, remotePort); 147 std::lock_guard<std::mutex> lock(m_SessionsMutex); 148 m_Sessions.emplace (idx, s); 149 return s; 150 } 151 152 void UDPConnection::Stop () 153 { 154 m_AckTimer.cancel (); 155 } 156 157 void UDPConnection::Acked (uint32_t seqn) 158 { 159 if (!m_AckTimerSeqn) seqn = m_UnackedDatagrams.back ().first; // if we recieve ack after paht change, clear window and send new datagrams 160 m_IsFirstPacket = false; // first packet confirmed 161 if (m_AckTimerSeqn && seqn >= m_AckTimerSeqn) 162 { 163 m_AckTimerSeqn = 0; 164 m_AckTimer.cancel (); 165 } 166 if (m_UnackedDatagrams.empty () && seqn < m_UnackedDatagrams.front ().first) return; 167 bool acknowledged = false; 168 auto it = m_UnackedDatagrams.begin (); 169 while (it != m_UnackedDatagrams.end ()) 170 { 171 
if (it->first > seqn) break; 172 if (it->first == seqn && m_IsSendingAllowed) // ignore first ack after path change 173 { 174 auto rtt = i2p::util::GetMillisecondsSinceEpoch () - it->second; 175 m_RTT = m_RTT ? (m_RTT + rtt)/2 : rtt; 176 acknowledged = true; 177 } 178 it++; 179 } 180 m_UnackedDatagrams.erase (m_UnackedDatagrams.begin (), it); 181 m_IsSendingAllowed = true; // if we recieve ack after path change, now can send new datagrams 182 if (acknowledged && !m_UnackedDatagrams.empty ()) 183 { 184 m_AckTimer.cancel (); 185 m_AckTimerSeqn = 0; 186 ScheduleAckTimer (m_UnackedDatagrams.back ().first); 187 } 188 } 189 190 void UDPConnection::ScheduleAckTimer (uint32_t seqn) 191 { 192 if (!m_AckTimerSeqn) 193 { 194 m_AckTimerSeqn = seqn; 195 m_AckTimer.expires_from_now (boost::posix_time::milliseconds (m_RTT ? 2*m_RTT : I2P_UDP_MAX_UNACKED_DATAGRAM_TIME)); 196 m_AckTimer.async_wait ([this](const boost::system::error_code& ecode) 197 { 198 if (ecode != boost::asio::error::operation_aborted) 199 { 200 LogPrint (eLogInfo, "UDP Connection: Packet ", m_AckTimerSeqn, " was not acked"); 201 // DeleteExpiredUnackedDatagrams (); 202 m_IsSendingAllowed = false; // stop sending datagrams 203 m_AckTimerSeqn = 0; 204 m_RTT = 0; 205 if (!m_UnackedDatagrams.empty ()) ScheduleAckTimer (0); // try again if failed 206 // send empty packet with reset path flag 207 i2p::util::Mapping options; 208 options.Put (UDP_SESSION_FLAGS, UDP_SESSION_FLAG_RESET_PATH | UDP_SESSION_FLAG_ACK_REQUESTED); 209 auto session = GetDatagramSession (); 210 session->DropSharedRoutingPath (); 211 GetDatagramDestination ()->SendDatagram (session, nullptr, 0, 0, 0, &options); 212 } 213 }); 214 } 215 } 216 217 void UDPConnection::DeleteExpiredUnackedDatagrams () 218 { 219 if (m_UnackedDatagrams.empty ()) return; 220 auto expired = i2p::util::GetMillisecondsSinceEpoch () - (m_RTT ? 
2*m_RTT : I2P_UDP_MAX_UNACKED_DATAGRAM_TIME); 221 auto it = m_UnackedDatagrams.begin (); 222 while (it != m_UnackedDatagrams.end ()) 223 { 224 if (it->second < expired) break; 225 it++; 226 } 227 m_UnackedDatagrams.erase (m_UnackedDatagrams.begin (), it); 228 } 229 230 UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint, 231 const std::shared_ptr<i2p::client::ClientDestination> & localDestination, 232 const boost::asio::ip::udp::endpoint& endpoint, const i2p::data::IdentHash& to, 233 uint16_t ourPort, uint16_t theirPort) : 234 UDPConnection (localDestination->GetService()), 235 m_Destination(localDestination->GetDatagramDestination()), 236 IPSocket(localDestination->GetService(), localEndpoint), Identity (to), 237 SendEndpoint(endpoint), LastActivity(i2p::util::GetMillisecondsSinceEpoch()), 238 LastRepliableDatagramTime (0), LocalPort(ourPort), RemotePort(theirPort) 239 { 240 Start (); 241 IPSocket.set_option (boost::asio::socket_base::receive_buffer_size (I2P_UDP_MAX_MTU )); 242 IPSocket.non_blocking (true); 243 Receive(); 244 } 245 246 void UDPSession::Receive() 247 { 248 LogPrint(eLogDebug, "UDPSession: Receive"); 249 IPSocket.async_receive_from(boost::asio::buffer(m_Buffer, I2P_UDP_MAX_MTU), 250 FromEndpoint, std::bind(&UDPSession::HandleReceived, this, std::placeholders::_1, std::placeholders::_2)); 251 } 252 253 void UDPSession::HandleReceived(const boost::system::error_code & ecode, std::size_t len) 254 { 255 if(!ecode) 256 { 257 if (!m_UnackedDatagrams.empty () && m_NextSendPacketNum > m_UnackedDatagrams.front ().first + I2P_UDP_MAX_NUM_UNACKED_DATAGRAMS) 258 { 259 // window is full, drop packet 260 Receive (); 261 return; 262 } 263 LogPrint(eLogDebug, "UDPSession: Forward ", len, "B from ", FromEndpoint); 264 auto ts = i2p::util::GetMillisecondsSinceEpoch(); 265 auto session = GetDatagramSession (); 266 uint64_t repliableDatagramInterval = I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL; 267 if (m_RTT && m_RTT >= I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL && 
m_RTT < I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL*10) repliableDatagramInterval = m_RTT/10; // 10 - 100 ms 268 if (ts > LastRepliableDatagramTime + repliableDatagramInterval) 269 { 270 if (session->GetVersion () == i2p::datagram::eDatagramV3) 271 { 272 uint8_t flags = 0; 273 if (!m_RTT || !m_AckTimerSeqn || (!m_UnackedDatagrams.empty () && 274 ts > m_UnackedDatagrams.back ().second + repliableDatagramInterval)) // last ack request 275 { 276 flags |= UDP_SESSION_FLAG_ACK_REQUESTED; 277 m_UnackedDatagrams.push_back ({ m_NextSendPacketNum, ts }); 278 ScheduleAckTimer (m_NextSendPacketNum); 279 } 280 i2p::util::Mapping options; 281 options.Put (UDP_SESSION_SEQN, m_NextSendPacketNum); 282 if (m_LastReceivedPacketNum > 0) 283 options.Put (UDP_SESSION_ACKED, m_LastReceivedPacketNum); 284 if (flags) 285 options.Put (UDP_SESSION_FLAGS, flags); 286 m_Destination->SendDatagram(session, m_Buffer, len, LocalPort, RemotePort, &options); 287 ScheduleAckTimer (m_NextSendPacketNum); 288 } 289 else 290 m_Destination->SendDatagram(session, m_Buffer, len, LocalPort, RemotePort); 291 LastRepliableDatagramTime = ts; 292 } 293 else 294 m_Destination->SendRawDatagram(session, m_Buffer, len, LocalPort, RemotePort); 295 size_t numPackets = 0; 296 while (numPackets < i2p::datagram::DATAGRAM_SEND_QUEUE_MAX_SIZE) 297 { 298 boost::system::error_code ec; 299 size_t moreBytes = IPSocket.available(ec); 300 if (ec || !moreBytes) break; 301 len = IPSocket.receive_from (boost::asio::buffer (m_Buffer, I2P_UDP_MAX_MTU), FromEndpoint, 0, ec); 302 m_Destination->SendRawDatagram (session, m_Buffer, len, LocalPort, RemotePort); 303 numPackets++; 304 } 305 if (numPackets > 0) 306 LogPrint(eLogDebug, "UDPSession: Forward more ", numPackets, "packets B from ", FromEndpoint); 307 m_NextSendPacketNum += numPackets + 1; 308 m_Destination->FlushSendQueue (session); 309 LastActivity = ts; 310 Receive(); 311 } 312 else 313 LogPrint(eLogError, "UDPSession: ", ecode.message()); 314 } 315 316 
std::shared_ptr<i2p::datagram::DatagramSession> UDPSession::GetDatagramSession () 317 { 318 auto session = m_LastDatagramSession.lock (); 319 if (!session) 320 { 321 session = m_Destination->GetSession (Identity); 322 m_LastDatagramSession = session; 323 } 324 return session; 325 } 326 327 I2PUDPServerTunnel::I2PUDPServerTunnel (const std::string & name, std::shared_ptr<i2p::client::ClientDestination> localDestination, 328 const boost::asio::ip::address& localAddress, const boost::asio::ip::udp::endpoint& forwardTo, uint16_t inPort, bool gzip) : 329 m_IsUniqueLocal (true), m_Name (name), m_LocalAddress (localAddress), 330 m_RemoteEndpoint (forwardTo), m_LocalDest (localDestination), m_inPort(inPort), m_Gzip (gzip) 331 { 332 } 333 334 I2PUDPServerTunnel::~I2PUDPServerTunnel () 335 { 336 Stop (); 337 } 338 339 void I2PUDPServerTunnel::Start () 340 { 341 m_LocalDest->Start (); 342 343 auto dgram = m_LocalDest->CreateDatagramDestination (m_Gzip); 344 dgram->SetReceiver ( 345 std::bind (&I2PUDPServerTunnel::HandleRecvFromI2P, this, std::placeholders::_1, std::placeholders::_2, 346 std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6), 347 m_inPort 348 ); 349 dgram->SetRawReceiver ( 350 std::bind (&I2PUDPServerTunnel::HandleRecvFromI2PRaw, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), 351 m_inPort 352 ); 353 } 354 355 void I2PUDPServerTunnel::Stop () 356 { 357 auto dgram = m_LocalDest->GetDatagramDestination (); 358 if (dgram) { 359 dgram->ResetReceiver (m_inPort); 360 dgram->ResetRawReceiver (m_inPort); 361 } 362 m_Sessions.clear (); 363 } 364 365 std::vector<std::shared_ptr<DatagramSessionInfo> > I2PUDPServerTunnel::GetSessions () 366 { 367 std::vector<std::shared_ptr<DatagramSessionInfo> > sessions; 368 std::lock_guard<std::mutex> lock (m_SessionsMutex); 369 370 for (const auto &it: m_Sessions) 371 { 372 auto s = it.second; 373 if (!s->m_Destination) continue; 374 auto info = 
s->m_Destination->GetInfoForRemote (s->Identity); 375 if (!info) continue; 376 377 auto sinfo = std::make_shared<DatagramSessionInfo> (); 378 sinfo->Name = m_Name; 379 sinfo->LocalIdent = std::make_shared<i2p::data::IdentHash> (m_LocalDest->GetIdentHash ().data ()); 380 sinfo->RemoteIdent = std::make_shared<i2p::data::IdentHash> (s->Identity.data ()); 381 sinfo->CurrentIBGW = info->IBGW; 382 sinfo->CurrentOBEP = info->OBEP; 383 sessions.push_back (sinfo); 384 } 385 return sessions; 386 } 387 388 I2PUDPClientTunnel::I2PUDPClientTunnel (const std::string & name, const std::string &remoteDest, 389 const boost::asio::ip::udp::endpoint& localEndpoint, 390 std::shared_ptr<i2p::client::ClientDestination> localDestination, 391 uint16_t remotePort, bool gzip, i2p::datagram::DatagramVersion datagramVersion) : 392 UDPConnection (localDestination->GetService ()), 393 m_Name (name), m_RemoteDest (remoteDest), m_LocalDest (localDestination), m_LocalEndpoint (localEndpoint), 394 m_ResolveThread (nullptr), m_LocalSocket (nullptr), RemotePort (remotePort), 395 m_LastPort (0), m_cancel_resolve (false), m_Gzip (gzip), m_DatagramVersion (datagramVersion), 396 m_LastRepliableDatagramTime (0) 397 { 398 } 399 400 I2PUDPClientTunnel::~I2PUDPClientTunnel () 401 { 402 Stop (); 403 } 404 405 void I2PUDPClientTunnel::Start () 406 { 407 UDPConnection::Start (); 408 // Reset flag in case of tunnel reload 409 if (m_cancel_resolve) m_cancel_resolve = false; 410 411 m_LocalSocket.reset (new boost::asio::ip::udp::socket (m_LocalDest->GetService (), m_LocalEndpoint)); 412 m_LocalSocket->set_option (boost::asio::socket_base::receive_buffer_size (I2P_UDP_MAX_MTU)); 413 m_LocalSocket->set_option (boost::asio::socket_base::reuse_address (true)); 414 m_LocalSocket->non_blocking (true); 415 416 auto dgram = m_LocalDest->CreateDatagramDestination (m_Gzip, m_DatagramVersion); 417 dgram->SetReceiver (std::bind (&I2PUDPClientTunnel::HandleRecvFromI2P, this, 418 std::placeholders::_1, std::placeholders::_2, 
419 std::placeholders::_3, std::placeholders::_4, 420 std::placeholders::_5, std::placeholders::_6), 421 RemotePort 422 ); 423 dgram->SetRawReceiver (std::bind (&I2PUDPClientTunnel::HandleRecvFromI2PRaw, this, 424 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), 425 RemotePort 426 ); 427 428 m_LocalDest->Start (); 429 if (m_ResolveThread == nullptr) 430 m_ResolveThread = new std::thread (std::bind (&I2PUDPClientTunnel::TryResolving, this)); 431 RecvFromLocal (); 432 } 433 434 void I2PUDPClientTunnel::Stop () 435 { 436 auto dgram = m_LocalDest->GetDatagramDestination (); 437 if (dgram) { 438 dgram->ResetReceiver (RemotePort); 439 dgram->ResetRawReceiver (RemotePort); 440 } 441 m_cancel_resolve = true; 442 443 m_Sessions.clear(); 444 445 if(m_LocalSocket && m_LocalSocket->is_open ()) 446 m_LocalSocket->close (); 447 448 if(m_ResolveThread) 449 { 450 m_ResolveThread->join (); 451 delete m_ResolveThread; 452 m_ResolveThread = nullptr; 453 } 454 m_RemoteAddr = nullptr; 455 UDPConnection::Stop (); 456 } 457 458 void I2PUDPClientTunnel::RecvFromLocal () 459 { 460 m_LocalSocket->async_receive_from (boost::asio::buffer (m_RecvBuff, I2P_UDP_MAX_MTU), 461 m_RecvEndpoint, std::bind (&I2PUDPClientTunnel::HandleRecvFromLocal, this, std::placeholders::_1, std::placeholders::_2)); 462 } 463 464 void I2PUDPClientTunnel::HandleRecvFromLocal (const boost::system::error_code & ec, std::size_t transferred) 465 { 466 if (m_cancel_resolve) { 467 LogPrint (eLogDebug, "UDP Client: Ignoring incoming data: stopping"); 468 return; 469 } 470 if (ec) { 471 LogPrint (eLogError, "UDP Client: Reading from socket error: ", ec.message (), ". 
Restarting listener..."); 472 RecvFromLocal (); // Restart listener and continue work 473 return; 474 } 475 if (!m_RemoteAddr || !m_RemoteAddr->IsIdentHash ()) // TODO: handle B33 476 { 477 LogPrint (eLogWarning, "UDP Client: Remote endpoint not resolved yet"); 478 RecvFromLocal (); 479 return; // drop, remote not resolved 480 } 481 if ((!m_UnackedDatagrams.empty () && m_NextSendPacketNum > m_UnackedDatagrams.front ().first + I2P_UDP_MAX_NUM_UNACKED_DATAGRAMS) || !m_IsSendingAllowed) 482 { 483 // window is full, drop packet 484 RecvFromLocal (); 485 return; 486 } 487 auto remotePort = m_RecvEndpoint.port (); 488 if (!m_LastPort || m_LastPort != remotePort) 489 { 490 auto itr = m_Sessions.find (remotePort); 491 if (itr != m_Sessions.end ()) 492 m_LastSession = itr->second; 493 else 494 { 495 m_LastSession = std::make_shared<UDPConvo> (boost::asio::ip::udp::endpoint (m_RecvEndpoint), 0); 496 m_Sessions.emplace (remotePort, m_LastSession); 497 } 498 m_LastPort = remotePort; 499 } 500 // send off to remote i2p destination 501 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 502 LogPrint (eLogDebug, "UDP Client: Send ", transferred, " to ", m_RemoteAddr->identHash.ToBase32 (), ":", RemotePort); 503 auto session = GetDatagramSession (); 504 uint64_t repliableDatagramInterval = I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL; 505 if (m_RTT && m_RTT >= I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL && m_RTT < I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL*10) repliableDatagramInterval = m_RTT/10; // 10 - 100 ms 506 if (ts > m_LastRepliableDatagramTime + repliableDatagramInterval) 507 { 508 if (m_DatagramVersion == i2p::datagram::eDatagramV3) 509 { 510 uint8_t flags = 0; 511 if (!m_RTT || !m_AckTimerSeqn || (!m_UnackedDatagrams.empty () && 512 ts > m_UnackedDatagrams.back ().second + repliableDatagramInterval)) // last ack request 513 { 514 flags |= UDP_SESSION_FLAG_ACK_REQUESTED; 515 m_UnackedDatagrams.push_back ({ m_NextSendPacketNum, ts }); 516 ScheduleAckTimer (m_NextSendPacketNum); 517 } 518 
i2p::util::Mapping options; 519 options.Put (UDP_SESSION_SEQN, m_NextSendPacketNum); 520 if (m_LastReceivedPacketNum > 0) 521 options.Put (UDP_SESSION_ACKED, m_LastReceivedPacketNum); 522 if (flags) 523 options.Put (UDP_SESSION_FLAGS, flags); 524 m_LocalDest->GetDatagramDestination ()->SendDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort, &options); 525 if (m_IsFirstPacket) m_IsSendingAllowed = false; // send only one packet at the start and wait ack 526 } 527 else 528 m_LocalDest->GetDatagramDestination ()->SendDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort); 529 m_LastRepliableDatagramTime = ts; 530 } 531 else 532 m_LocalDest->GetDatagramDestination ()->SendRawDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort); 533 size_t numPackets = 0; 534 while (numPackets < i2p::datagram::DATAGRAM_SEND_QUEUE_MAX_SIZE) 535 { 536 boost::system::error_code ec; 537 size_t moreBytes = m_LocalSocket->available (ec); 538 if (ec || !moreBytes) break; 539 transferred = m_LocalSocket->receive_from (boost::asio::buffer (m_RecvBuff, I2P_UDP_MAX_MTU), m_RecvEndpoint, 0, ec); 540 remotePort = m_RecvEndpoint.port (); 541 // TODO: check remotePort 542 m_LocalDest->GetDatagramDestination ()->SendRawDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort); 543 numPackets++; 544 } 545 if (numPackets) 546 LogPrint (eLogDebug, "UDP Client: Sent ", numPackets, " more packets to ", m_RemoteAddr->identHash.ToBase32 ()); 547 m_NextSendPacketNum += numPackets + 1; 548 m_LocalDest->GetDatagramDestination ()->FlushSendQueue (session); 549 550 // mark convo as active 551 if (m_LastSession) 552 m_LastSession->second = ts; 553 RecvFromLocal (); 554 } 555 556 std::vector<std::shared_ptr<DatagramSessionInfo> > I2PUDPClientTunnel::GetSessions () 557 { 558 // TODO: implement 559 std::vector<std::shared_ptr<DatagramSessionInfo> > infos; 560 return infos; 561 } 562 563 void I2PUDPClientTunnel::TryResolving () 564 { 565 i2p::util::SetThreadName 
("UDP Resolver"); 566 LogPrint (eLogInfo, "UDP Tunnel: Trying to resolve ", m_RemoteDest); 567 568 while (!(m_RemoteAddr = context.GetAddressBook().GetAddress(m_RemoteDest)) && !m_cancel_resolve) 569 { 570 LogPrint (eLogWarning, "UDP Tunnel: Failed to lookup ", m_RemoteDest); 571 std::this_thread::sleep_for (std::chrono::seconds (1)); 572 } 573 if (m_cancel_resolve) 574 { 575 LogPrint(eLogError, "UDP Tunnel: Lookup of ", m_RemoteDest, " was cancelled"); 576 return; 577 } 578 if (!m_RemoteAddr) 579 { 580 LogPrint (eLogError, "UDP Tunnel: ", m_RemoteDest, " not found"); 581 return; 582 } 583 LogPrint(eLogInfo, "UDP Tunnel: Resolved ", m_RemoteDest, " to ", m_RemoteAddr->identHash.ToBase32 ()); 584 } 585 586 std::shared_ptr<i2p::datagram::DatagramSession> I2PUDPClientTunnel::GetDatagramSession () 587 { 588 auto session = m_LastDatagramSession.lock (); 589 if (!session) 590 { 591 session = m_LocalDest->GetDatagramDestination ()->GetSession (m_RemoteAddr->identHash); 592 m_LastDatagramSession = session; 593 } 594 return session; 595 } 596 597 void I2PUDPClientTunnel::HandleRecvFromI2P (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, 598 const uint8_t * buf, size_t len, const i2p::util::Mapping * options) 599 { 600 if (m_RemoteAddr && from.GetIdentHash() == m_RemoteAddr->identHash) 601 { 602 if (options) 603 { 604 uint32_t seqn = 0; 605 if (options->Get (UDP_SESSION_SEQN, seqn) && seqn > m_LastReceivedPacketNum) 606 m_LastReceivedPacketNum = seqn; 607 uint8_t flags = 0; 608 if (options->Get (UDP_SESSION_FLAGS, flags) && (flags & UDP_SESSION_FLAG_ACK_REQUESTED)) 609 { 610 i2p::util::Mapping replyOptions; 611 replyOptions.Put (UDP_SESSION_ACKED, m_LastReceivedPacketNum); 612 m_LocalDest->GetDatagramDestination ()->SendDatagram (GetDatagramSession (), 613 nullptr, 0, m_LastPort, RemotePort, &replyOptions); // Ack only, no payload 614 m_LastRepliableDatagramTime = i2p::util::GetMillisecondsSinceEpoch (); 615 } 616 if (options->Get (UDP_SESSION_ACKED, 
seqn)) 617 Acked (seqn); 618 } 619 if (len > 0) 620 HandleRecvFromI2PRaw (fromPort, toPort, buf, len); 621 } 622 else 623 LogPrint(eLogWarning, "UDP Client: Unwarranted traffic from ", from.GetIdentHash().ToBase32 ()); 624 } 625 626 void I2PUDPClientTunnel::HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) 627 { 628 auto itr = m_Sessions.find (toPort); 629 // found convo ? 630 if (itr != m_Sessions.end ()) 631 { 632 // found convo 633 if (len > 0) 634 { 635 LogPrint (eLogDebug, "UDP Client: Got ", len, "B from ", m_RemoteAddr ? m_RemoteAddr->identHash.ToBase32 () : ""); 636 boost::system::error_code ec; 637 m_LocalSocket->send_to (boost::asio::buffer (buf, len), itr->second->first, 0, ec); 638 if (!ec) 639 // mark convo as active 640 itr->second->second = i2p::util::GetMillisecondsSinceEpoch (); 641 else 642 LogPrint (eLogInfo, "UDP Client: Send exception: ", ec.message (), " to ", itr->second->first); 643 } 644 } 645 else 646 LogPrint (eLogWarning, "UDP Client: Not tracking udp session using port ", (int) toPort); 647 } 648 } 649 }