mod.rs
  1  //! Networking, server and client ablilities built in the game engine.
  2  //!
  3  //! Networking through the public internet requires port forwarding the same TCP and UDP port.
  4  
  5  // Formats
  6  //
  7  // # TCP
  8  //
  9  // TCP can only send 2 kinds of messages: Auth messages and Data messages.
 10  //
 11  // Auth messages are made out of 128 random bytes, where the first 4 bytes are 0. They are the first message that arrives.
 12  //
 13  // Auth messages during a registered connection will be seen as misbehaving peer and disconnected.
 14  //
 15  // Data messages include a 4 byte header with the length prefix and a rest as big as the u32 that comes from the length.
 16  //
 17  // # UDP
 18  //
 19  // UDP has 3 kinds of messages: Auth messages, Ping messages and Data messages.
 20  //
 21  // Auth messages are the same random bytes as TCP and are retried 10 times before giving up the connection.
 22  //
 23  // Auth messages start with 4 bytes made only out of zeros, because zeroes are not valid order numbers
 24  //
 25  // The rest of the messages have a 8 byte header with the first 4 bytes as the order number and the rest as lenght prefix.
 26  //
 27  // A Ping packet also works as the ack auth back message signalling to stop sending the auth message.
 28  //
 29  // It is mainly there to calculate ping and consists of a valid order number and a length of 0, thereby always 8 bytes of data.
 30  //
 31  // A data packet consists of a valid order number, length over 0 and leading data as big as the length number indicates.
 32  //
 33  // To combat UDP fragmentation and corruption there is a order number. Any packet that does not follow the right order will be ignored.
 34  // If another packet arrives with an order not one bigger than the last one, the data will be discarted.
 35  // If a packet arrives with an order number exactly 1 bigger than the last one, it will be kept track of again.
 36  //
 37  // There is a lot of discarting here. Users have to expect that UDP is not perfect and reliable.
 38  
 39  mod client;
 40  mod server;
 41  
 42  use std::{
 43      io::{self, ErrorKind},
 44      net::{IpAddr, SocketAddr},
 45      time::{Duration, SystemTime},
 46  };
 47  
 48  pub use client::*;
 49  use let_engine_core::backend::networking::{NetEvent, NetworkingBackend};
 50  pub use server::*;
 51  use smol::{
 52      channel::{Receiver, bounded},
 53      future::race,
 54  };
 55  use thiserror::Error;
 56  
 57  const SAFE_MTU_SIZE: usize = 1200;
 58  
 59  pub trait NetSerializable
 60  where
 61      for<'a> Self: Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>> + Archive,
 62      for<'a> Self::Archived: CheckBytes<HighValidator<'a, rancor::Error>> + 'static,
 63      Self: Send + Sync,
 64  {
 65  }
 66  
 67  impl<T> NetSerializable for T
 68  where
 69      for<'a> T: Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>> + Archive,
 70      for<'a> T::Archived: CheckBytes<HighValidator<'a, rancor::Error>> + 'static,
 71      T: Send + Sync,
 72  {
 73  }
 74  
 75  pub struct DefaultNetworkingBackend<ServerMsg, ClientMsg> {
 76      server_interface: server::ServerInterface<ServerMsg>,
 77      client_interface: client::ClientInterface<ClientMsg>,
 78      server_receiver: Receiver<(Connection, ServerMessage)>,
 79      client_receiver: Receiver<ClientMessage>,
 80  }
 81  
 82  impl<ServerMsg, ClientMsg> NetworkingBackend for DefaultNetworkingBackend<ServerMsg, ClientMsg>
 83  where
 84      ClientMsg: NetSerializable,
 85      ServerMsg: NetSerializable,
 86      for<'a> ClientMsg::Archived: CheckBytes<HighValidator<'a, rancor::Error>> + 'static,
 87      for<'a> ServerMsg::Archived: CheckBytes<HighValidator<'a, rancor::Error>> + 'static,
 88  {
 89      type Settings = NetworkingSettings;
 90      type Error = NetworkingError;
 91  
 92      type ServerEvent<'a> = RemoteMessage<'a, <ClientMsg as Archive>::Archived>;
 93      type ClientEvent<'a> = RemoteMessage<'a, <ServerMsg as Archive>::Archived>;
 94  
 95      type Connection = Connection;
 96  
 97      type ServerInterface = server::ServerInterface<ServerMsg>;
 98      type ClientInterface = client::ClientInterface<ClientMsg>;
 99  
100      fn new(settings: Self::Settings) -> Result<Self, Self::Error> {
101          let (server_sender, server_receiver) = bounded(2);
102          let (client_sender, client_receiver) = bounded(2);
103  
104          let arena = std::sync::Arc::new(parking_lot::Mutex::new(Arena::new()));
105  
106          let server_interface =
107              server::ServerInterface::new(settings.clone(), server_sender, arena.clone()).unwrap();
108          let client_interface =
109              client::ClientInterface::new(settings, client_sender, arena).unwrap();
110  
111          Ok(Self {
112              server_interface,
113              client_interface,
114              server_receiver,
115              client_receiver,
116          })
117      }
118  
119      fn server_interface(&self) -> &Self::ServerInterface {
120          &self.server_interface
121      }
122  
123      fn client_interface(&self) -> &Self::ClientInterface {
124          &self.client_interface
125      }
126  
127      fn receive<F>(&mut self, f: F) -> Result<(), Self::Error>
128      where
129          F: for<'a> FnOnce(NetEvent<'a, Self>),
130      {
131          enum Event {
132              Server((Connection, ServerMessage)),
133              Client(ClientMessage),
134          }
135  
136          let event = smol::block_on(race(
137              async {
138                  // Server
139                  Event::Server(self.server_receiver.recv().await.unwrap())
140              },
141              async {
142                  // Client
143                  Event::Client(self.client_receiver.recv().await.unwrap())
144              },
145          ));
146  
147          match event {
148              Event::Server((connection, message)) => match message {
149                  ServerMessage::Error(e) => return Err(NetworkingError::Server(ServerError::Io(e))),
150                  ServerMessage::Warning(w) => f(NetEvent::Server {
151                      connection,
152                      event: RemoteMessage::Warning(w),
153                  }),
154                  ServerMessage::Connected => f(NetEvent::Server {
155                      connection,
156                      event: RemoteMessage::Connected,
157                  }),
158                  ServerMessage::Disconnected(reason) => f(NetEvent::Server {
159                      connection,
160                      event: RemoteMessage::Disconnected(reason),
161                  }),
162                  ServerMessage::Tcp(msg) => {
163                      let result = rkyv::access(&msg);
164  
165                      f(match result {
166                          Ok(archive) => NetEvent::Server {
167                              connection,
168                              event: RemoteMessage::Tcp(archive),
169                          },
170  
171                          Err(e) => NetEvent::Server {
172                              connection,
173                              event: RemoteMessage::Warning(Warning::UnintelligableContent(e)),
174                          },
175                      })
176                  }
177                  ServerMessage::Udp(msg) => {
178                      let result = rkyv::access(&msg);
179  
180                      f(match result {
181                          Ok(archive) => NetEvent::Server {
182                              connection,
183                              event: RemoteMessage::Udp(archive),
184                          },
185                          Err(e) => NetEvent::Server {
186                              connection,
187                              event: RemoteMessage::Warning(Warning::UnintelligableContent(e)),
188                          },
189                      })
190                  }
191              },
192              Event::Client(message) => match message {
193                  ClientMessage::Error(e) => return Err(NetworkingError::Client(e)),
194                  ClientMessage::Warning(w) => f(NetEvent::Client {
195                      event: RemoteMessage::Warning(w),
196                  }),
197                  ClientMessage::Connected => f(NetEvent::Client {
198                      event: RemoteMessage::Connected,
199                  }),
200                  ClientMessage::Disconnected(reason) => f(NetEvent::Client {
201                      event: RemoteMessage::Disconnected(reason),
202                  }),
203                  ClientMessage::Tcp(msg) => {
204                      let result = rkyv::access(&msg);
205  
206                      f(match result {
207                          Ok(archive) => NetEvent::Client {
208                              event: RemoteMessage::Tcp(archive),
209                          },
210                          Err(e) => NetEvent::Client {
211                              event: RemoteMessage::Warning(Warning::UnintelligableContent(e)),
212                          },
213                      })
214                  }
215                  ClientMessage::Udp(msg) => {
216                      let result = rkyv::access(&msg);
217  
218                      f(match result {
219                          Ok(archive) => NetEvent::Client {
220                              event: RemoteMessage::Udp(archive),
221                          },
222                          Err(e) => NetEvent::Client {
223                              event: RemoteMessage::Warning(Warning::UnintelligableContent(e)),
224                          },
225                      })
226                  }
227              },
228          }
229  
230          Ok(())
231      }
232  }
233  
234  #[derive(Debug, Error)]
235  pub enum NetworkingError {
236      /// An IO error from the system.
237      #[error(transparent)]
238      Io(std::io::Error),
239  
240      #[error(transparent)]
241      Server(ServerError),
242  
243      #[error(transparent)]
244      Client(ClientError),
245  }
246  
247  /// Settings for the networking system of let-engine.
248  #[derive(Clone, Debug, PartialEq, Eq, Hash)]
249  pub struct NetworkingSettings {
250      /// The number of auth request retries before giving up the connection
251      /// and failing to connect as client.
252      ///
253      /// ## Default
254      ///
255      /// `5`
256      pub auth_retries: usize,
257  
258      /// The time between retries.
259      ///
260      /// ## Default
261      ///
262      /// `2 seconds`
263      pub auth_retry_wait: Duration,
264  
265      /// The time between ping requests.
266      ///
267      /// ## Default
268      ///
269      /// `5 seconds`
270      pub ping_wait: Duration,
271  
272      /// The maximum allowed ping before sending warnings.
273      ///
274      /// ## Default
275      ///
276      /// `5 seconds`
277      pub max_ping: Duration,
278  
279      /// Maximum amount of concurrent connections allowed before warning
280      ///
281      /// # Default
282      ///
283      /// `20`
284      pub max_connections: usize,
285  
286      /// The minimum duration between multiple packets allowed before warning
287      ///
288      /// ## Default
289      ///
290      /// `10 milliseconds`
291      pub rate_limit: Duration,
292  
293      /// Max package size limit for the built in TCP protocol in bytes
294      ///
295      /// ## Default
296      ///
297      /// `1048576` bytes or 1MiB
298      pub tcp_max_size: usize,
299  
300      /// Max package size limit for the built in UDP protocol in bytes
301      ///
302      /// ## Default
303      ///
304      /// `1200` bytes
305      pub udp_max_size: usize,
306  }
307  
308  impl Default for NetworkingSettings {
309      fn default() -> Self {
310          Self {
311              auth_retries: 5,
312              auth_retry_wait: Duration::from_secs(2),
313              ping_wait: Duration::from_secs(5),
314              max_ping: Duration::from_secs(5),
315              rate_limit: Duration::from_millis(10),
316              max_connections: 20,
317              tcp_max_size: 1048576,
318              udp_max_size: 1200,
319          }
320      }
321  }
322  
323  /// Messages received by a remote connection.
324  #[derive(Debug)]
325  pub enum RemoteMessage<'a, Msg> {
326      /// The client has connected to the server successfully.
327      Connected,
328  
329      /// The remote has sent a message using TCP.
330      Tcp(&'a Msg),
331  
332      /// The remote has sent a message using UDP.
333      Udp(&'a Msg),
334  
335      /// The remote has sent non conformant packets.
336      Warning(Warning),
337  
338      /// The client has been disconnected from the server.
339      Disconnected(Disconnected),
340  }
341  
342  /// Misbehaviour recorded by the remote peer.
343  #[derive(Debug)]
344  pub enum Warning {
345      /// The rate at which the packets are sent is faster than the configured limit.
346      RateLimitHit,
347  
348      /// The header of the message shows a size bigger than the configured limit.
349      MessageTooBig,
350  
351      /// There was a problem reading and deserializing the received data.
352      UnintelligableContent(rkyv::rancor::Error),
353  
354      /// The ping limit as set in the networking settings was hit.
355      PingTooHigh,
356  
357      /// There was a problem connecting, which caused a retry.
358      Retry(usize),
359  }
360  
361  /// The connection to the peer has been stopped.
362  ///
363  /// The reason for the disconnect is
364  #[derive(Debug, Error)]
365  pub enum Disconnected {
366      /// The peer has gracefully shut down the connection
367      #[error("Remote Shutdown")]
368      RemoteShutdown,
369  
370      /// An unexpected termination of the connection has occured.
371      #[error("Connection Aborted")]
372      ConnectionAborted,
373  
374      /// The connection has been forcibly closed by the remote.
375      ///
376      /// The remote could be rebooting, shutting down or the application could have crashed.
377      #[error("Connection Reset")]
378      ConnectionReset,
379  
380      /// The peer has been disconnected for misbehaving and sending packets
381      /// not according to the system.
382      #[error("Peer Misbehaving")]
383      MisbehavingPeer,
384  
385      /// An unexplainable error has occured.
386      #[error(transparent)]
387      Other(io::Error),
388  }
389  
390  impl From<io::Error> for Disconnected {
391      fn from(value: io::Error) -> Self {
392          match value.kind() {
393              ErrorKind::UnexpectedEof => Self::RemoteShutdown,
394              ErrorKind::ConnectionAborted => Self::ConnectionAborted,
395              ErrorKind::ConnectionReset => Self::ConnectionReset,
396              _ => Self::Other(value),
397          }
398      }
399  }
400  
401  /// The identification of a connection containing both TCP and UDP connection addresses for one user.
402  ///
403  /// The IP of both is the same, but the port is different.
404  #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Hash)]
405  pub struct Connection {
406      ip: IpAddr,
407      tcp_port: u16,
408      udp_port: u16,
409  }
410  
411  impl Connection {
412      fn from_tcp_udp_addr(tcp_addr: SocketAddr, udp_addr: SocketAddr) -> Self {
413          Self {
414              ip: tcp_addr.ip(),
415              tcp_port: tcp_addr.port(),
416              udp_port: udp_addr.port(),
417          }
418      }
419  
420      /// Returns the IP address of this connection.
421      pub fn ip_addr(&self) -> IpAddr {
422          self.ip
423      }
424  
425      /// Returns the TCP port of this connection.
426      pub fn tcp_port(&self) -> u16 {
427          self.tcp_port
428      }
429  
430      /// Returns the UDP port of this connection.
431      pub fn udp_port(&self) -> u16 {
432          self.udp_port
433      }
434  
435      /// Returns the TCP address of this connection.
436      pub fn tcp_addr(&self) -> SocketAddr {
437          SocketAddr::new(self.ip, self.tcp_port)
438      }
439  
440      /// Returns the UDP address of this connection.
441      pub fn udp_addr(&self) -> SocketAddr {
442          SocketAddr::new(self.ip, self.udp_port)
443      }
444  
445      pub fn contains_addr(&self, addr: &SocketAddr) -> bool {
446          self.ip == addr.ip() && (self.tcp_port == addr.port() || self.udp_port == addr.port())
447      }
448  }
449  
450  use rkyv::{
451      Archive, Serialize,
452      api::high::{HighSerializer, HighValidator, to_bytes_in_with_alloc},
453      bytecheck::CheckBytes,
454      rancor::{self, Source},
455      ser::allocator::{Arena, ArenaHandle},
456      util::AlignedVec,
457  };
458  
459  /// Serialize the given data to a streamable message format.
460  ///
461  /// ## Message format
462  ///
463  /// - Length prefixed with a u32
464  ///
465  /// \[u32data_len\](u8data)
466  fn serialize_tcp_into<T, E>(message: &T, arena: &mut Arena) -> AlignedVec
467  where
468      T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, E>>,
469      E: Source,
470  {
471      let mut data = AlignedVec::new();
472      data.extend_from_slice(&[0; 4]);
473      let mut data = to_bytes_in_with_alloc(message, data, arena.acquire()).unwrap();
474  
475      let len = data.len() - 4;
476  
477      data[0..4].copy_from_slice(&(len as u32).to_le_bytes());
478  
479      data
480  }
481  
482  /// Serialize the given data to a streamable message format.
483  ///
484  /// ## Message format
485  ///
486  /// - Indexed and data length prefixed
487  ///
488  /// \[u32order_number\]\[u32data_len\])(u8data)
489  ///
490  /// Order number has to be added by yourself
491  fn serialize_udp_into<E: Source>(
492      message: &impl for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, E>>,
493      arena: &mut Arena,
494  ) -> AlignedVec {
495      let mut data = AlignedVec::with_capacity(1024);
496      data.extend_from_slice(&[0; 8]);
497  
498      let mut data = to_bytes_in_with_alloc(message, data, arena.acquire()).unwrap();
499  
500      let data_len = data.len() - 8;
501      data[4..8].copy_from_slice(&(data_len as u32).to_le_bytes());
502  
503      data
504  }
505  
506  struct BufferingMessage {
507      bytes_left: usize,
508      buf: Vec<u8>,
509      timestamp: SystemTime,
510  }
511  
512  impl BufferingMessage {
513      pub fn new(size: usize) -> Self {
514          let buf = Vec::with_capacity(size);
515  
516          Self {
517              bytes_left: size,
518              buf,
519              timestamp: SystemTime::now(),
520          }
521      }
522  
523      pub fn completed(&mut self, buf: &[u8]) -> bool {
524          let bytes_to_copy = buf.len().min(self.bytes_left);
525          self.buf.extend_from_slice(&buf[..bytes_to_copy]);
526          self.bytes_left -= bytes_to_copy;
527          self.timestamp = SystemTime::now();
528          self.bytes_left == 0
529      }
530  
531      pub fn consume(self) -> Vec<u8> {
532          self.buf
533      }
534  
535      pub fn outdated(&self) -> bool {
536          self.timestamp.elapsed().unwrap() > Duration::from_secs(1)
537      }
538  }