net.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use async_lock::Mutex; 20 use darkfi_serial::{async_trait, deserialize, Decodable, Encodable, SerialDecodable, VarInt}; 21 use std::{io::Cursor, sync::Arc}; 22 use zeromq::{Socket, SocketRecv, SocketSend}; 23 24 use crate::{ 25 error::{Error, Result}, 26 expr::SExprCode, 27 gfx::{gfxtag, RenderApi}, 28 prop::{PropertyType, Role}, 29 scene::{SceneNodeId, SceneNodePtr, ScenePath}, 30 ExecutorPtr, 31 }; 32 33 #[derive(Debug, SerialDecodable)] 34 #[repr(u8)] 35 enum Command { 36 Hello = 0, 37 AddNode = 1, 38 RemoveNode = 9, 39 RenameNode = 23, 40 ScanDangling = 24, 41 LookupNodeId = 12, 42 AddProperty = 11, 43 LinkNode = 2, 44 UnlinkNode = 8, 45 GetInfo = 19, 46 GetChildren = 4, 47 GetParents = 5, 48 GetProperties = 3, 49 GetPropertyValue = 6, 50 SetPropertyValue = 7, 51 GetSignals = 14, 52 RegisterSlot = 15, 53 UnregisterSlot = 16, 54 LookupSlotId = 17, 55 GetSlots = 18, 56 GetMethods = 20, 57 GetMethod = 21, 58 CallMethod = 22, 59 } 60 61 // Missing calls todo: 62 // GetPropLen 63 // UnsetProperty 64 // SetPropertyNull 65 // PropertyPushNull 66 // PropertyPush 67 // PropertyIsUnset 68 69 pub struct ZeroMQAdapter { 70 /* 71 // req-reply commands 72 req_socket: zmq::Socket, 73 // We cannot share zmq sockets across threads, and we cannot quickly spawn 74 // pub sockets due to address reuse errors. 75 slot_sender: mpsc::SyncSender<(Vec<u8>, Vec<u8>)>, 76 slot_recvr: Option<mpsc::Receiver<(Vec<u8>, Vec<u8>)>>, 77 */ 78 sg_root: SceneNodePtr, 79 render_api: RenderApi, 80 _ex: ExecutorPtr, 81 82 zmq_rep: Mutex<zeromq::RepSocket>, 83 _zmq_pub: Mutex<zeromq::PubSocket>, 84 } 85 86 impl ZeroMQAdapter { 87 pub async fn new(sg_root: SceneNodePtr, render_api: RenderApi, ex: ExecutorPtr) -> Arc<Self> { 88 let mut zmq_rep = zeromq::RepSocket::new(); 89 zmq_rep.bind("tcp://0.0.0.0:9484").await.unwrap(); 90 91 let mut zmq_pub = zeromq::PubSocket::new(); 92 zmq_pub.bind("tcp://0.0.0.0:9485").await.unwrap(); 93 94 Arc::new(Self { 95 sg_root, 96 render_api, 97 _ex: ex, 98 zmq_rep: Mutex::new(zmq_rep), 99 _zmq_pub: Mutex::new(zmq_pub), 100 }) 101 } 102 103 pub async fn run(self: Arc<Self>) { 104 loop { 105 let req = self.zmq_rep.lock().await.recv().await.unwrap(); 106 assert_eq!(req.len(), 2); 107 let cmd = req.get(0).unwrap().to_vec(); 108 assert_eq!(cmd.len(), 1); 109 let payload = req.get(1).unwrap().to_vec(); 110 111 let cmd = deserialize(&cmd).unwrap(); 112 debug!(target: "req", "zmq: {:?} {:?}", cmd, payload); 113 114 let self2 = self.clone(); 115 match self2.process_request(cmd, payload).await { 116 Ok(reply) => { 117 let mut m = zeromq::ZmqMessage::from(vec![0u8]); 118 m.push_back(reply.into()); 119 120 // [errc:1] [reply] 121 self.zmq_rep.lock().await.send(m).await.unwrap(); 122 } 123 Err(err) => { 124 let errc = err as u8; 125 warn!(target: "req", "errc {}: {}", errc, err); 126 127 let mut m = zeromq::ZmqMessage::from(vec![errc]); 128 m.push_back(vec![].into()); 129 130 // [errc:1] [reply] 131 self.zmq_rep.lock().await.send(m).await.unwrap(); 132 } 133 } 134 } 135 } 136 137 async fn process_request(self: Arc<Self>, cmd: Command, payload: Vec<u8>) -> Result<Vec<u8>> { 138 let mut cur = Cursor::new(&payload); 139 let mut reply = vec![]; 140 match cmd { 141 Command::Hello => { 142 debug!(target: "req", "hello()"); 143 assert_eq!(payload.len(), 0); 144 "hello".encode(&mut reply).unwrap(); 145 } 146 Command::GetInfo => { 147 /* 148 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 149 debug!(target: "req", "{:?}({})", cmd, node_id); 150 151 let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?; 152 node.name.encode(&mut reply).unwrap(); 153 node.typ.encode(&mut reply).unwrap(); 154 */ 155 } 156 Command::GetChildren => { 157 let node_path: ScenePath = String::decode(&mut cur).unwrap().parse()?; 158 debug!(target: "req", "{cmd:?}({node_path})"); 159 let node = self.sg_root.lookup_node(node_path).ok_or(Error::NodeNotFound)?; 160 161 let children: Vec<_> = node 162 .get_children() 163 .iter() 164 .map(|node| (node.name.clone(), node.id, node.typ)) 165 .collect(); 166 children.encode(&mut reply).unwrap(); 167 } 168 Command::GetParents => { 169 /* 170 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 171 debug!(target: "req", "{:?}({})", cmd, node_id); 172 173 let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?; 174 let parents: Vec<_> = node 175 .parents 176 .iter() 177 .map(|node_inf| (node_inf.name.clone(), node_inf.id, node_inf.typ)) 178 .collect(); 179 parents.encode(&mut reply).unwrap(); 180 */ 181 } 182 Command::GetProperties => { 183 let node_path: ScenePath = String::decode(&mut cur).unwrap().parse()?; 184 debug!(target: "req", "{cmd:?}({node_path})"); 185 let node = self.sg_root.lookup_node(node_path).ok_or(Error::NodeNotFound)?; 186 187 VarInt(node.props.len() as u64).encode(&mut reply).unwrap(); 188 for prop in &node.props { 189 prop.name.encode(&mut reply).unwrap(); 190 prop.typ.encode(&mut reply).unwrap(); 191 prop.subtype.encode(&mut reply).unwrap(); 192 //prop.defaults.encode(&mut reply).unwrap(); 193 prop.ui_name.encode(&mut reply).unwrap(); 194 prop.desc.encode(&mut reply).unwrap(); 195 prop.is_null_allowed.encode(&mut reply).unwrap(); 196 prop.is_expr_allowed.encode(&mut reply).unwrap(); 197 (prop.array_len as u32).encode(&mut reply).unwrap(); 198 prop.min_val.encode(&mut reply).unwrap(); 199 prop.max_val.encode(&mut reply).unwrap(); 200 prop.enum_items.encode(&mut reply).unwrap(); 201 202 let depends: Vec<_> = prop 203 .get_depends() 204 .into_iter() 205 .map(|d| (d.i as u32, d.local_name)) 206 .collect(); 207 depends.encode(&mut reply).unwrap(); 208 } 209 } 210 Command::GetPropertyValue => { 211 let node_path: ScenePath = String::decode(&mut cur).unwrap().parse()?; 212 let prop_name = String::decode(&mut cur).unwrap(); 213 debug!(target: "req", "{cmd:?}({node_path}, {prop_name})"); 214 let node = self.sg_root.lookup_node(node_path).ok_or(Error::NodeNotFound)?; 215 216 let prop = node.get_property(&prop_name).ok_or(Error::PropertyNotFound)?; 217 prop.typ.encode(&mut reply).unwrap(); 218 VarInt(prop.get_len() as u64).encode(&mut reply).unwrap(); 219 for i in 0..prop.get_len() { 220 let val = prop.get_value(i)?; 221 if val.is_unset() { 222 1u8.encode(&mut reply).unwrap(); 223 let default = &prop.defaults[i]; 224 default.encode(&mut reply).unwrap(); 225 } else if val.is_null() { 226 2u8.encode(&mut reply).unwrap(); 227 } else if val.is_expr() { 228 3u8.encode(&mut reply).unwrap(); 229 } else { 230 0u8.encode(&mut reply).unwrap(); 231 val.encode(&mut reply).unwrap(); 232 } 233 } 234 } 235 Command::SetPropertyValue => { 236 let node_path: ScenePath = String::decode(&mut cur).unwrap().parse()?; 237 let prop_name = String::decode(&mut cur).unwrap(); 238 let prop_i = u32::decode(&mut cur).unwrap() as usize; 239 let prop_type = PropertyType::decode(&mut cur).unwrap(); 240 debug!(target: "req", "{cmd:?}({node_path}, {prop_name}, {prop_i}, {prop_type:?})"); 241 242 let node = self.sg_root.lookup_node(node_path).ok_or(Error::NodeNotFound)?; 243 let prop = node.get_property(&prop_name).ok_or(Error::PropertyNotFound)?; 244 245 let atom = 246 &mut self.render_api.make_guard(gfxtag!("ZeroMQAdapter::SetPropertyValue")); 247 248 match prop_type { 249 PropertyType::Null => { 250 prop.set_null(atom, Role::User, prop_i)?; 251 } 252 PropertyType::Bool => { 253 let val = bool::decode(&mut cur).unwrap(); 254 prop.set_bool(atom, Role::User, prop_i, val)?; 255 } 256 PropertyType::Uint32 => { 257 let val = u32::decode(&mut cur).unwrap(); 258 prop.set_u32(atom, Role::User, prop_i, val)?; 259 } 260 PropertyType::Float32 => { 261 let val = f32::decode(&mut cur).unwrap(); 262 prop.set_f32(atom, Role::User, prop_i, val)?; 263 } 264 PropertyType::Str => { 265 let val = String::decode(&mut cur).unwrap(); 266 prop.set_str(atom, Role::User, prop_i, val)?; 267 } 268 PropertyType::Enum => { 269 let val = String::decode(&mut cur).unwrap(); 270 prop.set_enum(atom, Role::User, prop_i, val)?; 271 } 272 PropertyType::SceneNodeId => { 273 let val = SceneNodeId::decode(&mut cur).unwrap(); 274 prop.set_node_id(atom, Role::User, prop_i, val)?; 275 } 276 PropertyType::SExpr => { 277 let val = SExprCode::decode(&mut cur).unwrap(); 278 debug!(target: "req", " received code {:?}", val); 279 prop.set_expr(atom, Role::User, prop_i, val)?; 280 } 281 } 282 } 283 Command::AddNode => { 284 /* 285 let node_name = String::decode(&mut cur).unwrap(); 286 let node_type = SceneNodeType::decode(&mut cur).unwrap(); 287 debug!(target: "req", "{:?}({}, {:?})", cmd, node_name, node_type); 288 289 let node_id = scene_graph.add_node(&node_name, node_type).id; 290 node_id.encode(&mut reply).unwrap(); 291 */ 292 } 293 Command::RemoveNode => { 294 /* 295 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 296 debug!(target: "req", "{:?}({})", cmd, node_id); 297 scene_graph.remove_node(node_id)?; 298 */ 299 } 300 Command::RenameNode => { 301 /* 302 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 303 let node_name = String::decode(&mut cur).unwrap(); 304 debug!(target: "req", "{:?}({}, {})", cmd, node_id, node_name); 305 scene_graph.rename_node(node_id, node_name)?; 306 */ 307 } 308 Command::ScanDangling => { 309 /* 310 let dangling = scene_graph.scan_dangling(); 311 dangling.encode(&mut reply).unwrap(); 312 */ 313 } 314 Command::LookupNodeId => { 315 /* 316 let node_path: String = deserialize(&payload).unwrap(); 317 debug!(target: "req", "{:?}({})", cmd, node_path); 318 let node_id = scene_graph.lookup_node_id(&node_path).ok_or(Error::NodeNotFound)?; 319 node_id.encode(&mut reply).unwrap(); 320 */ 321 } 322 Command::AddProperty => { 323 /* 324 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 325 let prop_name = String::decode(&mut cur).unwrap(); 326 let prop_type = PropertyType::decode(&mut cur).unwrap(); 327 let prop_subtype = PropertySubType::decode(&mut cur).unwrap(); 328 329 debug!(target: "req", "{:?}({}, {}, {:?}, {:?}, ...)", cmd, node_id, prop_name, prop_type, prop_subtype); 330 let mut prop = Property::new(prop_name, prop_type, prop_subtype); 331 332 let prop_array_len = u32::decode(&mut cur).unwrap(); 333 prop.set_array_len(prop_array_len as usize); 334 335 let prop_defaults_is_some = bool::decode(&mut cur).unwrap(); 336 if prop_defaults_is_some { 337 let prop_defaults_len = VarInt::decode(&mut cur).unwrap(); 338 match prop_type { 339 PropertyType::Uint32 => { 340 let mut prop_defaults = vec![]; 341 for _ in 0..prop_defaults_len.0 { 342 prop_defaults.push(u32::decode(&mut cur).unwrap()); 343 } 344 prop.set_defaults_u32(prop_defaults)?; 345 } 346 PropertyType::Float32 => { 347 let mut prop_defaults = vec![]; 348 for _ in 0..prop_defaults_len.0 { 349 prop_defaults.push(f32::decode(&mut cur).unwrap()); 350 } 351 prop.set_defaults_f32(prop_defaults)?; 352 } 353 PropertyType::Str => { 354 let mut prop_defaults = vec![]; 355 for _ in 0..prop_defaults_len.0 { 356 prop_defaults.push(String::decode(&mut cur).unwrap()); 357 } 358 prop.set_defaults_str(prop_defaults)?; 359 } 360 _ => return Err(Error::PropertyWrongType), 361 } 362 } 363 364 let prop_ui_name = String::decode(&mut cur).unwrap(); 365 let prop_desc = String::decode(&mut cur).unwrap(); 366 let prop_is_null_allowed = bool::decode(&mut cur).unwrap(); 367 let prop_is_expr_allowed = bool::decode(&mut cur).unwrap(); 368 369 match prop_type { 370 PropertyType::Uint32 => { 371 let min_is_some = bool::decode(&mut cur).unwrap(); 372 let min = if min_is_some { 373 let min = u32::decode(&mut cur).unwrap(); 374 Some(PropertyValue::Uint32(min)) 375 } else { 376 None 377 }; 378 let max_is_some = bool::decode(&mut cur).unwrap(); 379 let max = if max_is_some { 380 let max = u32::decode(&mut cur).unwrap(); 381 Some(PropertyValue::Uint32(max)) 382 } else { 383 None 384 }; 385 prop.min_val = min; 386 prop.max_val = max; 387 } 388 PropertyType::Float32 => { 389 let min_is_some = bool::decode(&mut cur).unwrap(); 390 let min = if min_is_some { 391 let min = f32::decode(&mut cur).unwrap(); 392 Some(PropertyValue::Float32(min)) 393 } else { 394 None 395 }; 396 let max_is_some = bool::decode(&mut cur).unwrap(); 397 let max = if max_is_some { 398 let max = f32::decode(&mut cur).unwrap(); 399 Some(PropertyValue::Float32(max)) 400 } else { 401 None 402 }; 403 prop.min_val = min; 404 prop.max_val = max; 405 } 406 _ => { 407 let min_is_some = bool::decode(&mut cur).unwrap(); 408 if min_is_some { 409 return Err(Error::PropertyWrongType) 410 } 411 let max_is_some = bool::decode(&mut cur).unwrap(); 412 if max_is_some { 413 return Err(Error::PropertyWrongType) 414 } 415 } 416 } 417 418 let prop_enum_items = Vec::<String>::decode(&mut cur).unwrap(); 419 420 let node = scene_graph.get_node_mut(node_id).ok_or(Error::NodeNotFound)?; 421 422 prop.set_ui_text(prop_ui_name, prop_desc); 423 prop.is_null_allowed = prop_is_null_allowed; 424 prop.is_expr_allowed = prop_is_expr_allowed; 425 if !prop_enum_items.is_empty() { 426 prop.set_enum_items(prop_enum_items)?; 427 } 428 node.add_property(prop)?; 429 */ 430 } 431 Command::LinkNode => { 432 /* 433 let child_id = SceneNodeId::decode(&mut cur).unwrap(); 434 let parent_id = SceneNodeId::decode(&mut cur).unwrap(); 435 debug!(target: "req", "{:?}({}, {})", cmd, child_id, parent_id); 436 scene_graph.link(child_id, parent_id)?; 437 */ 438 } 439 Command::UnlinkNode => { 440 /* 441 let child_id = SceneNodeId::decode(&mut cur).unwrap(); 442 let parent_id = SceneNodeId::decode(&mut cur).unwrap(); 443 debug!(target: "req", "{:?}({}, {})", cmd, child_id, parent_id); 444 scene_graph.unlink(child_id, parent_id)?; 445 */ 446 } 447 Command::GetSignals => { 448 /* 449 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 450 debug!(target: "req", "{:?}({})", cmd, node_id); 451 452 let node = scene_graph.get_node_mut(node_id).ok_or(Error::NodeNotFound)?; 453 454 let mut sigs = vec![]; 455 for sig in &node.sigs { 456 sigs.push(sig.name.clone()); 457 } 458 sigs.encode(&mut reply).unwrap(); 459 */ 460 } 461 Command::RegisterSlot => { 462 /* 463 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 464 let sig_name = String::decode(&mut cur).unwrap(); 465 let slot_name = String::decode(&mut cur).unwrap(); 466 let user_data = Vec::<u8>::decode(&mut cur).unwrap(); 467 debug!(target: "req", "{:?}({}, {}, {}, {:?})", cmd, node_id, sig_name, slot_name, user_data); 468 469 let node = scene_graph.get_node_mut(node_id).ok_or(Error::NodeNotFound)?; 470 471 let (sendr, recvr) = async_channel::unbounded(); 472 let slot = Slot { name: slot_name, notify: sendr }; 473 474 // This task will auto-die when the slot is unregistered 475 let self2 = self.clone(); 476 self.ex 477 .spawn(async move { 478 loop { 479 let Ok(signal_data) = recvr.recv().await else { 480 // Die 481 break 482 }; 483 484 let mut m = zeromq::ZmqMessage::from(signal_data); 485 m.push_back(user_data.clone().into()); 486 487 self2.zmq_pub.lock().await.send(m).await.unwrap(); 488 } 489 }) 490 .detach(); 491 492 let slot_id = node.register(&sig_name, slot)?; 493 slot_id.encode(&mut reply).unwrap(); 494 */ 495 } 496 Command::UnregisterSlot => { 497 /* 498 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 499 let sig_name = String::decode(&mut cur).unwrap(); 500 let slot_id = SlotId::decode(&mut cur).unwrap(); 501 debug!(target: "req", "{:?}({}, {}, {})", cmd, node_id, sig_name, slot_id); 502 503 let node = scene_graph.get_node_mut(node_id).ok_or(Error::NodeNotFound)?; 504 node.unregister(&sig_name, slot_id)?; 505 */ 506 } 507 Command::LookupSlotId => { 508 /* 509 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 510 let sig_name = String::decode(&mut cur).unwrap(); 511 let slot_name = String::decode(&mut cur).unwrap(); 512 debug!(target: "req", "{:?}({}, {}, {})", cmd, node_id, sig_name, slot_name); 513 514 let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?; 515 let signal = node.get_signal(&sig_name).ok_or(Error::SignalNotFound)?; 516 let slot_id = signal.lookup_slot_id(&slot_name).ok_or(Error::SlotNotFound)?; 517 slot_id.encode(&mut reply).unwrap(); 518 */ 519 } 520 Command::GetSlots => { 521 /* 522 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 523 let sig_name = String::decode(&mut cur).unwrap(); 524 debug!(target: "req", "{:?}({}, {})", cmd, node_id, sig_name); 525 526 let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?; 527 let signal = node.get_signal(&sig_name).ok_or(Error::SignalNotFound)?; 528 529 let mut slots = vec![]; 530 for (slot_id, slot) in signal.get_slots() { 531 slots.push((slot.name.clone(), slot_id)); 532 } 533 slots.encode(&mut reply).unwrap(); 534 */ 535 } 536 Command::GetMethods => { 537 /* 538 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 539 debug!(target: "req", "{:?}({})", cmd, node_id); 540 541 let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?; 542 let method_names: Vec<_> = node.methods.iter().map(|m| m.name.clone()).collect(); 543 544 method_names.encode(&mut reply).unwrap(); 545 */ 546 } 547 Command::GetMethod => { 548 /* 549 let node_id = SceneNodeId::decode(&mut cur).unwrap(); 550 let method_name = String::decode(&mut cur).unwrap(); 551 debug!(target: "req", "{:?}({}, {})", cmd, node_id, method_name); 552 553 let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?; 554 let method = node.get_method(&method_name).ok_or(Error::MethodNotFound)?; 555 556 method.args.encode(&mut reply).unwrap(); 557 method.result.encode(&mut reply).unwrap(); 558 */ 559 } 560 Command::CallMethod => { 561 let node_path: ScenePath = String::decode(&mut cur).unwrap().parse()?; 562 let method_name = String::decode(&mut cur).unwrap(); 563 let arg_data = Vec::<u8>::decode(&mut cur).unwrap(); 564 debug!(target: "req", "{cmd:?}({node_path}, {method_name}, ...)"); 565 566 let node = self.sg_root.lookup_node(node_path).ok_or(Error::NodeNotFound)?; 567 let result = node.call_method(&method_name, arg_data).await?; 568 result.encode(&mut reply).unwrap(); 569 } 570 } 571 572 Ok(reply) 573 } 574 }