/ node / src / ipc_server.rs
ipc_server.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  //! # ALPHA Node IPC Server
 17  //!
 18  //! Unix domain socket IPC server for the ALPHA chain node.
 19  //! Provides JSON-RPC style interface for bridge communication.
 20  //!
 21  //! ## Supported Methods
 22  //!
 23  //! - `getNodeStatus` - Get current node status (height, syncing, peers)
 24  //! - `getBlock` - Get block by height
 25  //! - `subscribeBlocks` - Subscribe to new block notifications
 26  //! - `submitAttestation` - Submit a state root attestation
 27  //! - `submitMessage` - Submit a cross-chain message
 28  //! - `getStatus` - Get detailed chain status
 29  //! - `getPendingWithdrawals` - Get pending withdrawal requests
 30  //! - `confirmWithdrawal` - Confirm a withdrawal was processed
 31  
 32  use alphavm::prelude::{Ledger, Network, store::helpers::rocksdb::ConsensusDB};
 33  #[cfg(feature = "locktick")]
 34  use locktick::tokio::RwLock;
 35  use serde::{Deserialize, Serialize};
 36  use std::{collections::HashMap, path::PathBuf, sync::Arc};
 37  #[cfg(not(feature = "locktick"))]
 38  use tokio::sync::RwLock;
 39  use tokio::{
 40      io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
 41      net::{UnixListener, UnixStream},
 42      sync::{broadcast, mpsc},
 43  };
 44  
/// Default filesystem path of the Unix domain socket used for ALPHA node IPC.
///
/// This is the `socket_path` produced by `IpcServerConfig::default()`.
pub const DEFAULT_ALPHA_SOCKET: &str = "/tmp/adnet/alpha.sock";
 47  
/// IPC request message (JSON-RPC style)
///
/// The connection handler parses one of these from each line of JSON it
/// reads from the socket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IpcRequest {
    /// Identifier supplied by the caller; it is copied into the `IpcResponse`
    /// produced for this request.
    pub id: u64,
    /// Name of the method to invoke (for example `getNodeStatus`).
    pub method: String,
    /// Method-specific parameters as untyped JSON; each method extracts the
    /// fields it needs.
    pub params: serde_json::Value,
}
 55  
/// IPC response message
///
/// The constructors in this file set exactly one of `result` and `error`;
/// the field left as `None` is omitted from the serialized JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IpcResponse {
    /// Identifier of the request this response answers.
    pub id: u64,
    /// Payload of a successful call; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<serde_json::Value>,
    /// Human-readable failure message; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
 65  
 66  impl IpcResponse {
 67      /// Create a success response
 68      pub fn success(id: u64, result: serde_json::Value) -> Self {
 69          Self { id, result: Some(result), error: None }
 70      }
 71  
 72      /// Create an error response
 73      pub fn error(id: u64, message: impl Into<String>) -> Self {
 74          Self { id, result: None, error: Some(message.into()) }
 75      }
 76  }
 77  
/// Block notification sent to subscribers
///
/// The same shape is also returned as the result of the `getBlock` method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockNotification {
    /// Height of the block being described.
    pub height: u64,
    /// State root rendered as a string (the exact format depends on the producer).
    pub state_root: String,
    /// State root of the preceding block, rendered as a string.
    pub previous_state_root: String,
    /// Block timestamp; the mock `getBlock` path fills this with seconds since
    /// the Unix epoch. NOTE(review): confirm the ledger timestamp uses the same unit.
    pub timestamp: u64,
    /// Number of transactions in the block.
    pub transaction_count: u32,
}
 87  
/// Node status response
///
/// Serialized as the result of the `getNodeStatus` method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatusResponse {
    /// Block height currently recorded in the shared server state.
    pub block_height: u64,
    /// Whether the shared server state marks the node as syncing.
    pub is_syncing: bool,
    /// Peer count currently recorded in the shared server state.
    pub peer_count: usize,
}
 95  
