/ let-engine / src / backend / networking / client.rs
client.rs
  1  use std::{
  2      future::Future,
  3      marker::PhantomData,
  4      net::ToSocketAddrs,
  5      sync::{Arc, atomic::AtomicU32},
  6      time::{Duration, Instant},
  7  };
  8  
  9  use anyhow::Result;
 10  use futures::future::Either;
 11  use rand::Rng;
 12  use rkyv::{
 13      Serialize,
 14      api::high::HighSerializer,
 15      rancor,
 16      ser::allocator::{Arena, ArenaHandle},
 17      util::AlignedVec,
 18  };
 19  use smol::{
 20      Timer,
 21      channel::Sender,
 22      io::{AsyncReadExt, AsyncWriteExt},
 23      lock::Mutex,
 24      net::{TcpStream, UdpSocket},
 25  };
 26  use thiserror::Error;
 27  
 28  use super::{Connection, Disconnected, NetworkingSettings, SAFE_MTU_SIZE, Warning};
 29  
 30  struct Socket {
 31      client: Mutex<Option<TcpStream>>,
 32  
 33      udp_socket: UdpSocket,
 34      udp_order: AtomicU32,
 35  
 36      remote_connection: Mutex<Option<Connection>>,
 37  
 38      arena: Arc<parking_lot::Mutex<Arena>>,
 39  
 40      ping: Mutex<Ping>,
 41  }
 42  
 43  struct Ping {
 44      timestamp: Option<Instant>,
 45      ping: Duration,
 46  }
 47  
 48  impl Socket {
 49      /// Sends the first ping message
 50      async fn start_ping(&self) {
 51          // send 8 byte message to be echoed
 52          let mut ping = self.ping.lock().await;
 53  
 54          let _ = self.udp_socket.send(&[0; 8]).await;
 55          ping.timestamp = Some(Instant::now());
 56      }
 57  
 58      /// Sends the second ping message and records time.
 59      async fn stop_ping(&self) {
 60          // send 8 byte echo back for the server to calculate the ping.
 61          let mut ping = self.ping.lock().await;
 62  
 63          let _ = self.udp_socket.send(&[0; 8]).await;
 64  
 65          if let Some(timestamp) = ping.timestamp.take() {
 66              ping.ping = timestamp.elapsed();
 67          }
 68      }
 69  }
 70  
 71  pub(super) enum ClientMessage {
 72      Error(ClientError),
 73      Warning(Warning),
 74      Tcp(Vec<u8>),
 75      Udp(Vec<u8>),
 76      Connected,
 77      Disconnected(Disconnected),
 78  }
 79  
 80  /// A client instance that allows you to connect to a server using the same game engine
 81  /// and send/receive messages.
 82  pub struct ClientInterface<Msg> {
 83      socket: Arc<Socket>,
 84      messages: Sender<ClientMessage>,
 85      settings: NetworkingSettings,
 86      _msg: PhantomData<Msg>,
 87  }
 88  
 89  impl<Msg> Clone for ClientInterface<Msg> {
 90      fn clone(&self) -> Self {
 91          Self {
 92              socket: self.socket.clone(),
 93              messages: self.messages.clone(),
 94              settings: self.settings.clone(),
 95              _msg: PhantomData,
 96          }
 97      }
 98  }
 99  
