Tunnel.cpp
1 /* 2 * Copyright (c) 2013-2025, The PurpleI2P Project 3 * 4 * This file is part of Purple i2pd project and licensed under BSD3 5 * 6 * See full license text in LICENSE file at top of project tree 7 */ 8 9 #include <string.h> 10 #include "I2PEndian.h" 11 #include <random> 12 #include <thread> 13 #include <algorithm> 14 #include <vector> 15 #include "Crypto.h" 16 #include "RouterContext.h" 17 #include "Log.h" 18 #include "Timestamp.h" 19 #include "I2NPProtocol.h" 20 #include "Transports.h" 21 #include "NetDb.hpp" 22 #include "Config.h" 23 #include "Tunnel.h" 24 #include "TunnelPool.h" 25 #include "util.h" 26 #include "ECIESX25519AEADRatchetSession.h" 27 28 namespace i2p 29 { 30 namespace tunnel 31 { 32 Tunnel::Tunnel (std::shared_ptr<TunnelConfig> config): 33 TunnelBase (config->GetTunnelID (), config->GetNextTunnelID (), config->GetNextIdentHash ()), 34 m_Config (config), m_IsShortBuildMessage (false), m_Pool (nullptr), 35 m_State (eTunnelStatePending), m_FarEndTransports (i2p::data::RouterInfo::eAllTransports), 36 m_IsRecreated (false), m_Latency (UNKNOWN_LATENCY) 37 { 38 } 39 40 Tunnel::~Tunnel () 41 { 42 } 43 44 void Tunnel::Build (uint32_t replyMsgID, std::shared_ptr<OutboundTunnel> outboundTunnel) 45 { 46 auto numHops = m_Config->GetNumHops (); 47 bool insertPhonyRecord = m_Config->IsInbound() && numHops < MAX_NUM_RECORDS; 48 if (insertPhonyRecord) 49 { 50 m_Config->CreatePhonyHop (); 51 numHops++; 52 } 53 int numRecords = numHops <= STANDARD_NUM_RECORDS ? STANDARD_NUM_RECORDS : MAX_NUM_RECORDS; 54 auto msg = numRecords <= STANDARD_NUM_RECORDS ? NewI2NPShortMessage () : NewI2NPMessage (); 55 *msg->GetPayload () = numRecords; 56 const size_t recordSize = m_Config->IsShort () ? SHORT_TUNNEL_BUILD_RECORD_SIZE : TUNNEL_BUILD_RECORD_SIZE; 57 msg->len += numRecords*recordSize + 1; 58 // shuffle records 59 std::vector<int> recordIndicies; 60 for (int i = 0; i < numRecords; i++) recordIndicies.push_back(i); 61 std::shuffle (recordIndicies.begin(), recordIndicies.end(), tunnels.GetRng ()); 62 // create real records 63 uint8_t * records = msg->GetPayload () + 1; 64 TunnelHopConfig * hop = m_Config->GetFirstHop (); 65 int i = 0; 66 while (hop) 67 { 68 uint32_t msgID; 69 if (hop->next && hop->next->ident) // we set replyMsgID for last non-phony hop only 70 RAND_bytes ((uint8_t *)&msgID, 4); 71 else 72 msgID = replyMsgID; 73 hop->recordIndex = recordIndicies[i]; i++; 74 hop->CreateBuildRequestRecord (records, msgID); 75 hop = hop->next; 76 } 77 // fill up fake records with random data 78 for (int i = numHops; i < numRecords; i++) 79 { 80 int idx = recordIndicies[i]; 81 RAND_bytes (records + idx*recordSize, recordSize); 82 } 83 84 // decrypt real records 85 hop = m_Config->GetLastHop ()->prev; 86 while (hop) 87 { 88 // decrypt records after current hop 89 TunnelHopConfig * hop1 = hop->next; 90 while (hop1) 91 { 92 hop->DecryptRecord (records, hop1->recordIndex); 93 hop1 = hop1->next; 94 } 95 hop = hop->prev; 96 } 97 // delete phony hop after encryption 98 if (insertPhonyRecord) m_Config->DeletePhonyHop (); 99 100 msg->FillI2NPMessageHeader (m_Config->IsShort () ? eI2NPShortTunnelBuild : eI2NPVariableTunnelBuild); 101 auto s = shared_from_this (); 102 msg->onDrop = [s]() 103 { 104 LogPrint (eLogInfo, "I2NP: Tunnel ", s->GetTunnelID (), " request was not sent"); 105 s->SetState (i2p::tunnel::eTunnelStateBuildFailed); 106 }; 107 108 // send message 109 if (outboundTunnel) 110 { 111 if (m_Config->IsShort ()) 112 { 113 auto ident = m_Config->GetFirstHop () ? m_Config->GetFirstHop ()->ident : nullptr; 114 if (ident && ident->GetIdentHash () != outboundTunnel->GetEndpointIdentHash ()) // don't encrypt if IBGW = OBEP 115 { 116 auto msg1 = i2p::garlic::WrapECIESX25519MessageForRouter (msg, ident->GetEncryptionPublicKey ()); 117 if (msg1) msg = msg1; 118 } 119 } 120 outboundTunnel->SendTunnelDataMsgTo (GetNextIdentHash (), 0, msg); 121 } 122 else 123 { 124 if (m_Config->IsShort () && m_Config->GetLastHop () && 125 m_Config->GetLastHop ()->ident->GetIdentHash () != m_Config->GetLastHop ()->nextIdent) 126 { 127 // add garlic key/tag for reply 128 uint8_t key[32]; 129 uint64_t tag = m_Config->GetLastHop ()->GetGarlicKey (key); 130 if (m_Pool && m_Pool->GetLocalDestination ()) 131 m_Pool->GetLocalDestination ()->SubmitECIESx25519Key (key, tag); 132 else 133 i2p::context.SubmitECIESx25519Key (key, tag); 134 } 135 i2p::transport::transports.SendMessage (GetNextIdentHash (), msg); 136 } 137 } 138 139 bool Tunnel::HandleTunnelBuildResponse (uint8_t * msg, size_t len) 140 { 141 int num = msg[0]; 142 LogPrint (eLogDebug, "Tunnel: TunnelBuildResponse ", num, " records."); 143 if (num > MAX_NUM_RECORDS) 144 { 145 LogPrint (eLogError, "Tunnel: Too many records in TunnelBuildResponse", num); 146 return false; 147 } 148 if (len < num*m_Config->GetRecordSize () + 1) 149 { 150 LogPrint (eLogError, "Tunnel: TunnelBuildResponse of ", num, " records is too short ", len); 151 return false; 152 } 153 154 TunnelHopConfig * hop = m_Config->GetLastHop (); 155 while (hop) 156 { 157 // decrypt current hop 158 if (hop->recordIndex >= 0 && hop->recordIndex < msg[0]) 159 { 160 if (!hop->DecryptBuildResponseRecord (msg + 1)) 161 return false; 162 } 163 else 164 { 165 LogPrint (eLogWarning, "Tunnel: Hop index ", hop->recordIndex, " is out of range"); 166 return false; 167 } 168 169 // decrypt records before current hop 170 TunnelHopConfig * hop1 = hop->prev; 171 while (hop1) 172 { 173 auto idx = hop1->recordIndex; 174 if (idx >= 0 && idx < num) 175 hop->DecryptRecord (msg + 1, idx); 176 else 177 LogPrint (eLogWarning, "Tunnel: Hop index ", idx, " is out of range"); 178 hop1 = hop1->prev; 179 } 180 hop = hop->prev; 181 } 182 183 bool established = true; 184 size_t numHops = 0; 185 hop = m_Config->GetFirstHop (); 186 while (hop) 187 { 188 uint8_t ret = hop->GetRetCode (msg + 1); 189 LogPrint (eLogDebug, "Tunnel: Build response ret code=", (int)ret); 190 if (hop->ident) 191 i2p::data::UpdateRouterProfile (hop->ident->GetIdentHash (), 192 [ret](std::shared_ptr<i2p::data::RouterProfile> profile) 193 { 194 if (profile) profile->TunnelBuildResponse (ret); 195 }); 196 if (ret) 197 // if any of participants declined the tunnel is not established 198 established = false; 199 hop = hop->next; 200 numHops++; 201 } 202 if (established) 203 { 204 // create tunnel decryptions from layer and iv keys in reverse order 205 m_Hops.resize (numHops); 206 hop = m_Config->GetLastHop (); 207 int i = 0; 208 while (hop) 209 { 210 m_Hops[i].ident = hop->ident; 211 m_Hops[i].decryption.SetKeys (hop->layerKey, hop->ivKey); 212 hop = hop->prev; 213 i++; 214 } 215 m_IsShortBuildMessage = m_Config->IsShort (); 216 m_FarEndTransports = m_Config->GetFarEndTransports (); 217 m_Config = nullptr; 218 } 219 if (established) m_State = eTunnelStateEstablished; 220 return established; 221 } 222 223 bool Tunnel::LatencyFitsRange(int lowerbound, int upperbound) const 224 { 225 auto latency = GetMeanLatency(); 226 return latency >= lowerbound && latency <= upperbound; 227 } 228 229 void Tunnel::EncryptTunnelMsg (std::shared_ptr<const I2NPMessage> in, std::shared_ptr<I2NPMessage> out) 230 { 231 const uint8_t * inPayload = in->GetPayload () + 4; 232 uint8_t * outPayload = out->GetPayload () + 4; 233 for (auto& it: m_Hops) 234 { 235 it.decryption.Decrypt (inPayload, outPayload); 236 inPayload = outPayload; 237 } 238 } 239 240 void Tunnel::SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg) 241 { 242 LogPrint (eLogWarning, "Tunnel: Can't send I2NP messages without delivery instructions"); 243 } 244 245 std::vector<std::shared_ptr<const i2p::data::IdentityEx> > Tunnel::GetPeers () const 246 { 247 auto peers = GetInvertedPeers (); 248 std::reverse (peers.begin (), peers.end ()); 249 return peers; 250 } 251 252 std::vector<std::shared_ptr<const i2p::data::IdentityEx> > Tunnel::GetInvertedPeers () const 253 { 254 // hops are in inverted order 255 std::vector<std::shared_ptr<const i2p::data::IdentityEx> > ret; 256 for (const auto& it: m_Hops) 257 ret.push_back (it.ident); 258 return ret; 259 } 260 261 void Tunnel::SetState(TunnelState state) 262 { 263 m_State = state; 264 } 265 266 bool Tunnel::IsSlow () const 267 { 268 return LatencyIsKnown() && m_Latency > HIGH_LATENCY_PER_HOP*GetNumHops () + 269 i2p::transport::transports.GetLocalDelay ()*1000; 270 } 271 272 void Tunnel::VisitTunnelHops(TunnelHopVisitor v) 273 { 274 // hops are in inverted order, we must return in direct order 275 for (auto it = m_Hops.rbegin (); it != m_Hops.rend (); it++) 276 v((*it).ident); 277 } 278 279 void InboundTunnel::HandleTunnelDataMsg (std::shared_ptr<I2NPMessage>&& msg) 280 { 281 if (!IsEstablished () && GetState () != eTunnelStateExpiring) 282 { 283 // incoming messages means a tunnel is alive 284 SetState (eTunnelStateEstablished); 285 auto pool = GetTunnelPool (); 286 if (pool) 287 { 288 // update LeaseSet 289 auto dest = pool->GetLocalDestination (); 290 if (dest) dest->SetLeaseSetUpdated (true); 291 } 292 } 293 EncryptTunnelMsg (msg, msg); 294 msg->from = GetSharedFromThis (); 295 m_Endpoint.HandleDecryptedTunnelDataMsg (msg); 296 } 297 298 bool InboundTunnel::Recreate () 299 { 300 if (!IsRecreated ()) 301 { 302 auto pool = GetTunnelPool (); 303 if (pool) 304 { 305 SetRecreated (true); 306 pool->RecreateInboundTunnel (std::static_pointer_cast<InboundTunnel>(shared_from_this ())); 307 return true; 308 } 309 } 310 return false; 311 } 312 313 ZeroHopsInboundTunnel::ZeroHopsInboundTunnel (): 314 InboundTunnel (std::make_shared<ZeroHopsTunnelConfig> ()), 315 m_NumReceivedBytes (0) 316 { 317 } 318 319 void ZeroHopsInboundTunnel::SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg) 320 { 321 if (msg) 322 { 323 m_NumReceivedBytes += msg->GetLength (); 324 msg->from = GetSharedFromThis (); 325 HandleI2NPMessage (msg); 326 } 327 } 328 329 TunnelMessageBlock OutboundTunnel::CreateTunnelMessageBlock (const uint8_t * gwHash, uint32_t gwTunnel) 330 { 331 TunnelMessageBlock block; 332 block.tunnelID = 0; // Initialize tunnelID to a default value 333 334 if (gwHash) 335 { 336 block.hash = gwHash; 337 if (gwTunnel) 338 { 339 block.deliveryType = eDeliveryTypeTunnel; 340 block.tunnelID = gwTunnel; // Set tunnelID only if gwTunnel is non-zero 341 } 342 else 343 { 344 block.deliveryType = eDeliveryTypeRouter; 345 } 346 } 347 else 348 { 349 block.deliveryType = eDeliveryTypeLocal; 350 } 351 return block; 352 } 353 354 void OutboundTunnel::SendTunnelDataMsgTo (const uint8_t * gwHash, uint32_t gwTunnel, std::shared_ptr<i2p::I2NPMessage> msg) 355 { 356 auto block = CreateTunnelMessageBlock (gwHash, gwTunnel); 357 block.data = msg; 358 SendTunnelDataMsgs({block}); 359 } 360 361 void OutboundTunnel::SendTunnelDataMsgs (const std::vector<TunnelMessageBlock>& msgs) 362 { 363 std::unique_lock<std::mutex> l(m_SendMutex); 364 for (auto& it : msgs) 365 m_Gateway.PutTunnelDataMsg (it); 366 m_Gateway.SendBuffer (); 367 } 368 369 void OutboundTunnel::SendTunnelDataMsgsTo (const uint8_t * gwHash, uint32_t gwTunnel, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs) 370 { 371 auto block = CreateTunnelMessageBlock (gwHash, gwTunnel); 372 std::unique_lock<std::mutex> l(m_SendMutex); 373 for (auto& it : msgs) 374 { 375 block.data = it; 376 m_Gateway.PutTunnelDataMsg (block); 377 } 378 m_Gateway.SendBuffer (); 379 } 380 381 void OutboundTunnel::HandleTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage>&& tunnelMsg) 382 { 383 LogPrint (eLogError, "Tunnel: Incoming message for outbound tunnel ", GetTunnelID ()); 384 } 385 386 bool OutboundTunnel::Recreate () 387 { 388 if (!IsRecreated ()) 389 { 390 auto pool = GetTunnelPool (); 391 if (pool) 392 { 393 SetRecreated (true); 394 pool->RecreateOutboundTunnel (std::static_pointer_cast<OutboundTunnel>(shared_from_this ())); 395 return true; 396 } 397 } 398 return false; 399 } 400 401 ZeroHopsOutboundTunnel::ZeroHopsOutboundTunnel (): 402 OutboundTunnel (std::make_shared<ZeroHopsTunnelConfig> ()), 403 m_NumSentBytes (0) 404 { 405 } 406 407 void ZeroHopsOutboundTunnel::SendTunnelDataMsgs (const std::vector<TunnelMessageBlock>& msgs) 408 { 409 for (auto& msg : msgs) 410 { 411 if (!msg.data) continue; 412 m_NumSentBytes += msg.data->GetLength (); 413 switch (msg.deliveryType) 414 { 415 case eDeliveryTypeLocal: 416 HandleI2NPMessage (msg.data); 417 break; 418 case eDeliveryTypeTunnel: 419 i2p::transport::transports.SendMessage (msg.hash, i2p::CreateTunnelGatewayMsg (msg.tunnelID, msg.data)); 420 break; 421 case eDeliveryTypeRouter: 422 i2p::transport::transports.SendMessage (msg.hash, msg.data); 423 break; 424 default: 425 LogPrint (eLogError, "Tunnel: Unknown delivery type ", (int)msg.deliveryType); 426 } 427 } 428 } 429 430 void ZeroHopsOutboundTunnel::SendTunnelDataMsgsTo (const uint8_t * gwHash, uint32_t gwTunnel, 431 const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs) 432 { 433 auto block = CreateTunnelMessageBlock (gwHash, gwTunnel); 434 std::vector<TunnelMessageBlock> blocks; 435 for (auto& it: msgs) 436 { 437 block.data = it; 438 blocks.push_back (block); 439 } 440 SendTunnelDataMsgs (blocks); 441 } 442 443 Tunnels tunnels; 444 445 Tunnels::Tunnels (): m_IsRunning (false), m_Thread (nullptr), m_MaxNumTransitTunnels (DEFAULT_MAX_NUM_TRANSIT_TUNNELS), 446 m_TotalNumSuccesiveTunnelCreations (0), m_TotalNumFailedTunnelCreations (0), // for normal average 447 m_TunnelCreationSuccessRate (TCSR_START_VALUE), m_TunnelCreationAttemptsNum(0), 448 m_Rng(i2p::util::GetMonotonicMicroseconds ()%1000000LL) 449 { 450 } 451 452 Tunnels::~Tunnels () 453 { 454 DeleteTunnelPool(m_ExploratoryPool); 455 } 456 457 std::shared_ptr<TunnelBase> Tunnels::GetTunnel (uint32_t tunnelID) 458 { 459 std::lock_guard<std::mutex> l(m_TunnelsMutex); 460 auto it = m_Tunnels.find(tunnelID); 461 if (it != m_Tunnels.end ()) 462 return it->second; 463 return nullptr; 464 } 465 466 bool Tunnels::AddTunnel (std::shared_ptr<TunnelBase> tunnel) 467 { 468 if (!tunnel) return false; 469 std::lock_guard<std::mutex> l(m_TunnelsMutex); 470 return m_Tunnels.emplace (tunnel->GetTunnelID (), tunnel).second; 471 } 472 473 void Tunnels::RemoveTunnel (uint32_t tunnelID) 474 { 475 std::lock_guard<std::mutex> l(m_TunnelsMutex); 476 m_Tunnels.erase (tunnelID); 477 } 478 479 std::shared_ptr<InboundTunnel> Tunnels::GetPendingInboundTunnel (uint32_t replyMsgID) 480 { 481 return GetPendingTunnel (replyMsgID, m_PendingInboundTunnels); 482 } 483 484 std::shared_ptr<OutboundTunnel> Tunnels::GetPendingOutboundTunnel (uint32_t replyMsgID) 485 { 486 return GetPendingTunnel (replyMsgID, m_PendingOutboundTunnels); 487 } 488 489 template<class TTunnel> 490 std::shared_ptr<TTunnel> Tunnels::GetPendingTunnel (uint32_t replyMsgID, const std::map<uint32_t, std::shared_ptr<TTunnel> >& pendingTunnels) 491 { 492 auto it = pendingTunnels.find(replyMsgID); 493 if (it != pendingTunnels.end () && it->second->GetState () == eTunnelStatePending) 494 { 495 it->second->SetState (eTunnelStateBuildReplyReceived); 496 return it->second; 497 } 498 return nullptr; 499 } 500 501 std::shared_ptr<InboundTunnel> Tunnels::GetNextInboundTunnel () 502 { 503 std::shared_ptr<InboundTunnel> tunnel; 504 size_t minReceived = 0; 505 for (const auto& it : m_InboundTunnels) 506 { 507 if (!it->IsEstablished ()) continue; 508 if (!tunnel || it->GetNumReceivedBytes () < minReceived) 509 { 510 tunnel = it; 511 minReceived = it->GetNumReceivedBytes (); 512 } 513 } 514 return tunnel; 515 } 516 517 std::shared_ptr<OutboundTunnel> Tunnels::GetNextOutboundTunnel () 518 { 519 if (m_OutboundTunnels.empty ()) return nullptr; 520 uint32_t ind = m_Rng () % m_OutboundTunnels.size (), i = 0; 521 std::shared_ptr<OutboundTunnel> tunnel; 522 for (const auto& it: m_OutboundTunnels) 523 { 524 if (it->IsEstablished ()) 525 { 526 tunnel = it; 527 i++; 528 } 529 if (i > ind && tunnel) break; 530 } 531 return tunnel; 532 } 533 534 std::shared_ptr<TunnelPool> Tunnels::CreateTunnelPool (int numInboundHops, 535 int numOutboundHops, int numInboundTunnels, int numOutboundTunnels, 536 int inboundVariance, int outboundVariance, bool isHighBandwidth) 537 { 538 auto pool = std::make_shared<TunnelPool> (numInboundHops, numOutboundHops, 539 numInboundTunnels, numOutboundTunnels, inboundVariance, outboundVariance, isHighBandwidth); 540 std::unique_lock<std::mutex> l(m_PoolsMutex); 541 m_Pools.push_back (pool); 542 return pool; 543 } 544 545 void Tunnels::DeleteTunnelPool (std::shared_ptr<TunnelPool> pool) 546 { 547 if (pool) 548 { 549 StopTunnelPool (pool); 550 { 551 std::unique_lock<std::mutex> l(m_PoolsMutex); 552 m_Pools.remove (pool); 553 } 554 } 555 } 556 557 void Tunnels::StopTunnelPool (std::shared_ptr<TunnelPool> pool) 558 { 559 if (pool) 560 { 561 pool->SetActive (false); 562 pool->DetachTunnels (); 563 } 564 } 565 566 void Tunnels::Start () 567 { 568 m_IsRunning = true; 569 m_Thread = new std::thread (std::bind (&Tunnels::Run, this)); 570 m_TransitTunnels.Start (); 571 } 572 573 void Tunnels::Stop () 574 { 575 m_TransitTunnels.Stop (); 576 m_IsRunning = false; 577 m_Queue.WakeUp (); 578 if (m_Thread) 579 { 580 m_Thread->join (); 581 delete m_Thread; 582 m_Thread = 0; 583 } 584 } 585 586 void Tunnels::Run () 587 { 588 i2p::util::SetThreadName("Tunnels"); 589 std::this_thread::sleep_for (std::chrono::seconds(1)); // wait for other parts are ready 590 591 uint64_t lastTs = 0, lastPoolsTs = 0, lastMemoryPoolTs = 0; 592 std::list<std::shared_ptr<I2NPMessage> > msgs; 593 while (m_IsRunning) 594 { 595 try 596 { 597 if (m_Queue.Wait (1,0)) // 1 sec 598 { 599 m_Queue.GetWholeQueue (msgs); 600 auto mts = i2p::util::GetMillisecondsSinceEpoch (); 601 int numMsgs = 0; 602 uint32_t prevTunnelID = 0, tunnelID = 0; 603 std::shared_ptr<TunnelBase> prevTunnel; 604 while (!msgs.empty ()) 605 { 606 auto msg = msgs.front (); msgs.pop_front (); 607 if (!msg || msg->IsExpired (mts)) continue; 608 std::shared_ptr<TunnelBase> tunnel; 609 uint8_t typeID = msg->GetTypeID (); 610 switch (typeID) 611 { 612 case eI2NPTunnelData: 613 case eI2NPTunnelGateway: 614 { 615 tunnelID = bufbe32toh (msg->GetPayload ()); 616 if (tunnelID == prevTunnelID) 617 tunnel = prevTunnel; 618 else if (prevTunnel) 619 prevTunnel->FlushTunnelDataMsgs (); 620 621 if (!tunnel) 622 tunnel = GetTunnel (tunnelID); 623 if (tunnel) 624 { 625 if (typeID == eI2NPTunnelData) 626 tunnel->HandleTunnelDataMsg (std::move (msg)); 627 else // tunnel gateway assumed 628 HandleTunnelGatewayMsg (tunnel, msg); 629 } 630 else 631 LogPrint (eLogWarning, "Tunnel: Tunnel not found, tunnelID=", tunnelID, " previousTunnelID=", prevTunnelID, " type=", (int)typeID); 632 633 break; 634 } 635 case eI2NPShortTunnelBuild: 636 HandleShortTunnelBuildMsg (msg); 637 break; 638 case eI2NPVariableTunnelBuild: 639 HandleVariableTunnelBuildMsg (msg); 640 break; 641 case eI2NPShortTunnelBuildReply: 642 HandleTunnelBuildReplyMsg (msg, true); 643 break; 644 case eI2NPVariableTunnelBuildReply: 645 HandleTunnelBuildReplyMsg (msg, false); 646 break; 647 case eI2NPTunnelBuild: 648 case eI2NPTunnelBuildReply: 649 LogPrint (eLogWarning, "Tunnel: TunnelBuild is too old for ECIES router"); 650 break; 651 default: 652 LogPrint (eLogWarning, "Tunnel: Unexpected message type ", (int) typeID); 653 } 654 655 prevTunnelID = tunnelID; 656 prevTunnel = tunnel; 657 numMsgs++; 658 659 if (msgs.empty ()) 660 { 661 if (numMsgs < MAX_TUNNEL_MSGS_BATCH_SIZE && !m_Queue.IsEmpty ()) 662 m_Queue.GetWholeQueue (msgs); // try more 663 else if (tunnel) 664 tunnel->FlushTunnelDataMsgs (); // otherwise flush last 665 } 666 } 667 } 668 669 if (i2p::transport::transports.IsOnline()) 670 { 671 uint64_t ts = i2p::util::GetSecondsSinceEpoch (); 672 if (ts - lastTs >= TUNNEL_MANAGE_INTERVAL || // manage tunnels every 15 seconds 673 ts + TUNNEL_MANAGE_INTERVAL < lastTs) 674 { 675 ManageTunnels (ts); 676 lastTs = ts; 677 } 678 if (ts - lastPoolsTs >= TUNNEL_POOLS_MANAGE_INTERVAL || // manage pools every 5 seconds 679 ts + TUNNEL_POOLS_MANAGE_INTERVAL < lastPoolsTs) 680 { 681 ManageTunnelPools (ts); 682 lastPoolsTs = ts; 683 } 684 if (ts - lastMemoryPoolTs >= TUNNEL_MEMORY_POOL_MANAGE_INTERVAL || 685 ts + TUNNEL_MEMORY_POOL_MANAGE_INTERVAL < lastMemoryPoolTs) // manage memory pool every 2 minutes 686 { 687 m_I2NPTunnelEndpointMessagesMemoryPool.CleanUpMt (); 688 m_I2NPTunnelMessagesMemoryPool.CleanUpMt (); 689 lastMemoryPoolTs = ts; 690 } 691 } 692 } 693 catch (std::exception& ex) 694 { 695 LogPrint (eLogError, "Tunnel: Runtime exception: ", ex.what ()); 696 } 697 } 698 } 699 700 void Tunnels::HandleTunnelGatewayMsg (std::shared_ptr<TunnelBase> tunnel, std::shared_ptr<I2NPMessage> msg) 701 { 702 if (!tunnel) 703 { 704 LogPrint (eLogError, "Tunnel: Missing tunnel for gateway"); 705 return; 706 } 707 const uint8_t * payload = msg->GetPayload (); 708 uint16_t len = bufbe16toh(payload + TUNNEL_GATEWAY_HEADER_LENGTH_OFFSET); 709 // we make payload as new I2NP message to send 710 msg->offset += I2NP_HEADER_SIZE + TUNNEL_GATEWAY_HEADER_SIZE; 711 if (msg->offset + len > msg->len) 712 { 713 LogPrint (eLogError, "Tunnel: Gateway payload ", (int)len, " exceeds message length ", (int)msg->len); 714 return; 715 } 716 msg->len = msg->offset + len; 717 auto typeID = msg->GetTypeID (); 718 LogPrint (eLogDebug, "Tunnel: Gateway of ", (int) len, " bytes for tunnel ", tunnel->GetTunnelID (), ", msg type ", (int)typeID); 719 720 tunnel->SendTunnelDataMsg (msg); 721 } 722 723 void Tunnels::HandleShortTunnelBuildMsg (std::shared_ptr<I2NPMessage> msg) 724 { 725 if (!msg) return; 726 auto tunnel = GetPendingInboundTunnel (msg->GetMsgID()); // replyMsgID 727 if (tunnel) 728 { 729 // endpoint of inbound tunnel 730 LogPrint (eLogDebug, "Tunnel: ShortTunnelBuild reply for tunnel ", tunnel->GetTunnelID ()); 731 if (tunnel->HandleTunnelBuildResponse (msg->GetPayload(), msg->GetPayloadLength())) 732 { 733 LogPrint (eLogInfo, "Tunnel: Inbound tunnel ", tunnel->GetTunnelID (), " has been created"); 734 tunnel->SetState (eTunnelStateEstablished); 735 AddInboundTunnel (tunnel); 736 } 737 else 738 { 739 LogPrint (eLogInfo, "Tunnel: Inbound tunnel ", tunnel->GetTunnelID (), " has been declined"); 740 tunnel->SetState (eTunnelStateBuildFailed); 741 } 742 return; 743 } 744 else 745 m_TransitTunnels.PostTransitTunnelBuildMsg (std::move (msg)); 746 } 747 748 void Tunnels::HandleVariableTunnelBuildMsg (std::shared_ptr<I2NPMessage> msg) 749 { 750 auto tunnel = GetPendingInboundTunnel (msg->GetMsgID()); // replyMsgID 751 if (tunnel) 752 { 753 // endpoint of inbound tunnel 754 LogPrint (eLogDebug, "Tunnel: VariableTunnelBuild reply for tunnel ", tunnel->GetTunnelID ()); 755 if (tunnel->HandleTunnelBuildResponse (msg->GetPayload(), msg->GetPayloadLength())) 756 { 757 LogPrint (eLogInfo, "Tunnel: Inbound tunnel ", tunnel->GetTunnelID (), " has been created"); 758 tunnel->SetState (eTunnelStateEstablished); 759 AddInboundTunnel (tunnel); 760 } 761 else 762 { 763 LogPrint (eLogInfo, "Tunnel: Inbound tunnel ", tunnel->GetTunnelID (), " has been declined"); 764 tunnel->SetState (eTunnelStateBuildFailed); 765 } 766 } 767 else 768 m_TransitTunnels.PostTransitTunnelBuildMsg (std::move (msg)); 769 } 770 771 void Tunnels::HandleTunnelBuildReplyMsg (std::shared_ptr<I2NPMessage> msg, bool isShort) 772 { 773 auto tunnel = GetPendingOutboundTunnel (msg->GetMsgID()); // replyMsgID 774 if (tunnel) 775 { 776 // reply for outbound tunnel 777 LogPrint (eLogDebug, "Tunnel: TunnelBuildReply for tunnel ", tunnel->GetTunnelID ()); 778 if (tunnel->HandleTunnelBuildResponse (msg->GetPayload(), msg->GetPayloadLength())) 779 { 780 LogPrint (eLogInfo, "Tunnel: Outbound tunnel ", tunnel->GetTunnelID (), " has been created"); 781 tunnel->SetState (eTunnelStateEstablished); 782 AddOutboundTunnel (tunnel); 783 } 784 else 785 { 786 LogPrint (eLogInfo, "Tunnel: Outbound tunnel ", tunnel->GetTunnelID (), " has been declined"); 787 tunnel->SetState (eTunnelStateBuildFailed); 788 } 789 } 790 else 791 LogPrint (eLogWarning, "Tunnel: Pending tunnel for message ", msg->GetMsgID(), " not found"); 792 793 } 794 795 void Tunnels::ManageTunnels (uint64_t ts) 796 { 797 ManagePendingTunnels (ts); 798 std::vector<std::shared_ptr<Tunnel> > tunnelsToRecreate; 799 ManageInboundTunnels (ts, tunnelsToRecreate); 800 ManageOutboundTunnels (ts, tunnelsToRecreate); 801 // rec-create in random order 802 if (!tunnelsToRecreate.empty ()) 803 { 804 if (tunnelsToRecreate.size () > 1) 805 std::shuffle (tunnelsToRecreate.begin(), tunnelsToRecreate.end(), m_Rng); 806 for (auto& it: tunnelsToRecreate) 807 it->Recreate (); 808 } 809 } 810 811 void Tunnels::ManagePendingTunnels (uint64_t ts) 812 { 813 ManagePendingTunnels (m_PendingInboundTunnels, ts); 814 ManagePendingTunnels (m_PendingOutboundTunnels, ts); 815 } 816 817 template<class PendingTunnels> 818 void Tunnels::ManagePendingTunnels (PendingTunnels& pendingTunnels, uint64_t ts) 819 { 820 // check pending tunnel. delete failed or timeout 821 for (auto it = pendingTunnels.begin (); it != pendingTunnels.end ();) 822 { 823 auto tunnel = it->second; 824 switch (tunnel->GetState ()) 825 { 826 case eTunnelStatePending: 827 if (ts > tunnel->GetCreationTime () + TUNNEL_CREATION_TIMEOUT || 828 ts + TUNNEL_CREATION_TIMEOUT < tunnel->GetCreationTime ()) 829 { 830 LogPrint (eLogDebug, "Tunnel: Pending build request ", it->first, " timeout, deleted"); 831 // update stats 832 auto config = tunnel->GetTunnelConfig (); 833 if (config) 834 { 835 auto hop = config->GetFirstHop (); 836 while (hop) 837 { 838 if (hop->ident) 839 i2p::data::UpdateRouterProfile (hop->ident->GetIdentHash (), 840 [](std::shared_ptr<i2p::data::RouterProfile> profile) 841 { 842 if (profile) profile->TunnelNonReplied (); 843 }); 844 hop = hop->next; 845 } 846 } 847 // delete 848 it = pendingTunnels.erase (it); 849 FailedTunnelCreation(); 850 } 851 else 852 ++it; 853 break; 854 case eTunnelStateBuildFailed: 855 LogPrint (eLogDebug, "Tunnel: Pending build request ", it->first, " failed, deleted"); 856 it = pendingTunnels.erase (it); 857 FailedTunnelCreation(); 858 break; 859 case eTunnelStateBuildReplyReceived: 860 // intermediate state, will be either established of build failed 861 ++it; 862 break; 863 default: 864 // success 865 it = pendingTunnels.erase (it); 866 SuccesiveTunnelCreation(); 867 } 868 } 869 } 870 871 void Tunnels::ManageOutboundTunnels (uint64_t ts, std::vector<std::shared_ptr<Tunnel> >& toRecreate) 872 { 873 for (auto it = m_OutboundTunnels.begin (); it != m_OutboundTunnels.end ();) 874 { 875 auto tunnel = *it; 876 if (tunnel->IsFailed () || ts > tunnel->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT || 877 ts + TUNNEL_EXPIRATION_TIMEOUT < tunnel->GetCreationTime ()) 878 { 879 LogPrint (eLogDebug, "Tunnel: Tunnel with id ", tunnel->GetTunnelID (), " expired or failed"); 880 auto pool = tunnel->GetTunnelPool (); 881 if (pool) 882 pool->TunnelExpired (tunnel); 883 // we don't have outbound tunnels in m_Tunnels 884 it = m_OutboundTunnels.erase (it); 885 } 886 else 887 { 888 if (tunnel->IsEstablished ()) 889 { 890 if (!tunnel->IsRecreated () && ts + TUNNEL_RECREATION_THRESHOLD > tunnel->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT) 891 { 892 auto pool = tunnel->GetTunnelPool (); 893 // let it die if the tunnel pool has been reconfigured and this is old 894 if (pool && tunnel->GetNumHops() == pool->GetNumOutboundHops()) 895 toRecreate.push_back (tunnel); 896 } 897 if (ts + TUNNEL_EXPIRATION_THRESHOLD > tunnel->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT) 898 tunnel->SetState (eTunnelStateExpiring); 899 } 900 ++it; 901 } 902 } 903 904 if (m_OutboundTunnels.size () < 3) 905 { 906 // trying to create one more outbound tunnel 907 auto inboundTunnel = GetNextInboundTunnel (); 908 auto router = i2p::transport::transports.RoutesRestricted() ? 909 i2p::transport::transports.GetRestrictedPeer() : 910 i2p::data::netdb.GetRandomRouter (i2p::context.GetSharedRouterInfo (), false, true, false); // reachable by us 911 if (!inboundTunnel || !router) return; 912 LogPrint (eLogDebug, "Tunnel: Creating one hop outbound tunnel"); 913 CreateTunnel<OutboundTunnel> ( 914 std::make_shared<TunnelConfig> (std::vector<std::shared_ptr<const i2p::data::IdentityEx> > { router->GetRouterIdentity () }, 915 inboundTunnel->GetNextTunnelID (), inboundTunnel->GetNextIdentHash (), false), nullptr 916 ); 917 } 918 } 919 920 void Tunnels::ManageInboundTunnels (uint64_t ts, std::vector<std::shared_ptr<Tunnel> >& toRecreate) 921 { 922 for (auto it = m_InboundTunnels.begin (); it != m_InboundTunnels.end ();) 923 { 924 auto tunnel = *it; 925 if (tunnel->IsFailed () || ts > tunnel->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT || 926 ts + TUNNEL_EXPIRATION_TIMEOUT < tunnel->GetCreationTime ()) 927 { 928 LogPrint (eLogDebug, "Tunnel: Tunnel with id ", tunnel->GetTunnelID (), " expired or failed"); 929 auto pool = tunnel->GetTunnelPool (); 930 if (pool) 931 pool->TunnelExpired (tunnel); 932 RemoveTunnel (tunnel->GetTunnelID ()); 933 it = m_InboundTunnels.erase (it); 934 } 935 else 936 { 937 if (tunnel->IsEstablished ()) 938 { 939 if (!tunnel->IsRecreated () && ts + TUNNEL_RECREATION_THRESHOLD > tunnel->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT) 940 { 941 auto pool = tunnel->GetTunnelPool (); 942 // let it die if the tunnel pool was reconfigured and has different number of hops 943 if (pool && tunnel->GetNumHops() == pool->GetNumInboundHops()) 944 toRecreate.push_back (tunnel); 945 } 946 947 if (ts + TUNNEL_EXPIRATION_THRESHOLD > tunnel->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT) 948 tunnel->SetState (eTunnelStateExpiring); 949 else // we don't need to cleanup expiring tunnels 950 tunnel->Cleanup (); 951 } 952 it++; 953 } 954 } 955 956 if (m_InboundTunnels.empty ()) 957 { 958 LogPrint (eLogDebug, "Tunnel: Creating zero hops inbound tunnel"); 959 CreateZeroHopsInboundTunnel (nullptr); 960 CreateZeroHopsOutboundTunnel (nullptr); 961 if (!m_ExploratoryPool) 962 { 963 int ibLen; i2p::config::GetOption("exploratory.inbound.length", ibLen); 964 int obLen; i2p::config::GetOption("exploratory.outbound.length", obLen); 965 int ibNum; i2p::config::GetOption("exploratory.inbound.quantity", ibNum); 966 int obNum; i2p::config::GetOption("exploratory.outbound.quantity", obNum); 967 m_ExploratoryPool = CreateTunnelPool (ibLen, obLen, ibNum, obNum, 0, 0, false); 968 m_ExploratoryPool->SetLocalDestination (i2p::context.GetSharedDestination ()); 969 } 970 return; 971 } 972 973 if (m_OutboundTunnels.empty () || m_InboundTunnels.size () < 3) 974 { 975 // trying to create one more inbound tunnel 976 auto router = i2p::transport::transports.RoutesRestricted() ? 977 i2p::transport::transports.GetRestrictedPeer() : 978 // should be reachable by us because we send build request directly 979 i2p::data::netdb.GetRandomRouter (i2p::context.GetSharedRouterInfo (), false, true, false); 980 if (!router) { 981 LogPrint (eLogWarning, "Tunnel: Can't find any router, skip creating tunnel"); 982 return; 983 } 984 LogPrint (eLogDebug, "Tunnel: Creating one hop inbound tunnel"); 985 CreateTunnel<InboundTunnel> ( 986 std::make_shared<TunnelConfig> (std::vector<std::shared_ptr<const i2p::data::IdentityEx> > { router->GetRouterIdentity () }, false), nullptr 987 ); 988 } 989 } 990 991 void Tunnels::ManageTunnelPools (uint64_t ts) 992 { 993 std::unique_lock<std::mutex> l(m_PoolsMutex); 994 for (auto& pool : m_Pools) 995 { 996 if (pool && pool->IsActive ()) 997 pool->ManageTunnels (ts); 998 } 999 } 1000 1001 void Tunnels::PostTunnelData (std::shared_ptr<I2NPMessage> msg) 1002 { 1003 if (msg) m_Queue.Put (msg); 1004 } 1005 1006 void Tunnels::PostTunnelData (std::list<std::shared_ptr<I2NPMessage> >& msgs) 1007 { 1008 m_Queue.Put (msgs); 1009 } 1010 1011 template<class TTunnel> 1012 std::shared_ptr<TTunnel> Tunnels::CreateTunnel (std::shared_ptr<TunnelConfig> config, 1013 std::shared_ptr<TunnelPool> pool, std::shared_ptr<OutboundTunnel> outboundTunnel) 1014 { 1015 auto newTunnel = std::make_shared<TTunnel> (config); 1016 newTunnel->SetTunnelPool (pool); 1017 uint32_t replyMsgID; 1018 RAND_bytes ((uint8_t *)&replyMsgID, 4); 1019 AddPendingTunnel (replyMsgID, newTunnel); 1020 newTunnel->Build (replyMsgID, outboundTunnel); 1021 return newTunnel; 1022 } 1023 1024 std::shared_ptr<InboundTunnel> Tunnels::CreateInboundTunnel (std::shared_ptr<TunnelConfig> config, 1025 std::shared_ptr<TunnelPool> pool, std::shared_ptr<OutboundTunnel> outboundTunnel) 1026 { 1027 if (config) 1028 return CreateTunnel<InboundTunnel>(config, pool, outboundTunnel); 1029 else 1030 return CreateZeroHopsInboundTunnel (pool); 1031 } 1032 1033 std::shared_ptr<OutboundTunnel> Tunnels::CreateOutboundTunnel (std::shared_ptr<TunnelConfig> config, std::shared_ptr<TunnelPool> pool) 1034 { 1035 if (config) 1036 return CreateTunnel<OutboundTunnel>(config, pool); 1037 else 1038 return CreateZeroHopsOutboundTunnel (pool); 1039 } 1040 1041 void Tunnels::AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr<InboundTunnel> tunnel) 1042 { 1043 m_PendingInboundTunnels[replyMsgID] = tunnel; 1044 } 1045 1046 void Tunnels::AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr<OutboundTunnel> tunnel) 1047 { 1048 m_PendingOutboundTunnels[replyMsgID] = tunnel; 1049 } 1050 1051 void Tunnels::AddOutboundTunnel (std::shared_ptr<OutboundTunnel> newTunnel) 1052 { 1053 // we don't need to insert it to m_Tunnels 1054 m_OutboundTunnels.push_back (newTunnel); 1055 auto pool = newTunnel->GetTunnelPool (); 1056 if (pool && pool->IsActive ()) 1057 pool->TunnelCreated (newTunnel); 1058 else 1059 newTunnel->SetTunnelPool (nullptr); 1060 } 1061 1062 void Tunnels::AddInboundTunnel (std::shared_ptr<InboundTunnel> newTunnel) 1063 { 1064 if (AddTunnel (newTunnel)) 1065 { 1066 m_InboundTunnels.push_back (newTunnel); 1067 auto pool = newTunnel->GetTunnelPool (); 1068 if (!pool) 1069 { 1070 // build symmetric outbound tunnel 1071 CreateTunnel<OutboundTunnel> (std::make_shared<TunnelConfig>(newTunnel->GetInvertedPeers (), 1072 newTunnel->GetNextTunnelID (), newTunnel->GetNextIdentHash (), false), nullptr, 1073 GetNextOutboundTunnel ()); 1074 } 1075 else 1076 { 1077 if (pool->IsActive ()) 1078 pool->TunnelCreated (newTunnel); 1079 else 1080 newTunnel->SetTunnelPool (nullptr); 1081 } 1082 } 1083 else 1084 LogPrint (eLogError, "Tunnel: Tunnel with id ", newTunnel->GetTunnelID (), " already exists"); 1085 } 1086 1087 1088 std::shared_ptr<ZeroHopsInboundTunnel> Tunnels::CreateZeroHopsInboundTunnel (std::shared_ptr<TunnelPool> pool) 1089 { 1090 auto inboundTunnel = std::make_shared<ZeroHopsInboundTunnel> (); 1091 inboundTunnel->SetTunnelPool (pool); 1092 inboundTunnel->SetState (eTunnelStateEstablished); 1093 m_InboundTunnels.push_back (inboundTunnel); 1094 AddTunnel (inboundTunnel); 1095 return inboundTunnel; 1096 } 1097 1098 std::shared_ptr<ZeroHopsOutboundTunnel> Tunnels::CreateZeroHopsOutboundTunnel (std::shared_ptr<TunnelPool> pool) 1099 { 1100 auto outboundTunnel = std::make_shared<ZeroHopsOutboundTunnel> (); 1101 outboundTunnel->SetTunnelPool (pool); 1102 outboundTunnel->SetState (eTunnelStateEstablished); 1103 m_OutboundTunnels.push_back (outboundTunnel); 1104 // we don't insert into m_Tunnels 1105 return outboundTunnel; 1106 } 1107 1108 std::shared_ptr<I2NPMessage> Tunnels::NewI2NPTunnelMessage (bool endpoint) 1109 { 1110 if (endpoint) 1111 { 1112 // should fit two tunnel message + tunnel gateway header, enough for one garlic encrypted streaming packet 1113 auto msg = m_I2NPTunnelEndpointMessagesMemoryPool.AcquireSharedMt (); 1114 msg->Align (6); 1115 msg->offset += TUNNEL_GATEWAY_HEADER_SIZE; // reserve room for TunnelGateway header 1116 return msg; 1117 } 1118 else 1119 { 1120 auto msg = m_I2NPTunnelMessagesMemoryPool.AcquireSharedMt (); 1121 msg->Align (12); 1122 return msg; 1123 } 1124 } 1125 1126 int Tunnels::GetTransitTunnelsExpirationTimeout () 1127 { 1128 return m_TransitTunnels.GetTransitTunnelsExpirationTimeout (); 1129 } 1130 1131 size_t Tunnels::CountTransitTunnels() const 1132 { 1133 return m_TransitTunnels.GetNumTransitTunnels (); 1134 } 1135 1136 size_t Tunnels::CountInboundTunnels() const 1137 { 1138 // TODO: locking 1139 return m_InboundTunnels.size(); 1140 } 1141 1142 size_t Tunnels::CountOutboundTunnels() const 1143 { 1144 // TODO: locking 1145 return m_OutboundTunnels.size(); 1146 } 1147 1148 void Tunnels::SetMaxNumTransitTunnels (uint32_t maxNumTransitTunnels) 1149 { 1150 if (maxNumTransitTunnels > 0 && m_MaxNumTransitTunnels != maxNumTransitTunnels) 1151 { 1152 LogPrint (eLogDebug, "Tunnel: Max number of transit tunnels set to ", maxNumTransitTunnels); 1153 m_MaxNumTransitTunnels = maxNumTransitTunnels; 1154 } 1155 } 1156 } 1157 }