/// Attestation request
///
/// Deserialized from the `params` of a `submitAttestation` call and appended
/// to the server's pending-attestation list. None of the fields are
/// interpreted or validated in this file.
#[derive(Debug, Clone, Deserialize)]
pub struct AttestationRequest {
    /// Name of the chain the attestation originates from (presumably; not validated here).
    pub source_chain: String,
    /// Name of the chain the attestation is addressed to (presumably; not validated here).
    pub target_chain: String,
    /// Block height the attestation refers to.
    pub block_height: u64,
    /// Attested state root, as a string.
    pub state_root: String,
    /// State root preceding `state_root`, as a string.
    pub previous_state_root: String,
    /// Timestamp supplied by the caller. NOTE(review): unit is not established in this file.
    pub timestamp: u64,
    /// Validator epoch supplied by the caller.
    pub validator_epoch: u64,
}
107  
/// Message request
///
/// Deserialized from the `params` of a `submitMessage` call and appended to
/// the server's pending-message list. None of the fields are interpreted or
/// validated in this file.
#[derive(Debug, Clone, Deserialize)]
pub struct MessageRequest {
    /// Caller-defined message kind.
    pub message_type: String,
    /// Name of the chain the message originates from (presumably; not validated here).
    pub source_chain: String,
    /// Height on the source chain associated with the message.
    pub source_height: u64,
    /// Validator epoch supplied by the caller.
    pub validator_epoch: u64,
    /// Message body as untyped JSON.
    pub payload: serde_json::Value,
}
117  
/// Withdrawal info
///
/// Entries are stored in the server's pending-withdrawal map and returned by
/// `getPendingWithdrawals`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WithdrawalInfo {
    /// Withdrawal nonce. NOTE(review): presumably equal to the map key used by
    /// `confirmWithdrawal`; nothing in this file enforces that.
    pub nonce: u64,
    /// Withdrawal amount. NOTE(review): unit is not established in this file.
    pub amount: u64,
    /// Recipient of the withdrawal, as a string.
    pub recipient: String,
    /// Free-form status string; `confirmWithdrawal` sets it to `"confirmed"`.
    pub status: String,
}
126  
/// IPC server configuration
///
/// `Default` yields the `DEFAULT_ALPHA_SOCKET` path, 100 connections and a
/// notification buffer of 1000.
#[derive(Clone, Debug)]
pub struct IpcServerConfig {
    /// Filesystem path of the Unix domain socket the server binds to
    pub socket_path: PathBuf,
    /// Maximum concurrent connections; the accept loop drops new connections
    /// once this many are being served
    pub max_connections: usize,
    /// Buffer size for block notifications
    pub notification_buffer_size: usize,
}
137  
138  impl Default for IpcServerConfig {
139      fn default() -> Self {
140          Self { socket_path: PathBuf::from(DEFAULT_ALPHA_SOCKET), max_connections: 100, notification_buffer_size: 1000 }
141      }
142  }
143  
/// State shared across all IPC connections
///
/// Each mutable field sits behind its own async `RwLock`, so connection
/// tasks sharing one `Arc<IpcServerState>` can read and update them
/// independently.
pub struct IpcServerState<N: Network> {
    /// Reference to the ledger; when `None`, `getBlock` answers with mock data
    pub ledger: Option<Arc<Ledger<N, ConsensusDB<N>>>>,
    /// Current block height, as last set through `set_block_height`
    pub block_height: RwLock<u64>,
    /// Is node syncing, as last set through `set_syncing` (starts as `true`)
    pub is_syncing: RwLock<bool>,
    /// Connected peer count, as last set through `set_peer_count`
    pub peer_count: RwLock<usize>,
    /// Pending attestations (for testing/mock); appended to by `submitAttestation`
    pub pending_attestations: RwLock<Vec<AttestationRequest>>,
    /// Pending messages (for testing/mock); appended to by `submitMessage`
    pub pending_messages: RwLock<Vec<MessageRequest>>,
    /// Pending withdrawals, looked up by the `nonce` parameter of `confirmWithdrawal`
    pub pending_withdrawals: RwLock<HashMap<u64, WithdrawalInfo>>,
    /// Block notification broadcaster; `subscribeBlocks` creates receivers from it
    pub block_tx: broadcast::Sender<BlockNotification>,
}
163  
164  impl<N: Network> IpcServerState<N> {
165      /// Create new server state
166      pub fn new(ledger: Option<Arc<Ledger<N, ConsensusDB<N>>>>) -> Self {
167          let (block_tx, _) = broadcast::channel(1000);
168          Self {
169              ledger,
170              block_height: RwLock::new(0),
171              is_syncing: RwLock::new(true),
172              peer_count: RwLock::new(0),
173              pending_attestations: RwLock::new(Vec::new()),
174              pending_messages: RwLock::new(Vec::new()),
175              pending_withdrawals: RwLock::new(HashMap::new()),
176              block_tx,
177          }
178      }
179  
180      /// Update block height
181      pub async fn set_block_height(&self, height: u64) {
182          *self.block_height.write().await = height;
183      }
184  
185      /// Update sync status
186      pub async fn set_syncing(&self, syncing: bool) {
187          *self.is_syncing.write().await = syncing;
188      }
189  
190      /// Update peer count
191      pub async fn set_peer_count(&self, count: usize) {
192          *self.peer_count.write().await = count;
193      }
194  
195      /// Broadcast a new block notification
196      pub fn broadcast_block(&self, notification: BlockNotification) {
197          // Ignore send errors (no subscribers)
198          let _ = self.block_tx.send(notification);
199      }
200  }
201  
/// ALPHA IPC Server
///
/// Owns the configuration and the shared state, and remembers how to signal
/// the background accept loop to stop.
pub struct AlphaIpcServer<N: Network> {
    /// Socket path and limits the server was created with
    config: IpcServerConfig,
    /// State shared with every connection task
    state: Arc<IpcServerState<N>>,
    /// Sender used by `stop` to end the accept loop; `None` until `start` has run
    shutdown_tx: Option<mpsc::Sender<()>>,
}
208  
209  impl<N: Network> AlphaIpcServer<N> {
210      /// Create a new IPC server
211      pub fn new(config: IpcServerConfig, ledger: Option<Arc<Ledger<N, ConsensusDB<N>>>>) -> Self {
212          Self { config, state: Arc::new(IpcServerState::new(ledger)), shutdown_tx: None }
213      }
214  
215      /// Create with default configuration
216      pub fn with_defaults(ledger: Option<Arc<Ledger<N, ConsensusDB<N>>>>) -> Self {
217          Self::new(IpcServerConfig::default(), ledger)
218      }
219  
220      /// Get reference to server state
221      pub fn state(&self) -> &Arc<IpcServerState<N>> {
222          &self.state
223      }
224  
225      /// Start the IPC server
226      pub async fn start(&mut self) -> anyhow::Result<()> {
227          // Ensure socket directory exists
228          if let Some(parent) = self.config.socket_path.parent() {
229              std::fs::create_dir_all(parent)?;
230          }
231  
232          // Remove existing socket file
233          if self.config.socket_path.exists() {
234              std::fs::remove_file(&self.config.socket_path)?;
235          }
236  
237          let listener = UnixListener::bind(&self.config.socket_path)?;
238          tracing::info!("ALPHA IPC server listening on {:?}", self.config.socket_path);
239  
240          let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
241          self.shutdown_tx = Some(shutdown_tx);
242  
243          let state = self.state.clone();
244          let max_connections = self.config.max_connections;
245  
246          tokio::spawn(async move {
247              let connection_count = Arc::new(RwLock::new(0usize));
248  
249              loop {
250                  tokio::select! {
251                      accept_result = listener.accept() => {
252                          match accept_result {
253                              Ok((stream, _addr)) => {
254                                  let current_count = *connection_count.read().await;
255                                  if current_count >= max_connections {
256                                      tracing::warn!("Max IPC connections reached, rejecting");
257                                      continue;
258                                  }
259  
260                                  *connection_count.write().await += 1;
261                                  let state = state.clone();
262                                  let conn_count = connection_count.clone();
263  
264                                  tokio::spawn(async move {
265                                      if let Err(e) = handle_connection(stream, state).await {
266                                          tracing::error!("IPC connection error: {}", e);
267                                      }
268                                      *conn_count.write().await -= 1;
269                                  });
270                              }
271                              Err(e) => {
272                                  tracing::error!("IPC accept error: {}", e);
273                              }
274                          }
275                      }
276                      _ = shutdown_rx.recv() => {
277                          tracing::info!("ALPHA IPC server shutting down");
278                          break;
279                      }
280                  }
281              }
282          });
283  
284          Ok(())
285      }
286  
287      /// Stop the IPC server
288      pub async fn stop(&mut self) {
289          if let Some(tx) = self.shutdown_tx.take() {
290              let _ = tx.send(()).await;
291          }
292  
293          // Clean up socket file
294          if self.config.socket_path.exists() {
295              let _ = std::fs::remove_file(&self.config.socket_path);
296          }
297      }
298  }
299  
300  /// Handle a single IPC connection
301  async fn handle_connection<N: Network>(stream: UnixStream, state: Arc<IpcServerState<N>>) -> anyhow::Result<()> {
302      let (read_half, write_half) = stream.into_split();
303      let mut reader = BufReader::new(read_half);
304      let mut writer = BufWriter::new(write_half);
305  
306      // Optional: subscribe to blocks
307      let mut block_rx: Option<broadcast::Receiver<BlockNotification>> = None;
308  
309      loop {
310          let mut line = String::new();
311  
312          tokio::select! {
313              // Check for block notifications if subscribed
314              notification = async {
315                  if let Some(ref mut rx) = block_rx {
316                      rx.recv().await.ok()
317                  } else {
318                      std::future::pending::<Option<BlockNotification>>().await
319                  }
320              } => {
321                  if let Some(notification) = notification {
322                      let json = serde_json::to_string(&notification)?;
323                      writer.write_all(json.as_bytes()).await?;
324                      writer.write_all(b"\n").await?;
325                      writer.flush().await?;
326                  }
327              }
328  
329              // Handle incoming requests
330              result = reader.read_line(&mut line) => {
331                  match result {
332                      Ok(0) => {
333                          // Connection closed
334                          tracing::debug!("IPC client disconnected");
335                          break;
336                      }
337                      Ok(_) => {
338                          let response = match serde_json::from_str::<IpcRequest>(line.trim()) {
339                              Ok(request) => {
340                                  // Handle subscribeBlocks specially
341                                  if request.method == "subscribeBlocks" {
342                                      block_rx = Some(state.block_tx.subscribe());
343                                      IpcResponse::success(request.id, serde_json::json!({"subscribed": true}))
344                                  } else {
345                                      handle_request(&request, &state).await
346                                  }
347                              }
348                              Err(e) => IpcResponse::error(0, format!("Invalid request: {e}")),
349                          };
350  
351                          let response_json = serde_json::to_string(&response)?;
352                          writer.write_all(response_json.as_bytes()).await?;
353                          writer.write_all(b"\n").await?;
354                          writer.flush().await?;
355                      }
356                      Err(e) => {
357                          tracing::error!("IPC read error: {}", e);
358                          break;
359                      }
360                  }
361              }
362          }
363      }
364  
365      Ok(())
366  }
367  
368  /// Handle an IPC request
369  async fn handle_request<N: Network>(request: &IpcRequest, state: &Arc<IpcServerState<N>>) -> IpcResponse {
370      match request.method.as_str() {
371          "getNodeStatus" => {
372              let height = *state.block_height.read().await;
373              let is_syncing = *state.is_syncing.read().await;
374              let peer_count = *state.peer_count.read().await;
375  
376              IpcResponse::success(
377                  request.id,
378                  serde_json::to_value(NodeStatusResponse { block_height: height, is_syncing, peer_count })
379                      .unwrap_or_default(),
380              )
381          }
382  
383          "getBlock" => {
384              let height = match request.params.get("height").and_then(|h| h.as_u64()) {
385                  Some(h) => h,
386                  None => return IpcResponse::error(request.id, "Missing or invalid height parameter"),
387              };
388  
389              // Try to get block from ledger
390              if let Some(ref ledger) = state.ledger {
391                  match ledger.get_block(height as u32) {
392                      Ok(block) => {
393                          let notification = BlockNotification {
394                              height,
395                              state_root: format!("{:?}", block.header().previous_state_root()),
396                              previous_state_root: format!("{:?}", block.header().previous_state_root()),
397                              timestamp: block.header().metadata().timestamp() as u64,
398                              transaction_count: block.transactions().len() as u32,
399                          };
400                          IpcResponse::success(request.id, serde_json::to_value(notification).unwrap_or_default())
401                      }
402                      Err(e) => IpcResponse::error(request.id, format!("Block not found: {e}")),
403                  }
404              } else {
405                  // Return mock data if no ledger
406                  let notification = BlockNotification {
407                      height,
408                      state_root: format!("0x{height:064x}"),
409                      previous_state_root: format!("0x{:064x}", height.saturating_sub(1)),
410                      timestamp: std::time::SystemTime::now()
411                          .duration_since(std::time::UNIX_EPOCH)
412                          .unwrap_or_default()
413                          .as_secs(),
414                      transaction_count: 0,
415                  };
416                  IpcResponse::success(request.id, serde_json::to_value(notification).unwrap_or_default())
417              }
418          }
419  
420          "submitAttestation" => match serde_json::from_value::<AttestationRequest>(request.params.clone()) {
421              Ok(attestation) => {
422                  state.pending_attestations.write().await.push(attestation);
423                  IpcResponse::success(request.id, serde_json::json!(true))
424              }
425              Err(e) => IpcResponse::error(request.id, format!("Invalid attestation: {e}")),
426          },
427  
428          "submitMessage" => match serde_json::from_value::<MessageRequest>(request.params.clone()) {
429              Ok(message) => {
430                  state.pending_messages.write().await.push(message);
431                  IpcResponse::success(request.id, serde_json::json!(true))
432              }
433              Err(e) => IpcResponse::error(request.id, format!("Invalid message: {e}")),
434          },
435  
436          "getStatus" => {
437              let height = *state.block_height.read().await;
438              let is_syncing = *state.is_syncing.read().await;
439              let peer_count = *state.peer_count.read().await;
440              let pending_attestations = state.pending_attestations.read().await.len();
441              let pending_messages = state.pending_messages.read().await.len();
442  
443              IpcResponse::success(
444                  request.id,
445                  serde_json::json!({
446                      "height": height,
447                      "is_syncing": is_syncing,
448                      "peer_count": peer_count,
449                      "pending_attestations": pending_attestations,
450                      "pending_messages": pending_messages,
451                  }),
452              )
453          }
454  
455          "getPendingWithdrawals" => {
456              let withdrawals: Vec<WithdrawalInfo> = state.pending_withdrawals.read().await.values().cloned().collect();
457  
458              IpcResponse::success(request.id, serde_json::to_value(withdrawals).unwrap_or_default())
459          }
460  
461          "confirmWithdrawal" => {
462              let nonce = match request.params.get("nonce").and_then(|n| n.as_u64()) {
463                  Some(n) => n,
464                  None => return IpcResponse::error(request.id, "Missing or invalid nonce parameter"),
465              };
466  
467              let mut withdrawals = state.pending_withdrawals.write().await;
468              if let Some(withdrawal) = withdrawals.get_mut(&nonce) {
469                  withdrawal.status = "confirmed".to_string();
470                  IpcResponse::success(request.id, serde_json::json!(true))
471              } else {
472                  IpcResponse::error(request.id, format!("Withdrawal {nonce} not found"))
473              }
474          }
475  
476          _ => IpcResponse::error(request.id, format!("Unknown method: {}", request.method)),
477      }
478  }
479  
#[cfg(test)]
mod tests {
    use super::*;
    use alphavm::console::network::MainnetV0;

