/ node / src / ipc_server.rs
ipc_server.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // DeltaOS IPC Server implementation
  3  // SPDX-License-Identifier: Apache-2.0
  4  
  5  //! # DELTA Node IPC Server
  6  //!
  7  //! Unix domain socket IPC server for the DELTA chain node.
  8  //! Provides JSON-RPC style interface for bridge communication.
  9  //!
 10  //! ## Supported Methods
 11  //!
 12  //! - `getNodeStatus` - Get current node status
 13  //! - `getBlock` - Get block by height
 14  //! - `getStatus` - Get detailed chain status (halted, attestations, etc.)
 15  //! - `submitAttestation` - Submit a state root attestation from ALPHA
 16  //! - `submitMessage` - Submit a cross-chain message
 17  //! - `getPendingWithdrawals` - Get pending withdrawal requests
 18  //! - `confirmWithdrawal` - Confirm a withdrawal was processed on ALPHA
 19  //! - `subscribeBlocks` - Subscribe to new block notifications
 20  
 21  use deltavm_execution::DeltaRuntime;
 22  use serde::{Deserialize, Serialize};
 23  use std::collections::HashMap;
 24  use std::path::PathBuf;
 25  use std::sync::Arc;
 26  use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
 27  use tokio::net::{UnixListener, UnixStream};
 28  use tokio::sync::{RwLock, broadcast, mpsc};
 29  
 30  /// Default socket path for DELTA node IPC
 31  pub const DEFAULT_DELTA_SOCKET: &str = "/tmp/adnet/delta.sock";
 32  
 33  /// IPC request message (JSON-RPC style)
 34  #[derive(Debug, Clone, Serialize, Deserialize)]
 35  pub struct IpcRequest {
 36      pub id: u64,
 37      pub method: String,
 38      pub params: serde_json::Value,
 39  }
 40  
 41  /// IPC response message
 42  #[derive(Debug, Clone, Serialize, Deserialize)]
 43  pub struct IpcResponse {
 44      pub id: u64,
 45      #[serde(skip_serializing_if = "Option::is_none")]
 46      pub result: Option<serde_json::Value>,
 47      #[serde(skip_serializing_if = "Option::is_none")]
 48      pub error: Option<String>,
 49  }
 50  
 51  impl IpcResponse {
 52      /// Create a success response
 53      pub fn success(id: u64, result: serde_json::Value) -> Self {
 54          Self {
 55              id,
 56              result: Some(result),
 57              error: None,
 58          }
 59      }
 60  
 61      /// Create an error response
 62      pub fn error(id: u64, message: impl Into<String>) -> Self {
 63          Self {
 64              id,
 65              result: None,
 66              error: Some(message.into()),
 67          }
 68      }
 69  }
 70  
 71  /// Block notification sent to subscribers
 72  #[derive(Debug, Clone, Serialize, Deserialize)]
 73  pub struct BlockNotification {
 74      pub height: u64,
 75      pub state_root: String,
 76      pub previous_state_root: String,
 77      pub timestamp: u64,
 78      pub transaction_count: u32,
 79  }
 80  
 81  /// Node status response
 82  #[derive(Debug, Clone, Serialize, Deserialize)]
 83  pub struct NodeStatusResponse {
 84      pub block_height: u64,
 85      pub is_syncing: bool,
 86      pub peer_count: usize,
 87  }
 88  
 89  /// Chain status response (detailed)
 90  #[derive(Debug, Clone, Serialize, Deserialize)]
 91  pub struct ChainStatusResponse {
 92      pub height: u64,
 93      pub halted: bool,
 94      pub halt_reason: Option<String>,
 95      pub last_alpha_height: Option<u64>,
 96      pub pending_deposits: usize,
 97      pub pending_withdrawals: usize,
 98  }
 99  