100  impl<Msg> let_engine_core::backend::networking::ClientInterface<Connection> for ClientInterface<Msg>
101  where
102      Msg:
103          Send + Sync + for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>,
104  {
105      type Msg = Msg;
106      type Error = ClientError;
107  
108      /// Starts connecting and sends a message back if connecting succeeds.
109      fn connect<Addr: ToSocketAddrs>(&self, addr: Addr) -> std::result::Result<(), Self::Error> {
110          let addr = addr
111              .to_socket_addrs()
112              .map_err(ClientError::Io)?
113              .next()
114              .unwrap();
115  
116          smol::block_on(async {
117              // Error if there is a connection.
118              if self.socket.client.lock().await.is_some() {
119                  return Err(ClientError::StillConnected);
120              }
121              Ok(())
122          })?;
123          let socket = self.socket.clone();
124          let settings = self.settings.clone();
125          let messages = self.messages.clone();
126  
127          self.spawn(async move {
128              let inf = Self {
129                  socket,
130                  settings,
131                  messages,
132                  _msg: PhantomData,
133              };
134  
135              // Use the UdpSocket connect function to allow receiving data without port forwarding or handling the NAT stuff.
136              inf.socket
137                  .udp_socket
138                  .connect(addr)
139                  .await
140                  .map_err(ClientError::Io)?;
141  
142              let mut client = inf.socket.client.lock().await;
143  
144              let mut tcp_socket = TcpStream::connect(addr).await.map_err(ClientError::Io)?;
145  
146              let mut buf = [0; 128];
147  
148              rand::rng().fill(&mut buf[4..]);
149  
150              // Send random ID for UDP identification
151              tcp_socket
152                  .write_all(&buf)
153                  .await
154                  .map_err(|_| ClientError::ServerFull)?;
155  
156              let retries = inf.settings.auth_retries;
157              let wait_time = inf.settings.auth_retry_wait;
158  
159              let mut fail = true;
160  
161              for i in 0..retries {
162                  inf.socket
163                      .udp_socket
164                      .send(&buf)
165                      .await
166                      .map_err(ClientError::Io)?;
167  
168                  let mut _buf = [0; 8];
169                  let recv = inf.socket.udp_socket.recv(&mut buf);
170                  let select = futures::future::select(Box::pin(recv), Timer::after(wait_time));
171  
172                  match select.await {
173                      Either::Left((result, _)) => {
174                          let size = result.map_err(ClientError::Io)?;
175                          if size != 8 {
176                              return Err(ClientError::InvalidResponse);
177                          }
178                          fail = false;
179                          break;
180                      }
181                      Either::Right(_) => {
182                          inf.messages
183                              .send(ClientMessage::Warning(Warning::Retry(i + 1)))
184                              .await
185                              .unwrap();
186                      }
187                  }
188              }
189  
190              if fail {
191                  return Err(ClientError::InvalidResponse);
192              }
193  
194              *client = Some(tcp_socket);
195  
196              inf.recv_messages();
197              inf.recv_udp_messages();
198              inf.start_pinging();
199  
200              Ok(Some(ClientMessage::Connected))
201          });
202  
203          Ok(())
204      }
205  
206      fn disconnect(&self) -> std::result::Result<(), Self::Error> {
207          let socket = self.socket.clone();
208          smol::spawn(async {
209              let socket = socket;
210              Self::disconnect_with(&socket.client).await;
211          })
212          .detach();
213  
214          Ok(())
215      }
216  
217      fn peer_conn(&self) -> Option<Connection> {
218          *self.socket.remote_connection.lock_blocking()
219      }
220  
221      fn local_conn(&self) -> Option<Connection> {
222          let client = self.socket.client.lock_blocking();
223          client.as_ref().map(|client| {
224              let addr = client.local_addr().unwrap();
225              Connection {
226                  ip: addr.ip(),
227                  tcp_port: addr.port(),
228                  udp_port: self.socket.udp_socket.local_addr().unwrap().port(),
229              }
230          })
231      }
232  
233      fn send(&self, message: &Self::Msg) -> std::result::Result<(), Self::Error> {
234          let socket = self.socket.clone();
235  
236          let data = {
237              let mut arena = socket.arena.lock();
238              super::serialize_tcp_into(message, &mut arena)
239          };
240  
241          self.spawn(async move {
242              let mut client = socket.client.lock().await;
243              if let Some(client) = client.as_mut() {
244                  client
245                      .write_all(&data)
246                      .await
247                      .map(|_| None)
248                      .map_err(ClientError::Io)
249              } else {
250                  Err(ClientError::NotConnected)
251              }
252          });
253  
254          Ok(())
255      }
256  
257      fn fast_send(&self, message: &Self::Msg) -> std::result::Result<(), Self::Error> {
258          let socket = self.socket.clone();
259          {
260              if socket.client.lock_blocking().is_none() {
261                  return Err(ClientError::NotConnected);
262              }
263          }
264          let data = {
265              let mut arena = socket.arena.lock();
266              let mut data = super::serialize_udp_into(message, &mut arena);
267  
268              let order_number = socket
269                  .udp_order
270                  .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
271              data[0..4].copy_from_slice(&order_number.to_le_bytes());
272              data
273          };
274  
275          self.spawn(async move {
276              for chunk in data.chunks(SAFE_MTU_SIZE) {
277                  socket
278                      .udp_socket
279                      .send(chunk)
280                      .await
281                      .map_err(ClientError::Io)?;
282              }
283  
284              Ok(None)
285          });
286  
287          Ok(())
288      }
289  }
290  
291  impl<Msg> ClientInterface<Msg> {
292      pub(super) fn new(
293          settings: NetworkingSettings,
294          messages: Sender<ClientMessage>,
295          arena: Arc<parking_lot::Mutex<Arena>>,
296      ) -> Result<Self, ClientError> {
297          smol::block_on(async {
298              let udp_socket = UdpSocket::bind("0.0.0.0:0")
299                  .await
300                  .map_err(ClientError::Io)?;
301  
302              let client = Self {
303                  socket: Arc::new(Socket {
304                      client: Mutex::new(None),
305                      udp_socket,
306                      udp_order: AtomicU32::new(1),
307                      remote_connection: Mutex::new(None),
308                      arena,
309                      ping: Mutex::new(Ping {
310                          timestamp: None,
311                          ping: Duration::default(),
312                      }),
313                  }),
314                  messages,
315                  settings,
316                  _msg: PhantomData,
317              };
318  
319              Ok(client)
320          })
321      }
322  
323      fn spawn(
324          &self,
325          future: impl Future<Output = Result<Option<ClientMessage>, ClientError>> + Send + 'static,
326      ) {
327          let sender = self.messages.clone();
328          smol::spawn(async move {
329              if let Some(message) = match future.await {
330                  Ok(t) => t,
331                  Err(e) => Some(ClientMessage::Error(e)),
332              } {
333                  sender.send(message).await.unwrap();
334              }
335          })
336          .detach();
337      }
338  
339      fn start_pinging(&self) {
340          let socket = self.socket.clone();
341          let settings = self.settings.clone();
342  
343          smol::spawn(async move {
344              loop {
345                  socket.start_ping().await;
346  
347                  Timer::after(settings.ping_wait).await;
348              }
349          })
350          .detach();
351      }
352  
353      fn recv_messages(&self) {
354          let socket = self.socket.clone();
355          let messages = self.messages.clone();
356          let settings = self.settings.clone();
357          self.spawn(async move {
358              let mut disconnect_reason = Disconnected::RemoteShutdown;
359  
360              let mut size_buf = [0u8; 4];
361              let mut client = socket.client.lock().await.clone();
362              while let Some(stream) = client.as_mut() {
363                  let mut buf = Vec::with_capacity(1038);
364  
365                  // Get u32 size prefix
366                  if let Err(e) = stream.read_exact(&mut size_buf).await {
367                      disconnect_reason = e.into();
368                      break;
369                  };
370  
371                  // Read as many bytes as in the size prefix
372                  let size = u32::from_le_bytes(size_buf) as usize;
373  
374                  match size {
375                      size if size < 4 => {
376                          continue;
377                      }
378                      size if size > settings.tcp_max_size => {
379                          messages
380                              .send(ClientMessage::Warning(super::Warning::MessageTooBig))
381                              .await
382                              .unwrap();
383                          continue;
384                      }
385                      _ => (),
386                  }
387                  buf.resize(size, 0);
388  
389                  if let Err(e) = stream.read_exact(&mut buf).await {
390                      disconnect_reason = e.into();
391                      break;
392                  };
393  
394                  messages.send(ClientMessage::Tcp(buf)).await.unwrap();
395              }
396  
397              Self::disconnect_with(&socket.client).await;
398              Ok(Some(ClientMessage::Disconnected(disconnect_reason)))
399          });
400      }
401  
402      fn recv_udp_messages(&self) {
403          let messages = self.messages.clone();
404          let socket = self.socket.clone();
405          let settings = self.settings.clone();
406          smol::spawn(async move {
407              let mut buf: [u8; SAFE_MTU_SIZE] = [0; SAFE_MTU_SIZE];
408  
409              let mut buffered_message: Option<super::BufferingMessage> = None;
410  
411              let mut last_ord = 0;
412  
413              while let Ok(size) = socket.udp_socket.recv(&mut buf).await {
414                  if let Some(mut message) = buffered_message.take()
415                      && !message.outdated()
416                  {
417                      if message.completed(&buf[..size]) {
418                          messages
419                              .send(ClientMessage::Udp(message.consume()))
420                              .await
421                              .unwrap();
422                      } else {
423                          buffered_message = Some(message);
424                      }
425                      continue;
426                  }
427                  buffered_message = None;
428  
429                  match size {
430                      // 8 bytes = ping
431                      8 => {
432                          socket.stop_ping().await;
433                      }
434                      // Ignore messages smaller than the header.
435                      size if size < 8 => {
436                          continue;
437                      }
438                      size if size > settings.udp_max_size => {
439                          messages
440                              .send(ClientMessage::Warning(super::Warning::MessageTooBig))
441                              .await
442                              .unwrap();
443                          continue;
444                      }
445                      _ => (),
446                  }
447  
448                  // Get order number
449                  let ord = u32::from_le_bytes(buf[0..4].try_into().unwrap());
450  
451                  // Verify order
452                  match ord {
453                      ord if ord == last_ord + 1 => (), // in order -> allow
454                      _ => {
455                          // out of order -> discard
456                          last_ord = ord;
457                          continue;
458                      }
459                  }
460                  last_ord = ord;
461  
462                  // Get length
463                  let len = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize;
464  
465                  // Try again if size is above set limit
466                  if len > settings.udp_max_size {
467                      continue;
468                  }
469  
470                  let mut buffering = super::BufferingMessage::new(len);
471  
472                  if buffering.completed(&buf[8..]) {
473                      messages
474                          .send(ClientMessage::Udp(buffering.consume()))
475                          .await
476                          .unwrap();
477                  } else {
478                      buffered_message = Some(buffering);
479                  }
480              }
481          })
482          .detach();
483      }
484  
485      async fn disconnect_with(client: &Mutex<Option<TcpStream>>) {
486          let client = std::mem::take(&mut *client.lock().await);
487  
488          if let Some(client) = client.as_ref() {
489              let _ = client.shutdown(std::net::Shutdown::Both);
490          };
491      }
492  
493      /// Returns the last calculated ping of the last running connection.
494      ///
495      /// May return a duration of 0 in case no calculation has been done before this function.
496      pub fn ping(&self) -> Duration {
497          self.socket.ping.lock_blocking().ping
498      }
499  }
500  
501  /// Errors of the client.
502  #[derive(Debug, Error)]
503  pub enum ClientError {
504      /// An error that gets output whenever a function that requires the server to be connected
505      #[error("The client is still connected to the server.")]
506      StillConnected,
507      #[error("The client is not connected to any server.")]
508      NotConnected,
509      #[error("The server you attepted to connect to is full.")]
510      ServerFull,
511      /// The server sends a message invalid to the let-engine interface.
512      #[error("The server is sending invalid data.")]
513      InvalidResponse,
514      #[error("An Io error has occured: {0}")]
515      Io(smol::io::Error),
516  }