/ libi2pd_client / UDPTunnel.cpp
UDPTunnel.cpp
  1  /*
  2  * Copyright (c) 2013-2026, The PurpleI2P Project
  3  *
  4  * This file is part of Purple i2pd project and licensed under BSD3
  5  *
  6  * See full license text in LICENSE file at top of project tree
  7  */
  8  
  9  #include <string_view>
 10  #include "Log.h"
 11  #include "util.h"
 12  #include "ClientContext.h"
 13  #include "I2PTunnel.h" // for GetLoopbackAddressFor
 14  #include "UDPTunnel.h"
 15  
 16  namespace i2p
 17  {
 18  namespace client
 19  {
 20  	constexpr std::string_view UDP_SESSION_SEQN { "seqn" };
 21  	constexpr std::string_view UDP_SESSION_ACKED { "acked" };
 22  	constexpr std::string_view UDP_SESSION_FLAGS { "flags" };
 23  
 24  	constexpr uint8_t UDP_SESSION_FLAG_RESET_PATH = 0x01;
 25  	constexpr uint8_t UDP_SESSION_FLAG_ACK_REQUESTED = 0x02;
 26  
 27  	void I2PUDPServerTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort,
 28  		const uint8_t * buf, size_t len, const i2p::util::Mapping * options)
 29  	{
 30  		if (!m_LastSession || m_LastSession->Identity.GetLL()[0] != from.GetIdentHash ().GetLL()[0] || (fromPort && fromPort != m_LastSession->RemotePort))
 31  			m_LastSession = ObtainUDPSession(from, toPort, fromPort);
 32  		boost::system::error_code ec;
 33  		if (len > 0)
 34  			m_LastSession->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint, 0, ec);
 35  		if (!ec)
 36  			m_LastSession->LastActivity = i2p::util::GetMillisecondsSinceEpoch();
 37  		else
 38  			LogPrint (eLogInfo, "UDP Server: Send exception: ", ec.message (), " to ", m_RemoteEndpoint);
 39  		if (options)
 40  		{
 41  			uint32_t seqn = 0;
 42  			if (options->Get (UDP_SESSION_SEQN, seqn) && seqn > m_LastSession->m_LastReceivedPacketNum)
 43  				m_LastSession->m_LastReceivedPacketNum = seqn;
 44  			uint8_t flags = 0;
 45  			if (options->Get (UDP_SESSION_FLAGS, flags))
 46  			{
 47  				if (flags & UDP_SESSION_FLAG_RESET_PATH)
 48  					m_LastSession->GetDatagramSession ()->DropSharedRoutingPath ();
 49  				if (flags & UDP_SESSION_FLAG_ACK_REQUESTED)
 50  				{
 51  					i2p::util::Mapping replyOptions;
 52  					replyOptions.Put (UDP_SESSION_ACKED, m_LastSession->m_LastReceivedPacketNum);
 53  					m_LastSession->m_Destination->SendDatagram(m_LastSession->GetDatagramSession (),
 54  						nullptr, 0, m_LastSession->LocalPort, m_LastSession->RemotePort, &replyOptions); // Ack only, no payload
 55  					m_LastSession->LastRepliableDatagramTime = i2p::util::GetMillisecondsSinceEpoch ();
 56  				}
 57  			}
 58  			if (options->Get (UDP_SESSION_ACKED, seqn))
 59  				m_LastSession->Acked (seqn);
 60  		}
 61  	}
 62  
 63  	void I2PUDPServerTunnel::HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
 64  	{
 65  		if (m_LastSession && (fromPort != m_LastSession->RemotePort || toPort != m_LastSession->LocalPort))
 66  		{
 67  			std::lock_guard<std::mutex> lock(m_SessionsMutex);
 68  			auto it = m_Sessions.find (GetSessionIndex (fromPort, toPort));
 69  			if (it != m_Sessions.end ())
 70  				m_LastSession = it->second;
 71  			else
 72  				m_LastSession = nullptr;
 73  		}
 74  		if (m_LastSession)
 75  		{
 76  			boost::system::error_code ec;
 77  			m_LastSession->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint, 0, ec);
 78  			if (!ec)
 79  				m_LastSession->LastActivity = i2p::util::GetMillisecondsSinceEpoch();
 80  			else
 81  				LogPrint (eLogInfo, "UDP Server: Send exception: ", ec.message (), " to ", m_RemoteEndpoint);
 82  		}
 83  	}
 84  
 85  	void I2PUDPServerTunnel::ExpireStale(const uint64_t delta)
 86  	{
 87  		std::lock_guard<std::mutex> lock(m_SessionsMutex);
 88  		uint64_t now = i2p::util::GetMillisecondsSinceEpoch();
 89  		auto itr = m_Sessions.begin();
 90  		while(itr != m_Sessions.end())
 91  		{
 92  			if(now - itr->second->LastActivity >= delta )
 93  				itr = m_Sessions.erase(itr);
 94  			else
 95  				itr++;
 96  		}
 97  	}
 98  
 99  	void I2PUDPClientTunnel::ExpireStale(const uint64_t delta)
