Datagram.cpp
1 /* 2 * Copyright (c) 2013-2026, The PurpleI2P Project 3 * 4 * This file is part of Purple i2pd project and licensed under BSD3 5 * 6 * See full license text in LICENSE file at top of project tree 7 */ 8 9 #include <string.h> 10 #include "Crypto.h" 11 #include "Log.h" 12 #include "TunnelBase.h" 13 #include "RouterContext.h" 14 #include "Destination.h" 15 #include "Datagram.h" 16 17 namespace i2p 18 { 19 namespace datagram 20 { 21 DatagramDestination::DatagramDestination (std::shared_ptr<i2p::client::ClientDestination> owner, 22 bool gzip, DatagramVersion version): 23 m_Owner (owner), m_DefaultReceiver (nullptr), m_DefaultRawReceiver (nullptr), 24 m_Gzip (gzip), m_Version (version) 25 { 26 if (m_Gzip) 27 m_Deflator.reset (new i2p::data::GzipDeflator); 28 29 auto identityLen = m_Owner->GetIdentity ()->GetFullLen (); 30 m_From.resize (identityLen); 31 m_Owner->GetIdentity ()->ToBuffer (m_From.data (), identityLen); 32 m_Signature.resize (m_Owner->GetIdentity ()->GetSignatureLen ()); 33 } 34 35 DatagramDestination::~DatagramDestination () 36 { 37 m_Sessions.clear(); 38 } 39 40 void DatagramDestination::SendDatagramTo(const uint8_t * payload, size_t len, const i2p::data::IdentHash & identity, uint16_t fromPort, uint16_t toPort) 41 { 42 auto session = ObtainSession(identity); 43 SendDatagram (session, payload, len, fromPort, toPort); 44 FlushSendQueue (session); 45 } 46 47 void DatagramDestination::SendRawDatagramTo(const uint8_t * payload, size_t len, const i2p::data::IdentHash & identity, uint16_t fromPort, uint16_t toPort) 48 { 49 auto session = ObtainSession(identity); 50 SendRawDatagram (session, payload, len, fromPort, toPort); 51 FlushSendQueue (session); 52 } 53 54 std::shared_ptr<DatagramSession> DatagramDestination::GetSession(const i2p::data::IdentHash & ident) 55 { 56 return ObtainSession(ident); 57 } 58 59 void DatagramDestination::SendDatagram (std::shared_ptr<DatagramSession> session, const uint8_t * payload, size_t len, 60 uint16_t fromPort, uint16_t toPort, const i2p::util::Mapping * options) 61 { 62 if (session) 63 { 64 std::shared_ptr<I2NPMessage> msg; 65 switch (session->GetVersion ()) 66 { 67 case eDatagramV3: 68 { 69 uint8_t flags[] = { 0x00, 0x03 }; // datagram3, no options 70 if (options) 71 { 72 uint8_t optionsBuf[256]; // TODO: evaluate actual size 73 size_t optionsLen = options->ToBuffer (optionsBuf, 256); 74 if (optionsLen) flags[1] |= DATAGRAM3_FLAG_OPTIONS; 75 msg = CreateDataMessage ({{m_Owner->GetIdentity ()->GetIdentHash (), 32}, {flags, 2}, 76 {optionsBuf, optionsLen}, {payload, len}}, fromPort, toPort, i2p::client::PROTOCOL_TYPE_DATAGRAM3, false); // datagram3 77 } 78 else 79 msg = CreateDataMessage ({{m_Owner->GetIdentity ()->GetIdentHash (), 32}, 80 {flags, 2}, {payload, len}}, fromPort, toPort, i2p::client::PROTOCOL_TYPE_DATAGRAM3, false); // datagram3 81 break; 82 } 83 case eDatagramV1: 84 { 85 if (m_Owner->GetIdentity ()->GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1) 86 { 87 uint8_t hash[32]; 88 SHA256(payload, len, hash); 89 m_Owner->Sign (hash, 32, m_Signature.data ()); 90 } 91 else 92 m_Owner->Sign (payload, len, m_Signature.data ()); 93 msg = CreateDataMessage ({{m_From.data (), m_From.size ()}, {m_Signature.data (), m_Signature.size ()}, {payload, len}}, 94 fromPort, toPort, i2p::client::PROTOCOL_TYPE_DATAGRAM, !session->IsRatchets ()); // datagram1 95 break; 96 } 97 case eDatagramV2: 98 { 99 constexpr uint8_t flags[] = { 0x00, 0x02 }; // datagram2, no options 100 // signature 101 std::vector<uint8_t> signedData (len + 32 + 2); 102 memcpy (signedData.data (), m_Owner->GetIdentity ()->GetIdentHash (), 32); 103 memcpy (signedData.data () + 32, flags, 2); 104 memcpy (signedData.data () + 34, payload, len); 105 m_Owner->Sign (signedData.data (), signedData.size (), m_Signature.data ()); 106 // TODO: offline signatures and options 107 msg = CreateDataMessage ({{m_From.data (), m_From.size ()}, {flags, 2}, {payload, len}, 108 {m_Signature.data (), m_Signature.size ()}}, fromPort, toPort, i2p::client::PROTOCOL_TYPE_DATAGRAM2, false); // datagram2 109 110 break; 111 } 112 default: 113 LogPrint (eLogError, "Datagram: datagram type ", (int)session->GetVersion (), " is not supported"); 114 } 115 if (msg) session->SendMsg(msg); 116 } 117 } 118 119 void DatagramDestination::SendRawDatagram (std::shared_ptr<DatagramSession> session, const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort) 120 { 121 if (session) 122 session->SendMsg(CreateDataMessage ({{payload, len}}, fromPort, toPort, i2p::client::PROTOCOL_TYPE_RAW, !session->IsRatchets ())); // raw 123 } 124 125 void DatagramDestination::FlushSendQueue (std::shared_ptr<DatagramSession> session) 126 { 127 if (session) 128 session->FlushSendQueue (); 129 } 130 131 void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort, 132 const uint8_t * buf, size_t len, i2p::garlic::ECIESX25519AEADRatchetSession * from) 133 { 134 i2p::data::IdentityEx identity; 135 size_t identityLen = identity.FromBuffer (buf, len); 136 if (!identityLen) return; 137 const uint8_t * signature = buf + identityLen; 138 size_t headerLen = identityLen + identity.GetSignatureLen (); 139 140 std::shared_ptr<i2p::data::LeaseSet> ls; 141 bool verified = false; 142 if (from) 143 { 144 ls = m_Owner->FindLeaseSet (identity.GetIdentHash ()); 145 if (ls) 146 { 147 uint8_t staticKey[32]; 148 ls->Encrypt (nullptr, staticKey); 149 if (!memcmp (from->GetRemoteStaticKey (), staticKey, 32)) 150 verified = true; 151 else 152 { 153 LogPrint (eLogError, "Datagram: Remote LeaseSet static key mismatch for datagram from ", 154 identity.GetIdentHash ().ToBase32 ()); 155 return; 156 } 157 } 158 } 159 if (!verified) 160 { 161 if (identity.GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1) 162 { 163 uint8_t hash[32]; 164 SHA256(buf + headerLen, len - headerLen, hash); 165 verified = identity.Verify (hash, 32, signature); 166 } 167 else 168 verified = identity.Verify (buf + headerLen, len - headerLen, signature); 169 } 170 171 if (verified) 172 { 173 auto session = ObtainSession (identity.GetIdentHash()); 174 if (ls) session->SetRemoteLeaseSet (ls); 175 session->Ack(); 176 auto r = FindReceiver(toPort); 177 if(r) 178 r(identity, fromPort, toPort, buf + headerLen, len - headerLen, nullptr); 179 else 180 LogPrint (eLogWarning, "DatagramDestination: no receiver for port ", toPort); 181 } 182 else 183 LogPrint (eLogWarning, "Datagram signature verification failed"); 184 } 185 186 void DatagramDestination::HandleRawDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) 187 { 188 auto r = FindRawReceiver(toPort); 189 190 if (r) 191 r (fromPort, toPort, buf, len); 192 else 193 LogPrint (eLogWarning, "DatagramDestination: no receiver for raw datagram"); 194 } 195 196 void DatagramDestination::HandleDatagram2 (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len, 197 i2p::garlic::ECIESX25519AEADRatchetSession * from) 198 { 199 if (len < 433) 200 { 201 LogPrint (eLogWarning, "Datagram: datagram2 is too short ", len); 202 return; 203 } 204 i2p::data::IdentityEx identity; 205 size_t identityLen = identity.FromBuffer (buf, len); 206 if (!identityLen) return; 207 size_t signatureLen = identity.GetSignatureLen (); 208 if (signatureLen + identityLen > len) return; 209 210 std::shared_ptr<i2p::data::LeaseSet> ls; 211 bool verified = false; 212 if (from) 213 { 214 ls = m_Owner->FindLeaseSet (identity.GetIdentHash ()); 215 if (ls) 216 { 217 uint8_t staticKey[32]; 218 ls->Encrypt (nullptr, staticKey); 219 if (!memcmp (from->GetRemoteStaticKey (), staticKey, 32)) 220 verified = true; 221 else 222 { 223 LogPrint (eLogError, "Datagram: Remote LeaseSet static key mismatch for datagram2 from ", 224 identity.GetIdentHash ().ToBase32 ()); 225 return; 226 } 227 } 228 } 229 const uint8_t * flags = buf + identityLen; 230 size_t offset = identityLen + 2; 231 bool isOptions = false; 232 if (flags[1] & DATAGRAM2_FLAG_OPTIONS) 233 { 234 isOptions = true; 235 m_Options.CleanUp (); 236 auto optionsLen = m_Options.FromBuffer (buf + offset, len - offset); 237 if (optionsLen) 238 offset += optionsLen; 239 else 240 { 241 LogPrint (eLogWarning, "Datagram: datagram2 can't read options"); 242 return; 243 } 244 } 245 if (offset > len) 246 { 247 LogPrint (eLogWarning, "Datagram: datagram2 is too short ", len, " expected ", offset); 248 return; 249 } 250 if (!verified) 251 { 252 std::shared_ptr<i2p::crypto::Verifier> transientVerifier; 253 if (flags[1] & DATAGRAM2_FLAG_OFFLINE_SIGNATURE) 254 { 255 transientVerifier = i2p::data::ProcessOfflineSignature (&identity, buf, len, offset); 256 if (!transientVerifier) 257 { 258 LogPrint (eLogWarning, "Datagram: datagram2 offline signature failed"); 259 return; 260 } 261 signatureLen = transientVerifier->GetSignatureLen (); 262 } 263 std::vector<uint8_t> signedData (len + 32 - identityLen - signatureLen); 264 memcpy (signedData.data (), identity.GetIdentHash (), 32); 265 memcpy (signedData.data () + 32, buf + identityLen, signedData.size () - 32); 266 verified = transientVerifier ? transientVerifier->Verify (signedData.data (), signedData.size (), buf + len - signatureLen) : 267 identity.Verify (signedData.data (), signedData.size (), buf + len - signatureLen); 268 if (!verified) 269 { 270 LogPrint (eLogWarning, "Datagram: datagram2 signature verification failed"); 271 return; 272 } 273 } 274 275 auto session = ObtainSession (identity.GetIdentHash()); 276 session->SetVersion (eDatagramV2); 277 if (ls) session->SetRemoteLeaseSet (ls); 278 session->Ack(); 279 auto r = FindReceiver(toPort); 280 if(r) 281 r(identity, fromPort, toPort, buf + offset, len - offset - signatureLen, isOptions ? &m_Options : nullptr); 282 else 283 LogPrint (eLogWarning, "DatagramDestination: no receiver for port ", toPort); 284 } 285 286 void DatagramDestination::HandleDatagram3 (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len, 287 i2p::garlic::ECIESX25519AEADRatchetSession * from) 288 { 289 if (len < 34) 290 { 291 LogPrint (eLogWarning, "Datagram: datagram3 is too short ", len); 292 return; 293 } 294 if (from) 295 { 296 std::shared_ptr<const i2p::data::LeaseSet> ls; 297 i2p::data::IdentHash ident(buf); 298 auto session = FindSession (ident); 299 if (session) ls = session->GetRemoteLeaseSet (); 300 if (!ls) ls = m_Owner->FindLeaseSet (ident); 301 if (ls) 302 { 303 uint8_t staticKey[32]; 304 ls->Encrypt (nullptr, staticKey); 305 if (!memcmp (from->GetRemoteStaticKey (), staticKey, 32)) 306 { 307 if (!session) 308 { 309 session = ObtainSession (ident); 310 session->SetVersion (eDatagramV3); 311 } 312 session->SetRemoteLeaseSet (ls); 313 auto r = FindReceiver(toPort); 314 if (r) 315 { 316 const uint8_t * flags = buf + 32; 317 size_t offset = 34; 318 bool isOptions = false; 319 if (flags[1] & DATAGRAM3_FLAG_OPTIONS) 320 { 321 isOptions = true; 322 m_Options.CleanUp (); 323 auto optionsLen = m_Options.FromBuffer (buf + offset, len - offset); 324 if (optionsLen) 325 offset += optionsLen; 326 else 327 { 328 LogPrint (eLogWarning, "Datagram: datagram3 can't read options"); 329 return; 330 } 331 } 332 if (offset > len) 333 { 334 LogPrint (eLogWarning, "Datagram: datagram3 is too short ", len, " expected ", offset); 335 return; 336 } 337 r(*ls->GetIdentity (), fromPort, toPort, buf + offset, len - offset, isOptions ? &m_Options : nullptr); 338 } 339 else 340 LogPrint (eLogWarning, "Datagram: no receiver for port ", toPort); 341 session->Ack (); 342 } 343 else 344 LogPrint (eLogError, "Datagram: Remote LeaseSet static key mismatch for datagram3 from ", ident.ToBase32 ()); 345 } 346 else 347 LogPrint (eLogError, "Datagram: No remote LeaseSet for ", ident.ToBase32 ()); 348 } 349 else 350 LogPrint (eLogInfo, "Datagram: datagram3 received from non-ratchets session"); 351 } 352 353 void DatagramDestination::SetReceiver (const Receiver& receiver, uint16_t port) 354 { 355 std::lock_guard<std::mutex> lock(m_ReceiversMutex); 356 m_ReceiversByPorts[port] = receiver; 357 if (!m_DefaultReceiver) { 358 m_DefaultReceiver = receiver; 359 m_DefaultReceiverPort = port; 360 } 361 } 362 363 void DatagramDestination::ResetReceiver (uint16_t port) 364 { 365 std::lock_guard<std::mutex> lock(m_ReceiversMutex); 366 m_ReceiversByPorts.erase (port); 367 if (m_DefaultReceiverPort == port) { 368 m_DefaultReceiver = nullptr; 369 m_DefaultReceiverPort = 0; 370 } 371 } 372 373 374 void DatagramDestination::SetRawReceiver (const RawReceiver& receiver, uint16_t port) 375 { 376 std::lock_guard<std::mutex> lock(m_RawReceiversMutex); 377 m_RawReceiversByPorts[port] = receiver; 378 if (!m_DefaultRawReceiver) { 379 m_DefaultRawReceiver = receiver; 380 m_DefaultRawReceiverPort = port; 381 } 382 } 383 384 void DatagramDestination::ResetRawReceiver (uint16_t port) 385 { 386 std::lock_guard<std::mutex> lock(m_RawReceiversMutex); 387 m_RawReceiversByPorts.erase (port); 388 if (m_DefaultRawReceiverPort == port) { 389 m_DefaultRawReceiver = nullptr; 390 m_DefaultRawReceiverPort = 0; 391 } 392 } 393 394 395 DatagramDestination::Receiver DatagramDestination::FindReceiver(uint16_t port) 396 { 397 std::lock_guard<std::mutex> lock(m_ReceiversMutex); 398 Receiver r = nullptr; 399 auto itr = m_ReceiversByPorts.find(port); 400 if (itr != m_ReceiversByPorts.end()) 401 r = itr->second; 402 else { 403 r = m_DefaultReceiver; 404 } 405 return r; 406 } 407 408 DatagramDestination::RawReceiver DatagramDestination::FindRawReceiver(uint16_t port) 409 { 410 std::lock_guard<std::mutex> lock(m_RawReceiversMutex); 411 RawReceiver r = nullptr; 412 auto itr = m_RawReceiversByPorts.find(port); 413 if (itr != m_RawReceiversByPorts.end()) 414 r = itr->second; 415 else { 416 r = m_DefaultRawReceiver; 417 } 418 return r; 419 } 420 421 void DatagramDestination::HandleDataMessagePayload (uint16_t fromPort, uint16_t toPort, 422 const uint8_t * buf, size_t len, uint8_t protocolType, i2p::garlic::ECIESX25519AEADRatchetSession * from) 423 { 424 // unzip it 425 uint8_t uncompressed[MAX_DATAGRAM_SIZE]; 426 size_t uncompressedLen = m_Inflator.Inflate (buf, len, uncompressed, MAX_DATAGRAM_SIZE); 427 if (uncompressedLen) 428 { 429 switch (protocolType) 430 { 431 case i2p::client::PROTOCOL_TYPE_RAW: 432 HandleRawDatagram (fromPort, toPort, uncompressed, uncompressedLen); 433 break; 434 case i2p::client::PROTOCOL_TYPE_DATAGRAM3: 435 HandleDatagram3 (fromPort, toPort, uncompressed, uncompressedLen, from); 436 break; 437 case i2p::client::PROTOCOL_TYPE_DATAGRAM: 438 HandleDatagram (fromPort, toPort, uncompressed, uncompressedLen, from); 439 break; 440 case i2p::client::PROTOCOL_TYPE_DATAGRAM2: 441 HandleDatagram2 (fromPort, toPort, uncompressed, uncompressedLen, from); 442 break; 443 default: 444 LogPrint (eLogInfo, "Datagram: unknown protocol type ", protocolType); 445 }; 446 } 447 else 448 LogPrint (eLogWarning, "Datagram: decompression failed"); 449 } 450 451 std::shared_ptr<I2NPMessage> DatagramDestination::CreateDataMessage ( 452 const std::vector<std::pair<const uint8_t *, size_t> >& payloads, 453 uint16_t fromPort, uint16_t toPort, uint8_t protocolType, bool checksum) 454 { 455 size_t size; 456 auto msg = m_I2NPMsgsPool.AcquireShared (); 457 uint8_t * buf = msg->GetPayload (); 458 buf += 4; // reserve for length 459 460 if (m_Gzip && m_Deflator) 461 size = m_Deflator->Deflate (payloads, buf, msg->maxLen - msg->len); 462 else 463 size = i2p::data::GzipNoCompression (payloads, buf, msg->maxLen - msg->len); 464 465 if (size) 466 { 467 htobe32buf (msg->GetPayload (), size); // length 468 htobe16buf (buf + 4, fromPort); // source port 469 htobe16buf (buf + 6, toPort); // destination port 470 buf[9] = protocolType; // raw or datagram protocol 471 msg->len += size + 4; 472 msg->FillI2NPMessageHeader (eI2NPData, 0, checksum); 473 } 474 else 475 msg = nullptr; 476 return msg; 477 } 478 479 void DatagramDestination::CleanUp () 480 { 481 if (m_Sessions.empty ()) return; 482 auto now = i2p::util::GetMillisecondsSinceEpoch(); 483 LogPrint(eLogDebug, "DatagramDestination: clean up sessions"); 484 std::unique_lock<std::mutex> lock(m_SessionsMutex); 485 // for each session ... 486 for (auto it = m_Sessions.begin (); it != m_Sessions.end (); ) 487 { 488 // check if expired 489 if (now - it->second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE) 490 { 491 LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", it->first.ToBase32()); 492 it->second->Stop (); 493 it = m_Sessions.erase (it); // we are expired 494 } 495 else 496 it++; 497 } 498 } 499 500 std::shared_ptr<DatagramSession> DatagramDestination::ObtainSession(const i2p::data::IdentHash & identity) 501 { 502 std::shared_ptr<DatagramSession> session = nullptr; 503 std::lock_guard<std::mutex> lock(m_SessionsMutex); 504 auto itr = m_Sessions.find(identity); 505 if (itr == m_Sessions.end()) 506 { 507 // not found, create new session 508 session = std::make_shared<DatagramSession>(m_Owner, identity); 509 session->SetVersion (m_Version); 510 session->Start (); 511 m_Sessions.emplace (identity, session); 512 } 513 else 514 session = itr->second; 515 return session; 516 } 517 518 std::shared_ptr<DatagramSession> DatagramDestination::FindSession(const i2p::data::IdentHash& ident) const 519 { 520 std::lock_guard<std::mutex> lock(m_SessionsMutex); 521 auto it = m_Sessions.find(ident); 522 if (it != m_Sessions.end()) return it->second; 523 return nullptr; 524 } 525 526 std::shared_ptr<DatagramSession::Info> DatagramDestination::GetInfoForRemote(const i2p::data::IdentHash & remote) 527 { 528 std::lock_guard<std::mutex> lock(m_SessionsMutex); 529 for ( auto & item : m_Sessions) 530 { 531 if(item.first == remote) return std::make_shared<DatagramSession::Info>(item.second->GetSessionInfo()); 532 } 533 return nullptr; 534 } 535 536 DatagramSession::DatagramSession(std::shared_ptr<i2p::client::ClientDestination> localDestination, 537 const i2p::data::IdentHash & remoteIdent) : 538 m_LocalDestination(localDestination), m_RemoteIdent(remoteIdent), 539 m_LastUse (0), m_LastFlush (0), m_RequestingLS (false), m_Version (eDatagramV1) 540 { 541 } 542 543 void DatagramSession::Start () 544 { 545 m_LastUse = i2p::util::GetMillisecondsSinceEpoch (); 546 } 547 548 void DatagramSession::Stop () 549 { 550 } 551 552 void DatagramSession::SendMsg(std::shared_ptr<I2NPMessage> msg) 553 { 554 // we used this session 555 m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); 556 if (msg || m_SendQueue.empty ()) 557 m_SendQueue.push_back(msg); 558 // flush queue right away if full 559 if (!msg || m_SendQueue.size() >= DATAGRAM_SEND_QUEUE_MAX_SIZE || 560 m_LastUse > m_LastFlush + DATAGRAM_MAX_FLUSH_INTERVAL) 561 { 562 FlushSendQueue(); 563 m_LastFlush = m_LastUse; 564 } 565 } 566 567 DatagramSession::Info DatagramSession::GetSessionInfo() const 568 { 569 if(!m_RoutingSession) 570 return DatagramSession::Info(nullptr, nullptr, m_LastUse); 571 572 auto routingPath = m_RoutingSession->GetSharedRoutingPath(); 573 if (!routingPath) 574 return DatagramSession::Info(nullptr, nullptr, m_LastUse); 575 auto lease = routingPath->remoteLease; 576 auto tunnel = routingPath->outboundTunnel; 577 if(lease) 578 { 579 if(tunnel) 580 return DatagramSession::Info(lease->tunnelGateway, tunnel->GetEndpointIdentHash(), m_LastUse); 581 else 582 return DatagramSession::Info(lease->tunnelGateway, nullptr, m_LastUse); 583 } 584 else if(tunnel) 585 return DatagramSession::Info(nullptr, tunnel->GetEndpointIdentHash(), m_LastUse); 586 else 587 return DatagramSession::Info(nullptr, nullptr, m_LastUse); 588 } 589 590 void DatagramSession::Ack() 591 { 592 m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); 593 auto path = GetSharedRoutingPath(); 594 if(path) 595 path->updateTime = i2p::util::GetSecondsSinceEpoch (); 596 if (IsRatchets ()) 597 SendMsg (nullptr); // send empty message in case if we don't have some data to send 598 } 599 600 std::shared_ptr<i2p::garlic::GarlicRoutingPath> DatagramSession::GetSharedRoutingPath () 601 { 602 if (!m_RemoteLeaseSet || m_RemoteLeaseSet->IsExpired ()) 603 { 604 m_RemoteLeaseSet = m_LocalDestination->FindLeaseSet(m_RemoteIdent); 605 if (!m_RemoteLeaseSet) 606 { 607 if(!m_RequestingLS) 608 { 609 m_RequestingLS = true; 610 m_LocalDestination->RequestDestination(m_RemoteIdent, std::bind(&DatagramSession::HandleLeaseSetUpdated, this, std::placeholders::_1)); 611 } 612 return nullptr; 613 } 614 } 615 616 if (!m_RoutingSession || m_RoutingSession->IsTerminated () || !m_RoutingSession->IsReadyToSend ()) 617 { 618 bool found = false; 619 if (!m_PendingRoutingSessions.empty ()) 620 { 621 std::vector<std::weak_ptr<i2p::garlic::GarlicRoutingSession> > tmp; 622 for (auto& it: m_PendingRoutingSessions) 623 { 624 auto s = it.lock (); 625 if (s) 626 { 627 if (s->GetOwner () && s->IsReadyToSend ()) // found established session 628 { 629 m_RoutingSession = s; 630 tmp.clear (); 631 found = true; 632 break; 633 } 634 tmp.push_back (s); 635 } 636 } 637 m_PendingRoutingSessions.swap (tmp); 638 } 639 if (!found) 640 { 641 m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); 642 if (m_RoutingSession) 643 { 644 m_RoutingSession->SetAckRequestInterval (DATAGRAM_SESSION_ACK_REQUEST_INTERVAL); 645 if (!m_RoutingSession->GetOwner () || !m_RoutingSession->IsReadyToSend ()) 646 m_PendingRoutingSessions.push_back (m_RoutingSession); 647 } 648 } 649 } 650 651 auto path = m_RoutingSession->GetSharedRoutingPath(); 652 if (path && m_RoutingSession->IsRatchets () && m_RoutingSession->CleanupUnconfirmedTags ()) 653 { 654 LogPrint (eLogDebug, "Datagram: path reset"); 655 m_RoutingSession->SetSharedRoutingPath (nullptr); 656 path = nullptr; 657 } 658 659 if (path) 660 { 661 if (path->outboundTunnel && !path->outboundTunnel->IsEstablished ()) 662 { 663 // bad outbound tunnel, switch outbound tunnel 664 path->outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(path->outboundTunnel); 665 if (!path->outboundTunnel) 666 m_RoutingSession->SetSharedRoutingPath (nullptr); 667 } 668 669 if (path->remoteLease && path->remoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) 670 { 671 // bad lease, switch to next one 672 if (m_RemoteLeaseSet) 673 { 674 auto ls = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( 675 [&](const i2p::data::Lease& l) -> bool 676 { 677 return l.tunnelID == path->remoteLease->tunnelID; 678 }); 679 auto sz = ls.size(); 680 if (sz) 681 { 682 int idx = -1; 683 if (m_LocalDestination) 684 idx = m_LocalDestination->GetRng ()() % sz; 685 if (idx < 0) idx = rand () % sz; 686 path->remoteLease = ls[idx]; 687 } 688 else 689 m_RoutingSession->SetSharedRoutingPath (nullptr); 690 } 691 else 692 { 693 // no remote lease set? 694 LogPrint(eLogWarning, "DatagramSession: no cached remote lease set for ", m_RemoteIdent.ToBase32()); 695 m_RoutingSession->SetSharedRoutingPath (nullptr); 696 } 697 } 698 } 699 else 700 { 701 // no current path, make one 702 path = std::make_shared<i2p::garlic::GarlicRoutingPath>(); 703 704 if (m_RemoteLeaseSet) 705 { 706 // pick random next good lease 707 auto ls = m_RemoteLeaseSet->GetNonExpiredLeases(); 708 auto sz = ls.size(); 709 if (sz) 710 { 711 int idx = -1; 712 if (m_LocalDestination) 713 idx = m_LocalDestination->GetRng ()() % sz; 714 if (idx < 0) idx = rand () % sz; 715 path->remoteLease = ls[idx]; 716 } 717 else 718 return nullptr; 719 720 auto leaseRouter = i2p::data::netdb.FindRouter (path->remoteLease->tunnelGateway); 721 path->outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(nullptr, 722 leaseRouter ? leaseRouter->GetCompatibleTransports (false) : (i2p::data::RouterInfo::CompatibleTransports)i2p::data::RouterInfo::eAllTransports); 723 if (!path->outboundTunnel) return nullptr; 724 } 725 else 726 { 727 // no remote lease set currently, bail 728 LogPrint(eLogWarning, "DatagramSession: no remote lease set found for ", m_RemoteIdent.ToBase32()); 729 return nullptr; 730 } 731 m_RoutingSession->SetSharedRoutingPath(path); 732 } 733 return path; 734 } 735 736 void DatagramSession::HandleLeaseSetUpdated(std::shared_ptr<i2p::data::LeaseSet> ls) 737 { 738 m_RequestingLS = false; 739 if(!ls) return; 740 // only update lease set if found and newer than previous lease set 741 uint64_t oldExpire = 0; 742 if(m_RemoteLeaseSet) oldExpire = m_RemoteLeaseSet->GetExpirationTime(); 743 if(ls && ls->GetExpirationTime() > oldExpire) m_RemoteLeaseSet = ls; 744 } 745 746 void DatagramSession::FlushSendQueue () 747 { 748 if (m_SendQueue.empty ()) return; 749 auto routingPath = GetSharedRoutingPath(); 750 // if we don't have a routing path we will drop all queued messages 751 if(routingPath && routingPath->outboundTunnel && routingPath->remoteLease) 752 { 753 if (m_Version == eDatagramV3) 754 { 755 auto msgs = m_RoutingSession->WrapMultipleMessages (m_SendQueue); 756 if (!msgs.empty ()) 757 routingPath->outboundTunnel->SendTunnelDataMsgsTo (routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, msgs); 758 } 759 else 760 { 761 // for compatibility with older versions 762 std::vector<i2p::tunnel::TunnelMessageBlock> send; 763 for (const auto & msg : m_SendQueue) 764 { 765 auto m = m_RoutingSession->WrapSingleMessage(msg); 766 if (m) 767 send.push_back(i2p::tunnel::TunnelMessageBlock{i2p::tunnel::eDeliveryTypeTunnel,routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, m}); 768 } 769 routingPath->outboundTunnel->SendTunnelDataMsgs(send); 770 } 771 } 772 m_SendQueue.clear(); 773 } 774 } 775 }