client.rs
1 use std::{ 2 future::Future, 3 marker::PhantomData, 4 net::ToSocketAddrs, 5 sync::{Arc, atomic::AtomicU32}, 6 time::{Duration, Instant}, 7 }; 8 9 use anyhow::Result; 10 use futures::future::Either; 11 use rand::Rng; 12 use rkyv::{ 13 Serialize, 14 api::high::HighSerializer, 15 rancor, 16 ser::allocator::{Arena, ArenaHandle}, 17 util::AlignedVec, 18 }; 19 use smol::{ 20 Timer, 21 channel::Sender, 22 io::{AsyncReadExt, AsyncWriteExt}, 23 lock::Mutex, 24 net::{TcpStream, UdpSocket}, 25 }; 26 use thiserror::Error; 27 28 use super::{Connection, Disconnected, NetworkingSettings, SAFE_MTU_SIZE, Warning}; 29 30 struct Socket { 31 client: Mutex<Option<TcpStream>>, 32 33 udp_socket: UdpSocket, 34 udp_order: AtomicU32, 35 36 remote_connection: Mutex<Option<Connection>>, 37 38 arena: Arc<parking_lot::Mutex<Arena>>, 39 40 ping: Mutex<Ping>, 41 } 42 43 struct Ping { 44 timestamp: Option<Instant>, 45 ping: Duration, 46 } 47 48 impl Socket { 49 /// Sends the first ping message 50 async fn start_ping(&self) { 51 // send 8 byte message to be echoed 52 let mut ping = self.ping.lock().await; 53 54 let _ = self.udp_socket.send(&[0; 8]).await; 55 ping.timestamp = Some(Instant::now()); 56 } 57 58 /// Sends the second ping message and records time. 59 async fn stop_ping(&self) { 60 // send 8 byte echo back for the server to calculate the ping. 61 let mut ping = self.ping.lock().await; 62 63 let _ = self.udp_socket.send(&[0; 8]).await; 64 65 if let Some(timestamp) = ping.timestamp.take() { 66 ping.ping = timestamp.elapsed(); 67 } 68 } 69 } 70 71 pub(super) enum ClientMessage { 72 Error(ClientError), 73 Warning(Warning), 74 Tcp(Vec<u8>), 75 Udp(Vec<u8>), 76 Connected, 77 Disconnected(Disconnected), 78 } 79 80 /// A client instance that allows you to connect to a server using the same game engine 81 /// and send/receive messages. 82 pub struct ClientInterface<Msg> { 83 socket: Arc<Socket>, 84 messages: Sender<ClientMessage>, 85 settings: NetworkingSettings, 86 _msg: PhantomData<Msg>, 87 } 88 89 impl<Msg> Clone for ClientInterface<Msg> { 90 fn clone(&self) -> Self { 91 Self { 92 socket: self.socket.clone(), 93 messages: self.messages.clone(), 94 settings: self.settings.clone(), 95 _msg: PhantomData, 96 } 97 } 98 } 99 100 impl<Msg> let_engine_core::backend::networking::ClientInterface<Connection> for ClientInterface<Msg> 101 where 102 Msg: 103 Send + Sync + for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>, 104 { 105 type Msg = Msg; 106 type Error = ClientError; 107 108 /// Starts connecting and sends a message back if connecting succeeds. 109 fn connect<Addr: ToSocketAddrs>(&self, addr: Addr) -> std::result::Result<(), Self::Error> { 110 let addr = addr 111 .to_socket_addrs() 112 .map_err(ClientError::Io)? 113 .next() 114 .unwrap(); 115 116 smol::block_on(async { 117 // Error if there is a connection. 118 if self.socket.client.lock().await.is_some() { 119 return Err(ClientError::StillConnected); 120 } 121 Ok(()) 122 })?; 123 let socket = self.socket.clone(); 124 let settings = self.settings.clone(); 125 let messages = self.messages.clone(); 126 127 self.spawn(async move { 128 let inf = Self { 129 socket, 130 settings, 131 messages, 132 _msg: PhantomData, 133 }; 134 135 // Use the UdpSocket connect function to allow receiving data without port forwarding or handling the NAT stuff. 136 inf.socket 137 .udp_socket 138 .connect(addr) 139 .await 140 .map_err(ClientError::Io)?; 141 142 let mut client = inf.socket.client.lock().await; 143 144 let mut tcp_socket = TcpStream::connect(addr).await.map_err(ClientError::Io)?; 145 146 let mut buf = [0; 128]; 147 148 rand::rng().fill(&mut buf[4..]); 149 150 // Send random ID for UDP identification 151 tcp_socket 152 .write_all(&buf) 153 .await 154 .map_err(|_| ClientError::ServerFull)?; 155 156 let retries = inf.settings.auth_retries; 157 let wait_time = inf.settings.auth_retry_wait; 158 159 let mut fail = true; 160 161 for i in 0..retries { 162 inf.socket 163 .udp_socket 164 .send(&buf) 165 .await 166 .map_err(ClientError::Io)?; 167 168 let mut _buf = [0; 8]; 169 let recv = inf.socket.udp_socket.recv(&mut buf); 170 let select = futures::future::select(Box::pin(recv), Timer::after(wait_time)); 171 172 match select.await { 173 Either::Left((result, _)) => { 174 let size = result.map_err(ClientError::Io)?; 175 if size != 8 { 176 return Err(ClientError::InvalidResponse); 177 } 178 fail = false; 179 break; 180 } 181 Either::Right(_) => { 182 inf.messages 183 .send(ClientMessage::Warning(Warning::Retry(i + 1))) 184 .await 185 .unwrap(); 186 } 187 } 188 } 189 190 if fail { 191 return Err(ClientError::InvalidResponse); 192 } 193 194 *client = Some(tcp_socket); 195 196 inf.recv_messages(); 197 inf.recv_udp_messages(); 198 inf.start_pinging(); 199 200 Ok(Some(ClientMessage::Connected)) 201 }); 202 203 Ok(()) 204 } 205 206 fn disconnect(&self) -> std::result::Result<(), Self::Error> { 207 let socket = self.socket.clone(); 208 smol::spawn(async { 209 let socket = socket; 210 Self::disconnect_with(&socket.client).await; 211 }) 212 .detach(); 213 214 Ok(()) 215 } 216 217 fn peer_conn(&self) -> Option<Connection> { 218 *self.socket.remote_connection.lock_blocking() 219 } 220 221 fn local_conn(&self) -> Option<Connection> { 222 let client = self.socket.client.lock_blocking(); 223 client.as_ref().map(|client| { 224 let addr = client.local_addr().unwrap(); 225 Connection { 226 ip: addr.ip(), 227 tcp_port: addr.port(), 228 udp_port: self.socket.udp_socket.local_addr().unwrap().port(), 229 } 230 }) 231 } 232 233 fn send(&self, message: &Self::Msg) -> std::result::Result<(), Self::Error> { 234 let socket = self.socket.clone(); 235 236 let data = { 237 let mut arena = socket.arena.lock(); 238 super::serialize_tcp_into(message, &mut arena) 239 }; 240 241 self.spawn(async move { 242 let mut client = socket.client.lock().await; 243 if let Some(client) = client.as_mut() { 244 client 245 .write_all(&data) 246 .await 247 .map(|_| None) 248 .map_err(ClientError::Io) 249 } else { 250 Err(ClientError::NotConnected) 251 } 252 }); 253 254 Ok(()) 255 } 256 257 fn fast_send(&self, message: &Self::Msg) -> std::result::Result<(), Self::Error> { 258 let socket = self.socket.clone(); 259 { 260 if socket.client.lock_blocking().is_none() { 261 return Err(ClientError::NotConnected); 262 } 263 } 264 let data = { 265 let mut arena = socket.arena.lock(); 266 let mut data = super::serialize_udp_into(message, &mut arena); 267 268 let order_number = socket 269 .udp_order 270 .fetch_add(1, std::sync::atomic::Ordering::SeqCst); 271 data[0..4].copy_from_slice(&order_number.to_le_bytes()); 272 data 273 }; 274 275 self.spawn(async move { 276 for chunk in data.chunks(SAFE_MTU_SIZE) { 277 socket 278 .udp_socket 279 .send(chunk) 280 .await 281 .map_err(ClientError::Io)?; 282 } 283 284 Ok(None) 285 }); 286 287 Ok(()) 288 } 289 } 290 291 impl<Msg> ClientInterface<Msg> { 292 pub(super) fn new( 293 settings: NetworkingSettings, 294 messages: Sender<ClientMessage>, 295 arena: Arc<parking_lot::Mutex<Arena>>, 296 ) -> Result<Self, ClientError> { 297 smol::block_on(async { 298 let udp_socket = UdpSocket::bind("0.0.0.0:0") 299 .await 300 .map_err(ClientError::Io)?; 301 302 let client = Self { 303 socket: Arc::new(Socket { 304 client: Mutex::new(None), 305 udp_socket, 306 udp_order: AtomicU32::new(1), 307 remote_connection: Mutex::new(None), 308 arena, 309 ping: Mutex::new(Ping { 310 timestamp: None, 311 ping: Duration::default(), 312 }), 313 }), 314 messages, 315 settings, 316 _msg: PhantomData, 317 }; 318 319 Ok(client) 320 }) 321 } 322 323 fn spawn( 324 &self, 325 future: impl Future<Output = Result<Option<ClientMessage>, ClientError>> + Send + 'static, 326 ) { 327 let sender = self.messages.clone(); 328 smol::spawn(async move { 329 if let Some(message) = match future.await { 330 Ok(t) => t, 331 Err(e) => Some(ClientMessage::Error(e)), 332 } { 333 sender.send(message).await.unwrap(); 334 } 335 }) 336 .detach(); 337 } 338 339 fn start_pinging(&self) { 340 let socket = self.socket.clone(); 341 let settings = self.settings.clone(); 342 343 smol::spawn(async move { 344 loop { 345 socket.start_ping().await; 346 347 Timer::after(settings.ping_wait).await; 348 } 349 }) 350 .detach(); 351 } 352 353 fn recv_messages(&self) { 354 let socket = self.socket.clone(); 355 let messages = self.messages.clone(); 356 let settings = self.settings.clone(); 357 self.spawn(async move { 358 let mut disconnect_reason = Disconnected::RemoteShutdown; 359 360 let mut size_buf = [0u8; 4]; 361 let mut client = socket.client.lock().await.clone(); 362 while let Some(stream) = client.as_mut() { 363 let mut buf = Vec::with_capacity(1038); 364 365 // Get u32 size prefix 366 if let Err(e) = stream.read_exact(&mut size_buf).await { 367 disconnect_reason = e.into(); 368 break; 369 }; 370 371 // Read as many bytes as in the size prefix 372 let size = u32::from_le_bytes(size_buf) as usize; 373 374 match size { 375 size if size < 4 => { 376 continue; 377 } 378 size if size > settings.tcp_max_size => { 379 messages 380 .send(ClientMessage::Warning(super::Warning::MessageTooBig)) 381 .await 382 .unwrap(); 383 continue; 384 } 385 _ => (), 386 } 387 buf.resize(size, 0); 388 389 if let Err(e) = stream.read_exact(&mut buf).await { 390 disconnect_reason = e.into(); 391 break; 392 }; 393 394 messages.send(ClientMessage::Tcp(buf)).await.unwrap(); 395 } 396 397 Self::disconnect_with(&socket.client).await; 398 Ok(Some(ClientMessage::Disconnected(disconnect_reason))) 399 }); 400 } 401 402 fn recv_udp_messages(&self) { 403 let messages = self.messages.clone(); 404 let socket = self.socket.clone(); 405 let settings = self.settings.clone(); 406 smol::spawn(async move { 407 let mut buf: [u8; SAFE_MTU_SIZE] = [0; SAFE_MTU_SIZE]; 408 409 let mut buffered_message: Option<super::BufferingMessage> = None; 410 411 let mut last_ord = 0; 412 413 while let Ok(size) = socket.udp_socket.recv(&mut buf).await { 414 if let Some(mut message) = buffered_message.take() 415 && !message.outdated() 416 { 417 if message.completed(&buf[..size]) { 418 messages 419 .send(ClientMessage::Udp(message.consume())) 420 .await 421 .unwrap(); 422 } else { 423 buffered_message = Some(message); 424 } 425 continue; 426 } 427 buffered_message = None; 428 429 match size { 430 // 8 bytes = ping 431 8 => { 432 socket.stop_ping().await; 433 } 434 // Ignore messages smaller than the header. 435 size if size < 8 => { 436 continue; 437 } 438 size if size > settings.udp_max_size => { 439 messages 440 .send(ClientMessage::Warning(super::Warning::MessageTooBig)) 441 .await 442 .unwrap(); 443 continue; 444 } 445 _ => (), 446 } 447 448 // Get order number 449 let ord = u32::from_le_bytes(buf[0..4].try_into().unwrap()); 450 451 // Verify order 452 match ord { 453 ord if ord == last_ord + 1 => (), // in order -> allow 454 _ => { 455 // out of order -> discard 456 last_ord = ord; 457 continue; 458 } 459 } 460 last_ord = ord; 461 462 // Get length 463 let len = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize; 464 465 // Try again if size is above set limit 466 if len > settings.udp_max_size { 467 continue; 468 } 469 470 let mut buffering = super::BufferingMessage::new(len); 471 472 if buffering.completed(&buf[8..]) { 473 messages 474 .send(ClientMessage::Udp(buffering.consume())) 475 .await 476 .unwrap(); 477 } else { 478 buffered_message = Some(buffering); 479 } 480 } 481 }) 482 .detach(); 483 } 484 485 async fn disconnect_with(client: &Mutex<Option<TcpStream>>) { 486 let client = std::mem::take(&mut *client.lock().await); 487 488 if let Some(client) = client.as_ref() { 489 let _ = client.shutdown(std::net::Shutdown::Both); 490 }; 491 } 492 493 /// Returns the last calculated ping of the last running connection. 494 /// 495 /// May return a duration of 0 in case no calculation has been done before this function. 496 pub fn ping(&self) -> Duration { 497 self.socket.ping.lock_blocking().ping 498 } 499 } 500 501 /// Errors of the client. 502 #[derive(Debug, Error)] 503 pub enum ClientError { 504 /// An error that gets output whenever a function that requires the server to be connected 505 #[error("The client is still connected to the server.")] 506 StillConnected, 507 #[error("The client is not connected to any server.")] 508 NotConnected, 509 #[error("The server you attepted to connect to is full.")] 510 ServerFull, 511 /// The server sends a message invalid to the let-engine interface. 512 #[error("The server is sending invalid data.")] 513 InvalidResponse, 514 #[error("An Io error has occured: {0}")] 515 Io(smol::io::Error), 516 }