mod.rs
1 //! Networking, server and client ablilities built in the game engine. 2 //! 3 //! Networking through the public internet requires port forwarding the same TCP and UDP port. 4 5 // Formats 6 // 7 // # TCP 8 // 9 // TCP can only send 2 kinds of messages: Auth messages and Data messages. 10 // 11 // Auth messages are made out of 128 random bytes, where the first 4 bytes are 0. They are the first message that arrives. 12 // 13 // Auth messages during a registered connection will be seen as misbehaving peer and disconnected. 14 // 15 // Data messages include a 4 byte header with the length prefix and a rest as big as the u32 that comes from the length. 16 // 17 // # UDP 18 // 19 // UDP has 3 kinds of messages: Auth messages, Ping messages and Data messages. 20 // 21 // Auth messages are the same random bytes as TCP and are retried 10 times before giving up the connection. 22 // 23 // Auth messages start with 4 bytes made only out of zeros, because zeroes are not valid order numbers 24 // 25 // The rest of the messages have a 8 byte header with the first 4 bytes as the order number and the rest as lenght prefix. 26 // 27 // A Ping packet also works as the ack auth back message signalling to stop sending the auth message. 28 // 29 // It is mainly there to calculate ping and consists of a valid order number and a length of 0, thereby always 8 bytes of data. 30 // 31 // A data packet consists of a valid order number, length over 0 and leading data as big as the length number indicates. 32 // 33 // To combat UDP fragmentation and corruption there is a order number. Any packet that does not follow the right order will be ignored. 34 // If another packet arrives with an order not one bigger than the last one, the data will be discarted. 35 // If a packet arrives with an order number exactly 1 bigger than the last one, it will be kept track of again. 36 // 37 // There is a lot of discarting here. Users have to expect that UDP is not perfect and reliable. 38 39 mod client; 40 mod server; 41 42 use std::{ 43 io::{self, ErrorKind}, 44 net::{IpAddr, SocketAddr}, 45 time::{Duration, SystemTime}, 46 }; 47 48 pub use client::*; 49 use let_engine_core::backend::networking::{NetEvent, NetworkingBackend}; 50 pub use server::*; 51 use smol::{ 52 channel::{Receiver, bounded}, 53 future::race, 54 }; 55 use thiserror::Error; 56 57 const SAFE_MTU_SIZE: usize = 1200; 58 59 pub trait NetSerializable 60 where 61 for<'a> Self: Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>> + Archive, 62 for<'a> Self::Archived: CheckBytes<HighValidator<'a, rancor::Error>> + 'static, 63 Self: Send + Sync, 64 { 65 } 66 67 impl<T> NetSerializable for T 68 where 69 for<'a> T: Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>> + Archive, 70 for<'a> T::Archived: CheckBytes<HighValidator<'a, rancor::Error>> + 'static, 71 T: Send + Sync, 72 { 73 } 74 75 pub struct DefaultNetworkingBackend<ServerMsg, ClientMsg> { 76 server_interface: server::ServerInterface<ServerMsg>, 77 client_interface: client::ClientInterface<ClientMsg>, 78 server_receiver: Receiver<(Connection, ServerMessage)>, 79 client_receiver: Receiver<ClientMessage>, 80 } 81 82 impl<ServerMsg, ClientMsg> NetworkingBackend for DefaultNetworkingBackend<ServerMsg, ClientMsg> 83 where 84 ClientMsg: NetSerializable, 85 ServerMsg: NetSerializable, 86 for<'a> ClientMsg::Archived: CheckBytes<HighValidator<'a, rancor::Error>> + 'static, 87 for<'a> ServerMsg::Archived: CheckBytes<HighValidator<'a, rancor::Error>> + 'static, 88 { 89 type Settings = NetworkingSettings; 90 type Error = NetworkingError; 91 92 type ServerEvent<'a> = RemoteMessage<'a, <ClientMsg as Archive>::Archived>; 93 type ClientEvent<'a> = RemoteMessage<'a, <ServerMsg as Archive>::Archived>; 94 95 type Connection = Connection; 96 97 type ServerInterface = server::ServerInterface<ServerMsg>; 98 type ClientInterface = client::ClientInterface<ClientMsg>; 99 100 fn new(settings: Self::Settings) -> Result<Self, Self::Error> { 101 let (server_sender, server_receiver) = bounded(2); 102 let (client_sender, client_receiver) = bounded(2); 103 104 let arena = std::sync::Arc::new(parking_lot::Mutex::new(Arena::new())); 105 106 let server_interface = 107 server::ServerInterface::new(settings.clone(), server_sender, arena.clone()).unwrap(); 108 let client_interface = 109 client::ClientInterface::new(settings, client_sender, arena).unwrap(); 110 111 Ok(Self { 112 server_interface, 113 client_interface, 114 server_receiver, 115 client_receiver, 116 }) 117 } 118 119 fn server_interface(&self) -> &Self::ServerInterface { 120 &self.server_interface 121 } 122 123 fn client_interface(&self) -> &Self::ClientInterface { 124 &self.client_interface 125 } 126 127 fn receive<F>(&mut self, f: F) -> Result<(), Self::Error> 128 where 129 F: for<'a> FnOnce(NetEvent<'a, Self>), 130 { 131 enum Event { 132 Server((Connection, ServerMessage)), 133 Client(ClientMessage), 134 } 135 136 let event = smol::block_on(race( 137 async { 138 // Server 139 Event::Server(self.server_receiver.recv().await.unwrap()) 140 }, 141 async { 142 // Client 143 Event::Client(self.client_receiver.recv().await.unwrap()) 144 }, 145 )); 146 147 match event { 148 Event::Server((connection, message)) => match message { 149 ServerMessage::Error(e) => return Err(NetworkingError::Server(ServerError::Io(e))), 150 ServerMessage::Warning(w) => f(NetEvent::Server { 151 connection, 152 event: RemoteMessage::Warning(w), 153 }), 154 ServerMessage::Connected => f(NetEvent::Server { 155 connection, 156 event: RemoteMessage::Connected, 157 }), 158 ServerMessage::Disconnected(reason) => f(NetEvent::Server { 159 connection, 160 event: RemoteMessage::Disconnected(reason), 161 }), 162 ServerMessage::Tcp(msg) => { 163 let result = rkyv::access(&msg); 164 165 f(match result { 166 Ok(archive) => NetEvent::Server { 167 connection, 168 event: RemoteMessage::Tcp(archive), 169 }, 170 171 Err(e) => NetEvent::Server { 172 connection, 173 event: RemoteMessage::Warning(Warning::UnintelligableContent(e)), 174 }, 175 }) 176 } 177 ServerMessage::Udp(msg) => { 178 let result = rkyv::access(&msg); 179 180 f(match result { 181 Ok(archive) => NetEvent::Server { 182 connection, 183 event: RemoteMessage::Udp(archive), 184 }, 185 Err(e) => NetEvent::Server { 186 connection, 187 event: RemoteMessage::Warning(Warning::UnintelligableContent(e)), 188 }, 189 }) 190 } 191 }, 192 Event::Client(message) => match message { 193 ClientMessage::Error(e) => return Err(NetworkingError::Client(e)), 194 ClientMessage::Warning(w) => f(NetEvent::Client { 195 event: RemoteMessage::Warning(w), 196 }), 197 ClientMessage::Connected => f(NetEvent::Client { 198 event: RemoteMessage::Connected, 199 }), 200 ClientMessage::Disconnected(reason) => f(NetEvent::Client { 201 event: RemoteMessage::Disconnected(reason), 202 }), 203 ClientMessage::Tcp(msg) => { 204 let result = rkyv::access(&msg); 205 206 f(match result { 207 Ok(archive) => NetEvent::Client { 208 event: RemoteMessage::Tcp(archive), 209 }, 210 Err(e) => NetEvent::Client { 211 event: RemoteMessage::Warning(Warning::UnintelligableContent(e)), 212 }, 213 }) 214 } 215 ClientMessage::Udp(msg) => { 216 let result = rkyv::access(&msg); 217 218 f(match result { 219 Ok(archive) => NetEvent::Client { 220 event: RemoteMessage::Udp(archive), 221 }, 222 Err(e) => NetEvent::Client { 223 event: RemoteMessage::Warning(Warning::UnintelligableContent(e)), 224 }, 225 }) 226 } 227 }, 228 } 229 230 Ok(()) 231 } 232 } 233 234 #[derive(Debug, Error)] 235 pub enum NetworkingError { 236 /// An IO error from the system. 237 #[error(transparent)] 238 Io(std::io::Error), 239 240 #[error(transparent)] 241 Server(ServerError), 242 243 #[error(transparent)] 244 Client(ClientError), 245 } 246 247 /// Settings for the networking system of let-engine. 248 #[derive(Clone, Debug, PartialEq, Eq, Hash)] 249 pub struct NetworkingSettings { 250 /// The number of auth request retries before giving up the connection 251 /// and failing to connect as client. 252 /// 253 /// ## Default 254 /// 255 /// `5` 256 pub auth_retries: usize, 257 258 /// The time between retries. 259 /// 260 /// ## Default 261 /// 262 /// `2 seconds` 263 pub auth_retry_wait: Duration, 264 265 /// The time between ping requests. 266 /// 267 /// ## Default 268 /// 269 /// `5 seconds` 270 pub ping_wait: Duration, 271 272 /// The maximum allowed ping before sending warnings. 273 /// 274 /// ## Default 275 /// 276 /// `5 seconds` 277 pub max_ping: Duration, 278 279 /// Maximum amount of concurrent connections allowed before warning 280 /// 281 /// # Default 282 /// 283 /// `20` 284 pub max_connections: usize, 285 286 /// The minimum duration between multiple packets allowed before warning 287 /// 288 /// ## Default 289 /// 290 /// `10 milliseconds` 291 pub rate_limit: Duration, 292 293 /// Max package size limit for the built in TCP protocol in bytes 294 /// 295 /// ## Default 296 /// 297 /// `1048576` bytes or 1MiB 298 pub tcp_max_size: usize, 299 300 /// Max package size limit for the built in UDP protocol in bytes 301 /// 302 /// ## Default 303 /// 304 /// `1200` bytes 305 pub udp_max_size: usize, 306 } 307 308 impl Default for NetworkingSettings { 309 fn default() -> Self { 310 Self { 311 auth_retries: 5, 312 auth_retry_wait: Duration::from_secs(2), 313 ping_wait: Duration::from_secs(5), 314 max_ping: Duration::from_secs(5), 315 rate_limit: Duration::from_millis(10), 316 max_connections: 20, 317 tcp_max_size: 1048576, 318 udp_max_size: 1200, 319 } 320 } 321 } 322 323 /// Messages received by a remote connection. 324 #[derive(Debug)] 325 pub enum RemoteMessage<'a, Msg> { 326 /// The client has connected to the server successfully. 327 Connected, 328 329 /// The remote has sent a message using TCP. 330 Tcp(&'a Msg), 331 332 /// The remote has sent a message using UDP. 333 Udp(&'a Msg), 334 335 /// The remote has sent non conformant packets. 336 Warning(Warning), 337 338 /// The client has been disconnected from the server. 339 Disconnected(Disconnected), 340 } 341 342 /// Misbehaviour recorded by the remote peer. 343 #[derive(Debug)] 344 pub enum Warning { 345 /// The rate at which the packets are sent is faster than the configured limit. 346 RateLimitHit, 347 348 /// The header of the message shows a size bigger than the configured limit. 349 MessageTooBig, 350 351 /// There was a problem reading and deserializing the received data. 352 UnintelligableContent(rkyv::rancor::Error), 353 354 /// The ping limit as set in the networking settings was hit. 355 PingTooHigh, 356 357 /// There was a problem connecting, which caused a retry. 358 Retry(usize), 359 } 360 361 /// The connection to the peer has been stopped. 362 /// 363 /// The reason for the disconnect is 364 #[derive(Debug, Error)] 365 pub enum Disconnected { 366 /// The peer has gracefully shut down the connection 367 #[error("Remote Shutdown")] 368 RemoteShutdown, 369 370 /// An unexpected termination of the connection has occured. 371 #[error("Connection Aborted")] 372 ConnectionAborted, 373 374 /// The connection has been forcibly closed by the remote. 375 /// 376 /// The remote could be rebooting, shutting down or the application could have crashed. 377 #[error("Connection Reset")] 378 ConnectionReset, 379 380 /// The peer has been disconnected for misbehaving and sending packets 381 /// not according to the system. 382 #[error("Peer Misbehaving")] 383 MisbehavingPeer, 384 385 /// An unexplainable error has occured. 386 #[error(transparent)] 387 Other(io::Error), 388 } 389 390 impl From<io::Error> for Disconnected { 391 fn from(value: io::Error) -> Self { 392 match value.kind() { 393 ErrorKind::UnexpectedEof => Self::RemoteShutdown, 394 ErrorKind::ConnectionAborted => Self::ConnectionAborted, 395 ErrorKind::ConnectionReset => Self::ConnectionReset, 396 _ => Self::Other(value), 397 } 398 } 399 } 400 401 /// The identification of a connection containing both TCP and UDP connection addresses for one user. 402 /// 403 /// The IP of both is the same, but the port is different. 404 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Hash)] 405 pub struct Connection { 406 ip: IpAddr, 407 tcp_port: u16, 408 udp_port: u16, 409 } 410 411 impl Connection { 412 fn from_tcp_udp_addr(tcp_addr: SocketAddr, udp_addr: SocketAddr) -> Self { 413 Self { 414 ip: tcp_addr.ip(), 415 tcp_port: tcp_addr.port(), 416 udp_port: udp_addr.port(), 417 } 418 } 419 420 /// Returns the IP address of this connection. 421 pub fn ip_addr(&self) -> IpAddr { 422 self.ip 423 } 424 425 /// Returns the TCP port of this connection. 426 pub fn tcp_port(&self) -> u16 { 427 self.tcp_port 428 } 429 430 /// Returns the UDP port of this connection. 431 pub fn udp_port(&self) -> u16 { 432 self.udp_port 433 } 434 435 /// Returns the TCP address of this connection. 436 pub fn tcp_addr(&self) -> SocketAddr { 437 SocketAddr::new(self.ip, self.tcp_port) 438 } 439 440 /// Returns the UDP address of this connection. 441 pub fn udp_addr(&self) -> SocketAddr { 442 SocketAddr::new(self.ip, self.udp_port) 443 } 444 445 pub fn contains_addr(&self, addr: &SocketAddr) -> bool { 446 self.ip == addr.ip() && (self.tcp_port == addr.port() || self.udp_port == addr.port()) 447 } 448 } 449 450 use rkyv::{ 451 Archive, Serialize, 452 api::high::{HighSerializer, HighValidator, to_bytes_in_with_alloc}, 453 bytecheck::CheckBytes, 454 rancor::{self, Source}, 455 ser::allocator::{Arena, ArenaHandle}, 456 util::AlignedVec, 457 }; 458 459 /// Serialize the given data to a streamable message format. 460 /// 461 /// ## Message format 462 /// 463 /// - Length prefixed with a u32 464 /// 465 /// \[u32data_len\](u8data) 466 fn serialize_tcp_into<T, E>(message: &T, arena: &mut Arena) -> AlignedVec 467 where 468 T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, E>>, 469 E: Source, 470 { 471 let mut data = AlignedVec::new(); 472 data.extend_from_slice(&[0; 4]); 473 let mut data = to_bytes_in_with_alloc(message, data, arena.acquire()).unwrap(); 474 475 let len = data.len() - 4; 476 477 data[0..4].copy_from_slice(&(len as u32).to_le_bytes()); 478 479 data 480 } 481 482 /// Serialize the given data to a streamable message format. 483 /// 484 /// ## Message format 485 /// 486 /// - Indexed and data length prefixed 487 /// 488 /// \[u32order_number\]\[u32data_len\])(u8data) 489 /// 490 /// Order number has to be added by yourself 491 fn serialize_udp_into<E: Source>( 492 message: &impl for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, E>>, 493 arena: &mut Arena, 494 ) -> AlignedVec { 495 let mut data = AlignedVec::with_capacity(1024); 496 data.extend_from_slice(&[0; 8]); 497 498 let mut data = to_bytes_in_with_alloc(message, data, arena.acquire()).unwrap(); 499 500 let data_len = data.len() - 8; 501 data[4..8].copy_from_slice(&(data_len as u32).to_le_bytes()); 502 503 data 504 } 505 506 struct BufferingMessage { 507 bytes_left: usize, 508 buf: Vec<u8>, 509 timestamp: SystemTime, 510 } 511 512 impl BufferingMessage { 513 pub fn new(size: usize) -> Self { 514 let buf = Vec::with_capacity(size); 515 516 Self { 517 bytes_left: size, 518 buf, 519 timestamp: SystemTime::now(), 520 } 521 } 522 523 pub fn completed(&mut self, buf: &[u8]) -> bool { 524 let bytes_to_copy = buf.len().min(self.bytes_left); 525 self.buf.extend_from_slice(&buf[..bytes_to_copy]); 526 self.bytes_left -= bytes_to_copy; 527 self.timestamp = SystemTime::now(); 528 self.bytes_left == 0 529 } 530 531 pub fn consume(self) -> Vec<u8> { 532 self.buf 533 } 534 535 pub fn outdated(&self) -> bool { 536 self.timestamp.elapsed().unwrap() > Duration::from_secs(1) 537 } 538 }