/ libi2pd / SSU2.cpp
SSU2.cpp
   1  /*
   2  * Copyright (c) 2022-2026, The PurpleI2P Project
   3  *
   4  * This file is part of Purple i2pd project and licensed under BSD3
   5  *
   6  * See full license text in LICENSE file at top of project tree
   7  */
   8  
   9  #include <random>
  10  #include "Log.h"
  11  #include "RouterContext.h"
  12  #include "Transports.h"
  13  #include "NetDb.hpp"
  14  #include "Config.h"
  15  #include "SSU2.h"
  16  
  17  namespace i2p
  18  {
  19  namespace transport
  20  {
  21  	SSU2Server::SSU2Server ():
  22  		RunnableServiceWithWork ("SSU2"), m_ReceiveService ("SSU2r"),
  23  		m_SocketV4 (m_ReceiveService.GetService ()), m_SocketV6 (m_ReceiveService.GetService ()),
  24  		m_AddressV4 (boost::asio::ip::address_v4()), m_AddressV6 (boost::asio::ip::address_v6()),
  25  		m_TerminationTimer (GetService ()), m_CleanupTimer (GetService ()), m_ResendTimer (GetService ()),
  26  		m_IntroducersUpdateTimer (GetService ()), m_IntroducersUpdateTimerV6 (GetService ()),
  27  		m_IsPublished (true), m_IsSyncClockFromPeers (true), m_PendingTimeOffset (0),
  28  		m_Rng(i2p::util::GetMonotonicMicroseconds ()%1000000LL), m_IsForcedFirewalled4 (false),
  29  		m_IsForcedFirewalled6 (false), m_IsThroughProxy (false)
  30  	{
  31  	}
  32  
  33  	void SSU2Server::Start ()
  34  	{
  35  		if (!IsRunning ())
  36  		{
  37  			StartIOService ();
  38  			i2p::config::GetOption ("ssu2.published", m_IsPublished);
  39  			i2p::config::GetOption("nettime.frompeers", m_IsSyncClockFromPeers);
  40  			bool found = false;
  41  			auto addresses = i2p::context.GetRouterInfo ().GetAddresses ();
  42  			if (!addresses) return;
  43  			for (const auto& address: *addresses)
  44  			{
  45  				if (!address) continue;
  46  				if (address->transportStyle == i2p::data::RouterInfo::eTransportSSU2)
  47  				{
  48  					if (m_IsThroughProxy)
  49  					{
  50  						found = true;
  51  						if (address->IsV6 ())
  52  						{
  53  							uint16_t mtu; i2p::config::GetOption ("ssu2.mtu6", mtu);
  54  							if (!mtu || mtu > SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE)
  55  								mtu = SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE;
  56  							i2p::context.SetMTU (mtu, false);
  57  						}
  58  						else
  59  						{
  60  							uint16_t mtu; i2p::config::GetOption ("ssu2.mtu4", mtu);
  61  							if (!mtu || mtu > SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE)
  62  								mtu = SSU2_MAX_PACKET_SIZE - SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE;
  63  							i2p::context.SetMTU (mtu, true);
  64  						}
  65  						continue; // we don't need port for proxy
  66  					}
  67  					auto port = address->port;
  68  					if (!port)
  69  					{
  70  						uint16_t ssu2Port; i2p::config::GetOption ("ssu2.port", ssu2Port);
  71  						if (ssu2Port) port = ssu2Port;
  72  						else
  73  						{
  74  							uint16_t p; i2p::config::GetOption ("port", p);
  75  							if (p) port = p;
  76  						}
  77  					}
  78  					if (port)
  79  					{
  80  						if (address->IsV4 ())
  81  						{
  82  							found = true;
  83  							i2p::config::GetOption ("ssu2.firewalled4",  m_IsForcedFirewalled4);
  84  							LogPrint (eLogDebug, "SSU2: Opening IPv4 socket at Start");
  85  							OpenSocket (boost::asio::ip::udp::endpoint (m_AddressV4, port));
  86  							boost::asio::post (m_ReceiveService.GetService (),
  87  								[this]()
  88  								{
  89  									Receive (m_SocketV4);
  90  								});
  91  							ScheduleIntroducersUpdateTimer (); // wait for 30 seconds and decide if we need introducers
  92  						}
  93  						if (address->IsV6 ())
  94  						{
  95  							found = true;
  96  							i2p::config::GetOption ("ssu2.firewalled6",  m_IsForcedFirewalled6);
  97  							LogPrint (eLogDebug, "SSU2: Opening IPv6 socket at Start");
  98  							OpenSocket (boost::asio::ip::udp::endpoint (m_AddressV6, port));
  99  							boost::asio::post (m_ReceiveService.GetService (),
 100  								[this]()
 101  								{
 102  									Receive (m_SocketV6);
 103  								});
 104  							ScheduleIntroducersUpdateTimerV6 (); // wait for 30 seconds and decide if we need introducers
 105  						}
 106  					}
 107  					else
 108  						LogPrint (eLogCritical, "SSU2: Can't start server because port not specified");
 109  				}
 110  			}
 111  			if (found)
 112  			{
 113  				if (m_IsThroughProxy)
 114  					ConnectToProxy ();
 115  				m_ReceiveService.Start ();
 116  			}
 117  			ScheduleTermination ();
 118  			ScheduleCleanup ();
 119  			ScheduleResend (false);
 120  		}
 121  	}
 122  
 123  	void SSU2Server::Stop ()
 124  	{
 125  		if (IsRunning ())
 126  		{
 127  			m_TerminationTimer.cancel ();
 128  			m_CleanupTimer.cancel ();
 129  			m_ResendTimer.cancel ();
 130  			m_IntroducersUpdateTimer.cancel ();
 131  			m_IntroducersUpdateTimerV6.cancel ();
 132  		}
 133  
 134  		auto sessions = m_Sessions;
 135  		for (auto& it: sessions)
 136  		{
 137  			it.second->RequestTermination (eSSU2TerminationReasonRouterShutdown);
 138  			it.second->Done ();
 139  		}
 140  
 141  		if (context.SupportsV4 () || context.SupportsV6 ())
 142  			m_ReceiveService.Stop ();
 143  		m_SocketV4.close ();
 144  		m_SocketV6.close ();
 145  
 146  		if (m_UDPAssociateSocket)
 147  		{
 148  			m_UDPAssociateSocket->close ();
 149  			m_UDPAssociateSocket.reset (nullptr);
 150  		}
 151  
 152  		StopIOService ();
 153  
 154  		m_Sessions.clear ();
 155  		m_SessionsByRouterHash.clear ();
 156  		m_PendingOutgoingSessions.clear ();
 157  		m_Relays.clear ();
 158  		m_PeerTests.clear ();
 159  		m_Introducers.clear ();
 160  		m_IntroducersV6.clear ();
 161  		m_ConnectedRecently.clear ();
 162  		m_RequestedPeerTests.clear ();
 163  
 164  		m_PacketsPool.ReleaseMt (m_ReceivedPacketsQueue);
 165  		m_ReceivedPacketsQueue.clear ();
 166  	}
 167  
 168  	void SSU2Server::SetLocalAddress (const boost::asio::ip::address& localAddress)
 169  	{
 170  		if (localAddress.is_unspecified ()) return;
 171  		if (localAddress.is_v4 ())
 172  		{
 173  			m_AddressV4 = localAddress;
 174  			uint16_t mtu; i2p::config::GetOption ("ssu2.mtu4", mtu);
 175  			if (!mtu) mtu = i2p::util::net::GetMTU (localAddress);
 176  			if (mtu < (int)SSU2_MIN_PACKET_SIZE) mtu = SSU2_MIN_PACKET_SIZE;
 177  			if (mtu > (int)SSU2_MAX_PACKET_SIZE) mtu = SSU2_MAX_PACKET_SIZE;
 178  			i2p::context.SetMTU (mtu, true);
 179  		}
 180  		else if (localAddress.is_v6 ())
 181  		{
 182  			m_AddressV6 = localAddress;
 183  			uint16_t mtu; i2p::config::GetOption ("ssu2.mtu6", mtu);
 184  			if (!mtu)
 185  			{
 186  				int maxMTU = i2p::util::net::GetMaxMTU (localAddress.to_v6 ());
 187  				mtu = i2p::util::net::GetMTU (localAddress);
 188  				if (mtu > maxMTU) mtu = maxMTU;
 189  			}
 190  			else
 191  				if (mtu > (int)SSU2_MAX_PACKET_SIZE) mtu = SSU2_MAX_PACKET_SIZE;
 192  			if (mtu < (int)SSU2_MIN_PACKET_SIZE) mtu = SSU2_MIN_PACKET_SIZE;
 193  			i2p::context.SetMTU (mtu, false);
 194  		}
 195  	}
 196  
 197  	bool SSU2Server::IsSupported (const boost::asio::ip::address& addr) const
 198  	{
 199  		if (m_IsThroughProxy)
 200  			return m_SocketV4.is_open ();
 201  		if (addr.is_v4 ())
 202  		{
 203  			if (m_SocketV4.is_open ())
 204  				return true;
 205  		}
 206  		else if (addr.is_v6 ())
 207  		{
 208  			if (m_SocketV6.is_open ())
 209  				return true;
 210  		}
 211  		return false;
 212  	}
 213  
 214  	uint16_t SSU2Server::GetPort (bool v4) const
 215  	{
 216  		boost::system::error_code ec;
 217  		boost::asio::ip::udp::endpoint ep = (v4 || m_IsThroughProxy) ? m_SocketV4.local_endpoint (ec) : m_SocketV6.local_endpoint (ec);
 218  		if (ec) return 0;
 219  		return ep.port ();
 220  	}
 221  
 222  	bool SSU2Server::IsConnectedRecently (const boost::asio::ip::udp::endpoint& ep, bool max)
 223  	{
 224  		if (!ep.port () || ep.address ().is_unspecified ()) return false;
 225  		std::lock_guard<std::mutex> l(m_ConnectedRecentlyMutex);
 226  		auto it = m_ConnectedRecently.find (ep);
 227  		if (it != m_ConnectedRecently.end ())
 228  		{
 229  			if (i2p::util::GetSecondsSinceEpoch () <= it->second + (max ? SSU2_MAX_HOLE_PUNCH_EXPIRATION : SSU2_MIN_HOLE_PUNCH_EXPIRATION))
 230  				return true;
 231  			else if (max)
 232  				m_ConnectedRecently.erase (it);
 233  		}
 234  		return false;
 235  	}
 236  
 237  	void SSU2Server::AddConnectedRecently (const boost::asio::ip::udp::endpoint& ep, uint64_t ts)
 238  	{
 239  		if (!ep.port () || ep.address ().is_unspecified () ||
 240  		    i2p::util::GetSecondsSinceEpoch () > ts + SSU2_MAX_HOLE_PUNCH_EXPIRATION) return;
 241  		std::lock_guard<std::mutex> l(m_ConnectedRecentlyMutex);
 242  		auto [it, added] = m_ConnectedRecently.try_emplace (ep, ts);
 243  		if (!added && ts > it->second)
 244  			it->second = ts; // renew timestamp of existing endpoint
 245  	}
 246  
 247  	void SSU2Server::AdjustTimeOffset (int64_t offset, std::shared_ptr<const i2p::data::IdentityEx> from)
 248  	{
 249  		if (offset)
 250  		{
 251  			if (m_PendingTimeOffset) // one more
 252  			{
 253  				if (m_PendingTimeOffsetFrom && from &&
 254  					m_PendingTimeOffsetFrom->GetIdentHash ().GetLL()[0] != from->GetIdentHash ().GetLL()[0]) // from different routers
 255  				{
 256  					if (std::abs (m_PendingTimeOffset - offset) < SSU2_CLOCK_SKEW)
 257  					{
 258  						offset = (m_PendingTimeOffset + offset)/2; // average
 259  						LogPrint (eLogWarning, "SSU2: Clock adjusted by ", offset, " seconds");
 260  						i2p::util::AdjustTimeOffset (offset);
 261  					}
 262  					else
 263  						LogPrint (eLogWarning, "SSU2: Time offsets are too different. Clock not adjusted");
 264  					m_PendingTimeOffset = 0;
 265  					m_PendingTimeOffsetFrom = nullptr;
 266  				}
 267  				else
 268  					LogPrint (eLogWarning, "SSU2: Time offsets from same router. Clock not adjusted");
 269  			}
 270  			else
 271  			{
 272  				m_PendingTimeOffset = offset; // first
 273  				m_PendingTimeOffsetFrom = from;
 274  			}
 275  		}
 276  		else
 277  		{
 278  			m_PendingTimeOffset = 0; // reset
 279  			m_PendingTimeOffsetFrom = nullptr;
 280  		}
 281  	}
 282  
 283  	boost::asio::ip::udp::socket& SSU2Server::OpenSocket (const boost::asio::ip::udp::endpoint& localEndpoint)
 284  	{
 285  		boost::asio::ip::udp::socket& socket = localEndpoint.address ().is_v6 () ? m_SocketV6 : m_SocketV4;
 286  		try
 287  		{
 288  			if (socket.is_open ())
 289  				socket.close ();
 290  			socket.open (localEndpoint.protocol ());
 291  			if (localEndpoint.address ().is_v6 ())
 292  #if !defined(__HAIKU__)
 293  				socket.set_option (boost::asio::ip::v6_only (true));
 294  #else
 295  			{
 296  				LogPrint (eLogWarning, "SSU2: Socket option IPV6_V6ONLY is not supported");
 297  				m_IsForcedFirewalled6 = true; // IPV6_V6ONLY is not supported, always Firewalled for ipv6
 298  			}
 299  #endif
 300  
 301  			uint64_t bufferSize = i2p::context.GetBandwidthLimit() * 1024 / 5; // max lag = 200ms
 302  			bufferSize = std::max(SSU2_SOCKET_MIN_BUFFER_SIZE, std::min(bufferSize, SSU2_SOCKET_MAX_BUFFER_SIZE));
 303  
 304  			boost::asio::socket_base::receive_buffer_size receiveBufferSizeSet (bufferSize);
 305  			boost::asio::socket_base::send_buffer_size sendBufferSizeSet (bufferSize);
 306  			socket.set_option (receiveBufferSizeSet);
 307  			socket.set_option (sendBufferSizeSet);
 308  			boost::asio::socket_base::receive_buffer_size receiveBufferSizeGet;
 309  			boost::asio::socket_base::send_buffer_size sendBufferSizeGet;
 310  			socket.get_option (receiveBufferSizeGet);
 311  			socket.get_option (sendBufferSizeGet);
 312  			if (receiveBufferSizeGet.value () != receiveBufferSizeSet.value () ||
 313  				sendBufferSizeGet.value () != sendBufferSizeSet.value ())
 314  			{
 315  				LogPrint (eLogWarning, "SSU2: Socket receive buffer size: requested = ",
 316  					receiveBufferSizeSet.value (), ", got = ", receiveBufferSizeGet.value ());
 317  				LogPrint (eLogWarning, "SSU2: Socket send buffer size: requested = ",
 318  					sendBufferSizeSet.value (), ", got = ", sendBufferSizeGet.value ());
 319  			}
 320  			else
 321  			{
 322  				LogPrint (eLogInfo, "SSU2: Socket receive buffer size: ", receiveBufferSizeGet.value ());
 323  				LogPrint (eLogInfo, "SSU2: Socket send buffer size: ", sendBufferSizeGet.value ());
 324  			}
 325  
 326  			socket.non_blocking (true);
 327  		}
 328  		catch (std::exception& ex )
 329  		{
 330  			LogPrint (eLogCritical, "SSU2: Failed to open socket on ", localEndpoint.address (), ": ", ex.what());
 331  			ThrowFatal ("Unable to start SSU2 transport on ", localEndpoint.address (), ": ", ex.what ());
 332  			return socket;
 333  		}
 334  		try
 335  		{
 336  			socket.bind (localEndpoint);
 337  			LogPrint (eLogInfo, "SSU2: Start listening on ", localEndpoint);
 338  		}
 339  		catch (std::exception& ex )
 340  		{
 341  			LogPrint (eLogWarning, "SSU2: Failed to bind to ", localEndpoint, ": ", ex.what(), ". Actual endpoint is ", socket.local_endpoint ());
 342  			// we can continue without binding being firewalled
 343  		}
 344  		return socket;
 345  	}
 346  
 347  	void SSU2Server::Receive (boost::asio::ip::udp::socket& socket)
 348  	{
 349  		Packet * packet = m_PacketsPool.AcquireMt ();
 350  		socket.async_receive_from (boost::asio::buffer (packet->buf, SSU2_MAX_PACKET_SIZE), packet->from,
 351  			std::bind (&SSU2Server::HandleReceivedFrom, this, std::placeholders::_1, std::placeholders::_2, packet, std::ref (socket)));
 352  	}
 353  
 354  	void SSU2Server::HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred,
 355  		Packet * packet, boost::asio::ip::udp::socket& socket)
 356  	{
 357  		if (!ecode
 358  			|| ecode == boost::asio::error::connection_refused
 359  			|| ecode == boost::asio::error::connection_reset
 360  			|| ecode == boost::asio::error::network_reset
 361  			|| ecode == boost::asio::error::network_unreachable
 362  			|| ecode == boost::asio::error::host_unreachable
 363  #ifdef _WIN32 // windows can throw WinAPI error, which is not handled by ASIO
 364  			|| ecode.value() == boost::winapi::ERROR_CONNECTION_REFUSED_
 365  			|| ecode.value() == boost::winapi::WSAENETRESET_ // 10052
 366  			|| ecode.value() == boost::winapi::ERROR_NETWORK_UNREACHABLE_
 367  			|| ecode.value() == boost::winapi::ERROR_HOST_UNREACHABLE_
 368  #endif
 369  		)
 370  		// just try continue reading when received ICMP response otherwise socket can crash,
 371  		// but better to find out which host were sent it and mark that router as unreachable
 372  		{
 373  			i2p::transport::transports.UpdateReceivedBytes (bytes_transferred);
 374  			if (bytes_transferred < SSU2_MIN_RECEIVED_PACKET_SIZE)
 375  			{
 376  				// drop too short packets
 377  				m_PacketsPool.ReleaseMt (packet);
 378  				Receive (socket);
 379  				return;
 380  			}
 381  			packet->len = bytes_transferred;
 382  
 383  			boost::system::error_code ec;
 384  			size_t moreBytes = socket.available (ec);
 385  			if (!ec && moreBytes)
 386  			{
 387  				std::list<Packet *> packets;
 388  				packets.push_back (packet);
 389  				while (moreBytes && packets.size () < SSU2_MAX_NUM_PACKETS_PER_BATCH)
 390  				{
 391  					packet = m_PacketsPool.AcquireMt ();
 392  					packet->len = socket.receive_from (boost::asio::buffer (packet->buf, SSU2_MAX_PACKET_SIZE), packet->from, 0, ec);
 393  					if (!ec)
 394  					{
 395  						i2p::transport::transports.UpdateReceivedBytes (packet->len);
 396  						if (packet->len >= SSU2_MIN_RECEIVED_PACKET_SIZE)
 397  							packets.push_back (packet);
 398  						else // drop too short packets
 399  							m_PacketsPool.ReleaseMt (packet);
 400  						moreBytes = socket.available(ec);
 401  						if (ec) break;
 402  					}
 403  					else
 404  					{
 405  						LogPrint (eLogError, "SSU2: receive_from error: code ", ec.value(), ": ", ec.message ());
 406  						m_PacketsPool.ReleaseMt (packet);
 407  						break;
 408  					}
 409  				}
 410  				InsertToReceivedPacketsQueue (packets);
 411  			}
 412  			else
 413  				InsertToReceivedPacketsQueue (packet);
 414  			Receive (socket);
 415  		}
 416  		else
 417  		{
 418  			m_PacketsPool.ReleaseMt (packet);
 419  			if (ecode != boost::asio::error::operation_aborted)
 420  			{
 421  				LogPrint (eLogError, "SSU2: Receive error: code ", ecode.value(), ": ", ecode.message ());
 422  				if (m_IsThroughProxy)
 423  				{
 424  					m_UDPAssociateSocket.reset (nullptr);
 425  					m_ProxyRelayEndpoint.reset (nullptr);
 426  					m_SocketV4.close ();
 427  					ConnectToProxy ();
 428  				}
 429  				else
 430  				{
 431  					auto ep = socket.local_endpoint ();
 432  					LogPrint (eLogCritical, "SSU2: Reopening socket in HandleReceivedFrom: code ", ecode.value(), ": ", ecode.message ());
 433  					OpenSocket (ep);
 434  					Receive (socket);
 435  				}
 436  			}
 437  		}
 438  	}
 439  
 440  	void SSU2Server::HandleReceivedPackets (std::list<Packet *>&& packets)
 441  	{
 442  		if (packets.empty ()) return;
 443  		if (m_IsThroughProxy)
 444  			for (auto it: packets)
 445  				ProcessNextPacketFromProxy (it->buf, it->len);
 446  		else
 447  			for (auto it: packets)
 448  				ProcessNextPacket (it->buf, it->len, it->from);
 449  		m_PacketsPool.ReleaseMt (packets);
 450  		if (m_LastSession && m_LastSession->GetState () != eSSU2SessionStateTerminated)
 451  			m_LastSession->FlushData ();
 452  	}
 453  
 454  	void SSU2Server::InsertToReceivedPacketsQueue (Packet * packet)
 455  	{
 456  		if (!packet) return;
 457  		bool empty = false;
 458  		{
 459  			std::lock_guard<std::mutex> l(m_ReceivedPacketsQueueMutex);
 460  			empty = m_ReceivedPacketsQueue.empty ();
 461  			m_ReceivedPacketsQueue.push_back (packet);
 462  		}
 463  		if (empty)
 464  			boost::asio::post (GetService (), [this]() { HandleReceivedPacketsQueue (); });
 465  	}
 466  
 467  	void SSU2Server::InsertToReceivedPacketsQueue (std::list<Packet *>& packets)
 468  	{
 469  		if (packets.empty ()) return;
 470  		size_t queueSize = 0;
 471  		{
 472  			std::lock_guard<std::mutex> l(m_ReceivedPacketsQueueMutex);
 473  			queueSize = m_ReceivedPacketsQueue.size ();
 474  			if (queueSize < SSU2_MAX_RECEIVED_QUEUE_SIZE)
 475  				m_ReceivedPacketsQueue.splice (m_ReceivedPacketsQueue.end (), packets);
 476  			else
 477  			{
 478  				LogPrint (eLogError, "SSU2: Received queue size ", queueSize, " exceeds max size ", SSU2_MAX_RECEIVED_QUEUE_SIZE);
 479  				m_PacketsPool.ReleaseMt (packets);
 480  				queueSize = 0; // invoke processing just in case
 481  			}
 482  		}
 483  		if (!queueSize)
 484  			boost::asio::post (GetService (), [this]() { HandleReceivedPacketsQueue (); });
 485  	}
 486  
 487  	void SSU2Server::HandleReceivedPacketsQueue ()
 488  	{
 489  		std::list<Packet *> receivedPackets;
 490  		{
 491  			std::lock_guard<std::mutex> l(m_ReceivedPacketsQueueMutex);
 492  			m_ReceivedPacketsQueue.swap (receivedPackets);
 493  		}
 494  		HandleReceivedPackets (std::move (receivedPackets));
 495  	}
 496  
 497  	bool SSU2Server::AddSession (std::shared_ptr<SSU2Session> session)
 498  	{
 499  		if (session)
 500  		{
 501  			if (m_Sessions.emplace (session->GetConnID (), session).second)
 502  			{
 503  				if (session->GetState () != eSSU2SessionStatePeerTest)
 504  					AddSessionByRouterHash (session);
 505  				return true;
 506  			}
 507  		}
 508  		return false;
 509  	}
 510  
 511  	void SSU2Server::RemoveSession (uint64_t connID)
 512  	{
 513  		auto it = m_Sessions.find (connID);
 514  		if (it != m_Sessions.end ())
 515  		{
 516  			if (it->second->GetState () != eSSU2SessionStatePeerTest)
 517  			{
 518  				auto ident = it->second->GetRemoteIdentity ();
 519  				if (ident)
 520  				{
 521  					std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex);
 522  					auto it1 = m_SessionsByRouterHash.find (ident->GetIdentHash ());
 523  					if (it1 != m_SessionsByRouterHash.end () && it->second == it1->second.lock ())
 524  						m_SessionsByRouterHash.erase (it1);
 525  				}
 526  			}
 527  			if (m_LastSession == it->second)
 528  				m_LastSession = nullptr;
 529  			m_Sessions.erase (it);
 530  		}
 531  	}
 532  
 533  	void SSU2Server::RequestRemoveSession (uint64_t connID)
 534  	{
 535  		boost::asio::post (GetService (), [connID, this]() { RemoveSession (connID); });
 536  	}
 537  
 538  	void SSU2Server::AddSessionByRouterHash (std::shared_ptr<SSU2Session> session)
 539  	{
 540  		if (session)
 541  		{
 542  			auto ident = session->GetRemoteIdentity ();
 543  			if (ident)
 544  			{
 545  				std::shared_ptr<SSU2Session> oldSession;
 546  				{
 547  					std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex);
 548  					auto ret = m_SessionsByRouterHash.emplace (ident->GetIdentHash (), session);
 549  					if (!ret.second)
 550  					{
 551  						oldSession = ret.first->second.lock ();
 552  						// update session
 553  						ret.first->second = session;
 554  					}
 555  				}
 556  				if (oldSession && oldSession != session)
 557  				{
 558  					// session already exists
 559  					LogPrint (eLogWarning, "SSU2: Session to ", ident->GetIdentHash ().ToBase64 (), " already exists");
 560  					// move unsent msgs to new session
 561  					oldSession->MoveSendQueue (session);
 562  					// terminate existing
 563  					boost::asio::post (GetService (), std::bind (&SSU2Session::RequestTermination, oldSession, eSSU2TerminationReasonReplacedByNewSession));
 564  				}
 565  			}
 566  		}
 567  	}
 568  
 569  	bool SSU2Server::AddPendingOutgoingSession (std::shared_ptr<SSU2Session> session)
 570  	{
 571  		if (!session) return false;
 572  		std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex);
 573  		return m_PendingOutgoingSessions.emplace (session->GetRemoteEndpoint (), session).second;
 574  	}
 575  
 576  	std::shared_ptr<SSU2Session> SSU2Server::FindSession (const i2p::data::IdentHash& ident)
 577  	{
 578  		std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex);
 579  		auto it = m_SessionsByRouterHash.find (ident);
 580  		if (it != m_SessionsByRouterHash.end ())
 581  		{
 582  			if (!it->second.expired ())
 583  			{
 584  				auto s = it->second.lock ();
 585  				if (s && s->GetState () != eSSU2SessionStateTerminated)
 586  					return s;
 587  			}
 588  			m_SessionsByRouterHash.erase (it);
 589  		}
 590  		return nullptr;
 591  	}
 592  
 593  	std::shared_ptr<SSU2Session> SSU2Server::FindPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) const
 594  	{
 595  		std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex);
 596  		auto it = m_PendingOutgoingSessions.find (ep);
 597  		if (it != m_PendingOutgoingSessions.end ())
 598  			return it->second;
 599  		return nullptr;
 600  	}
 601  
 602  	void SSU2Server::RemovePendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep)
 603  	{
 604  		std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex);
 605  		m_PendingOutgoingSessions.erase (ep);
 606  	}
 607  
 608  	std::shared_ptr<SSU2Session> SSU2Server::GetRandomPeerTestSession (
 609  		i2p::data::RouterInfo::CompatibleTransports remoteTransports, const i2p::data::IdentHash& excluded)
 610  	{
 611  		if (m_Sessions.empty ()) return nullptr;
 612  		int ind = m_Rng () % m_Sessions.size ();
 613  		auto it = m_Sessions.begin ();
 614  		std::advance (it, ind);
 615  		while (it != m_Sessions.end ())
 616  		{
 617  			if (it->second->IsEstablished () && (it->second->GetRemotePeerTestTransports () & remoteTransports) &&
 618  				it->second->GetRemoteVersion () >= i2p::data::NETDB_MIN_PEER_TEST_VERSION &&
 619  			    it->second->GetRemoteIdentity ()->GetIdentHash () != excluded)
 620  				return it->second;
 621  			it++;
 622  		}
 623  		// not found, try from beginning
 624  		it = m_Sessions.begin ();
 625  		while (it != m_Sessions.end () && ind)
 626  		{
 627  			if (it->second->IsEstablished () && (it->second->GetRemotePeerTestTransports () & remoteTransports) &&
 628  				it->second->GetRemoteVersion () >= i2p::data::NETDB_MIN_PEER_TEST_VERSION &&
 629  			    it->second->GetRemoteIdentity ()->GetIdentHash () != excluded)
 630  				return it->second;
 631  			it++; ind--;
 632  		}
 633  		return nullptr;
 634  	}
 635  
 636  	void SSU2Server::AddRelay (uint32_t tag, std::shared_ptr<SSU2Session> relay)
 637  	{
 638  		m_Relays.emplace (tag, relay);
 639  	}
 640  
 641  	void SSU2Server::RemoveRelay (uint32_t tag)
 642  	{
 643  		m_Relays.erase (tag);
 644  	}
 645  
 646  	std::shared_ptr<SSU2Session> SSU2Server::FindRelaySession (uint32_t tag)
 647  	{
 648  		auto it = m_Relays.find (tag);
 649  		if (it != m_Relays.end ())
 650  		{
 651  			if (!it->second.expired ())
 652  			{
 653  				auto s = it->second.lock ();
 654  				if (s && s->IsEstablished ())
 655  					return s;
 656  			}
 657  			m_Relays.erase (it);
 658  		}
 659  		return nullptr;
 660  	}
 661  
 662  	bool SSU2Server::AddPeerTest (uint32_t nonce, std::shared_ptr<SSU2Session> aliceSession, uint64_t ts)
 663  	{
 664  		return m_PeerTests.emplace (nonce, std::pair{ aliceSession, ts }).second;
 665  	}
 666  
 667  	std::shared_ptr<SSU2Session> SSU2Server::GetPeerTest (uint32_t nonce)
 668  	{
 669  		auto it = m_PeerTests.find (nonce);
 670  		if (it != m_PeerTests.end ())
 671  		{
 672  			auto s = it->second.first.lock ();
 673  			m_PeerTests.erase (it);
 674  			return s;
 675  		}
 676  		return nullptr;
 677  	}
 678  
 679  	bool SSU2Server::AddRequestedPeerTest (uint32_t nonce, std::shared_ptr<SSU2PeerTestSession> session, uint64_t ts)
 680  	{
 681  		return m_RequestedPeerTests.emplace (nonce, std::pair{ session, ts }).second;
 682  	}
 683  
 684  	std::shared_ptr<SSU2PeerTestSession> SSU2Server::GetRequestedPeerTest (uint32_t nonce)
 685  	{
 686  		auto it = m_RequestedPeerTests.find (nonce);
 687  		if (it != m_RequestedPeerTests.end ())
 688  		{
 689  			auto s = it->second.first.lock ();
 690  			m_RequestedPeerTests.erase (it);
 691  			return s;
 692  		}
 693  		return nullptr;
 694  	}
 695  
 696  	void SSU2Server::ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint)
 697  	{
 698  		if (len < 24) return;
 699  		uint64_t connID;
 700  		memcpy (&connID, buf, 8);
 701  		connID ^= CreateHeaderMask (i2p::context.GetSSU2IntroKey (), buf + (len - 24));
 702  		if (!m_LastSession || m_LastSession->GetConnID () != connID)
 703  		{
 704  			if (m_LastSession) m_LastSession->FlushData ();
 705  			auto it = m_Sessions.find (connID);
 706  			if (it != m_Sessions.end ())
 707  				m_LastSession = it->second;
 708  			else
 709  				m_LastSession = nullptr;
 710  		}
 711  		if (m_LastSession)
 712  		{
 713  			switch (m_LastSession->GetState ())
 714  			{
 715  				case eSSU2SessionStateEstablished:
 716  				case eSSU2SessionStateSessionConfirmedSent:
 717  					m_LastSession->ProcessData (buf, len, senderEndpoint);
 718  				break;
 719  				case eSSU2SessionStateSessionCreatedSent:
 720  					if (!m_LastSession->ProcessSessionConfirmed (buf, len))
 721  					{
 722  						m_LastSession->Done ();
 723  						m_LastSession = nullptr;
 724  					}
 725  				break;
 726  				case eSSU2SessionStateIntroduced:
 727  					if (m_LastSession->GetRemoteEndpoint ().address ().is_unspecified ())
 728  						m_LastSession->SetRemoteEndpoint (senderEndpoint);
 729  					if (m_LastSession->GetRemoteEndpoint ().address () == senderEndpoint.address ()) // port might be different
 730  						m_LastSession->ProcessHolePunch (buf, len);
 731  					else
 732  					{
 733  						LogPrint (eLogWarning, "SSU2: HolePunch address ", senderEndpoint.address (),
 734  							" doesn't match RelayResponse ", m_LastSession->GetRemoteEndpoint ().address ());
 735  						m_LastSession->Done ();
 736  						m_LastSession = nullptr;
 737  					}
 738  				break;
 739  				case eSSU2SessionStatePeerTest:
 740  					m_LastSession->SetRemoteEndpoint (senderEndpoint);
 741  					m_LastSession->ProcessPeerTest (buf, len);
 742  				break;
 743  				case eSSU2SessionStateHolePunch:
 744  					m_LastSession->ProcessFirstIncomingMessage (connID, buf, len); // SessionRequest
 745  				break;
 746  				case eSSU2SessionStateClosing:
 747  					m_LastSession->ProcessData (buf, len, senderEndpoint); // we might receive termintaion block
 748  					if (m_LastSession && m_LastSession->GetState () == eSSU2SessionStateClosing)
 749  						m_LastSession->RequestTermination (eSSU2TerminationReasonIdleTimeout); // send termination again
 750  				break;
 751  				case eSSU2SessionStateClosingConfirmed:
 752  				case eSSU2SessionStateTerminated:
 753  					m_LastSession = nullptr;
 754  				break;
 755  				default:
 756  					LogPrint (eLogWarning, "SSU2: Invalid session state ", (int)m_LastSession->GetState ());
 757  			}
 758  		}
 759  		else
 760  		{
 761  			// check pending sessions if it's SessionCreated or Retry
 762  			auto it1 = m_PendingOutgoingSessions.find (senderEndpoint);
 763  			if (it1 != m_PendingOutgoingSessions.end ())
 764  			{
 765  				if (it1->second->GetState () == eSSU2SessionStateSessionRequestSent &&
 766  					it1->second->ProcessSessionCreated (buf, len))
 767  				{
 768  					std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex);
 769  					m_PendingOutgoingSessions.erase (it1); // we are done with that endpoint
 770  				}
 771  				else
 772  					it1->second->ProcessRetry (buf, len);
 773  			}
 774  			else if (!i2p::transport::transports.IsInReservedRange(senderEndpoint.address ()) && senderEndpoint.port () &&
 775  				!i2p::transport::transports.IsBanned (senderEndpoint.address ()))
 776  			{
 777  				// assume new incoming session
 778  				auto queueSize = m_ReceivedPacketsQueue.size ();
 779  				if (queueSize < SSU2_STOP_ACCEPTING_NEW_SESSIONS_QUEUE_SIZE)
 780  				{
 781  					auto session = std::make_shared<SSU2Session> (*this);
 782  					session->SetRemoteEndpoint (senderEndpoint);
 783  					session->ProcessFirstIncomingMessage (connID, buf, len);
 784  				}
 785  				else
 786  					LogPrint (eLogWarning, "SSU2: Incoming session dropped from ",  senderEndpoint, ". Queue size ", queueSize, " exceeds ", SSU2_STOP_ACCEPTING_NEW_SESSIONS_QUEUE_SIZE);
 787  			}
 788  			else
 789  				LogPrint (eLogWarning, "SSU2: Incoming packet received from invalid or banned endpoint ", senderEndpoint);
 790  		}
 791  	}
 792  
 793  	void SSU2Server::Send (const uint8_t * header, size_t headerLen, const uint8_t * payload, size_t payloadLen,
 794  		const boost::asio::ip::udp::endpoint& to)
 795  	{
 796  		if (m_IsThroughProxy)
 797  		{
 798  			SendThroughProxy (header, headerLen, nullptr, 0, payload, payloadLen, to);
 799  			return;
 800  		}
 801  
 802  		std::vector<boost::asio::const_buffer> bufs
 803  		{
 804  			boost::asio::buffer (header, headerLen),
 805  			boost::asio::buffer (payload, payloadLen)
 806  		};
 807  
 808  		boost::system::error_code ec;
 809  		if (to.address ().is_v6 ())
 810  		{
 811  			if (!m_SocketV6.is_open ()) return;
 812  			m_SocketV6.send_to (bufs, to, 0, ec);
 813  		}
 814  		else
 815  		{
 816  			if (!m_SocketV4.is_open ()) return;
 817  			m_SocketV4.send_to (bufs, to, 0, ec);
 818  		}
 819  
 820  		if (!ec)
 821  			i2p::transport::transports.UpdateSentBytes (headerLen + payloadLen);
 822  		else
 823  		{
 824  			LogPrint (ec == boost::asio::error::would_block ? eLogInfo : eLogError,
 825  				"SSU2: Send exception: ", ec.message (), " to ", to);
 826  		}
 827  	}
 828  
 829  	void SSU2Server::Send (const uint8_t * header, size_t headerLen, const uint8_t * headerX, size_t headerXLen,
 830  		const uint8_t * payload, size_t payloadLen, const boost::asio::ip::udp::endpoint& to)
 831  	{
 832  		if (m_IsThroughProxy)
 833  		{
 834  			SendThroughProxy (header, headerLen, headerX, headerXLen, payload, payloadLen, to);
 835  			return;
 836  		}
 837  
 838  		std::vector<boost::asio::const_buffer> bufs
 839  		{
 840  			boost::asio::buffer (header, headerLen),
 841  			boost::asio::buffer (headerX, headerXLen),
 842  			boost::asio::buffer (payload, payloadLen)
 843  		};
 844  
 845  		boost::system::error_code ec;
 846  		if (to.address ().is_v6 ())
 847  		{
 848  			if (!m_SocketV6.is_open ()) return;
 849  			m_SocketV6.send_to (bufs, to, 0, ec);
 850  		}
 851  		else
 852  		{
 853  			if (!m_SocketV4.is_open ()) return;
 854  			m_SocketV4.send_to (bufs, to, 0, ec);
 855  		}
 856  
 857  		if (!ec)
 858  			i2p::transport::transports.UpdateSentBytes (headerLen + headerXLen + payloadLen);
 859  		else
 860  		{
 861  			LogPrint (ec == boost::asio::error::would_block ? eLogInfo : eLogError,
 862  				"SSU2: Send exception: ", ec.message (), " to ", to);
 863  		}
 864  	}
 865  
 866  	bool SSU2Server::CheckPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep, bool peerTest)
 867  	{
 868  		auto s = FindPendingOutgoingSession (ep);
 869  		if (s)
 870  		{
 871  			if (peerTest)
 872  			{
 873  				// if peer test requested add it to the list for pending session
 874  				auto onEstablished = s->GetOnEstablished ();
 875  				if (onEstablished)
 876  					s->SetOnEstablished ([s, onEstablished]()
 877  						{
 878  							onEstablished ();
 879  							s->SendPeerTest ();
 880  						});
 881  				else
 882  					s->SetOnEstablished ([s]() { s->SendPeerTest (); });
 883  			}
 884  			return true;
 885  		}
 886  		return false;
 887  	}
 888  
 889  	bool SSU2Server::CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router,
 890  		std::shared_ptr<const i2p::data::RouterInfo::Address> address, bool peerTest)
 891  	{
 892  		if (router && address)
 893  		{
 894  			// check if no session
 895  			auto existingSession = FindSession (router->GetIdentHash ());
 896  			if (existingSession)
 897  			{
 898  				// session with router found, trying to send peer test if requested
 899  				if (peerTest && existingSession->IsEstablished ())
 900  					boost::asio::post (GetService (), [existingSession]() { existingSession->SendPeerTest (); });
 901  				return false;
 902  			}
 903  			// check is no pending session
 904  			bool isValidEndpoint = !address->host.is_unspecified () && address->port;
 905  			if (isValidEndpoint)
 906  			{
 907  				if (i2p::transport::transports.IsInReservedRange(address->host)) return false;
 908  				if (CheckPendingOutgoingSession (boost::asio::ip::udp::endpoint (address->host, address->port), peerTest)) return false;
 909  			}
 910  
 911  			auto session = std::make_shared<SSU2Session> (*this, router, address);
 912  			if (!isValidEndpoint && router->HasProfile () && router->GetProfile ()->HasLastEndpoint (address->IsV4 ()))
 913  			{
 914  				// router doesn't publish endpoint, but we connected before and hole punch might be alive
 915  				auto ep = router->GetProfile ()->GetLastEndpoint ();
 916  				if (IsConnectedRecently (ep, false))
 917  				{
 918  					if (CheckPendingOutgoingSession (ep, peerTest)) return false;
 919  					session->SetRemoteEndpoint (ep);
 920  					isValidEndpoint = true;
 921  				}
 922  			}
 923  			if (peerTest)
 924  				session->SetOnEstablished ([session]() {session->SendPeerTest (); });
 925  
 926  			if (isValidEndpoint) // we know endpoint
 927  				boost::asio::post (GetService (), [session]() { session->Connect (); });
 928  			else if (address->UsesIntroducer ()) // we don't know endpoint yet
 929  				boost::asio::post (GetService (), std::bind (&SSU2Server::ConnectThroughIntroducer, this, session));
 930  			else
 931  				return false;
 932  		}
 933  		else
 934  			return false;
 935  		return true;
 936  	}
 937  
 938  	void SSU2Server::ConnectThroughIntroducer (std::shared_ptr<SSU2Session> session)
 939  	{
 940  		if (!session) return;
 941  		auto address = session->GetAddress ();
 942  		if (!address) return;
 943  		session->WaitForIntroduction ();
 944  		auto ts = i2p::util::GetSecondsSinceEpoch ();
 945  		std::vector<int> indices; int i = 0;
 946  		// try to find existing session first
 947  		for (auto& it: address->ssu->introducers)
 948  		{
 949  			if (it.iTag && ts < it.iExp)
 950  			{
 951  				auto s = FindSession (it.iH);
 952  				if (s)
 953  				{
 954  					auto addr = s->GetAddress ();
 955  					if (addr && addr->IsIntroducer ())
 956  					{
 957  						s->Introduce (session, it.iTag);
 958  						return;
 959  					}
 960  				}
 961  				else
 962  					indices.push_back(i);
 963  			}
 964  			i++;
 965  		}
 966  		// we have to start a new session to an introducer
 967  		std::vector<i2p::data::IdentHash> newRouters;
 968  		std::shared_ptr<i2p::data::RouterInfo> r;
 969  		std::shared_ptr<const i2p::data::RouterInfo::Address> addr;
 970  		uint32_t relayTag = 0;
 971  		if (!indices.empty ())
 972  		{
 973  			if (indices.size () > 1)
 974  				std::shuffle (indices.begin(), indices.end(), m_Rng);
 975  
 976  			for (auto ind: indices)
 977  			{
 978  				const auto& introducer = address->ssu->introducers[ind];
 979  				// introducer is not expired, because in indices
 980  				r = i2p::data::netdb.FindRouter (introducer.iH);
 981  				if (r)
 982  				{
 983  					if (r->IsPublishedOn (i2p::context.GetRouterInfo ().GetCompatibleTransports (false) & // outgoing
 984  					    (i2p::data::RouterInfo::eSSU2V4 | i2p::data::RouterInfo::eSSU2V6)))
 985  					{
 986  						relayTag = introducer.iTag;
 987  						addr = address->IsV6 () ? r->GetSSU2V6Address () : r->GetSSU2V4Address ();
 988  						if (addr && addr->IsIntroducer () && !addr->host.is_unspecified () && addr->port &&
 989  							!i2p::transport::transports.IsInReservedRange(addr->host))
 990  							break;
 991  						else
 992  						{
 993  							// address is invalid or not intrudcer, try another SSU2 address if exists
 994  							if (address->IsV4 ())
 995  							{
 996  								if (i2p::context.SupportsV6 ())
 997  									addr = r->GetSSU2V6Address ();
 998  							}
 999  							else
1000  							{
1001  								if (i2p::context.SupportsV4 ())
1002  									addr = r->GetSSU2V4Address ();
1003  							}
1004  							if (addr && addr->IsIntroducer () && !addr->host.is_unspecified () && addr->port &&
1005  								!i2p::transport::transports.IsInReservedRange(addr->host))
1006  								break;
1007  							else
1008  							{
1009  								// all addresses are invalid, try next introducer
1010  								relayTag = 0;
1011  								addr = nullptr;
1012  								r = nullptr;
1013  							}
1014  						}
1015  					}
1016  					else
1017  						r = nullptr;
1018  				}
1019  				else if (!i2p::data::IsRouterBanned (introducer.iH))
1020  					newRouters.push_back (introducer.iH);
1021  			}
1022  		}
1023  		if (r)
1024  		{
1025  			if (relayTag && addr)
1026  			{
1027  				// introducer and tag found connect to it through SSU2
1028  				auto s = FindPendingOutgoingSession (boost::asio::ip::udp::endpoint (addr->host, addr->port));
1029  				if (!s)
1030  				{
1031  					s = std::make_shared<SSU2Session> (*this, r, addr);
1032  					s->SetOnEstablished ([session, s, relayTag]() { s->Introduce (session, relayTag); });
1033  					s->Connect ();
1034  				}
1035  				else
1036  				{
1037  					auto onEstablished = s->GetOnEstablished ();
1038  					if (onEstablished)
1039  						s->SetOnEstablished ([session, s, relayTag, onEstablished]()
1040  							{
1041  								onEstablished ();
1042  								s->Introduce (session, relayTag);
1043  							});
1044  					else
1045  						s->SetOnEstablished ([session, s, relayTag]() {s->Introduce (session, relayTag); });
1046  				}
1047  			}
1048  			else
1049  				session->Done ();
1050  		}
1051  		else
1052  		{
1053  			// introducers not found, try to request them
1054  			for (auto& it: newRouters)
1055  				i2p::data::netdb.RequestDestination (it);
1056  			session->Done (); // don't wait for connect timeout
1057  		}
1058  	}
1059  
1060  	bool SSU2Server::StartPeerTest (std::shared_ptr<const i2p::data::RouterInfo> router, bool v4)
1061  	{
1062  		if (!router) return false;
1063  		auto addr = v4 ? router->GetSSU2V4Address () : router->GetSSU2V6Address ();
1064  		if (!addr) return false;
1065  		auto session = FindSession (router->GetIdentHash ());
1066  		if (session)
1067  		{
1068  			auto remoteAddr = session->GetAddress ();
1069  			if (!remoteAddr || !remoteAddr->IsPeerTesting () ||
1070  			    (v4 && !remoteAddr->IsV4 ()) || (!v4 && !remoteAddr->IsV6 ())) return false;
1071  			if (session->IsEstablished ())
1072  				boost::asio::post (GetService (), [session]() { session->SendPeerTest (); });
1073  			else
1074  				session->SetOnEstablished ([session]() { session->SendPeerTest (); });
1075  			return true;
1076  		}
1077  		else
1078  			CreateSession (router, addr, true);
1079  		return true;
1080  	}
1081  
1082  	void SSU2Server::ScheduleTermination ()
1083  	{
1084  		m_TerminationTimer.expires_from_now (boost::posix_time::seconds(
1085  			SSU2_TERMINATION_CHECK_TIMEOUT + m_Rng () % SSU2_TERMINATION_CHECK_TIMEOUT_VARIANCE));
1086  		m_TerminationTimer.async_wait (std::bind (&SSU2Server::HandleTerminationTimer,
1087  			this, std::placeholders::_1));
1088  	}
1089  
1090  	void SSU2Server::HandleTerminationTimer (const boost::system::error_code& ecode)
1091  	{
1092  		if (ecode != boost::asio::error::operation_aborted)
1093  		{
1094  			auto ts = i2p::util::GetSecondsSinceEpoch ();
1095  
1096  			{
1097  				std::lock_guard<std::mutex> l(m_PendingOutgoingSessionsMutex);
1098  				for (auto it = m_PendingOutgoingSessions.begin (); it != m_PendingOutgoingSessions.end ();)
1099  				{
1100  					if (it->second->IsTerminationTimeoutExpired (ts))
1101  					{
1102  						//it->second->Terminate ();
1103  						it = m_PendingOutgoingSessions.erase (it);
1104  					}
1105  					else
1106  						it++;
1107  				}
1108  			}
1109  
1110  			for (auto it: m_Sessions)
1111  			{
1112  				auto state = it.second->GetState ();
1113  				if (state == eSSU2SessionStateTerminated || state == eSSU2SessionStateClosing)
1114  					it.second->Done ();
1115  				else if (it.second->IsTerminationTimeoutExpired (ts))
1116  				{
1117  					if (it.second->IsEstablished ())
1118  						it.second->RequestTermination (eSSU2TerminationReasonIdleTimeout);
1119  					else
1120  						it.second->Done ();
1121  				}
1122  				else
1123  					it.second->CleanUp (ts);
1124  			}
1125  
1126  			ScheduleTermination ();
1127  		}
1128  	}
1129  
1130  	void SSU2Server::ScheduleCleanup ()
1131  	{
1132  		m_CleanupTimer.expires_from_now (boost::posix_time::seconds(SSU2_CLEANUP_INTERVAL));
1133  		m_CleanupTimer.async_wait (std::bind (&SSU2Server::HandleCleanupTimer,
1134  			this, std::placeholders::_1));
1135  	}
1136  
1137  	void SSU2Server::HandleCleanupTimer (const boost::system::error_code& ecode)
1138  	{
1139  		if (ecode != boost::asio::error::operation_aborted)
1140  		{
1141  			auto ts = i2p::util::GetSecondsSinceEpoch ();
1142  			for (auto it = m_Relays.begin (); it != m_Relays.begin ();)
1143  			{
1144  				if (it->second.expired ())
1145  					it = m_Relays.erase (it);
1146  				else
1147  					it++;
1148  			}
1149  
1150  			for (auto it = m_PeerTests.begin (); it != m_PeerTests.end ();)
1151  			{
1152  				if (ts > it->second.second + SSU2_PEER_TEST_EXPIRATION_TIMEOUT || it->second.first.expired ())
1153  				{
1154  					LogPrint (eLogInfo, "SSU2: Peer test nonce ", it->first, " was not responded in ", SSU2_PEER_TEST_EXPIRATION_TIMEOUT, " seconds or session invalid. Deleted");
1155  					it = m_PeerTests.erase (it);
1156  				}
1157  				else
1158  					it++;
1159  			}
1160  
1161  			for (auto it = m_IncomingTokens.begin (); it != m_IncomingTokens.end (); )
1162  			{
1163  				if (ts > it->second.second)
1164  					it = m_IncomingTokens.erase (it);
1165  				else
1166  					it++;
1167  			}
1168  
1169  			for (auto it = m_OutgoingTokens.begin (); it != m_OutgoingTokens.end (); )
1170  			{
1171  				if (ts > it->second.second)
1172  					it = m_OutgoingTokens.erase (it);
1173  				else
1174  					it++;
1175  			}
1176  
1177  			for (auto it = m_ConnectedRecently.begin (); it != m_ConnectedRecently.end (); )
1178  			{
1179  				if (ts > it->second + SSU2_MAX_HOLE_PUNCH_EXPIRATION)
1180  					it = m_ConnectedRecently.erase (it);
1181  				else
1182  					it++;
1183  			}
1184  
1185  			for (auto it = m_RequestedPeerTests.begin (); it != m_RequestedPeerTests.end ();)
1186  			{
1187  				if (ts > it->second.second + SSU2_PEER_TEST_EXPIRATION_TIMEOUT)
1188  					it = m_RequestedPeerTests.erase (it);
1189  				else
1190  					it++;
1191  			}
1192  
1193  			{
1194  				std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex);
1195  				for (auto it = m_SessionsByRouterHash.begin (); it != m_SessionsByRouterHash.begin ();)
1196  				{
1197  					if (it->second.expired ())
1198  						it = m_SessionsByRouterHash.erase (it);
1199  					else
1200  						it++;
1201  				}
1202  			}
1203  
1204  			m_PacketsPool.CleanUpMt ();
1205  			m_SentPacketsPool.CleanUp ();
1206  			m_IncompleteMessagesPool.CleanUp ();
1207  			m_FragmentsPool.CleanUp ();
1208  			ScheduleCleanup ();
1209  		}
1210  	}
1211  
1212  	void SSU2Server::ScheduleResend (bool more)
1213  	{
1214  		m_ResendTimer.expires_from_now (boost::posix_time::milliseconds (more ?
1215  		    (SSU2_RESEND_CHECK_MORE_TIMEOUT + m_Rng () % SSU2_RESEND_CHECK_MORE_TIMEOUT_VARIANCE):
1216  			(SSU2_RESEND_CHECK_TIMEOUT + m_Rng () % SSU2_RESEND_CHECK_TIMEOUT_VARIANCE)));
1217  		m_ResendTimer.async_wait (std::bind (&SSU2Server::HandleResendTimer,
1218  			this, std::placeholders::_1));
1219  	}
1220  
1221  	void SSU2Server::HandleResendTimer (const boost::system::error_code& ecode)
1222  	{
1223  		if (ecode != boost::asio::error::operation_aborted)
1224  		{
1225  			size_t resentPacketsNum = 0;
1226  			auto ts = i2p::util::GetMillisecondsSinceEpoch ();
1227  			for (auto it: m_Sessions)
1228  			{
1229  				if (ts >= it.second->GetLastResendTime () + SSU2_RESEND_CHECK_TIMEOUT)
1230  					resentPacketsNum += it.second->Resend (ts);
1231  				if (resentPacketsNum > SSU2_MAX_RESEND_PACKETS) break;
1232  			}
1233  			for (auto it: m_PendingOutgoingSessions)
1234  				it.second->Resend (ts);
1235  			ScheduleResend (resentPacketsNum > SSU2_MAX_RESEND_PACKETS);
1236  		}
1237  	}
1238  
1239  	void SSU2Server::UpdateOutgoingToken (const boost::asio::ip::udp::endpoint& ep, uint64_t token, uint32_t exp)
1240  	{
1241  		m_OutgoingTokens[ep] = {token, exp};
1242  	}
1243  
1244  	uint64_t SSU2Server::FindOutgoingToken (const boost::asio::ip::udp::endpoint& ep)
1245  	{
1246  		auto it = m_OutgoingTokens.find (ep);
1247  		if (it != m_OutgoingTokens.end ())
1248  		{
1249  			if (i2p::util::GetSecondsSinceEpoch () + SSU2_TOKEN_EXPIRATION_THRESHOLD > it->second.second)
1250  			{
1251  				// token expired
1252  				m_OutgoingTokens.erase (it);
1253  				return 0;
1254  			}
1255  			return it->second.first;
1256  		}
1257  		return 0;
1258  	}
1259  
1260  	uint64_t SSU2Server::GetIncomingToken (const boost::asio::ip::udp::endpoint& ep)
1261  	{
1262  		auto ts = i2p::util::GetSecondsSinceEpoch ();
1263  		auto it = m_IncomingTokens.find (ep);
1264  		if (it != m_IncomingTokens.end ())
1265  		{
1266  			if (ts + SSU2_TOKEN_EXPIRATION_THRESHOLD <= it->second.second)
1267  				return it->second.first;
1268  			else // token expired
1269  				m_IncomingTokens.erase (it);
1270  		}
1271  		uint64_t token;
1272  		RAND_bytes ((uint8_t *)&token, 8);
1273  		if (!token) token = 1; // token can't be zero
1274  		m_IncomingTokens.try_emplace (ep, token, uint32_t(ts + SSU2_TOKEN_EXPIRATION_TIMEOUT));
1275  		return token;
1276  	}
1277  
1278  	std::pair<uint64_t, uint32_t> SSU2Server::NewIncomingToken (const boost::asio::ip::udp::endpoint& ep)
1279  	{
1280  		uint64_t token;
1281  		RAND_bytes ((uint8_t *)&token, 8);
1282  		if (!token) token = 1; // token can't be zero
1283  		uint32_t expires = i2p::util::GetSecondsSinceEpoch () + SSU2_NEXT_TOKEN_EXPIRATION_TIMEOUT;
1284  		auto [it, inserted] = m_IncomingTokens.try_emplace (ep, token, expires);
1285  		if (!inserted)
1286  			it->second = { token, expires }; // override
1287  		return it->second;
1288  	}
1289  
1290  	std::vector<std::shared_ptr<SSU2Session> > SSU2Server::FindIntroducers (int maxNumIntroducers,
1291  		bool v4, const std::unordered_set<i2p::data::IdentHash>& excluded)
1292  	{
1293  		std::vector<std::shared_ptr<SSU2Session> > ret;
1294  		if (maxNumIntroducers <= 0 || m_Sessions.empty ()) return ret;
1295  
1296  		std::vector<std::shared_ptr<SSU2Session> > eligible;
1297  		eligible.reserve (m_Sessions.size ()/2);
1298  		auto ts = i2p::util::GetSecondsSinceEpoch ();
1299  		for (const auto& s : m_Sessions)
1300  		{
1301  			if (s.second->IsEstablished () && (s.second->GetRelayTag () && s.second->IsOutgoing ()) &&
1302  			    ts < s.second->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION/2 &&
1303  			    !excluded.count (s.second->GetRemoteIdentity ()->GetIdentHash ()) &&
1304  			    ((v4 && (s.second->GetRemoteTransports () & i2p::data::RouterInfo::eSSU2V4)) ||
1305  			    (!v4 && (s.second->GetRemoteTransports () & i2p::data::RouterInfo::eSSU2V6))))
1306  				eligible.push_back (s.second);
1307  		}
1308  
1309  		if (eligible.size () <= (size_t)maxNumIntroducers)
1310  			return eligible;
1311  		else
1312  			std::sample (eligible.begin(), eligible.end(), std::back_inserter(ret), maxNumIntroducers, m_Rng);
1313  		return ret;
1314  	}
1315  
1316  	void SSU2Server::UpdateIntroducers (bool v4)
1317  	{
1318  		uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
1319  		std::list<std::pair<i2p::data::IdentHash, uint32_t> > newList, impliedList;
1320  		auto& introducers = v4 ? m_Introducers : m_IntroducersV6;
1321  		std::unordered_set<i2p::data::IdentHash> excluded;
1322  		for (const auto& [ident, tag] : introducers)
1323  		{
1324  			std::shared_ptr<SSU2Session> session = FindSession (ident);
1325  			if (session)
1326  				excluded.insert (ident);
1327  			if (session)
1328  			{
1329  				if (session->IsEstablished () && session->GetRelayTag () && session->IsOutgoing () && // still session with introducer?
1330  					ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION)
1331  				{
1332  					session->SendKeepAlive ();
1333  					if (ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION)
1334  					{
1335  						newList.push_back ({ident, session->GetRelayTag ()});
1336  						if (tag != session->GetRelayTag ())
1337  						{
1338  							LogPrint (eLogDebug, "SSU2: Introducer session to  ", session->GetIdentHashBase64() , " was replaced. iTag ", tag, "->", session->GetRelayTag ());
1339  							i2p::context.UpdateSSU2Introducer (ident, v4, session->GetRelayTag (),
1340  								session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION);
1341  						}
1342  					}
1343  					else
1344  					{
1345  						impliedList.push_back ({ident, session->GetRelayTag ()}); // keep in introducers list, but not publish
1346  						session = nullptr;
1347  					}
1348  				}
1349  				else
1350  					session = nullptr;
1351  			}
1352  
1353  			if (!session)
1354  				i2p::context.RemoveSSU2Introducer (ident, v4);
1355  		}
1356  		int numOldSessions = 0;
1357  		if (newList.size () < SSU2_MAX_NUM_INTRODUCERS)
1358  		{
1359  			auto sessions = FindIntroducers (SSU2_MAX_NUM_INTRODUCERS - newList.size (), v4, excluded);
1360  			if (sessions.empty () && !impliedList.empty ())
1361  			{
1362  				LogPrint (eLogDebug, "SSU2: No new introducers found. Trying to reuse existing");
1363  				for (const auto& it : impliedList)
1364  				{
1365  					auto session = FindSession (it.first);
1366  					if (session)
1367  					{
1368  						if (std::find_if (newList.begin (), newList.end (),
1369  						    [&ident = it.first](const auto& s){ return ident == s.first; }) == newList.end ())
1370  						{
1371  							sessions.push_back (session);
1372  							numOldSessions++;
1373  						}
1374  					}
1375  				}
1376  				impliedList.clear ();
1377  			}
1378  
1379  			for (const auto& it : sessions)
1380  			{
1381  				uint32_t tag = it->GetRelayTag ();
1382  				uint32_t exp = it->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION;
1383  				if (!tag && ts >= exp)
1384  					continue; // don't publish expired introducer
1385  				i2p::data::RouterInfo::Introducer introducer;
1386  				introducer.iTag = tag;
1387  				introducer.iH = it->GetRemoteIdentity ()->GetIdentHash ();
1388  				introducer.iExp = exp;
1389  				excluded.insert (it->GetRemoteIdentity ()->GetIdentHash ());
1390  				if (i2p::context.AddSSU2Introducer (introducer, v4))
1391  				{
1392  					LogPrint (eLogDebug, "SSU2: Introducer added ", it->GetRelayTag (), " at ",
1393  						i2p::data::GetIdentHashAbbreviation (it->GetRemoteIdentity ()->GetIdentHash ()));
1394  					newList.push_back ({ it->GetRemoteIdentity ()->GetIdentHash (), tag });
1395  					it->SendKeepAlive ();
1396  					if (newList.size () >= SSU2_MAX_NUM_INTRODUCERS) break;
1397  				}
1398  			}
1399  		}
1400  		introducers = newList;
1401  
1402  		if (introducers.size () < SSU2_MAX_NUM_INTRODUCERS || numOldSessions)
1403  		{
1404  			// we need to create more sessions with relay tag
1405  
1406  			// exclude all existing sessions
1407  			excluded.clear ();
1408  			{
1409  				std::lock_guard<std::mutex> l(m_SessionsByRouterHashMutex);
1410  				for (const auto& [ident, s] : m_SessionsByRouterHash)
1411  					excluded.insert (ident);
1412  			}
1413  
1414  			// session about to expire are not counted
1415  			for (auto i = introducers.size (); i < SSU2_MAX_NUM_INTRODUCERS + numOldSessions; i++)
1416  			{
1417  				auto introducer = i2p::data::netdb.GetRandomSSU2Introducer (v4, excluded);
1418  				if (introducer)
1419  				{
1420  					auto address = v4 ? introducer->GetSSU2V4Address () : introducer->GetSSU2V6Address ();
1421  					if (address)
1422  					{
1423  						CreateSession (introducer, address);
1424  						excluded.insert (introducer->GetIdentHash ());
1425  					}
1426  				}
1427  				else
1428  				{
1429  					LogPrint (eLogDebug, "SSU2: Can't find more introducers");
1430  					break;
1431  				}
1432  			}
1433  		}
1434  		introducers.splice (introducers.end (), impliedList);  // insert non-published, but non-expired introducers back
1435  	}
1436  
1437  	void SSU2Server::ScheduleIntroducersUpdateTimer ()
1438  	{
1439  		if (m_IsPublished)
1440  		{
1441  			m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds(
1442  				SSU2_KEEP_ALIVE_INTERVAL + m_Rng () % SSU2_KEEP_ALIVE_INTERVAL_VARIANCE));
1443  			m_IntroducersUpdateTimer.async_wait (std::bind (&SSU2Server::HandleIntroducersUpdateTimer,
1444  				this, std::placeholders::_1, true));
1445  		}
1446  	}
1447  
1448  	void SSU2Server::RescheduleIntroducersUpdateTimer ()
1449  	{
1450  		if (m_IsPublished)
1451  		{
1452  			m_IntroducersUpdateTimer.cancel ();
1453  			i2p::context.ClearSSU2Introducers (true);
1454  			m_Introducers.clear ();
1455  			m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds(
1456  				(SSU2_KEEP_ALIVE_INTERVAL + m_Rng () % SSU2_KEEP_ALIVE_INTERVAL_VARIANCE)/2));
1457  			m_IntroducersUpdateTimer.async_wait (std::bind (&SSU2Server::HandleIntroducersUpdateTimer,
1458  				this, std::placeholders::_1, true));
1459  		}
1460  	}
1461  
1462  	void SSU2Server::ScheduleIntroducersUpdateTimerV6 ()
1463  	{
1464  		if (m_IsPublished)
1465  		{
1466  			m_IntroducersUpdateTimerV6.expires_from_now (boost::posix_time::seconds(
1467  				SSU2_KEEP_ALIVE_INTERVAL + m_Rng () % SSU2_KEEP_ALIVE_INTERVAL_VARIANCE));
1468  			m_IntroducersUpdateTimerV6.async_wait (std::bind (&SSU2Server::HandleIntroducersUpdateTimer,
1469  				this, std::placeholders::_1, false));
1470  		}
1471  	}
1472  
1473  	void SSU2Server::RescheduleIntroducersUpdateTimerV6 ()
1474  	{
1475  		if (m_IsPublished)
1476  		{
1477  			m_IntroducersUpdateTimerV6.cancel ();
1478  			i2p::context.ClearSSU2Introducers (false);
1479  			m_IntroducersV6.clear ();
1480  			m_IntroducersUpdateTimerV6.expires_from_now (boost::posix_time::seconds(
1481  				(SSU2_KEEP_ALIVE_INTERVAL + m_Rng () % SSU2_KEEP_ALIVE_INTERVAL_VARIANCE)/2));
1482  			m_IntroducersUpdateTimerV6.async_wait (std::bind (&SSU2Server::HandleIntroducersUpdateTimer,
1483  				this, std::placeholders::_1, false));
1484  		}
1485  	}
1486  
1487  	void SSU2Server::HandleIntroducersUpdateTimer (const boost::system::error_code& ecode, bool v4)
1488  	{
1489  		if (ecode != boost::asio::error::operation_aborted)
1490  		{
1491  			// timeout expired
1492  			if (v4)
1493  			{
1494  				if (i2p::context.GetTesting ())
1495  				{
1496  					// we still don't know if we need introducers
1497  					ScheduleIntroducersUpdateTimer ();
1498  					return;
1499  				}
1500  				if (i2p::context.GetStatus () != eRouterStatusFirewalled)
1501  				{
1502  					// we don't need introducers
1503  					i2p::context.ClearSSU2Introducers (true);
1504  					m_Introducers.clear ();
1505  					return;
1506  				}
1507  				// we are firewalled
1508  				auto addr = i2p::context.GetRouterInfo ().GetSSU2V4Address ();
1509  				if (addr && addr->ssu && addr->ssu->introducers.empty ())
1510  					i2p::context.SetUnreachable (true, false); // v4
1511  
1512  				UpdateIntroducers (true);
1513  				ScheduleIntroducersUpdateTimer ();
1514  			}
1515  			else
1516  			{
1517  				if (i2p::context.GetTestingV6 ())
1518  				{
1519  					// we still don't know if we need introducers
1520  					ScheduleIntroducersUpdateTimerV6 ();
1521  					return;
1522  				}
1523  				if (i2p::context.GetStatusV6 () != eRouterStatusFirewalled)
1524  				{
1525  					// we don't need introducers
1526  					i2p::context.ClearSSU2Introducers (false);
1527  					m_IntroducersV6.clear ();
1528  					return;
1529  				}
1530  				// we are firewalled
1531  				auto addr = i2p::context.GetRouterInfo ().GetSSU2V6Address ();
1532  				if (addr && addr->ssu && addr->ssu->introducers.empty ())
1533  					i2p::context.SetUnreachable (false, true); // v6
1534  
1535  				UpdateIntroducers (false);
1536  				ScheduleIntroducersUpdateTimerV6 ();
1537  			}
1538  		}
1539  	}
1540  
1541  	bool SSU2Server::AEADChaCha20Poly1305Encrypt (const uint8_t * msg, size_t msgLen,
1542  		const uint8_t * ad, size_t adLen, const uint8_t * key, const uint8_t * nonce, uint8_t * buf, size_t len)
1543  	{
1544  		return m_Encryptor.Encrypt (msg, msgLen, ad, adLen, key, nonce, buf, len);
1545  	}
1546  
1547  	bool SSU2Server::AEADChaCha20Poly1305Decrypt (const uint8_t * msg, size_t msgLen,
1548  		const uint8_t * ad, size_t adLen, const uint8_t * key, const uint8_t * nonce, uint8_t * buf, size_t len)
1549  	{
1550  		return m_Decryptor.Decrypt (msg, msgLen, ad, adLen, key, nonce, buf, len);
1551  	}
1552  
1553  	void SSU2Server::ChaCha20 (const uint8_t * msg, size_t msgLen, const uint8_t * key, const uint8_t * nonce, uint8_t * out)
1554  	{
1555  		m_ChaCha20 (msg, msgLen, key, nonce, out);
1556  	}
1557  
1558  	void SSU2Server::SendThroughProxy (const uint8_t * header, size_t headerLen, const uint8_t * headerX, size_t headerXLen,
1559  		const uint8_t * payload, size_t payloadLen, const boost::asio::ip::udp::endpoint& to)
1560  	{
1561  		if (!m_ProxyRelayEndpoint) return;
1562  		size_t requestHeaderSize = 0;
1563  		memset (m_UDPRequestHeader, 0, 3);
1564  		if (to.address ().is_v6 ())
1565  		{
1566  			m_UDPRequestHeader[3] = SOCKS5_ATYP_IPV6;
1567  			memcpy (m_UDPRequestHeader + 4, to.address ().to_v6().to_bytes().data(), 16);
1568  			requestHeaderSize = SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE;
1569  		}
1570  		else
1571  		{
1572  			m_UDPRequestHeader[3] = SOCKS5_ATYP_IPV4;
1573  			memcpy (m_UDPRequestHeader + 4, to.address ().to_v4().to_bytes().data(), 4);
1574  			requestHeaderSize = SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE;
1575  		}
1576  		htobe16buf (m_UDPRequestHeader + requestHeaderSize - 2, to.port ());
1577  
1578  		std::vector<boost::asio::const_buffer> bufs;
1579  		bufs.push_back (boost::asio::buffer (m_UDPRequestHeader, requestHeaderSize));
1580  		bufs.push_back (boost::asio::buffer (header, headerLen));
1581  		if (headerX) bufs.push_back (boost::asio::buffer (headerX, headerXLen));
1582  		bufs.push_back (boost::asio::buffer (payload, payloadLen));
1583  
1584  		boost::system::error_code ec;
1585  		m_SocketV4.send_to (bufs, *m_ProxyRelayEndpoint, 0, ec); // TODO: implement ipv6 proxy
1586  		if (!ec)
1587  			i2p::transport::transports.UpdateSentBytes (headerLen + payloadLen);
1588  		else
1589  			LogPrint (eLogError, "SSU2: Send exception: ", ec.message (), " to ", to);
1590  	}
1591  
1592  	void SSU2Server::ProcessNextPacketFromProxy (uint8_t * buf, size_t len)
1593  	{
1594  		if (buf[2]) // FRAG
1595  		{
1596  			LogPrint (eLogWarning, "SSU2: Proxy packet fragmentation is not supported");
1597  			return;
1598  		}
1599  		size_t offset = 0;
1600  		boost::asio::ip::udp::endpoint ep;
1601  		switch (buf[3]) // ATYP
1602  		{
1603  			case SOCKS5_ATYP_IPV4:
1604  			{
1605  				offset = SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE;
1606  				if (offset > len) return;
1607  				boost::asio::ip::address_v4::bytes_type bytes;
1608  				memcpy (bytes.data (), buf + 4, 4);
1609  				uint16_t port = bufbe16toh (buf + 8);
1610  				ep = boost::asio::ip::udp::endpoint (boost::asio::ip::address_v4 (bytes), port);
1611  				break;
1612  			}
1613  			case SOCKS5_ATYP_IPV6:
1614  			{
1615  				offset = SOCKS5_UDP_IPV6_REQUEST_HEADER_SIZE;
1616  				if (offset > len) return;
1617  				boost::asio::ip::address_v6::bytes_type bytes;
1618  				memcpy (bytes.data (), buf + 4, 16);
1619  				uint16_t port = bufbe16toh (buf + 20);
1620  				ep = boost::asio::ip::udp::endpoint (boost::asio::ip::address_v6 (bytes), port);
1621  				break;
1622  			}
1623  			default:
1624  			{
1625  				LogPrint (eLogWarning, "SSU2: Unknown ATYP ", (int)buf[3], " from proxy relay");
1626  				return;
1627  			}
1628  		}
1629  		ProcessNextPacket (buf + offset, len - offset, ep);
1630  	}
1631  
1632  	void SSU2Server::ConnectToProxy ()
1633  	{
1634  		if (!m_ProxyEndpoint) return;
1635  		m_UDPAssociateSocket.reset (new boost::asio::ip::tcp::socket (m_ReceiveService.GetService ()));
1636  		m_UDPAssociateSocket->async_connect (*m_ProxyEndpoint,
1637  		    [this] (const boost::system::error_code& ecode)
1638  			{
1639  				if (ecode)
1640  				{
1641  					LogPrint (eLogError, "SSU2: Can't connect to proxy ", *m_ProxyEndpoint, " ", ecode.message ());
1642  					m_UDPAssociateSocket.reset (nullptr);
1643  					ReconnectToProxy ();
1644  				}
1645  				else
1646  					HandshakeWithProxy ();
1647  			});
1648  	}
1649  
1650  	void SSU2Server::HandshakeWithProxy ()
1651  	{
1652  		if (!m_UDPAssociateSocket) return;
1653  		m_UDPRequestHeader[0] = SOCKS5_VER;
1654  		m_UDPRequestHeader[1] = 1; // 1 method
1655  		m_UDPRequestHeader[2] = 0; // no authentication
1656  		boost::asio::async_write (*m_UDPAssociateSocket, boost::asio::buffer (m_UDPRequestHeader, 3), boost::asio::transfer_all(),
1657  			[this] (const boost::system::error_code& ecode, std::size_t bytes_transferred)
1658  			{
1659  				(void) bytes_transferred;
1660  				if (ecode)
1661  				{
1662  					LogPrint(eLogError, "SSU2: Proxy write error ", ecode.message());
1663  					m_UDPAssociateSocket.reset (nullptr);
1664  					ReconnectToProxy ();
1665  				}
1666  				else
1667  					ReadHandshakeWithProxyReply ();
1668  			});
1669  	}
1670  
1671  	void SSU2Server::ReadHandshakeWithProxyReply ()
1672  	{
1673  		if (!m_UDPAssociateSocket) return;
1674  		boost::asio::async_read (*m_UDPAssociateSocket, boost::asio::buffer (m_UDPRequestHeader, 2), boost::asio::transfer_all(),
1675  			[this] (const boost::system::error_code& ecode, std::size_t bytes_transferred)
1676  			{
1677  				(void) bytes_transferred;
1678  				if (ecode)
1679  				{
1680  					LogPrint(eLogError, "SSU2: Proxy read error ", ecode.message());
1681  					m_UDPAssociateSocket.reset (nullptr);
1682  					ReconnectToProxy ();
1683  				}
1684  				else
1685  				{
1686  					if (m_UDPRequestHeader[0] == SOCKS5_VER && !m_UDPRequestHeader[1])
1687  						SendUDPAssociateRequest ();
1688  					else
1689  					{
1690  						LogPrint(eLogError, "SSU2: Invalid proxy reply");
1691  						m_UDPAssociateSocket.reset (nullptr);
1692  					}
1693  				}
1694  			});
1695  	}
1696  
1697  	void SSU2Server::SendUDPAssociateRequest ()
1698  	{
1699  		if (!m_UDPAssociateSocket) return;
1700  		m_UDPRequestHeader[0] = SOCKS5_VER;
1701  		m_UDPRequestHeader[1] = SOCKS5_CMD_UDP_ASSOCIATE;
1702  		m_UDPRequestHeader[2] = 0; // RSV
1703  		m_UDPRequestHeader[3] = SOCKS5_ATYP_IPV4; // TODO: implement ipv6 proxy
1704  		memset (m_UDPRequestHeader + 4, 0, 6); // address and port all zeros
1705  		boost::asio::async_write (*m_UDPAssociateSocket, boost::asio::buffer (m_UDPRequestHeader, SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE), boost::asio::transfer_all(),
1706  			[this] (const boost::system::error_code& ecode, std::size_t bytes_transferred)
1707  			{
1708  				(void) bytes_transferred;
1709  				if (ecode)
1710  				{
1711  					LogPrint(eLogError, "SSU2: Proxy write error ", ecode.message());
1712  					m_UDPAssociateSocket.reset (nullptr);
1713  					ReconnectToProxy ();
1714  				}
1715  				else
1716  					ReadUDPAssociateReply ();
1717  			});
1718  	}
1719  
1720  	void SSU2Server::ReadUDPAssociateReply ()
1721  	{
1722  		if (!m_UDPAssociateSocket) return;
1723  		boost::asio::async_read (*m_UDPAssociateSocket, boost::asio::buffer (m_UDPRequestHeader, SOCKS5_UDP_IPV4_REQUEST_HEADER_SIZE), boost::asio::transfer_all(),
1724  			[this] (const boost::system::error_code& ecode, std::size_t bytes_transferred)
1725  			{
1726  				(void) bytes_transferred;
1727  				if (ecode)
1728  				{
1729  					LogPrint(eLogError, "SSU2: Proxy read error ", ecode.message());
1730  					m_UDPAssociateSocket.reset (nullptr);
1731  					ReconnectToProxy ();
1732  				}
1733  				else
1734  				{
1735  					if (m_UDPRequestHeader[0] == SOCKS5_VER && !m_UDPRequestHeader[1])
1736  					{
1737  						if (m_UDPRequestHeader[3] == SOCKS5_ATYP_IPV4)
1738  						{
1739  							boost::asio::ip::address_v4::bytes_type bytes;
1740  							memcpy (bytes.data (), m_UDPRequestHeader + 4, 4);
1741  							uint16_t port = bufbe16toh (m_UDPRequestHeader + 8);
1742  							m_ProxyRelayEndpoint.reset (new boost::asio::ip::udp::endpoint (boost::asio::ip::address_v4 (bytes), port));
1743  							m_SocketV4.open (boost::asio::ip::udp::v4 ());
1744  							Receive (m_SocketV4);
1745  							ReadUDPAssociateSocket ();
1746  						}
1747  						else
1748  						{
1749  							LogPrint(eLogError, "SSU2: Proxy UDP associate unsupported ATYP ", (int)m_UDPRequestHeader[3]);
1750  							m_UDPAssociateSocket.reset (nullptr);
1751  						}
1752  					}
1753  					else
1754  					{
1755  						LogPrint(eLogError, "SSU2: Proxy UDP associate error ", (int)m_UDPRequestHeader[1]);
1756  						m_UDPAssociateSocket.reset (nullptr);
1757  					}
1758  				}
1759  			});
1760  	}
1761  
1762  	void SSU2Server::ReadUDPAssociateSocket ()
1763  	{
1764  		if (!m_UDPAssociateSocket) return;
1765  		m_UDPAssociateSocket->async_read_some (boost::asio::buffer (m_UDPRequestHeader, 1),
1766  			[this] (const boost::system::error_code& ecode, std::size_t bytes_transferred)
1767  			{
1768  				(void) bytes_transferred;
1769  				if (ecode)
1770  				{
1771  					LogPrint(eLogWarning, "SSU2: Proxy UDP Associate socket error ", ecode.message());
1772  					m_UDPAssociateSocket.reset (nullptr);
1773  					m_ProxyRelayEndpoint.reset (nullptr);
1774  					m_SocketV4.close ();
1775  					ConnectToProxy (); // try to reconnect immediately
1776  				}
1777  				else
1778  					ReadUDPAssociateSocket ();
1779  			});
1780  	}
1781  
1782  	void SSU2Server::ReconnectToProxy ()
1783  	{
1784  		LogPrint(eLogInfo, "SSU2: Reconnect to proxy after ", SSU2_PROXY_CONNECT_RETRY_TIMEOUT, " seconds");
1785  		if (m_ProxyConnectRetryTimer)
1786  			m_ProxyConnectRetryTimer->cancel ();
1787  		else
1788  			m_ProxyConnectRetryTimer.reset (new boost::asio::deadline_timer (m_ReceiveService.GetService ()));
1789  		m_ProxyConnectRetryTimer->expires_from_now (boost::posix_time::seconds (SSU2_PROXY_CONNECT_RETRY_TIMEOUT));
1790  		m_ProxyConnectRetryTimer->async_wait (
1791  			[this](const boost::system::error_code& ecode)
1792  			{
1793  				if (ecode != boost::asio::error::operation_aborted)
1794  				{
1795  					m_UDPAssociateSocket.reset (nullptr);
1796  					m_ProxyRelayEndpoint.reset (nullptr);
1797  					LogPrint(eLogInfo, "SSU2: Reconnecting to proxy");
1798  					ConnectToProxy ();
1799  				}
1800  			});
1801  	}
1802  
1803  	bool SSU2Server::SetProxy (const std::string& address, uint16_t port)
1804  	{
1805  		boost::system::error_code ecode;
1806  		auto addr = boost::asio::ip::make_address (address, ecode);
1807  		if (!ecode && !addr.is_unspecified () && port)
1808  		{
1809  			m_IsThroughProxy = true;
1810  			m_ProxyEndpoint.reset (new boost::asio::ip::tcp::endpoint (addr, port));
1811  		}
1812  		else
1813  		{
1814  			if (ecode)
1815  				LogPrint (eLogError, "SSU2: Invalid proxy address ", address, " ", ecode.message());
1816  			return false;
1817  		}
1818  		return true;
1819  	}
1820  }
1821  }