rpc.rs
1 //! JSON-RPC Control Plane 2 //! 3 //! Local control interface for managing the Abzu node. 4 5 use std::net::SocketAddr; 6 use std::sync::Arc; 7 use std::time::Duration; 8 9 use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64}; 10 use jsonrpsee::core::async_trait; 11 use jsonrpsee::core::RpcResult; 12 use jsonrpsee::proc_macros::rpc; 13 use jsonrpsee::server::Server; 14 use serde::{Deserialize, Serialize}; 15 use tower::ServiceBuilder; 16 use tracing::{debug, info, warn}; 17 18 use crate::auth::{AuthLayer, AuthToken}; 19 use abzu_core::{connect_peer, Node, Switchboard}; 20 use abzu_transport::{AbzuFrame, AbzuInterfaceExt}; 21 use abzu_token::ServiceOffer; 22 23 /// Node information response 24 #[derive(Debug, Clone, Serialize, Deserialize)] 25 pub struct NodeInfo { 26 /// Our peer key (hex encoded) 27 pub peer_key: String, 28 /// Our IPv6 address 29 pub address: String, 30 /// Number of connected peers 31 pub peer_count: usize, 32 /// Storage path 33 pub storage_path: String, 34 /// Total stored content size in bytes 35 pub store_size: u64, 36 } 37 38 /// Connect result 39 #[derive(Debug, Clone, Serialize, Deserialize)] 40 pub struct ConnectResult { 41 /// Whether connection succeeded 42 pub success: bool, 43 /// Peer key if successful (hex encoded) 44 pub peer_key: Option<String>, 45 /// Error message if failed 46 pub error: Option<String>, 47 } 48 49 /// Send message result 50 #[derive(Debug, Clone, Serialize, Deserialize)] 51 pub struct SendResult { 52 /// Whether send succeeded 53 pub success: bool, 54 /// Message ID (content hash) 55 pub message_id: Option<String>, 56 /// Error message if failed 57 pub error: Option<String>, 58 } 59 60 /// Upload content result 61 #[derive(Debug, Clone, Serialize, Deserialize)] 62 pub struct UploadResult { 63 /// Whether upload succeeded 64 pub success: bool, 65 /// Content ID (BLAKE3 hash, hex encoded) 66 pub cid: Option<String>, 67 /// Size in bytes 68 pub size: Option<u64>, 69 /// Error message if failed 70 pub error: Option<String>, 71 } 72 73 /// Download content result 74 #[derive(Debug, Clone, Serialize, Deserialize)] 75 pub struct DownloadResult { 76 /// Whether content was found 77 pub found: bool, 78 /// Content data (base64 encoded) 79 pub data: Option<String>, 80 /// Size in bytes 81 pub size: Option<u64>, 82 /// Error message if failed 83 pub error: Option<String>, 84 } 85 86 /// Contact DTO for RPC 87 #[derive(Debug, Clone, Serialize, Deserialize)] 88 pub struct ContactDto { 89 /// Human-readable alias 90 pub alias: String, 91 /// Public key (hex encoded) 92 pub pubkey: String, 93 /// When the contact was added (ISO 8601) 94 pub added_at: String, 95 } 96 97 /// Chat message DTO for RPC 98 #[derive(Debug, Clone, Serialize, Deserialize)] 99 pub struct ChatMessageDto { 100 /// Message ID 101 pub id: u64, 102 /// Other party pubkey (hex) 103 pub peer: String, 104 /// Message content (UTF-8 text) 105 pub content: String, 106 /// Timestamp (ISO 8601) 107 pub timestamp: String, 108 /// Direction: "inbound" or "outbound" 109 pub direction: String, 110 /// Status: "pending", "delivered", or "failed" 111 pub status: String, 112 } 113 114 /// Send chat result 115 #[derive(Debug, Clone, Serialize, Deserialize)] 116 pub struct SendChatResult { 117 /// Whether send succeeded 118 pub success: bool, 119 /// Message ID if successful 120 pub message_id: Option<u64>, 121 /// Error message if failed 122 pub error: Option<String>, 123 } 124 125 /// The RPC API definition 126 #[rpc(server)] 127 pub trait AbzuApi { 128 /// Get node information 129 #[method(name = "get_info")] 130 async fn get_info(&self) -> RpcResult<NodeInfo>; 131 132 /// Connect to a peer 133 #[method(name = "connect")] 134 async fn connect(&self, peer: String) -> RpcResult<ConnectResult>; 135 136 /// Send a text message to a target 137 #[method(name = "send_message")] 138 async fn send_message(&self, target: String, msg: String) -> RpcResult<SendResult>; 139 140 /// List connected peers 141 #[method(name = "list_peers")] 142 async fn list_peers(&self) -> RpcResult<Vec<String>>; 143 144 /// Upload content to local store (returns CID) 145 #[method(name = "upload_content")] 146 async fn upload_content(&self, data_b64: String) -> RpcResult<UploadResult>; 147 148 /// Download content from local store by CID 149 #[method(name = "download_content")] 150 async fn download_content(&self, cid: String) -> RpcResult<DownloadResult>; 151 152 /// List all stored content CIDs 153 #[method(name = "list_content")] 154 async fn list_content(&self) -> RpcResult<Vec<String>>; 155 156 /// Shutdown the node 157 #[method(name = "shutdown")] 158 async fn shutdown(&self) -> RpcResult<bool>; 159 160 // ===== Dark Comms API ===== 161 162 /// Add a contact 163 #[method(name = "add_contact")] 164 async fn add_contact(&self, alias: String, pubkey: String) -> RpcResult<ContactDto>; 165 166 /// Get all contacts 167 #[method(name = "get_contacts")] 168 async fn get_contacts(&self) -> RpcResult<Vec<ContactDto>>; 169 170 /// Send a chat message 171 #[method(name = "send_chat")] 172 async fn send_chat(&self, to_pubkey: String, message: String) -> RpcResult<SendChatResult>; 173 174 /// Get chat history with a peer 175 /// Get chat history with a peer 176 #[method(name = "get_chat_history")] 177 async fn get_chat_history(&self, pubkey: String) -> RpcResult<Vec<ChatMessageDto>>; 178 179 // ===== Circles API ===== 180 181 /// List all circles 182 #[method(name = "list_circles")] 183 async fn list_circles(&self) -> RpcResult<Vec<sovereign_agent::circles::Circle>>; 184 185 /// Post message to circle 186 #[method(name = "post_circle_message")] 187 async fn post_circle_message(&self, circle_id: String, content: String) -> RpcResult<Option<sovereign_agent::circles::Message>>; 188 189 // ===== Agent API ===== 190 191 /// Send a prompt to the agent 192 #[method(name = "agent_prompt")] 193 async fn agent_prompt(&self, session_id: String, prompt: String) -> RpcResult<String>; 194 195 /// List agent presets 196 #[method(name = "list_presets")] 197 async fn list_presets(&self) -> RpcResult<Vec<sovereign_agent::settings::AgentPreset>>; 198 199 /// Get agent status 200 #[method(name = "agent_status")] 201 async fn agent_status(&self) -> RpcResult<sovereign_agent::agent::AgentStatus>; 202 203 // ===== Token Economy API ===== 204 205 /// Get token balance for an address 206 #[method(name = "token_balance")] 207 async fn token_balance(&self, address: String) -> RpcResult<u128>; 208 209 /// List service offers 210 #[method(name = "token_offers")] 211 async fn token_offers(&self) -> RpcResult<Vec<ServiceOffer>>; 212 213 /// Publish a service offer 214 #[method(name = "token_publish_offer")] 215 async fn token_publish_offer(&self, category: String, price: u128) -> RpcResult<String>; 216 } 217 218 /// RPC server implementation 219 pub struct RpcServer { 220 node: Arc<Node>, 221 agent: Arc<sovereign_agent::agent::AgentRuntime>, 222 shared_key: [u8; 32], // Pre-shared key for now (TODO: proper key exchange) 223 } 224 225 impl RpcServer { 226 pub fn new(node: Arc<Node>, agent: Arc<sovereign_agent::agent::AgentRuntime>, shared_key: [u8; 32]) -> Self { 227 Self { node, agent, shared_key } 228 } 229 } 230 231 #[async_trait] 232 impl AbzuApiServer for RpcServer { 233 async fn get_info(&self) -> RpcResult<NodeInfo> { 234 let peer_key = hex_encode(self.node.peer_key()); 235 let address = self.node.address().to_ipv6_string(); 236 let peer_count = self.node.peer_count().await; 237 let storage_path = self.node.config().storage_path.clone(); 238 let store_size = self.node.store_size().unwrap_or(0); 239 240 Ok(NodeInfo { 241 peer_key, 242 address, 243 peer_count, 244 storage_path, 245 store_size, 246 }) 247 } 248 249 async fn connect(&self, peer: String) -> RpcResult<ConnectResult> { 250 info!(peer = %peer, "RPC: Connecting to peer"); 251 252 match connect_peer(Arc::clone(&self.node), &peer, &self.shared_key).await { 253 Ok(peer_key) => Ok(ConnectResult { 254 success: true, 255 peer_key: Some(hex_encode(peer_key)), 256 error: None, 257 }), 258 Err(e) => { 259 warn!(peer = %peer, error = %e, "Failed to connect"); 260 Ok(ConnectResult { 261 success: false, 262 peer_key: None, 263 error: Some(e.to_string()), 264 }) 265 } 266 } 267 } 268 269 async fn send_message(&self, target: String, msg: String) -> RpcResult<SendResult> { 270 info!(target = %target, len = msg.len(), "RPC: Sending message"); 271 272 // Hash the message for CID 273 let cid = blake3::hash(msg.as_bytes()); 274 let cid_bytes = *cid.as_bytes(); 275 276 // Store locally first 277 if let Err(e) = self.node.store_chunk(&cid_bytes, msg.as_bytes()) { 278 return Ok(SendResult { 279 success: false, 280 message_id: None, 281 error: Some(e.to_string()), 282 }); 283 } 284 285 // TODO: Actually route the message to target 286 // For now, just store locally and return success 287 Ok(SendResult { 288 success: true, 289 message_id: Some(hex_encode(cid_bytes)), 290 error: None, 291 }) 292 } 293 294 async fn list_peers(&self) -> RpcResult<Vec<String>> { 295 let peers_arc = self.node.peers(); 296 let peers = peers_arc.lock().await; 297 let keys: Vec<String> = peers.keys().map(|k| hex_encode(*k)).collect(); 298 Ok(keys) 299 } 300 301 async fn upload_content(&self, data_b64: String) -> RpcResult<UploadResult> { 302 // Decode base64 303 let data = match BASE64.decode(&data_b64) { 304 Ok(d) => d, 305 Err(e) => { 306 return Ok(UploadResult { 307 success: false, 308 cid: None, 309 size: None, 310 error: Some(format!("Invalid base64: {}", e)), 311 }); 312 } 313 }; 314 315 let size = data.len() as u64; 316 debug!(size = size, "RPC: Uploading content"); 317 318 // Clone node for spawn_blocking (Arc clone is cheap) 319 let node = Arc::clone(&self.node); 320 321 // Store using CAS - wrapped in spawn_blocking to avoid blocking async runtime 322 let result = tokio::task::spawn_blocking(move || { 323 node.store_content(&data) 324 }).await; 325 326 match result { 327 Ok(Ok(cid)) => { 328 info!(cid = %hex_encode(cid), size = size, "Content uploaded"); 329 Ok(UploadResult { 330 success: true, 331 cid: Some(hex_encode(cid)), 332 size: Some(size), 333 error: None, 334 }) 335 } 336 Ok(Err(e)) => Ok(UploadResult { 337 success: false, 338 cid: None, 339 size: None, 340 error: Some(e.to_string()), 341 }), 342 Err(e) => Ok(UploadResult { 343 success: false, 344 cid: None, 345 size: None, 346 error: Some(format!("Task join error: {}", e)), 347 }), 348 } 349 } 350 351 async fn download_content(&self, cid: String) -> RpcResult<DownloadResult> { 352 // Parse hex CID 353 let cid_bytes: [u8; 32] = match hex_decode(&cid) { 354 Ok(b) if b.len() == 32 => { 355 let mut arr = [0u8; 32]; 356 arr.copy_from_slice(&b); 357 arr 358 } 359 _ => { 360 return Ok(DownloadResult { 361 found: false, 362 data: None, 363 size: None, 364 error: Some("Invalid CID: must be 64 hex characters".to_string()), 365 }); 366 } 367 }; 368 369 debug!(cid = %cid, "RPC: Downloading content"); 370 371 // Step 1: Check local store first 372 let node = Arc::clone(&self.node); 373 let cid_for_local = cid_bytes; 374 375 let local_result = tokio::task::spawn_blocking(move || { 376 node.get_content(&cid_for_local) 377 }).await; 378 379 // If found locally, return immediately 380 if let Ok(Ok(Some(data))) = local_result { 381 let size = data.len() as u64; 382 let data_b64 = BASE64.encode(&data); 383 return Ok(DownloadResult { 384 found: true, 385 data: Some(data_b64), 386 size: Some(size), 387 error: None, 388 }); 389 } 390 391 // Step 2: Not found locally - try network fetch 392 info!(cid = %cid, "Content not found locally, requesting from peers"); 393 394 // Check if we have any peers 395 let peer_count = self.node.peer_count().await; 396 if peer_count == 0 { 397 return Ok(DownloadResult { 398 found: false, 399 data: None, 400 size: None, 401 error: Some("No peers connected".to_string()), 402 }); 403 } 404 405 // Register pending fetch 406 let notify = self.node.register_pending_fetch(cid_bytes); 407 408 // Broadcast Request to all peers 409 let request_frame = AbzuFrame::request(cid_bytes, self.node.peer_key()); 410 let switchboard = Switchboard::new(Arc::clone(&self.node)); 411 let _results = switchboard.broadcast(&request_frame).await; 412 413 // Wait for content to arrive (with timeout) 414 let timeout_result = tokio::time::timeout( 415 Duration::from_secs(5), 416 notify.notified() 417 ).await; 418 419 // Clean up pending fetch 420 self.node.pending_fetches().remove(&cid_bytes); 421 422 if timeout_result.is_err() { 423 // Timeout 424 return Ok(DownloadResult { 425 found: false, 426 data: None, 427 size: None, 428 error: Some("Network request timed out".to_string()), 429 }); 430 } 431 432 // Step 3: Content should be in store now, fetch it 433 let node = Arc::clone(&self.node); 434 let cid_for_fetch = cid_bytes; 435 436 let fetch_result = tokio::task::spawn_blocking(move || { 437 node.get_content(&cid_for_fetch) 438 }).await; 439 440 match fetch_result { 441 Ok(Ok(Some(data))) => { 442 let size = data.len() as u64; 443 let data_b64 = BASE64.encode(&data); 444 info!(cid = %cid, size = size, "Content fetched from network"); 445 Ok(DownloadResult { 446 found: true, 447 data: Some(data_b64), 448 size: Some(size), 449 error: None, 450 }) 451 } 452 Ok(Ok(None)) => Ok(DownloadResult { 453 found: false, 454 data: None, 455 size: None, 456 error: Some("Content not received from network".to_string()), 457 }), 458 Ok(Err(e)) => Ok(DownloadResult { 459 found: false, 460 data: None, 461 size: None, 462 error: Some(e.to_string()), 463 }), 464 Err(e) => Ok(DownloadResult { 465 found: false, 466 data: None, 467 size: None, 468 error: Some(format!("Task join error: {}", e)), 469 }), 470 } 471 } 472 473 async fn list_content(&self) -> RpcResult<Vec<String>> { 474 match self.node.list_content() { 475 Ok(cids) => Ok(cids.iter().map(hex_encode).collect()), 476 Err(_) => Ok(Vec::new()), 477 } 478 } 479 480 async fn shutdown(&self) -> RpcResult<bool> { 481 info!("RPC: Shutdown requested"); 482 self.node.signal_shutdown(); 483 Ok(true) 484 } 485 486 // ===== Dark Comms Implementations ===== 487 488 async fn add_contact(&self, alias: String, pubkey: String) -> RpcResult<ContactDto> { 489 info!(alias = %alias, "RPC: Adding contact"); 490 491 let pubkey_bytes = match hex_decode(&pubkey) { 492 Ok(bytes) if bytes.len() == 32 => { 493 let mut arr = [0u8; 32]; 494 arr.copy_from_slice(&bytes); 495 arr 496 } 497 _ => { 498 return Err(jsonrpsee::types::ErrorObjectOwned::owned( 499 -32602, 500 "Invalid pubkey: must be 64 hex characters", 501 None::<()>, 502 )); 503 } 504 }; 505 506 match self.node.add_contact(alias, pubkey_bytes) { 507 Ok(contact) => Ok(ContactDto { 508 alias: contact.alias, 509 pubkey: hex_encode(contact.pubkey), 510 added_at: timestamp_to_iso(contact.added_at), 511 }), 512 Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned( 513 -32000, 514 e.to_string(), 515 None::<()>, 516 )), 517 } 518 } 519 520 async fn get_contacts(&self) -> RpcResult<Vec<ContactDto>> { 521 debug!("RPC: Getting contacts"); 522 523 match self.node.get_contacts() { 524 Ok(contacts) => Ok(contacts 525 .into_iter() 526 .map(|c| ContactDto { 527 alias: c.alias, 528 pubkey: hex_encode(c.pubkey), 529 added_at: timestamp_to_iso(c.added_at), 530 }) 531 .collect()), 532 Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned( 533 -32000, 534 e.to_string(), 535 None::<()>, 536 )), 537 } 538 } 539 540 async fn send_chat(&self, to_pubkey: String, message: String) -> RpcResult<SendChatResult> { 541 info!(to = %to_pubkey, len = message.len(), "RPC: Sending chat"); 542 543 let to_bytes = match hex_decode(&to_pubkey) { 544 Ok(bytes) if bytes.len() == 32 => { 545 let mut arr = [0u8; 32]; 546 arr.copy_from_slice(&bytes); 547 arr 548 } 549 _ => { 550 return Ok(SendChatResult { 551 success: false, 552 message_id: None, 553 error: Some("Invalid pubkey: must be 64 hex characters".to_string()), 554 }); 555 } 556 }; 557 558 // Get current timestamp 559 let timestamp = std::time::SystemTime::now() 560 .duration_since(std::time::UNIX_EPOCH) 561 .unwrap_or_default() 562 .as_millis() as u64; 563 564 // Store the outbound message 565 let stored = match self.node.store_outbound_chat(to_bytes, message.into_bytes(), timestamp) { 566 Ok(msg) => msg, 567 Err(e) => { 568 return Ok(SendChatResult { 569 success: false, 570 message_id: None, 571 error: Some(e.to_string()), 572 }); 573 } 574 }; 575 576 // Try to send to connected peer 577 let chat_frame = AbzuFrame::chat(stored.id, to_bytes, stored.content.clone(), timestamp); 578 let peers_arc = self.node.peers(); 579 let mut peers = peers_arc.lock().await; 580 581 if let Some(conn) = peers.get_mut(&to_bytes) { 582 if let Err(e) = conn.interface.send_frame(&chat_frame).await { 583 warn!(error = %e, "Failed to send chat frame"); 584 // Message is stored but not sent - will retry later 585 } else { 586 conn.touch(); 587 debug!(id = stored.id, "Chat frame sent"); 588 } 589 } else { 590 debug!("Peer not connected, message stored for later delivery"); 591 } 592 593 Ok(SendChatResult { 594 success: true, 595 message_id: Some(stored.id), 596 error: None, 597 }) 598 } 599 600 async fn get_chat_history(&self, pubkey: String) -> RpcResult<Vec<ChatMessageDto>> { 601 debug!(peer = %pubkey, "RPC: Getting chat history"); 602 603 let pubkey_bytes = match hex_decode(&pubkey) { 604 Ok(bytes) if bytes.len() == 32 => { 605 let mut arr = [0u8; 32]; 606 arr.copy_from_slice(&bytes); 607 arr 608 } 609 _ => { 610 return Err(jsonrpsee::types::ErrorObjectOwned::owned( 611 -32602, 612 "Invalid pubkey: must be 64 hex characters", 613 None::<()>, 614 )); 615 } 616 }; 617 618 match self.node.get_chat_history(&pubkey_bytes) { 619 Ok(messages) => Ok(messages 620 .into_iter() 621 .map(|m| ChatMessageDto { 622 id: m.id, 623 peer: hex_encode(m.peer), 624 content: String::from_utf8_lossy(&m.content).to_string(), 625 timestamp: timestamp_to_iso(m.timestamp), 626 direction: match m.direction { 627 abzu_core::MessageDirection::Inbound => "inbound".to_string(), 628 abzu_core::MessageDirection::Outbound => "outbound".to_string(), 629 }, 630 status: match m.status { 631 abzu_core::MessageStatus::Pending => "pending".to_string(), 632 abzu_core::MessageStatus::Delivered => "delivered".to_string(), 633 abzu_core::MessageStatus::Read => "read".to_string(), 634 abzu_core::MessageStatus::Failed => "failed".to_string(), 635 }, 636 }) 637 .collect()), 638 Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned( 639 -32000, 640 e.to_string(), 641 None::<()>, 642 )), 643 } 644 } 645 646 // ===== Chat / Circles API ===== 647 648 async fn list_circles(&self) -> RpcResult<Vec<sovereign_agent::circles::Circle>> { 649 Ok(self.agent.circles.list_circles().await) 650 } 651 652 async fn post_circle_message(&self, circle_id: String, content: String) -> RpcResult<Option<sovereign_agent::circles::Message>> { 653 // For now, sender is hardcoded to "Human" or we use the identity 654 // Ideally we should use the authenticated user's ID 655 let sender = "Human"; 656 Ok(self.agent.circles.post_message(&circle_id, sender, &content).await) 657 } 658 659 // ===== Agent / Inference API ===== 660 661 async fn agent_prompt(&self, session_id: String, prompt: String) -> RpcResult<String> { 662 info!(session = %session_id, "RPC: Agent prompt"); 663 match self.agent.prompt(&session_id, &prompt).await { 664 Ok(response) => Ok(response), 665 Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned( 666 -32000, 667 e.to_string(), 668 None::<()>, 669 )), 670 } 671 } 672 673 async fn list_presets(&self) -> RpcResult<Vec<sovereign_agent::settings::AgentPreset>> { 674 Ok(self.agent.list_presets().await) 675 } 676 677 async fn agent_status(&self) -> RpcResult<sovereign_agent::agent::AgentStatus> { 678 Ok(self.agent.status().await) 679 } 680 681 // ===== Token Economy Implementations ===== 682 683 async fn token_balance(&self, address: String) -> RpcResult<u128> { 684 Ok(self.agent.get_token_balance(&address).await) 685 } 686 687 async fn token_offers(&self) -> RpcResult<Vec<ServiceOffer>> { 688 Ok(self.agent.list_service_offers().await) 689 } 690 691 async fn token_publish_offer(&self, category: String, price: u128) -> RpcResult<String> { 692 // "Protective Wiring": This endpoint is purely local for now. 693 // It registers an offer in the local (memory) registry. 694 // Malicious actors cannot touch this execution path without the auth key. 695 match self.agent.publish_service_offer(&category, price).await { 696 Ok(id) => Ok(id), 697 Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned( 698 -32000, 699 e, 700 None::<()>, 701 )), 702 } 703 } 704 } 705 706 /// Start the RPC server with authentication 707 pub async fn start_rpc_server( 708 addr: &str, 709 node: Arc<Node>, 710 agent: Arc<sovereign_agent::agent::AgentRuntime>, // Added agent 711 shared_key: [u8; 32], 712 auth_token: AuthToken, 713 ) -> anyhow::Result<SocketAddr> { 714 let addr: SocketAddr = addr.parse()?; 715 716 // Build HTTP middleware with auth layer 717 let http_middleware = ServiceBuilder::new() 718 .layer(AuthLayer::new(auth_token)); 719 720 let server = Server::builder() 721 .set_http_middleware(http_middleware) 722 .build(addr) 723 .await?; 724 725 let bound_addr = server.local_addr()?; 726 info!(addr = %bound_addr, "RPC server listening (authentication required)"); 727 728 let rpc = RpcServer::new(node, agent, shared_key); // Pass agent 729 let handle = server.start(rpc.into_rpc()); 730 731 // Spawn server task 732 tokio::spawn(async move { 733 handle.stopped().await; 734 info!("RPC server stopped"); 735 }); 736 737 Ok(bound_addr) 738 } 739 740 /// Hex encode bytes 741 fn hex_encode(bytes: impl AsRef<[u8]>) -> String { 742 bytes.as_ref().iter().map(|b| format!("{:02x}", b)).collect() 743 } 744 745 /// Hex decode string to bytes 746 fn hex_decode(s: &str) -> Result<Vec<u8>, String> { 747 if !s.len().is_multiple_of(2) { 748 return Err("Odd length hex string".to_string()); 749 } 750 let mut bytes = Vec::with_capacity(s.len() / 2); 751 for chunk in s.as_bytes().chunks(2) { 752 let hex_str = std::str::from_utf8(chunk).map_err(|e| e.to_string())?; 753 let byte = u8::from_str_radix(hex_str, 16).map_err(|e| e.to_string())?; 754 bytes.push(byte); 755 } 756 Ok(bytes) 757 } 758 759 /// Convert Unix timestamp (ms) to ISO 8601 string 760 fn timestamp_to_iso(ms: u64) -> String { 761 use std::time::{Duration, UNIX_EPOCH}; 762 let secs = ms / 1000; 763 let d = UNIX_EPOCH + Duration::from_secs(secs); 764 // Simple ISO format - not perfect but good enough for RPC 765 format!("{:?}", d) 766 } 767 768 #[cfg(test)] 769 mod tests { 770 use super::*; 771 772 #[test] 773 fn test_hex_encode() { 774 assert_eq!(hex_encode([0xDE, 0xAD, 0xBE, 0xEF]), "deadbeef"); 775 } 776 777 #[test] 778 fn test_hex_decode() { 779 let hex = "deadbeef"; 780 let bytes = hex_decode(hex).unwrap(); 781 assert_eq!(bytes, vec![0xde, 0xad, 0xbe, 0xef]); 782 } 783 }