/ abzu-daemon / src / rpc.rs
rpc.rs
  1  //! JSON-RPC Control Plane
  2  //!
  3  //! Local control interface for managing the Abzu node.
  4  
  5  use std::net::SocketAddr;
  6  use std::sync::Arc;
  7  use std::time::Duration;
  8  
  9  use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
 10  use jsonrpsee::core::async_trait;
 11  use jsonrpsee::core::RpcResult;
 12  use jsonrpsee::proc_macros::rpc;
 13  use jsonrpsee::server::Server;
 14  use serde::{Deserialize, Serialize};
 15  use tower::ServiceBuilder;
 16  use tracing::{debug, info, warn};
 17  
 18  use crate::auth::{AuthLayer, AuthToken};
 19  use abzu_core::{connect_peer, Node, Switchboard};
 20  use abzu_transport::{AbzuFrame, AbzuInterfaceExt};
 21  use abzu_token::ServiceOffer;
 22  
 23  /// Node information response
 24  #[derive(Debug, Clone, Serialize, Deserialize)]
 25  pub struct NodeInfo {
 26      /// Our peer key (hex encoded)
 27      pub peer_key: String,
 28      /// Our IPv6 address
 29      pub address: String,
 30      /// Number of connected peers
 31      pub peer_count: usize,
 32      /// Storage path
 33      pub storage_path: String,
 34      /// Total stored content size in bytes
 35      pub store_size: u64,
 36  }
 37  
 38  /// Connect result
 39  #[derive(Debug, Clone, Serialize, Deserialize)]
 40  pub struct ConnectResult {
 41      /// Whether connection succeeded
 42      pub success: bool,
 43      /// Peer key if successful (hex encoded)
 44      pub peer_key: Option<String>,
 45      /// Error message if failed
 46      pub error: Option<String>,
 47  }
 48  
 49  /// Send message result
 50  #[derive(Debug, Clone, Serialize, Deserialize)]
 51  pub struct SendResult {
 52      /// Whether send succeeded
 53      pub success: bool,
 54      /// Message ID (content hash)
 55      pub message_id: Option<String>,
 56      /// Error message if failed
 57      pub error: Option<String>,
 58  }
 59  
 60  /// Upload content result
 61  #[derive(Debug, Clone, Serialize, Deserialize)]
 62  pub struct UploadResult {
 63      /// Whether upload succeeded
 64      pub success: bool,
 65      /// Content ID (BLAKE3 hash, hex encoded)
 66      pub cid: Option<String>,
 67      /// Size in bytes
 68      pub size: Option<u64>,
 69      /// Error message if failed
 70      pub error: Option<String>,
 71  }
 72  
 73  /// Download content result
 74  #[derive(Debug, Clone, Serialize, Deserialize)]
 75  pub struct DownloadResult {
 76      /// Whether content was found
 77      pub found: bool,
 78      /// Content data (base64 encoded)
 79      pub data: Option<String>,
 80      /// Size in bytes
 81      pub size: Option<u64>,
 82      /// Error message if failed
 83      pub error: Option<String>,
 84  }
 85  
 86  /// Contact DTO for RPC
 87  #[derive(Debug, Clone, Serialize, Deserialize)]
 88  pub struct ContactDto {
 89      /// Human-readable alias
 90      pub alias: String,
 91      /// Public key (hex encoded)
 92      pub pubkey: String,
 93      /// When the contact was added (ISO 8601)
 94      pub added_at: String,
 95  }
 96  
 97  /// Chat message DTO for RPC
 98  #[derive(Debug, Clone, Serialize, Deserialize)]
 99  pub struct ChatMessageDto {
100      /// Message ID
101      pub id: u64,
102      /// Other party pubkey (hex)
103      pub peer: String,
104      /// Message content (UTF-8 text)
105      pub content: String,
106      /// Timestamp (ISO 8601)
107      pub timestamp: String,
108      /// Direction: "inbound" or "outbound"
109      pub direction: String,
110      /// Status: "pending", "delivered", or "failed"
111      pub status: String,
112  }
113  
114  /// Send chat result
115  #[derive(Debug, Clone, Serialize, Deserialize)]
116  pub struct SendChatResult {
117      /// Whether send succeeded
118      pub success: bool,
119      /// Message ID if successful
120      pub message_id: Option<u64>,
121      /// Error message if failed
122      pub error: Option<String>,
123  }
124  
125  /// The RPC API definition
126  #[rpc(server)]
127  pub trait AbzuApi {
128      /// Get node information
129      #[method(name = "get_info")]
130      async fn get_info(&self) -> RpcResult<NodeInfo>;
131  
132      /// Connect to a peer
133      #[method(name = "connect")]
134      async fn connect(&self, peer: String) -> RpcResult<ConnectResult>;
135  
136      /// Send a text message to a target
137      #[method(name = "send_message")]
138      async fn send_message(&self, target: String, msg: String) -> RpcResult<SendResult>;
139  
140      /// List connected peers
141      #[method(name = "list_peers")]
142      async fn list_peers(&self) -> RpcResult<Vec<String>>;
143  
144      /// Upload content to local store (returns CID)
145      #[method(name = "upload_content")]
146      async fn upload_content(&self, data_b64: String) -> RpcResult<UploadResult>;
147  
148      /// Download content from local store by CID
149      #[method(name = "download_content")]
150      async fn download_content(&self, cid: String) -> RpcResult<DownloadResult>;
151  
152      /// List all stored content CIDs
153      #[method(name = "list_content")]
154      async fn list_content(&self) -> RpcResult<Vec<String>>;
155  
156      /// Shutdown the node
157      #[method(name = "shutdown")]
158      async fn shutdown(&self) -> RpcResult<bool>;
159  
160      // ===== Dark Comms API =====
161  
162      /// Add a contact
163      #[method(name = "add_contact")]
164      async fn add_contact(&self, alias: String, pubkey: String) -> RpcResult<ContactDto>;
165  
166      /// Get all contacts
167      #[method(name = "get_contacts")]
168      async fn get_contacts(&self) -> RpcResult<Vec<ContactDto>>;
169  
170      /// Send a chat message
171      #[method(name = "send_chat")]
172      async fn send_chat(&self, to_pubkey: String, message: String) -> RpcResult<SendChatResult>;
173  
174      /// Get chat history with a peer
175      /// Get chat history with a peer
176      #[method(name = "get_chat_history")]
177      async fn get_chat_history(&self, pubkey: String) -> RpcResult<Vec<ChatMessageDto>>;
178  
179      // ===== Circles API =====
180  
181      /// List all circles
182      #[method(name = "list_circles")]
183      async fn list_circles(&self) -> RpcResult<Vec<sovereign_agent::circles::Circle>>;
184  
185      /// Post message to circle
186      #[method(name = "post_circle_message")]
187      async fn post_circle_message(&self, circle_id: String, content: String) -> RpcResult<Option<sovereign_agent::circles::Message>>;
188  
189      // ===== Agent API =====
190  
191      /// Send a prompt to the agent
192      #[method(name = "agent_prompt")]
193      async fn agent_prompt(&self, session_id: String, prompt: String) -> RpcResult<String>;
194  
195      /// List agent presets
196      #[method(name = "list_presets")]
197      async fn list_presets(&self) -> RpcResult<Vec<sovereign_agent::settings::AgentPreset>>;
198  
199      /// Get agent status
200      #[method(name = "agent_status")]
201      async fn agent_status(&self) -> RpcResult<sovereign_agent::agent::AgentStatus>;
202  
203      // ===== Token Economy API =====
204  
205      /// Get token balance for an address
206      #[method(name = "token_balance")]
207      async fn token_balance(&self, address: String) -> RpcResult<u128>;
208  
209      /// List service offers
210      #[method(name = "token_offers")]
211      async fn token_offers(&self) -> RpcResult<Vec<ServiceOffer>>;
212  
213      /// Publish a service offer
214      #[method(name = "token_publish_offer")]
215      async fn token_publish_offer(&self, category: String, price: u128) -> RpcResult<String>;
216  }
217  
218  /// RPC server implementation
219  pub struct RpcServer {
220      node: Arc<Node>,
221      agent: Arc<sovereign_agent::agent::AgentRuntime>,
222      shared_key: [u8; 32], // Pre-shared key for now (TODO: proper key exchange)
223  }
224  
225  impl RpcServer {
226      pub fn new(node: Arc<Node>, agent: Arc<sovereign_agent::agent::AgentRuntime>, shared_key: [u8; 32]) -> Self {
227          Self { node, agent, shared_key }
228      }
229  }
230  
231  #[async_trait]
232  impl AbzuApiServer for RpcServer {
233      async fn get_info(&self) -> RpcResult<NodeInfo> {
234          let peer_key = hex_encode(self.node.peer_key());
235          let address = self.node.address().to_ipv6_string();
236          let peer_count = self.node.peer_count().await;
237          let storage_path = self.node.config().storage_path.clone();
238          let store_size = self.node.store_size().unwrap_or(0);
239  
240          Ok(NodeInfo {
241              peer_key,
242              address,
243              peer_count,
244              storage_path,
245              store_size,
246          })
247      }
248  
249      async fn connect(&self, peer: String) -> RpcResult<ConnectResult> {
250          info!(peer = %peer, "RPC: Connecting to peer");
251  
252          match connect_peer(Arc::clone(&self.node), &peer, &self.shared_key).await {
253              Ok(peer_key) => Ok(ConnectResult {
254                  success: true,
255                  peer_key: Some(hex_encode(peer_key)),
256                  error: None,
257              }),
258              Err(e) => {
259                  warn!(peer = %peer, error = %e, "Failed to connect");
260                  Ok(ConnectResult {
261                      success: false,
262                      peer_key: None,
263                      error: Some(e.to_string()),
264                  })
265              }
266          }
267      }
268  
269      async fn send_message(&self, target: String, msg: String) -> RpcResult<SendResult> {
270          info!(target = %target, len = msg.len(), "RPC: Sending message");
271  
272          // Hash the message for CID
273          let cid = blake3::hash(msg.as_bytes());
274          let cid_bytes = *cid.as_bytes();
275  
276          // Store locally first
277          if let Err(e) = self.node.store_chunk(&cid_bytes, msg.as_bytes()) {
278              return Ok(SendResult {
279                  success: false,
280                  message_id: None,
281                  error: Some(e.to_string()),
282              });
283          }
284  
285          // TODO: Actually route the message to target
286          // For now, just store locally and return success
287          Ok(SendResult {
288              success: true,
289              message_id: Some(hex_encode(cid_bytes)),
290              error: None,
291          })
292      }
293  
294      async fn list_peers(&self) -> RpcResult<Vec<String>> {
295          let peers_arc = self.node.peers();
296          let peers = peers_arc.lock().await;
297          let keys: Vec<String> = peers.keys().map(|k| hex_encode(*k)).collect();
298          Ok(keys)
299      }
300  
301      async fn upload_content(&self, data_b64: String) -> RpcResult<UploadResult> {
302          // Decode base64
303          let data = match BASE64.decode(&data_b64) {
304              Ok(d) => d,
305              Err(e) => {
306                  return Ok(UploadResult {
307                      success: false,
308                      cid: None,
309                      size: None,
310                      error: Some(format!("Invalid base64: {}", e)),
311                  });
312              }
313          };
314  
315          let size = data.len() as u64;
316          debug!(size = size, "RPC: Uploading content");
317  
318          // Clone node for spawn_blocking (Arc clone is cheap)
319          let node = Arc::clone(&self.node);
320  
321          // Store using CAS - wrapped in spawn_blocking to avoid blocking async runtime
322          let result = tokio::task::spawn_blocking(move || {
323              node.store_content(&data)
324          }).await;
325  
326          match result {
327              Ok(Ok(cid)) => {
328                  info!(cid = %hex_encode(cid), size = size, "Content uploaded");
329                  Ok(UploadResult {
330                      success: true,
331                      cid: Some(hex_encode(cid)),
332                      size: Some(size),
333                      error: None,
334                  })
335              }
336              Ok(Err(e)) => Ok(UploadResult {
337                  success: false,
338                  cid: None,
339                  size: None,
340                  error: Some(e.to_string()),
341              }),
342              Err(e) => Ok(UploadResult {
343                  success: false,
344                  cid: None,
345                  size: None,
346                  error: Some(format!("Task join error: {}", e)),
347              }),
348          }
349      }
350  
351      async fn download_content(&self, cid: String) -> RpcResult<DownloadResult> {
352          // Parse hex CID
353          let cid_bytes: [u8; 32] = match hex_decode(&cid) {
354              Ok(b) if b.len() == 32 => {
355                  let mut arr = [0u8; 32];
356                  arr.copy_from_slice(&b);
357                  arr
358              }
359              _ => {
360                  return Ok(DownloadResult {
361                      found: false,
362                      data: None,
363                      size: None,
364                      error: Some("Invalid CID: must be 64 hex characters".to_string()),
365                  });
366              }
367          };
368  
369          debug!(cid = %cid, "RPC: Downloading content");
370  
371          // Step 1: Check local store first
372          let node = Arc::clone(&self.node);
373          let cid_for_local = cid_bytes;
374          
375          let local_result = tokio::task::spawn_blocking(move || {
376              node.get_content(&cid_for_local)
377          }).await;
378  
379          // If found locally, return immediately
380          if let Ok(Ok(Some(data))) = local_result {
381              let size = data.len() as u64;
382              let data_b64 = BASE64.encode(&data);
383              return Ok(DownloadResult {
384                  found: true,
385                  data: Some(data_b64),
386                  size: Some(size),
387                  error: None,
388              });
389          }
390  
391          // Step 2: Not found locally - try network fetch
392          info!(cid = %cid, "Content not found locally, requesting from peers");
393  
394          // Check if we have any peers
395          let peer_count = self.node.peer_count().await;
396          if peer_count == 0 {
397              return Ok(DownloadResult {
398                  found: false,
399                  data: None,
400                  size: None,
401                  error: Some("No peers connected".to_string()),
402              });
403          }
404  
405          // Register pending fetch
406          let notify = self.node.register_pending_fetch(cid_bytes);
407  
408          // Broadcast Request to all peers
409          let request_frame = AbzuFrame::request(cid_bytes, self.node.peer_key());
410          let switchboard = Switchboard::new(Arc::clone(&self.node));
411          let _results = switchboard.broadcast(&request_frame).await;
412  
413          // Wait for content to arrive (with timeout)
414          let timeout_result = tokio::time::timeout(
415              Duration::from_secs(5),
416              notify.notified()
417          ).await;
418  
419          // Clean up pending fetch
420          self.node.pending_fetches().remove(&cid_bytes);
421  
422          if timeout_result.is_err() {
423              // Timeout
424              return Ok(DownloadResult {
425                  found: false,
426                  data: None,
427                  size: None,
428                  error: Some("Network request timed out".to_string()),
429              });
430          }
431  
432          // Step 3: Content should be in store now, fetch it
433          let node = Arc::clone(&self.node);
434          let cid_for_fetch = cid_bytes;
435          
436          let fetch_result = tokio::task::spawn_blocking(move || {
437              node.get_content(&cid_for_fetch)
438          }).await;
439  
440          match fetch_result {
441              Ok(Ok(Some(data))) => {
442                  let size = data.len() as u64;
443                  let data_b64 = BASE64.encode(&data);
444                  info!(cid = %cid, size = size, "Content fetched from network");
445                  Ok(DownloadResult {
446                      found: true,
447                      data: Some(data_b64),
448                      size: Some(size),
449                      error: None,
450                  })
451              }
452              Ok(Ok(None)) => Ok(DownloadResult {
453                  found: false,
454                  data: None,
455                  size: None,
456                  error: Some("Content not received from network".to_string()),
457              }),
458              Ok(Err(e)) => Ok(DownloadResult {
459                  found: false,
460                  data: None,
461                  size: None,
462                  error: Some(e.to_string()),
463              }),
464              Err(e) => Ok(DownloadResult {
465                  found: false,
466                  data: None,
467                  size: None,
468                  error: Some(format!("Task join error: {}", e)),
469              }),
470          }
471      }
472  
473      async fn list_content(&self) -> RpcResult<Vec<String>> {
474          match self.node.list_content() {
475              Ok(cids) => Ok(cids.iter().map(hex_encode).collect()),
476              Err(_) => Ok(Vec::new()),
477          }
478      }
479  
480      async fn shutdown(&self) -> RpcResult<bool> {
481          info!("RPC: Shutdown requested");
482          self.node.signal_shutdown();
483          Ok(true)
484      }
485  
486      // ===== Dark Comms Implementations =====
487  
488      async fn add_contact(&self, alias: String, pubkey: String) -> RpcResult<ContactDto> {
489          info!(alias = %alias, "RPC: Adding contact");
490  
491          let pubkey_bytes = match hex_decode(&pubkey) {
492              Ok(bytes) if bytes.len() == 32 => {
493                  let mut arr = [0u8; 32];
494                  arr.copy_from_slice(&bytes);
495                  arr
496              }
497              _ => {
498                  return Err(jsonrpsee::types::ErrorObjectOwned::owned(
499                      -32602,
500                      "Invalid pubkey: must be 64 hex characters",
501                      None::<()>,
502                  ));
503              }
504          };
505  
506          match self.node.add_contact(alias, pubkey_bytes) {
507              Ok(contact) => Ok(ContactDto {
508                  alias: contact.alias,
509                  pubkey: hex_encode(contact.pubkey),
510                  added_at: timestamp_to_iso(contact.added_at),
511              }),
512              Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned(
513                  -32000,
514                  e.to_string(),
515                  None::<()>,
516              )),
517          }
518      }
519  
520      async fn get_contacts(&self) -> RpcResult<Vec<ContactDto>> {
521          debug!("RPC: Getting contacts");
522  
523          match self.node.get_contacts() {
524              Ok(contacts) => Ok(contacts
525                  .into_iter()
526                  .map(|c| ContactDto {
527                      alias: c.alias,
528                      pubkey: hex_encode(c.pubkey),
529                      added_at: timestamp_to_iso(c.added_at),
530                  })
531                  .collect()),
532              Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned(
533                  -32000,
534                  e.to_string(),
535                  None::<()>,
536              )),
537          }
538      }
539  
540      async fn send_chat(&self, to_pubkey: String, message: String) -> RpcResult<SendChatResult> {
541          info!(to = %to_pubkey, len = message.len(), "RPC: Sending chat");
542  
543          let to_bytes = match hex_decode(&to_pubkey) {
544              Ok(bytes) if bytes.len() == 32 => {
545                  let mut arr = [0u8; 32];
546                  arr.copy_from_slice(&bytes);
547                  arr
548              }
549              _ => {
550                  return Ok(SendChatResult {
551                      success: false,
552                      message_id: None,
553                      error: Some("Invalid pubkey: must be 64 hex characters".to_string()),
554                  });
555              }
556          };
557  
558          // Get current timestamp
559          let timestamp = std::time::SystemTime::now()
560              .duration_since(std::time::UNIX_EPOCH)
561              .unwrap_or_default()
562              .as_millis() as u64;
563  
564          // Store the outbound message
565          let stored = match self.node.store_outbound_chat(to_bytes, message.into_bytes(), timestamp) {
566              Ok(msg) => msg,
567              Err(e) => {
568                  return Ok(SendChatResult {
569                      success: false,
570                      message_id: None,
571                      error: Some(e.to_string()),
572                  });
573              }
574          };
575  
576          // Try to send to connected peer
577          let chat_frame = AbzuFrame::chat(stored.id, to_bytes, stored.content.clone(), timestamp);
578          let peers_arc = self.node.peers();
579          let mut peers = peers_arc.lock().await;
580  
581          if let Some(conn) = peers.get_mut(&to_bytes) {
582              if let Err(e) = conn.interface.send_frame(&chat_frame).await {
583                  warn!(error = %e, "Failed to send chat frame");
584                  // Message is stored but not sent - will retry later
585              } else {
586                  conn.touch();
587                  debug!(id = stored.id, "Chat frame sent");
588              }
589          } else {
590              debug!("Peer not connected, message stored for later delivery");
591          }
592  
593          Ok(SendChatResult {
594              success: true,
595              message_id: Some(stored.id),
596              error: None,
597          })
598      }
599  
600      async fn get_chat_history(&self, pubkey: String) -> RpcResult<Vec<ChatMessageDto>> {
601          debug!(peer = %pubkey, "RPC: Getting chat history");
602  
603          let pubkey_bytes = match hex_decode(&pubkey) {
604              Ok(bytes) if bytes.len() == 32 => {
605                  let mut arr = [0u8; 32];
606                  arr.copy_from_slice(&bytes);
607                  arr
608              }
609              _ => {
610                  return Err(jsonrpsee::types::ErrorObjectOwned::owned(
611                      -32602,
612                      "Invalid pubkey: must be 64 hex characters",
613                      None::<()>,
614                  ));
615              }
616          };
617  
618          match self.node.get_chat_history(&pubkey_bytes) {
619              Ok(messages) => Ok(messages
620                  .into_iter()
621                  .map(|m| ChatMessageDto {
622                      id: m.id,
623                      peer: hex_encode(m.peer),
624                      content: String::from_utf8_lossy(&m.content).to_string(),
625                      timestamp: timestamp_to_iso(m.timestamp),
626                      direction: match m.direction {
627                          abzu_core::MessageDirection::Inbound => "inbound".to_string(),
628                          abzu_core::MessageDirection::Outbound => "outbound".to_string(),
629                      },
630                      status: match m.status {
631                          abzu_core::MessageStatus::Pending => "pending".to_string(),
632                          abzu_core::MessageStatus::Delivered => "delivered".to_string(),
633                          abzu_core::MessageStatus::Read => "read".to_string(),
634                          abzu_core::MessageStatus::Failed => "failed".to_string(),
635                      },
636                  })
637                  .collect()),
638              Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned(
639                  -32000,
640                  e.to_string(),
641                  None::<()>,
642              )),
643          }
644      }
645  
646      // ===== Chat / Circles API =====
647  
648      async fn list_circles(&self) -> RpcResult<Vec<sovereign_agent::circles::Circle>> {
649          Ok(self.agent.circles.list_circles().await)
650      }
651  
652      async fn post_circle_message(&self, circle_id: String, content: String) -> RpcResult<Option<sovereign_agent::circles::Message>> {
653          // For now, sender is hardcoded to "Human" or we use the identity
654          // Ideally we should use the authenticated user's ID
655          let sender = "Human"; 
656          Ok(self.agent.circles.post_message(&circle_id, sender, &content).await)
657      }
658  
659      // ===== Agent / Inference API =====
660  
661      async fn agent_prompt(&self, session_id: String, prompt: String) -> RpcResult<String> {
662          info!(session = %session_id, "RPC: Agent prompt");
663          match self.agent.prompt(&session_id, &prompt).await {
664              Ok(response) => Ok(response),
665              Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned(
666                  -32000,
667                  e.to_string(),
668                  None::<()>,
669              )),
670          }
671      }
672  
673      async fn list_presets(&self) -> RpcResult<Vec<sovereign_agent::settings::AgentPreset>> {
674          Ok(self.agent.list_presets().await)
675      }
676  
677      async fn agent_status(&self) -> RpcResult<sovereign_agent::agent::AgentStatus> {
678          Ok(self.agent.status().await)
679      }
680  
681      // ===== Token Economy Implementations =====
682  
683      async fn token_balance(&self, address: String) -> RpcResult<u128> {
684          Ok(self.agent.get_token_balance(&address).await)
685      }
686  
687      async fn token_offers(&self) -> RpcResult<Vec<ServiceOffer>> {
688          Ok(self.agent.list_service_offers().await)
689      }
690  
691      async fn token_publish_offer(&self, category: String, price: u128) -> RpcResult<String> {
692          // "Protective Wiring": This endpoint is purely local for now.
693          // It registers an offer in the local (memory) registry.
694          // Malicious actors cannot touch this execution path without the auth key.
695          match self.agent.publish_service_offer(&category, price).await {
696              Ok(id) => Ok(id),
697              Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned(
698                  -32000,
699                  e,
700                  None::<()>,
701              )),
702          }
703      }
704  }
705  
706  /// Start the RPC server with authentication
707  pub async fn start_rpc_server(
708      addr: &str,
709      node: Arc<Node>,
710      agent: Arc<sovereign_agent::agent::AgentRuntime>, // Added agent
711      shared_key: [u8; 32],
712      auth_token: AuthToken,
713  ) -> anyhow::Result<SocketAddr> {
714      let addr: SocketAddr = addr.parse()?;
715      
716      // Build HTTP middleware with auth layer
717      let http_middleware = ServiceBuilder::new()
718          .layer(AuthLayer::new(auth_token));
719      
720      let server = Server::builder()
721          .set_http_middleware(http_middleware)
722          .build(addr)
723          .await?;
724  
725      let bound_addr = server.local_addr()?;
726      info!(addr = %bound_addr, "RPC server listening (authentication required)");
727  
728      let rpc = RpcServer::new(node, agent, shared_key); // Pass agent
729      let handle = server.start(rpc.into_rpc());
730  
731      // Spawn server task
732      tokio::spawn(async move {
733          handle.stopped().await;
734          info!("RPC server stopped");
735      });
736  
737      Ok(bound_addr)
738  }
739  
740  /// Hex encode bytes
741  fn hex_encode(bytes: impl AsRef<[u8]>) -> String {
742      bytes.as_ref().iter().map(|b| format!("{:02x}", b)).collect()
743  }
744  
745  /// Hex decode string to bytes
746  fn hex_decode(s: &str) -> Result<Vec<u8>, String> {
747      if !s.len().is_multiple_of(2) {
748          return Err("Odd length hex string".to_string());
749      }
750      let mut bytes = Vec::with_capacity(s.len() / 2);
751      for chunk in s.as_bytes().chunks(2) {
752          let hex_str = std::str::from_utf8(chunk).map_err(|e| e.to_string())?;
753          let byte = u8::from_str_radix(hex_str, 16).map_err(|e| e.to_string())?;
754          bytes.push(byte);
755      }
756      Ok(bytes)
757  }
758  
759  /// Convert Unix timestamp (ms) to ISO 8601 string
760  fn timestamp_to_iso(ms: u64) -> String {
761      use std::time::{Duration, UNIX_EPOCH};
762      let secs = ms / 1000;
763      let d = UNIX_EPOCH + Duration::from_secs(secs);
764      // Simple ISO format - not perfect but good enough for RPC
765      format!("{:?}", d)
766  }
767  
768  #[cfg(test)]
769  mod tests {
770      use super::*;
771  
772      #[test]
773      fn test_hex_encode() {
774          assert_eq!(hex_encode([0xDE, 0xAD, 0xBE, 0xEF]), "deadbeef");
775      }
776  
777      #[test]
778      fn test_hex_decode() {
779          let hex = "deadbeef";
780          let bytes = hex_decode(hex).unwrap();
781          assert_eq!(bytes, vec![0xde, 0xad, 0xbe, 0xef]);
782      }
783  }