/ src / rpc / server.rs
server.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use std::{collections::HashSet, io::ErrorKind, sync::Arc};
 20  
 21  use async_trait::async_trait;
 22  use smol::{
 23      io::{BufReader, ReadHalf, WriteHalf},
 24      lock::{Mutex, MutexGuard},
 25  };
 26  use tinyjson::JsonValue;
 27  use tracing::{debug, error, info};
 28  use url::Url;
 29  
 30  use super::{
 31      common::{
 32          http_read_from_stream_request, http_write_to_stream, read_from_stream, write_to_stream,
 33          INIT_BUF_SIZE,
 34      },
 35      jsonrpc::*,
 36      settings::RpcSettings,
 37  };
 38  use crate::{
 39      net::transport::{Listener, PtListener, PtStream},
 40      system::{StoppableTask, StoppableTaskPtr},
 41      Error, Result,
 42  };
 43  
 44  /// Asynchronous trait implementing a handler for incoming JSON-RPC requests.
 45  #[async_trait]
 46  pub trait RequestHandler<T>: Sync + Send {
 47      async fn handle_request(&self, req: JsonRequest) -> JsonResult;
 48  
 49      async fn pong(&self, id: u16, _params: JsonValue) -> JsonResult {
 50          JsonResponse::new(JsonValue::String("pong".to_string()), id).into()
 51      }
 52  
 53      async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>>;
 54  
 55      async fn connections(&self) -> Vec<StoppableTaskPtr> {
 56          self.connections_mut().await.iter().cloned().collect()
 57      }
 58  
 59      async fn mark_connection(&self, task: StoppableTaskPtr) {
 60          self.connections_mut().await.insert(task);
 61      }
 62  
 63      async fn unmark_connection(&self, task: StoppableTaskPtr) {
 64          self.connections_mut().await.remove(&task);
 65      }
 66  
 67      async fn active_connections(&self) -> usize {
 68          self.connections_mut().await.len()
 69      }
 70  
 71      async fn stop_connections(&self) {
 72          info!(target: "rpc::server", "[RPC] Server stopped, closing connections");
 73          for (i, task) in self.connections().await.iter().enumerate() {
 74              debug!(target: "rpc::server", "Stopping connection #{i}");
 75              task.stop().await;
 76          }
 77      }
 78  }
 79  
 80  /// Auxiliary function to handle a request in the background.
 81  async fn handle_request<T>(
 82      writer: Arc<Mutex<WriteHalf<Box<dyn PtStream>>>>,
 83      addr: Url,
 84      rh: Arc<impl RequestHandler<T> + 'static>,
 85      ex: Arc<smol::Executor<'_>>,
 86      tasks: Arc<Mutex<HashSet<Arc<StoppableTask>>>>,
 87      settings: RpcSettings,
 88      req: JsonRequest,
 89  ) -> Result<()> {
 90      // Handle disabled RPC methods
 91      let rep = if settings.is_method_disabled(&req.method) {
 92          debug!(target: "rpc::server", "RPC method {} is disabled", req.method);
 93          JsonError::new(ErrorCode::MethodNotFound, None, req.id).into()
 94      } else {
 95          rh.handle_request(req).await
 96      };
 97  
 98      match rep {
 99          JsonResult::Subscriber(subscriber) => {
100              let task = StoppableTask::new();
101  
102              // Clone what needs to go in the background
103              let task_ = task.clone();
104              let addr_ = addr.clone();
105              let tasks_ = tasks.clone();
106              let writer_ = writer.clone();
107  
108              // Detach the subscriber so we can multiplex further requests
109              task.clone().start(
110                  async move {
111                      // Subscribe to the inner method subscriber
112                      let subscription = subscriber.publisher.subscribe().await;
113                      loop {
114                          // Listen for notifications
115                          let notification = subscription.receive().await;
116  
117                          // Push notification
118                          debug!(target: "rpc::server", "{addr_} <-- {}", notification.stringify().unwrap());
119                          let notification = JsonResult::Notification(notification);
120  
121                          let mut writer_lock = writer_.lock().await;
122  
123                          #[allow(clippy::collapsible_else_if)]
124                          if settings.use_http() {
125                              if let Err(e) = http_write_to_stream(&mut writer_lock, &notification).await {
126                                  subscription.unsubscribe().await;
127                                  return Err(e.into())
128                              }
129                          } else {
130                              if let Err(e) = write_to_stream(&mut writer_lock, &notification).await {
131                                  subscription.unsubscribe().await;
132                                  return Err(e.into())
133                              }
134                          }
135  
136                          drop(writer_lock);
137                      }
138                  },
139                  move |_| async move {
140                      debug!(
141                          target: "rpc::server",
142                          "Removing background task {} from map", task_.task_id,
143                      );
144                      tasks_.lock().await.remove(&task_);
145                  },
146                  Error::DetachedTaskStopped,
147                  ex.clone(),
148              );
149  
150              debug!(target: "rpc::server", "Adding background task {} to map", task.task_id);
151              tasks.lock().await.insert(task);
152          }
153  
154          JsonResult::SubscriberWithReply(subscriber, reply) => {
155              // Write the response
156              debug!(target: "rpc::server", "{addr} <-- {}", reply.stringify()?);
157              let mut writer_lock = writer.lock().await;
158              if settings.use_http() {
159                  http_write_to_stream(&mut writer_lock, &reply.into()).await?;
160              } else {
161                  write_to_stream(&mut writer_lock, &reply.into()).await?;
162              }
163              drop(writer_lock);
164  
165              let task = StoppableTask::new();
166              // Clone what needs to go in the background
167              let task_ = task.clone();
168              let addr_ = addr.clone();
169              let tasks_ = tasks.clone();
170              let writer_ = writer.clone();
171  
172              // Detach the subscriber so we can multiplex further requests
173              task.clone().start(
174                  async move {
175                      // Start the subscriber loop
176                      let subscription = subscriber.publisher.subscribe().await;
177                      loop {
178                          // Listen for notifications
179                          let notification = subscription.receive().await;
180  
181                          // Push notification
182                          debug!(target: "rpc::server", "{addr_} <-- {}", notification.stringify().unwrap());
183                          let notification = JsonResult::Notification(notification);
184  
185                          let mut writer_lock = writer_.lock().await;
186                          #[allow(clippy::collapsible_else_if)]
187                          if settings.use_http() {
188                              if let Err(e) = http_write_to_stream(&mut writer_lock, &notification).await {
189                                  subscription.unsubscribe().await;
190                                  drop(writer_lock);
191                                  return Err(e.into())
192                              }
193                          } else {
194                              if let Err(e) = write_to_stream(&mut writer_lock, &notification).await {
195                                  subscription.unsubscribe().await;
196                                  drop(writer_lock);
197                                  return Err(e.into())
198                              }
199                          }
200                          drop(writer_lock);
201                      }
202                  },
203                  move |_| async move {
204                      debug!(
205                          target: "rpc::server",
206                          "Removing background task {} from map", task_.task_id,
207                      );
208                      tasks_.lock().await.remove(&task_);
209                  },
210                  Error::DetachedTaskStopped,
211                  ex.clone(),
212              );
213  
214              debug!(target: "rpc::server", "Adding background task {} to map", task.task_id);
215              tasks.lock().await.insert(task);
216          }
217  
218          JsonResult::Request(_) | JsonResult::Notification(_) => {
219              unreachable!("Should never happen")
220          }
221  
222          JsonResult::Response(ref v) => {
223              debug!(target: "rpc::server", "{addr} <-- {}", v.stringify()?);
224              let mut writer_lock = writer.lock().await;
225              if settings.use_http() {
226                  http_write_to_stream(&mut writer_lock, &rep).await?;
227              } else {
228                  write_to_stream(&mut writer_lock, &rep).await?;
229              }
230              drop(writer_lock);
231          }
232  
233          JsonResult::Error(ref v) => {
234              debug!(target: "rpc::server", "{addr} <-- {}", v.stringify()?);
235              let mut writer_lock = writer.lock().await;
236              if settings.use_http() {
237                  http_write_to_stream(&mut writer_lock, &rep).await?;
238              } else {
239                  write_to_stream(&mut writer_lock, &rep).await?;
240              }
241              drop(writer_lock);
242          }
243      }
244  
245      Ok(())
246  }
247  
248  /// Accept function that should run inside a loop for accepting incoming
249  /// JSON-RPC requests and passing them to the [`RequestHandler`].
250  #[allow(clippy::type_complexity)]
251  pub async fn accept<'a, T: 'a>(
252      reader: Arc<Mutex<BufReader<ReadHalf<Box<dyn PtStream>>>>>,
253      writer: Arc<Mutex<WriteHalf<Box<dyn PtStream>>>>,
254      addr: Url,
255      rh: Arc<impl RequestHandler<T> + 'static>,
256      conn_limit: Option<usize>,
257      settings: RpcSettings,
258      ex: Arc<smol::Executor<'a>>,
259  ) -> Result<()> {
260      // If there's a connection limit set, we will refuse connections
261      // after this point.
262      if let Some(conn_limit) = conn_limit {
263          if rh.clone().active_connections().await >= conn_limit {
264              debug!(
265                  target: "rpc::server::accept()",
266                  "Connection limit reached, refusing new conn"
267              );
268              return Err(Error::RpcConnectionsExhausted)
269          }
270      }
271  
272      // We'll hold our background tasks here
273      let tasks = Arc::new(Mutex::new(HashSet::new()));
274  
275      loop {
276          let mut buf = Vec::with_capacity(INIT_BUF_SIZE);
277  
278          let mut reader_lock = reader.lock().await;
279          if settings.use_http() {
280              let _ = http_read_from_stream_request(&mut reader_lock, &mut buf).await?;
281          } else {
282              let _ = read_from_stream(&mut reader_lock, &mut buf).await?;
283          }
284          drop(reader_lock);
285  
286          let line = match String::from_utf8(buf) {
287              Ok(v) => v,
288              Err(e) => {
289                  error!(
290                      target: "rpc::server::accept()",
291                      "[RPC SERVER] Failed parsing string from read buffer: {e}"
292                  );
293                  return Err(e.into())
294              }
295          };
296  
297          // Parse the line as JSON
298          let val: JsonValue = match line.trim().parse() {
299              Ok(v) => v,
300              Err(e) => {
301                  error!(
302                      target: "rpc::server::accept()",
303                      "[RPC SERVER] Failed parsing JSON string: {e}"
304                  );
305                  return Err(e.into())
306              }
307          };
308  
309          // Cast to JsonRequest
310          let req = match JsonRequest::try_from(&val) {
311              Ok(v) => v,
312              Err(e) => {
313                  error!(
314                      target: "rpc::server::accept()",
315                      "[RPC SERVER] Failed casting JSON to a JsonRequest: {e}"
316                  );
317                  return Err(e.into())
318              }
319          };
320  
321          debug!(target: "rpc::server", "{addr} --> {}", val.stringify()?);
322  
323          // Create a new task to handle request in the background
324          let task = StoppableTask::new();
325  
326          // Clone what needs to go in the background
327          let task_ = task.clone();
328          let tasks_ = tasks.clone();
329  
330          // Detach the task
331          task.clone().start(
332              handle_request(
333                  writer.clone(),
334                  addr.clone(),
335                  rh.clone(),
336                  ex.clone(),
337                  tasks.clone(),
338                  settings.clone(),
339                  req,
340              ),
341              move |_| async move {
342                  debug!(
343                      target: "rpc::server",
344                      "Removing background task {} from map", task_.task_id,
345                  );
346                  tasks_.lock().await.remove(&task_);
347              },
348              Error::DetachedTaskStopped,
349              ex.clone(),
350          );
351  
352          debug!(target: "rpc::server", "Adding background task {} to map", task.task_id);
353          tasks.lock().await.insert(task);
354      }
355  }
356  
357  /// Wrapper function around [`accept()`] to take the incoming connection and
358  /// pass it forward.
359  async fn run_accept_loop<'a, T: 'a>(
360      listener: Box<dyn PtListener>,
361      rh: Arc<impl RequestHandler<T> + 'static>,
362      conn_limit: Option<usize>,
363      settings: RpcSettings,
364      ex: Arc<smol::Executor<'a>>,
365  ) -> Result<()> {
366      loop {
367          match listener.next().await {
368              Ok((stream, url)) => {
369                  let rh_ = rh.clone();
370                  info!(target: "rpc::server", "[RPC] Server accepted conn from {url}");
371  
372                  let (reader, writer) = smol::io::split(stream);
373                  let reader = Arc::new(Mutex::new(BufReader::new(reader)));
374                  let writer = Arc::new(Mutex::new(writer));
375  
376                  let task = StoppableTask::new();
377                  let task_ = task.clone();
378                  let ex_ = ex.clone();
379                  task.clone().start(
380                      accept(
381                          reader,
382                          writer,
383                          url.clone(),
384                          rh.clone(),
385                          conn_limit,
386                          settings.clone(),
387                          ex_,
388                      ),
389                      |_| async move {
390                          info!(target: "rpc::server", "[RPC] Closed conn from {url}");
391                          rh_.clone().unmark_connection(task_.clone()).await;
392                      },
393                      Error::ChannelStopped,
394                      ex.clone(),
395                  );
396  
397                  rh.clone().mark_connection(task.clone()).await;
398              }
399  
400              // As per accept(2) recommendation:
401              Err(e) if e.raw_os_error().is_some() => match e.raw_os_error().unwrap() {
402                  libc::EAGAIN | libc::ECONNABORTED | libc::EPROTO | libc::EINTR => continue,
403                  _ => {
404                      error!(
405                          target: "rpc::server::run_accept_loop()",
406                          "[RPC] Server failed listening: {e}"
407                      );
408                      error!(
409                          target: "rpc::server::run_accept_loop()",
410                          "[RPC] Closing accept loop"
411                      );
412                      return Err(e.into())
413                  }
414              },
415  
416              // In case a TLS handshake fails, we'll get this:
417              Err(e) if e.kind() == ErrorKind::UnexpectedEof => continue,
418  
419              // Errors we didn't handle above:
420              Err(e) => {
421                  error!(
422                      target: "rpc::server::run_accept_loop()",
423                      "[RPC] Unhandled listener.next() error: {e}"
424                  );
425                  error!(
426                      target: "rpc::server::run_accept_loop()",
427                      "[RPC] Closing acceptloop"
428                  );
429                  return Err(e.into())
430              }
431          }
432      }
433  }
434  
435  /// Start a JSON-RPC server bound to the givven accept URL and use the
436  /// given [`RequestHandler`] to handle incoming requests.
437  ///
438  /// The supported network schemes can be prefixed with `http+` to serve
439  /// JSON-RPC over HTTP/1.1.
440  pub async fn listen_and_serve<'a, T: 'a>(
441      settings: RpcSettings,
442      rh: Arc<impl RequestHandler<T> + 'static>,
443      conn_limit: Option<usize>,
444      ex: Arc<smol::Executor<'a>>,
445  ) -> Result<()> {
446      // Figure out if we're using HTTP and rewrite the URL accordingly.
447      let mut listen_url = settings.listen.clone();
448      if settings.listen.scheme().starts_with("http+") {
449          let scheme = settings.listen.scheme().strip_prefix("http+").unwrap();
450          let url_str = settings.listen.as_str().replace(settings.listen.scheme(), scheme);
451          listen_url = url_str.parse()?;
452      }
453  
454      let listener = Listener::new(listen_url, None).await?.listen().await?;
455  
456      run_accept_loop(listener, rh, conn_limit, settings, ex.clone()).await
457  }
458  
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{rpc::client::RpcClient, system::msleep};
    use smol::{net::TcpListener, Executor};

    /// Minimal request handler that only tracks its connections and
    /// answers `ping`.
    struct RpcServer {
        rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
    }

    #[async_trait]
    impl RequestHandler<()> for RpcServer {
        async fn handle_request(&self, req: JsonRequest) -> JsonResult {
            match req.method.as_str() {
                "ping" => self.pong(req.id, req.params).await,
                other => panic!("unexpected RPC method in test: {other}"),
            }
        }

        async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
            self.rpc_connections.lock().await
        }
    }

    /// Checks that the server's connection count follows clients connecting
    /// and disconnecting, and drops to zero once the server task is stopped.
    #[test]
    fn conn_manager() -> Result<()> {
        let executor = Arc::new(Executor::new());

        // This simulates a server and a client. Through the function, there
        // are some calls to sleep(), which are used for the tests, because
        // otherwise they execute too fast. In practice, The RPC server is
        // a long-running task so when polled, it should handle things in a
        // correct manner.
        smol::block_on(executor.run(async {
            // Find an available port
            let listener = TcpListener::bind("127.0.0.1:0").await?;
            let sockaddr = listener.local_addr()?;
            let settings = RpcSettings {
                listen: Url::parse(&format!("tcp://127.0.0.1:{}", sockaddr.port()))?,
                disabled_methods: vec![],
            };
            // Release the port so the RPC server can bind to it
            drop(listener);

            let rpc_server = Arc::new(RpcServer { rpc_connections: Mutex::new(HashSet::new()) });
            let rpc_server_ = rpc_server.clone();

            let server_task = StoppableTask::new();
            server_task.clone().start(
                listen_and_serve(settings.clone(), rpc_server.clone(), None, executor.clone()),
                |res| async move {
                    match res {
                        Ok(()) | Err(Error::RpcServerStopped) => {
                            rpc_server_.stop_connections().await
                        }
                        Err(e) => panic!("{e}"),
                    }
                },
                Error::RpcServerStopped,
                executor.clone(),
            );

            // Let the server spawn
            msleep(500).await;

            // Connect a client
            let rpc_client0 = RpcClient::new(settings.listen.clone(), executor.clone()).await?;
            msleep(500).await;
            assert_eq!(rpc_server.active_connections().await, 1);

            // Connect another client
            let rpc_client1 = RpcClient::new(settings.listen.clone(), executor.clone()).await?;
            msleep(500).await;
            assert_eq!(rpc_server.active_connections().await, 2);

            // And another one
            let _rpc_client2 = RpcClient::new(settings.listen.clone(), executor.clone()).await?;
            msleep(500).await;
            assert_eq!(rpc_server.active_connections().await, 3);

            // Close the first client
            rpc_client0.stop().await;
            msleep(500).await;
            assert_eq!(rpc_server.active_connections().await, 2);

            // Close the second client
            rpc_client1.stop().await;
            msleep(500).await;
            assert_eq!(rpc_server.active_connections().await, 1);

            // The Listener should be stopped when we stop the server task.
            server_task.stop().await;
            assert!(RpcClient::new(settings.listen, executor.clone()).await.is_err());

            // After the server is stopped, the connections tasks should also be stopped
            assert_eq!(rpc_server.active_connections().await, 0);

            Ok(())
        }))
    }
}