server.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use std::{collections::HashSet, io::ErrorKind, sync::Arc}; 20 21 use async_trait::async_trait; 22 use smol::{ 23 io::{BufReader, ReadHalf, WriteHalf}, 24 lock::{Mutex, MutexGuard}, 25 }; 26 use tinyjson::JsonValue; 27 use tracing::{debug, error, info}; 28 use url::Url; 29 30 use super::{ 31 common::{ 32 http_read_from_stream_request, http_write_to_stream, read_from_stream, write_to_stream, 33 INIT_BUF_SIZE, 34 }, 35 jsonrpc::*, 36 settings::RpcSettings, 37 }; 38 use crate::{ 39 net::transport::{Listener, PtListener, PtStream}, 40 system::{StoppableTask, StoppableTaskPtr}, 41 Error, Result, 42 }; 43 44 /// Asynchronous trait implementing a handler for incoming JSON-RPC requests. 45 #[async_trait] 46 pub trait RequestHandler<T>: Sync + Send { 47 async fn handle_request(&self, req: JsonRequest) -> JsonResult; 48 49 async fn pong(&self, id: u16, _params: JsonValue) -> JsonResult { 50 JsonResponse::new(JsonValue::String("pong".to_string()), id).into() 51 } 52 53 async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>>; 54 55 async fn connections(&self) -> Vec<StoppableTaskPtr> { 56 self.connections_mut().await.iter().cloned().collect() 57 } 58 59 async fn mark_connection(&self, task: StoppableTaskPtr) { 60 self.connections_mut().await.insert(task); 61 } 62 63 async fn unmark_connection(&self, task: StoppableTaskPtr) { 64 self.connections_mut().await.remove(&task); 65 } 66 67 async fn active_connections(&self) -> usize { 68 self.connections_mut().await.len() 69 } 70 71 async fn stop_connections(&self) { 72 info!(target: "rpc::server", "[RPC] Server stopped, closing connections"); 73 for (i, task) in self.connections().await.iter().enumerate() { 74 debug!(target: "rpc::server", "Stopping connection #{i}"); 75 task.stop().await; 76 } 77 } 78 } 79 80 /// Auxiliary function to handle a request in the background. 81 async fn handle_request<T>( 82 writer: Arc<Mutex<WriteHalf<Box<dyn PtStream>>>>, 83 addr: Url, 84 rh: Arc<impl RequestHandler<T> + 'static>, 85 ex: Arc<smol::Executor<'_>>, 86 tasks: Arc<Mutex<HashSet<Arc<StoppableTask>>>>, 87 settings: RpcSettings, 88 req: JsonRequest, 89 ) -> Result<()> { 90 // Handle disabled RPC methods 91 let rep = if settings.is_method_disabled(&req.method) { 92 debug!(target: "rpc::server", "RPC method {} is disabled", req.method); 93 JsonError::new(ErrorCode::MethodNotFound, None, req.id).into() 94 } else { 95 rh.handle_request(req).await 96 }; 97 98 match rep { 99 JsonResult::Subscriber(subscriber) => { 100 let task = StoppableTask::new(); 101 102 // Clone what needs to go in the background 103 let task_ = task.clone(); 104 let addr_ = addr.clone(); 105 let tasks_ = tasks.clone(); 106 let writer_ = writer.clone(); 107 108 // Detach the subscriber so we can multiplex further requests 109 task.clone().start( 110 async move { 111 // Subscribe to the inner method subscriber 112 let subscription = subscriber.publisher.subscribe().await; 113 loop { 114 // Listen for notifications 115 let notification = subscription.receive().await; 116 117 // Push notification 118 debug!(target: "rpc::server", "{addr_} <-- {}", notification.stringify().unwrap()); 119 let notification = JsonResult::Notification(notification); 120 121 let mut writer_lock = writer_.lock().await; 122 123 #[allow(clippy::collapsible_else_if)] 124 if settings.use_http() { 125 if let Err(e) = http_write_to_stream(&mut writer_lock, ¬ification).await { 126 subscription.unsubscribe().await; 127 return Err(e.into()) 128 } 129 } else { 130 if let Err(e) = write_to_stream(&mut writer_lock, ¬ification).await { 131 subscription.unsubscribe().await; 132 return Err(e.into()) 133 } 134 } 135 136 drop(writer_lock); 137 } 138 }, 139 move |_| async move { 140 debug!( 141 target: "rpc::server", 142 "Removing background task {} from map", task_.task_id, 143 ); 144 tasks_.lock().await.remove(&task_); 145 }, 146 Error::DetachedTaskStopped, 147 ex.clone(), 148 ); 149 150 debug!(target: "rpc::server", "Adding background task {} to map", task.task_id); 151 tasks.lock().await.insert(task); 152 } 153 154 JsonResult::SubscriberWithReply(subscriber, reply) => { 155 // Write the response 156 debug!(target: "rpc::server", "{addr} <-- {}", reply.stringify()?); 157 let mut writer_lock = writer.lock().await; 158 if settings.use_http() { 159 http_write_to_stream(&mut writer_lock, &reply.into()).await?; 160 } else { 161 write_to_stream(&mut writer_lock, &reply.into()).await?; 162 } 163 drop(writer_lock); 164 165 let task = StoppableTask::new(); 166 // Clone what needs to go in the background 167 let task_ = task.clone(); 168 let addr_ = addr.clone(); 169 let tasks_ = tasks.clone(); 170 let writer_ = writer.clone(); 171 172 // Detach the subscriber so we can multiplex further requests 173 task.clone().start( 174 async move { 175 // Start the subscriber loop 176 let subscription = subscriber.publisher.subscribe().await; 177 loop { 178 // Listen for notifications 179 let notification = subscription.receive().await; 180 181 // Push notification 182 debug!(target: "rpc::server", "{addr_} <-- {}", notification.stringify().unwrap()); 183 let notification = JsonResult::Notification(notification); 184 185 let mut writer_lock = writer_.lock().await; 186 #[allow(clippy::collapsible_else_if)] 187 if settings.use_http() { 188 if let Err(e) = http_write_to_stream(&mut writer_lock, ¬ification).await { 189 subscription.unsubscribe().await; 190 drop(writer_lock); 191 return Err(e.into()) 192 } 193 } else { 194 if let Err(e) = write_to_stream(&mut writer_lock, ¬ification).await { 195 subscription.unsubscribe().await; 196 drop(writer_lock); 197 return Err(e.into()) 198 } 199 } 200 drop(writer_lock); 201 } 202 }, 203 move |_| async move { 204 debug!( 205 target: "rpc::server", 206 "Removing background task {} from map", task_.task_id, 207 ); 208 tasks_.lock().await.remove(&task_); 209 }, 210 Error::DetachedTaskStopped, 211 ex.clone(), 212 ); 213 214 debug!(target: "rpc::server", "Adding background task {} to map", task.task_id); 215 tasks.lock().await.insert(task); 216 } 217 218 JsonResult::Request(_) | JsonResult::Notification(_) => { 219 unreachable!("Should never happen") 220 } 221 222 JsonResult::Response(ref v) => { 223 debug!(target: "rpc::server", "{addr} <-- {}", v.stringify()?); 224 let mut writer_lock = writer.lock().await; 225 if settings.use_http() { 226 http_write_to_stream(&mut writer_lock, &rep).await?; 227 } else { 228 write_to_stream(&mut writer_lock, &rep).await?; 229 } 230 drop(writer_lock); 231 } 232 233 JsonResult::Error(ref v) => { 234 debug!(target: "rpc::server", "{addr} <-- {}", v.stringify()?); 235 let mut writer_lock = writer.lock().await; 236 if settings.use_http() { 237 http_write_to_stream(&mut writer_lock, &rep).await?; 238 } else { 239 write_to_stream(&mut writer_lock, &rep).await?; 240 } 241 drop(writer_lock); 242 } 243 } 244 245 Ok(()) 246 } 247 248 /// Accept function that should run inside a loop for accepting incoming 249 /// JSON-RPC requests and passing them to the [`RequestHandler`]. 250 #[allow(clippy::type_complexity)] 251 pub async fn accept<'a, T: 'a>( 252 reader: Arc<Mutex<BufReader<ReadHalf<Box<dyn PtStream>>>>>, 253 writer: Arc<Mutex<WriteHalf<Box<dyn PtStream>>>>, 254 addr: Url, 255 rh: Arc<impl RequestHandler<T> + 'static>, 256 conn_limit: Option<usize>, 257 settings: RpcSettings, 258 ex: Arc<smol::Executor<'a>>, 259 ) -> Result<()> { 260 // If there's a connection limit set, we will refuse connections 261 // after this point. 262 if let Some(conn_limit) = conn_limit { 263 if rh.clone().active_connections().await >= conn_limit { 264 debug!( 265 target: "rpc::server::accept()", 266 "Connection limit reached, refusing new conn" 267 ); 268 return Err(Error::RpcConnectionsExhausted) 269 } 270 } 271 272 // We'll hold our background tasks here 273 let tasks = Arc::new(Mutex::new(HashSet::new())); 274 275 loop { 276 let mut buf = Vec::with_capacity(INIT_BUF_SIZE); 277 278 let mut reader_lock = reader.lock().await; 279 if settings.use_http() { 280 let _ = http_read_from_stream_request(&mut reader_lock, &mut buf).await?; 281 } else { 282 let _ = read_from_stream(&mut reader_lock, &mut buf).await?; 283 } 284 drop(reader_lock); 285 286 let line = match String::from_utf8(buf) { 287 Ok(v) => v, 288 Err(e) => { 289 error!( 290 target: "rpc::server::accept()", 291 "[RPC SERVER] Failed parsing string from read buffer: {e}" 292 ); 293 return Err(e.into()) 294 } 295 }; 296 297 // Parse the line as JSON 298 let val: JsonValue = match line.trim().parse() { 299 Ok(v) => v, 300 Err(e) => { 301 error!( 302 target: "rpc::server::accept()", 303 "[RPC SERVER] Failed parsing JSON string: {e}" 304 ); 305 return Err(e.into()) 306 } 307 }; 308 309 // Cast to JsonRequest 310 let req = match JsonRequest::try_from(&val) { 311 Ok(v) => v, 312 Err(e) => { 313 error!( 314 target: "rpc::server::accept()", 315 "[RPC SERVER] Failed casting JSON to a JsonRequest: {e}" 316 ); 317 return Err(e.into()) 318 } 319 }; 320 321 debug!(target: "rpc::server", "{addr} --> {}", val.stringify()?); 322 323 // Create a new task to handle request in the background 324 let task = StoppableTask::new(); 325 326 // Clone what needs to go in the background 327 let task_ = task.clone(); 328 let tasks_ = tasks.clone(); 329 330 // Detach the task 331 task.clone().start( 332 handle_request( 333 writer.clone(), 334 addr.clone(), 335 rh.clone(), 336 ex.clone(), 337 tasks.clone(), 338 settings.clone(), 339 req, 340 ), 341 move |_| async move { 342 debug!( 343 target: "rpc::server", 344 "Removing background task {} from map", task_.task_id, 345 ); 346 tasks_.lock().await.remove(&task_); 347 }, 348 Error::DetachedTaskStopped, 349 ex.clone(), 350 ); 351 352 debug!(target: "rpc::server", "Adding background task {} to map", task.task_id); 353 tasks.lock().await.insert(task); 354 } 355 } 356 357 /// Wrapper function around [`accept()`] to take the incoming connection and 358 /// pass it forward. 359 async fn run_accept_loop<'a, T: 'a>( 360 listener: Box<dyn PtListener>, 361 rh: Arc<impl RequestHandler<T> + 'static>, 362 conn_limit: Option<usize>, 363 settings: RpcSettings, 364 ex: Arc<smol::Executor<'a>>, 365 ) -> Result<()> { 366 loop { 367 match listener.next().await { 368 Ok((stream, url)) => { 369 let rh_ = rh.clone(); 370 info!(target: "rpc::server", "[RPC] Server accepted conn from {url}"); 371 372 let (reader, writer) = smol::io::split(stream); 373 let reader = Arc::new(Mutex::new(BufReader::new(reader))); 374 let writer = Arc::new(Mutex::new(writer)); 375 376 let task = StoppableTask::new(); 377 let task_ = task.clone(); 378 let ex_ = ex.clone(); 379 task.clone().start( 380 accept( 381 reader, 382 writer, 383 url.clone(), 384 rh.clone(), 385 conn_limit, 386 settings.clone(), 387 ex_, 388 ), 389 |_| async move { 390 info!(target: "rpc::server", "[RPC] Closed conn from {url}"); 391 rh_.clone().unmark_connection(task_.clone()).await; 392 }, 393 Error::ChannelStopped, 394 ex.clone(), 395 ); 396 397 rh.clone().mark_connection(task.clone()).await; 398 } 399 400 // As per accept(2) recommendation: 401 Err(e) if e.raw_os_error().is_some() => match e.raw_os_error().unwrap() { 402 libc::EAGAIN | libc::ECONNABORTED | libc::EPROTO | libc::EINTR => continue, 403 _ => { 404 error!( 405 target: "rpc::server::run_accept_loop()", 406 "[RPC] Server failed listening: {e}" 407 ); 408 error!( 409 target: "rpc::server::run_accept_loop()", 410 "[RPC] Closing accept loop" 411 ); 412 return Err(e.into()) 413 } 414 }, 415 416 // In case a TLS handshake fails, we'll get this: 417 Err(e) if e.kind() == ErrorKind::UnexpectedEof => continue, 418 419 // Errors we didn't handle above: 420 Err(e) => { 421 error!( 422 target: "rpc::server::run_accept_loop()", 423 "[RPC] Unhandled listener.next() error: {e}" 424 ); 425 error!( 426 target: "rpc::server::run_accept_loop()", 427 "[RPC] Closing acceptloop" 428 ); 429 return Err(e.into()) 430 } 431 } 432 } 433 } 434 435 /// Start a JSON-RPC server bound to the givven accept URL and use the 436 /// given [`RequestHandler`] to handle incoming requests. 437 /// 438 /// The supported network schemes can be prefixed with `http+` to serve 439 /// JSON-RPC over HTTP/1.1. 440 pub async fn listen_and_serve<'a, T: 'a>( 441 settings: RpcSettings, 442 rh: Arc<impl RequestHandler<T> + 'static>, 443 conn_limit: Option<usize>, 444 ex: Arc<smol::Executor<'a>>, 445 ) -> Result<()> { 446 // Figure out if we're using HTTP and rewrite the URL accordingly. 447 let mut listen_url = settings.listen.clone(); 448 if settings.listen.scheme().starts_with("http+") { 449 let scheme = settings.listen.scheme().strip_prefix("http+").unwrap(); 450 let url_str = settings.listen.as_str().replace(settings.listen.scheme(), scheme); 451 listen_url = url_str.parse()?; 452 } 453 454 let listener = Listener::new(listen_url, None).await?.listen().await?; 455 456 run_accept_loop(listener, rh, conn_limit, settings, ex.clone()).await 457 } 458 459 #[cfg(test)] 460 mod tests { 461 use super::*; 462 use crate::{rpc::client::RpcClient, system::msleep}; 463 use smol::{net::TcpListener, Executor}; 464 465 struct RpcServer { 466 rpc_connections: Mutex<HashSet<StoppableTaskPtr>>, 467 } 468 469 #[async_trait] 470 impl RequestHandler<()> for RpcServer { 471 async fn handle_request(&self, req: JsonRequest) -> JsonResult { 472 match req.method.as_str() { 473 "ping" => return self.pong(req.id, req.params).await, 474 _ => panic!(), 475 } 476 } 477 478 async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> { 479 self.rpc_connections.lock().await 480 } 481 } 482 483 #[test] 484 fn conn_manager() -> Result<()> { 485 let executor = Arc::new(Executor::new()); 486 487 // This simulates a server and a client. Through the function, there 488 // are some calls to sleep(), which are used for the tests, because 489 // otherwise they execute too fast. In practice, The RPC server is 490 // a long-running task so when polled, it should handle things in a 491 // correct manner. 492 smol::block_on(executor.run(async { 493 // Find an available port 494 let listener = TcpListener::bind("127.0.0.1:0").await?; 495 let sockaddr = listener.local_addr()?; 496 let settings = RpcSettings { 497 listen: Url::parse(&format!("tcp://127.0.0.1:{}", sockaddr.port()))?, 498 disabled_methods: vec![], 499 }; 500 drop(listener); 501 502 let rpc_server = Arc::new(RpcServer { rpc_connections: Mutex::new(HashSet::new()) }); 503 let rpc_server_ = rpc_server.clone(); 504 505 let server_task = StoppableTask::new(); 506 server_task.clone().start( 507 listen_and_serve(settings.clone(), rpc_server.clone(), None, executor.clone()), 508 |res| async move { 509 match res { 510 Ok(()) | Err(Error::RpcServerStopped) => { 511 rpc_server_.stop_connections().await 512 } 513 Err(e) => panic!("{e}"), 514 } 515 }, 516 Error::RpcServerStopped, 517 executor.clone(), 518 ); 519 520 // Let the server spawn 521 msleep(500).await; 522 523 // Connect a client 524 let rpc_client0 = RpcClient::new(settings.listen.clone(), executor.clone()).await?; 525 msleep(500).await; 526 assert!(rpc_server.active_connections().await == 1); 527 528 // Connect another client 529 let rpc_client1 = RpcClient::new(settings.listen.clone(), executor.clone()).await?; 530 msleep(500).await; 531 assert!(rpc_server.active_connections().await == 2); 532 533 // And another one 534 let _rpc_client2 = RpcClient::new(settings.listen.clone(), executor.clone()).await?; 535 msleep(500).await; 536 assert!(rpc_server.active_connections().await == 3); 537 538 // Close the first client 539 rpc_client0.stop().await; 540 msleep(500).await; 541 assert!(rpc_server.active_connections().await == 2); 542 543 // Close the second client 544 rpc_client1.stop().await; 545 msleep(500).await; 546 assert!(rpc_server.active_connections().await == 1); 547 548 // The Listener should be stopped when we stop the server task. 549 server_task.stop().await; 550 assert!(RpcClient::new(settings.listen, executor.clone()).await.is_err()); 551 552 // After the server is stopped, the connections tasks should also be stopped 553 assert!(rpc_server.active_connections().await == 0); 554 555 Ok(()) 556 })) 557 } 558 }