/ bin / app / src / net.rs
net.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use async_lock::Mutex;
 20  use darkfi_serial::{async_trait, deserialize, Decodable, Encodable, SerialDecodable, VarInt};
 21  use std::{io::Cursor, sync::Arc};
 22  use zeromq::{Socket, SocketRecv, SocketSend};
 23  
 24  use crate::{
 25      error::{Error, Result},
 26      expr::SExprCode,
 27      gfx::{gfxtag, RenderApi},
 28      prop::{PropertyType, Role},
 29      scene::{SceneNodeId, SceneNodePtr, ScenePath},
 30      ExecutorPtr,
 31  };
 32  
 33  #[derive(Debug, SerialDecodable)]
 34  #[repr(u8)]
 35  enum Command {
 36      Hello = 0,
 37      AddNode = 1,
 38      RemoveNode = 9,
 39      RenameNode = 23,
 40      ScanDangling = 24,
 41      LookupNodeId = 12,
 42      AddProperty = 11,
 43      LinkNode = 2,
 44      UnlinkNode = 8,
 45      GetInfo = 19,
 46      GetChildren = 4,
 47      GetParents = 5,
 48      GetProperties = 3,
 49      GetPropertyValue = 6,
 50      SetPropertyValue = 7,
 51      GetSignals = 14,
 52      RegisterSlot = 15,
 53      UnregisterSlot = 16,
 54      LookupSlotId = 17,
 55      GetSlots = 18,
 56      GetMethods = 20,
 57      GetMethod = 21,
 58      CallMethod = 22,
 59  }
 60  
 61  // Missing calls todo:
 62  // GetPropLen
 63  // UnsetProperty
 64  // SetPropertyNull
 65  // PropertyPushNull
 66  // PropertyPush
 67  // PropertyIsUnset
 68  
 69  pub struct ZeroMQAdapter {
 70      /*
 71      // req-reply commands
 72      req_socket: zmq::Socket,
 73      // We cannot share zmq sockets across threads, and we cannot quickly spawn
 74      // pub sockets due to address reuse errors.
 75      slot_sender: mpsc::SyncSender<(Vec<u8>, Vec<u8>)>,
 76      slot_recvr: Option<mpsc::Receiver<(Vec<u8>, Vec<u8>)>>,
 77      */
 78      sg_root: SceneNodePtr,
 79      render_api: RenderApi,
 80      _ex: ExecutorPtr,
 81  
 82      zmq_rep: Mutex<zeromq::RepSocket>,
 83      _zmq_pub: Mutex<zeromq::PubSocket>,
 84  }
 85  
 86  impl ZeroMQAdapter {
 87      pub async fn new(sg_root: SceneNodePtr, render_api: RenderApi, ex: ExecutorPtr) -> Arc<Self> {
 88          let mut zmq_rep = zeromq::RepSocket::new();
 89          zmq_rep.bind("tcp://0.0.0.0:9484").await.unwrap();
 90  
 91          let mut zmq_pub = zeromq::PubSocket::new();
 92          zmq_pub.bind("tcp://0.0.0.0:9485").await.unwrap();
 93  
 94          Arc::new(Self {
 95              sg_root,
 96              render_api,
 97              _ex: ex,
 98              zmq_rep: Mutex::new(zmq_rep),
 99              _zmq_pub: Mutex::new(zmq_pub),
100          })
101      }
102  
103      pub async fn run(self: Arc<Self>) {
104          loop {
105              let req = self.zmq_rep.lock().await.recv().await.unwrap();
106              assert_eq!(req.len(), 2);
107              let cmd = req.get(0).unwrap().to_vec();
108              assert_eq!(cmd.len(), 1);
109              let payload = req.get(1).unwrap().to_vec();
110  
111              let cmd = deserialize(&cmd).unwrap();
112              debug!(target: "req", "zmq: {:?} {:?}", cmd, payload);
113  
114              let self2 = self.clone();
115              match self2.process_request(cmd, payload).await {
116                  Ok(reply) => {
117                      let mut m = zeromq::ZmqMessage::from(vec![0u8]);
118                      m.push_back(reply.into());
119  
120                      // [errc:1] [reply]
121                      self.zmq_rep.lock().await.send(m).await.unwrap();
122                  }
123                  Err(err) => {
124                      let errc = err as u8;
125                      warn!(target: "req", "errc {}: {}", errc, err);
126  
127                      let mut m = zeromq::ZmqMessage::from(vec![errc]);
128                      m.push_back(vec![].into());
129  
130                      // [errc:1] [reply]
131                      self.zmq_rep.lock().await.send(m).await.unwrap();
132                  }
133              }
134          }
135      }
136  
137      async fn process_request(self: Arc<Self>, cmd: Command, payload: Vec<u8>) -> Result<Vec<u8>> {
138          let mut cur = Cursor::new(&payload);
139          let mut reply = vec![];
140          match cmd {
141              Command::Hello => {
142                  debug!(target: "req", "hello()");
143                  assert_eq!(payload.len(), 0);
144                  "hello".encode(&mut reply).unwrap();
145              }
146              Command::GetInfo => {
147                  /*
148                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
149                  debug!(target: "req", "{:?}({})", cmd, node_id);
150  
151                  let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?;
152                  node.name.encode(&mut reply).unwrap();
153                  node.typ.encode(&mut reply).unwrap();
154                  */
155              }
156              Command::GetChildren => {
157                  let node_path: ScenePath = String::decode(&mut cur).unwrap().parse()?;
158                  debug!(target: "req", "{cmd:?}({node_path})");
159                  let node = self.sg_root.lookup_node(node_path).ok_or(Error::NodeNotFound)?;
160  
161                  let children: Vec<_> = node
162                      .get_children()
163                      .iter()
164                      .map(|node| (node.name.clone(), node.id, node.typ))
165                      .collect();
166                  children.encode(&mut reply).unwrap();
167              }
168              Command::GetParents => {
169                  /*
170                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
171                  debug!(target: "req", "{:?}({})", cmd, node_id);
172  
173                  let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?;
174                  let parents: Vec<_> = node
175                      .parents
176                      .iter()
177                      .map(|node_inf| (node_inf.name.clone(), node_inf.id, node_inf.typ))
178                      .collect();
179                  parents.encode(&mut reply).unwrap();
180                  */
181              }
182              Command::GetProperties => {
183                  let node_path: ScenePath = String::decode(&mut cur).unwrap().parse()?;
184                  debug!(target: "req", "{cmd:?}({node_path})");
185                  let node = self.sg_root.lookup_node(node_path).ok_or(Error::NodeNotFound)?;
186  
187                  VarInt(node.props.len() as u64).encode(&mut reply).unwrap();
188                  for prop in &node.props {
189                      prop.name.encode(&mut reply).unwrap();
190                      prop.typ.encode(&mut reply).unwrap();
191                      prop.subtype.encode(&mut reply).unwrap();
192                      //prop.defaults.encode(&mut reply).unwrap();
193                      prop.ui_name.encode(&mut reply).unwrap();
194                      prop.desc.encode(&mut reply).unwrap();
195                      prop.is_null_allowed.encode(&mut reply).unwrap();
196                      prop.is_expr_allowed.encode(&mut reply).unwrap();
197                      (prop.array_len as u32).encode(&mut reply).unwrap();
198                      prop.min_val.encode(&mut reply).unwrap();
199                      prop.max_val.encode(&mut reply).unwrap();
200                      prop.enum_items.encode(&mut reply).unwrap();
201  
202                      let depends: Vec<_> = prop
203                          .get_depends()
204                          .into_iter()
205                          .map(|d| (d.i as u32, d.local_name))
206                          .collect();
207                      depends.encode(&mut reply).unwrap();
208                  }
209              }
210              Command::GetPropertyValue => {
211                  let node_path: ScenePath = String::decode(&mut cur).unwrap().parse()?;
212                  let prop_name = String::decode(&mut cur).unwrap();
213                  debug!(target: "req", "{cmd:?}({node_path}, {prop_name})");
214                  let node = self.sg_root.lookup_node(node_path).ok_or(Error::NodeNotFound)?;
215  
216                  let prop = node.get_property(&prop_name).ok_or(Error::PropertyNotFound)?;
217                  prop.typ.encode(&mut reply).unwrap();
218                  VarInt(prop.get_len() as u64).encode(&mut reply).unwrap();
219                  for i in 0..prop.get_len() {
220                      let val = prop.get_value(i)?;
221                      if val.is_unset() {
222                          1u8.encode(&mut reply).unwrap();
223                          let default = &prop.defaults[i];
224                          default.encode(&mut reply).unwrap();
225                      } else if val.is_null() {
226                          2u8.encode(&mut reply).unwrap();
227                      } else if val.is_expr() {
228                          3u8.encode(&mut reply).unwrap();
229                      } else {
230                          0u8.encode(&mut reply).unwrap();
231                          val.encode(&mut reply).unwrap();
232                      }
233                  }
234              }
235              Command::SetPropertyValue => {
236                  let node_path: ScenePath = String::decode(&mut cur).unwrap().parse()?;
237                  let prop_name = String::decode(&mut cur).unwrap();
238                  let prop_i = u32::decode(&mut cur).unwrap() as usize;
239                  let prop_type = PropertyType::decode(&mut cur).unwrap();
240                  debug!(target: "req", "{cmd:?}({node_path}, {prop_name}, {prop_i}, {prop_type:?})");
241  
242                  let node = self.sg_root.lookup_node(node_path).ok_or(Error::NodeNotFound)?;
243                  let prop = node.get_property(&prop_name).ok_or(Error::PropertyNotFound)?;
244  
245                  let atom =
246                      &mut self.render_api.make_guard(gfxtag!("ZeroMQAdapter::SetPropertyValue"));
247  
248                  match prop_type {
249                      PropertyType::Null => {
250                          prop.set_null(atom, Role::User, prop_i)?;
251                      }
252                      PropertyType::Bool => {
253                          let val = bool::decode(&mut cur).unwrap();
254                          prop.set_bool(atom, Role::User, prop_i, val)?;
255                      }
256                      PropertyType::Uint32 => {
257                          let val = u32::decode(&mut cur).unwrap();
258                          prop.set_u32(atom, Role::User, prop_i, val)?;
259                      }
260                      PropertyType::Float32 => {
261                          let val = f32::decode(&mut cur).unwrap();
262                          prop.set_f32(atom, Role::User, prop_i, val)?;
263                      }
264                      PropertyType::Str => {
265                          let val = String::decode(&mut cur).unwrap();
266                          prop.set_str(atom, Role::User, prop_i, val)?;
267                      }
268                      PropertyType::Enum => {
269                          let val = String::decode(&mut cur).unwrap();
270                          prop.set_enum(atom, Role::User, prop_i, val)?;
271                      }
272                      PropertyType::SceneNodeId => {
273                          let val = SceneNodeId::decode(&mut cur).unwrap();
274                          prop.set_node_id(atom, Role::User, prop_i, val)?;
275                      }
276                      PropertyType::SExpr => {
277                          let val = SExprCode::decode(&mut cur).unwrap();
278                          debug!(target: "req", "  received code {:?}", val);
279                          prop.set_expr(atom, Role::User, prop_i, val)?;
280                      }
281                  }
282              }
283              Command::AddNode => {
284                  /*
285                  let node_name = String::decode(&mut cur).unwrap();
286                  let node_type = SceneNodeType::decode(&mut cur).unwrap();
287                  debug!(target: "req", "{:?}({}, {:?})", cmd, node_name, node_type);
288  
289                  let node_id = scene_graph.add_node(&node_name, node_type).id;
290                  node_id.encode(&mut reply).unwrap();
291                  */
292              }
293              Command::RemoveNode => {
294                  /*
295                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
296                  debug!(target: "req", "{:?}({})", cmd, node_id);
297                  scene_graph.remove_node(node_id)?;
298                  */
299              }
300              Command::RenameNode => {
301                  /*
302                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
303                  let node_name = String::decode(&mut cur).unwrap();
304                  debug!(target: "req", "{:?}({}, {})", cmd, node_id, node_name);
305                  scene_graph.rename_node(node_id, node_name)?;
306                  */
307              }
308              Command::ScanDangling => {
309                  /*
310                  let dangling = scene_graph.scan_dangling();
311                  dangling.encode(&mut reply).unwrap();
312                  */
313              }
314              Command::LookupNodeId => {
315                  /*
316                  let node_path: String = deserialize(&payload).unwrap();
317                  debug!(target: "req", "{:?}({})", cmd, node_path);
318                  let node_id = scene_graph.lookup_node_id(&node_path).ok_or(Error::NodeNotFound)?;
319                  node_id.encode(&mut reply).unwrap();
320                  */
321              }
322              Command::AddProperty => {
323                  /*
324                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
325                  let prop_name = String::decode(&mut cur).unwrap();
326                  let prop_type = PropertyType::decode(&mut cur).unwrap();
327                  let prop_subtype = PropertySubType::decode(&mut cur).unwrap();
328  
329                  debug!(target: "req", "{:?}({}, {}, {:?}, {:?}, ...)", cmd, node_id, prop_name, prop_type, prop_subtype);
330                  let mut prop = Property::new(prop_name, prop_type, prop_subtype);
331  
332                  let prop_array_len = u32::decode(&mut cur).unwrap();
333                  prop.set_array_len(prop_array_len as usize);
334  
335                  let prop_defaults_is_some = bool::decode(&mut cur).unwrap();
336                  if prop_defaults_is_some {
337                      let prop_defaults_len = VarInt::decode(&mut cur).unwrap();
338                      match prop_type {
339                          PropertyType::Uint32 => {
340                              let mut prop_defaults = vec![];
341                              for _ in 0..prop_defaults_len.0 {
342                                  prop_defaults.push(u32::decode(&mut cur).unwrap());
343                              }
344                              prop.set_defaults_u32(prop_defaults)?;
345                          }
346                          PropertyType::Float32 => {
347                              let mut prop_defaults = vec![];
348                              for _ in 0..prop_defaults_len.0 {
349                                  prop_defaults.push(f32::decode(&mut cur).unwrap());
350                              }
351                              prop.set_defaults_f32(prop_defaults)?;
352                          }
353                          PropertyType::Str => {
354                              let mut prop_defaults = vec![];
355                              for _ in 0..prop_defaults_len.0 {
356                                  prop_defaults.push(String::decode(&mut cur).unwrap());
357                              }
358                              prop.set_defaults_str(prop_defaults)?;
359                          }
360                          _ => return Err(Error::PropertyWrongType),
361                      }
362                  }
363  
364                  let prop_ui_name = String::decode(&mut cur).unwrap();
365                  let prop_desc = String::decode(&mut cur).unwrap();
366                  let prop_is_null_allowed = bool::decode(&mut cur).unwrap();
367                  let prop_is_expr_allowed = bool::decode(&mut cur).unwrap();
368  
369                  match prop_type {
370                      PropertyType::Uint32 => {
371                          let min_is_some = bool::decode(&mut cur).unwrap();
372                          let min = if min_is_some {
373                              let min = u32::decode(&mut cur).unwrap();
374                              Some(PropertyValue::Uint32(min))
375                          } else {
376                              None
377                          };
378                          let max_is_some = bool::decode(&mut cur).unwrap();
379                          let max = if max_is_some {
380                              let max = u32::decode(&mut cur).unwrap();
381                              Some(PropertyValue::Uint32(max))
382                          } else {
383                              None
384                          };
385                          prop.min_val = min;
386                          prop.max_val = max;
387                      }
388                      PropertyType::Float32 => {
389                          let min_is_some = bool::decode(&mut cur).unwrap();
390                          let min = if min_is_some {
391                              let min = f32::decode(&mut cur).unwrap();
392                              Some(PropertyValue::Float32(min))
393                          } else {
394                              None
395                          };
396                          let max_is_some = bool::decode(&mut cur).unwrap();
397                          let max = if max_is_some {
398                              let max = f32::decode(&mut cur).unwrap();
399                              Some(PropertyValue::Float32(max))
400                          } else {
401                              None
402                          };
403                          prop.min_val = min;
404                          prop.max_val = max;
405                      }
406                      _ => {
407                          let min_is_some = bool::decode(&mut cur).unwrap();
408                          if min_is_some {
409                              return Err(Error::PropertyWrongType)
410                          }
411                          let max_is_some = bool::decode(&mut cur).unwrap();
412                          if max_is_some {
413                              return Err(Error::PropertyWrongType)
414                          }
415                      }
416                  }
417  
418                  let prop_enum_items = Vec::<String>::decode(&mut cur).unwrap();
419  
420                  let node = scene_graph.get_node_mut(node_id).ok_or(Error::NodeNotFound)?;
421  
422                  prop.set_ui_text(prop_ui_name, prop_desc);
423                  prop.is_null_allowed = prop_is_null_allowed;
424                  prop.is_expr_allowed = prop_is_expr_allowed;
425                  if !prop_enum_items.is_empty() {
426                      prop.set_enum_items(prop_enum_items)?;
427                  }
428                  node.add_property(prop)?;
429                  */
430              }
431              Command::LinkNode => {
432                  /*
433                  let child_id = SceneNodeId::decode(&mut cur).unwrap();
434                  let parent_id = SceneNodeId::decode(&mut cur).unwrap();
435                  debug!(target: "req", "{:?}({}, {})", cmd, child_id, parent_id);
436                  scene_graph.link(child_id, parent_id)?;
437                  */
438              }
439              Command::UnlinkNode => {
440                  /*
441                  let child_id = SceneNodeId::decode(&mut cur).unwrap();
442                  let parent_id = SceneNodeId::decode(&mut cur).unwrap();
443                  debug!(target: "req", "{:?}({}, {})", cmd, child_id, parent_id);
444                  scene_graph.unlink(child_id, parent_id)?;
445                  */
446              }
447              Command::GetSignals => {
448                  /*
449                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
450                  debug!(target: "req", "{:?}({})", cmd, node_id);
451  
452                  let node = scene_graph.get_node_mut(node_id).ok_or(Error::NodeNotFound)?;
453  
454                  let mut sigs = vec![];
455                  for sig in &node.sigs {
456                      sigs.push(sig.name.clone());
457                  }
458                  sigs.encode(&mut reply).unwrap();
459                  */
460              }
461              Command::RegisterSlot => {
462                  /*
463                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
464                  let sig_name = String::decode(&mut cur).unwrap();
465                  let slot_name = String::decode(&mut cur).unwrap();
466                  let user_data = Vec::<u8>::decode(&mut cur).unwrap();
467                  debug!(target: "req", "{:?}({}, {}, {}, {:?})", cmd, node_id, sig_name, slot_name, user_data);
468  
469                  let node = scene_graph.get_node_mut(node_id).ok_or(Error::NodeNotFound)?;
470  
471                  let (sendr, recvr) = async_channel::unbounded();
472                  let slot = Slot { name: slot_name, notify: sendr };
473  
474                  // This task will auto-die when the slot is unregistered
475                  let self2 = self.clone();
476                  self.ex
477                      .spawn(async move {
478                          loop {
479                              let Ok(signal_data) = recvr.recv().await else {
480                                  // Die
481                                  break
482                              };
483  
484                              let mut m = zeromq::ZmqMessage::from(signal_data);
485                              m.push_back(user_data.clone().into());
486  
487                              self2.zmq_pub.lock().await.send(m).await.unwrap();
488                          }
489                      })
490                      .detach();
491  
492                  let slot_id = node.register(&sig_name, slot)?;
493                  slot_id.encode(&mut reply).unwrap();
494                  */
495              }
496              Command::UnregisterSlot => {
497                  /*
498                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
499                  let sig_name = String::decode(&mut cur).unwrap();
500                  let slot_id = SlotId::decode(&mut cur).unwrap();
501                  debug!(target: "req", "{:?}({}, {}, {})", cmd, node_id, sig_name, slot_id);
502  
503                  let node = scene_graph.get_node_mut(node_id).ok_or(Error::NodeNotFound)?;
504                  node.unregister(&sig_name, slot_id)?;
505                  */
506              }
507              Command::LookupSlotId => {
508                  /*
509                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
510                  let sig_name = String::decode(&mut cur).unwrap();
511                  let slot_name = String::decode(&mut cur).unwrap();
512                  debug!(target: "req", "{:?}({}, {}, {})", cmd, node_id, sig_name, slot_name);
513  
514                  let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?;
515                  let signal = node.get_signal(&sig_name).ok_or(Error::SignalNotFound)?;
516                  let slot_id = signal.lookup_slot_id(&slot_name).ok_or(Error::SlotNotFound)?;
517                  slot_id.encode(&mut reply).unwrap();
518                  */
519              }
520              Command::GetSlots => {
521                  /*
522                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
523                  let sig_name = String::decode(&mut cur).unwrap();
524                  debug!(target: "req", "{:?}({}, {})", cmd, node_id, sig_name);
525  
526                  let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?;
527                  let signal = node.get_signal(&sig_name).ok_or(Error::SignalNotFound)?;
528  
529                  let mut slots = vec![];
530                  for (slot_id, slot) in signal.get_slots() {
531                      slots.push((slot.name.clone(), slot_id));
532                  }
533                  slots.encode(&mut reply).unwrap();
534                  */
535              }
536              Command::GetMethods => {
537                  /*
538                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
539                  debug!(target: "req", "{:?}({})", cmd, node_id);
540  
541                  let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?;
542                  let method_names: Vec<_> = node.methods.iter().map(|m| m.name.clone()).collect();
543  
544                  method_names.encode(&mut reply).unwrap();
545                  */
546              }
547              Command::GetMethod => {
548                  /*
549                  let node_id = SceneNodeId::decode(&mut cur).unwrap();
550                  let method_name = String::decode(&mut cur).unwrap();
551                  debug!(target: "req", "{:?}({}, {})", cmd, node_id, method_name);
552  
553                  let node = scene_graph.get_node(node_id).ok_or(Error::NodeNotFound)?;
554                  let method = node.get_method(&method_name).ok_or(Error::MethodNotFound)?;
555  
556                  method.args.encode(&mut reply).unwrap();
557                  method.result.encode(&mut reply).unwrap();
558                  */
559              }
560              Command::CallMethod => {
561                  let node_path: ScenePath = String::decode(&mut cur).unwrap().parse()?;
562                  let method_name = String::decode(&mut cur).unwrap();
563                  let arg_data = Vec::<u8>::decode(&mut cur).unwrap();
564                  debug!(target: "req", "{cmd:?}({node_path}, {method_name}, ...)");
565  
566                  let node = self.sg_root.lookup_node(node_path).ok_or(Error::NodeNotFound)?;
567                  let result = node.call_method(&method_name, arg_data).await?;
568                  result.encode(&mut reply).unwrap();
569              }
570          }
571  
572          Ok(reply)
573      }
574  }