100  	{
101  		std::lock_guard<std::mutex> lock(m_SessionsMutex);
102  		uint64_t now = i2p::util::GetMillisecondsSinceEpoch();
103  		std::vector<uint16_t> removePorts;
104  		for (const auto & s : m_Sessions) {
105  			if (now - s.second->second >= delta)
106  				removePorts.push_back(s.first);
107  		}
108  		for(auto port : removePorts) {
109  			m_Sessions.erase(port);
110  		}
111  	}
112  
113  	UDPSessionPtr I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort)
114  	{
115  		auto ih = from.GetIdentHash();
116  		auto idx = GetSessionIndex (remotePort, localPort);
117  		{
118  			std::lock_guard<std::mutex> lock(m_SessionsMutex);
119  			auto it = m_Sessions.find (idx);
120  			if (it != m_Sessions.end ())
121  			{
122  				if (it->second->Identity.GetLL()[0] == ih.GetLL()[0])
123  				{
124  					LogPrint(eLogDebug, "UDPServer: Found session ", it->second->IPSocket.local_endpoint(), " ", ih.ToBase32());
125  					return it->second;
126  				}
127  				else
128  				{
129  					LogPrint(eLogWarning, "UDPServer: Session with from ", remotePort, " and to ", localPort, " ports already exists. But from different address. Removed");
130  					m_Sessions.erase (it);
131  				}
132  			}
133  		}
134  
135  		boost::asio::ip::address addr;
136  		/** create new udp session */
137  		if(m_IsUniqueLocal && m_LocalAddress.is_loopback())
138  		{
139  			auto ident = from.GetIdentHash();
140  			addr = GetLoopbackAddressFor(ident);
141  		}
142  		else
143  			addr = m_LocalAddress;
144  
145  		auto s = std::make_shared<UDPSession>(boost::asio::ip::udp::endpoint(addr, 0),
146  			m_LocalDest, m_RemoteEndpoint, ih, localPort, remotePort);
147  		std::lock_guard<std::mutex> lock(m_SessionsMutex);
148  		m_Sessions.emplace (idx, s);
149  		return s;
150  	}
151  
152  	void UDPConnection::Stop ()
153  	{
154  		m_AckTimer.cancel ();
155  	}
156  
157  	void UDPConnection::Acked (uint32_t seqn)
158  	{
159  		if (!m_AckTimerSeqn) seqn = m_UnackedDatagrams.back ().first;	// if we recieve ack after paht change, clear window and send new datagrams
160  		m_IsFirstPacket = false;	// first packet confirmed
161  		if (m_AckTimerSeqn && seqn >= m_AckTimerSeqn)
162  		{
163  			m_AckTimerSeqn = 0;
164  			m_AckTimer.cancel ();
165  		}
166  		if (m_UnackedDatagrams.empty () && seqn < m_UnackedDatagrams.front ().first) return;
167  		bool acknowledged = false;
168  		auto it = m_UnackedDatagrams.begin ();
169  		while (it != m_UnackedDatagrams.end ())
170  		{
171  			if (it->first > seqn) break;
172  			 if (it->first == seqn && m_IsSendingAllowed) // ignore first ack after path change
173  			{
174  				auto rtt = i2p::util::GetMillisecondsSinceEpoch () - it->second;
175  				m_RTT = m_RTT ? (m_RTT + rtt)/2 : rtt;
176  				acknowledged = true;
177  			}
178  			it++;
179  		}
180  		m_UnackedDatagrams.erase (m_UnackedDatagrams.begin (), it);
181  		m_IsSendingAllowed = true; // if we recieve ack after path change, now can send new datagrams
182  		if (acknowledged && !m_UnackedDatagrams.empty ())
183  		{
184  			m_AckTimer.cancel ();
185  			m_AckTimerSeqn = 0;
186  			ScheduleAckTimer (m_UnackedDatagrams.back ().first);
187  		}
188  	}
189  
190  	void UDPConnection::ScheduleAckTimer (uint32_t seqn)
191  	{
192  		if (!m_AckTimerSeqn)
193  		{
194  			m_AckTimerSeqn = seqn;
195  			m_AckTimer.expires_from_now (boost::posix_time::milliseconds (m_RTT ? 2*m_RTT : I2P_UDP_MAX_UNACKED_DATAGRAM_TIME));
196  			m_AckTimer.async_wait ([this](const boost::system::error_code& ecode)
197  				{
198  					if (ecode != boost::asio::error::operation_aborted)
199  					{
200  						LogPrint (eLogInfo, "UDP Connection: Packet ", m_AckTimerSeqn, " was not acked");
201  //						DeleteExpiredUnackedDatagrams ();
202  						m_IsSendingAllowed = false; // stop sending datagrams
203  						m_AckTimerSeqn = 0;
204  						m_RTT = 0;
205  						if (!m_UnackedDatagrams.empty ()) ScheduleAckTimer (0); // try again if failed
206  						// send empty packet with reset path flag
207  						i2p::util::Mapping options;
208  						options.Put (UDP_SESSION_FLAGS, UDP_SESSION_FLAG_RESET_PATH | UDP_SESSION_FLAG_ACK_REQUESTED);
209  						auto session = GetDatagramSession ();
210  						session->DropSharedRoutingPath ();
211  						GetDatagramDestination ()->SendDatagram (session, nullptr, 0, 0, 0, &options);
212  					}
213  				});
214  		}
215  	}
216  
217  	void UDPConnection::DeleteExpiredUnackedDatagrams ()
218  	{
219  		if (m_UnackedDatagrams.empty ()) return;
220  		auto expired  = i2p::util::GetMillisecondsSinceEpoch () - (m_RTT ? 2*m_RTT : I2P_UDP_MAX_UNACKED_DATAGRAM_TIME);
221  		auto it = m_UnackedDatagrams.begin ();
222  		while (it != m_UnackedDatagrams.end ())
223  		{
224  			if (it->second < expired) break;
225  			it++;
226  		}
227  		m_UnackedDatagrams.erase (m_UnackedDatagrams.begin (), it);
228  	}
229  
230  	UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint,
231  		const std::shared_ptr<i2p::client::ClientDestination> & localDestination,
232  		const boost::asio::ip::udp::endpoint& endpoint, const i2p::data::IdentHash& to,
233  		uint16_t ourPort, uint16_t theirPort) :
234  		UDPConnection (localDestination->GetService()),
235  		m_Destination(localDestination->GetDatagramDestination()),
236  		IPSocket(localDestination->GetService(), localEndpoint), Identity (to),
237  		SendEndpoint(endpoint), LastActivity(i2p::util::GetMillisecondsSinceEpoch()),
238  		LastRepliableDatagramTime (0), LocalPort(ourPort), RemotePort(theirPort)
239  	{
240  		Start ();
241  		IPSocket.set_option (boost::asio::socket_base::receive_buffer_size (I2P_UDP_MAX_MTU ));
242  		IPSocket.non_blocking (true);
243  		Receive();
244  	}
245  
246  	void UDPSession::Receive()
247  	{
248  		LogPrint(eLogDebug, "UDPSession: Receive");
249  		IPSocket.async_receive_from(boost::asio::buffer(m_Buffer, I2P_UDP_MAX_MTU),
250  			FromEndpoint, std::bind(&UDPSession::HandleReceived, this, std::placeholders::_1, std::placeholders::_2));
251  	}
252  
253  	void UDPSession::HandleReceived(const boost::system::error_code & ecode, std::size_t len)
254  	{
255  		if(!ecode)
256  		{
257  			if (!m_UnackedDatagrams.empty () && m_NextSendPacketNum > m_UnackedDatagrams.front ().first + I2P_UDP_MAX_NUM_UNACKED_DATAGRAMS)
258  			{
259  				// window is full, drop packet
260  				Receive ();
261  				return;
262  			}
263  			LogPrint(eLogDebug, "UDPSession: Forward ", len, "B from ", FromEndpoint);
264  			auto ts = i2p::util::GetMillisecondsSinceEpoch();
265  			auto session = GetDatagramSession ();
266  			uint64_t repliableDatagramInterval = I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL;
267  			if (m_RTT && m_RTT >= I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL && m_RTT < I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL*10) repliableDatagramInterval = m_RTT/10; // 10 - 100 ms
268  			if (ts > LastRepliableDatagramTime + repliableDatagramInterval)
269  			{
270  				if (session->GetVersion () == i2p::datagram::eDatagramV3)
271  				{
272  					uint8_t flags = 0;
273  					if (!m_RTT || !m_AckTimerSeqn || (!m_UnackedDatagrams.empty () &&
274  						ts > m_UnackedDatagrams.back ().second + repliableDatagramInterval)) // last ack request
275  					{
276  						flags |= UDP_SESSION_FLAG_ACK_REQUESTED;
277  						m_UnackedDatagrams.push_back ({ m_NextSendPacketNum, ts });
278  						ScheduleAckTimer (m_NextSendPacketNum);
279  					}
280  					i2p::util::Mapping options;
281  					options.Put (UDP_SESSION_SEQN, m_NextSendPacketNum);
282  					if (m_LastReceivedPacketNum > 0)
283  						options.Put (UDP_SESSION_ACKED, m_LastReceivedPacketNum);
284  					if (flags)
285  						options.Put (UDP_SESSION_FLAGS, flags);
286  					m_Destination->SendDatagram(session, m_Buffer, len, LocalPort, RemotePort, &options);
287  					ScheduleAckTimer (m_NextSendPacketNum);
288  				}
289  				else
290  					m_Destination->SendDatagram(session, m_Buffer, len, LocalPort, RemotePort);
291  				LastRepliableDatagramTime = ts;
292  			}
293  			else
294  				m_Destination->SendRawDatagram(session, m_Buffer, len, LocalPort, RemotePort);
295  			size_t numPackets = 0;
296  			while (numPackets < i2p::datagram::DATAGRAM_SEND_QUEUE_MAX_SIZE)
297  			{
298  				boost::system::error_code ec;
299  				size_t moreBytes = IPSocket.available(ec);
300  				if (ec || !moreBytes) break;
301  				len = IPSocket.receive_from (boost::asio::buffer (m_Buffer, I2P_UDP_MAX_MTU), FromEndpoint, 0, ec);
302  				m_Destination->SendRawDatagram (session, m_Buffer, len, LocalPort, RemotePort);
303  				numPackets++;
304  			}
305  			if (numPackets > 0)
306  				LogPrint(eLogDebug, "UDPSession: Forward more ", numPackets, "packets B from ", FromEndpoint);
307  			m_NextSendPacketNum += numPackets + 1;
308  			m_Destination->FlushSendQueue (session);
309  			LastActivity = ts;
310  			Receive();
311  		}
312  		else
313  			LogPrint(eLogError, "UDPSession: ", ecode.message());
314  	}
315  
316  	std::shared_ptr<i2p::datagram::DatagramSession> UDPSession::GetDatagramSession ()
317  	{
318  		auto session = m_LastDatagramSession.lock ();
319  		if (!session)
320  		{
321  			session = m_Destination->GetSession (Identity);
322  			m_LastDatagramSession = session;
323  		}
324  		return session;
325  	}
326  
327  	I2PUDPServerTunnel::I2PUDPServerTunnel (const std::string & name, std::shared_ptr<i2p::client::ClientDestination> localDestination,
328  		const boost::asio::ip::address& localAddress, const boost::asio::ip::udp::endpoint& forwardTo, uint16_t inPort, bool gzip) :
329  		m_IsUniqueLocal (true), m_Name (name), m_LocalAddress (localAddress),
330  		m_RemoteEndpoint (forwardTo), m_LocalDest (localDestination), m_inPort(inPort), m_Gzip (gzip)
331  	{
332  	}
333  
334  	I2PUDPServerTunnel::~I2PUDPServerTunnel ()
335  	{
336  		Stop ();
337  	}
338  
339  	void I2PUDPServerTunnel::Start ()
340  	{
341  		m_LocalDest->Start ();
342  
343  		auto dgram = m_LocalDest->CreateDatagramDestination (m_Gzip);
344  		dgram->SetReceiver (
345  			std::bind (&I2PUDPServerTunnel::HandleRecvFromI2P, this, std::placeholders::_1, std::placeholders::_2,
346  				std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6),
347  			m_inPort
348  		);
349  		dgram->SetRawReceiver (
350  			std::bind (&I2PUDPServerTunnel::HandleRecvFromI2PRaw, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
351  			m_inPort
352  		);
353  	}
354  
355  	void I2PUDPServerTunnel::Stop ()
356  	{
357  		auto dgram = m_LocalDest->GetDatagramDestination ();
358  		if (dgram) {
359  			dgram->ResetReceiver (m_inPort);
360  			dgram->ResetRawReceiver (m_inPort);
361  		}
362  		m_Sessions.clear ();
363  	}
364  
365  	std::vector<std::shared_ptr<DatagramSessionInfo> > I2PUDPServerTunnel::GetSessions ()
366  	{
367  		std::vector<std::shared_ptr<DatagramSessionInfo> > sessions;
368  		std::lock_guard<std::mutex> lock (m_SessionsMutex);
369  
370          for (const auto &it: m_Sessions)
371  		{
372  			auto s = it.second;
373  			if (!s->m_Destination) continue;
374  			auto info = s->m_Destination->GetInfoForRemote (s->Identity);
375  			if (!info) continue;
376  
377  			auto sinfo = std::make_shared<DatagramSessionInfo> ();
378  			sinfo->Name = m_Name;
379  			sinfo->LocalIdent = std::make_shared<i2p::data::IdentHash> (m_LocalDest->GetIdentHash ().data ());
380  			sinfo->RemoteIdent = std::make_shared<i2p::data::IdentHash> (s->Identity.data ());
381  			sinfo->CurrentIBGW = info->IBGW;
382  			sinfo->CurrentOBEP = info->OBEP;
383  			sessions.push_back (sinfo);
384  		}
385  		return sessions;
386  	}
387  
388  	I2PUDPClientTunnel::I2PUDPClientTunnel (const std::string & name, const std::string &remoteDest,
389  		const boost::asio::ip::udp::endpoint& localEndpoint,
390  		std::shared_ptr<i2p::client::ClientDestination> localDestination,
391  		uint16_t remotePort, bool gzip, i2p::datagram::DatagramVersion datagramVersion) :
392  		UDPConnection (localDestination->GetService ()),
393  		m_Name (name), m_RemoteDest (remoteDest), m_LocalDest (localDestination), m_LocalEndpoint (localEndpoint),
394  		m_ResolveThread (nullptr), m_LocalSocket (nullptr), RemotePort (remotePort),
395  		m_LastPort (0), m_cancel_resolve (false), m_Gzip (gzip), m_DatagramVersion (datagramVersion),
396  		m_LastRepliableDatagramTime (0)
397  	{
398  	}
399  
400  	I2PUDPClientTunnel::~I2PUDPClientTunnel ()
401  	{
402  		Stop ();
403  	}
404  
405  	void I2PUDPClientTunnel::Start ()
406  	{
407  		UDPConnection::Start ();
408  		// Reset flag in case of tunnel reload
409  		if (m_cancel_resolve) m_cancel_resolve = false;
410  
411  		m_LocalSocket.reset (new boost::asio::ip::udp::socket (m_LocalDest->GetService (), m_LocalEndpoint));
412  		m_LocalSocket->set_option (boost::asio::socket_base::receive_buffer_size (I2P_UDP_MAX_MTU));
413  		m_LocalSocket->set_option (boost::asio::socket_base::reuse_address (true));
414  		m_LocalSocket->non_blocking (true);
415  
416  		auto dgram = m_LocalDest->CreateDatagramDestination (m_Gzip, m_DatagramVersion);
417  		dgram->SetReceiver (std::bind (&I2PUDPClientTunnel::HandleRecvFromI2P, this,
418  			std::placeholders::_1, std::placeholders::_2,
419  			std::placeholders::_3, std::placeholders::_4,
420  			std::placeholders::_5, std::placeholders::_6),
421  			RemotePort
422  		);
423  		dgram->SetRawReceiver (std::bind (&I2PUDPClientTunnel::HandleRecvFromI2PRaw, this,
424  			std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
425  			RemotePort
426  		);
427  
428  		m_LocalDest->Start ();
429  		if (m_ResolveThread == nullptr)
430  			m_ResolveThread = new std::thread (std::bind (&I2PUDPClientTunnel::TryResolving, this));
431  		RecvFromLocal ();
432  	}
433  
434  	void I2PUDPClientTunnel::Stop ()
435  	{
436  		auto dgram = m_LocalDest->GetDatagramDestination ();
437  		if (dgram) {
438  			dgram->ResetReceiver (RemotePort);
439  			dgram->ResetRawReceiver (RemotePort);
440  		}
441  		m_cancel_resolve = true;
442  
443  		m_Sessions.clear();
444  
445  		if(m_LocalSocket && m_LocalSocket->is_open ())
446  			m_LocalSocket->close ();
447  
448  		if(m_ResolveThread)
449  		{
450  			m_ResolveThread->join ();
451  			delete m_ResolveThread;
452  			m_ResolveThread = nullptr;
453  		}
454  		m_RemoteAddr = nullptr;
455  		UDPConnection::Stop ();
456  	}
457  
458  	void I2PUDPClientTunnel::RecvFromLocal ()
459  	{
460  		m_LocalSocket->async_receive_from (boost::asio::buffer (m_RecvBuff, I2P_UDP_MAX_MTU),
461  			m_RecvEndpoint, std::bind (&I2PUDPClientTunnel::HandleRecvFromLocal, this, std::placeholders::_1, std::placeholders::_2));
462  	}
463  
464  	void I2PUDPClientTunnel::HandleRecvFromLocal (const boost::system::error_code & ec, std::size_t transferred)
465  	{
466  		if (m_cancel_resolve) {
467  			LogPrint (eLogDebug, "UDP Client: Ignoring incoming data: stopping");
468  			return;
469  		}
470  		if (ec) {
471  			LogPrint (eLogError, "UDP Client: Reading from socket error: ", ec.message (), ". Restarting listener...");
472  			RecvFromLocal (); // Restart listener and continue work
473  			return;
474  		}
475  		if (!m_RemoteAddr || !m_RemoteAddr->IsIdentHash ())  // TODO: handle B33
476  		{
477  			LogPrint (eLogWarning, "UDP Client: Remote endpoint not resolved yet");
478  			RecvFromLocal ();
479  			return; // drop, remote not resolved
480  		}
481  		if ((!m_UnackedDatagrams.empty () && m_NextSendPacketNum > m_UnackedDatagrams.front ().first + I2P_UDP_MAX_NUM_UNACKED_DATAGRAMS) || !m_IsSendingAllowed)
482  		{
483  			// window is full, drop packet
484  			RecvFromLocal ();
485  			return;
486  		}
487  		auto remotePort = m_RecvEndpoint.port ();
488  		if (!m_LastPort || m_LastPort != remotePort)
489  		{
490  			auto itr = m_Sessions.find (remotePort);
491  			if (itr != m_Sessions.end ())
492  				m_LastSession = itr->second;
493  			else
494  			{
495  				m_LastSession = std::make_shared<UDPConvo> (boost::asio::ip::udp::endpoint (m_RecvEndpoint), 0);
496  				m_Sessions.emplace (remotePort, m_LastSession);
497  			}
498  			m_LastPort = remotePort;
499  		}
500  		// send off to remote i2p destination
501  		auto ts = i2p::util::GetMillisecondsSinceEpoch ();
502  		LogPrint (eLogDebug, "UDP Client: Send ", transferred, " to ", m_RemoteAddr->identHash.ToBase32 (), ":", RemotePort);
503  		auto session = GetDatagramSession ();
504  		uint64_t repliableDatagramInterval = I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL;
505  		if (m_RTT && m_RTT >= I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL && m_RTT < I2P_UDP_REPLIABLE_DATAGRAM_INTERVAL*10) repliableDatagramInterval = m_RTT/10; // 10 - 100 ms
506  		if (ts > m_LastRepliableDatagramTime + repliableDatagramInterval)
507  		{
508  			if (m_DatagramVersion == i2p::datagram::eDatagramV3)
509  			{
510  				uint8_t flags = 0;
511  				if (!m_RTT || !m_AckTimerSeqn || (!m_UnackedDatagrams.empty () &&
512  					ts > m_UnackedDatagrams.back ().second + repliableDatagramInterval)) // last ack request
513  				{
514  					flags |= UDP_SESSION_FLAG_ACK_REQUESTED;
515  					m_UnackedDatagrams.push_back ({ m_NextSendPacketNum, ts });
516  					ScheduleAckTimer (m_NextSendPacketNum);
517  				}
518  				i2p::util::Mapping options;
519  				options.Put (UDP_SESSION_SEQN, m_NextSendPacketNum);
520  				if (m_LastReceivedPacketNum > 0)
521  					options.Put (UDP_SESSION_ACKED, m_LastReceivedPacketNum);
522  				if (flags)
523  					options.Put (UDP_SESSION_FLAGS, flags);
524  				m_LocalDest->GetDatagramDestination ()->SendDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort, &options);
525  				if (m_IsFirstPacket) m_IsSendingAllowed = false; // send only one packet at the start and wait ack
526  			}
527  			else
528  				m_LocalDest->GetDatagramDestination ()->SendDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort);
529  			m_LastRepliableDatagramTime = ts;
530  		}
531  		else
532  			m_LocalDest->GetDatagramDestination ()->SendRawDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort);
533  		size_t numPackets = 0;
534  		while (numPackets < i2p::datagram::DATAGRAM_SEND_QUEUE_MAX_SIZE)
535  		{
536  			boost::system::error_code ec;
537  			size_t moreBytes = m_LocalSocket->available (ec);
538  			if (ec || !moreBytes) break;
539  			transferred = m_LocalSocket->receive_from (boost::asio::buffer (m_RecvBuff, I2P_UDP_MAX_MTU), m_RecvEndpoint, 0, ec);
540  			remotePort = m_RecvEndpoint.port ();
541  			// TODO: check remotePort
542  			m_LocalDest->GetDatagramDestination ()->SendRawDatagram (session, m_RecvBuff, transferred, remotePort, RemotePort);
543  			numPackets++;
544  		}
545  		if (numPackets)
546  			LogPrint (eLogDebug, "UDP Client: Sent ", numPackets, " more packets to ", m_RemoteAddr->identHash.ToBase32 ());
547  		m_NextSendPacketNum += numPackets + 1;
548  		m_LocalDest->GetDatagramDestination ()->FlushSendQueue (session);
549  
550  		// mark convo as active
551  		if (m_LastSession)
552  			m_LastSession->second = ts;
553  		RecvFromLocal ();
554  	}
555  
556  	std::vector<std::shared_ptr<DatagramSessionInfo> > I2PUDPClientTunnel::GetSessions ()
557  	{
558  		// TODO: implement
559  		std::vector<std::shared_ptr<DatagramSessionInfo> > infos;
560  		return infos;
561  	}
562  
563  	void I2PUDPClientTunnel::TryResolving ()
564  	{
565  		i2p::util::SetThreadName ("UDP Resolver");
566  		LogPrint (eLogInfo, "UDP Tunnel: Trying to resolve ", m_RemoteDest);
567  
568  		while (!(m_RemoteAddr = context.GetAddressBook().GetAddress(m_RemoteDest)) && !m_cancel_resolve)
569  		{
570  			LogPrint (eLogWarning, "UDP Tunnel: Failed to lookup ", m_RemoteDest);
571  			std::this_thread::sleep_for (std::chrono::seconds (1));
572  		}
573  		if (m_cancel_resolve)
574  		{
575  			LogPrint(eLogError, "UDP Tunnel: Lookup of ", m_RemoteDest, " was cancelled");
576  			return;
577  		}
578  		if (!m_RemoteAddr)
579  		{
580  			LogPrint (eLogError, "UDP Tunnel: ", m_RemoteDest, " not found");
581  			return;
582  		}
583  		LogPrint(eLogInfo, "UDP Tunnel: Resolved ", m_RemoteDest, " to ", m_RemoteAddr->identHash.ToBase32 ());
584  	}
585  
586  	std::shared_ptr<i2p::datagram::DatagramSession> I2PUDPClientTunnel::GetDatagramSession ()
587  	{
588  		auto session = m_LastDatagramSession.lock ();
589  		if (!session)
590  		{
591  			session = m_LocalDest->GetDatagramDestination ()->GetSession (m_RemoteAddr->identHash);
592  			m_LastDatagramSession = session;
593  		}
594  		return session;
595  	}
596  
597  	void I2PUDPClientTunnel::HandleRecvFromI2P (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort,
598  		const uint8_t * buf, size_t len, const i2p::util::Mapping * options)
599  	{
600  		if (m_RemoteAddr && from.GetIdentHash() == m_RemoteAddr->identHash)
601  		{
602  			if (options)
603  			{
604  				uint32_t seqn = 0;
605  				if (options->Get (UDP_SESSION_SEQN, seqn) && seqn > m_LastReceivedPacketNum)
606  					m_LastReceivedPacketNum = seqn;
607  				uint8_t flags = 0;
608  				if (options->Get (UDP_SESSION_FLAGS, flags) && (flags & UDP_SESSION_FLAG_ACK_REQUESTED))
609  				{
610  					i2p::util::Mapping replyOptions;
611  					replyOptions.Put (UDP_SESSION_ACKED, m_LastReceivedPacketNum);
612  					m_LocalDest->GetDatagramDestination ()->SendDatagram (GetDatagramSession (),
613  						nullptr, 0, m_LastPort, RemotePort, &replyOptions); // Ack only, no payload
614  					m_LastRepliableDatagramTime = i2p::util::GetMillisecondsSinceEpoch ();
615  				}
616  				if (options->Get (UDP_SESSION_ACKED, seqn))
617  					Acked (seqn);
618  			}
619  			if (len > 0)
620  				HandleRecvFromI2PRaw (fromPort, toPort, buf, len);
621  		}
622  		else
623  			LogPrint(eLogWarning, "UDP Client: Unwarranted traffic from ", from.GetIdentHash().ToBase32 ());
624  	}
625  
626  	void I2PUDPClientTunnel::HandleRecvFromI2PRaw (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
627  	{
628  		auto itr = m_Sessions.find (toPort);
629  		// found convo ?
630  		if (itr != m_Sessions.end ())
631  		{
632  			// found convo
633  			if (len > 0)
634  			{
635  				LogPrint (eLogDebug, "UDP Client: Got ", len, "B from ", m_RemoteAddr ? m_RemoteAddr->identHash.ToBase32 () : "");
636  				boost::system::error_code ec;
637  				m_LocalSocket->send_to (boost::asio::buffer (buf, len), itr->second->first, 0, ec);
638  				if (!ec)
639  					// mark convo as active
640  					itr->second->second = i2p::util::GetMillisecondsSinceEpoch ();
641  				else
642  					LogPrint (eLogInfo, "UDP Client: Send exception: ", ec.message (), " to ", itr->second->first);
643  			}
644  		}
645  		else
646  			LogPrint (eLogWarning, "UDP Client: Not tracking udp session using port ", (int) toPort);
647  	}
648  }
649  }