Streaming.cpp
1 /* 2 * Copyright (c) 2013-2026, The PurpleI2P Project 3 * 4 * This file is part of Purple i2pd project and licensed under BSD3 5 * 6 * See full license text in LICENSE file at top of project tree 7 */ 8 9 #include "Crypto.h" 10 #include "Log.h" 11 #include "RouterInfo.h" 12 #include "RouterContext.h" 13 #include "Tunnel.h" 14 #include "Timestamp.h" 15 #include "Destination.h" 16 #include "Streaming.h" 17 18 namespace i2p 19 { 20 namespace stream 21 { 22 void SendBufferQueue::Add (std::shared_ptr<SendBuffer>&& buf) 23 { 24 if (buf) 25 { 26 m_Size += buf->len; 27 m_Buffers.push_back (std::move (buf)); 28 } 29 } 30 31 size_t SendBufferQueue::Get (uint8_t * buf, size_t len) 32 { 33 if (!m_Size) return 0; 34 size_t offset = 0; 35 if (len >= m_Size) 36 { 37 for (auto& it: m_Buffers) 38 { 39 auto rem = it->GetRemainingSize (); 40 memcpy (buf + offset, it->GetRemaningBuffer (), rem); 41 offset += rem; 42 } 43 m_Buffers.clear (); 44 m_Size = 0; 45 return offset; 46 } 47 else 48 { 49 while (!m_Buffers.empty () && offset < len) 50 { 51 auto nextBuffer = m_Buffers.front (); 52 auto rem = nextBuffer->GetRemainingSize (); 53 if (offset + rem <= len) 54 { 55 // whole buffer 56 memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), rem); 57 offset += rem; 58 m_Buffers.pop_front (); // delete it 59 } 60 else 61 { 62 // partially 63 rem = len - offset; 64 memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), rem); 65 nextBuffer->offset += rem; 66 offset = len; // break 67 } 68 } 69 m_Size -= offset; 70 } 71 return offset; 72 } 73 74 void SendBufferQueue::CleanUp () 75 { 76 if (!m_Buffers.empty ()) 77 { 78 for (auto& it: m_Buffers) 79 it->Cancel (); 80 m_Buffers.clear (); 81 m_Size = 0; 82 } 83 } 84 85 Stream::Stream (boost::asio::io_context& service, StreamingDestination& local, 86 std::shared_ptr<const i2p::data::LeaseSet> remote, int port): m_Service (service), 87 m_SendStreamID (0), m_SequenceNumber (0), m_DropWindowDelaySequenceNumber (INITIAL_WINDOW_SIZE), 88 m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), 89 m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed 90 m_Status (eStreamStatusNew), m_IsIncoming (false), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), 91 m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), 92 m_IsWinDropped (true), m_IsChoking2 (false), m_IsClientChoked (false), m_IsClientChoked2 (false), 93 m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), 94 m_IsBufferEmpty (false), m_IsJavaClient (false), m_DontSign (local.GetOwner ()->IsStreamingDontSign ()), 95 m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), 96 m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), 97 m_NumReceivedBytes (0), m_Port (port), m_RTT (INITIAL_RTT), m_MinRTT (INITIAL_RTT), 98 m_SlowRTT (INITIAL_RTT), m_FastRTT (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE), 99 m_MaxWindowSize (local.GetOwner ()->GetStreamingMaxWindowSize ()), m_LastWindowDropSize (0), 100 m_WindowDropTargetSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO), 101 m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT), 102 m_Jitter (0), m_MinPacingTime (0), m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), 103 m_LastSendTime (0), m_LastACKRecieveTime (0), m_ACKRecieveInterval (local.GetOwner ()->GetStreamingAckDelay ()), 104 m_RemoteLeaseChangeTime (0), m_LastWindowIncTime (0), m_LastACKRequestTime (0), m_LastACKSendTime (0), 105 m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed 106 m_MaxNumResendAttempts (local.GetOwner ()->GetStreamingMaxResends ()), 107 m_NumResendAttempts (0), m_NumPacketsToSend (0), m_JitterAccum (0), m_JitterDiv (1), m_MTU (STREAMING_MTU) 108 { 109 RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); 110 m_RemoteIdentity = remote->GetIdentity (); 111 auto outboundSpeed = local.GetOwner ()->GetStreamingOutboundSpeed (); 112 if (outboundSpeed) 113 m_MinPacingTime = (1000000LL*STREAMING_MTU)/outboundSpeed; 114 115 auto inboundSpeed = local.GetOwner ()->GetStreamingInboundSpeed (); // for limit inbound speed 116 if (inboundSpeed) 117 m_PacketACKInterval = (1000000LL*STREAMING_MTU)/inboundSpeed; 118 } 119 120 Stream::Stream (boost::asio::io_context& service, StreamingDestination& local): 121 m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_DropWindowDelaySequenceNumber (INITIAL_WINDOW_SIZE), 122 m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1), 123 m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed 124 m_Status (eStreamStatusNew), m_IsIncoming (true), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false), 125 m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true), 126 m_IsWinDropped (true), m_IsChoking2 (false), m_IsClientChoked (false), m_IsClientChoked2 (false), 127 m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false), 128 m_IsBufferEmpty (false), m_IsJavaClient (false), m_DontSign (local.GetOwner ()->IsStreamingDontSign ()), 129 m_LocalDestination (local),m_ReceiveTimer (m_Service), m_SendTimer (m_Service), 130 m_ResendTimer (m_Service), m_AckSendTimer (m_Service),m_NumSentBytes (0), m_NumReceivedBytes (0), 131 m_Port (0), m_RTT (INITIAL_RTT), m_MinRTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_FastRTT (INITIAL_RTT), 132 m_WindowSize (INITIAL_WINDOW_SIZE), m_MaxWindowSize (local.GetOwner ()->GetStreamingMaxWindowSize ()), 133 m_LastWindowDropSize (0), m_WindowDropTargetSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO), 134 m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()),m_PrevRTTSample (INITIAL_RTT), m_Jitter (0), 135 m_MinPacingTime (0), m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0), 136 m_LastACKRecieveTime (0), m_ACKRecieveInterval (local.GetOwner ()->GetStreamingAckDelay ()), 137 m_RemoteLeaseChangeTime (0), m_LastWindowIncTime (0), m_LastACKRequestTime (0), 138 m_LastACKSendTime (0), m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed 139 m_MaxNumResendAttempts (local.GetOwner ()->GetStreamingMaxResends ()), 140 m_NumResendAttempts (0), m_NumPacketsToSend (0), m_JitterAccum (0), m_JitterDiv (1), m_MTU (STREAMING_MTU) 141 { 142 RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); 143 auto outboundSpeed = local.GetOwner ()->GetStreamingOutboundSpeed (); 144 if (outboundSpeed) 145 m_MinPacingTime = (1000000LL*STREAMING_MTU)/outboundSpeed; 146 147 auto inboundSpeed = local.GetOwner ()->GetStreamingInboundSpeed (); // for limit inbound speed 148 if (inboundSpeed) 149 m_PacketACKInterval = (1000000LL*STREAMING_MTU)/inboundSpeed; 150 } 151 152 Stream::~Stream () 153 { 154 CleanUp (); 155 LogPrint (eLogDebug, "Streaming: Stream deleted"); 156 } 157 158 void Stream::Terminate (bool deleteFromDestination) // should be called from StreamingDestination::Stop only 159 { 160 m_Status = eStreamStatusTerminated; 161 m_AckSendTimer.cancel (); 162 m_ReceiveTimer.cancel (); 163 m_ResendTimer.cancel (); 164 m_SendTimer.cancel (); 165 //CleanUp (); /* Need to recheck - broke working on windows */ 166 if (deleteFromDestination) 167 m_LocalDestination.DeleteStream (shared_from_this ()); 168 } 169 170 void Stream::CleanUp () 171 { 172 if (m_RoutingSession && !m_SentPackets.empty ()) // free up space in shared window 173 { 174 int numPackets = m_SentPackets.size (); 175 int numSentPackets = m_RoutingSession->NumSentPackets (); 176 numSentPackets -= numPackets; 177 if (numSentPackets < 0) numSentPackets = 0; 178 m_RoutingSession->SetNumSentPackets (numSentPackets); 179 } 180 181 m_SendBuffer.CleanUp (); 182 while (!m_ReceiveQueue.empty ()) 183 { 184 auto packet = m_ReceiveQueue.front (); 185 m_ReceiveQueue.pop (); 186 m_LocalDestination.DeletePacket (packet); 187 } 188 189 m_NACKedPackets.clear (); 190 191 for (auto it: m_SentPackets) 192 m_LocalDestination.DeletePacket (it); 193 m_SentPackets.clear (); 194 195 for (auto it: m_SavedPackets) 196 m_LocalDestination.DeletePacket (it); 197 m_SavedPackets.clear (); 198 } 199 200 void Stream::HandleNextPacket (Packet * packet) 201 { 202 if (m_Status == eStreamStatusTerminated) 203 { 204 m_LocalDestination.DeletePacket (packet); 205 return; 206 } 207 m_NumReceivedBytes += packet->GetLength (); 208 if (!m_SendStreamID) 209 { 210 m_SendStreamID = packet->GetReceiveStreamID (); 211 if (!m_RemoteIdentity && !packet->from && packet->GetNACKCount () == 8 && // first incoming packet 212 memcmp (packet->GetNACKs (), m_LocalDestination.GetOwner ()->GetIdentHash (), 32)) 213 { 214 LogPrint (eLogWarning, "Streaming: Destination mismatch for ", m_LocalDestination.GetOwner ()->GetIdentHash ().ToBase32 ()); 215 m_LocalDestination.DeletePacket (packet); 216 return; 217 } 218 } 219 220 if (!packet->IsNoAck ()) // ack received 221 ProcessAck (packet); 222 223 int32_t receivedSeqn = packet->GetSeqn (); 224 if (!receivedSeqn && m_LastReceivedSequenceNumber >= 0) 225 { 226 uint16_t flags = packet->GetFlags (); 227 if (flags) 228 // plain ack with options 229 ProcessOptions (flags, packet); 230 else 231 // plain ack 232 { 233 LogPrint (eLogDebug, "Streaming: Plain ACK received"); 234 if (m_IsImmediateAckRequested) 235 { 236 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 237 if (m_IsFirstRttSample) 238 { 239 m_RTT = ts - m_LastSendTime; 240 m_IsFirstRttSample = false; 241 } 242 else 243 m_RTT = (m_RTT + (ts - m_LastSendTime)) / 2; 244 m_IsImmediateAckRequested = false; 245 } 246 } 247 m_LocalDestination.DeletePacket (packet); 248 return; 249 } 250 251 LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn, " on sSID=", m_SendStreamID); 252 if (receivedSeqn == m_LastReceivedSequenceNumber + 1) 253 { 254 // we have received next in sequence message 255 ProcessPacket (packet); 256 if (m_Status == eStreamStatusTerminated) return; 257 258 // we should also try stored messages if any 259 for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();) 260 { 261 if ((*it)->GetSeqn () == (uint32_t)(m_LastReceivedSequenceNumber + 1)) 262 { 263 Packet * savedPacket = *it; 264 m_SavedPackets.erase (it++); 265 266 ProcessPacket (savedPacket); 267 if (m_Status == eStreamStatusTerminated) return; 268 } 269 else 270 break; 271 } 272 273 // schedule ack for last message 274 if (m_Status == eStreamStatusOpen) 275 { 276 if (!m_IsAckSendScheduled) 277 { 278 auto ackTimeout = m_RTT/10; 279 if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay; 280 ScheduleAck (ackTimeout); 281 } 282 } 283 else if (packet->IsSYN ()) 284 // we have to send SYN back to incoming connection 285 SendBuffer (); // also sets m_IsOpen 286 } 287 else 288 { 289 if (receivedSeqn <= m_LastReceivedSequenceNumber) 290 { 291 // we have received duplicate 292 LogPrint (eLogInfo, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID); 293 if (receivedSeqn <= m_PreviousReceivedSequenceNumber || receivedSeqn == m_LastReceivedSequenceNumber) 294 { 295 m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); 296 CancelRemoteLeaseChange (); 297 UpdateCurrentRemoteLease (); 298 } 299 m_PreviousReceivedSequenceNumber = receivedSeqn; 300 m_LocalDestination.DeletePacket (packet); // packet dropped 301 if (!m_IsAckSendScheduled) 302 { 303 SendQuickAck (); // resend ack for previous message again 304 auto ackTimeout = m_RTT/10; 305 if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay; 306 ScheduleAck (ackTimeout); 307 } 308 } 309 else 310 { 311 LogPrint (eLogInfo, "Streaming: Missing messages on sSID=", m_SendStreamID, ": from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); 312 if ((receivedSeqn - m_LastReceivedSequenceNumber) >= m_MaxWindowSize*3) 313 { 314 m_LocalDestination.DeletePacket (packet); 315 m_IsChoking2 = true; 316 } 317 else if (m_SavedPackets.empty () && (receivedSeqn - m_LastReceivedSequenceNumber) >= 256) 318 { 319 m_LocalDestination.DeletePacket (packet); 320 m_IsChoking2 = true; 321 } 322 else if (!m_SavedPackets.empty ()) 323 { 324 uint8_t numNacks = 0; 325 auto lastSavedSeq = 0; 326 auto nextSeqn = m_LastReceivedSequenceNumber + 1; 327 for (auto it: m_SavedPackets) 328 { 329 auto seqn = it->GetSeqn (); 330 if ((int)seqn > lastSavedSeq) lastSavedSeq = seqn; 331 for (uint32_t i = nextSeqn; i < seqn; i++) numNacks++; 332 nextSeqn = seqn + 1; 333 } 334 335 if (numNacks + (receivedSeqn - lastSavedSeq) >= 256) 336 { 337 m_LocalDestination.DeletePacket (packet); 338 m_IsChoking2 = true; 339 } 340 else 341 // save message and wait for missing message again 342 SavePacket (packet); 343 } 344 else 345 // save message and wait for missing message again 346 SavePacket (packet); 347 if (m_LastReceivedSequenceNumber >= 0) 348 { 349 if (!m_IsAckSendScheduled) 350 { 351 // send NACKs for missing messages 352 SendQuickAck (); 353 auto ackTimeout = m_RTT/10; 354 if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay; 355 ScheduleAck (ackTimeout); 356 } 357 } 358 else 359 // wait for SYN 360 ScheduleAck (SYN_TIMEOUT); 361 } 362 } 363 } 364 365 void Stream::SavePacket (Packet * packet) 366 { 367 if (!m_SavedPackets.insert (packet).second) 368 m_LocalDestination.DeletePacket (packet); 369 } 370 371 void Stream::ProcessPacket (Packet * packet) 372 { 373 uint32_t receivedSeqn = packet->GetSeqn (); 374 uint16_t flags = packet->GetFlags (); 375 LogPrint (eLogDebug, "Streaming: Process seqn=", receivedSeqn, ", flags=", flags); 376 377 if (!ProcessOptions (flags, packet)) 378 { 379 m_LocalDestination.DeletePacket (packet); 380 Terminate (); 381 return; 382 } 383 384 packet->offset = packet->GetPayload () - packet->buf; 385 if (packet->GetLength () > 0) 386 { 387 m_ReceiveQueue.push (packet); 388 m_ReceiveTimer.cancel (); 389 } 390 else 391 m_LocalDestination.DeletePacket (packet); 392 393 m_LastReceivedSequenceNumber = receivedSeqn; 394 395 if (flags & PACKET_FLAG_RESET) 396 { 397 LogPrint (eLogDebug, "Streaming: closing stream sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID, ": reset flag received in packet #", receivedSeqn); 398 m_Status = eStreamStatusReset; 399 Close (); 400 } 401 else if (flags & PACKET_FLAG_CLOSE) 402 { 403 if (m_Status != eStreamStatusClosed) 404 SendClose (); 405 m_Status = eStreamStatusClosed; 406 Terminate (); 407 } 408 } 409 410 bool Stream::ProcessOptions (uint16_t flags, Packet * packet) 411 { 412 const uint8_t * optionData = packet->GetOptionData (); 413 size_t optionSize = packet->GetOptionSize (); 414 if (optionSize > packet->len) 415 { 416 LogPrint (eLogInfo, "Streaming: Invalid option size ", optionSize, " Discarded"); 417 return false; 418 } 419 if (!flags) return true; 420 bool immediateAckRequested = false; 421 if (flags & PACKET_FLAG_DELAY_REQUESTED) 422 { 423 uint16_t delayRequested = bufbe16toh (optionData); 424 if (!delayRequested) // 0 requests an immediate ack 425 immediateAckRequested = true; 426 else if (!m_IsAckSendScheduled) 427 { 428 if (delayRequested < m_RTT) 429 { 430 m_IsAckSendScheduled = true; 431 m_AckSendTimer.expires_from_now (boost::posix_time::milliseconds(delayRequested)); 432 m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer, 433 shared_from_this (), std::placeholders::_1)); 434 } 435 if (delayRequested >= DELAY_CHOKING) 436 { 437 if (delayRequested == 65535) 438 { 439 m_IsClientChoked2 = true; 440 m_DropWindowDelaySequenceNumber = m_SequenceNumber-1; 441 } 442 else if (!m_IsClientChoked) 443 { 444 LogPrint (eLogDebug, "Streaming: Client choked, set min. window size"); 445 if (delayRequested == DELAY_CHOKING_JAVA) // java detected 446 { 447 LogPrint (eLogDebug, "Streaming: limit window size for java client"); 448 m_MaxWindowSize = 64; 449 m_IsJavaClient = true; 450 if (m_RoutingSession) m_RoutingSession->SetIsWithJava (true); 451 } 452 m_WindowDropTargetSize = MIN_WINDOW_SIZE; 453 m_LastWindowDropSize = 0; 454 m_WindowIncCounter = 0; 455 m_IsClientChoked = true; 456 m_IsWinDropped = false; 457 m_DropWindowDelaySequenceNumber = m_SequenceNumber-1; 458 m_IsFirstRttSample = true; 459 UpdatePacingTime (); 460 } 461 } 462 } 463 optionData += 2; 464 } 465 466 bool verified = true; 467 if (flags & PACKET_FLAG_FROM_INCLUDED) 468 { 469 verified = false; 470 if (m_RemoteLeaseSet) m_RemoteIdentity = m_RemoteLeaseSet->GetIdentity (); 471 if (!m_RemoteIdentity) 472 m_RemoteIdentity = std::make_shared<i2p::data::IdentityEx>(optionData, optionSize); 473 if (m_RemoteIdentity->IsRSA ()) 474 { 475 LogPrint (eLogInfo, "Streaming: Incoming stream from RSA destination ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), " Discarded"); 476 return false; 477 } 478 optionData += m_RemoteIdentity->GetFullLen (); 479 if (!m_RemoteLeaseSet) 480 { 481 LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase32 (), ", sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID); 482 if (packet->from) // try to obtain LeaseSet if came from ratchets session 483 m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ()); 484 } 485 if (packet->from && m_RemoteLeaseSet) 486 { 487 // stream came from ratchets session and static key must match one from LeaseSet 488 uint8_t staticKey[32]; 489 m_RemoteLeaseSet->Encrypt (nullptr, staticKey); 490 if (memcmp (packet->from->GetRemoteStaticKey (), staticKey, 32)) 491 { 492 LogPrint (eLogError, "Streaming: Remote LeaseSet static key mismatch for stream from ", 493 m_RemoteIdentity->GetIdentHash ().ToBase32 ()); 494 return false; 495 } 496 verified = true; 497 if (!(flags & PACKET_FLAG_SIGNATURE_INCLUDED)) 498 m_DontSign = true; // don't sign if the remote didn't sign 499 } 500 } 501 502 if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED) 503 { 504 uint16_t maxPacketSize = bufbe16toh (optionData); 505 LogPrint (eLogDebug, "Streaming: Max packet size ", maxPacketSize); 506 optionData += 2; 507 } 508 509 if (flags & (PACKET_FLAG_CLOSE | PACKET_FLAG_RESET)) 510 { 511 verified = false; 512 if (packet->from) 513 { 514 if (!m_RemoteLeaseSet && m_RemoteIdentity) 515 m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ()); 516 if (m_RemoteLeaseSet) 517 { 518 uint8_t staticKey[32]; 519 m_RemoteLeaseSet->Encrypt (nullptr, staticKey); 520 if (memcmp (packet->from->GetRemoteStaticKey (), staticKey, 32)) 521 { 522 LogPrint (eLogError, "Streaming: Remote LeaseSet static key mismatch for stream from ", 523 m_RemoteIdentity->GetIdentHash ().ToBase32 ()); 524 return false; 525 } 526 verified = true; 527 } 528 else // invalid stream, safe to close 529 verified = true; 530 } 531 } 532 533 if (flags & PACKET_FLAG_OFFLINE_SIGNATURE) 534 { 535 if (!m_RemoteIdentity) 536 { 537 LogPrint (eLogInfo, "Streaming: offline signature without identity"); 538 return false; 539 } 540 if (verified) 541 { 542 // skip offline signature 543 optionData += 4; // timestamp 544 uint16_t keyType = bufbe16toh (optionData); optionData += 2; // key type 545 std::unique_ptr<i2p::crypto::Verifier> transientVerifier (i2p::data::IdentityEx::CreateVerifier (keyType)); 546 if (!transientVerifier) 547 { 548 LogPrint (eLogInfo, "Streaming: Unknown offline signature key type ", (int)keyType); 549 return false; 550 } 551 optionData += transientVerifier->GetPublicKeyLen (); // public key 552 optionData += m_RemoteIdentity->GetSignatureLen (); // signature 553 } 554 else 555 { 556 // if we have it in LeaseSet already we don't need to parse it again 557 if (m_RemoteLeaseSet) m_TransientVerifier = m_RemoteLeaseSet->GetTransientVerifier (); 558 if (m_TransientVerifier) 559 { 560 // skip option data 561 optionData += 6; // timestamp and key type 562 optionData += m_TransientVerifier->GetPublicKeyLen (); // public key 563 optionData += m_RemoteIdentity->GetSignatureLen (); // signature 564 } 565 else 566 { 567 // transient key 568 size_t offset = 0; 569 m_TransientVerifier = i2p::data::ProcessOfflineSignature (m_RemoteIdentity, optionData, optionSize - (optionData - packet->GetOptionData ()), offset); 570 optionData += offset; 571 if (!m_TransientVerifier) 572 { 573 LogPrint (eLogError, "Streaming: offline signature failed"); 574 return false; 575 } 576 } 577 } 578 } 579 580 if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) 581 { 582 auto signatureLen = m_TransientVerifier ? m_TransientVerifier->GetSignatureLen () : m_RemoteIdentity->GetSignatureLen (); 583 if (signatureLen > packet->GetLength ()) 584 { 585 LogPrint (eLogError, "Streaming: Signature too big, ", signatureLen, " bytes"); 586 return false; 587 } 588 if (!verified) // packet was not verified through session 589 { 590 // verify actual signature 591 if (signatureLen <= 256) 592 { 593 // standard 594 uint8_t signature[256]; 595 memcpy (signature, optionData, signatureLen); 596 memset (const_cast<uint8_t *>(optionData), 0, signatureLen); 597 verified = m_TransientVerifier ? 598 m_TransientVerifier->Verify (packet->GetBuffer (), packet->GetLength (), signature) : 599 m_RemoteIdentity->Verify (packet->GetBuffer (), packet->GetLength (), signature); 600 if (verified) 601 memcpy (const_cast<uint8_t *>(optionData), signature, signatureLen); 602 } 603 else 604 { 605 // post quantum 606 std::vector<uint8_t> signature(signatureLen); 607 memcpy (signature.data (), optionData, signatureLen); 608 memset (const_cast<uint8_t *>(optionData), 0, signatureLen); 609 verified = m_TransientVerifier ? 610 m_TransientVerifier->Verify (packet->GetBuffer (), packet->GetLength (), signature.data ()) : 611 m_RemoteIdentity->Verify (packet->GetBuffer (), packet->GetLength (), signature.data ()); 612 } 613 } 614 if (verified) 615 optionData += signatureLen; 616 else 617 { 618 LogPrint (eLogError, "Streaming: Signature verification failed, sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID); 619 return false; 620 } 621 } 622 if (!verified) 623 { 624 LogPrint (eLogError, "Streaming: Missing signature, sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID); 625 return false; 626 } 627 if (immediateAckRequested) 628 { 629 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 630 if (m_LastACKSendTime != ts) // preventing multiple acks when reading m_SavedPackets 631 { 632 if (m_IsAckSendScheduled) 633 { 634 SendQuickAck (); 635 auto ackTimeout = m_RTT/10; 636 if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay; 637 ScheduleAck (ackTimeout); 638 } 639 else 640 SendQuickAck (); 641 } 642 } 643 return true; 644 } 645 646 void Stream::HandlePing (Packet * packet) 647 { 648 uint16_t flags = packet->GetFlags (); 649 if (ProcessOptions (flags, packet) && m_RemoteIdentity) 650 { 651 // send pong 652 Packet p; 653 memset (p.buf, 0, 22); // minimal header all zeroes 654 memcpy (p.buf + 4, packet->buf, 4); // but receiveStreamID is the sendStreamID from the ping 655 htobe16buf (p.buf + 18, PACKET_FLAG_ECHO); // and echo flag 656 auto payloadLen = int(packet->len) - (packet->GetPayload () - packet->buf); 657 if (payloadLen > 0) 658 memcpy (p.buf + 22, packet->GetPayload (), payloadLen); 659 else 660 payloadLen = 0; 661 p.len = payloadLen + 22; 662 SendPackets (std::vector<Packet *> { &p }); 663 LogPrint (eLogDebug, "Streaming: Pong of ", p.len, " bytes sent"); 664 } 665 m_LocalDestination.DeletePacket (packet); 666 } 667 668 void Stream::ProcessAck (Packet * packet) 669 { 670 bool acknowledged = false; 671 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 672 uint32_t ackThrough = packet->GetAckThrough (); 673 m_NACKedPackets.clear (); 674 if (ackThrough > m_SequenceNumber) 675 { 676 LogPrint (eLogError, "Streaming: Unexpected ackThrough=", ackThrough, " > seqn=", m_SequenceNumber); 677 return; 678 } 679 int rttSample = INT_MAX; 680 int incCounter = 0; 681 int ackPacketsCounter = 0; 682 m_IsNAcked = false; 683 m_IsResendNeeded = false; 684 int nackCount = packet->GetNACKCount (); 685 for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) 686 { 687 auto seqn = (*it)->GetSeqn (); 688 if (seqn <= ackThrough) 689 { 690 if (nackCount > 0) 691 { 692 bool nacked = false; 693 for (int i = 0; i < nackCount; i++) 694 if (seqn == packet->GetNACK (i)) 695 { 696 m_NACKedPackets.insert (*it); 697 m_IsNAcked = true; 698 nacked = true; 699 break; 700 } 701 if (nacked) 702 { 703 LogPrint (eLogDebug, "Streaming: Packet ", seqn, " NACK"); 704 ++it; 705 continue; 706 } 707 } 708 auto sentPacket = *it; 709 int64_t rtt = (int64_t)ts - (int64_t)sentPacket->sendTime; 710 if (rtt < 0) 711 LogPrint (eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); 712 if (!seqn) 713 { 714 m_IsFirstRttSample = true; 715 rttSample = rtt < 0 ? 1 : rtt; 716 } 717 else if (!sentPacket->resent && seqn > m_TunnelsChangeSequenceNumber && rtt >= 0) 718 rttSample = std::min (rttSample, (int)rtt); 719 LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); 720 m_SentPackets.erase (it++); 721 m_LocalDestination.DeletePacket (sentPacket); 722 acknowledged = true; 723 ackPacketsCounter++; 724 if (m_WindowIncCounter < m_MaxWindowSize && !m_IsFirstACK && !m_IsWinDropped) 725 incCounter++; 726 } 727 else 728 break; 729 } 730 if (m_LastACKRecieveTime) 731 { 732 uint64_t interval = ts - m_LastACKRecieveTime; 733 if (m_ACKRecieveInterval) 734 m_ACKRecieveInterval = (m_ACKRecieveInterval + interval) / 2; 735 else 736 m_ACKRecieveInterval = interval; 737 } 738 m_LastACKRecieveTime = ts; 739 if (rttSample != INT_MAX) 740 { 741 if (m_IsFirstRttSample && !m_IsFirstACK) 742 { 743 m_RTT = rttSample; 744 m_MinRTT = m_RTT; 745 m_SlowRTT = rttSample; 746 m_FastRTT = rttSample; 747 m_PrevRTTSample = rttSample; 748 m_Jitter = rttSample / 5; // 20% 749 m_Jitter += 5; // for low-latency connections 750 m_JitterAccum = m_Jitter; 751 m_JitterDiv = 1; 752 m_IsFirstRttSample = false; 753 } 754 else 755 { 756 m_RTT = (m_PrevRTTSample + rttSample) / 2; 757 } 758 if (!m_IsWinDropped) 759 { 760 m_SlowRTT = SLOWRTT_EWMA_ALPHA * m_RTT + (1.0 - SLOWRTT_EWMA_ALPHA) * m_SlowRTT; 761 m_FastRTT = RTT_EWMA_ALPHA * m_RTT + (1.0 - RTT_EWMA_ALPHA) * m_FastRTT; 762 // calculate jitter 763 double jitter = 0; 764 if (rttSample > m_PrevRTTSample) 765 jitter = rttSample - m_PrevRTTSample; 766 else if (rttSample < m_PrevRTTSample) 767 jitter = m_PrevRTTSample - rttSample; 768 if (jitter) 769 { 770 jitter += 5; // for low-latency connections 771 m_JitterAccum += jitter; 772 m_Jitter = m_JitterAccum / m_JitterDiv; 773 m_JitterDiv++; 774 } 775 if (m_MinRTT > m_RTT) 776 { 777 m_MinRTT = m_RTT; 778 m_FastRTT = m_MinRTT + m_Jitter; 779 m_SlowRTT = m_MinRTT + m_Jitter; 780 } 781 } 782 if (m_IsBufferEmpty || m_FastRTT >= m_MinRTT + m_Jitter*4 || m_RTT >= m_MinRTT + m_Jitter*4 || m_SlowRTT >= m_MinRTT + m_Jitter*4 || m_RTT > m_FastRTT) 783 { 784 incCounter = 0; 785 m_WindowIncCounter = 0; 786 } 787 m_WindowIncCounter = m_WindowIncCounter + incCounter; 788 // 789 // delay-based CC 790 if ((m_RTT > m_SlowRTT) && (m_SlowRTT >= m_FastRTT) && (m_FastRTT > m_MinRTT + m_Jitter*8) && (m_SlowRTT > m_MinRTT + m_Jitter*8) && !m_IsWinDropped && !m_IsClientChoked) // Drop window if RTT grows too fast 791 { 792 LogPrint (eLogDebug, "Streaming: Congestion detected, reduce window size"); 793 ProcessWindowDrop (); 794 } 795 UpdatePacingTime (); 796 m_PrevRTTSample = rttSample; 797 798 bool wasInitial = m_RTO == INITIAL_RTO; 799 m_RTO = std::max (MIN_RTO, (int)(m_RTT + m_Jitter*2 + m_ACKRecieveInterval)); // TODO: implement it better 800 801 if (wasInitial) 802 ScheduleResend (); 803 } 804 if (m_IsClientChoked && (ackThrough >= m_DropWindowDelaySequenceNumber || m_SentPackets.empty ())) 805 m_IsClientChoked = false; 806 if (m_IsClientChoked2 && (ackThrough >= m_DropWindowDelaySequenceNumber || m_SentPackets.empty ())) 807 m_IsClientChoked2 = false; 808 if (m_IsWinDropped && ackThrough > m_DropWindowDelaySequenceNumber) 809 { 810 m_IsFirstRttSample = true; 811 m_IsWinDropped = false; 812 } 813 if (m_WindowDropTargetSize && int(m_SentPackets.size ()) <= m_WindowDropTargetSize) 814 { 815 m_WindowSize = m_WindowDropTargetSize; 816 m_WindowDropTargetSize = 0; 817 } 818 if (acknowledged || m_IsNAcked) 819 { 820 ScheduleResend (); 821 } 822 if (m_SendBuffer.IsEmpty () && m_SentPackets.size () > 0) // tail loss 823 { 824 m_IsResendNeeded = true; 825 m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5 + m_Jitter + m_ACKRecieveInterval)); // to prevent spurious retransmit 826 } 827 if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) 828 { 829 m_ResendTimer.cancel (); 830 m_SendTimer.cancel (); 831 m_LastACKRecieveTime = 0; 832 m_ACKRecieveInterval = m_AckDelay; 833 } 834 if (acknowledged && m_IsFirstACK) 835 { 836 if (m_RoutingSession) 837 m_RoutingSession->SetSharedRoutingPath ( 838 std::make_shared<i2p::garlic::GarlicRoutingPath> ( 839 i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0})); 840 m_IsFirstACK = false; 841 } 842 if (acknowledged) 843 { 844 if (m_RoutingSession) 845 { 846 int numSentPackets = m_RoutingSession->NumSentPackets (); 847 numSentPackets -= ackPacketsCounter; 848 if (numSentPackets < 0) numSentPackets = 0; 849 m_RoutingSession->SetNumSentPackets (numSentPackets); 850 } 851 m_NumResendAttempts = 0; 852 m_IsTimeOutResend = false; 853 SendBuffer (); 854 } 855 if (m_Status == eStreamStatusClosed) 856 Terminate (); 857 else if (m_Status == eStreamStatusClosing) 858 Close (); // check is all outgoing messages have been sent and we can send close 859 } 860 861 size_t Stream::Receive (uint8_t * buf, size_t len, int timeout) 862 { 863 if (!len) return 0; 864 size_t ret = 0; 865 volatile bool done = false; 866 std::condition_variable newDataReceived; 867 std::mutex newDataReceivedMutex; 868 AsyncReceive (boost::asio::buffer (buf, len), 869 [&ret, &done, &newDataReceived, &newDataReceivedMutex](const boost::system::error_code& ecode, std::size_t bytes_transferred) 870 { 871 if (ecode == boost::asio::error::timed_out) 872 ret = 0; 873 else 874 ret = bytes_transferred; 875 std::unique_lock<std::mutex> l(newDataReceivedMutex); 876 newDataReceived.notify_all (); 877 done = true; 878 }, 879 timeout); 880 if (!done) 881 { std::unique_lock<std::mutex> l(newDataReceivedMutex); 882 if (!done && newDataReceived.wait_for (l, std::chrono::seconds (timeout)) == std::cv_status::timeout) 883 ret = 0; 884 } 885 if (!done) 886 { 887 // make sure that AsycReceive complete 888 auto s = shared_from_this(); 889 boost::asio::post (m_Service, [s]() 890 { 891 s->m_ReceiveTimer.cancel (); 892 }); 893 int i = 0; 894 while (!done && i < 100) // 1 sec 895 { 896 std::this_thread::sleep_for (std::chrono::milliseconds(10)); 897 i++; 898 } 899 } 900 return ret; 901 } 902 903 size_t Stream::Send (const uint8_t * buf, size_t len) 904 { 905 AsyncSend (buf, len, nullptr); 906 return len; 907 } 908 909 void Stream::AsyncSend (const uint8_t * buf, size_t len, SendHandler handler) 910 { 911 std::shared_ptr<i2p::stream::SendBuffer> buffer; 912 if (len > 0 && buf) 913 buffer = std::make_shared<i2p::stream::SendBuffer>(buf, len, handler); 914 else if (handler) 915 handler(boost::system::error_code ()); 916 auto s = shared_from_this (); 917 boost::asio::post (m_Service, [s, buffer = std::move(buffer)]() mutable 918 { 919 if (buffer) 920 s->m_SendBuffer.Add (std::move(buffer)); 921 s->SendBuffer (); 922 }); 923 } 924 925 void Stream::SendBuffer () 926 { 927 if (m_RemoteLeaseSet) // don't scheudle send for first SYN for incoming stream 928 ScheduleSend (); 929 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 930 int numMsgs = m_WindowSize - m_SentPackets.size (); 931 if (numMsgs <= 0 || !m_IsSendTime) // window is full 932 { 933 m_LastSendTime = ts; 934 return; 935 } 936 else if (numMsgs > m_NumPacketsToSend) 937 numMsgs = m_NumPacketsToSend; 938 939 if (!m_RemoteLeaseSet) m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ()); 940 if (m_RemoteLeaseSet) 941 { 942 if (!m_RoutingSession) 943 m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true, false); 944 } 945 946 if (m_RoutingSession) 947 { 948 m_IsJavaClient = m_RoutingSession->IsWithJava (); 949 if (m_IsJavaClient) m_MaxWindowSize = 64; 950 int numSentPackets = m_RoutingSession->NumSentPackets (); 951 int numPacketsToSend = m_MaxWindowSize - numSentPackets; 952 if (numPacketsToSend <= 0) // shared window is full 953 { 954 if (m_LastReceivedSequenceNumber <= 0 && m_SequenceNumber == 0) 955 { 956 LogPrint (eLogWarning, "Streaming: limit of unacknowledged packets has been reached, terminate, rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID); 957 m_Status = eStreamStatusReset; 958 Close (); 959 return; 960 } 961 m_LastSendTime = ts; 962 return; 963 } 964 else if (numMsgs > numPacketsToSend) 965 numMsgs = numPacketsToSend; 966 } 967 bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet 968 std::vector<Packet *> packets; 969 while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.IsEmpty () && numMsgs > 0)) 970 { 971 Packet * p = m_LocalDestination.NewPacket (); 972 uint8_t * packet = p->GetBuffer (); 973 // TODO: implement setters 974 size_t size = 0; 975 htobe32buf (packet + size, m_SendStreamID); 976 size += 4; // sendStreamID 977 htobe32buf (packet + size, m_RecvStreamID); 978 size += 4; // receiveStreamID 979 htobe32buf (packet + size, m_SequenceNumber++); 980 size += 4; // sequenceNum 981 if (isNoAck) 982 htobuf32 (packet + size, 0); 983 else 984 htobe32buf (packet + size, m_LastReceivedSequenceNumber); 985 size += 4; // ack Through 986 if (m_Status == eStreamStatusNew && !m_SendStreamID && m_RemoteIdentity) 987 { 988 // first SYN packet 989 if (m_DontSign) 990 { 991 // remote ident is useless without signature, don't include it 992 packet[size] = 0; size++; // NACK count 993 } 994 else 995 { 996 packet[size] = 8; 997 size++; // NACK count 998 memcpy (packet + size, m_RemoteIdentity->GetIdentHash (), 32); 999 size += 32; 1000 } 1001 } 1002 else 1003 { 1004 packet[size] = 0; 1005 size++; // NACK count 1006 } 1007 packet[size] = m_RTO/1000; 1008 size++; // resend delay 1009 if (m_Status == eStreamStatusNew) 1010 { 1011 // initial packet 1012 m_Status = eStreamStatusOpen; 1013 if (!m_RemoteLeaseSet) m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ()); 1014 if (m_RemoteLeaseSet) 1015 { 1016 m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true, !m_IsIncoming); 1017 m_MTU = (m_RoutingSession && m_RoutingSession->IsRatchets ()) ? STREAMING_MTU_RATCHETS : STREAMING_MTU; 1018 } 1019 uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED; 1020 if (!m_DontSign) flags |= PACKET_FLAG_SIGNATURE_INCLUDED; 1021 if (isNoAck) flags |= PACKET_FLAG_NO_ACK; 1022 bool isOfflineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().IsOfflineSignature (); 1023 if (isOfflineSignature) flags |= PACKET_FLAG_OFFLINE_SIGNATURE; 1024 htobe16buf (packet + size, flags); 1025 size += 2; // flags 1026 size_t identityLen = m_LocalDestination.GetOwner ()->GetIdentity ()->GetFullLen (); 1027 size_t signatureLen = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetSignatureLen (); 1028 uint8_t * optionsSize = packet + size; // set options size later 1029 size += 2; // options size 1030 m_LocalDestination.GetOwner ()->GetIdentity ()->ToBuffer (packet + size, identityLen); 1031 size += identityLen; // from 1032 htobe16buf (packet + size, m_MTU); 1033 size += 2; // max packet size 1034 if (m_DontSign) 1035 { 1036 htobe16buf (optionsSize, packet + size - 2 - optionsSize); // actual options size 1037 size += m_SendBuffer.Get (packet + size, m_MTU); // payload 1038 } 1039 else 1040 { 1041 if (isOfflineSignature) 1042 { 1043 const auto& offlineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetOfflineSignature (); 1044 memcpy (packet + size, offlineSignature.data (), offlineSignature.size ()); 1045 size += offlineSignature.size (); // offline signature 1046 } 1047 uint8_t * signature = packet + size; // set it later 1048 memset (signature, 0, signatureLen); // zeroes for now 1049 size += signatureLen; // signature 1050 htobe16buf (optionsSize, packet + size - 2 - optionsSize); // actual options size 1051 size += m_SendBuffer.Get (packet + size, m_MTU); // payload 1052 m_LocalDestination.GetOwner ()->Sign (packet, size, signature); 1053 } 1054 } 1055 else 1056 { 1057 // follow on packet 1058 if (!m_LastACKRequestTime || ts - m_LastACKRequestTime > m_MinRTT / 10) 1059 { 1060 m_LastACKRequestTime = ts; 1061 htobe16buf (packet + size, PACKET_FLAG_DELAY_REQUESTED); 1062 size += 2; // flags 1063 htobe16buf (packet + size, 2); // 2 bytes delay interval 1064 htobe16buf (packet + size + 2, 0); // set immediate ack interval 1065 size += 2; 1066 } 1067 else 1068 { 1069 htobuf16 (packet + size, 0); 1070 size += 2; // flags 1071 htobuf16 (packet + size, 0); // no options 1072 } 1073 size += 2; // options size 1074 size += m_SendBuffer.Get(packet + size, m_MTU); // payload 1075 } 1076 p->len = size; 1077 packets.push_back (p); 1078 numMsgs--; 1079 } 1080 if (m_SendBuffer.GetSize() == 0) m_IsBufferEmpty = true; 1081 else m_IsBufferEmpty = false; 1082 int numPackets = packets.size (); 1083 if (packets.size () > 0) 1084 { 1085 if (m_SavedPackets.empty ()) // no NACKS 1086 { 1087 m_IsAckSendScheduled = false; 1088 m_AckSendTimer.cancel (); 1089 } 1090 bool isEmpty = m_SentPackets.empty (); 1091 // auto ts = i2p::util::GetMillisecondsSinceEpoch (); 1092 for (auto& it: packets) 1093 { 1094 it->sendTime = ts; 1095 m_SentPackets.insert (it); 1096 } 1097 SendPackets (packets); 1098 m_LastSendTime = ts; 1099 m_IsSendTime = false; 1100 if (m_RoutingSession) 1101 { 1102 int numSentPackets = m_RoutingSession->NumSentPackets (); 1103 m_RoutingSession->SetNumSentPackets (numSentPackets + numPackets); 1104 } 1105 if (m_Status == eStreamStatusClosing && m_SendBuffer.IsEmpty ()) 1106 SendClose (); 1107 if (isEmpty) 1108 ScheduleResend (); 1109 } 1110 } 1111 1112 void Stream::SendQuickAck () 1113 { 1114 int32_t lastReceivedSeqn = m_LastReceivedSequenceNumber; 1115 // for limit inbound speed 1116 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 1117 int numPackets = 0; 1118 bool lostPackets = false; 1119 int64_t passedTime = m_PacketACKInterval * INITIAL_WINDOW_SIZE; // in microseconds // while m_LastACKSendTime == 0 1120 if (m_LastACKSendTime) 1121 passedTime = (ts - m_LastACKSendTime)*1000; // in microseconds 1122 numPackets = (passedTime + m_PacketACKIntervalRem) / m_PacketACKInterval; 1123 m_PacketACKIntervalRem = (passedTime + m_PacketACKIntervalRem) - (numPackets * m_PacketACKInterval); 1124 if (m_LastConfirmedReceivedSequenceNumber + numPackets < m_LastReceivedSequenceNumber) 1125 { 1126 lastReceivedSeqn = m_LastConfirmedReceivedSequenceNumber + numPackets; 1127 if (!m_IsAckSendScheduled) 1128 { 1129 auto ackTimeout = m_RTT/10; 1130 if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay; 1131 ScheduleAck (ackTimeout); 1132 } 1133 } 1134 if (numPackets == 0) return; 1135 // for limit inbound speed 1136 if (!m_SavedPackets.empty ()) 1137 { 1138 for (auto it: m_SavedPackets) 1139 { 1140 auto seqn = it->GetSeqn (); 1141 // for limit inbound speed 1142 if (m_LastConfirmedReceivedSequenceNumber + numPackets < int(seqn)) 1143 { 1144 if (!m_IsAckSendScheduled) 1145 { 1146 auto ackTimeout = m_RTT/10; 1147 if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay; 1148 ScheduleAck (ackTimeout); 1149 } 1150 if (lostPackets) 1151 break; 1152 else 1153 return; 1154 } 1155 // for limit inbound speed 1156 if ((int)seqn > lastReceivedSeqn) 1157 { 1158 lastReceivedSeqn = seqn; 1159 lostPackets = true; // for limit inbound speed 1160 } 1161 } 1162 } 1163 if (lastReceivedSeqn < 0) 1164 { 1165 LogPrint (eLogError, "Streaming: No packets have been received yet"); 1166 if (m_SequenceNumber == 0) 1167 { 1168 if (m_NumResendAttempts > 1) 1169 { 1170 m_Status = eStreamStatusReset; 1171 Close (); 1172 return; 1173 } 1174 m_NumResendAttempts++; 1175 } 1176 return; 1177 } 1178 1179 Packet p; 1180 uint8_t * packet = p.GetBuffer (); 1181 size_t size = 0; 1182 htobe32buf (packet + size, m_SendStreamID); 1183 size += 4; // sendStreamID 1184 htobe32buf (packet + size, m_RecvStreamID); 1185 size += 4; // receiveStreamID 1186 htobuf32 (packet + size, 0); // this is plain Ack message 1187 size += 4; // sequenceNum 1188 htobe32buf (packet + size, lastReceivedSeqn); 1189 size += 4; // ack Through 1190 uint8_t numNacks = 0; 1191 bool choking = m_IsChoking2; 1192 if (lastReceivedSeqn > m_LastReceivedSequenceNumber) 1193 { 1194 // fill NACKs 1195 uint8_t * nacks = packet + size + 1; 1196 auto nextSeqn = m_LastReceivedSequenceNumber + 1; 1197 for (auto it: m_SavedPackets) 1198 { 1199 auto seqn = it->GetSeqn (); 1200 if (m_LastConfirmedReceivedSequenceNumber + numPackets < int(seqn)) // for limit inbound speed 1201 { 1202 htobe32buf (packet + 12, nextSeqn - 1); 1203 break; 1204 } 1205 if (numNacks + (seqn - nextSeqn) >= 256) 1206 { 1207 LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); 1208 htobe32buf (packet + 12, nextSeqn - 1); // change ack Through back 1209 choking = true; 1210 break; 1211 } 1212 for (uint32_t i = nextSeqn; i < seqn; i++) 1213 { 1214 htobe32buf (nacks, i); 1215 nacks += 4; 1216 numNacks++; 1217 } 1218 nextSeqn = seqn + 1; 1219 } 1220 packet[size] = numNacks; 1221 size++; // NACK count 1222 size += numNacks*4; // NACKs 1223 } 1224 else 1225 { 1226 // No NACKs 1227 packet[size] = 0; 1228 size++; // NACK count 1229 } 1230 packet[size] = 0; 1231 size++; // resend delay 1232 bool requestImmediateAck = false; 1233 if (!choking) 1234 requestImmediateAck = m_LastSendTime && ts > m_LastSendTime + REQUEST_IMMEDIATE_ACK_INTERVAL && 1235 ts > m_LastSendTime + REQUEST_IMMEDIATE_ACK_INTERVAL + m_LocalDestination.GetRandom () % REQUEST_IMMEDIATE_ACK_INTERVAL_VARIANCE; 1236 htobe16buf (packet + size, (choking || requestImmediateAck) ? PACKET_FLAG_DELAY_REQUESTED : 0); // no flags set or delay requested 1237 size += 2; // flags 1238 if (choking || requestImmediateAck) 1239 { 1240 htobe16buf (packet + size, 2); // 2 bytes delay interval 1241 if (m_IsChoking2) 1242 htobe16buf (packet + size + 2, DELAY_CHOKING_2); // set choking2 1243 else 1244 htobe16buf (packet + size + 2, choking ? DELAY_CHOKING : 0); // set choking or immediate ack interval 1245 size += 2; 1246 if (requestImmediateAck) // ack request sent 1247 { 1248 m_LastSendTime = ts; 1249 m_IsImmediateAckRequested = true; 1250 } 1251 } 1252 else 1253 htobuf16 (packet + size, 0); // no options 1254 size += 2; // options size 1255 p.len = size; 1256 1257 SendPackets (std::vector<Packet *> { &p }); 1258 m_LastACKSendTime = ts; // for limit inbound speed 1259 m_LastConfirmedReceivedSequenceNumber = lastReceivedSeqn; // for limit inbound speed 1260 m_IsChoking2 = false; 1261 LogPrint (eLogDebug, "Streaming: Quick Ack sent. ", (int)numNacks, " NACKs"); 1262 } 1263 1264 void Stream::SendPing () 1265 { 1266 Packet p; 1267 uint8_t * packet = p.GetBuffer (); 1268 size_t size = 0; 1269 htobe32buf (packet, m_RecvStreamID); 1270 size += 4; // sendStreamID 1271 memset (packet + size, 0, 14); 1272 size += 14; // all zeroes 1273 uint16_t flags = PACKET_FLAG_ECHO | PACKET_FLAG_FROM_INCLUDED; 1274 if (!m_DontSign) flags |= PACKET_FLAG_SIGNATURE_INCLUDED; 1275 bool isOfflineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().IsOfflineSignature (); 1276 if (isOfflineSignature) flags |= PACKET_FLAG_OFFLINE_SIGNATURE; 1277 htobe16buf (packet + size, flags); 1278 size += 2; // flags 1279 size_t identityLen = m_LocalDestination.GetOwner ()->GetIdentity ()->GetFullLen (); 1280 uint8_t * optionsSize = packet + size; // set options size later 1281 size += 2; // options size 1282 m_LocalDestination.GetOwner ()->GetIdentity ()->ToBuffer (packet + size, identityLen); 1283 size += identityLen; // from 1284 if (m_DontSign) 1285 htobe16buf (optionsSize, packet + size - 2 - optionsSize); // actual options size 1286 else 1287 { 1288 if (isOfflineSignature) 1289 { 1290 const auto& offlineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetOfflineSignature (); 1291 memcpy (packet + size, offlineSignature.data (), offlineSignature.size ()); 1292 size += offlineSignature.size (); // offline signature 1293 } 1294 size_t signatureLen = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetSignatureLen (); 1295 uint8_t * signature = packet + size; // set it later 1296 memset (signature, 0, signatureLen); // zeroes for now 1297 size += signatureLen; // signature 1298 htobe16buf (optionsSize, packet + size - 2 - optionsSize); // actual options size 1299 m_LocalDestination.GetOwner ()->Sign (packet, size, signature); 1300 } 1301 p.len = size; 1302 SendPackets (std::vector<Packet *> { &p }); 1303 LogPrint (eLogDebug, "Streaming: Ping of ", p.len, " bytes sent"); 1304 } 1305 1306 void Stream::Close () 1307 { 1308 LogPrint(eLogDebug, "Streaming: closing stream with sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID, ", status=", m_Status); 1309 switch (m_Status) 1310 { 1311 case eStreamStatusOpen: 1312 m_Status = eStreamStatusClosing; 1313 Close (); // recursion 1314 if (m_Status == eStreamStatusClosing) //still closing 1315 LogPrint (eLogDebug, "Streaming: Trying to send stream data before closing, sSID=", m_SendStreamID); 1316 break; 1317 case eStreamStatusReset: 1318 // TODO: send reset 1319 Terminate (); 1320 break; 1321 case eStreamStatusClosing: 1322 if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) // nothing to send 1323 { 1324 m_Status = eStreamStatusClosed; 1325 SendClose(); 1326 } 1327 break; 1328 case eStreamStatusClosed: 1329 // already closed 1330 Terminate (); 1331 break; 1332 default: 1333 LogPrint (eLogWarning, "Streaming: Unexpected stream status=", (int)m_Status, " for sSID=", m_SendStreamID); 1334 }; 1335 } 1336 1337 void Stream::SendClose () 1338 { 1339 Packet * p = m_LocalDestination.NewPacket (); 1340 uint8_t * packet = p->GetBuffer (); 1341 size_t size = 0; 1342 htobe32buf (packet + size, m_SendStreamID); 1343 size += 4; // sendStreamID 1344 htobe32buf (packet + size, m_RecvStreamID); 1345 size += 4; // receiveStreamID 1346 htobe32buf (packet + size, m_SequenceNumber++); 1347 size += 4; // sequenceNum 1348 htobe32buf (packet + size, m_LastReceivedSequenceNumber >= 0 ? m_LastReceivedSequenceNumber : 0); 1349 size += 4; // ack Through 1350 packet[size] = 0; 1351 size++; // NACK count 1352 packet[size] = 0; 1353 size++; // resend delay 1354 uint16_t flags = PACKET_FLAG_CLOSE; 1355 if (!m_DontSign) flags |= PACKET_FLAG_SIGNATURE_INCLUDED; 1356 bool isOfflineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().IsOfflineSignature (); 1357 if (isOfflineSignature) flags |= PACKET_FLAG_OFFLINE_SIGNATURE; 1358 htobe16buf (packet + size, flags); 1359 size += 2; // flags 1360 if (m_DontSign) 1361 { 1362 memset (packet + size, 0, 2); // no options 1363 size += 2; // options size 1364 } 1365 else 1366 { 1367 if (isOfflineSignature) 1368 { 1369 const auto& offlineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetOfflineSignature (); 1370 memcpy (packet + size, offlineSignature.data (), offlineSignature.size ()); 1371 size += offlineSignature.size (); // offline signature 1372 } 1373 size_t signatureLen = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetSignatureLen (); 1374 htobe16buf (packet + size, signatureLen); // signature only 1375 size += 2; // options size 1376 uint8_t * signature = packet + size; 1377 memset (packet + size, 0, signatureLen); 1378 size += signatureLen; // signature 1379 m_LocalDestination.GetOwner ()->Sign (packet, size, signature); 1380 } 1381 1382 p->len = size; 1383 boost::asio::post (m_Service, std::bind (&Stream::SendPacket, shared_from_this (), p)); 1384 if (m_RoutingSession) 1385 { 1386 int numSentPackets = m_RoutingSession->NumSentPackets (); 1387 m_RoutingSession->SetNumSentPackets (numSentPackets + 1); 1388 } 1389 LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID); 1390 } 1391 1392 size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) 1393 { 1394 size_t pos = 0; 1395 while (pos < len && !m_ReceiveQueue.empty ()) 1396 { 1397 Packet * packet = m_ReceiveQueue.front (); 1398 size_t l = std::min (packet->GetLength (), len - pos); 1399 memcpy (buf + pos, packet->GetBuffer (), l); 1400 pos += l; 1401 packet->offset += l; 1402 if (!packet->GetLength ()) 1403 { 1404 m_ReceiveQueue.pop (); 1405 m_LocalDestination.DeletePacket (packet); 1406 } 1407 } 1408 return pos; 1409 } 1410 1411 bool Stream::SendPacket (Packet * packet) 1412 { 1413 if (packet) 1414 { 1415 if (m_IsAckSendScheduled) 1416 { 1417 m_IsAckSendScheduled = false; 1418 m_AckSendTimer.cancel (); 1419 } 1420 if (!packet->sendTime) packet->sendTime = i2p::util::GetMillisecondsSinceEpoch (); 1421 SendPackets (std::vector<Packet *> { packet }); 1422 bool isEmpty = m_SentPackets.empty (); 1423 m_SentPackets.insert (packet); 1424 if (isEmpty) 1425 ScheduleResend (); 1426 return true; 1427 } 1428 else 1429 return false; 1430 } 1431 1432 void Stream::SendPackets (const std::vector<Packet *>& packets) 1433 { 1434 if (!m_RemoteLeaseSet) 1435 { 1436 CancelRemoteLeaseChange (); 1437 UpdateCurrentRemoteLease (); 1438 if (!m_RemoteLeaseSet) 1439 { 1440 LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet, sSID=", m_SendStreamID); 1441 return; 1442 } 1443 } 1444 if (!m_RoutingSession || m_RoutingSession->IsTerminated () || !m_RoutingSession->IsReadyToSend ()) // expired and detached or new session sent 1445 { 1446 m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true, !m_IsIncoming || m_SequenceNumber > 1); 1447 if (!m_RoutingSession) 1448 { 1449 LogPrint (eLogError, "Streaming: Can't obtain routing session, sSID=", m_SendStreamID); 1450 Terminate (); 1451 return; 1452 } 1453 } 1454 if (!m_CurrentOutboundTunnel && m_RoutingSession) // first message to send 1455 { 1456 // try to get shared path first 1457 auto routingPath = m_RoutingSession->GetSharedRoutingPath (); 1458 if (routingPath) 1459 { 1460 m_CurrentOutboundTunnel = routingPath->outboundTunnel; 1461 m_CurrentRemoteLease = routingPath->remoteLease; 1462 m_RTT = routingPath->rtt; 1463 } 1464 } 1465 1466 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 1467 if (!m_CurrentRemoteLease || !m_CurrentRemoteLease->endDate) // excluded from LeaseSet 1468 { 1469 CancelRemoteLeaseChange (); 1470 UpdateCurrentRemoteLease (true); 1471 } 1472 if (m_RemoteLeaseChangeTime && m_IsRemoteLeaseChangeInProgress && ts > m_RemoteLeaseChangeTime + INITIAL_RTO) 1473 { 1474 LogPrint (eLogDebug, "Streaming: RemoteLease changed, set initial window size"); 1475 CancelRemoteLeaseChange (); 1476 m_CurrentRemoteLease = m_NextRemoteLease; 1477 ResetWindowSize (); 1478 } 1479 auto currentRemoteLease = m_CurrentRemoteLease; 1480 if (!m_IsRemoteLeaseChangeInProgress && m_RemoteLeaseSet && m_CurrentRemoteLease && ts >= m_CurrentRemoteLease->endDate - i2p::data::LEASE_ENDDATE_THRESHOLD) 1481 { 1482 auto leases = m_RemoteLeaseSet->GetNonExpiredLeases (false); 1483 if (leases.size ()) 1484 { 1485 m_IsRemoteLeaseChangeInProgress = true; 1486 UpdateCurrentRemoteLease (true); 1487 m_NextRemoteLease = m_CurrentRemoteLease; 1488 } 1489 else 1490 UpdateCurrentRemoteLease (true); 1491 } 1492 if (m_CurrentRemoteLease && ts < m_CurrentRemoteLease->endDate + i2p::data::LEASE_ENDDATE_THRESHOLD) 1493 { 1494 bool freshTunnel = false; 1495 if (!m_CurrentOutboundTunnel) 1496 { 1497 auto leaseRouter = i2p::data::netdb.FindRouter (m_CurrentRemoteLease->tunnelGateway); 1498 m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (nullptr, 1499 leaseRouter ? leaseRouter->GetCompatibleTransports (false) : (i2p::data::RouterInfo::CompatibleTransports)i2p::data::RouterInfo::eAllTransports); 1500 freshTunnel = true; 1501 } 1502 else if (!m_CurrentOutboundTunnel->IsEstablished ()) 1503 std::tie(m_CurrentOutboundTunnel, freshTunnel) = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel); 1504 if (!m_CurrentOutboundTunnel) 1505 { 1506 LogPrint (eLogError, "Streaming: No outbound tunnels in the pool, sSID=", m_SendStreamID); 1507 m_CurrentRemoteLease = nullptr; 1508 return; 1509 } 1510 if (freshTunnel) 1511 { 1512 LogPrint (eLogDebug, "Streaming: OutboundTunnel changed, set initial window size"); 1513 ResetWindowSize (); 1514 // m_TunnelsChangeSequenceNumber = m_SequenceNumber; // should be determined more precisely 1515 } 1516 1517 std::vector<i2p::tunnel::TunnelMessageBlock> msgs; 1518 for (const auto& it: packets) 1519 { 1520 auto msg = m_RoutingSession->WrapSingleMessage (m_LocalDestination.CreateDataMessage ( 1521 it->GetBuffer (), it->GetLength (), m_Port, !m_RoutingSession->IsRatchets (), it->IsSYN ())); 1522 msgs.push_back (i2p::tunnel::TunnelMessageBlock 1523 { 1524 i2p::tunnel::eDeliveryTypeTunnel, 1525 m_CurrentRemoteLease->tunnelGateway, m_CurrentRemoteLease->tunnelID, 1526 msg 1527 }); 1528 m_NumSentBytes += it->GetLength (); 1529 if (m_IsRemoteLeaseChangeInProgress && !m_RemoteLeaseChangeTime) 1530 { 1531 m_RemoteLeaseChangeTime = ts; 1532 m_CurrentRemoteLease = currentRemoteLease; // change it back before new lease is confirmed 1533 } 1534 } 1535 m_CurrentOutboundTunnel->SendTunnelDataMsgs (msgs); 1536 } 1537 else 1538 { 1539 LogPrint (eLogWarning, "Streaming: Remote lease is not available, sSID=", m_SendStreamID); 1540 if (m_RoutingSession) 1541 m_RoutingSession->SetSharedRoutingPath (nullptr); // invalidate routing path 1542 } 1543 } 1544 1545 void Stream::SendUpdatedLeaseSet () 1546 { 1547 if (m_RoutingSession && !m_RoutingSession->IsTerminated ()) 1548 { 1549 if (m_RoutingSession->IsLeaseSetNonConfirmed ()) 1550 { 1551 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 1552 if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT) 1553 { 1554 // LeaseSet was not confirmed, should try other tunnels 1555 LogPrint (eLogWarning, "Streaming: LeaseSet was not confirmed in ", i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT, " milliseconds. Trying to resubmit"); 1556 m_RoutingSession->SetSharedRoutingPath (nullptr); 1557 m_CurrentOutboundTunnel = nullptr; 1558 m_CurrentRemoteLease = nullptr; 1559 SendQuickAck (); 1560 } 1561 } 1562 else if (m_RoutingSession->IsLeaseSetUpdated ()) 1563 { 1564 LogPrint (eLogDebug, "Streaming: sending updated LeaseSet"); 1565 SendQuickAck (); 1566 } 1567 } 1568 else 1569 SendQuickAck (); 1570 } 1571 1572 void Stream::ScheduleSend () 1573 { 1574 if (m_Status != eStreamStatusTerminated) 1575 { 1576 m_SendTimer.cancel (); 1577 uint64_t interval = SEND_INTERVAL + m_LocalDestination.GetRandom () % SEND_INTERVAL_VARIANCE; 1578 if (interval < m_PacingTime) interval = m_PacingTime; 1579 m_SendTimer.expires_from_now (boost::posix_time::microseconds(interval)); 1580 m_SendTimer.async_wait (std::bind (&Stream::HandleSendTimer, 1581 shared_from_this (), std::placeholders::_1)); 1582 } 1583 } 1584 1585 void Stream::HandleSendTimer (const boost::system::error_code& ecode) 1586 { 1587 if (ecode != boost::asio::error::operation_aborted) 1588 { 1589 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 1590 if (m_LastSendTime && ts*1000 > m_LastSendTime*1000 + m_PacingTime) 1591 { 1592 if (m_PacingTime) 1593 { 1594 auto numPackets = std::lldiv (m_PacingTimeRem + ts*1000 - m_LastSendTime*1000, m_PacingTime); 1595 m_NumPacketsToSend = numPackets.quot; 1596 m_PacingTimeRem = numPackets.rem; 1597 } 1598 else 1599 { 1600 LogPrint (eLogError, "Streaming: pacing time is zero"); 1601 m_NumPacketsToSend = 1; m_PacingTimeRem = 0; 1602 } 1603 m_IsSendTime = true; 1604 if (m_WindowIncCounter && (m_WindowSize < m_MaxWindowSize || m_WindowDropTargetSize) && !m_SendBuffer.IsEmpty () && m_PacingTime > m_MinPacingTime) 1605 { 1606 float winSize = m_WindowSize; 1607 if (m_WindowDropTargetSize) 1608 winSize = m_WindowDropTargetSize; 1609 float maxWinSize = m_MaxWindowSize; 1610 if (m_LastWindowIncTime) 1611 maxWinSize = (ts - m_LastWindowIncTime) / (m_RTT / MAX_WINDOW_SIZE_INC_PER_RTT) + winSize; 1612 for (int i = 0; i < m_NumPacketsToSend; i++) 1613 { 1614 if (m_WindowIncCounter) 1615 { 1616 if (m_WindowDropTargetSize) 1617 { 1618 if (m_LastWindowDropSize && (m_LastWindowDropSize >= m_WindowDropTargetSize)) 1619 m_WindowDropTargetSize += 1 - (1 / ((m_LastWindowDropSize + PREV_SPEED_KEEP_TIME_COEFF) / m_WindowDropTargetSize)); // some magic here 1620 else if (m_LastWindowDropSize && (m_LastWindowDropSize < m_WindowDropTargetSize)) 1621 m_WindowDropTargetSize += (m_WindowDropTargetSize - (m_LastWindowDropSize - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowDropTargetSize; // some magic here 1622 else 1623 m_WindowDropTargetSize += (m_WindowDropTargetSize - (1 - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowDropTargetSize; 1624 if (m_WindowDropTargetSize > m_MaxWindowSize) m_WindowDropTargetSize = m_MaxWindowSize; 1625 m_WindowIncCounter--; 1626 if (m_WindowDropTargetSize >= maxWinSize) 1627 { 1628 m_WindowDropTargetSize = maxWinSize; 1629 break; 1630 } 1631 } 1632 else 1633 { 1634 if (m_LastWindowDropSize && (m_LastWindowDropSize >= m_WindowSize)) 1635 m_WindowSize += 1 - (1 / ((m_LastWindowDropSize + PREV_SPEED_KEEP_TIME_COEFF) / m_WindowSize)); // some magic here 1636 else if (m_LastWindowDropSize && (m_LastWindowDropSize < m_WindowSize)) 1637 m_WindowSize += (m_WindowSize - (m_LastWindowDropSize - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowSize; // some magic here 1638 else 1639 m_WindowSize += (m_WindowSize - (1 - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowSize; 1640 if (m_WindowSize > m_MaxWindowSize) m_WindowSize = m_MaxWindowSize; 1641 m_WindowIncCounter--; 1642 if (m_WindowSize >= maxWinSize) 1643 { 1644 m_WindowSize = maxWinSize; 1645 break; 1646 } 1647 } 1648 } 1649 else 1650 break; 1651 } 1652 UpdatePacingTime (); 1653 } 1654 m_LastWindowIncTime = ts; 1655 if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked || m_IsClientChoked2) // resend packets 1656 ResendPacket (); 1657 else if (m_WindowSize > int(m_SentPackets.size ())) // send packets 1658 SendBuffer (); 1659 } 1660 else // pass 1661 ScheduleSend (); 1662 } 1663 } 1664 1665 void Stream::ScheduleResend () 1666 { 1667 if (m_Status != eStreamStatusTerminated) 1668 { 1669 m_ResendTimer.cancel (); 1670 // check for invalid value 1671 if (m_RTO <= 0) m_RTO = INITIAL_RTO; 1672 m_ResendTimer.expires_from_now (boost::posix_time::milliseconds(m_RTO)); 1673 m_ResendTimer.async_wait (std::bind (&Stream::HandleResendTimer, 1674 shared_from_this (), std::placeholders::_1)); 1675 } 1676 } 1677 1678 void Stream::HandleResendTimer (const boost::system::error_code& ecode) 1679 { 1680 if (ecode != boost::asio::error::operation_aborted) 1681 { 1682 m_IsSendTime = true; 1683 if (m_RTO > INITIAL_RTO) m_RTO = INITIAL_RTO; 1684 m_SendTimer.cancel (); // if no ack's in RTO, disable fast retransmit 1685 m_IsTimeOutResend = true; 1686 m_IsNAcked = false; 1687 m_IsResendNeeded = false; 1688 m_NumPacketsToSend = 1; 1689 ResendPacket (); // send one packet per RTO, waiting for ack 1690 } 1691 } 1692 1693 void Stream::ResendPacket () 1694 { 1695 // check for resend attempts 1696 if (m_IsIncoming && m_SequenceNumber == 1 && m_NumResendAttempts > 0) 1697 { 1698 LogPrint (eLogWarning, "Streaming: SYNACK packet was not ACKed after ", m_NumResendAttempts, " attempts, terminate, rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID); 1699 m_Status = eStreamStatusReset; 1700 Close (); 1701 return; 1702 } 1703 if (m_NumResendAttempts >= m_MaxNumResendAttempts) 1704 { 1705 LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", m_MaxNumResendAttempts, " attempts, terminate, rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID); 1706 m_Status = eStreamStatusReset; 1707 Close (); 1708 return; 1709 } 1710 1711 // collect packets to resend 1712 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 1713 std::vector<Packet *> packets; 1714 if (m_IsNAcked && !m_IsClientChoked && !m_IsClientChoked2) 1715 { 1716 for (auto it : m_NACKedPackets) 1717 { 1718 if (ts >= it->sendTime + m_RTO) 1719 { 1720 if (ts < it->sendTime + m_RTO*3) 1721 it->resent = true; 1722 else 1723 it->resent = false; 1724 it->sendTime = ts; 1725 packets.push_back (it); 1726 if ((int)packets.size () >= m_NumPacketsToSend) break; 1727 } 1728 } 1729 } 1730 else 1731 { 1732 for (auto it : m_SentPackets) 1733 { 1734 if (ts >= it->sendTime + m_RTO) 1735 { 1736 if (ts < it->sendTime + m_RTO*3) 1737 it->resent = true; 1738 else 1739 it->resent = false; 1740 it->sendTime = ts; 1741 packets.push_back (it); 1742 if (m_IsClientChoked2 && it->GetSeqn () == m_DropWindowDelaySequenceNumber) 1743 m_IsClientChoked2 = false; 1744 if ((int)packets.size () >= m_NumPacketsToSend) break; 1745 } 1746 } 1747 } 1748 1749 // select tunnels if necessary and send 1750 if (packets.size () > 0 && m_IsSendTime) 1751 { 1752 if (m_IsNAcked) m_NumResendAttempts = 1; 1753 else if (m_IsTimeOutResend) m_NumResendAttempts++; 1754 if (m_NumResendAttempts == 1 && m_RTO != INITIAL_RTO) 1755 { 1756 // loss-based CC 1757 if (!m_IsWinDropped && LOSS_BASED_CONTROL_ENABLED && !m_IsClientChoked) 1758 { 1759 LogPrint (eLogDebug, "Streaming: Packet loss, reduce window size"); 1760 if (m_WindowDropTargetSize) 1761 m_LastWindowDropSize = m_WindowDropTargetSize; 1762 else 1763 m_LastWindowDropSize = m_WindowSize; 1764 m_WindowDropTargetSize = m_LastWindowDropSize * 0.5; // -50% to drain queue 1765 if (m_WindowDropTargetSize < MIN_WINDOW_SIZE) 1766 m_WindowDropTargetSize = MIN_WINDOW_SIZE; 1767 m_WindowIncCounter = 0; // disable window growth 1768 m_DropWindowDelaySequenceNumber = m_SequenceNumber + int(m_WindowDropTargetSize); 1769 m_IsFirstACK = true; // ignore first RTT sample 1770 m_IsWinDropped = true; // don't drop window twice 1771 UpdatePacingTime (); 1772 } 1773 } 1774 else if (m_IsTimeOutResend) 1775 { 1776 m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change 1777 m_WindowDropTargetSize = INITIAL_WINDOW_SIZE; 1778 m_LastWindowDropSize = 0; 1779 m_WindowIncCounter = 0; 1780 m_IsWinDropped = true; 1781 m_IsFirstRttSample = true; 1782 m_DropWindowDelaySequenceNumber = 0; 1783 m_IsFirstACK = true; 1784 m_LastACKRecieveTime = 0; 1785 m_ACKRecieveInterval = m_AckDelay; 1786 UpdatePacingTime (); 1787 if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); 1788 if (m_NumResendAttempts & 1) 1789 { 1790 // pick another outbound tunnel 1791 m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); 1792 LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts, 1793 ", another outbound tunnel has been selected for stream with sSID=", m_SendStreamID); 1794 } 1795 else 1796 { 1797 CancelRemoteLeaseChange (); 1798 UpdateCurrentRemoteLease (); // pick another lease 1799 LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts, 1800 ", another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID); 1801 } 1802 } 1803 if (m_IsTimeOutResend) ScheduleResend (); 1804 SendPackets (packets); 1805 m_LastSendTime = ts; 1806 m_IsSendTime = false; 1807 } 1808 else if (!m_IsClientChoked && !m_IsClientChoked2) 1809 SendBuffer (); 1810 m_IsSendTime = false; 1811 if (m_IsTimeOutResend) ScheduleResend (); // ^ m_IsTimeOutResend = false 1812 else if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked || m_IsClientChoked2) ScheduleSend (); 1813 } 1814 1815 void Stream::ScheduleAck (int timeout) 1816 { 1817 if (m_IsAckSendScheduled) 1818 m_AckSendTimer.cancel (); 1819 m_IsAckSendScheduled = true; 1820 if (timeout < MIN_SEND_ACK_TIMEOUT) timeout = MIN_SEND_ACK_TIMEOUT; 1821 m_AckSendTimer.expires_from_now (boost::posix_time::milliseconds(timeout)); 1822 m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer, 1823 shared_from_this (), std::placeholders::_1)); 1824 } 1825 1826 void Stream::HandleAckSendTimer (const boost::system::error_code& ecode) 1827 { 1828 if (m_IsAckSendScheduled) 1829 { 1830 if (m_LastReceivedSequenceNumber < 0) 1831 { 1832 LogPrint (eLogWarning, "Streaming: SYN has not been received after ", SYN_TIMEOUT, " milliseconds after follow on, terminate rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID); 1833 m_Status = eStreamStatusReset; 1834 Close (); 1835 return; 1836 } 1837 if (m_Status == eStreamStatusOpen) 1838 { 1839 if (m_RoutingSession && m_RoutingSession->IsLeaseSetNonConfirmed ()) 1840 { 1841 auto ts = i2p::util::GetMillisecondsSinceEpoch (); 1842 if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT) 1843 { 1844 // seems something went wrong and we should re-select tunnels 1845 m_CurrentOutboundTunnel = nullptr; 1846 m_CurrentRemoteLease = nullptr; 1847 } 1848 } 1849 if (m_LastReceivedSequenceNumber == 0 && m_SequenceNumber == 1) 1850 { 1851 if (m_NumResendAttempts > 1) 1852 { 1853 m_Status = eStreamStatusReset; 1854 Close (); 1855 return; 1856 } 1857 m_NumResendAttempts++; 1858 ScheduleAck (INITIAL_RTO); 1859 } 1860 SendQuickAck (); 1861 } 1862 m_IsAckSendScheduled = false; 1863 } 1864 } 1865 1866 void Stream::UpdateCurrentRemoteLease (bool expired) 1867 { 1868 bool isLeaseChanged = true; 1869 if (!m_RemoteLeaseSet || m_RemoteLeaseSet->IsExpired ()) 1870 { 1871 auto remoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ()); 1872 if (!remoteLeaseSet) 1873 { 1874 LogPrint (eLogWarning, "Streaming: LeaseSet ", m_RemoteIdentity->GetIdentHash ().ToBase32 (), m_RemoteLeaseSet ? " expired" : " not found"); 1875 if (!m_IsIncoming) // outgoing 1876 { 1877 auto requestCallback = [s = shared_from_this ()](std::shared_ptr<i2p::data::LeaseSet> ls) 1878 { 1879 if (!ls && s->m_Status == eStreamStatusOpen) // LeaseSet not found 1880 { 1881 // close the socket without sending FIN or RST 1882 s->m_Status = eStreamStatusClosed; 1883 s->AsyncClose (); 1884 } 1885 }; 1886 1887 if (m_RemoteLeaseSet && m_RemoteLeaseSet->IsPublishedEncrypted ()) 1888 { 1889 m_LocalDestination.GetOwner ()->RequestDestinationWithEncryptedLeaseSet ( 1890 std::make_shared<i2p::data::BlindedPublicKey>(m_RemoteIdentity), requestCallback); 1891 return; // we keep m_RemoteLeaseSet for possible next request 1892 } 1893 else 1894 { 1895 m_RemoteLeaseSet = nullptr; 1896 m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash (), requestCallback); // try to request for a next attempt 1897 } 1898 } 1899 else // incoming 1900 { 1901 // just close the socket without sending FIN or RST 1902 m_Status = eStreamStatusClosed; 1903 AsyncClose (); 1904 } 1905 } 1906 else 1907 { 1908 // LeaseSet updated 1909 m_RemoteLeaseSet = remoteLeaseSet; 1910 m_RemoteIdentity = m_RemoteLeaseSet->GetIdentity (); 1911 m_TransientVerifier = m_RemoteLeaseSet->GetTransientVerifier (); 1912 } 1913 } 1914 if (m_RemoteLeaseSet) 1915 { 1916 if (!m_RoutingSession) 1917 m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true); 1918 auto leases = m_RemoteLeaseSet->GetNonExpiredLeases (false); // try without threshold first 1919 if (leases.empty ()) 1920 { 1921 expired = false; 1922 // time to request 1923 if (m_RemoteLeaseSet->IsPublishedEncrypted ()) 1924 m_LocalDestination.GetOwner ()->RequestDestinationWithEncryptedLeaseSet ( 1925 std::make_shared<i2p::data::BlindedPublicKey>(m_RemoteIdentity)); 1926 else 1927 m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash ()); 1928 leases = m_RemoteLeaseSet->GetNonExpiredLeases (true); // then with threshold 1929 } 1930 if (!leases.empty ()) 1931 { 1932 bool updated = false; 1933 if (expired && m_CurrentRemoteLease) 1934 { 1935 for (const auto& it: leases) 1936 if ((it->tunnelGateway == m_CurrentRemoteLease->tunnelGateway) && (it->tunnelID != m_CurrentRemoteLease->tunnelID)) 1937 { 1938 m_CurrentRemoteLease = it; 1939 updated = true; 1940 break; 1941 } 1942 } 1943 if (!updated) 1944 { 1945 uint32_t i = m_LocalDestination.GetRandom () % leases.size (); 1946 if (m_CurrentRemoteLease && leases[i]->tunnelID == m_CurrentRemoteLease->tunnelID) 1947 { 1948 // make sure we don't select previous 1949 if (leases.size () > 1) 1950 i = (i + 1) % leases.size (); // if so, pick next 1951 else 1952 isLeaseChanged = false; 1953 } 1954 m_CurrentRemoteLease = leases[i]; 1955 } 1956 } 1957 else 1958 { 1959 LogPrint (eLogWarning, "Streaming: All remote leases are expired"); 1960 m_RemoteLeaseSet = nullptr; 1961 m_CurrentRemoteLease = nullptr; 1962 // we have requested expired before, no need to do it twice 1963 } 1964 } 1965 else 1966 { 1967 LogPrint (eLogWarning, "Streaming: Remote LeaseSet not found"); 1968 m_CurrentRemoteLease = nullptr; 1969 } 1970 if (isLeaseChanged && !m_IsRemoteLeaseChangeInProgress) 1971 { 1972 LogPrint (eLogDebug, "Streaming: RemoteLease changed, set initial window size"); 1973 ResetWindowSize (); 1974 } 1975 } 1976 1977 void Stream::ResetRoutingPath () 1978 { 1979 m_CurrentOutboundTunnel = nullptr; 1980 m_CurrentRemoteLease = nullptr; 1981 m_RTT = INITIAL_RTT; 1982 m_RTO = INITIAL_RTO; 1983 if (m_RoutingSession) 1984 m_RoutingSession->SetSharedRoutingPath (nullptr); // TODO: count failures 1985 } 1986 1987 void Stream::UpdatePacingTime () 1988 { 1989 double rtt = m_MinRTT; 1990 if (m_IsWinDropped) 1991 rtt = m_SlowRTT; 1992 if (m_WindowDropTargetSize) 1993 m_PacingTime = std::round (rtt*1000/m_WindowDropTargetSize); 1994 else 1995 m_PacingTime = std::round (rtt*1000/m_WindowSize); 1996 if (m_MinPacingTime && m_PacingTime < m_MinPacingTime) 1997 m_PacingTime = m_MinPacingTime; 1998 } 1999 2000 void Stream::ProcessWindowDrop () 2001 { 2002 if (m_WindowDropTargetSize) 2003 { 2004 m_LastWindowDropSize = m_WindowDropTargetSize * ((m_MinRTT + m_Jitter*6) / m_SlowRTT); 2005 m_WindowDropTargetSize = m_LastWindowDropSize * m_MinRTT / m_SlowRTT; // we start send faster when rtt will decrease 2006 } 2007 else 2008 { 2009 m_LastWindowDropSize = m_WindowSize * ((m_MinRTT + m_Jitter*6) / m_SlowRTT); 2010 m_WindowDropTargetSize = m_WindowSize * m_MinRTT / m_SlowRTT; // we start send faster when rtt will decrease 2011 } 2012 if (m_WindowDropTargetSize < MIN_WINDOW_SIZE) 2013 m_WindowDropTargetSize = MIN_WINDOW_SIZE; 2014 m_WindowIncCounter = 0; // disable window growth 2015 m_DropWindowDelaySequenceNumber = m_SequenceNumber + int(m_WindowDropTargetSize); 2016 m_IsFirstACK = true; // ignore first RTT sample 2017 m_IsWinDropped = true; // don't drop window twice 2018 UpdatePacingTime (); 2019 } 2020 2021 void Stream::ResetWindowSize () 2022 { 2023 m_RTO = INITIAL_RTO; 2024 if (!m_IsClientChoked) 2025 { 2026 if (m_WindowSize > INITIAL_WINDOW_SIZE) 2027 { 2028 m_WindowDropTargetSize = (float)INITIAL_WINDOW_SIZE; 2029 m_IsWinDropped = true; 2030 } 2031 else 2032 m_WindowSize = INITIAL_WINDOW_SIZE; 2033 } 2034 m_LastWindowDropSize = 0; 2035 m_IsFirstRttSample = true; 2036 m_IsFirstACK = true; 2037 m_WindowIncCounter = 0; // disable window growth 2038 m_DropWindowDelaySequenceNumber = m_SequenceNumber - int(m_SentPackets.size ()) + INITIAL_WINDOW_SIZE; 2039 m_IsWinDropped = true; // don't drop window twice 2040 UpdatePacingTime (); 2041 } 2042 2043 void Stream::CancelRemoteLeaseChange () 2044 { 2045 m_RemoteLeaseChangeTime = 0; 2046 m_IsRemoteLeaseChangeInProgress = false; 2047 } 2048 2049 StreamingDestination::StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort, bool gzip): 2050 m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip), 2051 m_PendingIncomingTimer (m_Owner->GetService ()), 2052 m_LastCleanupTime (i2p::util::GetSecondsSinceEpoch ()) 2053 { 2054 } 2055 2056 StreamingDestination::~StreamingDestination () 2057 { 2058 for (auto& it: m_SavedPackets) 2059 { 2060 for (auto it1: it.second) DeletePacket (it1); 2061 it.second.clear (); 2062 } 2063 m_SavedPackets.clear (); 2064 } 2065 2066 void StreamingDestination::Start () 2067 { 2068 } 2069 2070 void StreamingDestination::Stop () 2071 { 2072 ResetAcceptor (); 2073 ResetPongHandler (); 2074 m_PendingIncomingTimer.cancel (); 2075 m_PendingIncomingStreams.clear (); 2076 { 2077 std::unique_lock<std::mutex> l(m_StreamsMutex); 2078 for (auto it: m_Streams) 2079 it.second->Terminate (false); // we delete here 2080 m_Streams.clear (); 2081 m_IncomingStreams.clear (); 2082 m_LastStream = nullptr; 2083 } 2084 } 2085 2086 void StreamingDestination::HandleNextPacket (Packet * packet) 2087 { 2088 uint32_t sendStreamID = packet->GetSendStreamID (); 2089 if (sendStreamID) 2090 { 2091 if (!m_LastStream || sendStreamID != m_LastStream->GetRecvStreamID ()) 2092 { 2093 auto it = m_Streams.find (sendStreamID); 2094 if (it != m_Streams.end ()) 2095 m_LastStream = it->second; 2096 else 2097 m_LastStream = nullptr; 2098 } 2099 if (m_LastStream) 2100 m_LastStream->HandleNextPacket (packet); 2101 else if (packet->IsEcho () && m_Owner->IsStreamingAnswerPings ()) 2102 { 2103 // ping 2104 LogPrint (eLogInfo, "Streaming: Ping received sSID=", sendStreamID); 2105 auto s = std::make_shared<Stream> (m_Owner->GetService (), *this); 2106 s->HandlePing (packet); 2107 } 2108 else 2109 { 2110 LogPrint (eLogInfo, "Streaming: Unknown stream sSID=", sendStreamID); 2111 DeletePacket (packet); 2112 } 2113 } 2114 else 2115 { 2116 if (packet->IsEcho ()) 2117 { 2118 // pong 2119 LogPrint (eLogInfo, "Streaming: Pong received rSID=", packet->GetReceiveStreamID ()); 2120 if (m_PongHandler != nullptr) 2121 m_PongHandler (packet->from ? packet->from->GetDestinationPtr () : nullptr); 2122 DeletePacket (packet); 2123 return; 2124 } 2125 if (packet->IsSYN () && !packet->GetSeqn ()) // new incoming stream 2126 { 2127 uint32_t receiveStreamID = packet->GetReceiveStreamID (); 2128 auto it1 = m_IncomingStreams.find (receiveStreamID); 2129 if (it1 != m_IncomingStreams.end ()) 2130 { 2131 // already pending 2132 LogPrint(eLogInfo, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists"); 2133 it1->second->ResetRoutingPath (); // Ack was not delivered, changing path 2134 DeletePacket (packet); // drop it, because previous should be connected 2135 return; 2136 } 2137 if (m_Owner->GetStreamingMaxConcurrentStreams () > 0 && (int)m_Streams.size () > m_Owner->GetStreamingMaxConcurrentStreams ()) 2138 { 2139 LogPrint(eLogInfo, "Streaming: Number of streams exceeds ", m_Owner->GetStreamingMaxConcurrentStreams ()); 2140 DeletePacket (packet); 2141 return; 2142 } 2143 if (m_Owner->GetStreamingMaxConnsPerMinute () > 0 && packet->from) 2144 { 2145 auto ts = i2p::util::GetSecondsSinceEpoch (); 2146 auto& numConnectionsList = m_NumIncomingConnectionsPerSecond[packet->from->GetRemoteStaticKey ()]; // find or create new 2147 CleanupExpiredNumConnectionsPerSecond (numConnectionsList, ts); 2148 if ((int)numConnectionsList.size () >= m_Owner->GetStreamingMaxConnsPerMinute ()) 2149 { 2150 LogPrint (eLogInfo, "Streaming: Number of incoming streams exceeds ", m_Owner->GetStreamingMaxConnsPerMinute (), " streams per minute"); 2151 DeletePacket (packet); 2152 return; 2153 } 2154 else 2155 numConnectionsList.emplace_back (ts); 2156 } 2157 auto incomingStream = CreateNewIncomingStream (receiveStreamID); 2158 incomingStream->HandleNextPacket (packet); // SYN 2159 if (!incomingStream->GetRemoteLeaseSet ()) 2160 { 2161 LogPrint (eLogWarning, "Streaming: No remote LeaseSet for incoming stream. Terminated"); 2162 incomingStream->Terminate (); // can't send FIN anyway 2163 return; 2164 } 2165 2166 // handle saved packets if any 2167 { 2168 auto it = m_SavedPackets.find (receiveStreamID); 2169 if (it != m_SavedPackets.end ()) 2170 { 2171 LogPrint (eLogDebug, "Streaming: Processing ", it->second.size (), " saved packets for rSID=", receiveStreamID); 2172 for (auto it1: it->second) 2173 incomingStream->HandleNextPacket (it1); 2174 m_SavedPackets.erase (it); 2175 } 2176 } 2177 // accept 2178 if (m_Acceptor != nullptr) 2179 m_Acceptor (incomingStream); 2180 else 2181 { 2182 LogPrint (eLogWarning, "Streaming: Acceptor for incoming stream is not set"); 2183 if (m_PendingIncomingStreams.size () < MAX_PENDING_INCOMING_BACKLOG) 2184 { 2185 m_PendingIncomingStreams.push_back (incomingStream); 2186 m_PendingIncomingTimer.cancel (); 2187 m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); 2188 m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, 2189 shared_from_this (), std::placeholders::_1)); 2190 LogPrint (eLogDebug, "Streaming: Pending incoming stream added, rSID=", receiveStreamID); 2191 } 2192 else 2193 { 2194 LogPrint (eLogWarning, "Streaming: Pending incoming streams backlog exceeds ", MAX_PENDING_INCOMING_BACKLOG); 2195 incomingStream->Close (); 2196 } 2197 } 2198 } 2199 else // follow on packet without SYN 2200 { 2201 uint32_t receiveStreamID = packet->GetReceiveStreamID (); 2202 auto it1 = m_IncomingStreams.find (receiveStreamID); 2203 if (it1 != m_IncomingStreams.end ()) 2204 { 2205 // found 2206 it1->second->HandleNextPacket (packet); 2207 return; 2208 } 2209 // save follow on packet 2210 auto it = m_SavedPackets.find (receiveStreamID); 2211 if (it != m_SavedPackets.end ()) 2212 it->second.push_back (packet); 2213 else 2214 { 2215 m_SavedPackets[receiveStreamID] = std::list<Packet *>{ packet }; 2216 auto timer = std::make_shared<boost::asio::deadline_timer> (m_Owner->GetService ()); 2217 timer->expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); 2218 auto s = shared_from_this (); 2219 timer->async_wait ([s,timer,receiveStreamID](const boost::system::error_code& ecode) 2220 { 2221 if (ecode != boost::asio::error::operation_aborted) 2222 { 2223 auto it = s->m_SavedPackets.find (receiveStreamID); 2224 if (it != s->m_SavedPackets.end ()) 2225 { 2226 for (auto it1: it->second) s->DeletePacket (it1); 2227 it->second.clear (); 2228 s->m_SavedPackets.erase (it); 2229 } 2230 } 2231 }); 2232 } 2233 } 2234 } 2235 } 2236 2237 std::shared_ptr<Stream> StreamingDestination::CreateNewOutgoingStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port) 2238 { 2239 auto s = std::make_shared<Stream> (m_Owner->GetService (), *this, remote, port); 2240 std::unique_lock<std::mutex> l(m_StreamsMutex); 2241 m_Streams.emplace (s->GetRecvStreamID (), s); 2242 return s; 2243 } 2244 2245 void StreamingDestination::SendPing (std::shared_ptr<const i2p::data::LeaseSet> remote) 2246 { 2247 auto s = std::make_shared<Stream> (m_Owner->GetService (), *this, remote, 0); 2248 s->SendPing (); 2249 } 2250 2251 std::shared_ptr<Stream> StreamingDestination::CreateNewIncomingStream (uint32_t receiveStreamID) 2252 { 2253 auto s = std::make_shared<Stream> (m_Owner->GetService (), *this); 2254 std::unique_lock<std::mutex> l(m_StreamsMutex); 2255 m_Streams.emplace (s->GetRecvStreamID (), s); 2256 m_IncomingStreams.emplace (receiveStreamID, s); 2257 return s; 2258 } 2259 2260 void StreamingDestination::DeleteStream (std::shared_ptr<Stream> stream) 2261 { 2262 if (stream) 2263 { 2264 std::unique_lock<std::mutex> l(m_StreamsMutex); 2265 m_Streams.erase (stream->GetRecvStreamID ()); 2266 if (stream->IsIncoming ()) 2267 m_IncomingStreams.erase (stream->GetSendStreamID ()); 2268 if (m_LastStream == stream) m_LastStream = nullptr; 2269 } 2270 auto ts = i2p::util::GetSecondsSinceEpoch (); 2271 if (m_Streams.empty () || ts > m_LastCleanupTime + STREAMING_DESTINATION_POOLS_CLEANUP_INTERVAL) 2272 { 2273 m_PacketsPool.CleanUp (); 2274 m_I2NPMsgsPool.CleanUp (); 2275 if (!m_NumIncomingConnectionsPerSecond.empty ()) 2276 { 2277 for (auto it = m_NumIncomingConnectionsPerSecond.begin (); it != m_NumIncomingConnectionsPerSecond.end ();) 2278 { 2279 if (it->second.empty () || it->second.back () + 60 < ts) // newest is too old 2280 it = m_NumIncomingConnectionsPerSecond.erase (it); 2281 else 2282 it++; 2283 } 2284 } 2285 m_LastCleanupTime = ts; 2286 } 2287 } 2288 2289 bool StreamingDestination::DeleteStream (uint32_t recvStreamID) 2290 { 2291 auto it = m_Streams.find (recvStreamID); 2292 if (it == m_Streams.end ()) 2293 return false; 2294 auto s = it->second; 2295 boost::asio::post (m_Owner->GetService (), [this, s] () 2296 { 2297 s->Close (); // try to send FIN 2298 s->Terminate (false); 2299 DeleteStream (s); 2300 }); 2301 return true; 2302 } 2303 2304 void StreamingDestination::SetAcceptor (const Acceptor& acceptor) 2305 { 2306 m_Acceptor = acceptor; // we must set it immediately for IsAcceptorSet 2307 auto s = shared_from_this (); 2308 boost::asio::post (m_Owner->GetService (), [s](void) 2309 { 2310 // take care about incoming queue 2311 for (auto& it: s->m_PendingIncomingStreams) 2312 if (it->GetStatus () == eStreamStatusOpen) // still open? 2313 s->m_Acceptor (it); 2314 s->m_PendingIncomingStreams.clear (); 2315 s->m_PendingIncomingTimer.cancel (); 2316 }); 2317 } 2318 2319 void StreamingDestination::ResetAcceptor () 2320 { 2321 if (m_Acceptor) m_Acceptor (nullptr); 2322 m_Acceptor = nullptr; 2323 } 2324 2325 void StreamingDestination::SetPongHandler (const PongHandler& handler) 2326 { 2327 m_PongHandler = handler; 2328 } 2329 2330 void StreamingDestination::ResetPongHandler () 2331 { 2332 m_PongHandler = nullptr; 2333 } 2334 2335 void StreamingDestination::AcceptOnce (const Acceptor& acceptor) 2336 { 2337 boost::asio::post (m_Owner->GetService (), [acceptor, this](void) 2338 { 2339 if (!m_PendingIncomingStreams.empty ()) 2340 { 2341 acceptor (m_PendingIncomingStreams.front ()); 2342 m_PendingIncomingStreams.pop_front (); 2343 if (m_PendingIncomingStreams.empty ()) 2344 m_PendingIncomingTimer.cancel (); 2345 } 2346 else // we must save old acceptor and set it back 2347 { 2348 m_Acceptor = std::bind (&StreamingDestination::AcceptOnceAcceptor, this, 2349 std::placeholders::_1, acceptor, m_Acceptor); 2350 } 2351 }); 2352 } 2353 2354 void StreamingDestination::AcceptOnceAcceptor (std::shared_ptr<Stream> stream, Acceptor acceptor, Acceptor prev) 2355 { 2356 m_Acceptor = prev; 2357 acceptor (stream); 2358 } 2359 2360 std::shared_ptr<Stream> StreamingDestination::AcceptStream (int timeout) 2361 { 2362 std::shared_ptr<i2p::stream::Stream> stream; 2363 std::condition_variable streamAccept; 2364 std::mutex streamAcceptMutex; 2365 std::unique_lock<std::mutex> l(streamAcceptMutex); 2366 AcceptOnce ( 2367 [&streamAccept, &streamAcceptMutex, &stream](std::shared_ptr<i2p::stream::Stream> s) 2368 { 2369 stream = s; 2370 std::unique_lock<std::mutex> l(streamAcceptMutex); 2371 streamAccept.notify_all (); 2372 }); 2373 if (timeout) 2374 streamAccept.wait_for (l, std::chrono::seconds (timeout)); 2375 else 2376 streamAccept.wait (l); 2377 return stream; 2378 } 2379 2380 void StreamingDestination::HandlePendingIncomingTimer (const boost::system::error_code& ecode) 2381 { 2382 if (ecode != boost::asio::error::operation_aborted) 2383 { 2384 LogPrint (eLogWarning, "Streaming: Pending incoming timeout expired"); 2385 for (auto& it: m_PendingIncomingStreams) 2386 it->Close (); 2387 m_PendingIncomingStreams.clear (); 2388 } 2389 } 2390 2391 void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len, 2392 i2p::garlic::ECIESX25519AEADRatchetSession * from) 2393 { 2394 // unzip it 2395 Packet * uncompressed = NewPacket (); 2396 uncompressed->offset = 0; 2397 uncompressed->len = m_Inflator.Inflate (buf, len, uncompressed->buf, MAX_PACKET_SIZE); 2398 if (uncompressed->len) 2399 { 2400 uncompressed->from = from; 2401 HandleNextPacket (uncompressed); 2402 } 2403 else 2404 DeletePacket (uncompressed); 2405 } 2406 2407 std::shared_ptr<I2NPMessage> StreamingDestination::CreateDataMessage ( 2408 const uint8_t * payload, size_t len, uint16_t toPort, bool checksum, bool gzip) 2409 { 2410 size_t size; 2411 auto msg = (len <= STREAMING_MTU_RATCHETS) ? m_I2NPMsgsPool.AcquireShared () : NewI2NPMessage (); 2412 uint8_t * buf = msg->GetPayload (); 2413 buf += 4; // reserve for lengthlength 2414 msg->len += 4; 2415 2416 if (m_Gzip || gzip) 2417 size = m_Deflator.Deflate (payload, len, buf, msg->maxLen - msg->len); 2418 else 2419 size = i2p::data::GzipNoCompression (payload, len, buf, msg->maxLen - msg->len); 2420 2421 if (size) 2422 { 2423 htobe32buf (msg->GetPayload (), size); // length 2424 htobe16buf (buf + 4, m_LocalPort); // source port 2425 htobe16buf (buf + 6, toPort); // destination port 2426 buf[9] = i2p::client::PROTOCOL_TYPE_STREAMING; // streaming protocol 2427 msg->len += size; 2428 msg->FillI2NPMessageHeader (eI2NPData, 0, checksum); 2429 } 2430 else 2431 msg = nullptr; 2432 return msg; 2433 } 2434 2435 uint32_t StreamingDestination::GetRandom () 2436 { 2437 return m_Owner ? m_Owner->GetRng ()() : rand (); 2438 } 2439 2440 void StreamingDestination::CleanupExpiredNumConnectionsPerSecond (std::list<uint64_t>& numConnectionsList, uint64_t ts) 2441 { 2442 if (numConnectionsList.empty ()) return; 2443 auto it = numConnectionsList.begin (); 2444 while (it != numConnectionsList.end ()) 2445 { 2446 if (*it + 60 >= ts) break; // 1 minute 2447 it++; 2448 } 2449 numConnectionsList.erase (numConnectionsList.begin (), it); 2450 } 2451 } 2452 }