/ libi2pd_client / SAM.cpp
SAM.cpp
1 /* 2 * Copyright (c) 2013-2026, The PurpleI2P Project 3 * 4 * This file is part of Purple i2pd project and licensed under BSD3 5 * 6 * See full license text in LICENSE file at top of project tree 7 */ 8 9 #include <string.h> 10 #include <stdio.h> 11 #ifdef _MSC_VER 12 #include <stdlib.h> 13 #endif 14 #include <charconv> 15 #include "Base.h" 16 #include "Identity.h" 17 #include "Log.h" 18 #include "Destination.h" 19 #include "ClientContext.h" 20 #include "util.h" 21 #include "SAM.h" 22 23 namespace i2p 24 { 25 namespace client 26 { 27 SAMSocket::SAMSocket (SAMBridge& owner): 28 m_Owner (owner), m_Socket(owner.GetService()), m_Timer (m_Owner.GetService ()), 29 m_BufferOffset (0), m_SocketType (SAMSocketType::eSAMSocketTypeUnknown), 30 m_IsSilent (false), m_IsAccepting (false), m_IsReceiving (false), 31 m_Version (MIN_SAM_VERSION) 32 { 33 } 34 35 SAMSocket::~SAMSocket () 36 { 37 m_Stream = nullptr; 38 } 39 40 void SAMSocket::Terminate (const char* reason) 41 { 42 if(m_Stream) 43 { 44 m_Stream->AsyncClose (); 45 m_Stream = nullptr; 46 } 47 switch (m_SocketType) 48 { 49 case SAMSocketType::eSAMSocketTypeSession: 50 m_Owner.CloseSession (m_ID); 51 break; 52 case SAMSocketType::eSAMSocketTypeStream: 53 break; 54 case SAMSocketType::eSAMSocketTypeAcceptor: 55 case SAMSocketType::eSAMSocketTypeForward: 56 { 57 auto session = m_Owner.FindSession(m_ID); 58 if (session) 59 { 60 if (m_IsAccepting && session->GetLocalDestination ()) 61 session->GetLocalDestination ()->StopAcceptingStreams (); 62 } 63 break; 64 } 65 default: ; 66 } 67 m_SocketType = SAMSocketType::eSAMSocketTypeTerminated; 68 if (m_Socket.is_open ()) 69 { 70 boost::system::error_code ec; 71 m_Socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); 72 m_Socket.close (); 73 } 74 m_Owner.RemoveSocket(shared_from_this()); 75 } 76 77 void SAMSocket::ReceiveHandshake () 78 { 79 m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), 80 std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), 81 std::placeholders::_1, std::placeholders::_2)); 82 } 83 84 static int ExtractVersion (std::string_view ver) 85 { 86 int version = 0; 87 for (auto ch: ver) 88 { 89 if (ch >= '0' && ch <= '9') 90 { 91 version *= 10; 92 version += (ch - '0'); 93 } 94 } 95 return version; 96 } 97 98 static std::string CreateVersion (int ver) 99 { 100 auto d = div (ver, 10); 101 return std::to_string (d.quot) + "." + std::to_string (d.rem); 102 } 103 104 void SAMSocket::HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) 105 { 106 if (ecode) 107 { 108 LogPrint (eLogError, "SAM: Handshake read error: ", ecode.message ()); 109 if (ecode != boost::asio::error::operation_aborted) 110 Terminate ("SAM: handshake read error"); 111 } 112 else 113 { 114 m_Buffer[bytes_transferred] = 0; 115 char * eol = (char *)memchr (m_Buffer, '\n', bytes_transferred); 116 if (eol) 117 *eol = 0; 118 LogPrint (eLogDebug, "SAM: Handshake ", m_Buffer); 119 char * separator = strchr (m_Buffer, ' '); 120 if (separator) 121 { 122 separator = strchr (separator + 1, ' '); 123 if (separator) 124 *separator = 0; 125 } 126 127 if (!strcmp (m_Buffer, SAM_HANDSHAKE)) 128 { 129 int minVer = 0, maxVer = 0; 130 // try to find MIN and MAX, 3.0 if not found 131 if (separator) 132 { 133 separator++; 134 auto params = ExtractParams (separator); 135 auto maxVerStr = params[SAM_PARAM_MAX]; 136 if (!maxVerStr.empty ()) 137 maxVer = ExtractVersion (maxVerStr); 138 auto minVerStr = params[SAM_PARAM_MIN]; 139 if (!minVerStr.empty ()) 140 minVer = ExtractVersion (minVerStr); 141 } 142 // version negotiation 143 if (maxVer && maxVer <= MAX_SAM_VERSION) 144 m_Version = maxVer; 145 else if (minVer && minVer >= MIN_SAM_VERSION && minVer <= MAX_SAM_VERSION) 146 m_Version = minVer; 147 else if (!maxVer && !minVer) 148 m_Version = MIN_SAM_VERSION; 149 else 150 { 151 LogPrint (eLogError, "SAM: Handshake version mismatch ", minVer, " ", maxVer); 152 SendMessageReply (SAM_HANDSHAKE_NOVERSION, true); 153 return; 154 } 155 // send reply 156 #ifdef _MSC_VER 157 size_t l = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, CreateVersion (m_Version).c_str ()); 158 #else 159 size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, CreateVersion (m_Version).c_str ()); 160 #endif 161 boost::asio::async_write (m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (), 162 std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (), 163 std::placeholders::_1, std::placeholders::_2)); 164 } 165 else 166 { 167 LogPrint (eLogError, "SAM: Handshake mismatch"); 168 Terminate ("SAM: handshake mismatch"); 169 } 170 } 171 } 172 173 bool SAMSocket::IsSession(std::string_view id) const 174 { 175 return id == m_ID; 176 } 177 178 void SAMSocket::HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred) 179 { 180 if (ecode) 181 { 182 LogPrint (eLogError, "SAM: Handshake reply send error: ", ecode.message ()); 183 if (ecode != boost::asio::error::operation_aborted) 184 Terminate ("SAM: handshake reply send error"); 185 } 186 else 187 { 188 m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), 189 std::bind(&SAMSocket::HandleMessage, shared_from_this (), 190 std::placeholders::_1, std::placeholders::_2)); 191 } 192 } 193 194 void SAMSocket::SendMessageReply (std::string_view msg, bool close) 195 { 196 LogPrint (eLogDebug, "SAMSocket::SendMessageReply, close=",close?"true":"false", " reason: ", msg); 197 198 if (!m_IsSilent || m_SocketType == SAMSocketType::eSAMSocketTypeForward) 199 boost::asio::async_write (m_Socket, boost::asio::buffer (msg.data (), msg.size ()), boost::asio::transfer_all (), 200 std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (), 201 std::placeholders::_1, std::placeholders::_2, close)); 202 else 203 { 204 if (close) 205 Terminate ("SAMSocket::SendMessageReply(close=true)"); 206 else 207 Receive (); 208 } 209 } 210 211 void SAMSocket::HandleMessageReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred, bool close) 212 { 213 if (ecode) 214 { 215 LogPrint (eLogError, "SAM: Reply send error: ", ecode.message ()); 216 if (ecode != boost::asio::error::operation_aborted) 217 Terminate ("SAM: reply send error"); 218 } 219 else 220 { 221 if (close) 222 Terminate ("SAMSocket::HandleMessageReplySent(close=true)"); 223 else 224 Receive (); 225 } 226 } 227 228 void SAMSocket::HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred) 229 { 230 if (ecode) 231 { 232 LogPrint (eLogError, "SAM: Read error: ", ecode.message ()); 233 if (ecode != boost::asio::error::operation_aborted) 234 Terminate ("SAM: read error"); 235 } 236 else if (m_SocketType == SAMSocketType::eSAMSocketTypeStream) 237 HandleReceived (ecode, bytes_transferred); 238 else 239 { 240 bytes_transferred += m_BufferOffset; 241 m_BufferOffset = 0; 242 m_Buffer[bytes_transferred] = 0; 243 char * eol = (char *)memchr (m_Buffer, '\n', bytes_transferred); 244 if (eol) 245 { 246 if (eol > m_Buffer && eol[-1] == '\r') eol--; 247 *eol = 0; 248 char * separator = strchr (m_Buffer, ' '); 249 if (separator) 250 { 251 // one word command 252 if (std::string_view (m_Buffer, separator - m_Buffer) == SAM_PING) 253 { 254 ProcessPing ({ separator + 1, (size_t)(eol - separator - 1) }); 255 return; 256 } 257 258 // two words command 259 size_t l = 0; 260 separator = strchr (separator + 1, ' '); 261 if (separator) 262 { 263 *separator = 0; 264 l = eol - separator - 1; 265 } 266 else 267 separator = eol; 268 269 if (!strcmp (m_Buffer, SAM_SESSION_CREATE)) 270 ProcessSessionCreate ({ separator + 1, l }); 271 else if (!strcmp (m_Buffer, SAM_STREAM_CONNECT)) 272 ProcessStreamConnect (separator + 1, eol - separator - 1, bytes_transferred - (eol - m_Buffer) - 1); 273 else if (!strcmp (m_Buffer, SAM_STREAM_ACCEPT)) 274 ProcessStreamAccept ({ separator + 1, l }); 275 else if (!strcmp (m_Buffer, SAM_STREAM_FORWARD)) 276 ProcessStreamForward ({ separator + 1, l }); 277 else if (!strcmp (m_Buffer, SAM_DEST_GENERATE)) 278 ProcessDestGenerate ({ separator + 1, l }); 279 else if (!strcmp (m_Buffer, SAM_NAMING_LOOKUP)) 280 ProcessNamingLookup ({ separator + 1, l }); 281 else if (!strcmp (m_Buffer, SAM_SESSION_ADD)) 282 ProcessSessionAdd ({ separator + 1, l }); 283 else if (!strcmp (m_Buffer, SAM_SESSION_REMOVE)) 284 ProcessSessionRemove ({ separator + 1, l }); 285 else if (!strcmp (m_Buffer, SAM_DATAGRAM_SEND) || !strcmp (m_Buffer, SAM_RAW_SEND)) 286 { 287 size_t len = bytes_transferred - (separator - m_Buffer) - 1; 288 size_t processed = ProcessDatagramSend (separator + 1, len, eol + 1); 289 if (processed < len) 290 { 291 m_BufferOffset = len - processed; 292 if (processed > 0) 293 memmove (m_Buffer, separator + 1 + processed, m_BufferOffset); 294 else 295 { 296 // restore string back 297 *separator = ' '; 298 *eol = '\n'; 299 } 300 } 301 // since it's SAM v1 reply is not expected 302 Receive (); 303 } 304 else 305 { 306 LogPrint (eLogError, "SAM: Unexpected message ", m_Buffer); 307 Terminate ("SAM: unexpected message"); 308 } 309 } 310 else 311 { 312 LogPrint (eLogError, "SAM: Malformed message ", m_Buffer); 313 Terminate ("malformed message"); 314 } 315 } 316 else 317 { 318 LogPrint (eLogWarning, "SAM: Incomplete message ", bytes_transferred); 319 m_BufferOffset = bytes_transferred; 320 // try to receive remaining message 321 Receive (); 322 } 323 } 324 } 325 326 static bool IsAcceptableSessionName(std::string_view str) 327 { 328 auto itr = str.begin(); 329 while(itr != str.end()) 330 { 331 char ch = *itr; 332 ++itr; 333 if (ch == '<' || ch == '>' || ch == '"' || ch == '\'' || ch == '/') 334 return false; 335 } 336 return true; 337 } 338 339 void SAMSocket::ProcessSessionCreate (std::string_view buf) 340 { 341 LogPrint (eLogDebug, "SAM: Session create: ", buf); 342 auto params = ExtractParams (buf); 343 std::string_view style = params[SAM_PARAM_STYLE]; 344 std::string_view id = params[SAM_PARAM_ID]; 345 std::string_view destination = params[SAM_PARAM_DESTINATION]; 346 347 if(!IsAcceptableSessionName(id)) 348 { 349 // invalid session id 350 SendMessageReply (SAM_SESSION_CREATE_INVALID_ID, true); 351 return; 352 } 353 m_ID = id; 354 if (m_Owner.FindSession (id)) 355 { 356 // session exists 357 SendMessageReply (SAM_SESSION_CREATE_DUPLICATED_ID, true); 358 return; 359 } 360 361 SAMSessionType type = SAMSessionType::eSAMSessionTypeUnknown; 362 i2p::datagram::DatagramVersion datagramVersion = i2p::datagram::eDatagramV1; 363 if (style == SAM_VALUE_STREAM) type = SAMSessionType::eSAMSessionTypeStream; 364 #if __cplusplus >= 202002L // C++20 365 else if (style.starts_with (SAM_VALUE_DATAGRAM)) 366 #else 367 else if (style.substr (0, SAM_VALUE_DATAGRAM.size ()) == SAM_VALUE_DATAGRAM) 368 #endif 369 { 370 // DATAGRAM, DATAGRAM1, DATAGRAM2, DATAGRAM3 371 type = SAMSessionType::eSAMSessionTypeDatagram; 372 if (style.size () > SAM_VALUE_DATAGRAM.size ()) 373 { 374 switch (style[SAM_VALUE_DATAGRAM.size ()]) 375 { 376 case '1': datagramVersion = i2p::datagram::eDatagramV1; break; 377 case '2': datagramVersion = i2p::datagram::eDatagramV2; break; 378 case '3': datagramVersion = i2p::datagram::eDatagramV3; break; 379 default: type = SAMSessionType::eSAMSessionTypeUnknown; 380 } 381 } 382 } 383 else if (style == SAM_VALUE_RAW) type = SAMSessionType::eSAMSessionTypeRaw; 384 else if (style == SAM_VALUE_MASTER) 385 { 386 if (m_Version < SAM_VERSION_33) // < SAM 3.3 387 { 388 SendSessionI2PError("MASTER session is not supported"); 389 return; 390 } 391 type = SAMSessionType::eSAMSessionTypeMaster; 392 } 393 if (type == SAMSessionType::eSAMSessionTypeUnknown) 394 { 395 // unknown style 396 SendSessionI2PError("Unknown STYLE"); 397 return; 398 } 399 400 std::shared_ptr<boost::asio::ip::udp::endpoint> forward = nullptr; 401 if ((type == SAMSessionType::eSAMSessionTypeDatagram || type == SAMSessionType::eSAMSessionTypeRaw) && 402 params.Contains(SAM_PARAM_HOST) && params.Contains(SAM_PARAM_PORT)) 403 { 404 // udp forward selected 405 boost::system::error_code e; 406 // TODO: support hostnames in udp forward 407 auto addr = boost::asio::ip::make_address(params[SAM_PARAM_HOST], e); 408 if (e) 409 { 410 // not an ip address 411 SendSessionI2PError("Invalid IP Address in HOST"); 412 return; 413 } 414 415 uint16_t port = 0; 416 std::string_view p = params[SAM_PARAM_PORT]; 417 auto res = std::from_chars(p.data(), p.data() + p.size(), port); 418 if (res.ec != std::errc()) 419 { 420 SendSessionI2PError("Invalid port"); 421 return; 422 } 423 forward = std::make_shared<boost::asio::ip::udp::endpoint>(addr, port); 424 } 425 426 //ensure we actually received a destination 427 if (destination.empty()) 428 { 429 SendMessageReply (SAM_SESSION_STATUS_INVALID_KEY, true); 430 return; 431 } 432 433 if (destination != SAM_VALUE_TRANSIENT) 434 { 435 //ensure it's a base64 string 436 i2p::data::PrivateKeys keys; 437 if (!keys.FromBase64(destination)) 438 { 439 SendMessageReply(SAM_SESSION_STATUS_INVALID_KEY, true); 440 return; 441 } 442 } 443 444 // create destination 445 auto session = m_Owner.CreateSession (id, type, destination == SAM_VALUE_TRANSIENT ? "" : destination, params); 446 if (session) 447 { 448 m_SocketType = SAMSocketType::eSAMSocketTypeSession; 449 if (type == SAMSessionType::eSAMSessionTypeDatagram || type == SAMSessionType::eSAMSessionTypeRaw) 450 { 451 session->DatagramVersion = datagramVersion; 452 session->UDPEndpoint = forward; 453 auto dest = session->GetLocalDestination ()->CreateDatagramDestination (true, datagramVersion); 454 uint16_t port = 0; 455 if (forward) 456 { 457 std::string_view p = params[SAM_PARAM_PORT]; 458 auto res = std::from_chars(p.data(), p.data() + p.size(), port); 459 if (res.ec != std::errc()) port = 0; 460 } 461 if (type == SAMSessionType::eSAMSessionTypeDatagram) 462 dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), 463 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, 464 std::placeholders::_4, std::placeholders::_5, std::placeholders::_6), 465 port 466 ); 467 else // raw 468 dest->SetRawReceiver (std::bind (&SAMSocket::HandleI2PRawDatagramReceive, shared_from_this (), 469 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), 470 port 471 ); 472 } 473 474 if (session->GetLocalDestination ()->IsReady ()) 475 SendSessionCreateReplyOk (); 476 else 477 { 478 m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL)); 479 m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer, 480 shared_from_this (), std::placeholders::_1)); 481 } 482 } 483 else 484 SendMessageReply (SAM_SESSION_CREATE_DUPLICATED_DEST, true); 485 } 486 487 void SAMSocket::HandleSessionReadinessCheckTimer (const boost::system::error_code& ecode) 488 { 489 if (ecode != boost::asio::error::operation_aborted) 490 { 491 if (m_Socket.is_open ()) 492 { 493 auto session = m_Owner.FindSession(m_ID); 494 if(session) 495 { 496 if (session->GetLocalDestination ()->IsReady ()) 497 SendSessionCreateReplyOk (); 498 else 499 { 500 m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL)); 501 m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer, 502 shared_from_this (), std::placeholders::_1)); 503 } 504 } 505 } 506 else 507 Terminate ("SAM: session socket closed"); 508 } 509 } 510 511 void SAMSocket::SendSessionCreateReplyOk () 512 { 513 auto session = m_Owner.FindSession(m_ID); 514 if (session) 515 { 516 std::string priv = session->GetLocalDestination ()->GetPrivateKeys ().ToBase64 (); 517 #ifdef _MSC_VER 518 size_t l2 = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv.c_str ()); 519 #else 520 size_t l2 = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv.c_str ()); 521 #endif 522 SendMessageReply ({m_Buffer, l2}, false); 523 } 524 } 525 526 void SAMSocket::ProcessStreamConnect (char * buf, size_t len, size_t rem) 527 { 528 LogPrint (eLogDebug, "SAM: Stream connect: ", buf); 529 if ( m_SocketType != SAMSocketType::eSAMSocketTypeUnknown) 530 { 531 SendSessionI2PError ("Socket already in use"); 532 return; 533 } 534 auto params = ExtractParams (buf); 535 std::string_view id = params[SAM_PARAM_ID]; 536 std::string_view destination = params[SAM_PARAM_DESTINATION]; 537 std::string_view silent = params[SAM_PARAM_SILENT]; 538 if (silent == SAM_VALUE_TRUE) m_IsSilent = true; 539 m_ID = id; 540 auto session = m_Owner.FindSession (id); 541 if (session) 542 { 543 if (rem > 0) // handle follow on data 544 { 545 memmove (m_Buffer, buf + len + 1, rem); // buf is a pointer to m_Buffer's content 546 m_BufferOffset = rem; 547 } 548 else 549 m_BufferOffset = 0; 550 551 std::shared_ptr<const Address> addr; 552 if (destination.find(".i2p") != std::string::npos) 553 addr = context.GetAddressBook().GetAddress (destination); 554 else 555 { 556 auto dest = std::make_shared<i2p::data::IdentityEx> (); 557 size_t l = dest->FromBase64(destination); 558 if (l > 0) 559 { 560 context.GetAddressBook().InsertFullAddress(dest); 561 addr = std::make_shared<Address>(dest->GetIdentHash ()); 562 } 563 } 564 565 if (addr && addr->IsValid ()) 566 { 567 if (addr->IsIdentHash ()) 568 { 569 if (session->GetLocalDestination ()->GetIdentHash () != addr->identHash) 570 { 571 auto leaseSet = session->GetLocalDestination ()->FindLeaseSet(addr->identHash); 572 if (leaseSet) 573 Connect(leaseSet, session); 574 else 575 { 576 session->GetLocalDestination ()->RequestDestination(addr->identHash, 577 std::bind(&SAMSocket::HandleConnectLeaseSetRequestComplete, 578 shared_from_this(), std::placeholders::_1)); 579 } 580 } 581 else 582 SendStreamCantReachPeer ("Can't connect to myself"); 583 } 584 else // B33 585 session->GetLocalDestination ()->RequestDestinationWithEncryptedLeaseSet (addr->blindedPublicKey, 586 std::bind(&SAMSocket::HandleConnectLeaseSetRequestComplete, 587 shared_from_this(), std::placeholders::_1)); 588 } 589 else 590 SendMessageReply (SAM_STREAM_STATUS_INVALID_KEY, true); 591 } 592 else 593 SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, true); 594 } 595 596 void SAMSocket::Connect (std::shared_ptr<const i2p::data::LeaseSet> remote, std::shared_ptr<SAMSession> session) 597 { 598 if (!session) session = m_Owner.FindSession(m_ID); 599 if (session) 600 { 601 if (session->GetLocalDestination ()->SupportsEncryptionType (remote->GetEncryptionType ())) 602 { 603 m_SocketType = SAMSocketType::eSAMSocketTypeStream; 604 m_Stream = session->GetLocalDestination ()->CreateStream (remote); 605 if (m_Stream) 606 { 607 m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send 608 m_BufferOffset = 0; 609 I2PReceive (); 610 SendMessageReply (SAM_STREAM_STATUS_OK, false); 611 } 612 else 613 SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, true); 614 } 615 else 616 SendStreamCantReachPeer ("Incompatible crypto"); 617 } 618 else 619 SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, true); 620 } 621 622 void SAMSocket::HandleConnectLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> leaseSet) 623 { 624 if (leaseSet) 625 Connect (leaseSet); 626 else 627 { 628 LogPrint (eLogError, "SAM: Destination to connect not found"); 629 SendStreamCantReachPeer ("LeaseSet not found"); 630 } 631 } 632 633 void SAMSocket::ProcessStreamAccept (std::string_view buf) 634 { 635 LogPrint (eLogDebug, "SAM: Stream accept: ", buf); 636 if ( m_SocketType != SAMSocketType::eSAMSocketTypeUnknown) 637 { 638 SendSessionI2PError ("Socket already in use"); 639 return; 640 } 641 auto params = ExtractParams (buf); 642 std::string_view id = params[SAM_PARAM_ID]; 643 std::string_view silent = params[SAM_PARAM_SILENT]; 644 if (silent == SAM_VALUE_TRUE) m_IsSilent = true; 645 m_ID = id; 646 auto session = m_Owner.FindSession (id); 647 if (session) 648 { 649 m_SocketType = SAMSocketType::eSAMSocketTypeAcceptor; 650 if (!session->GetLocalDestination ()->IsAcceptingStreams ()) 651 { 652 m_IsAccepting = true; 653 SendMessageReply (SAM_STREAM_STATUS_OK, false); 654 session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); 655 } 656 else 657 { 658 auto ts = i2p::util::GetSecondsSinceEpoch (); 659 while (!session->acceptQueue.empty () && session->acceptQueue.front ().second + SAM_SESSION_MAX_ACCEPT_INTERVAL > ts) 660 { 661 auto socket = session->acceptQueue.front ().first; 662 session->acceptQueue.pop_front (); 663 if (socket) 664 boost::asio::post (m_Owner.GetService (), std::bind(&SAMSocket::TerminateClose, socket)); 665 } 666 if (session->acceptQueue.size () < SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE) 667 { 668 // already accepting, queue up 669 SendMessageReply (SAM_STREAM_STATUS_OK, false); 670 session->acceptQueue.push_back (std::make_pair(shared_from_this(), ts)); 671 } 672 else 673 { 674 LogPrint (eLogInfo, "SAM: Session ", m_ID, " accept queue is full ", session->acceptQueue.size ()); 675 SendStreamI2PError ("Already accepting"); 676 } 677 } 678 } 679 else 680 SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, true); 681 } 682 683 void SAMSocket::ProcessStreamForward (std::string_view buf) 684 { 685 LogPrint(eLogDebug, "SAM: Stream forward: ", buf); 686 687 auto params = ExtractParams(buf); 688 auto id = params[SAM_PARAM_ID]; 689 if (id.empty ()) 690 { 691 SendSessionI2PError("Missing ID"); 692 return; 693 } 694 695 auto session = m_Owner.FindSession(id); 696 if (!session) 697 { 698 SendMessageReply(SAM_STREAM_STATUS_INVALID_ID, true); 699 return; 700 } 701 if (session->GetLocalDestination()->IsAcceptingStreams()) 702 { 703 SendSessionI2PError("Already accepting"); 704 return; 705 } 706 707 auto portStr = params[SAM_PARAM_PORT]; 708 if (portStr.empty ()) 709 { 710 SendSessionI2PError("PORT is missing"); 711 return; 712 } 713 714 if (!std::all_of(portStr.begin(), portStr.end(), ::isdigit)) 715 { 716 SendSessionI2PError("Port must be numeric"); 717 return; 718 } 719 720 uint16_t port = 0; 721 auto res = std::from_chars(portStr.data(), portStr.data() + portStr.size(), port); 722 if (res.ec != std::errc()) 723 { 724 SendSessionI2PError("Invalid port"); 725 return; 726 } 727 728 boost::asio::ip::tcp::endpoint ep; 729 auto host = params[SAM_PARAM_HOST]; 730 731 if (!host.empty ()) 732 { 733 boost::system::error_code ec; 734 auto addr = boost::asio::ip::make_address(host, ec); 735 if (ec) 736 { 737 SendSessionI2PError("Invalid IP Address in HOST"); 738 return; 739 } 740 ep = boost::asio::ip::tcp::endpoint(addr, port); 741 } 742 else 743 { 744 boost::system::error_code ec; 745 ep = m_Socket.remote_endpoint(ec); 746 if (ec) 747 { 748 SendSessionI2PError("Socket error: cannot get remote endpoint"); 749 return; 750 } 751 ep.port(port); 752 } 753 754 m_SocketType = SAMSocketType::eSAMSocketTypeForward; 755 m_ID = id; 756 m_IsAccepting = true; 757 758 if (params[SAM_PARAM_SILENT] == SAM_VALUE_TRUE) 759 m_IsSilent = true; 760 761 session->GetLocalDestination()->AcceptStreams( 762 std::bind(&SAMSocket::HandleI2PForward, shared_from_this(), std::placeholders::_1, ep)); 763 764 SendMessageReply(SAM_STREAM_STATUS_OK, false); 765 } 766 767 768 size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data) 769 { 770 LogPrint (eLogDebug, "SAM: Datagram send: ", buf, " ", len); 771 auto params = ExtractParams (buf); 772 size_t size = 0; 773 std::string_view sizeStr = params[SAM_PARAM_SIZE]; 774 auto res = std::from_chars(sizeStr.data(), sizeStr.data() + sizeStr.size(), size); 775 if (res.ec != std::errc()) size = 0; 776 size_t offset = data - buf; 777 if (offset + size <= len) 778 { 779 auto session = m_Owner.FindSession(m_ID); 780 if (session) 781 { 782 auto d = session->GetLocalDestination ()->GetDatagramDestination (); 783 if (d) 784 { 785 i2p::data::IdentityEx dest; 786 dest.FromBase64 (params[SAM_PARAM_DESTINATION]); 787 if (session->Type == SAMSessionType::eSAMSessionTypeDatagram) 788 d->SendDatagramTo ((const uint8_t *)data, size, dest.GetIdentHash ()); 789 else // raw 790 d->SendRawDatagramTo ((const uint8_t *)data, size, dest.GetIdentHash ()); 791 } 792 else 793 LogPrint (eLogError, "SAM: Missing datagram destination"); 794 } 795 else 796 LogPrint (eLogError, "SAM: Session is not created from DATAGRAM SEND"); 797 } 798 else 799 { 800 LogPrint (eLogWarning, "SAM: Sent datagram size ", size, " exceeds buffer ", len - offset); 801 return 0; // try to receive more 802 } 803 return offset + size; 804 } 805 806 void SAMSocket::ProcessDestGenerate (std::string_view buf) 807 { 808 LogPrint (eLogDebug, "SAM: Dest generate"); 809 auto params = ExtractParams (buf); 810 // extract signature type 811 i2p::data::SigningKeyType signatureType = i2p::data::SIGNING_KEY_TYPE_DSA_SHA1; 812 i2p::data::CryptoKeyType cryptoType = i2p::data::CRYPTO_KEY_TYPE_ELGAMAL; 813 auto signatureTypeStr = params[SAM_PARAM_SIGNATURE_TYPE]; 814 if (!signatureTypeStr.empty ()) 815 { 816 if (!m_Owner.ResolveSignatureType (signatureTypeStr, signatureType)) 817 LogPrint (eLogWarning, "SAM: ", SAM_PARAM_SIGNATURE_TYPE, " is invalid ", signatureTypeStr); 818 } 819 params.Get (SAM_PARAM_CRYPTO_TYPE, cryptoType); 820 auto keys = i2p::data::PrivateKeys::CreateRandomKeys (signatureType, cryptoType, true); 821 #ifdef _MSC_VER 822 size_t l = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY, 823 keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ()); 824 #else 825 size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY, 826 keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ()); 827 #endif 828 SendMessageReply ({m_Buffer, l}, false); 829 } 830 831 void SAMSocket::ProcessNamingLookup (std::string_view buf) 832 { 833 LogPrint (eLogDebug, "SAM: Naming lookup: ", buf); 834 auto params = ExtractParams (buf); 835 std::string name (params[SAM_PARAM_NAME]); 836 std::shared_ptr<const i2p::data::IdentityEx> identity; 837 std::shared_ptr<const Address> addr; 838 auto session = m_Owner.FindSession(m_ID); 839 auto dest = session == nullptr ? context.GetSharedLocalDestination() : session->GetLocalDestination (); 840 if (name == "ME") 841 SendNamingLookupReply (name, dest->GetIdentity ()); 842 else if ((identity = context.GetAddressBook ().GetFullAddress (name)) != nullptr) 843 SendNamingLookupReply (name, identity); 844 else if ((addr = context.GetAddressBook ().GetAddress (name))) 845 { 846 if (addr->IsIdentHash ()) 847 { 848 auto leaseSet = dest->FindLeaseSet (addr->identHash); 849 if (leaseSet) 850 SendNamingLookupReply (name, leaseSet->GetIdentity ()); 851 else 852 dest->RequestDestination (addr->identHash, 853 std::bind (&SAMSocket::HandleNamingLookupLeaseSetRequestComplete, 854 shared_from_this (), std::placeholders::_1, name)); 855 } 856 else 857 dest->RequestDestinationWithEncryptedLeaseSet (addr->blindedPublicKey, 858 std::bind (&SAMSocket::HandleNamingLookupLeaseSetRequestComplete, 859 shared_from_this (), std::placeholders::_1, name)); 860 } 861 else 862 { 863 LogPrint (eLogError, "SAM: Naming failed, unknown address ", name); 864 #ifdef _MSC_VER 865 size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY, name.c_str()); 866 #else 867 size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY, name.c_str()); 868 #endif 869 SendMessageReply ({m_Buffer, len}, false); 870 } 871 } 872 873 void SAMSocket::ProcessSessionAdd (std::string_view buf) 874 { 875 if (m_Version < SAM_VERSION_33) // < SAM 3.3 876 { 877 SendSessionI2PError("SESSION ADD is not supported"); 878 return; 879 } 880 auto session = m_Owner.FindSession(m_ID); 881 if (session && session->Type == SAMSessionType::eSAMSessionTypeMaster) 882 { 883 LogPrint (eLogDebug, "SAM: Subsession add: ", buf); 884 auto masterSession = std::static_pointer_cast<SAMMasterSession>(session); 885 auto params = ExtractParams (buf); 886 std::string_view id = params[SAM_PARAM_ID]; 887 if (masterSession->subsessions.count (id) > 1) 888 { 889 // session exists 890 SendMessageReply (SAM_SESSION_CREATE_DUPLICATED_ID, false); 891 return; 892 } 893 std::string_view style = params[SAM_PARAM_STYLE]; 894 SAMSessionType type = SAMSessionType::eSAMSessionTypeUnknown; 895 if (style == SAM_VALUE_STREAM) type = SAMSessionType::eSAMSessionTypeStream; 896 // TODO: implement other styles 897 if (type == SAMSessionType::eSAMSessionTypeUnknown) 898 { 899 // unknown style 900 SendSessionI2PError("Unsupported STYLE"); 901 return; 902 } 903 uint16_t fromPort = 0; 904 params.Get (SAM_PARAM_FROM_PORT, fromPort); 905 906 auto subsession = std::make_shared<SAMSubSession>(masterSession, id, type, fromPort); 907 if (m_Owner.AddSession (subsession)) 908 { 909 masterSession->subsessions.insert (std::string (id)); 910 SendSessionCreateReplyOk (); 911 } 912 else 913 SendMessageReply (SAM_SESSION_CREATE_DUPLICATED_ID, false); 914 } 915 else 916 SendSessionI2PError ("Wrong session type"); 917 } 918 919 void SAMSocket::ProcessSessionRemove (std::string_view buf) 920 { 921 if (m_Version < SAM_VERSION_33) // < SAM 3.3 922 { 923 SendSessionI2PError("SESSION REMOVE is not supported"); 924 return; 925 } 926 auto session = m_Owner.FindSession(m_ID); 927 if (session && session->Type == SAMSessionType::eSAMSessionTypeMaster) 928 { 929 LogPrint (eLogDebug, "SAM: Subsession remove: ", buf); 930 auto masterSession = std::static_pointer_cast<SAMMasterSession>(session); 931 auto params = ExtractParams (buf); 932 std::string id(params[SAM_PARAM_ID]); 933 if (!masterSession->subsessions.erase (id)) 934 { 935 SendMessageReply (SAM_SESSION_STATUS_INVALID_KEY, false); 936 return; 937 } 938 m_Owner.CloseSession (id); 939 SendSessionCreateReplyOk (); 940 } 941 else 942 SendSessionI2PError ("Wrong session type"); 943 } 944 945 void SAMSocket::ProcessPing (std::string_view text) 946 { 947 LogPrint (eLogDebug, "SAM: Ping ", text); 948 SendReplyWithMessage (SAM_PONG, std::string (text)); 949 } 950 951 void SAMSocket::SendReplyWithMessage (const char * reply, const std::string & msg) 952 { 953 #ifdef _MSC_VER 954 size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, reply, msg.c_str()); 955 #else 956 size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, reply, msg.c_str()); 957 #endif 958 SendMessageReply ({m_Buffer, len}, true); 959 } 960 961 void SAMSocket::SendSessionI2PError(const std::string & msg) 962 { 963 LogPrint (eLogError, "SAM: Session I2P error: ", msg); 964 SendReplyWithMessage (SAM_SESSION_STATUS_I2P_ERROR, msg); 965 } 966 967 void SAMSocket::SendStreamI2PError(const std::string & msg) 968 { 969 LogPrint (eLogError, "SAM: Stream I2P error: ", msg); 970 SendReplyWithMessage (SAM_STREAM_STATUS_I2P_ERROR, msg); 971 } 972 973 void SAMSocket::SendStreamCantReachPeer(const std::string & msg) 974 { 975 SendReplyWithMessage (SAM_STREAM_STATUS_CANT_REACH_PEER, msg); 976 } 977 978 void SAMSocket::HandleNamingLookupLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> leaseSet, std::string name) 979 { 980 if (leaseSet) 981 { 982 context.GetAddressBook ().InsertFullAddress (leaseSet->GetIdentity ()); 983 SendNamingLookupReply (name, leaseSet->GetIdentity ()); 984 } 985 else 986 { 987 LogPrint (eLogError, "SAM: Naming lookup failed. LeaseSet for ", name, " not found"); 988 #ifdef _MSC_VER 989 size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY, name.c_str()); 990 #else 991 size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY, name.c_str()); 992 #endif 993 SendMessageReply ({m_Buffer, len}, false); 994 } 995 } 996 997 void SAMSocket::SendNamingLookupReply (const std::string& name, std::shared_ptr<const i2p::data::IdentityEx> identity) 998 { 999 auto base64 = identity->ToBase64 (); 1000 #ifdef _MSC_VER 1001 size_t l = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY, name.c_str (), base64.c_str ()); 1002 #else 1003 size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY, name.c_str (), base64.c_str ()); 1004 #endif 1005 SendMessageReply ({m_Buffer, l}, false); 1006 } 1007 1008 i2p::util::Mapping SAMSocket::ExtractParams (std::string_view buf) 1009 { 1010 i2p::util::Mapping params; 1011 size_t pos = 0; 1012 while (pos < buf.length ()) 1013 { 1014 std::string_view field; 1015 auto separator = buf.find (' ', pos); 1016 if (separator != std::string_view::npos) 1017 { 1018 field = buf.substr (pos, separator - pos); 1019 pos = separator + 1; 1020 } 1021 else 1022 { 1023 field = buf.substr (pos); 1024 pos = buf.length (); 1025 } 1026 auto value = field.find ('='); 1027 if (value != std::string_view::npos) 1028 params.Insert (field.substr (0, value), field.substr (value + 1)); 1029 } 1030 return params; 1031 } 1032 1033 void SAMSocket::Receive () 1034 { 1035 if (m_SocketType == SAMSocketType::eSAMSocketTypeStream) 1036 { 1037 if (m_IsReceiving) return; 1038 size_t bufSize = SAM_SOCKET_BUFFER_SIZE; 1039 size_t unsentSize = m_Stream ? m_Stream->GetSendBufferSize () : 0; 1040 if (unsentSize) 1041 { 1042 if (unsentSize >= SAM_STREAM_MAX_SEND_BUFFER_SIZE) return; // buffer is full 1043 if (unsentSize > SAM_STREAM_MAX_SEND_BUFFER_SIZE - SAM_SOCKET_BUFFER_SIZE) 1044 bufSize = SAM_STREAM_MAX_SEND_BUFFER_SIZE - unsentSize; 1045 } 1046 m_IsReceiving = true; 1047 m_Socket.async_read_some (boost::asio::buffer(m_Buffer, bufSize), 1048 std::bind(&SAMSocket::HandleReceived, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); 1049 } 1050 else 1051 m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), 1052 std::bind(&SAMSocket::HandleMessage, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); 1053 } 1054 1055 void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) 1056 { 1057 m_IsReceiving = false; 1058 if (ecode) 1059 { 1060 LogPrint (eLogError, "SAM: Read error: ", ecode.message ()); 1061 if (ecode != boost::asio::error::operation_aborted) 1062 Terminate ("read error"); 1063 } 1064 else 1065 { 1066 if (m_Stream) 1067 { 1068 m_Stream->AsyncSend ((uint8_t *)m_Buffer, bytes_transferred, 1069 std::bind(&SAMSocket::HandleStreamSend, shared_from_this(), std::placeholders::_1)); 1070 Receive (); 1071 } 1072 else 1073 Terminate("No Stream Remaining"); 1074 } 1075 } 1076 1077 void SAMSocket::I2PReceive () 1078 { 1079 if (m_Stream) 1080 { 1081 if (m_Stream->GetStatus () == i2p::stream::eStreamStatusNew || 1082 m_Stream->GetStatus () == i2p::stream::eStreamStatusOpen) // regular 1083 { 1084 m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_STREAM_BUFFER_SIZE), 1085 std::bind (&SAMSocket::HandleI2PReceive, shared_from_this(), 1086 std::placeholders::_1, std::placeholders::_2), 1087 SAM_SOCKET_CONNECTION_MAX_IDLE); 1088 } 1089 else // closed by peer 1090 { 1091 uint8_t * buff = new uint8_t[SAM_STREAM_BUFFER_SIZE]; 1092 // get remaining data 1093 auto len = m_Stream->ReadSome (buff, SAM_STREAM_BUFFER_SIZE); 1094 if (len > 0) // still some data 1095 { 1096 WriteI2PDataImmediate(buff, len); 1097 } 1098 else // no more data 1099 { 1100 delete [] buff; 1101 Terminate ("no more data"); 1102 } 1103 } 1104 } 1105 } 1106 1107 void SAMSocket::WriteI2PDataImmediate(uint8_t * buff, size_t sz) 1108 { 1109 boost::asio::async_write ( 1110 m_Socket, 1111 boost::asio::buffer (buff, sz), 1112 boost::asio::transfer_all(), 1113 std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination 1114 } 1115 1116 void SAMSocket::HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff) 1117 { 1118 delete [] buff; 1119 } 1120 1121 void SAMSocket::WriteI2PData(size_t sz) 1122 { 1123 boost::asio::async_write ( 1124 m_Socket, 1125 boost::asio::buffer (m_StreamBuffer, sz), 1126 boost::asio::transfer_all(), 1127 std::bind(&SAMSocket::HandleWriteI2PData, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); 1128 } 1129 1130 void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) 1131 { 1132 if (ecode) 1133 { 1134 LogPrint (eLogError, "SAM: Stream read error: ", ecode.message ()); 1135 if (ecode != boost::asio::error::operation_aborted) 1136 { 1137 if (bytes_transferred > 0) 1138 { 1139 WriteI2PData(bytes_transferred); 1140 } 1141 else 1142 { 1143 auto s = shared_from_this (); 1144 boost::asio::post (m_Owner.GetService (), [s] { s->Terminate ("stream read error"); }); 1145 } 1146 } 1147 else 1148 { 1149 auto s = shared_from_this (); 1150 boost::asio::post (m_Owner.GetService (), [s] { s->Terminate ("stream read error (op aborted)"); }); 1151 } 1152 } 1153 else 1154 { 1155 if (m_SocketType != SAMSocketType::eSAMSocketTypeTerminated) 1156 { 1157 if (bytes_transferred > 0) 1158 { 1159 WriteI2PData(bytes_transferred); 1160 } 1161 else 1162 I2PReceive(); 1163 } 1164 } 1165 } 1166 1167 void SAMSocket::HandleWriteI2PData (const boost::system::error_code& ecode, size_t bytes_transferred) 1168 { 1169 if (ecode) 1170 { 1171 LogPrint (eLogError, "SAM: Socket write error: ", ecode.message ()); 1172 if (ecode != boost::asio::error::operation_aborted) 1173 Terminate ("socket write error at HandleWriteI2PData"); 1174 } 1175 else 1176 { 1177 I2PReceive (); 1178 } 1179 } 1180 1181 void SAMSocket::HandleI2PAccept (std::shared_ptr<i2p::stream::Stream> stream) 1182 { 1183 if (stream) 1184 { 1185 LogPrint (eLogDebug, "SAM: Incoming I2P connection for session ", m_ID); 1186 m_SocketType = SAMSocketType::eSAMSocketTypeStream; 1187 m_IsAccepting = false; 1188 m_Stream = stream; 1189 context.GetAddressBook ().InsertFullAddress (stream->GetRemoteIdentity ()); 1190 auto session = m_Owner.FindSession (m_ID); 1191 if (session && !session->acceptQueue.empty ()) 1192 { 1193 // pending acceptors 1194 auto ts = i2p::util::GetSecondsSinceEpoch (); 1195 while (!session->acceptQueue.empty () && session->acceptQueue.front ().second + SAM_SESSION_MAX_ACCEPT_INTERVAL > ts) 1196 { 1197 auto socket = session->acceptQueue.front ().first; 1198 session->acceptQueue.pop_front (); 1199 if (socket) 1200 boost::asio::post (m_Owner.GetService (), std::bind(&SAMSocket::TerminateClose, socket)); 1201 } 1202 if (!session->acceptQueue.empty ()) 1203 { 1204 auto socket = session->acceptQueue.front ().first; 1205 session->acceptQueue.pop_front (); 1206 if (socket && socket->GetSocketType () == SAMSocketType::eSAMSocketTypeAcceptor) 1207 { 1208 socket->m_IsAccepting = true; 1209 session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, socket, std::placeholders::_1)); 1210 } 1211 } 1212 } 1213 if (!m_IsSilent) 1214 { 1215 if (m_SocketType != SAMSocketType::eSAMSocketTypeTerminated) 1216 { 1217 // get remote peer address 1218 auto ident = std::make_shared<std::string>(stream->GetRemoteIdentity()->ToBase64 ()); // we need to keep it until sent 1219 ident->push_back ('\n'); 1220 // send remote peer address back to client like received from stream 1221 boost::asio::async_write (m_Socket, boost::asio::buffer (ident->data (), ident->size ()), boost::asio::transfer_all(), 1222 [ident, s = shared_from_this ()](const boost::system::error_code& ecode, size_t bytes_transferred) 1223 { 1224 s->HandleWriteI2PData (ecode, bytes_transferred); 1225 }); 1226 } 1227 } 1228 else 1229 I2PReceive (); 1230 } 1231 else 1232 LogPrint (eLogWarning, "SAM: I2P acceptor has been reset"); 1233 } 1234 1235 void SAMSocket::HandleI2PForward (std::shared_ptr<i2p::stream::Stream> stream, 1236 boost::asio::ip::tcp::endpoint ep) 1237 { 1238 if (stream) 1239 { 1240 LogPrint (eLogDebug, "SAM: Incoming forward I2P connection for session ", m_ID); 1241 auto newSocket = std::make_shared<SAMSocket>(m_Owner); 1242 newSocket->SetSocketType (SAMSocketType::eSAMSocketTypeStream); 1243 auto s = shared_from_this (); 1244 newSocket->GetSocket ().async_connect (ep, 1245 [s, newSocket, stream](const boost::system::error_code& ecode) 1246 { 1247 if (!ecode) 1248 { 1249 s->m_Owner.AddSocket (newSocket); 1250 newSocket->Receive (); 1251 newSocket->m_Stream = stream; 1252 newSocket->m_ID = s->m_ID; 1253 if (!s->m_IsSilent) 1254 { 1255 // get remote peer address 1256 auto dest = stream->GetRemoteIdentity()->ToBase64 (); 1257 memcpy (newSocket->m_StreamBuffer, dest.c_str (), dest.length ()); 1258 newSocket->m_StreamBuffer[dest.length ()] = '\n'; 1259 newSocket->HandleI2PReceive (boost::system::error_code (),dest.length () + 1); // we send identity like it has been received from stream 1260 } 1261 else 1262 newSocket->I2PReceive (); 1263 } 1264 else 1265 stream->AsyncClose (); 1266 }); 1267 } 1268 else 1269 LogPrint (eLogWarning, "SAM: I2P forward acceptor has been reset"); 1270 } 1271 1272 void SAMSocket::HandleI2PDatagramReceive (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, 1273 const uint8_t * buf, size_t len, const i2p::util::Mapping * options) 1274 { 1275 LogPrint (eLogDebug, "SAM: Datagram received ", len); 1276 auto session = m_Owner.FindSession(m_ID); 1277 if(session) 1278 { 1279 auto base64 = (session->DatagramVersion == i2p::datagram::eDatagramV3) ? from.GetIdentHash ().ToBase64 () : from.ToBase64 (); 1280 auto ep = session->UDPEndpoint; 1281 if (ep) 1282 { 1283 // udp forward enabled 1284 const char lf = '\n'; 1285 // send to remote endpoint, { destination, linefeed, payload } 1286 m_Owner.SendTo({ {(const uint8_t *)base64.c_str(), base64.size()}, {(const uint8_t *)&lf, 1}, {buf, len} }, *ep); 1287 } 1288 else 1289 { 1290 #ifdef _MSC_VER 1291 size_t l = sprintf_s ( 1292 (char *)m_StreamBuffer, 1293 SAM_STREAM_BUFFER_SIZE, 1294 SAM_DATAGRAM_RECEIVED, 1295 base64.c_str (), 1296 (long unsigned int)len, 1297 (unsigned)fromPort, 1298 (unsigned)toPort 1299 ); 1300 #else 1301 size_t l = snprintf ( 1302 (char *)m_StreamBuffer, 1303 SAM_STREAM_BUFFER_SIZE, 1304 SAM_DATAGRAM_RECEIVED, 1305 base64.c_str (), 1306 (long unsigned int)len, 1307 (unsigned)fromPort, 1308 (unsigned)toPort 1309 ); 1310 #endif 1311 if (len < SAM_STREAM_BUFFER_SIZE - l) 1312 { 1313 memcpy (m_StreamBuffer + l, buf, len); 1314 WriteI2PData(len + l); 1315 } 1316 else 1317 LogPrint (eLogWarning, "SAM: Received datagram size ", len," exceeds buffer"); 1318 } 1319 } 1320 } 1321 1322 void SAMSocket::HandleI2PRawDatagramReceive (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) 1323 { 1324 LogPrint (eLogDebug, "SAM: Raw datagram received ", len); 1325 auto session = m_Owner.FindSession(m_ID); 1326 if(session) 1327 { 1328 auto ep = session->UDPEndpoint; 1329 if (ep) 1330 // udp forward enabled 1331 m_Owner.SendTo({ {buf, len} }, *ep); 1332 else 1333 { 1334 #ifdef _MSC_VER 1335 size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_STREAM_BUFFER_SIZE, SAM_RAW_RECEIVED, (long unsigned int)len); 1336 #else 1337 size_t l = snprintf ((char *)m_StreamBuffer, SAM_STREAM_BUFFER_SIZE, SAM_RAW_RECEIVED, (long unsigned int)len); 1338 #endif 1339 if (len < SAM_STREAM_BUFFER_SIZE - l) 1340 { 1341 memcpy (m_StreamBuffer + l, buf, len); 1342 WriteI2PData(len + l); 1343 } 1344 else 1345 LogPrint (eLogWarning, "SAM: Received raw datagram size ", len," exceeds buffer"); 1346 } 1347 } 1348 } 1349 1350 void SAMSocket::HandleStreamSend(const boost::system::error_code & ec) 1351 { 1352 boost::asio::post (m_Owner.GetService (), std::bind( !ec ? &SAMSocket::Receive : &SAMSocket::TerminateClose, shared_from_this())); 1353 } 1354 1355 SAMSession::SAMSession (SAMBridge & parent, std::string_view id, SAMSessionType type): 1356 m_Bridge(parent), Name(id), Type (type), DatagramVersion (i2p::datagram::eDatagramV1) 1357 { 1358 } 1359 1360 void SAMSession::CloseStreams () 1361 { 1362 for(const auto & itr : m_Bridge.ListSockets(Name)) 1363 { 1364 itr->Terminate(nullptr); 1365 } 1366 } 1367 1368 SAMSingleSession::SAMSingleSession (SAMBridge & parent, std::string_view name, SAMSessionType type, std::shared_ptr<ClientDestination> dest): 1369 SAMSession (parent, name, type), 1370 localDestination (dest) 1371 { 1372 } 1373 1374 SAMSingleSession::~SAMSingleSession () 1375 { 1376 i2p::client::context.DeleteLocalDestination (localDestination); 1377 } 1378 1379 void SAMSingleSession::StopLocalDestination () 1380 { 1381 localDestination->Release (); 1382 // stop accepting new streams 1383 localDestination->StopAcceptingStreams (); 1384 // terminate existing streams 1385 auto s = localDestination->GetStreamingDestination (); // TODO: take care about datagrams 1386 if (s) s->Stop (); 1387 } 1388 1389 void SAMMasterSession::Close () 1390 { 1391 SAMSingleSession::Close (); 1392 for (const auto& it: subsessions) 1393 m_Bridge.CloseSession (it); 1394 subsessions.clear (); 1395 } 1396 1397 SAMSubSession::SAMSubSession (std::shared_ptr<SAMMasterSession> master, std::string_view name, SAMSessionType type, uint16_t port): 1398 SAMSession (master->m_Bridge, name, type), masterSession (master), inPort (port) 1399 { 1400 if (Type == SAMSessionType::eSAMSessionTypeStream && port) 1401 { 1402 // additional streaming destination, use default if port is 0 1403 auto d = masterSession->GetLocalDestination ()->CreateStreamingDestination (inPort); 1404 if (d) d->Start (); 1405 } 1406 // TODO: implement datagrams 1407 } 1408 1409 std::shared_ptr<ClientDestination> SAMSubSession::GetLocalDestination () 1410 { 1411 return masterSession ? masterSession->GetLocalDestination () : nullptr; 1412 } 1413 1414 void SAMSubSession::StopLocalDestination () 1415 { 1416 auto dest = GetLocalDestination (); 1417 if (dest && Type == SAMSessionType::eSAMSessionTypeStream) 1418 { 1419 auto d = dest->RemoveStreamingDestination (inPort); 1420 if (d) d->Stop (); 1421 } 1422 // TODO: implement datagrams 1423 } 1424 1425 SAMBridge::SAMBridge (const std::string& address, uint16_t portTCP, uint16_t portUDP, bool singleThread): 1426 RunnableService ("SAM"), m_IsSingleThread (singleThread), 1427 m_Acceptor (GetIOService (), boost::asio::ip::tcp::endpoint(boost::asio::ip::make_address(address), portTCP)), 1428 m_DatagramEndpoint (boost::asio::ip::make_address(address), (!portUDP) ? portTCP-1 : portUDP), m_DatagramSocket (GetIOService (), m_DatagramEndpoint), 1429 m_SignatureTypes 1430 { 1431 {"DSA_SHA1", i2p::data::SIGNING_KEY_TYPE_DSA_SHA1}, 1432 {"ECDSA_SHA256_P256", i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256}, 1433 {"ECDSA_SHA384_P384", i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA384_P384}, 1434 {"ECDSA_SHA512_P521", i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA512_P521}, 1435 {"EdDSA_SHA512_Ed25519", i2p::data::SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519}, 1436 {"GOST_GOSTR3411256_GOSTR3410CRYPTOPROA", i2p::data::SIGNING_KEY_TYPE_GOSTR3410_CRYPTO_PRO_A_GOSTR3411_256}, 1437 {"GOST_GOSTR3411512_GOSTR3410TC26A512", i2p::data::SIGNING_KEY_TYPE_GOSTR3410_TC26_A_512_GOSTR3411_512}, 1438 {"RedDSA_SHA512_Ed25519", i2p::data::SIGNING_KEY_TYPE_REDDSA_SHA512_ED25519}, 1439 } 1440 { 1441 } 1442 1443 SAMBridge::~SAMBridge () 1444 { 1445 if (IsRunning ()) 1446 Stop (); 1447 } 1448 1449 void SAMBridge::Start () 1450 { 1451 Accept (); 1452 ReceiveDatagram (); 1453 StartIOService (); 1454 } 1455 1456 void SAMBridge::Stop () 1457 { 1458 try 1459 { 1460 m_Acceptor.cancel (); 1461 } 1462 catch (const std::exception& ex) 1463 { 1464 LogPrint (eLogError, "SAM: Runtime exception: ", ex.what ()); 1465 } 1466 1467 decltype(m_Sessions) sessions; 1468 { 1469 std::unique_lock<std::mutex> l(m_SessionsMutex); 1470 m_Sessions.swap (sessions); 1471 } 1472 for (auto& it: sessions) 1473 it.second->Close (); 1474 1475 StopIOService (); 1476 } 1477 1478 void SAMBridge::Accept () 1479 { 1480 auto newSocket = std::make_shared<SAMSocket>(*this); 1481 m_Acceptor.async_accept (newSocket->GetSocket(), std::bind (&SAMBridge::HandleAccept, this, 1482 std::placeholders::_1, newSocket)); 1483 } 1484 1485 void SAMBridge::AddSocket(std::shared_ptr<SAMSocket> socket) 1486 { 1487 std::unique_lock<std::mutex> lock(m_OpenSocketsMutex); 1488 m_OpenSockets.push_back(socket); 1489 } 1490 1491 void SAMBridge::RemoveSocket(const std::shared_ptr<SAMSocket> & socket) 1492 { 1493 std::unique_lock<std::mutex> lock(m_OpenSocketsMutex); 1494 m_OpenSockets.remove_if([socket](const std::shared_ptr<SAMSocket> & item) -> bool { return item == socket; }); 1495 } 1496 1497 void SAMBridge::HandleAccept(const boost::system::error_code& ecode, std::shared_ptr<SAMSocket> socket) 1498 { 1499 if (!ecode) 1500 { 1501 boost::system::error_code ec; 1502 auto ep = socket->GetSocket ().remote_endpoint (ec); 1503 if (!ec) 1504 { 1505 LogPrint (eLogDebug, "SAM: New connection from ", ep); 1506 AddSocket (socket); 1507 socket->ReceiveHandshake (); 1508 } 1509 else 1510 LogPrint (eLogError, "SAM: Incoming connection error: ", ec.message ()); 1511 } 1512 else 1513 LogPrint (eLogError, "SAM: Accept error: ", ecode.message ()); 1514 1515 if (ecode != boost::asio::error::operation_aborted) 1516 Accept (); 1517 } 1518 1519 std::shared_ptr<SAMSession> SAMBridge::CreateSession (std::string_view id, SAMSessionType type, 1520 std::string_view destination, const i2p::util::Mapping& params) 1521 { 1522 std::shared_ptr<ClientDestination> localDestination = nullptr; 1523 if (destination != "") 1524 { 1525 i2p::data::PrivateKeys keys; 1526 if (!keys.FromBase64 (destination)) return nullptr; 1527 localDestination = m_IsSingleThread ? 1528 i2p::client::context.CreateNewLocalDestination (GetIOService (), keys, true, ¶ms) : 1529 i2p::client::context.CreateNewLocalDestination (keys, true, ¶ms); 1530 } 1531 else // transient 1532 { 1533 // extract signature type 1534 i2p::data::SigningKeyType signatureType = i2p::data::SIGNING_KEY_TYPE_DSA_SHA1; 1535 i2p::data::CryptoKeyType cryptoType = i2p::data::CRYPTO_KEY_TYPE_ELGAMAL; 1536 if (!params.IsEmpty ()) 1537 { 1538 auto signatureTypeStr = params[SAM_PARAM_SIGNATURE_TYPE]; 1539 if (!signatureTypeStr.empty ()) 1540 { 1541 if (!ResolveSignatureType (signatureTypeStr, signatureType)) 1542 LogPrint (eLogWarning, "SAM: ", SAM_PARAM_SIGNATURE_TYPE, " is invalid ", signatureTypeStr); 1543 } 1544 params.Get (SAM_PARAM_CRYPTO_TYPE, cryptoType); 1545 } 1546 localDestination = m_IsSingleThread ? 1547 i2p::client::context.CreateNewLocalDestination (GetIOService (), true, signatureType, cryptoType, ¶ms) : 1548 i2p::client::context.CreateNewLocalDestination (true, signatureType, cryptoType, ¶ms); 1549 } 1550 if (localDestination) 1551 { 1552 localDestination->Acquire (); 1553 auto session = (type == SAMSessionType::eSAMSessionTypeMaster) ? std::make_shared<SAMMasterSession>(*this, id, localDestination) : 1554 std::make_shared<SAMSingleSession>(*this, id, type, localDestination); 1555 std::unique_lock<std::mutex> l(m_SessionsMutex); 1556 auto ret = m_Sessions.emplace (id, session); 1557 if (!ret.second) 1558 LogPrint (eLogWarning, "SAM: Session ", id, " already exists"); 1559 return ret.first->second; 1560 } 1561 return nullptr; 1562 } 1563 1564 bool SAMBridge::AddSession (std::shared_ptr<SAMSession> session) 1565 { 1566 if (!session) return false; 1567 auto ret = m_Sessions.emplace (session->Name, session); 1568 return ret.second; 1569 } 1570 1571 void SAMBridge::CloseSession (std::string_view id) 1572 { 1573 std::shared_ptr<SAMSession> session; 1574 { 1575 std::unique_lock<std::mutex> l(m_SessionsMutex); 1576 auto it = m_Sessions.find (id); 1577 if (it != m_Sessions.end ()) 1578 { 1579 session = it->second; 1580 m_Sessions.erase (it); 1581 } 1582 } 1583 if (session) 1584 { 1585 session->StopLocalDestination (); 1586 session->Close (); 1587 if (m_IsSingleThread) 1588 ScheduleSessionCleanupTimer (session); // let all session's streams close 1589 } 1590 } 1591 1592 void SAMBridge::ScheduleSessionCleanupTimer (std::shared_ptr<SAMSession> session) 1593 { 1594 auto timer = std::make_shared<boost::asio::deadline_timer>(GetService ()); 1595 timer->expires_from_now (boost::posix_time::seconds(5)); // postpone destination clean for 5 seconds 1596 timer->async_wait (std::bind (&SAMBridge::HandleSessionCleanupTimer, this, std::placeholders::_1, session, timer)); 1597 } 1598 1599 void SAMBridge::HandleSessionCleanupTimer (const boost::system::error_code& ecode, 1600 std::shared_ptr<SAMSession> session, std::shared_ptr<boost::asio::deadline_timer> timer) 1601 { 1602 if (ecode != boost::asio::error::operation_aborted && session) 1603 { 1604 auto dest = session->GetLocalDestination (); 1605 if (dest) 1606 { 1607 auto streamingDest = dest->GetStreamingDestination (); 1608 if (streamingDest) 1609 { 1610 auto numStreams = streamingDest->GetNumStreams (); 1611 if (numStreams > 0) 1612 { 1613 LogPrint (eLogInfo, "SAM: Session ", session->Name, " still has ", numStreams, " streams"); 1614 ScheduleSessionCleanupTimer (session); 1615 } 1616 else 1617 LogPrint (eLogDebug, "SAM: Session ", session->Name, " terminated"); 1618 } 1619 } 1620 } 1621 // session's destructor is called here unless rescheduled 1622 } 1623 1624 std::shared_ptr<SAMSession> SAMBridge::FindSession (std::string_view id) const 1625 { 1626 std::unique_lock<std::mutex> l(m_SessionsMutex); 1627 auto it = m_Sessions.find (id); 1628 if (it != m_Sessions.end ()) 1629 return it->second; 1630 return nullptr; 1631 } 1632 1633 std::list<std::shared_ptr<SAMSocket> > SAMBridge::ListSockets(std::string_view id) const 1634 { 1635 std::list<std::shared_ptr<SAMSocket > > list; 1636 { 1637 std::unique_lock<std::mutex> l(m_OpenSocketsMutex); 1638 for (const auto & itr : m_OpenSockets) 1639 if (itr->IsSession(id)) 1640 list.push_back(itr); 1641 } 1642 return list; 1643 } 1644 1645 void SAMBridge::SendTo (const std::vector<boost::asio::const_buffer>& bufs, const boost::asio::ip::udp::endpoint& ep) 1646 { 1647 m_DatagramSocket.send_to (bufs, ep); 1648 } 1649 1650 void SAMBridge::ReceiveDatagram () 1651 { 1652 m_DatagramSocket.async_receive_from ( 1653 boost::asio::buffer (m_DatagramReceiveBuffer, i2p::datagram::MAX_DATAGRAM_SIZE), 1654 m_SenderEndpoint, 1655 std::bind (&SAMBridge::HandleReceivedDatagram, this, std::placeholders::_1, std::placeholders::_2)); 1656 } 1657 1658 void SAMBridge::HandleReceivedDatagram (const boost::system::error_code& ecode, std::size_t bytes_transferred) 1659 { 1660 if (!ecode) 1661 { 1662 m_DatagramReceiveBuffer[bytes_transferred] = 0; 1663 char * eol = strchr ((char *)m_DatagramReceiveBuffer, '\n'); 1664 if(eol) 1665 { 1666 *eol = 0; eol++; 1667 size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer); 1668 LogPrint (eLogDebug, "SAM: Datagram received ", m_DatagramReceiveBuffer," size=", payloadLen); 1669 char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' '); 1670 if (sessionID) 1671 { 1672 sessionID++; 1673 char * destination = strchr (sessionID, ' '); 1674 if (destination) 1675 { 1676 *destination = 0; destination++; 1677 auto session = FindSession (sessionID); 1678 if (session) 1679 { 1680 uint16_t fromPort = 0; 1681 uint16_t toPort = 0; 1682 char *raw_params = strchr(destination, ' '); 1683 if (raw_params) 1684 { 1685 *raw_params = 0; raw_params++; 1686 auto params = SAMSocket::ExtractParams(raw_params); 1687 params.Get(SAM_PARAM_FROM_PORT, fromPort); 1688 params.Get(SAM_PARAM_TO_PORT, toPort); 1689 LogPrint (eLogInfo, "SAM: Datagram params are FROM_PORT=", fromPort, " TO_PORT=", toPort); 1690 } 1691 1692 auto localDest = session->GetLocalDestination (); 1693 auto datagramDest = localDest ? localDest->GetDatagramDestination () : nullptr; 1694 if (datagramDest) 1695 { 1696 i2p::data::IdentHash ident; bool isDest = false; 1697 if (std::string_view (destination).find(".i2p") != std::string_view::npos) 1698 { 1699 auto addr = context.GetAddressBook().GetAddress (destination); 1700 if (addr && addr->IsValid () && addr->IsIdentHash ()) 1701 { 1702 ident = addr->identHash; 1703 isDest = true; 1704 } 1705 } 1706 else 1707 { 1708 i2p::data::IdentityEx dest; 1709 if (dest.FromBase64 (destination) > 0) 1710 { 1711 ident = dest.GetIdentHash (); 1712 isDest = true; 1713 } 1714 } 1715 if (isDest) 1716 { 1717 if (session->Type == SAMSessionType::eSAMSessionTypeDatagram) 1718 datagramDest->SendDatagramTo ((uint8_t *)eol, payloadLen, ident, fromPort, toPort); 1719 else if (session->Type == SAMSessionType::eSAMSessionTypeRaw) 1720 datagramDest->SendRawDatagramTo ((uint8_t *)eol, payloadLen, ident, fromPort, toPort); 1721 else 1722 LogPrint (eLogError, "SAM: Unexpected session type ", (int)session->Type, "for session ", sessionID); 1723 } 1724 else 1725 LogPrint (eLogError, "SAM: Datagram unexpected destination ", destination); 1726 } 1727 else 1728 LogPrint (eLogError, "SAM: Datagram destination is not set for session ", sessionID); 1729 } 1730 else 1731 LogPrint (eLogError, "SAM: Session ", sessionID, " not found"); 1732 } 1733 else 1734 LogPrint (eLogError, "SAM: Missing destination key"); 1735 } 1736 else 1737 LogPrint (eLogError, "SAM: Missing sessionID"); 1738 } 1739 else 1740 LogPrint(eLogError, "SAM: Invalid datagram"); 1741 ReceiveDatagram (); 1742 } 1743 else 1744 LogPrint (eLogError, "SAM: Datagram receive error: ", ecode.message ()); 1745 } 1746 1747 bool SAMBridge::ResolveSignatureType (std::string_view name, i2p::data::SigningKeyType& type) const 1748 { 1749 auto res = std::from_chars(name.data(), name.data() + name.size(), type); 1750 if (res.ec != std::errc()) 1751 { 1752 if (res.ec == std::errc::invalid_argument) 1753 { 1754 // name is not numeric, resolving 1755 auto it = m_SignatureTypes.find (name); 1756 if (it != m_SignatureTypes.end ()) 1757 type = it->second; 1758 else 1759 return false; 1760 } 1761 else 1762 return false; 1763 } 1764 // name has been resolved 1765 return true; 1766 } 1767 } 1768 }