/ libi2pd / Streaming.cpp
Streaming.cpp
   1  /*
   2  * Copyright (c) 2013-2026, The PurpleI2P Project
   3  *
   4  * This file is part of Purple i2pd project and licensed under BSD3
   5  *
   6  * See full license text in LICENSE file at top of project tree
   7  */
   8  
   9  #include "Crypto.h"
  10  #include "Log.h"
  11  #include "RouterInfo.h"
  12  #include "RouterContext.h"
  13  #include "Tunnel.h"
  14  #include "Timestamp.h"
  15  #include "Destination.h"
  16  #include "Streaming.h"
  17  
  18  namespace i2p
  19  {
  20  namespace stream
  21  {
  22  	void SendBufferQueue::Add (std::shared_ptr<SendBuffer>&& buf)
  23  	{
  24  		if (buf)
  25  		{
  26  			m_Size += buf->len;
  27  			m_Buffers.push_back (std::move (buf));
  28  		}
  29  	}
  30  
  31  	size_t SendBufferQueue::Get (uint8_t * buf, size_t len)
  32  	{
  33  		if (!m_Size) return 0;
  34  		size_t offset = 0;
  35  		if (len >= m_Size)
  36  		{
  37  			for (auto& it: m_Buffers)
  38  			{
  39  				auto rem = it->GetRemainingSize ();
  40  				memcpy (buf + offset, it->GetRemaningBuffer (), rem);
  41  				offset += rem;
  42  			}
  43  			m_Buffers.clear ();
  44  			m_Size = 0;
  45  			return offset;
  46  		}
  47  		else
  48  		{
  49  			while (!m_Buffers.empty () && offset < len)
  50  			{
  51  				auto nextBuffer = m_Buffers.front ();
  52  				auto rem = nextBuffer->GetRemainingSize ();
  53  				if (offset + rem <= len)
  54  				{
  55  					// whole buffer
  56  					memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), rem);
  57  					offset += rem;
  58  					m_Buffers.pop_front (); // delete it
  59  				}
  60  				else
  61  				{
  62  					// partially
  63  					rem = len - offset;
  64  					memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), rem);
  65  					nextBuffer->offset += rem;
  66  					offset = len; // break
  67  				}
  68  			}
  69  			m_Size -= offset;
  70  		}
  71  		return offset;
  72  	}
  73  
  74  	void SendBufferQueue::CleanUp ()
  75  	{
  76  		if (!m_Buffers.empty ())
  77  		{
  78  			for (auto& it: m_Buffers)
  79  				it->Cancel ();
  80  			m_Buffers.clear ();
  81  			m_Size = 0;
  82  		}
  83  	}
  84  
  85  	Stream::Stream (boost::asio::io_context& service, StreamingDestination& local,
  86  		std::shared_ptr<const i2p::data::LeaseSet> remote, int port): m_Service (service),
  87  		m_SendStreamID (0), m_SequenceNumber (0), m_DropWindowDelaySequenceNumber (INITIAL_WINDOW_SIZE),
  88  		m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1),
  89  		m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed
  90  		m_Status (eStreamStatusNew), m_IsIncoming (false), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false),
  91  		m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true),
  92  		m_IsWinDropped (true), m_IsChoking2 (false), m_IsClientChoked (false), m_IsClientChoked2 (false),
  93  		m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false),
  94  		m_IsBufferEmpty (false), m_IsJavaClient (false), m_DontSign (local.GetOwner ()->IsStreamingDontSign ()),
  95  		m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service),
  96  		m_SendTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0),
  97  		m_NumReceivedBytes (0), m_Port (port), m_RTT (INITIAL_RTT), m_MinRTT (INITIAL_RTT),
  98  		m_SlowRTT (INITIAL_RTT), m_FastRTT (INITIAL_RTT), m_WindowSize (INITIAL_WINDOW_SIZE),
  99  		m_MaxWindowSize (local.GetOwner ()->GetStreamingMaxWindowSize ()), m_LastWindowDropSize (0),
 100  		m_WindowDropTargetSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO),
 101  		m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()), m_PrevRTTSample (INITIAL_RTT),
 102  		m_Jitter (0), m_MinPacingTime (0), m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0),
 103  		m_LastSendTime (0), m_LastACKRecieveTime (0), m_ACKRecieveInterval (local.GetOwner ()->GetStreamingAckDelay ()),
 104  		m_RemoteLeaseChangeTime (0), m_LastWindowIncTime (0), m_LastACKRequestTime (0), m_LastACKSendTime (0),
 105  		m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed
 106  		m_MaxNumResendAttempts (local.GetOwner ()->GetStreamingMaxResends ()),
 107  		m_NumResendAttempts (0), m_NumPacketsToSend (0), m_JitterAccum (0), m_JitterDiv (1), m_MTU (STREAMING_MTU)
 108  	{
 109  		RAND_bytes ((uint8_t *)&m_RecvStreamID, 4);
 110  		m_RemoteIdentity = remote->GetIdentity ();
 111  		auto outboundSpeed = local.GetOwner ()->GetStreamingOutboundSpeed ();
 112  		if (outboundSpeed)
 113  			m_MinPacingTime = (1000000LL*STREAMING_MTU)/outboundSpeed;
 114  
 115  		auto inboundSpeed = local.GetOwner ()->GetStreamingInboundSpeed (); // for limit inbound speed
 116  		if (inboundSpeed)
 117  			m_PacketACKInterval = (1000000LL*STREAMING_MTU)/inboundSpeed;
 118  	}
 119  
 120  	Stream::Stream (boost::asio::io_context& service, StreamingDestination& local):
 121  		m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_DropWindowDelaySequenceNumber (INITIAL_WINDOW_SIZE),
 122  		m_TunnelsChangeSequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_PreviousReceivedSequenceNumber (-1),
 123  		m_LastConfirmedReceivedSequenceNumber (0), // for limit inbound speed
 124  		m_Status (eStreamStatusNew), m_IsIncoming (true), m_IsAckSendScheduled (false), m_IsNAcked (false), m_IsFirstACK (false),
 125  		m_IsResendNeeded (false), m_IsFirstRttSample (false), m_IsSendTime (true),
 126  		m_IsWinDropped (true), m_IsChoking2 (false), m_IsClientChoked (false), m_IsClientChoked2 (false),
 127  		m_IsTimeOutResend (false), m_IsImmediateAckRequested (false), m_IsRemoteLeaseChangeInProgress (false),
 128  		m_IsBufferEmpty (false), m_IsJavaClient (false), m_DontSign (local.GetOwner ()->IsStreamingDontSign ()),
 129  		m_LocalDestination (local),m_ReceiveTimer (m_Service), m_SendTimer (m_Service),
 130  		m_ResendTimer (m_Service), m_AckSendTimer (m_Service),m_NumSentBytes (0), m_NumReceivedBytes (0),
 131  		m_Port (0), m_RTT (INITIAL_RTT), m_MinRTT (INITIAL_RTT), m_SlowRTT (INITIAL_RTT), m_FastRTT (INITIAL_RTT),
 132  		m_WindowSize (INITIAL_WINDOW_SIZE), m_MaxWindowSize (local.GetOwner ()->GetStreamingMaxWindowSize ()),
 133  		m_LastWindowDropSize  (0), m_WindowDropTargetSize (0), m_WindowIncCounter (0), m_RTO (INITIAL_RTO),
 134  		m_AckDelay (local.GetOwner ()->GetStreamingAckDelay ()),m_PrevRTTSample (INITIAL_RTT), m_Jitter (0),
 135  		m_MinPacingTime (0), m_PacingTime (INITIAL_PACING_TIME), m_PacingTimeRem (0), m_LastSendTime (0),
 136  		m_LastACKRecieveTime (0), m_ACKRecieveInterval (local.GetOwner ()->GetStreamingAckDelay ()),
 137  		m_RemoteLeaseChangeTime (0), m_LastWindowIncTime (0), m_LastACKRequestTime (0),
 138  		m_LastACKSendTime (0), m_PacketACKInterval (1), m_PacketACKIntervalRem (0), // for limit inbound speed
 139  		m_MaxNumResendAttempts (local.GetOwner ()->GetStreamingMaxResends ()),
 140  		m_NumResendAttempts (0), m_NumPacketsToSend (0), m_JitterAccum (0), m_JitterDiv (1), m_MTU (STREAMING_MTU)
 141  	{
 142  		RAND_bytes ((uint8_t *)&m_RecvStreamID, 4);
 143  		auto outboundSpeed = local.GetOwner ()->GetStreamingOutboundSpeed ();
 144  		if (outboundSpeed)
 145  			m_MinPacingTime = (1000000LL*STREAMING_MTU)/outboundSpeed;
 146  
 147  		auto inboundSpeed = local.GetOwner ()->GetStreamingInboundSpeed (); // for limit inbound speed
 148  		if (inboundSpeed)
 149  			m_PacketACKInterval = (1000000LL*STREAMING_MTU)/inboundSpeed;
 150  	}
 151  
 152  	Stream::~Stream ()
 153  	{
 154  		CleanUp ();
 155  		LogPrint (eLogDebug, "Streaming: Stream deleted");
 156  	}
 157  
 158  	void Stream::Terminate (bool deleteFromDestination) // should be called from StreamingDestination::Stop only
 159  	{
 160  		m_Status = eStreamStatusTerminated;
 161  		m_AckSendTimer.cancel ();
 162  		m_ReceiveTimer.cancel ();
 163  		m_ResendTimer.cancel ();
 164  		m_SendTimer.cancel ();
 165  		//CleanUp (); /* Need to recheck - broke working on windows */
 166  		if (deleteFromDestination)
 167  			m_LocalDestination.DeleteStream (shared_from_this ());
 168  	}
 169  
 170  	void Stream::CleanUp ()
 171  	{
 172  		if (m_RoutingSession && !m_SentPackets.empty ()) // free up space in shared window
 173  		{
 174  			int numPackets = m_SentPackets.size ();
 175  			int numSentPackets = m_RoutingSession->NumSentPackets ();
 176  			numSentPackets -= numPackets;
 177  			if (numSentPackets < 0) numSentPackets = 0;
 178  			m_RoutingSession->SetNumSentPackets (numSentPackets);
 179  		}
 180  
 181  		m_SendBuffer.CleanUp ();
 182  		while (!m_ReceiveQueue.empty ())
 183  		{
 184  			auto packet = m_ReceiveQueue.front ();
 185  			m_ReceiveQueue.pop ();
 186  			m_LocalDestination.DeletePacket (packet);
 187  		}
 188  
 189  		m_NACKedPackets.clear ();
 190  
 191  		for (auto it: m_SentPackets)
 192  			m_LocalDestination.DeletePacket (it);
 193  		m_SentPackets.clear ();
 194  
 195  		for (auto it: m_SavedPackets)
 196  			m_LocalDestination.DeletePacket (it);
 197  		m_SavedPackets.clear ();
 198  	}
 199  
 200  	void Stream::HandleNextPacket (Packet * packet)
 201  	{
 202  		if (m_Status == eStreamStatusTerminated)
 203  		{
 204  			m_LocalDestination.DeletePacket (packet);
 205  			return;
 206  		}
 207  		m_NumReceivedBytes += packet->GetLength ();
 208  		if (!m_SendStreamID)
 209  		{
 210  			m_SendStreamID = packet->GetReceiveStreamID ();
 211  			if (!m_RemoteIdentity && !packet->from && packet->GetNACKCount () == 8 && // first incoming packet
 212  			    memcmp (packet->GetNACKs (), m_LocalDestination.GetOwner ()->GetIdentHash (), 32))
 213  			{
 214  				LogPrint (eLogWarning, "Streaming: Destination mismatch for ", m_LocalDestination.GetOwner ()->GetIdentHash ().ToBase32 ());
 215  				m_LocalDestination.DeletePacket (packet);
 216  				return;
 217  			}
 218  		}
 219  
 220  		if (!packet->IsNoAck ()) // ack received
 221  			ProcessAck (packet);
 222  
 223  		int32_t receivedSeqn = packet->GetSeqn ();
 224  		if (!receivedSeqn && m_LastReceivedSequenceNumber >= 0)
 225  		{
 226  			uint16_t flags = packet->GetFlags ();
 227  			if (flags)
 228  				// plain ack with options
 229  				ProcessOptions (flags, packet);
 230  			else
 231  				// plain ack
 232  				{
 233  					LogPrint (eLogDebug, "Streaming: Plain ACK received");
 234  					if (m_IsImmediateAckRequested)
 235  					{
 236  						auto ts = i2p::util::GetMillisecondsSinceEpoch ();
 237  						if (m_IsFirstRttSample)
 238  						{
 239  							m_RTT = ts - m_LastSendTime;
 240  							m_IsFirstRttSample = false;
 241  						}
 242  						else
 243  							m_RTT = (m_RTT + (ts - m_LastSendTime)) / 2;
 244  						m_IsImmediateAckRequested = false;
 245  					}
 246  				}
 247  			m_LocalDestination.DeletePacket (packet);
 248  			return;
 249  		}
 250  
 251  		LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn, " on sSID=", m_SendStreamID);
 252  		if (receivedSeqn == m_LastReceivedSequenceNumber + 1)
 253  		{
 254  			// we have received next in sequence message
 255  			ProcessPacket (packet);
 256  			if (m_Status == eStreamStatusTerminated) return;
 257  
 258  			// we should also try stored messages if any
 259  			for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();)
 260  			{
 261  				if ((*it)->GetSeqn () == (uint32_t)(m_LastReceivedSequenceNumber + 1))
 262  				{
 263  					Packet * savedPacket = *it;
 264  					m_SavedPackets.erase (it++);
 265  
 266  					ProcessPacket (savedPacket);
 267  					if (m_Status == eStreamStatusTerminated) return;
 268  				}
 269  				else
 270  					break;
 271  			}
 272  
 273  			// schedule ack for last message
 274  			if (m_Status == eStreamStatusOpen)
 275  			{
 276  				if (!m_IsAckSendScheduled)
 277  				{
 278  					auto ackTimeout = m_RTT/10;
 279  					if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay;
 280  					ScheduleAck (ackTimeout);
 281  				}
 282  			}
 283  			else if (packet->IsSYN ())
 284  				// we have to send SYN back to incoming connection
 285  				SendBuffer (); // also sets m_IsOpen
 286  		}
 287  		else
 288  		{
 289  			if (receivedSeqn <= m_LastReceivedSequenceNumber)
 290  			{
 291  				// we have received duplicate
 292  				LogPrint (eLogInfo, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID);
 293  				if (receivedSeqn <= m_PreviousReceivedSequenceNumber || receivedSeqn == m_LastReceivedSequenceNumber)
 294   				{
 295   					m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
 296   					CancelRemoteLeaseChange ();
 297   					UpdateCurrentRemoteLease ();
 298   				}
 299   				m_PreviousReceivedSequenceNumber = receivedSeqn;
 300  				m_LocalDestination.DeletePacket (packet); // packet dropped
 301  				if (!m_IsAckSendScheduled)
 302  				{
 303  					SendQuickAck (); // resend ack for previous message again
 304  					auto ackTimeout = m_RTT/10;
 305  					if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay;
 306  					ScheduleAck (ackTimeout);
 307  				}
 308  			}
 309  			else
 310  			{
 311  				LogPrint (eLogInfo, "Streaming: Missing messages on sSID=", m_SendStreamID, ": from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1);
 312  				if ((receivedSeqn - m_LastReceivedSequenceNumber) >= m_MaxWindowSize*3)
 313  				{
 314  					m_LocalDestination.DeletePacket (packet);
 315  					m_IsChoking2 = true;
 316  				}
 317  				else if (m_SavedPackets.empty () && (receivedSeqn - m_LastReceivedSequenceNumber) >= 256)
 318  				{
 319  					m_LocalDestination.DeletePacket (packet);
 320  					m_IsChoking2 = true;
 321  				}
 322  				else if (!m_SavedPackets.empty ())
 323  				{
 324  					uint8_t numNacks = 0;
 325  					auto lastSavedSeq = 0;
 326  					auto nextSeqn = m_LastReceivedSequenceNumber + 1;
 327  					for (auto it: m_SavedPackets)
 328  					{
 329  						auto seqn = it->GetSeqn ();
 330  						if ((int)seqn > lastSavedSeq) lastSavedSeq = seqn;
 331  						for (uint32_t i = nextSeqn; i < seqn; i++) numNacks++;
 332  						nextSeqn = seqn + 1;
 333  					}
 334  
 335  					if (numNacks + (receivedSeqn - lastSavedSeq) >= 256)
 336  					{
 337  						m_LocalDestination.DeletePacket (packet);
 338  						m_IsChoking2 = true;
 339  					}
 340  					else
 341  						// save message and wait for missing message again
 342  						SavePacket (packet);
 343  				}
 344  				else
 345  					// save message and wait for missing message again
 346  					SavePacket (packet);
 347  				if (m_LastReceivedSequenceNumber >= 0)
 348  				{
 349  					if (!m_IsAckSendScheduled)
 350  					{
 351  						// send NACKs for missing messages
 352  						SendQuickAck ();
 353  						auto ackTimeout = m_RTT/10;
 354  						if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay;
 355  						ScheduleAck (ackTimeout);
 356  					}
 357  				}
 358  				else
 359  					// wait for SYN
 360  					ScheduleAck (SYN_TIMEOUT);
 361  			}
 362  		}
 363  	}
 364  
 365  	void Stream::SavePacket (Packet * packet)
 366  	{
 367  		if (!m_SavedPackets.insert (packet).second)
 368  			m_LocalDestination.DeletePacket (packet);
 369  	}
 370  
 371  	void Stream::ProcessPacket (Packet * packet)
 372  	{
 373  		uint32_t receivedSeqn = packet->GetSeqn ();
 374  		uint16_t flags = packet->GetFlags ();
 375  		LogPrint (eLogDebug, "Streaming: Process seqn=", receivedSeqn, ", flags=", flags);
 376  
 377  		if (!ProcessOptions (flags, packet))
 378  		{
 379  			m_LocalDestination.DeletePacket (packet);
 380  			Terminate ();
 381  			return;
 382  		}
 383  
 384  		packet->offset = packet->GetPayload () - packet->buf;
 385  		if (packet->GetLength () > 0)
 386  		{
 387  			m_ReceiveQueue.push (packet);
 388  			m_ReceiveTimer.cancel ();
 389  		}
 390  		else
 391  			m_LocalDestination.DeletePacket (packet);
 392  
 393  		m_LastReceivedSequenceNumber = receivedSeqn;
 394  
 395  		if (flags & PACKET_FLAG_RESET)
 396  		{
 397  			LogPrint (eLogDebug, "Streaming: closing stream sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID, ": reset flag received in packet #", receivedSeqn);
 398  			m_Status = eStreamStatusReset;
 399  			Close ();
 400  		}
 401  		else if (flags & PACKET_FLAG_CLOSE)
 402  		{
 403  			if (m_Status != eStreamStatusClosed)
 404  				SendClose ();
 405  			m_Status = eStreamStatusClosed;
 406  			Terminate ();
 407  		}
 408  	}
 409  
 410  	bool Stream::ProcessOptions (uint16_t flags, Packet * packet)
 411  	{
 412  		const uint8_t * optionData = packet->GetOptionData ();
 413  		size_t optionSize = packet->GetOptionSize ();
 414  		if (optionSize > packet->len)
 415  		{
 416  			LogPrint (eLogInfo, "Streaming: Invalid option size ", optionSize, " Discarded");
 417  			return false;
 418  		}
 419  		if (!flags) return true;
 420  		bool immediateAckRequested = false;
 421  		if (flags & PACKET_FLAG_DELAY_REQUESTED)
 422  		{
 423  			uint16_t delayRequested = bufbe16toh (optionData);
 424  			if (!delayRequested) // 0 requests an immediate ack
 425  				immediateAckRequested = true;
 426  			else if (!m_IsAckSendScheduled)
 427  			{
 428  				if (delayRequested < m_RTT)
 429  				{
 430  					m_IsAckSendScheduled = true;
 431  					m_AckSendTimer.expires_from_now (boost::posix_time::milliseconds(delayRequested));
 432  					m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer,
 433  						shared_from_this (), std::placeholders::_1));
 434  				}
 435  				if (delayRequested >= DELAY_CHOKING)
 436  				{
 437  					if (delayRequested == 65535)
 438  					{
 439  						m_IsClientChoked2 = true;
 440  						m_DropWindowDelaySequenceNumber = m_SequenceNumber-1;
 441  					}
 442  					else if (!m_IsClientChoked)
 443  					{
 444  						LogPrint (eLogDebug, "Streaming: Client choked, set min. window size");
 445  						if (delayRequested == DELAY_CHOKING_JAVA) // java detected
 446  						{
 447  							LogPrint (eLogDebug, "Streaming: limit window size for java client");
 448  							m_MaxWindowSize = 64;
 449  							m_IsJavaClient = true;
 450  							if (m_RoutingSession) m_RoutingSession->SetIsWithJava (true);
 451  						}
 452  						m_WindowDropTargetSize = MIN_WINDOW_SIZE;
 453  						m_LastWindowDropSize = 0;
 454  						m_WindowIncCounter = 0;
 455  						m_IsClientChoked = true;
 456  						m_IsWinDropped = false;
 457  						m_DropWindowDelaySequenceNumber = m_SequenceNumber-1;
 458  						m_IsFirstRttSample = true;
 459  						UpdatePacingTime ();
 460  					}
 461  				}
 462  			}
 463  			optionData += 2;
 464  		}
 465  
 466  		bool verified = true;
 467  		if (flags & PACKET_FLAG_FROM_INCLUDED)
 468  		{
 469  			verified = false;
 470  			if (m_RemoteLeaseSet) m_RemoteIdentity = m_RemoteLeaseSet->GetIdentity ();
 471  			if (!m_RemoteIdentity)
 472  				m_RemoteIdentity = std::make_shared<i2p::data::IdentityEx>(optionData, optionSize);
 473  			if (m_RemoteIdentity->IsRSA ())
 474  			{
 475  				LogPrint (eLogInfo, "Streaming: Incoming stream from RSA destination ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), " Discarded");
 476  				return false;
 477  			}
 478  			optionData += m_RemoteIdentity->GetFullLen ();
 479  			if (!m_RemoteLeaseSet)
 480  			{
 481  				LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase32 (), ", sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID);
 482  				if (packet->from) // try to obtain LeaseSet if came from ratchets session
 483  					m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ());
 484  			}
 485  			if (packet->from && m_RemoteLeaseSet)
 486  			{
 487  				// stream came from ratchets session and static key must match one from LeaseSet
 488  				uint8_t staticKey[32];
 489  				m_RemoteLeaseSet->Encrypt (nullptr, staticKey);
 490  				if (memcmp (packet->from->GetRemoteStaticKey (), staticKey, 32))
 491  				{
 492  					LogPrint (eLogError, "Streaming: Remote LeaseSet static key mismatch for stream from ",
 493  						m_RemoteIdentity->GetIdentHash ().ToBase32 ());
 494  					return false;
 495  				}
 496  				verified = true;
 497  				if (!(flags & PACKET_FLAG_SIGNATURE_INCLUDED))
 498  					m_DontSign = true; // don't sign if the remote didn't sign
 499  			}
 500  		}
 501  
 502  		if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED)
 503  		{
 504  			uint16_t maxPacketSize = bufbe16toh (optionData);
 505  			LogPrint (eLogDebug, "Streaming: Max packet size ", maxPacketSize);
 506  			optionData += 2;
 507  		}
 508  
 509  		if (flags & (PACKET_FLAG_CLOSE | PACKET_FLAG_RESET))
 510  		{
 511  			verified = false;
 512  			if (packet->from)
 513  			{
 514  				if (!m_RemoteLeaseSet && m_RemoteIdentity)
 515  					m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ());
 516  				if (m_RemoteLeaseSet)
 517  				{
 518  					uint8_t staticKey[32];
 519  					m_RemoteLeaseSet->Encrypt (nullptr, staticKey);
 520  					if (memcmp (packet->from->GetRemoteStaticKey (), staticKey, 32))
 521  					{
 522  						LogPrint (eLogError, "Streaming: Remote LeaseSet static key mismatch for stream from ",
 523  							m_RemoteIdentity->GetIdentHash ().ToBase32 ());
 524  						return false;
 525  					}
 526  					verified = true;
 527  				}
 528  				else // invalid stream, safe to close
 529  					verified = true;
 530  			}
 531  		}
 532  
 533  		if (flags & PACKET_FLAG_OFFLINE_SIGNATURE)
 534  		{
 535  			if (!m_RemoteIdentity)
 536  			{
 537  				LogPrint (eLogInfo, "Streaming: offline signature without identity");
 538  				return false;
 539  			}
 540  			if (verified)
 541  			{
 542  				// skip offline signature
 543  				optionData += 4; // timestamp
 544  				uint16_t keyType = bufbe16toh (optionData); optionData += 2; // key type
 545  				std::unique_ptr<i2p::crypto::Verifier> transientVerifier (i2p::data::IdentityEx::CreateVerifier (keyType));
 546  				if (!transientVerifier)
 547  				{
 548  					LogPrint (eLogInfo, "Streaming: Unknown offline signature key type ", (int)keyType);
 549  					return false;
 550  				}
 551  				optionData += transientVerifier->GetPublicKeyLen (); // public key
 552  				optionData += m_RemoteIdentity->GetSignatureLen (); // signature
 553  			}
 554  			else
 555  			{
 556  				// if we have it in LeaseSet already we don't need to parse it again
 557  				if (m_RemoteLeaseSet) m_TransientVerifier = m_RemoteLeaseSet->GetTransientVerifier ();
 558  				if (m_TransientVerifier)
 559  				{
 560  					// skip option data
 561  					optionData += 6; // timestamp and key type
 562  					optionData += m_TransientVerifier->GetPublicKeyLen (); // public key
 563  					optionData += m_RemoteIdentity->GetSignatureLen (); // signature
 564  				}
 565  				else
 566  				{
 567  					// transient key
 568  					size_t offset = 0;
 569  					m_TransientVerifier = i2p::data::ProcessOfflineSignature (m_RemoteIdentity, optionData, optionSize - (optionData - packet->GetOptionData ()), offset);
 570  					optionData += offset;
 571  					if (!m_TransientVerifier)
 572  					{
 573  						LogPrint (eLogError, "Streaming: offline signature failed");
 574  						return false;
 575  					}
 576  				}
 577  			}
 578  		}
 579  
 580  		if (flags & PACKET_FLAG_SIGNATURE_INCLUDED)
 581  		{
 582  			auto signatureLen = m_TransientVerifier ? m_TransientVerifier->GetSignatureLen () : m_RemoteIdentity->GetSignatureLen ();
 583  			if (signatureLen > packet->GetLength ())
 584  			{
 585  				LogPrint (eLogError, "Streaming: Signature too big, ", signatureLen, " bytes");
 586  				return false;
 587  			}
 588  			if (!verified) // packet was not verified through session
 589  			{
 590  				// verify actual signature
 591  				if (signatureLen <= 256)
 592  				{
 593  					// standard
 594  					uint8_t signature[256];
 595  					memcpy (signature, optionData, signatureLen);
 596  					memset (const_cast<uint8_t *>(optionData), 0, signatureLen);
 597  					verified = m_TransientVerifier ?
 598  						m_TransientVerifier->Verify (packet->GetBuffer (), packet->GetLength (), signature) :
 599  						m_RemoteIdentity->Verify (packet->GetBuffer (), packet->GetLength (), signature);
 600  					if (verified)
 601  						memcpy (const_cast<uint8_t *>(optionData), signature, signatureLen);
 602  				}
 603  				else
 604  				{
 605  					// post quantum
 606  					std::vector<uint8_t> signature(signatureLen);
 607  					memcpy (signature.data (), optionData, signatureLen);
 608  					memset (const_cast<uint8_t *>(optionData), 0, signatureLen);
 609  					verified = m_TransientVerifier ?
 610  						m_TransientVerifier->Verify (packet->GetBuffer (), packet->GetLength (), signature.data ()) :
 611  						m_RemoteIdentity->Verify (packet->GetBuffer (), packet->GetLength (), signature.data ());
 612  				}
 613  			}
 614  			if (verified)
 615  				optionData += signatureLen;
 616  			else
 617  			{
 618  				LogPrint (eLogError, "Streaming: Signature verification failed, sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID);
 619  				return false;
 620  			}
 621  		}
 622  		if (!verified)
 623  		{
 624  			LogPrint (eLogError, "Streaming: Missing signature, sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID);
 625  			return false;
 626  		}
 627  		if (immediateAckRequested)
 628  		{
 629  			auto ts = i2p::util::GetMillisecondsSinceEpoch ();
 630  			if (m_LastACKSendTime != ts) // preventing multiple acks when reading m_SavedPackets
 631  			{
 632  				if (m_IsAckSendScheduled)
 633  				{
 634  					SendQuickAck ();
 635  					auto ackTimeout = m_RTT/10;
 636  					if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay;
 637  					ScheduleAck (ackTimeout);
 638  				}
 639  				else
 640  					SendQuickAck ();
 641  			}
 642  		}
 643  		return true;
 644  	}
 645  
 646  	void Stream::HandlePing (Packet * packet)
 647  	{
 648  		uint16_t flags = packet->GetFlags ();
 649  		if (ProcessOptions (flags, packet) && m_RemoteIdentity)
 650  		{
 651  			// send pong
 652  			Packet p;
 653  			memset (p.buf, 0, 22); // minimal header all zeroes
 654  			memcpy (p.buf + 4, packet->buf, 4); // but receiveStreamID is the sendStreamID from the ping
 655  			htobe16buf (p.buf + 18, PACKET_FLAG_ECHO); // and echo flag
 656  			auto payloadLen = int(packet->len) - (packet->GetPayload () - packet->buf);
 657  			if (payloadLen > 0)
 658  				memcpy (p.buf + 22, packet->GetPayload (), payloadLen);
 659  			else
 660  				payloadLen = 0;
 661  			p.len = payloadLen + 22;
 662  			SendPackets (std::vector<Packet *> { &p });
 663  			LogPrint (eLogDebug, "Streaming: Pong of ", p.len, " bytes sent");
 664  		}
 665  		m_LocalDestination.DeletePacket (packet);
 666  	}
 667  
 668  	void Stream::ProcessAck (Packet * packet)
 669  	{
 670  		bool acknowledged = false;
 671  		auto ts = i2p::util::GetMillisecondsSinceEpoch ();
 672  		uint32_t ackThrough = packet->GetAckThrough ();
 673  		m_NACKedPackets.clear ();
 674  		if (ackThrough > m_SequenceNumber)
 675  		{
 676  			LogPrint (eLogError, "Streaming: Unexpected ackThrough=", ackThrough, " > seqn=", m_SequenceNumber);
 677  			return;
 678  		}
 679  		int rttSample = INT_MAX;
 680  		int incCounter = 0;
 681  		int ackPacketsCounter = 0;
 682  		m_IsNAcked = false;
 683  		m_IsResendNeeded = false;
 684  		int nackCount = packet->GetNACKCount ();
 685  		for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();)
 686  		{
 687  			auto seqn = (*it)->GetSeqn ();
 688  			if (seqn <= ackThrough)
 689  			{
 690  				if (nackCount > 0)
 691  				{
 692  					bool nacked = false;
 693  					for (int i = 0; i < nackCount; i++)
 694  						if (seqn == packet->GetNACK (i))
 695  						{
 696  							m_NACKedPackets.insert (*it);
 697  							m_IsNAcked = true;
 698  							nacked = true;
 699  							break;
 700  						}
 701  					if (nacked)
 702  					{
 703  						LogPrint (eLogDebug, "Streaming: Packet ", seqn, " NACK");
 704  						++it;
 705  						continue;
 706  					}
 707  				}
 708  				auto sentPacket = *it;
 709  				int64_t rtt = (int64_t)ts - (int64_t)sentPacket->sendTime;
 710  				if (rtt < 0)
 711  					LogPrint (eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime);
 712  				if (!seqn)
 713  				{
 714  					m_IsFirstRttSample = true;
 715  					rttSample = rtt < 0 ? 1 : rtt;
 716  				}
 717  				else if (!sentPacket->resent && seqn > m_TunnelsChangeSequenceNumber && rtt >= 0)
 718  					rttSample = std::min (rttSample, (int)rtt);
 719  				LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime);
 720  				m_SentPackets.erase (it++);
 721  				m_LocalDestination.DeletePacket (sentPacket);
 722  				acknowledged = true;
 723  				ackPacketsCounter++;
 724  				if (m_WindowIncCounter < m_MaxWindowSize && !m_IsFirstACK && !m_IsWinDropped)
 725  					incCounter++;
 726  			}
 727  			else
 728  				break;
 729  		}
 730  		if (m_LastACKRecieveTime)
 731  		{
 732  			uint64_t interval = ts - m_LastACKRecieveTime;
 733  			if (m_ACKRecieveInterval)
 734  				m_ACKRecieveInterval = (m_ACKRecieveInterval + interval) / 2;
 735  			else
 736  				m_ACKRecieveInterval = interval;
 737  		}
 738  		m_LastACKRecieveTime = ts;
 739  		if (rttSample != INT_MAX)
 740  		{
 741  			if (m_IsFirstRttSample && !m_IsFirstACK)
 742  			{
 743  				m_RTT = rttSample;
 744  				m_MinRTT = m_RTT;
 745  				m_SlowRTT = rttSample;
 746  				m_FastRTT = rttSample;
 747  				m_PrevRTTSample = rttSample;
 748  				m_Jitter = rttSample / 5; // 20%
 749  				m_Jitter += 5; // for low-latency connections
 750  				m_JitterAccum = m_Jitter;
 751  				m_JitterDiv = 1;
 752  				m_IsFirstRttSample = false;
 753  			}
 754  			else
 755  			{
 756  				m_RTT = (m_PrevRTTSample + rttSample) / 2;
 757  			}
 758  			if (!m_IsWinDropped)
 759  			{
 760  				m_SlowRTT = SLOWRTT_EWMA_ALPHA * m_RTT + (1.0 - SLOWRTT_EWMA_ALPHA) * m_SlowRTT;
 761  				m_FastRTT = RTT_EWMA_ALPHA * m_RTT + (1.0 - RTT_EWMA_ALPHA) * m_FastRTT;
 762  				// calculate jitter
 763  				double jitter = 0;
 764  				if (rttSample > m_PrevRTTSample)
 765  					jitter = rttSample - m_PrevRTTSample;
 766  				else if (rttSample < m_PrevRTTSample)
 767  					jitter = m_PrevRTTSample - rttSample;
 768  				if (jitter)
 769  				{
 770  					jitter += 5;	// for low-latency connections
 771  					m_JitterAccum += jitter;
 772  					m_Jitter = m_JitterAccum / m_JitterDiv;
 773  					m_JitterDiv++;
 774  				}
 775  				if (m_MinRTT > m_RTT)
 776  				{
 777  					m_MinRTT = m_RTT;
 778  					m_FastRTT = m_MinRTT + m_Jitter;
 779  					m_SlowRTT = m_MinRTT + m_Jitter;
 780  				}
 781  			}
 782  			if (m_IsBufferEmpty || m_FastRTT >= m_MinRTT + m_Jitter*4 || m_RTT >= m_MinRTT + m_Jitter*4 || m_SlowRTT >= m_MinRTT + m_Jitter*4 || m_RTT > m_FastRTT)
 783  			{
 784  				incCounter = 0;
 785  				m_WindowIncCounter = 0;
 786  			}
 787  			m_WindowIncCounter = m_WindowIncCounter + incCounter;
 788  			//
 789  			// delay-based CC
 790  			if ((m_RTT > m_SlowRTT) && (m_SlowRTT >= m_FastRTT) && (m_FastRTT > m_MinRTT + m_Jitter*8) && (m_SlowRTT > m_MinRTT + m_Jitter*8) && !m_IsWinDropped && !m_IsClientChoked) // Drop window if RTT grows too fast
 791  			{
 792  				LogPrint (eLogDebug, "Streaming: Congestion detected, reduce window size");
 793  				ProcessWindowDrop ();
 794  			}
 795  			UpdatePacingTime ();
 796  			m_PrevRTTSample = rttSample;
 797  
 798  			bool wasInitial = m_RTO == INITIAL_RTO;
 799  			m_RTO = std::max (MIN_RTO, (int)(m_RTT + m_Jitter*2 + m_ACKRecieveInterval)); // TODO: implement it better
 800  
 801  			if (wasInitial)
 802  				ScheduleResend ();
 803  		}
 804  		if (m_IsClientChoked && (ackThrough >= m_DropWindowDelaySequenceNumber || m_SentPackets.empty ()))
 805  			m_IsClientChoked = false;
 806  		if (m_IsClientChoked2 && (ackThrough >= m_DropWindowDelaySequenceNumber || m_SentPackets.empty ()))
 807  			m_IsClientChoked2 = false;
 808  		if (m_IsWinDropped && ackThrough > m_DropWindowDelaySequenceNumber)
 809  		{
 810  			m_IsFirstRttSample = true;
 811  			m_IsWinDropped = false;
 812  		}
 813  		if (m_WindowDropTargetSize && int(m_SentPackets.size ()) <= m_WindowDropTargetSize)
 814  		{
 815  			m_WindowSize = m_WindowDropTargetSize;
 816  			m_WindowDropTargetSize = 0;
 817  		}
 818  		if (acknowledged || m_IsNAcked)
 819  		{
 820  			ScheduleResend ();
 821  		}
 822  		if (m_SendBuffer.IsEmpty () && m_SentPackets.size () > 0) // tail loss
 823  		{
 824  			m_IsResendNeeded = true;
 825  			m_RTO = std::max (MIN_RTO, (int)(m_RTT * 1.5 + m_Jitter + m_ACKRecieveInterval)); // to prevent spurious retransmit
 826  		}
 827  		if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ())
 828  		{
 829  			m_ResendTimer.cancel ();
 830  			m_SendTimer.cancel ();
 831  			m_LastACKRecieveTime = 0;
 832  			m_ACKRecieveInterval = m_AckDelay;
 833  		}
 834  		if (acknowledged && m_IsFirstACK)
 835  		{
 836  			if (m_RoutingSession)
 837  				m_RoutingSession->SetSharedRoutingPath (
 838  					std::make_shared<i2p::garlic::GarlicRoutingPath> (
 839  						i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, (int)m_RTT, 0}));
 840  			m_IsFirstACK = false;
 841  		}
 842  		if (acknowledged)
 843  		{
 844  			if (m_RoutingSession)
 845  			{
 846  				int numSentPackets = m_RoutingSession->NumSentPackets ();
 847  				numSentPackets -= ackPacketsCounter;
 848  				if (numSentPackets < 0) numSentPackets = 0;
 849  				m_RoutingSession->SetNumSentPackets (numSentPackets);
 850  			}
 851  			m_NumResendAttempts = 0;
 852  			m_IsTimeOutResend = false;
 853  			SendBuffer ();
 854  		}
 855  		if (m_Status == eStreamStatusClosed)
 856  			Terminate ();
 857  		else if (m_Status == eStreamStatusClosing)
 858  			Close (); // check is all outgoing messages have been sent and we can send close
 859  	}
 860  
 861  	size_t Stream::Receive (uint8_t * buf, size_t len, int timeout)
 862  	{
 863  		if (!len) return 0;
 864  		size_t ret = 0;
 865  		volatile bool done = false;
 866  		std::condition_variable newDataReceived;
 867  		std::mutex newDataReceivedMutex;
 868  		AsyncReceive (boost::asio::buffer (buf, len),
 869  			[&ret, &done, &newDataReceived, &newDataReceivedMutex](const boost::system::error_code& ecode, std::size_t bytes_transferred)
 870  			{
 871  				if (ecode == boost::asio::error::timed_out)
 872  					ret = 0;
 873  				else
 874  					ret = bytes_transferred;
 875  				std::unique_lock<std::mutex> l(newDataReceivedMutex);
 876  				newDataReceived.notify_all ();
 877  				done = true;
 878  			},
 879  			timeout);
 880  		if (!done)
 881  		{	std::unique_lock<std::mutex> l(newDataReceivedMutex);
 882  			if (!done && newDataReceived.wait_for (l, std::chrono::seconds (timeout)) == std::cv_status::timeout)
 883  				ret = 0;
 884  		}
 885  		if (!done)
 886  		{
 887  			// make sure that AsycReceive complete
 888  			auto s = shared_from_this();
 889  			boost::asio::post (m_Service, [s]()
 890  		    {
 891  				s->m_ReceiveTimer.cancel ();
 892  			});
 893  			int i = 0;
 894  			while (!done && i < 100) // 1 sec
 895  			{
 896  				std::this_thread::sleep_for (std::chrono::milliseconds(10));
 897  				i++;
 898  			}
 899  		}
 900  		return ret;
 901  	}
 902  
 903  	size_t Stream::Send (const uint8_t * buf, size_t len)
 904  	{
 905  		AsyncSend (buf, len, nullptr);
 906  		return len;
 907  	}
 908  
 909  	void Stream::AsyncSend (const uint8_t * buf, size_t len, SendHandler handler)
 910  	{
 911  		std::shared_ptr<i2p::stream::SendBuffer> buffer;
 912  		if (len > 0 && buf)
 913  			buffer = std::make_shared<i2p::stream::SendBuffer>(buf, len, handler);
 914  		else if (handler)
 915  			handler(boost::system::error_code ());
 916  		auto s = shared_from_this ();
 917  		boost::asio::post (m_Service, [s, buffer = std::move(buffer)]() mutable
 918  			{
 919  				if (buffer)
 920  					s->m_SendBuffer.Add (std::move(buffer));
 921  				s->SendBuffer ();
 922  			});
 923  	}
 924  
 925  	void Stream::SendBuffer ()
 926  	{
 927  		if (m_RemoteLeaseSet) // don't scheudle send for first SYN for incoming stream
 928  			ScheduleSend ();
 929  		auto ts = i2p::util::GetMillisecondsSinceEpoch ();
 930  		int numMsgs = m_WindowSize - m_SentPackets.size ();
 931  		if (numMsgs <= 0 || !m_IsSendTime) // window is full
 932  		{
 933  			m_LastSendTime = ts;
 934  			return;
 935  		}
 936  		else if (numMsgs > m_NumPacketsToSend)
 937  			numMsgs = m_NumPacketsToSend;
 938  
 939  		if (!m_RemoteLeaseSet) m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ());
 940  		if (m_RemoteLeaseSet)
 941  		{
 942  			if (!m_RoutingSession)
 943  				m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true, false);
 944  		}
 945  
 946  		if (m_RoutingSession)
 947  		{
 948  			m_IsJavaClient = m_RoutingSession->IsWithJava ();
 949  			if (m_IsJavaClient) m_MaxWindowSize = 64;
 950  			int numSentPackets = m_RoutingSession->NumSentPackets ();
 951  			int numPacketsToSend = m_MaxWindowSize - numSentPackets;
 952  			if (numPacketsToSend <= 0) // shared window is full
 953  			{
 954  				if (m_LastReceivedSequenceNumber <= 0 && m_SequenceNumber == 0)
 955  				{
 956  					LogPrint (eLogWarning, "Streaming: limit of unacknowledged packets has been reached, terminate, rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
 957  					m_Status = eStreamStatusReset;
 958  					Close ();
 959  					return;
 960  				}
 961  				m_LastSendTime = ts;
 962  				return;
 963  			}
 964  			else if (numMsgs > numPacketsToSend)
 965  				numMsgs = numPacketsToSend;
 966  		}
 967  		bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet
 968  		std::vector<Packet *> packets;
 969  		while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.IsEmpty () && numMsgs > 0))
 970  		{
 971  			Packet * p = m_LocalDestination.NewPacket ();
 972  			uint8_t * packet = p->GetBuffer ();
 973  			// TODO: implement setters
 974  			size_t size = 0;
 975  			htobe32buf (packet + size, m_SendStreamID);
 976  			size += 4; // sendStreamID
 977  			htobe32buf (packet + size, m_RecvStreamID);
 978  			size += 4; // receiveStreamID
 979  			htobe32buf (packet + size, m_SequenceNumber++);
 980  			size += 4; // sequenceNum
 981  			if (isNoAck)
 982  				htobuf32 (packet + size, 0);
 983  			else
 984  				htobe32buf (packet + size, m_LastReceivedSequenceNumber);
 985  			size += 4; // ack Through
 986  			if (m_Status == eStreamStatusNew && !m_SendStreamID && m_RemoteIdentity)
 987  			{
 988  				// first SYN packet
 989  				if (m_DontSign)
 990  				{
 991  					// remote ident is useless without signature, don't include it
 992  					packet[size] = 0; size++; // NACK count
 993  				}
 994  				else
 995  				{
 996  					packet[size] = 8;
 997  					size++; // NACK count
 998  					memcpy (packet + size, m_RemoteIdentity->GetIdentHash (), 32);
 999  					size += 32;
1000  				}
1001  			}
1002  			else
1003  			{
1004  				packet[size] = 0;
1005  				size++; // NACK count
1006  			}
1007  			packet[size] = m_RTO/1000;
1008  			size++; // resend delay
1009  			if (m_Status == eStreamStatusNew)
1010  			{
1011  				// initial packet
1012  				m_Status = eStreamStatusOpen;
1013  				if (!m_RemoteLeaseSet) m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ());
1014  				if (m_RemoteLeaseSet)
1015  				{
1016  					m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true, !m_IsIncoming);
1017  					m_MTU = (m_RoutingSession && m_RoutingSession->IsRatchets ()) ? STREAMING_MTU_RATCHETS : STREAMING_MTU;
1018  				}
1019  				uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED;
1020  				if (!m_DontSign) flags |= PACKET_FLAG_SIGNATURE_INCLUDED;
1021  				if (isNoAck) flags |= PACKET_FLAG_NO_ACK;
1022  				bool isOfflineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().IsOfflineSignature ();
1023  				if (isOfflineSignature) flags |= PACKET_FLAG_OFFLINE_SIGNATURE;
1024  				htobe16buf (packet + size, flags);
1025  				size += 2; // flags
1026  				size_t identityLen = m_LocalDestination.GetOwner ()->GetIdentity ()->GetFullLen ();
1027  				size_t signatureLen = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetSignatureLen ();
1028  				uint8_t * optionsSize = packet + size; // set options size later
1029  				size += 2; // options size
1030  				m_LocalDestination.GetOwner ()->GetIdentity ()->ToBuffer (packet + size, identityLen);
1031  				size += identityLen; // from
1032  				htobe16buf (packet + size, m_MTU);
1033  				size += 2; // max packet size
1034  				if (m_DontSign)
1035  				{
1036  					htobe16buf (optionsSize, packet + size - 2 - optionsSize); // actual options size
1037  					size += m_SendBuffer.Get (packet + size, m_MTU); // payload
1038  				}
1039  				else
1040  				{
1041  					if (isOfflineSignature)
1042  					{
1043  						const auto& offlineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetOfflineSignature ();
1044  						memcpy (packet + size, offlineSignature.data (), offlineSignature.size ());
1045  						size += offlineSignature.size (); // offline signature
1046  					}
1047  					uint8_t * signature = packet + size; // set it later
1048  					memset (signature, 0, signatureLen); // zeroes for now
1049  					size += signatureLen; // signature
1050  					htobe16buf (optionsSize, packet + size - 2 - optionsSize); // actual options size
1051  					size += m_SendBuffer.Get (packet + size, m_MTU); // payload
1052  					m_LocalDestination.GetOwner ()->Sign (packet, size, signature);
1053  				}
1054  			}
1055  			else
1056  			{
1057  				// follow on packet
1058  				if (!m_LastACKRequestTime || ts - m_LastACKRequestTime > m_MinRTT / 10)
1059  				{
1060  					m_LastACKRequestTime = ts;
1061  					htobe16buf (packet + size, PACKET_FLAG_DELAY_REQUESTED);
1062  					size += 2; // flags
1063  					htobe16buf (packet + size, 2); // 2 bytes delay interval
1064  					htobe16buf (packet + size + 2, 0); // set immediate ack interval
1065  					size += 2;
1066  				}
1067  				else
1068  				{
1069  					htobuf16 (packet + size, 0);
1070  					size += 2; // flags
1071  					htobuf16 (packet + size, 0); // no options
1072  				}
1073  				size += 2; // options size
1074  				size += m_SendBuffer.Get(packet + size, m_MTU); // payload
1075  			}
1076  			p->len = size;
1077  			packets.push_back (p);
1078  			numMsgs--;
1079  		}
1080  		if (m_SendBuffer.GetSize() == 0) m_IsBufferEmpty = true;
1081  		else m_IsBufferEmpty = false;
1082  		int numPackets = packets.size ();
1083  		if (packets.size () > 0)
1084  		{
1085  			if (m_SavedPackets.empty ()) // no NACKS
1086  			{
1087  				m_IsAckSendScheduled = false;
1088  				m_AckSendTimer.cancel ();
1089  			}
1090  			bool isEmpty = m_SentPackets.empty ();
1091  //			auto ts = i2p::util::GetMillisecondsSinceEpoch ();
1092  			for (auto& it: packets)
1093  			{
1094  				it->sendTime = ts;
1095  				m_SentPackets.insert (it);
1096  			}
1097  			SendPackets (packets);
1098  			m_LastSendTime = ts;
1099  			m_IsSendTime = false;
1100  			if (m_RoutingSession)
1101  			{
1102  				int numSentPackets = m_RoutingSession->NumSentPackets ();
1103  				m_RoutingSession->SetNumSentPackets (numSentPackets + numPackets);
1104  			}
1105  			if (m_Status == eStreamStatusClosing && m_SendBuffer.IsEmpty ())
1106  				SendClose ();
1107  			if (isEmpty)
1108  				ScheduleResend ();
1109  		}
1110  	}
1111  
1112  	void Stream::SendQuickAck ()
1113  	{
1114  		int32_t lastReceivedSeqn = m_LastReceivedSequenceNumber;
1115  		// for limit inbound speed
1116  		auto ts = i2p::util::GetMillisecondsSinceEpoch ();
1117  		int numPackets = 0;
1118  		bool lostPackets = false;
1119  		int64_t passedTime = m_PacketACKInterval * INITIAL_WINDOW_SIZE; // in microseconds // while m_LastACKSendTime == 0
1120  		if (m_LastACKSendTime)
1121  			passedTime = (ts - m_LastACKSendTime)*1000; // in microseconds
1122  		numPackets = (passedTime + m_PacketACKIntervalRem) / m_PacketACKInterval;
1123  		m_PacketACKIntervalRem = (passedTime + m_PacketACKIntervalRem) - (numPackets * m_PacketACKInterval);
1124  		if (m_LastConfirmedReceivedSequenceNumber + numPackets < m_LastReceivedSequenceNumber)
1125  		{
1126  			lastReceivedSeqn = m_LastConfirmedReceivedSequenceNumber + numPackets;
1127  			if (!m_IsAckSendScheduled)
1128  			{
1129  				auto ackTimeout = m_RTT/10;
1130  				if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay;
1131  				ScheduleAck (ackTimeout);
1132  			}
1133  		}
1134  		if (numPackets == 0) return;
1135  		// for limit inbound speed
1136  		if (!m_SavedPackets.empty ())
1137  		{
1138  			for (auto it: m_SavedPackets)
1139  			{
1140  				auto seqn = it->GetSeqn ();
1141  				// for limit inbound speed
1142  				if (m_LastConfirmedReceivedSequenceNumber + numPackets < int(seqn))
1143  				{
1144  					if (!m_IsAckSendScheduled)
1145  					{
1146  						auto ackTimeout = m_RTT/10;
1147  						if (ackTimeout > m_AckDelay) ackTimeout = m_AckDelay;
1148  						ScheduleAck (ackTimeout);
1149  					}
1150  					if (lostPackets)
1151  						break;
1152  					else
1153  						return;
1154  				}
1155  				// for limit inbound speed
1156  				if ((int)seqn > lastReceivedSeqn)
1157  				{
1158  					lastReceivedSeqn = seqn;
1159  					lostPackets = true;	// for limit inbound speed
1160  				}
1161  			}
1162  		}
1163  		if (lastReceivedSeqn < 0)
1164  		{
1165  			LogPrint (eLogError, "Streaming: No packets have been received yet");
1166  			if (m_SequenceNumber == 0)
1167  			{
1168  				if (m_NumResendAttempts > 1)
1169  				{
1170  					m_Status = eStreamStatusReset;
1171  					Close ();
1172  					return;
1173  				}
1174  				m_NumResendAttempts++;
1175  			}
1176  			return;
1177  		}
1178  
1179  		Packet p;
1180  		uint8_t * packet = p.GetBuffer ();
1181  		size_t size = 0;
1182  		htobe32buf (packet + size, m_SendStreamID);
1183  		size += 4; // sendStreamID
1184  		htobe32buf (packet + size, m_RecvStreamID);
1185  		size += 4; // receiveStreamID
1186  		htobuf32 (packet + size, 0); // this is plain Ack message
1187  		size += 4; // sequenceNum
1188  		htobe32buf (packet + size, lastReceivedSeqn);
1189  		size += 4; // ack Through
1190  		uint8_t numNacks = 0;
1191  		bool choking = m_IsChoking2;
1192  		if (lastReceivedSeqn > m_LastReceivedSequenceNumber)
1193  		{
1194  			// fill NACKs
1195  			uint8_t * nacks = packet + size + 1;
1196  			auto nextSeqn = m_LastReceivedSequenceNumber + 1;
1197  			for (auto it: m_SavedPackets)
1198  			{
1199  				auto seqn = it->GetSeqn ();
1200  				if (m_LastConfirmedReceivedSequenceNumber + numPackets < int(seqn)) // for limit inbound speed
1201  				{
1202  					htobe32buf (packet + 12, nextSeqn - 1);
1203  					break;
1204  				}
1205  				if (numNacks + (seqn - nextSeqn) >= 256)
1206  				{
1207  					LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn);
1208  					htobe32buf (packet + 12, nextSeqn - 1); // change ack Through back
1209  					choking = true;
1210  					break;
1211  				}
1212  				for (uint32_t i = nextSeqn; i < seqn; i++)
1213  				{
1214  					htobe32buf (nacks, i);
1215  					nacks += 4;
1216  					numNacks++;
1217  				}
1218  				nextSeqn = seqn + 1;
1219  			}
1220  			packet[size] = numNacks;
1221  			size++; // NACK count
1222  			size += numNacks*4; // NACKs
1223  		}
1224  		else
1225  		{
1226  			// No NACKs
1227  			packet[size] = 0;
1228  			size++; // NACK count
1229  		}
1230  		packet[size] = 0;
1231  		size++; // resend delay
1232  		bool requestImmediateAck = false;
1233  		if (!choking)
1234  			requestImmediateAck = m_LastSendTime && ts > m_LastSendTime + REQUEST_IMMEDIATE_ACK_INTERVAL &&
1235  				ts > m_LastSendTime + REQUEST_IMMEDIATE_ACK_INTERVAL + m_LocalDestination.GetRandom () % REQUEST_IMMEDIATE_ACK_INTERVAL_VARIANCE;
1236  		htobe16buf (packet + size, (choking || requestImmediateAck) ? PACKET_FLAG_DELAY_REQUESTED : 0); // no flags set or delay requested
1237  		size += 2; // flags
1238  		if (choking || requestImmediateAck)
1239  		{
1240  			htobe16buf (packet + size, 2); // 2 bytes delay interval
1241  			if (m_IsChoking2)
1242  				htobe16buf (packet + size + 2, DELAY_CHOKING_2); // set choking2
1243  			else
1244  				htobe16buf (packet + size + 2, choking ? DELAY_CHOKING : 0); // set choking or immediate ack interval
1245  			size += 2;
1246  			if (requestImmediateAck) // ack request sent
1247  			{
1248  				m_LastSendTime = ts;
1249  				m_IsImmediateAckRequested = true;
1250  			}
1251  		}
1252  		else
1253  			htobuf16 (packet + size, 0); // no options
1254  		size += 2; // options size
1255  		p.len = size;
1256  
1257  		SendPackets (std::vector<Packet *> { &p });
1258  		m_LastACKSendTime = ts; // for limit inbound speed
1259  		m_LastConfirmedReceivedSequenceNumber = lastReceivedSeqn; // for limit inbound speed
1260  		m_IsChoking2 = false;
1261  		LogPrint (eLogDebug, "Streaming: Quick Ack sent. ", (int)numNacks, " NACKs");
1262  	}
1263  
1264  	void Stream::SendPing ()
1265  	{
1266  		Packet p;
1267  		uint8_t * packet = p.GetBuffer ();
1268  		size_t size = 0;
1269  		htobe32buf (packet, m_RecvStreamID);
1270  		size += 4; // sendStreamID
1271  		memset (packet + size, 0, 14);
1272  		size += 14; // all zeroes
1273  		uint16_t flags = PACKET_FLAG_ECHO | PACKET_FLAG_FROM_INCLUDED;
1274  		if (!m_DontSign) flags |= PACKET_FLAG_SIGNATURE_INCLUDED;
1275  		bool isOfflineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().IsOfflineSignature ();
1276  		if (isOfflineSignature) flags |= PACKET_FLAG_OFFLINE_SIGNATURE;
1277  		htobe16buf (packet + size, flags);
1278  		size += 2; // flags
1279  		size_t identityLen = m_LocalDestination.GetOwner ()->GetIdentity ()->GetFullLen ();
1280  		uint8_t * optionsSize = packet + size; // set options size later
1281  		size += 2; // options size
1282  		m_LocalDestination.GetOwner ()->GetIdentity ()->ToBuffer (packet + size, identityLen);
1283  		size += identityLen; // from
1284  		if (m_DontSign)
1285  			htobe16buf (optionsSize, packet + size - 2 - optionsSize); // actual options size
1286  		else
1287  		{
1288  			if (isOfflineSignature)
1289  			{
1290  				const auto& offlineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetOfflineSignature ();
1291  				memcpy (packet + size, offlineSignature.data (), offlineSignature.size ());
1292  				size += offlineSignature.size (); // offline signature
1293  			}
1294  			size_t signatureLen = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetSignatureLen ();
1295  			uint8_t * signature = packet + size; // set it later
1296  			memset (signature, 0, signatureLen); // zeroes for now
1297  			size += signatureLen; // signature
1298  			htobe16buf (optionsSize, packet + size - 2 - optionsSize); // actual options size
1299  			m_LocalDestination.GetOwner ()->Sign (packet, size, signature);
1300  		}
1301  		p.len = size;
1302  		SendPackets (std::vector<Packet *> { &p });
1303  		LogPrint (eLogDebug, "Streaming: Ping of ", p.len, " bytes sent");
1304  	}
1305  
1306  	void Stream::Close ()
1307  	{
1308  		LogPrint(eLogDebug, "Streaming: closing stream with sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID, ", status=", m_Status);
1309  		switch (m_Status)
1310  		{
1311  			case eStreamStatusOpen:
1312  				m_Status = eStreamStatusClosing;
1313  				Close (); // recursion
1314  				if (m_Status == eStreamStatusClosing) //still closing
1315  					LogPrint (eLogDebug, "Streaming: Trying to send stream data before closing, sSID=", m_SendStreamID);
1316  			break;
1317  			case eStreamStatusReset:
1318  				// TODO: send reset
1319  				Terminate ();
1320  			break;
1321  			case eStreamStatusClosing:
1322  				if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) // nothing to send
1323  				{
1324  					m_Status = eStreamStatusClosed;
1325  					SendClose();
1326  				}
1327  			break;
1328  			case eStreamStatusClosed:
1329  				// already closed
1330  				Terminate ();
1331  			break;
1332  			default:
1333  				LogPrint (eLogWarning, "Streaming: Unexpected stream status=", (int)m_Status, " for sSID=", m_SendStreamID);
1334  		};
1335  	}
1336  
1337  	void Stream::SendClose ()
1338  	{
1339  		Packet * p = m_LocalDestination.NewPacket ();
1340  		uint8_t * packet = p->GetBuffer ();
1341  		size_t size = 0;
1342  		htobe32buf (packet + size, m_SendStreamID);
1343  		size += 4; // sendStreamID
1344  		htobe32buf (packet + size, m_RecvStreamID);
1345  		size += 4; // receiveStreamID
1346  		htobe32buf (packet + size, m_SequenceNumber++);
1347  		size += 4; // sequenceNum
1348  		htobe32buf (packet + size, m_LastReceivedSequenceNumber >= 0 ? m_LastReceivedSequenceNumber : 0);
1349  		size += 4; // ack Through
1350  		packet[size] = 0;
1351  		size++; // NACK count
1352  		packet[size] = 0;
1353  		size++; // resend delay
1354  		uint16_t flags = PACKET_FLAG_CLOSE;
1355  		if (!m_DontSign) flags |= PACKET_FLAG_SIGNATURE_INCLUDED;
1356  		bool isOfflineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().IsOfflineSignature ();
1357  		if (isOfflineSignature) flags |= PACKET_FLAG_OFFLINE_SIGNATURE;
1358  		htobe16buf (packet + size, flags);
1359  		size += 2; // flags
1360  		if (m_DontSign)
1361  		{
1362  			memset (packet + size, 0, 2); // no options
1363  			size += 2; // options size
1364  		}
1365  		else
1366  		{
1367  			if (isOfflineSignature)
1368  			{
1369  				const auto& offlineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetOfflineSignature ();
1370  				memcpy (packet + size, offlineSignature.data (), offlineSignature.size ());
1371  				size += offlineSignature.size (); // offline signature
1372  			}
1373  			size_t signatureLen = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetSignatureLen ();
1374  			htobe16buf (packet + size, signatureLen); // signature only
1375  			size += 2; // options size
1376  			uint8_t * signature = packet + size;
1377  			memset (packet + size, 0, signatureLen);
1378  			size += signatureLen; // signature
1379  			m_LocalDestination.GetOwner ()->Sign (packet, size, signature);
1380  		}
1381  
1382  		p->len = size;
1383  		boost::asio::post (m_Service, std::bind (&Stream::SendPacket, shared_from_this (), p));
1384  		if (m_RoutingSession)
1385  		{
1386  			int numSentPackets = m_RoutingSession->NumSentPackets ();
1387  			m_RoutingSession->SetNumSentPackets (numSentPackets + 1);
1388  		}
1389  		LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID);
1390  	}
1391  
1392  	size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len)
1393  	{
1394  		size_t pos = 0;
1395  		while (pos < len && !m_ReceiveQueue.empty ())
1396  		{
1397  			Packet * packet = m_ReceiveQueue.front ();
1398  			size_t l = std::min (packet->GetLength (), len - pos);
1399  			memcpy (buf + pos, packet->GetBuffer (), l);
1400  			pos += l;
1401  			packet->offset += l;
1402  			if (!packet->GetLength ())
1403  			{
1404  				m_ReceiveQueue.pop ();
1405  				m_LocalDestination.DeletePacket (packet);
1406  			}
1407  		}
1408  		return pos;
1409  	}
1410  
1411  	bool Stream::SendPacket (Packet * packet)
1412  	{
1413  		if (packet)
1414  		{
1415  			if (m_IsAckSendScheduled)
1416  			{
1417  				m_IsAckSendScheduled = false;
1418  				m_AckSendTimer.cancel ();
1419  			}
1420  			if (!packet->sendTime) packet->sendTime = i2p::util::GetMillisecondsSinceEpoch ();
1421  			SendPackets (std::vector<Packet *> { packet });
1422  			bool isEmpty = m_SentPackets.empty ();
1423  			m_SentPackets.insert (packet);
1424  			if (isEmpty)
1425  				ScheduleResend ();
1426  			return true;
1427  		}
1428  		else
1429  			return false;
1430  	}
1431  
1432  	void Stream::SendPackets (const std::vector<Packet *>& packets)
1433  	{
1434  		if (!m_RemoteLeaseSet)
1435  		{
1436  			CancelRemoteLeaseChange ();
1437  			UpdateCurrentRemoteLease ();
1438  			if (!m_RemoteLeaseSet)
1439  			{
1440  				LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet, sSID=", m_SendStreamID);
1441  				return;
1442  			}
1443  		}
1444  		if (!m_RoutingSession || m_RoutingSession->IsTerminated () || !m_RoutingSession->IsReadyToSend ()) // expired and detached or new session sent
1445  		{
1446  			m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true, !m_IsIncoming || m_SequenceNumber > 1);
1447  			if (!m_RoutingSession)
1448  			{
1449  				LogPrint (eLogError, "Streaming: Can't obtain routing session, sSID=", m_SendStreamID);
1450  				Terminate ();
1451  				return;
1452  			}
1453  		}
1454  		if (!m_CurrentOutboundTunnel && m_RoutingSession) // first message to send
1455  		{
1456  			// try to get shared path first
1457  			auto routingPath = m_RoutingSession->GetSharedRoutingPath ();
1458  			if (routingPath)
1459  			{
1460  				m_CurrentOutboundTunnel = routingPath->outboundTunnel;
1461  				m_CurrentRemoteLease = routingPath->remoteLease;
1462  				m_RTT = routingPath->rtt;
1463  			}
1464  		}
1465  
1466  		auto ts = i2p::util::GetMillisecondsSinceEpoch ();
1467  		if (!m_CurrentRemoteLease || !m_CurrentRemoteLease->endDate) // excluded from LeaseSet
1468  		{
1469  			CancelRemoteLeaseChange ();
1470  			UpdateCurrentRemoteLease (true);
1471  		}
1472  		if (m_RemoteLeaseChangeTime && m_IsRemoteLeaseChangeInProgress && ts > m_RemoteLeaseChangeTime + INITIAL_RTO)
1473  		{
1474  			LogPrint (eLogDebug, "Streaming: RemoteLease changed, set initial window size");
1475  			CancelRemoteLeaseChange ();
1476  			m_CurrentRemoteLease = m_NextRemoteLease;
1477  			ResetWindowSize ();
1478  		}
1479  		auto currentRemoteLease = m_CurrentRemoteLease;
1480  		if (!m_IsRemoteLeaseChangeInProgress && m_RemoteLeaseSet && m_CurrentRemoteLease && ts >= m_CurrentRemoteLease->endDate - i2p::data::LEASE_ENDDATE_THRESHOLD)
1481  		{
1482  			auto leases = m_RemoteLeaseSet->GetNonExpiredLeases (false);
1483  			if (leases.size ())
1484  			{
1485  				m_IsRemoteLeaseChangeInProgress = true;
1486  				UpdateCurrentRemoteLease (true);
1487  				m_NextRemoteLease = m_CurrentRemoteLease;
1488  			}
1489  			else
1490  				UpdateCurrentRemoteLease (true);
1491  		}
1492  		if (m_CurrentRemoteLease && ts < m_CurrentRemoteLease->endDate + i2p::data::LEASE_ENDDATE_THRESHOLD)
1493  		{
1494  			bool freshTunnel = false;
1495  			if (!m_CurrentOutboundTunnel)
1496  			{
1497  				auto leaseRouter = i2p::data::netdb.FindRouter (m_CurrentRemoteLease->tunnelGateway);
1498  				m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (nullptr,
1499  					leaseRouter ? leaseRouter->GetCompatibleTransports (false) : (i2p::data::RouterInfo::CompatibleTransports)i2p::data::RouterInfo::eAllTransports);
1500  				freshTunnel = true;
1501  			}
1502  			else if (!m_CurrentOutboundTunnel->IsEstablished ())
1503  				std::tie(m_CurrentOutboundTunnel, freshTunnel) = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel);
1504  			if (!m_CurrentOutboundTunnel)
1505  			{
1506  				LogPrint (eLogError, "Streaming: No outbound tunnels in the pool, sSID=", m_SendStreamID);
1507  				m_CurrentRemoteLease = nullptr;
1508  				return;
1509  			}
1510  			if (freshTunnel)
1511  			{
1512  				LogPrint (eLogDebug, "Streaming: OutboundTunnel changed, set initial window size");
1513  				ResetWindowSize ();
1514  //				m_TunnelsChangeSequenceNumber = m_SequenceNumber; // should be determined more precisely
1515  			}
1516  
1517  			std::vector<i2p::tunnel::TunnelMessageBlock> msgs;
1518  			for (const auto& it: packets)
1519  			{
1520  				auto msg = m_RoutingSession->WrapSingleMessage (m_LocalDestination.CreateDataMessage (
1521  					it->GetBuffer (), it->GetLength (), m_Port, !m_RoutingSession->IsRatchets (), it->IsSYN ()));
1522  				msgs.push_back (i2p::tunnel::TunnelMessageBlock
1523  					{
1524  						i2p::tunnel::eDeliveryTypeTunnel,
1525  						m_CurrentRemoteLease->tunnelGateway, m_CurrentRemoteLease->tunnelID,
1526  						msg
1527  					});
1528  				m_NumSentBytes += it->GetLength ();
1529  				if (m_IsRemoteLeaseChangeInProgress && !m_RemoteLeaseChangeTime)
1530  				{
1531  					m_RemoteLeaseChangeTime = ts;
1532  					m_CurrentRemoteLease = currentRemoteLease; // change it back before new lease is confirmed
1533  				}
1534  			}
1535  			m_CurrentOutboundTunnel->SendTunnelDataMsgs (msgs);
1536  		}
1537  		else
1538  		{
1539  			LogPrint (eLogWarning, "Streaming: Remote lease is not available, sSID=", m_SendStreamID);
1540  			if (m_RoutingSession)
1541  				m_RoutingSession->SetSharedRoutingPath (nullptr); // invalidate routing path
1542  		}
1543  	}
1544  
1545  	void Stream::SendUpdatedLeaseSet ()
1546  	{
1547  		if (m_RoutingSession && !m_RoutingSession->IsTerminated ())
1548  		{
1549  			if (m_RoutingSession->IsLeaseSetNonConfirmed ())
1550  			{
1551  				auto ts = i2p::util::GetMillisecondsSinceEpoch ();
1552  				if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT)
1553  				{
1554  					// LeaseSet was not confirmed, should try other tunnels
1555  					LogPrint (eLogWarning, "Streaming: LeaseSet was not confirmed in ", i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT, " milliseconds. Trying to resubmit");
1556  					m_RoutingSession->SetSharedRoutingPath (nullptr);
1557  					m_CurrentOutboundTunnel = nullptr;
1558  					m_CurrentRemoteLease = nullptr;
1559  					SendQuickAck ();
1560  				}
1561  			}
1562  			else if (m_RoutingSession->IsLeaseSetUpdated ())
1563  			{
1564  				LogPrint (eLogDebug, "Streaming: sending updated LeaseSet");
1565  				SendQuickAck ();
1566  			}
1567  		}
1568  		else
1569  			SendQuickAck ();
1570  	}
1571  
1572  	void Stream::ScheduleSend ()
1573  	{
1574  		if (m_Status != eStreamStatusTerminated)
1575  		{
1576  			m_SendTimer.cancel ();
1577  			uint64_t interval = SEND_INTERVAL + m_LocalDestination.GetRandom () % SEND_INTERVAL_VARIANCE;
1578  			if (interval < m_PacingTime) interval = m_PacingTime;
1579  			m_SendTimer.expires_from_now (boost::posix_time::microseconds(interval));
1580  			m_SendTimer.async_wait (std::bind (&Stream::HandleSendTimer,
1581  				shared_from_this (), std::placeholders::_1));
1582  		}
1583  	}
1584  
1585  	void Stream::HandleSendTimer (const boost::system::error_code& ecode)
1586  	{
1587  		if (ecode != boost::asio::error::operation_aborted)
1588  		{
1589  			auto ts = i2p::util::GetMillisecondsSinceEpoch ();
1590  			if (m_LastSendTime && ts*1000 > m_LastSendTime*1000 + m_PacingTime)
1591  			{
1592  				if (m_PacingTime)
1593  				{
1594  					auto numPackets = std::lldiv (m_PacingTimeRem + ts*1000 - m_LastSendTime*1000, m_PacingTime);
1595  					m_NumPacketsToSend = numPackets.quot;
1596  					m_PacingTimeRem = numPackets.rem;
1597  				}
1598  				else
1599  				{
1600  					LogPrint (eLogError, "Streaming: pacing time is zero");
1601  					m_NumPacketsToSend = 1; m_PacingTimeRem = 0;
1602  				}
1603  				m_IsSendTime = true;
1604  				if (m_WindowIncCounter && (m_WindowSize < m_MaxWindowSize || m_WindowDropTargetSize) && !m_SendBuffer.IsEmpty () && m_PacingTime > m_MinPacingTime)
1605  				{
1606  					float winSize = m_WindowSize;
1607  					if (m_WindowDropTargetSize)
1608  						winSize = m_WindowDropTargetSize;
1609  					float maxWinSize = m_MaxWindowSize;
1610  					if (m_LastWindowIncTime)
1611  						maxWinSize = (ts - m_LastWindowIncTime) / (m_RTT / MAX_WINDOW_SIZE_INC_PER_RTT) + winSize;
1612  					for (int i = 0; i < m_NumPacketsToSend; i++)
1613  					{
1614  						if (m_WindowIncCounter)
1615  						{
1616  							if (m_WindowDropTargetSize)
1617  							{
1618  								if (m_LastWindowDropSize && (m_LastWindowDropSize >= m_WindowDropTargetSize))
1619  									m_WindowDropTargetSize += 1 - (1 / ((m_LastWindowDropSize + PREV_SPEED_KEEP_TIME_COEFF) / m_WindowDropTargetSize)); // some magic here
1620  								else if (m_LastWindowDropSize && (m_LastWindowDropSize < m_WindowDropTargetSize))
1621  									m_WindowDropTargetSize += (m_WindowDropTargetSize - (m_LastWindowDropSize - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowDropTargetSize; // some magic here
1622  								else
1623  									m_WindowDropTargetSize += (m_WindowDropTargetSize - (1 - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowDropTargetSize;
1624  								if (m_WindowDropTargetSize > m_MaxWindowSize) m_WindowDropTargetSize = m_MaxWindowSize;
1625  								m_WindowIncCounter--;
1626  								if (m_WindowDropTargetSize >= maxWinSize)
1627  								{
1628  									m_WindowDropTargetSize = maxWinSize;
1629  									break;
1630  								}
1631  							}
1632  							else
1633  							{
1634  								if (m_LastWindowDropSize && (m_LastWindowDropSize >= m_WindowSize))
1635  									m_WindowSize += 1 - (1 / ((m_LastWindowDropSize + PREV_SPEED_KEEP_TIME_COEFF) / m_WindowSize)); // some magic here
1636  								else if (m_LastWindowDropSize && (m_LastWindowDropSize < m_WindowSize))
1637  									m_WindowSize += (m_WindowSize - (m_LastWindowDropSize - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowSize; // some magic here
1638  								else
1639  									m_WindowSize += (m_WindowSize - (1 - PREV_SPEED_KEEP_TIME_COEFF)) / m_WindowSize;
1640  								if (m_WindowSize > m_MaxWindowSize) m_WindowSize = m_MaxWindowSize;
1641  								m_WindowIncCounter--;
1642  								if (m_WindowSize >= maxWinSize)
1643  								{
1644  									m_WindowSize = maxWinSize;
1645  									break;
1646  								}
1647  							}
1648  						}
1649  						else
1650  							break;
1651  					}
1652  					UpdatePacingTime ();
1653  				}
1654  				m_LastWindowIncTime = ts;
1655  				if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked || m_IsClientChoked2) // resend packets
1656  					ResendPacket ();
1657  				else if (m_WindowSize > int(m_SentPackets.size ())) // send packets
1658  					SendBuffer ();
1659  			}
1660  			else // pass
1661  				ScheduleSend ();
1662  		}
1663  	}
1664  
1665  	void Stream::ScheduleResend ()
1666  	{
1667  		if (m_Status != eStreamStatusTerminated)
1668  		{
1669  			m_ResendTimer.cancel ();
1670  			// check for invalid value
1671  			if (m_RTO <= 0) m_RTO = INITIAL_RTO;
1672  			m_ResendTimer.expires_from_now (boost::posix_time::milliseconds(m_RTO));
1673  			m_ResendTimer.async_wait (std::bind (&Stream::HandleResendTimer,
1674  				shared_from_this (), std::placeholders::_1));
1675  		}
1676  	}
1677  
1678  	void Stream::HandleResendTimer (const boost::system::error_code& ecode)
1679  	{
1680  		if (ecode != boost::asio::error::operation_aborted)
1681  		{
1682  			m_IsSendTime = true;
1683  			if (m_RTO > INITIAL_RTO) m_RTO = INITIAL_RTO;
1684  			m_SendTimer.cancel (); // if no ack's in RTO, disable fast retransmit
1685  			m_IsTimeOutResend = true;
1686  			m_IsNAcked = false;
1687  			m_IsResendNeeded = false;
1688  			m_NumPacketsToSend = 1;
1689  			ResendPacket (); // send one packet per RTO, waiting for ack
1690  		}
1691  	}
1692  
1693  	void Stream::ResendPacket ()
1694  	{
1695  		// check for resend attempts
1696  		if (m_IsIncoming && m_SequenceNumber == 1 && m_NumResendAttempts > 0)
1697  		{
1698  			LogPrint (eLogWarning, "Streaming: SYNACK packet was not ACKed after ", m_NumResendAttempts, " attempts, terminate, rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
1699  			m_Status = eStreamStatusReset;
1700  			Close ();
1701  			return;
1702  		}
1703  		if (m_NumResendAttempts >= m_MaxNumResendAttempts)
1704  		{
1705  			LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", m_MaxNumResendAttempts, " attempts, terminate, rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
1706  			m_Status = eStreamStatusReset;
1707  			Close ();
1708  			return;
1709  		}
1710  
1711  		// collect packets to resend
1712  		auto ts = i2p::util::GetMillisecondsSinceEpoch ();
1713  		std::vector<Packet *> packets;
1714  		if (m_IsNAcked && !m_IsClientChoked && !m_IsClientChoked2)
1715  		{
1716  			for (auto it : m_NACKedPackets)
1717  			{
1718  				if (ts >= it->sendTime + m_RTO)
1719  				{
1720  					if (ts < it->sendTime + m_RTO*3)
1721  						it->resent = true;
1722  					else
1723  						it->resent = false;
1724  					it->sendTime = ts;
1725  					packets.push_back (it);
1726  					if ((int)packets.size () >= m_NumPacketsToSend) break;
1727  				}
1728  			}
1729  		}
1730  		else
1731  		{
1732  			for (auto it : m_SentPackets)
1733  			{
1734  				if (ts >= it->sendTime + m_RTO)
1735  				{
1736  					if (ts < it->sendTime + m_RTO*3)
1737  						it->resent = true;
1738  					else
1739  						it->resent = false;
1740  					it->sendTime = ts;
1741  					packets.push_back (it);
1742  					if (m_IsClientChoked2 && it->GetSeqn () == m_DropWindowDelaySequenceNumber)
1743  						m_IsClientChoked2 = false;
1744  					if ((int)packets.size () >= m_NumPacketsToSend) break;
1745  				}
1746  			}
1747  		}
1748  
1749  		// select tunnels if necessary and send
1750  		if (packets.size () > 0 && m_IsSendTime)
1751  		{
1752  			if (m_IsNAcked) m_NumResendAttempts = 1;
1753  			else if (m_IsTimeOutResend) m_NumResendAttempts++;
1754  			if (m_NumResendAttempts == 1 && m_RTO != INITIAL_RTO)
1755  			{
1756  				// loss-based CC
1757  				if (!m_IsWinDropped && LOSS_BASED_CONTROL_ENABLED && !m_IsClientChoked)
1758  				{
1759  					LogPrint (eLogDebug, "Streaming: Packet loss, reduce window size");
1760  					if (m_WindowDropTargetSize)
1761  						m_LastWindowDropSize = m_WindowDropTargetSize;
1762  					else
1763  						m_LastWindowDropSize = m_WindowSize;
1764  					m_WindowDropTargetSize = m_LastWindowDropSize * 0.5; // -50% to drain queue
1765  					if (m_WindowDropTargetSize < MIN_WINDOW_SIZE)
1766  						m_WindowDropTargetSize = MIN_WINDOW_SIZE;
1767  					m_WindowIncCounter = 0; // disable window growth
1768  					m_DropWindowDelaySequenceNumber = m_SequenceNumber + int(m_WindowDropTargetSize);
1769  					m_IsFirstACK = true; // ignore first RTT sample
1770  					m_IsWinDropped = true; // don't drop window twice
1771  					UpdatePacingTime ();
1772  				}
1773  			}
1774  			else if (m_IsTimeOutResend)
1775  			{
1776  				m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change
1777  				m_WindowDropTargetSize = INITIAL_WINDOW_SIZE;
1778  				m_LastWindowDropSize = 0;
1779  				m_WindowIncCounter = 0;
1780  				m_IsWinDropped = true;
1781  				m_IsFirstRttSample = true;
1782  				m_DropWindowDelaySequenceNumber = 0;
1783  				m_IsFirstACK = true;
1784  				m_LastACKRecieveTime = 0;
1785  				m_ACKRecieveInterval = m_AckDelay;
1786  				UpdatePacingTime ();
1787  				if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr);
1788  				if (m_NumResendAttempts & 1)
1789  				{
1790  					// pick another outbound tunnel
1791  					m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
1792  					LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts,
1793  						", another outbound tunnel has been selected for stream with sSID=", m_SendStreamID);
1794  				}
1795  				else
1796  				{
1797  					CancelRemoteLeaseChange ();
1798  					UpdateCurrentRemoteLease (); // pick another lease
1799  					LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts,
1800  						", another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
1801  				}
1802  			}
1803  			if (m_IsTimeOutResend) ScheduleResend ();
1804  			SendPackets (packets);
1805  			m_LastSendTime = ts;
1806  			m_IsSendTime = false;
1807  		}
1808  		else if (!m_IsClientChoked && !m_IsClientChoked2)
1809  			SendBuffer ();
1810  		m_IsSendTime = false;
1811  		if (m_IsTimeOutResend) ScheduleResend (); // ^ m_IsTimeOutResend = false
1812  		else if (m_IsNAcked || m_IsResendNeeded || m_IsClientChoked || m_IsClientChoked2) ScheduleSend ();
1813  	}
1814  
1815  	void Stream::ScheduleAck (int timeout)
1816  	{
1817  		if (m_IsAckSendScheduled)
1818  			m_AckSendTimer.cancel ();
1819  		m_IsAckSendScheduled = true;
1820  		if (timeout < MIN_SEND_ACK_TIMEOUT) timeout = MIN_SEND_ACK_TIMEOUT;
1821  		m_AckSendTimer.expires_from_now (boost::posix_time::milliseconds(timeout));
1822  		m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer,
1823  			shared_from_this (), std::placeholders::_1));
1824  	}
1825  
1826  	void Stream::HandleAckSendTimer (const boost::system::error_code& ecode)
1827  	{
1828  		if (m_IsAckSendScheduled)
1829  		{
1830  			if (m_LastReceivedSequenceNumber < 0)
1831  			{
1832  				LogPrint (eLogWarning, "Streaming: SYN has not been received after ", SYN_TIMEOUT, " milliseconds after follow on, terminate rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
1833  				m_Status = eStreamStatusReset;
1834  				Close ();
1835  				return;
1836  			}
1837  			if (m_Status == eStreamStatusOpen)
1838  			{
1839  				if (m_RoutingSession && m_RoutingSession->IsLeaseSetNonConfirmed ())
1840  				{
1841  					auto ts = i2p::util::GetMillisecondsSinceEpoch ();
1842  					if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASESET_CONFIRMATION_TIMEOUT)
1843  					{
1844  						// seems something went wrong and we should re-select tunnels
1845  						m_CurrentOutboundTunnel = nullptr;
1846  						m_CurrentRemoteLease = nullptr;
1847  					}
1848  				}
1849  				if (m_LastReceivedSequenceNumber == 0 && m_SequenceNumber == 1)
1850  				{
1851  					if (m_NumResendAttempts > 1)
1852  					{
1853  						m_Status = eStreamStatusReset;
1854  						Close ();
1855  						return;
1856  					}
1857  					m_NumResendAttempts++;
1858  					ScheduleAck (INITIAL_RTO);
1859  				}
1860  				SendQuickAck ();
1861  			}
1862  			m_IsAckSendScheduled = false;
1863  		}
1864  	}
1865  
1866  	void Stream::UpdateCurrentRemoteLease (bool expired)
1867  	{
1868  		bool isLeaseChanged = true;
1869  		if (!m_RemoteLeaseSet || m_RemoteLeaseSet->IsExpired ())
1870  		{
1871  			auto remoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ());
1872  			if (!remoteLeaseSet)
1873  			{
1874  				LogPrint (eLogWarning, "Streaming: LeaseSet ", m_RemoteIdentity->GetIdentHash ().ToBase32 (), m_RemoteLeaseSet ? " expired" : " not found");
1875  				if (!m_IsIncoming) // outgoing
1876  				{
1877  					auto requestCallback = [s = shared_from_this ()](std::shared_ptr<i2p::data::LeaseSet> ls)
1878  					    {
1879  							if (!ls && s->m_Status == eStreamStatusOpen) // LeaseSet not found
1880  							{
1881  								// close the socket without sending FIN or RST
1882  								s->m_Status = eStreamStatusClosed;
1883  								s->AsyncClose ();
1884  							}
1885  						};
1886  
1887  					if (m_RemoteLeaseSet && m_RemoteLeaseSet->IsPublishedEncrypted ())
1888  					{
1889  						m_LocalDestination.GetOwner ()->RequestDestinationWithEncryptedLeaseSet (
1890  							std::make_shared<i2p::data::BlindedPublicKey>(m_RemoteIdentity), requestCallback);
1891  						return; // we keep m_RemoteLeaseSet for possible next request
1892  					}
1893  					else
1894  					{
1895  						m_RemoteLeaseSet = nullptr;
1896  						m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash (), requestCallback); // try to request for a next attempt
1897  					}
1898  				}
1899  				else // incoming
1900  				{
1901  					// just close the socket without sending FIN or RST
1902  					m_Status = eStreamStatusClosed;
1903  					AsyncClose ();
1904  				}
1905  			}
1906  			else
1907  			{
1908  				// LeaseSet updated
1909  				m_RemoteLeaseSet = remoteLeaseSet;
1910  				m_RemoteIdentity = m_RemoteLeaseSet->GetIdentity ();
1911  				m_TransientVerifier = m_RemoteLeaseSet->GetTransientVerifier ();
1912  			}
1913  		}
1914  		if (m_RemoteLeaseSet)
1915  		{
1916  			if (!m_RoutingSession)
1917  				m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true);
1918  			auto leases = m_RemoteLeaseSet->GetNonExpiredLeases (false); // try without threshold first
1919  			if (leases.empty ())
1920  			{
1921  				expired = false;
1922  				// time to request
1923  				if (m_RemoteLeaseSet->IsPublishedEncrypted ())
1924  					m_LocalDestination.GetOwner ()->RequestDestinationWithEncryptedLeaseSet (
1925  						std::make_shared<i2p::data::BlindedPublicKey>(m_RemoteIdentity));
1926  				else
1927  					m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash ());
1928  				leases = m_RemoteLeaseSet->GetNonExpiredLeases (true); // then with threshold
1929  			}
1930  			if (!leases.empty ())
1931  			{
1932  				bool updated = false;
1933  				if (expired && m_CurrentRemoteLease)
1934  				{
1935  					for (const auto& it: leases)
1936  						if ((it->tunnelGateway == m_CurrentRemoteLease->tunnelGateway) && (it->tunnelID != m_CurrentRemoteLease->tunnelID))
1937  						{
1938  							m_CurrentRemoteLease = it;
1939  							updated = true;
1940  							break;
1941  						}
1942  				}
1943  				if (!updated)
1944  				{
1945  					uint32_t i = m_LocalDestination.GetRandom () % leases.size ();
1946  					if (m_CurrentRemoteLease && leases[i]->tunnelID == m_CurrentRemoteLease->tunnelID)
1947  					{
1948  						// make sure we don't select previous
1949  						if (leases.size () > 1)
1950  							i = (i + 1) % leases.size (); // if so, pick next
1951  						else
1952  							isLeaseChanged = false;
1953  					}
1954  					m_CurrentRemoteLease = leases[i];
1955  				}
1956  			}
1957  			else
1958  			{
1959  				LogPrint (eLogWarning, "Streaming: All remote leases are expired");
1960  				m_RemoteLeaseSet = nullptr;
1961  				m_CurrentRemoteLease = nullptr;
1962  				// we have requested expired before, no need to do it twice
1963  			}
1964  		}
1965  		else
1966  		{
1967  			LogPrint (eLogWarning, "Streaming: Remote LeaseSet not found");
1968  			m_CurrentRemoteLease = nullptr;
1969  		}
1970  		if (isLeaseChanged && !m_IsRemoteLeaseChangeInProgress)
1971  		{
1972  			LogPrint (eLogDebug, "Streaming: RemoteLease changed, set initial window size");
1973  			ResetWindowSize ();
1974  		}
1975  	}
1976  
1977  	void Stream::ResetRoutingPath ()
1978  	{
1979  		m_CurrentOutboundTunnel = nullptr;
1980  		m_CurrentRemoteLease = nullptr;
1981  		m_RTT = INITIAL_RTT;
1982  		m_RTO = INITIAL_RTO;
1983  		if (m_RoutingSession)
1984  			m_RoutingSession->SetSharedRoutingPath (nullptr); // TODO: count failures
1985  	}
1986  
1987  	void Stream::UpdatePacingTime ()
1988  	{
1989  		double rtt = m_MinRTT;
1990  		if (m_IsWinDropped)
1991  			rtt = m_SlowRTT;
1992  		if (m_WindowDropTargetSize)
1993  			m_PacingTime = std::round (rtt*1000/m_WindowDropTargetSize);
1994  		else
1995  			m_PacingTime = std::round (rtt*1000/m_WindowSize);
1996  		if (m_MinPacingTime && m_PacingTime < m_MinPacingTime)
1997  			m_PacingTime = m_MinPacingTime;
1998  	}
1999  
2000  	void Stream::ProcessWindowDrop ()
2001  	{
2002  		if (m_WindowDropTargetSize)
2003  		{
2004  			m_LastWindowDropSize = m_WindowDropTargetSize * ((m_MinRTT + m_Jitter*6) / m_SlowRTT);
2005  			m_WindowDropTargetSize = m_LastWindowDropSize * m_MinRTT / m_SlowRTT;	// we start send faster when rtt will decrease
2006  		}
2007  		else
2008  		{
2009  			m_LastWindowDropSize = m_WindowSize * ((m_MinRTT + m_Jitter*6) / m_SlowRTT);
2010  			m_WindowDropTargetSize = m_WindowSize * m_MinRTT / m_SlowRTT;	// we start send faster when rtt will decrease
2011  		}
2012  		if (m_WindowDropTargetSize < MIN_WINDOW_SIZE)
2013  			m_WindowDropTargetSize = MIN_WINDOW_SIZE;
2014  		m_WindowIncCounter = 0; // disable window growth
2015  		m_DropWindowDelaySequenceNumber = m_SequenceNumber + int(m_WindowDropTargetSize);
2016  		m_IsFirstACK = true; // ignore first RTT sample
2017  		m_IsWinDropped = true; // don't drop window twice
2018  		UpdatePacingTime ();
2019  	}
2020  
2021  	void Stream::ResetWindowSize ()
2022  	{
2023  		m_RTO = INITIAL_RTO;
2024  		if (!m_IsClientChoked)
2025  		{
2026  			if (m_WindowSize > INITIAL_WINDOW_SIZE)
2027  			{
2028  				m_WindowDropTargetSize = (float)INITIAL_WINDOW_SIZE;
2029  				m_IsWinDropped = true;
2030  			}
2031  			else
2032  				m_WindowSize = INITIAL_WINDOW_SIZE;
2033  		}
2034  		m_LastWindowDropSize = 0;
2035  		m_IsFirstRttSample = true;
2036  		m_IsFirstACK = true;
2037  		m_WindowIncCounter = 0; // disable window growth
2038  		m_DropWindowDelaySequenceNumber = m_SequenceNumber - int(m_SentPackets.size ()) + INITIAL_WINDOW_SIZE;
2039  		m_IsWinDropped = true; // don't drop window twice
2040  		UpdatePacingTime ();
2041  	}
2042  
2043  	void Stream::CancelRemoteLeaseChange ()
2044  	{
2045  		m_RemoteLeaseChangeTime = 0;
2046  		m_IsRemoteLeaseChangeInProgress = false;
2047  	}
2048  
2049  	StreamingDestination::StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort, bool gzip):
2050  		m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip),
2051  		m_PendingIncomingTimer (m_Owner->GetService ()),
2052  		m_LastCleanupTime (i2p::util::GetSecondsSinceEpoch ())
2053  	{
2054  	}
2055  
2056  	StreamingDestination::~StreamingDestination ()
2057  	{
2058  		for (auto& it: m_SavedPackets)
2059  		{
2060  			for (auto it1: it.second) DeletePacket (it1);
2061  			it.second.clear ();
2062  		}
2063  		m_SavedPackets.clear ();
2064  	}
2065  
2066  	void StreamingDestination::Start ()
2067  	{
2068  	}
2069  
2070  	void StreamingDestination::Stop ()
2071  	{
2072  		ResetAcceptor ();
2073  		ResetPongHandler ();
2074  		m_PendingIncomingTimer.cancel ();
2075  		m_PendingIncomingStreams.clear ();
2076  		{
2077  			std::unique_lock<std::mutex> l(m_StreamsMutex);
2078  			for (auto it: m_Streams)
2079  				it.second->Terminate (false); // we delete here
2080  			m_Streams.clear ();
2081  			m_IncomingStreams.clear ();
2082  			m_LastStream = nullptr;
2083  		}
2084  	}
2085  
2086  	void StreamingDestination::HandleNextPacket (Packet * packet)
2087  	{
2088  		uint32_t sendStreamID = packet->GetSendStreamID ();
2089  		if (sendStreamID)
2090  		{
2091  			if (!m_LastStream || sendStreamID != m_LastStream->GetRecvStreamID ())
2092  			{
2093  				auto it = m_Streams.find (sendStreamID);
2094  				if (it != m_Streams.end ())
2095  					m_LastStream = it->second;
2096  				else
2097  					m_LastStream = nullptr;
2098  			}
2099  			if (m_LastStream)
2100  				m_LastStream->HandleNextPacket (packet);
2101  			else if (packet->IsEcho () && m_Owner->IsStreamingAnswerPings ())
2102  			{
2103  				// ping
2104  				LogPrint (eLogInfo, "Streaming: Ping received sSID=", sendStreamID);
2105  				auto s = std::make_shared<Stream> (m_Owner->GetService (), *this);
2106  				s->HandlePing (packet);
2107  			}
2108  			else
2109  			{
2110  				LogPrint (eLogInfo, "Streaming: Unknown stream sSID=", sendStreamID);
2111  				DeletePacket (packet);
2112  			}
2113  		}
2114  		else
2115  		{
2116  			if (packet->IsEcho ())
2117  			{
2118  				// pong
2119  				LogPrint (eLogInfo, "Streaming: Pong received rSID=", packet->GetReceiveStreamID ());
2120  				if (m_PongHandler != nullptr)
2121  					m_PongHandler (packet->from ? packet->from->GetDestinationPtr () : nullptr);
2122  				DeletePacket (packet);
2123  				return;
2124  			}
2125  			if (packet->IsSYN () && !packet->GetSeqn ()) // new incoming stream
2126  			{
2127  				uint32_t receiveStreamID = packet->GetReceiveStreamID ();
2128  				auto it1 = m_IncomingStreams.find (receiveStreamID);
2129  				if (it1 != m_IncomingStreams.end ())
2130  				{
2131  					// already pending
2132  					LogPrint(eLogInfo, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists");
2133  					it1->second->ResetRoutingPath (); // Ack was not delivered, changing path
2134  					DeletePacket (packet); // drop it, because previous should be connected
2135  					return;
2136  				}
2137  				if (m_Owner->GetStreamingMaxConcurrentStreams () > 0 && (int)m_Streams.size () > m_Owner->GetStreamingMaxConcurrentStreams ())
2138  				{
2139  					LogPrint(eLogInfo, "Streaming: Number of streams exceeds ", m_Owner->GetStreamingMaxConcurrentStreams ());
2140  					DeletePacket (packet);
2141  					return;
2142  				}
2143                  if (m_Owner->GetStreamingMaxConnsPerMinute () > 0 && packet->from)
2144                  {
2145                      auto ts = i2p::util::GetSecondsSinceEpoch ();
2146                      auto& numConnectionsList = m_NumIncomingConnectionsPerSecond[packet->from->GetRemoteStaticKey ()]; // find or create new
2147                      CleanupExpiredNumConnectionsPerSecond (numConnectionsList, ts);
2148                      if ((int)numConnectionsList.size () >= m_Owner->GetStreamingMaxConnsPerMinute ())
2149                      {
2150                          LogPrint (eLogInfo, "Streaming: Number of incoming streams exceeds ", m_Owner->GetStreamingMaxConnsPerMinute (), " streams per minute");
2151                          DeletePacket (packet);
2152                          return;
2153                      }
2154                      else
2155                          numConnectionsList.emplace_back (ts);
2156                  }
2157  				auto incomingStream = CreateNewIncomingStream (receiveStreamID);
2158  				incomingStream->HandleNextPacket (packet); // SYN
2159  				if (!incomingStream->GetRemoteLeaseSet ())
2160  				{
2161  					LogPrint (eLogWarning, "Streaming: No remote LeaseSet for incoming stream. Terminated");
2162  					incomingStream->Terminate (); // can't send FIN anyway
2163  					return;
2164  				}
2165  
2166  				// handle saved packets if any
2167  				{
2168  					auto it = m_SavedPackets.find (receiveStreamID);
2169  					if (it != m_SavedPackets.end ())
2170  					{
2171  						LogPrint (eLogDebug, "Streaming: Processing ", it->second.size (), " saved packets for rSID=", receiveStreamID);
2172  						for (auto it1: it->second)
2173  							incomingStream->HandleNextPacket (it1);
2174  						m_SavedPackets.erase (it);
2175  					}
2176  				}
2177  				// accept
2178  				if (m_Acceptor != nullptr)
2179  					m_Acceptor (incomingStream);
2180  				else
2181  				{
2182  					LogPrint (eLogWarning, "Streaming: Acceptor for incoming stream is not set");
2183  					if (m_PendingIncomingStreams.size () < MAX_PENDING_INCOMING_BACKLOG)
2184  					{
2185  						m_PendingIncomingStreams.push_back (incomingStream);
2186  						m_PendingIncomingTimer.cancel ();
2187  						m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT));
2188  						m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer,
2189  							shared_from_this (), std::placeholders::_1));
2190  						LogPrint (eLogDebug, "Streaming: Pending incoming stream added, rSID=", receiveStreamID);
2191  					}
2192  					else
2193  					{
2194  						LogPrint (eLogWarning, "Streaming: Pending incoming streams backlog exceeds ", MAX_PENDING_INCOMING_BACKLOG);
2195  						incomingStream->Close ();
2196  					}
2197  				}
2198  			}
2199  			else // follow on packet without SYN
2200  			{
2201  				uint32_t receiveStreamID = packet->GetReceiveStreamID ();
2202  				auto it1 = m_IncomingStreams.find (receiveStreamID);
2203  				if (it1 != m_IncomingStreams.end ())
2204  				{
2205  					// found
2206  					it1->second->HandleNextPacket (packet);
2207  					return;
2208  				}
2209  				// save follow on packet
2210  				auto it = m_SavedPackets.find (receiveStreamID);
2211  				if (it != m_SavedPackets.end ())
2212  					it->second.push_back (packet);
2213  				else
2214  				{
2215  					m_SavedPackets[receiveStreamID] = std::list<Packet *>{ packet };
2216  					auto timer = std::make_shared<boost::asio::deadline_timer> (m_Owner->GetService ());
2217  					timer->expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT));
2218  					auto s = shared_from_this ();
2219  					timer->async_wait ([s,timer,receiveStreamID](const boost::system::error_code& ecode)
2220  					{
2221  						if (ecode != boost::asio::error::operation_aborted)
2222  						{
2223  							auto it = s->m_SavedPackets.find (receiveStreamID);
2224  							if (it != s->m_SavedPackets.end ())
2225  							{
2226  								for (auto it1: it->second) s->DeletePacket (it1);
2227  								it->second.clear ();
2228  								s->m_SavedPackets.erase (it);
2229  							}
2230  						}
2231  					});
2232  				}
2233  			}
2234  		}
2235  	}
2236  
2237  	std::shared_ptr<Stream> StreamingDestination::CreateNewOutgoingStream (std::shared_ptr<const i2p::data::LeaseSet> remote, int port)
2238  	{
2239  		auto s = std::make_shared<Stream> (m_Owner->GetService (), *this, remote, port);
2240  		std::unique_lock<std::mutex> l(m_StreamsMutex);
2241  		m_Streams.emplace (s->GetRecvStreamID (), s);
2242  		return s;
2243  	}
2244  
2245  	void StreamingDestination::SendPing (std::shared_ptr<const i2p::data::LeaseSet> remote)
2246  	{
2247  		auto s = std::make_shared<Stream> (m_Owner->GetService (), *this, remote, 0);
2248  		s->SendPing ();
2249  	}
2250  
2251  	std::shared_ptr<Stream> StreamingDestination::CreateNewIncomingStream (uint32_t receiveStreamID)
2252  	{
2253  		auto s = std::make_shared<Stream> (m_Owner->GetService (), *this);
2254  		std::unique_lock<std::mutex> l(m_StreamsMutex);
2255  		m_Streams.emplace (s->GetRecvStreamID (), s);
2256  		m_IncomingStreams.emplace (receiveStreamID, s);
2257  		return s;
2258  	}
2259  
2260  	void StreamingDestination::DeleteStream (std::shared_ptr<Stream> stream)
2261  	{
2262  		if (stream)
2263  		{
2264  			std::unique_lock<std::mutex> l(m_StreamsMutex);
2265  			m_Streams.erase (stream->GetRecvStreamID ());
2266  			if (stream->IsIncoming ())
2267  				m_IncomingStreams.erase (stream->GetSendStreamID ());
2268  			if (m_LastStream == stream) m_LastStream = nullptr;
2269  		}
2270  		auto ts = i2p::util::GetSecondsSinceEpoch ();
2271  		if (m_Streams.empty () || ts > m_LastCleanupTime + STREAMING_DESTINATION_POOLS_CLEANUP_INTERVAL)
2272  		{
2273  			m_PacketsPool.CleanUp ();
2274  			m_I2NPMsgsPool.CleanUp ();
2275  			if (!m_NumIncomingConnectionsPerSecond.empty ())
2276  			{
2277                  for (auto it = m_NumIncomingConnectionsPerSecond.begin (); it != m_NumIncomingConnectionsPerSecond.end ();)
2278                  {
2279                      if (it->second.empty () || it->second.back () + 60 < ts) // newest is too old
2280                          it = m_NumIncomingConnectionsPerSecond.erase (it);
2281                      else
2282                          it++;
2283                  }
2284  			}
2285  			m_LastCleanupTime = ts;
2286  		}
2287  	}
2288  
2289  	bool StreamingDestination::DeleteStream (uint32_t recvStreamID)
2290  	{
2291  		auto it = m_Streams.find (recvStreamID);
2292  		if (it == m_Streams.end ())
2293  			return false;
2294  		auto s = it->second;
2295  		boost::asio::post (m_Owner->GetService (), [this, s] ()
2296  			{
2297  				s->Close (); // try to send FIN
2298  				s->Terminate (false);
2299  				DeleteStream (s);
2300  			});
2301  		return true;
2302  	}
2303  
2304  	void StreamingDestination::SetAcceptor (const Acceptor& acceptor)
2305  	{
2306  		m_Acceptor = acceptor; // we must set it immediately for IsAcceptorSet
2307  		auto s = shared_from_this ();
2308  		boost::asio::post (m_Owner->GetService (), [s](void)
2309  			{
2310  				// take care about incoming queue
2311  				for (auto& it: s->m_PendingIncomingStreams)
2312  					if (it->GetStatus () == eStreamStatusOpen) // still open?
2313  						s->m_Acceptor (it);
2314  				s->m_PendingIncomingStreams.clear ();
2315  				s->m_PendingIncomingTimer.cancel ();
2316  			});
2317  	}
2318  
2319  	void StreamingDestination::ResetAcceptor ()
2320  	{
2321  		if (m_Acceptor) m_Acceptor (nullptr);
2322  		m_Acceptor = nullptr;
2323  	}
2324  
2325  	void StreamingDestination::SetPongHandler (const PongHandler& handler)
2326  	{
2327  		m_PongHandler = handler;
2328  	}
2329  
2330  	void StreamingDestination::ResetPongHandler ()
2331  	{
2332  		m_PongHandler = nullptr;
2333  	}
2334  
2335  	void StreamingDestination::AcceptOnce (const Acceptor& acceptor)
2336  	{
2337  		boost::asio::post (m_Owner->GetService (), [acceptor, this](void)
2338  			{
2339  				if (!m_PendingIncomingStreams.empty ())
2340  				{
2341  					acceptor (m_PendingIncomingStreams.front ());
2342  					m_PendingIncomingStreams.pop_front ();
2343  					if (m_PendingIncomingStreams.empty ())
2344  						m_PendingIncomingTimer.cancel ();
2345  				}
2346  				else // we must save old acceptor and set it back
2347  				{
2348  					m_Acceptor = std::bind (&StreamingDestination::AcceptOnceAcceptor, this,
2349  						std::placeholders::_1, acceptor, m_Acceptor);
2350  				}
2351  			});
2352  	}
2353  
2354  	void StreamingDestination::AcceptOnceAcceptor (std::shared_ptr<Stream> stream, Acceptor acceptor, Acceptor prev)
2355  	{
2356  		m_Acceptor = prev;
2357  		acceptor (stream);
2358  	}
2359  
2360  	std::shared_ptr<Stream> StreamingDestination::AcceptStream (int timeout)
2361  	{
2362  		std::shared_ptr<i2p::stream::Stream> stream;
2363  		std::condition_variable streamAccept;
2364  		std::mutex streamAcceptMutex;
2365  		std::unique_lock<std::mutex> l(streamAcceptMutex);
2366  		AcceptOnce (
2367  			[&streamAccept, &streamAcceptMutex, &stream](std::shared_ptr<i2p::stream::Stream> s)
2368  		    {
2369  				stream = s;
2370  				std::unique_lock<std::mutex> l(streamAcceptMutex);
2371  				streamAccept.notify_all ();
2372  			});
2373  		if (timeout)
2374  			streamAccept.wait_for (l, std::chrono::seconds (timeout));
2375  		else
2376  			streamAccept.wait (l);
2377  		return stream;
2378  	}
2379  
2380  	void StreamingDestination::HandlePendingIncomingTimer (const boost::system::error_code& ecode)
2381  	{
2382  		if (ecode != boost::asio::error::operation_aborted)
2383  		{
2384  			LogPrint (eLogWarning, "Streaming: Pending incoming timeout expired");
2385  			for (auto& it: m_PendingIncomingStreams)
2386  				it->Close ();
2387  			m_PendingIncomingStreams.clear ();
2388  		}
2389  	}
2390  
2391  	void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len,
2392  		i2p::garlic::ECIESX25519AEADRatchetSession * from)
2393  	{
2394  		// unzip it
2395  		Packet * uncompressed = NewPacket ();
2396  		uncompressed->offset = 0;
2397  		uncompressed->len = m_Inflator.Inflate (buf, len, uncompressed->buf, MAX_PACKET_SIZE);
2398  		if (uncompressed->len)
2399  		{
2400  			uncompressed->from = from;
2401  			HandleNextPacket (uncompressed);
2402  		}
2403  		else
2404  			DeletePacket (uncompressed);
2405  	}
2406  
2407  	std::shared_ptr<I2NPMessage> StreamingDestination::CreateDataMessage (
2408  		const uint8_t * payload, size_t len, uint16_t toPort, bool checksum, bool gzip)
2409  	{
2410  		size_t size;
2411  		auto msg = (len <= STREAMING_MTU_RATCHETS) ? m_I2NPMsgsPool.AcquireShared () : NewI2NPMessage ();
2412  		uint8_t * buf = msg->GetPayload ();
2413  		buf += 4; // reserve for lengthlength
2414  		msg->len += 4;
2415  
2416  		if (m_Gzip || gzip)
2417  			size = m_Deflator.Deflate (payload, len, buf, msg->maxLen - msg->len);
2418  		else
2419  			size = i2p::data::GzipNoCompression (payload, len, buf, msg->maxLen - msg->len);
2420  
2421  		if (size)
2422  		{
2423  			htobe32buf (msg->GetPayload (), size); // length
2424  			htobe16buf (buf + 4, m_LocalPort); // source port
2425  			htobe16buf (buf + 6, toPort); // destination port
2426  			buf[9] = i2p::client::PROTOCOL_TYPE_STREAMING; // streaming protocol
2427  			msg->len += size;
2428  			msg->FillI2NPMessageHeader (eI2NPData, 0, checksum);
2429  		}
2430  		else
2431  			msg = nullptr;
2432  		return msg;
2433  	}
2434  
2435  	uint32_t StreamingDestination::GetRandom ()
2436  	{
2437  		return m_Owner ? m_Owner->GetRng ()() : rand ();
2438  	}
2439  
2440  	void StreamingDestination::CleanupExpiredNumConnectionsPerSecond (std::list<uint64_t>& numConnectionsList, uint64_t ts)
2441  	{
2442          if (numConnectionsList.empty ()) return;
2443          auto it = numConnectionsList.begin ();
2444          while (it != numConnectionsList.end ())
2445          {
2446              if (*it + 60 >= ts) break; // 1 minute
2447              it++;
2448          }
2449          numConnectionsList.erase (numConnectionsList.begin (), it);
2450  	}
2451  }
2452  }