node.rs
1 use axum::extract::State; 2 use axum::response::IntoResponse; 3 use axum::routing::get; 4 use axum::{Json, Router}; 5 use serde::Serialize; 6 use serde_json::json; 7 use tokio::time::{timeout, Duration}; 8 9 use radicle::crypto::ssh::fmt; 10 use radicle::identity::{Did, RepoId}; 11 use radicle::node::address::Store as AddressStore; 12 use radicle::node::routing::Store; 13 use radicle::node::{AliasStore, Config, Handle, NodeId, UserAgent}; 14 use radicle::web; 15 use radicle::Node; 16 17 use crate::api::error::Error; 18 use crate::api::Context; 19 use crate::axum_extra::{cached_response, Path}; 20 21 const SOCKET_QUERY_TIMEOUT_MS: Duration = Duration::from_millis(500); 22 23 pub fn router(ctx: Context) -> Router { 24 Router::new() 25 .route("/node", get(node_handler)) 26 .route("/node/policies/repos", get(node_policies_repos_handler)) 27 .route( 28 "/node/policies/repos/{rid}", 29 get(node_policies_repo_handler), 30 ) 31 .route("/nodes/{nid}", get(nodes_handler)) 32 .route("/nodes/{nid}/inventory", get(nodes_inventory_handler)) 33 .with_state(ctx) 34 } 35 36 #[derive(Clone, Debug, Default, Serialize)] 37 #[serde(rename_all = "camelCase")] 38 struct Response { 39 id: String, 40 agent: Option<UserAgent>, 41 config: Option<Config>, 42 state: String, 43 #[serde(skip_serializing_if = "Option::is_none")] 44 avatar_url: Option<String>, 45 #[serde(skip_serializing_if = "Option::is_none")] 46 banner_url: Option<String>, 47 #[serde(skip_serializing_if = "Option::is_none")] 48 description: Option<String>, 49 } 50 51 impl Response { 52 fn new( 53 nid: NodeId, 54 agent: Option<UserAgent>, 55 config: Option<Config>, 56 state: String, 57 web_config: web::Config, 58 ) -> Self { 59 Response { 60 id: nid.to_string(), 61 agent, 62 config, 63 state, 64 avatar_url: web_config.avatar_url, 65 banner_url: web_config.banner_url, 66 description: web_config.description, 67 } 68 } 69 } 70 71 /// Return local node information. 72 /// `GET /node` 73 async fn node_handler(State(ctx): State<Context>) -> impl IntoResponse { 74 let node_id = ctx.profile.public_key; 75 let home = ctx.profile.home.database()?; 76 let agent = AddressStore::get(&home, &node_id) 77 .unwrap_or_default() 78 .map(|node| node.agent); 79 80 // The call to `is_running` is a blocking call, which has been, anecdotally, slow to respond. 81 // Spawn a thread with a timeout to ensure that the call to `is_running` does not slow down the 82 // response of the `/node` route too much. 83 let node_state = { 84 let socket = ctx.profile.socket(); 85 let is_running = timeout( 86 SOCKET_QUERY_TIMEOUT_MS, 87 tokio::task::spawn_blocking(move || Node::new(socket).is_running()), 88 ) 89 .await 90 .ok() // Handle timeout. 91 .and_then(|r| r.ok()) // Handle JoinError. 92 .unwrap_or(false); 93 94 if is_running { 95 "running" 96 } else { 97 "stopped" 98 } 99 }; 100 let config = { 101 let socket = ctx.profile.socket(); 102 match timeout( 103 SOCKET_QUERY_TIMEOUT_MS, 104 tokio::task::spawn_blocking(move || Node::new(socket).config()), 105 ) 106 .await 107 { 108 Ok(Ok(result)) => match result { 109 Ok(config) => Some(config), 110 Err(err) => { 111 tracing::error!("Error getting node config: {:#}", err); 112 None 113 } 114 }, 115 _ => None, // Timeout or join error - node likely not running. 116 } 117 }; 118 119 let response = Response::new( 120 node_id, 121 agent, 122 config, 123 node_state.to_string(), 124 ctx.web_config().read().await, 125 ); 126 127 Ok::<_, Error>(cached_response(response, 600)) 128 } 129 130 /// Return stored information about other nodes. 131 /// `GET /nodes/:nid` 132 async fn nodes_handler(State(ctx): State<Context>, Path(nid): Path<NodeId>) -> impl IntoResponse { 133 let aliases = ctx.profile.aliases(); 134 let response = json!({ 135 "alias": aliases.alias(&nid), 136 "did": Did::from(nid), 137 "ssh": { 138 "full": fmt::key(&nid), 139 "hash": fmt::fingerprint(&nid) 140 } 141 }); 142 143 Ok::<_, Error>(Json(response)) 144 } 145 146 /// Return stored information about other nodes. 147 /// `GET /nodes/:nid/inventory` 148 async fn nodes_inventory_handler( 149 State(ctx): State<Context>, 150 Path(nid): Path<NodeId>, 151 ) -> impl IntoResponse { 152 let db = &ctx.profile.database()?; 153 let resources = db.get_inventory(&nid)?; 154 155 Ok::<_, Error>(Json(resources)) 156 } 157 158 /// Return local repo policies information. 159 /// `GET /node/policies/repos` 160 async fn node_policies_repos_handler(State(ctx): State<Context>) -> impl IntoResponse { 161 let policies = ctx.profile.policies()?; 162 let policies = policies.seed_policies()?.collect::<Result<Vec<_>, _>>()?; 163 164 Ok::<_, Error>(Json(policies)) 165 } 166 167 /// Return local repo policy information. 168 /// `GET /node/policies/repos/:rid` 169 async fn node_policies_repo_handler( 170 State(ctx): State<Context>, 171 Path(rid): Path<RepoId>, 172 ) -> impl IntoResponse { 173 let policies = ctx.profile.policies()?; 174 let policy = policies.seed_policy(&rid)?; 175 176 Ok::<_, Error>(Json(*policy)) 177 } 178 179 #[cfg(test)] 180 mod routes { 181 use std::net::SocketAddr; 182 183 use axum::extract::connect_info::MockConnectInfo; 184 use axum::http::StatusCode; 185 use pretty_assertions::assert_eq; 186 use serde_json::{json, Value}; 187 188 use crate::test::*; 189 190 #[tokio::test] 191 async fn test_node_repos_policies() { 192 let tmp = tempfile::tempdir().unwrap(); 193 let seed = seed(tmp.path()); 194 let app = super::router(seed.clone()) 195 .layer(MockConnectInfo(SocketAddr::from(([127, 0, 0, 1], 8080)))); 196 let response = get(&app, "/node/policies/repos").await; 197 198 assert_eq!(response.status(), StatusCode::OK); 199 assert_eq!( 200 response.json().await, 201 json!([ 202 { 203 "rid": "rad:zLuTzcmoWMcdK37xqArS8eckp9vK", 204 "policy": { 205 "policy": "block", 206 } 207 }, 208 { 209 "rid": "rad:z4FucBZHZMCsxTyQE1dfE2YR59Qbp", 210 "policy": { 211 "policy": "allow", 212 "scope": "all", 213 } 214 }, 215 { 216 "rid": "rad:z4GypKmh1gkEfmkXtarcYnkvtFUfE", 217 "policy": { 218 "policy": "allow", 219 "scope" : "followed" 220 } 221 }, 222 ]) 223 ); 224 } 225 226 #[tokio::test] 227 async fn test_node_repo_policies() { 228 let tmp = tempfile::tempdir().unwrap(); 229 let seed = seed(tmp.path()); 230 let app = super::router(seed.clone()) 231 .layer(MockConnectInfo(SocketAddr::from(([127, 0, 0, 1], 8080)))); 232 let response = get( 233 &app, 234 "/node/policies/repos/rad:zLuTzcmoWMcdK37xqArS8eckp9vK", 235 ) 236 .await; 237 238 assert_eq!(response.status(), StatusCode::OK); 239 assert_eq!( 240 response.json().await, 241 json!({ 242 "policy": "block", 243 }) 244 ); 245 } 246 247 #[tokio::test] 248 async fn test_nodes() { 249 let tmp = tempfile::tempdir().unwrap(); 250 let seed = seed(tmp.path()); 251 let app = super::router(seed.clone()) 252 .layer(MockConnectInfo(SocketAddr::from(([127, 0, 0, 1], 8080)))); 253 let nid = seed.profile().id(); 254 let response = get(&app, format!("/nodes/{nid}")).await; 255 256 assert_eq!(response.status(), StatusCode::OK); 257 assert_eq!( 258 response.json().await, 259 json!({ 260 "alias": "seed", 261 "did": "did:key:z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi", 262 "ssh": { 263 "full": "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIHahWSBEpuT1ESZbynOmBNkLBSnR32Ar4woZqSV2YNH1", 264 "hash": "SHA256:UIedaL6Cxm6OUErh9GQUzzglSk7VpQlVTI1TAFB/HWA", 265 }, 266 }) 267 ); 268 } 269 270 #[tokio::test] 271 async fn test_nodes_inventory() { 272 let tmp = tempfile::tempdir().unwrap(); 273 let seed = seed(tmp.path()); 274 let app = super::router(seed.clone()) 275 .layer(MockConnectInfo(SocketAddr::from(([127, 0, 0, 1], 8080)))); 276 let nid = seed.profile().public_key; 277 let response = get(&app, format!("/nodes/{nid}/inventory")).await; 278 279 assert_eq!(response.status(), StatusCode::OK); 280 let json_response = response.json().await; 281 282 let mut arr = match json_response { 283 Value::Array(arr) => arr, 284 _ => panic!("Expected JSON array in response"), 285 }; 286 287 arr.sort_by(|a, b| a.as_str().cmp(&b.as_str())); 288 289 assert_eq!( 290 arr, 291 vec![ 292 json!("rad:z4FucBZHZMCsxTyQE1dfE2YR59Qbp"), 293 json!("rad:z4GypKmh1gkEfmkXtarcYnkvtFUfE"), 294 ] 295 ); 296 } 297 298 #[tokio::test] 299 async fn test_node_uses_reloadable_config() { 300 let tmp = tempfile::tempdir().unwrap(); 301 let seed = seed(tmp.path()); 302 303 { 304 seed.web_config 305 .update(|config| { 306 config.description = Some("Test node description".to_string()); 307 config.avatar_url = Some("https://example.com/avatar.png".to_string()); 308 config.banner_url = Some("https://example.com/banner.png".to_string()); 309 }) 310 .await; 311 } 312 313 let app = super::router(seed.clone()) 314 .layer(MockConnectInfo(SocketAddr::from(([127, 0, 0, 1], 8080)))); 315 let response = get(&app, "/node").await; 316 317 assert_eq!(response.status(), StatusCode::OK); 318 let json_response = response.json().await; 319 320 assert_eq!(json_response["description"], json!("Test node description")); 321 assert_eq!( 322 json_response["avatarUrl"], 323 json!("https://example.com/avatar.png") 324 ); 325 assert_eq!( 326 json_response["bannerUrl"], 327 json!("https://example.com/banner.png") 328 ); 329 } 330 331 #[tokio::test] 332 async fn test_node_endpoint_responds_quickly() { 333 use std::time::Instant; 334 335 let tmp = tempfile::tempdir().unwrap(); 336 let seed = seed(tmp.path()); 337 let app = super::router(seed.clone()) 338 .layer(MockConnectInfo(SocketAddr::from(([127, 0, 0, 1], 8080)))); 339 340 let start = Instant::now(); 341 let response = get(&app, "/node").await; 342 let elapsed = start.elapsed(); 343 344 // Endpoint should respond within 2 seconds even if node.is_running() is slow 345 // (500ms timeout + overhead) 346 assert!( 347 elapsed.as_secs() < 2, 348 "Request took too long: {:?}", 349 elapsed 350 ); 351 assert_eq!(response.status(), StatusCode::OK); 352 353 let json_response = response.json().await; 354 assert!(json_response["state"].is_string()); 355 let state = json_response["state"].as_str().unwrap(); 356 assert!(state == "running" || state == "stopped"); 357 } 358 }