100  /// Attestation request from ALPHA chain
101  #[derive(Debug, Clone, Serialize, Deserialize)]
102  pub struct AttestationRequest {
103      pub source_chain: String,
104      pub target_chain: String,
105      pub block_height: u64,
106      pub state_root: String,
107      pub previous_state_root: String,
108      pub timestamp: u64,
109      pub validator_epoch: u64,
110  }
111  
112  /// Cross-chain message request
113  #[derive(Debug, Clone, Serialize, Deserialize)]
114  pub struct MessageRequest {
115      pub message_type: String,
116      pub source_chain: String,
117      pub source_height: u64,
118      pub validator_epoch: u64,
119      pub payload: serde_json::Value,
120  }
121  
122  /// Withdrawal info
123  #[derive(Debug, Clone, Serialize, Deserialize)]
124  pub struct WithdrawalInfo {
125      pub nonce: u64,
126      pub amount: u64,
127      pub recipient: String,
128      pub status: String,
129      pub delta_height: u64,
130      pub created_at: u64,
131  }
132  
133  /// Deposit info
134  #[derive(Debug, Clone, Serialize, Deserialize)]
135  pub struct DepositInfo {
136      pub nonce: u64,
137      pub amount: u64,
138      pub sender: String,
139      pub recipient: String,
140      pub alpha_height: u64,
141      pub status: String,
142  }
143  
144  /// Halt reason enum for serialization
145  #[derive(Debug, Clone, Serialize, Deserialize)]
146  pub enum HaltReason {
147      MissingAttestations,
148      PoolImbalance,
149      InvalidState,
150      GovernanceHalt,
151  }
152  
153  impl std::fmt::Display for HaltReason {
154      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155          match self {
156              HaltReason::MissingAttestations => write!(f, "MissingAttestations"),
157              HaltReason::PoolImbalance => write!(f, "PoolImbalance"),
158              HaltReason::InvalidState => write!(f, "InvalidState"),
159              HaltReason::GovernanceHalt => write!(f, "GovernanceHalt"),
160          }
161      }
162  }
163  
164  /// IPC server configuration
165  #[derive(Clone, Debug)]
166  pub struct IpcServerConfig {
167      /// Socket path
168      pub socket_path: PathBuf,
169      /// Maximum concurrent connections
170      pub max_connections: usize,
171      /// Buffer size for block notifications
172      pub notification_buffer_size: usize,
173  }
174  
175  impl Default for IpcServerConfig {
176      fn default() -> Self {
177          Self {
178              socket_path: PathBuf::from(DEFAULT_DELTA_SOCKET),
179              max_connections: 100,
180              notification_buffer_size: 1000,
181          }
182      }
183  }
184  
185  /// State shared across all IPC connections
186  pub struct IpcServerState {
187      /// Reference to the runtime
188      pub runtime: Option<Arc<RwLock<DeltaRuntime>>>,
189      /// Current block height
190      pub block_height: RwLock<u64>,
191      /// Is node halted
192      pub is_halted: RwLock<bool>,
193      /// Halt reason
194      pub halt_reason: RwLock<Option<HaltReason>>,
195      /// Last ALPHA height processed
196      pub last_alpha_height: RwLock<Option<u64>>,
197      /// Connected peer count
198      pub peer_count: RwLock<usize>,
199      /// Pending deposits
200      pub pending_deposits: RwLock<HashMap<u64, DepositInfo>>,
201      /// Pending withdrawals
202      pub pending_withdrawals: RwLock<HashMap<u64, WithdrawalInfo>>,
203      /// Received attestations
204      pub attestations: RwLock<Vec<AttestationRequest>>,
205      /// Received messages
206      pub messages: RwLock<Vec<MessageRequest>>,
207      /// Block notification broadcaster
208      pub block_tx: broadcast::Sender<BlockNotification>,
209      /// Withdrawal nonce counter
210      withdrawal_nonce: RwLock<u64>,
211      /// Deposit nonce counter
212      deposit_nonce: RwLock<u64>,
213  }
214  
215  impl IpcServerState {
216      /// Create new server state
217      pub fn new(runtime: Option<Arc<RwLock<DeltaRuntime>>>) -> Self {
218          let (block_tx, _) = broadcast::channel(1000);
219          Self {
220              runtime,
221              block_height: RwLock::new(0),
222              is_halted: RwLock::new(false),
223              halt_reason: RwLock::new(None),
224              last_alpha_height: RwLock::new(None),
225              peer_count: RwLock::new(0),
226              pending_deposits: RwLock::new(HashMap::new()),
227              pending_withdrawals: RwLock::new(HashMap::new()),
228              attestations: RwLock::new(Vec::new()),
229              messages: RwLock::new(Vec::new()),
230              block_tx,
231              withdrawal_nonce: RwLock::new(0),
232              deposit_nonce: RwLock::new(0),
233          }
234      }
235  
236      /// Update block height
237      pub async fn set_block_height(&self, height: u64) {
238          *self.block_height.write().await = height;
239      }
240  
241      /// Update halt status
242      pub async fn set_halted(&self, halted: bool, reason: Option<HaltReason>) {
243          *self.is_halted.write().await = halted;
244          *self.halt_reason.write().await = reason;
245      }
246  
247      /// Update last ALPHA height
248      pub async fn set_last_alpha_height(&self, height: u64) {
249          *self.last_alpha_height.write().await = Some(height);
250      }
251  
252      /// Update peer count
253      pub async fn set_peer_count(&self, count: usize) {
254          *self.peer_count.write().await = count;
255      }
256  
257      /// Add a pending withdrawal
258      pub async fn add_withdrawal(&self, amount: u64, recipient: String) -> u64 {
259          let mut nonce = self.withdrawal_nonce.write().await;
260          *nonce += 1;
261          let withdrawal_nonce = *nonce;
262  
263          let withdrawal = WithdrawalInfo {
264              nonce: withdrawal_nonce,
265              amount,
266              recipient,
267              status: "pending".to_string(),
268              delta_height: *self.block_height.read().await,
269              created_at: std::time::SystemTime::now()
270                  .duration_since(std::time::UNIX_EPOCH)
271                  .unwrap_or_default()
272                  .as_secs(),
273          };
274  
275          self.pending_withdrawals
276              .write()
277              .await
278              .insert(withdrawal_nonce, withdrawal);
279          withdrawal_nonce
280      }
281  
282      /// Add a pending deposit
283      pub async fn add_deposit(
284          &self,
285          amount: u64,
286          sender: String,
287          recipient: String,
288          alpha_height: u64,
289      ) -> u64 {
290          let mut nonce = self.deposit_nonce.write().await;
291          *nonce += 1;
292          let deposit_nonce = *nonce;
293  
294          let deposit = DepositInfo {
295              nonce: deposit_nonce,
296              amount,
297              sender,
298              recipient,
299              alpha_height,
300              status: "pending".to_string(),
301          };
302  
303          self.pending_deposits
304              .write()
305              .await
306              .insert(deposit_nonce, deposit);
307          deposit_nonce
308      }
309  
310      /// Broadcast a new block notification
311      pub fn broadcast_block(&self, notification: BlockNotification) {
312          let _ = self.block_tx.send(notification);
313      }
314  }
315  
316  /// DELTA IPC Server
317  pub struct DeltaIpcServer {
318      config: IpcServerConfig,
319      state: Arc<IpcServerState>,
320      shutdown_tx: Option<mpsc::Sender<()>>,
321  }
322  
323  impl DeltaIpcServer {
324      /// Create a new IPC server
325      pub fn new(config: IpcServerConfig, runtime: Option<Arc<RwLock<DeltaRuntime>>>) -> Self {
326          Self {
327              config,
328              state: Arc::new(IpcServerState::new(runtime)),
329              shutdown_tx: None,
330          }
331      }
332  
333      /// Create with default configuration
334      pub fn with_defaults(runtime: Option<Arc<RwLock<DeltaRuntime>>>) -> Self {
335          Self::new(IpcServerConfig::default(), runtime)
336      }
337  
338      /// Get reference to server state
339      pub fn state(&self) -> &Arc<IpcServerState> {
340          &self.state
341      }
342  
343      /// Start the IPC server
344      pub async fn start(&mut self) -> anyhow::Result<()> {
345          // Ensure socket directory exists
346          if let Some(parent) = self.config.socket_path.parent() {
347              std::fs::create_dir_all(parent)?;
348          }
349  
350          // Remove existing socket file
351          if self.config.socket_path.exists() {
352              std::fs::remove_file(&self.config.socket_path)?;
353          }
354  
355          let listener = UnixListener::bind(&self.config.socket_path)?;
356          tracing::info!(
357              "DELTA IPC server listening on {:?}",
358              self.config.socket_path
359          );
360  
361          let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
362          self.shutdown_tx = Some(shutdown_tx);
363  
364          let state = self.state.clone();
365          let max_connections = self.config.max_connections;
366  
367          tokio::spawn(async move {
368              let connection_count = Arc::new(RwLock::new(0usize));
369  
370              loop {
371                  tokio::select! {
372                      accept_result = listener.accept() => {
373                          match accept_result {
374                              Ok((stream, _addr)) => {
375                                  let current_count = *connection_count.read().await;
376                                  if current_count >= max_connections {
377                                      tracing::warn!("Max IPC connections reached, rejecting");
378                                      continue;
379                                  }
380  
381                                  *connection_count.write().await += 1;
382                                  let state = state.clone();
383                                  let conn_count = connection_count.clone();
384  
385                                  tokio::spawn(async move {
386                                      if let Err(e) = handle_connection(stream, state).await {
387                                          tracing::error!("IPC connection error: {}", e);
388                                      }
389                                      *conn_count.write().await -= 1;
390                                  });
391                              }
392                              Err(e) => {
393                                  tracing::error!("IPC accept error: {}", e);
394                              }
395                          }
396                      }
397                      _ = shutdown_rx.recv() => {
398                          tracing::info!("DELTA IPC server shutting down");
399                          break;
400                      }
401                  }
402              }
403          });
404  
405          Ok(())
406      }
407  
408      /// Stop the IPC server
409      pub async fn stop(&mut self) {
410          if let Some(tx) = self.shutdown_tx.take() {
411              let _ = tx.send(()).await;
412          }
413  
414          // Clean up socket file
415          if self.config.socket_path.exists() {
416              let _ = std::fs::remove_file(&self.config.socket_path);
417          }
418      }
419  }
420  
421  /// Handle a single IPC connection
422  async fn handle_connection(stream: UnixStream, state: Arc<IpcServerState>) -> anyhow::Result<()> {
423      let (read_half, write_half) = stream.into_split();
424      let mut reader = BufReader::new(read_half);
425      let mut writer = BufWriter::new(write_half);
426  
427      // Optional: subscribe to blocks
428      let mut block_rx: Option<broadcast::Receiver<BlockNotification>> = None;
429  
430      loop {
431          let mut line = String::new();
432  
433          tokio::select! {
434              // Check for block notifications if subscribed
435              notification = async {
436                  if let Some(ref mut rx) = block_rx {
437                      rx.recv().await.ok()
438                  } else {
439                      std::future::pending::<Option<BlockNotification>>().await
440                  }
441              } => {
442                  if let Some(notification) = notification {
443                      let json = serde_json::to_string(&notification)?;
444                      writer.write_all(json.as_bytes()).await?;
445                      writer.write_all(b"\n").await?;
446                      writer.flush().await?;
447                  }
448              }
449  
450              // Handle incoming requests
451              result = reader.read_line(&mut line) => {
452                  match result {
453                      Ok(0) => {
454                          // Connection closed
455                          tracing::debug!("IPC client disconnected");
456                          break;
457                      }
458                      Ok(_) => {
459                          let response = match serde_json::from_str::<IpcRequest>(line.trim()) {
460                              Ok(request) => {
461                                  // Handle subscribeBlocks specially
462                                  if request.method == "subscribeBlocks" {
463                                      block_rx = Some(state.block_tx.subscribe());
464                                      IpcResponse::success(request.id, serde_json::json!({"subscribed": true}))
465                                  } else {
466                                      handle_request(&request, &state).await
467                                  }
468                              }
469                              Err(e) => IpcResponse::error(0, format!("Invalid request: {}", e)),
470                          };
471  
472                          let response_json = serde_json::to_string(&response)?;
473                          writer.write_all(response_json.as_bytes()).await?;
474                          writer.write_all(b"\n").await?;
475                          writer.flush().await?;
476                      }
477                      Err(e) => {
478                          tracing::error!("IPC read error: {}", e);
479                          break;
480                      }
481                  }
482              }
483          }
484      }
485  
486      Ok(())
487  }
488  
489  /// Handle an IPC request
490  async fn handle_request(request: &IpcRequest, state: &Arc<IpcServerState>) -> IpcResponse {
491      match request.method.as_str() {
492          "getNodeStatus" => {
493              let height = *state.block_height.read().await;
494              let is_halted = *state.is_halted.read().await;
495              let peer_count = *state.peer_count.read().await;
496  
497              IpcResponse::success(
498                  request.id,
499                  serde_json::to_value(NodeStatusResponse {
500                      block_height: height,
501                      is_syncing: !is_halted, // Simplified: not halted = syncing
502                      peer_count,
503                  })
504                  .unwrap_or_default(),
505              )
506          }
507  
508          "getStatus" => {
509              let height = *state.block_height.read().await;
510              let halted = *state.is_halted.read().await;
511              let halt_reason = state
512                  .halt_reason
513                  .read()
514                  .await
515                  .as_ref()
516                  .map(|r| r.to_string());
517              let last_alpha_height = *state.last_alpha_height.read().await;
518              let pending_deposits = state.pending_deposits.read().await.len();
519              let pending_withdrawals = state.pending_withdrawals.read().await.len();
520  
521              IpcResponse::success(
522                  request.id,
523                  serde_json::to_value(ChainStatusResponse {
524                      height,
525                      halted,
526                      halt_reason,
527                      last_alpha_height,
528                      pending_deposits,
529                      pending_withdrawals,
530                  })
531                  .unwrap_or_default(),
532              )
533          }
534  
535          "getBlock" => {
536              let height = match request.params.get("height").and_then(|h| h.as_u64()) {
537                  Some(h) => h,
538                  None => {
539                      return IpcResponse::error(request.id, "Missing or invalid height parameter");
540                  }
541              };
542  
543              // Return mock block data (runtime doesn't have block storage in current impl)
544              let notification = BlockNotification {
545                  height,
546                  state_root: format!("0x{:064x}", height),
547                  previous_state_root: format!("0x{:064x}", height.saturating_sub(1)),
548                  timestamp: std::time::SystemTime::now()
549                      .duration_since(std::time::UNIX_EPOCH)
550                      .unwrap_or_default()
551                      .as_secs(),
552                  transaction_count: 0,
553              };
554              IpcResponse::success(
555                  request.id,
556                  serde_json::to_value(notification).unwrap_or_default(),
557              )
558          }
559  
560          "submitAttestation" => {
561              match serde_json::from_value::<AttestationRequest>(request.params.clone()) {
562                  Ok(attestation) => {
563                      // Update last ALPHA height
564                      state.set_last_alpha_height(attestation.block_height).await;
565  
566                      // Store attestation
567                      state.attestations.write().await.push(attestation);
568  
569                      IpcResponse::success(request.id, serde_json::json!(true))
570                  }
571                  Err(e) => IpcResponse::error(request.id, format!("Invalid attestation: {}", e)),
572              }
573          }
574  
575          "submitMessage" => {
576              match serde_json::from_value::<MessageRequest>(request.params.clone()) {
577                  Ok(message) => {
578                      // Handle deposit messages
579                      if message.message_type == "deposit"
580                          && let (Some(amount), Some(sender), Some(recipient)) = (
581                              message.payload.get("amount").and_then(|a| a.as_u64()),
582                              message.payload.get("sender").and_then(|s| s.as_str()),
583                              message.payload.get("recipient").and_then(|r| r.as_str()),
584                          )
585                      {
586                          state
587                              .add_deposit(
588                                  amount,
589                                  sender.to_string(),
590                                  recipient.to_string(),
591                                  message.source_height,
592                              )
593                              .await;
594                      }
595  
596                      state.messages.write().await.push(message);
597                      IpcResponse::success(request.id, serde_json::json!(true))
598                  }
599                  Err(e) => IpcResponse::error(request.id, format!("Invalid message: {}", e)),
600              }
601          }
602  
603          "getPendingWithdrawals" => {
604              let withdrawals: Vec<WithdrawalInfo> = state
605                  .pending_withdrawals
606                  .read()
607                  .await
608                  .values()
609                  .filter(|w| w.status == "pending")
610                  .cloned()
611                  .collect();
612  
613              IpcResponse::success(
614                  request.id,
615                  serde_json::to_value(withdrawals).unwrap_or_default(),
616              )
617          }
618  
619          "confirmWithdrawal" => {
620              let nonce = match request.params.get("nonce").and_then(|n| n.as_u64()) {
621                  Some(n) => n,
622                  None => {
623                      return IpcResponse::error(request.id, "Missing or invalid nonce parameter");
624                  }
625              };
626  
627              let mut withdrawals = state.pending_withdrawals.write().await;
628              if let Some(withdrawal) = withdrawals.get_mut(&nonce) {
629                  withdrawal.status = "confirmed".to_string();
630                  IpcResponse::success(request.id, serde_json::json!(true))
631              } else {
632                  IpcResponse::error(request.id, format!("Withdrawal {} not found", nonce))
633              }
634          }
635  
636          "getPendingDeposits" => {
637              let deposits: Vec<DepositInfo> = state
638                  .pending_deposits
639                  .read()
640                  .await
641                  .values()
642                  .filter(|d| d.status == "pending")
643                  .cloned()
644                  .collect();
645  
646              IpcResponse::success(
647                  request.id,
648                  serde_json::to_value(deposits).unwrap_or_default(),
649              )
650          }
651  
652          "confirmDeposit" => {
653              let nonce = match request.params.get("nonce").and_then(|n| n.as_u64()) {
654                  Some(n) => n,
655                  None => {
656                      return IpcResponse::error(request.id, "Missing or invalid nonce parameter");
657                  }
658              };
659  
660              let mut deposits = state.pending_deposits.write().await;
661              if let Some(deposit) = deposits.get_mut(&nonce) {
662                  deposit.status = "confirmed".to_string();
663                  IpcResponse::success(request.id, serde_json::json!(true))
664              } else {
665                  IpcResponse::error(request.id, format!("Deposit {} not found", nonce))
666              }
667          }
668  
669          _ => IpcResponse::error(request.id, format!("Unknown method: {}", request.method)),
670      }
671  }
672  
673  #[cfg(test)]
674  mod tests {
675      use super::*;
676  
677      #[test]
678      fn test_ipc_response_success() {
679          let response = IpcResponse::success(1, serde_json::json!({"test": true}));
680          assert_eq!(response.id, 1);
681          assert!(response.result.is_some());
682          assert!(response.error.is_none());
683      }
684  
685      #[test]
686      fn test_ipc_response_error() {
687          let response = IpcResponse::error(2, "test error");
688          assert_eq!(response.id, 2);
689          assert!(response.result.is_none());
690          assert_eq!(response.error, Some("test error".to_string()));
691      }
692  
693      #[test]
694      fn test_config_default() {
695          let config = IpcServerConfig::default();
696          assert_eq!(config.socket_path, PathBuf::from(DEFAULT_DELTA_SOCKET));
697          assert_eq!(config.max_connections, 100);
698      }
699  
700      #[tokio::test]
701      async fn test_server_state() {
702          let state = IpcServerState::new(None);
703  
704          state.set_block_height(100).await;
705          assert_eq!(*state.block_height.read().await, 100);
706  
707          state
708              .set_halted(true, Some(HaltReason::MissingAttestations))
709              .await;
710          assert!(*state.is_halted.read().await);
711  
712          state.set_peer_count(5).await;
713          assert_eq!(*state.peer_count.read().await, 5);
714      }
715  
716      #[tokio::test]
717      async fn test_add_withdrawal() {
718          let state = IpcServerState::new(None);
719  
720          let nonce = state.add_withdrawal(1000, "recipient1".to_string()).await;
721          assert_eq!(nonce, 1);
722  
723          let nonce2 = state.add_withdrawal(2000, "recipient2".to_string()).await;
724          assert_eq!(nonce2, 2);
725  
726          let withdrawals = state.pending_withdrawals.read().await;
727          assert_eq!(withdrawals.len(), 2);
728      }
729  
730      #[tokio::test]
731      async fn test_handle_get_status() {
732          let state = Arc::new(IpcServerState::new(None));
733          state.set_block_height(42).await;
734          state.set_halted(false, None).await;
735          state.set_last_alpha_height(100).await;
736  
737          let request = IpcRequest {
738              id: 1,
739              method: "getStatus".to_string(),
740              params: serde_json::json!({}),
741          };
742  
743          let response = handle_request(&request, &state).await;
744          assert!(response.error.is_none());
745  
746          let result = response.result.unwrap();
747          assert_eq!(result["height"], 42);
748          assert_eq!(result["halted"], false);
749          assert_eq!(result["last_alpha_height"], 100);
750      }
751  
752      #[tokio::test]
753      async fn test_handle_submit_attestation() {
754          let state = Arc::new(IpcServerState::new(None));
755  
756          let request = IpcRequest {
757              id: 1,
758              method: "submitAttestation".to_string(),
759              params: serde_json::json!({
760                  "source_chain": "ALPHA",
761                  "target_chain": "DELTA",
762                  "block_height": 100,
763                  "state_root": "0x1234",
764                  "previous_state_root": "0x1233",
765                  "timestamp": 1234567890,
766                  "validator_epoch": 1
767              }),
768          };
769  
770          let response = handle_request(&request, &state).await;
771          assert!(response.error.is_none());
772          assert_eq!(response.result, Some(serde_json::json!(true)));
773  
774          // Check attestation was stored
775          let attestations = state.attestations.read().await;
776          assert_eq!(attestations.len(), 1);
777          assert_eq!(attestations[0].block_height, 100);
778  
779          // Check last_alpha_height was updated
780          assert_eq!(*state.last_alpha_height.read().await, Some(100));
781      }
782  
783      #[tokio::test]
784      async fn test_handle_unknown_method() {
785          let state = Arc::new(IpcServerState::new(None));
786  
787          let request = IpcRequest {
788              id: 1,
789              method: "unknownMethod".to_string(),
790              params: serde_json::json!({}),
791          };
792  
793          let response = handle_request(&request, &state).await;
794          assert!(response.error.is_some());
795          assert!(response.error.unwrap().contains("Unknown method"));
796      }
797  }