    /// Ledger-less server state shared by the async tests.
    fn mock_state() -> Arc<IpcServerState<MainnetV0>> {
        Arc::new(IpcServerState::new(None))
    }

    /// Request with id 1, the given method and empty params.
    fn request_for(method: &str) -> IpcRequest {
        IpcRequest { id: 1, method: method.to_string(), params: serde_json::json!({}) }
    }

    #[test]
    fn test_ipc_response_success() {
        let ok = IpcResponse::success(1, serde_json::json!({"test": true}));

        assert_eq!(ok.id, 1);
        assert!(ok.result.is_some());
        assert!(ok.error.is_none());
    }

    #[test]
    fn test_ipc_response_error() {
        let failed = IpcResponse::error(2, "test error");

        assert_eq!(failed.id, 2);
        assert!(failed.result.is_none());
        assert_eq!(failed.error, Some("test error".to_string()));
    }

    #[test]
    fn test_config_default() {
        let IpcServerConfig { socket_path, max_connections, .. } = IpcServerConfig::default();

        assert_eq!(socket_path, PathBuf::from(DEFAULT_ALPHA_SOCKET));
        assert_eq!(max_connections, 100);
    }

    #[tokio::test]
    async fn test_server_state() {
        let state: IpcServerState<MainnetV0> = IpcServerState::new(None);

        // Each setter must be observable through the corresponding lock.
        state.set_block_height(100).await;
        let height = *state.block_height.read().await;
        assert_eq!(height, 100);

        state.set_syncing(false).await;
        let syncing = *state.is_syncing.read().await;
        assert!(!syncing);

        state.set_peer_count(5).await;
        let peers = *state.peer_count.read().await;
        assert_eq!(peers, 5);
    }

    #[tokio::test]
    async fn test_handle_get_node_status() {
        let state = mock_state();
        state.set_block_height(42).await;
        state.set_syncing(false).await;
        state.set_peer_count(10).await;

        let response = handle_request(&request_for("getNodeStatus"), &state).await;
        assert!(response.error.is_none());

        let status = response.result.unwrap();
        assert_eq!(status["block_height"], 42);
        assert_eq!(status["is_syncing"], false);
        assert_eq!(status["peer_count"], 10);
    }

    #[tokio::test]
    async fn test_handle_unknown_method() {
        let state = mock_state();

        let response = handle_request(&request_for("unknownMethod"), &state).await;
        assert!(response.error.is_some());
        assert!(response.error.unwrap().contains("Unknown method"));
    }
}