net_node.rs
1 use anyhow::Result; 2 use std::net::SocketAddr; 3 use std::time::Duration; 4 use tokio::io::{AsyncBufReadExt, BufReader}; 5 use tracing::info; 6 7 use royksopp::net::{NetEvent, NodeId}; 8 9 fn parse_arg(flag: &str) -> Option<String> { 10 let mut args = std::env::args().skip(1); 11 while let Some(a) = args.next() { 12 if a == flag { 13 return args.next(); 14 } 15 } 16 None 17 } 18 19 fn parse_multi(flag: &str) -> Vec<String> { 20 let mut out = vec![]; 21 let mut args = std::env::args().skip(1); 22 while let Some(a) = args.next() { 23 if a == flag { 24 if let Some(v) = args.next() { 25 out.push(v); 26 } 27 } 28 } 29 out 30 } 31 32 #[tokio::main] 33 async fn main() -> Result<()> { 34 tracing_subscriber::fmt().init(); 35 36 let bind: SocketAddr = parse_arg("--bind") 37 .unwrap_or_else(|| "127.0.0.1:4000".into()) 38 .parse()?; 39 40 let peers: Vec<SocketAddr> = parse_multi("--peer") 41 .into_iter() 42 .filter_map(|s| s.parse().ok()) 43 .collect(); 44 45 let node_id = NodeId::random(); 46 let (net, _join) = royksopp::net::spawn_net("net", node_id, bind); 47 48 info!(node = %net.node_id.short(), listen = %net.listen, "node started"); 49 50 for p in peers { 51 net.connect(p).await; 52 } 53 54 // Print events 55 let mut ev = net.subscribe(); 56 tokio::spawn(async move { 57 while let Ok(e) = ev.recv().await { 58 match e { 59 NetEvent::PeerUp { id, addr } => println!("[peer up] {} @ {}", id.short(), addr), 60 NetEvent::PeerDown { id, addr, reason } => { 61 println!("[peer down] {} @ {} ({})", id.short(), addr, reason) 62 } 63 NetEvent::Message { from, topic, data } => { 64 let txt = String::from_utf8_lossy(&data); 65 println!("[msg] {} :: {} :: {}", from.short(), topic, txt); 66 } 67 NetEvent::Dropped { to, topic } => { 68 println!("[dropped] to={:?} topic={}", to.map(|x| x.short()), topic) 69 } 70 } 71 } 72 }); 73 74 // Read stdin and broadcast as "chat" 75 let stdin = BufReader::new(tokio::io::stdin()); 76 let mut lines = stdin.lines(); 77 78 println!("Type messages and press Enter. Ctrl+C to quit."); 79 while let Some(line) = lines.next_line().await? { 80 if line.trim().is_empty() { 81 continue; 82 } 83 net.broadcast("chat", line.into_bytes()).await; 84 tokio::time::sleep(Duration::from_millis(10)).await; 85 } 86 87 Ok(()) 88 }