behaviour.rs
1 use std::{ 2 net, 3 str::FromStr as _, 4 }; 5 6 use cyn::ResultExt as _; 7 use libp2p::{ 8 self as p2p, 9 dcutr as p2p_dcutr, 10 futures::StreamExt as _, 11 identify as p2p_identify, 12 identity as p2p_id, 13 kad::{ 14 self as p2p_kad, 15 store as p2p_kad_store, 16 }, 17 multiaddr as p2p_multiaddr, 18 noise as p2p_noise, 19 relay as p2p_relay, 20 swarm as p2p_swarm, 21 tcp as p2p_tcp, 22 yamux as p2p_yamux, 23 }; 24 use tokio::{ 25 io::{ 26 AsyncReadExt as _, 27 AsyncWriteExt as _, 28 }, 29 select, 30 }; 31 32 use crate::{ 33 Config, 34 Interface, 35 MTU, 36 address, 37 ip, 38 }; 39 40 #[derive(p2p_swarm::NetworkBehaviour)] 41 pub struct Behaviour<P: ip::Policy> { 42 pub identify: p2p_identify::Behaviour, 43 pub relay: p2p_relay::Behaviour, 44 pub dcutr: p2p_dcutr::Behaviour, 45 pub kad: p2p_kad::Behaviour<p2p_kad_store::MemoryStore>, 46 pub ip: ip::Behaviour<P>, 47 } 48 49 #[must_use] 50 #[expect(clippy::needless_pass_by_value)] 51 pub fn new<'a>( 52 keypair: p2p_id::Keypair, 53 bootstrap: impl Iterator<Item = &'a p2p::Multiaddr>, 54 peers: Vec<p2p::PeerId>, 55 ) -> Behaviour<impl ip::Policy> { 56 let peer_id = keypair.public().to_peer_id(); 57 58 let identify = p2p_identify::Behaviour::new(p2p_identify::Config::new( 59 p2p_identify::PROTOCOL_NAME.to_string(), 60 keypair.public(), 61 )); 62 63 let relay = p2p_relay::Behaviour::new(peer_id, p2p_relay::Config::default()); 64 65 let dcutr = p2p_dcutr::Behaviour::new(peer_id); 66 67 let mut kad = p2p_kad::Behaviour::new(peer_id, p2p_kad_store::MemoryStore::new(peer_id)); 68 69 // Add bootstrap peers to Kademlia DHT for peer discovery. 70 for addr in bootstrap { 71 let Some(peer_id) = addr.iter().find_map(|protocol| { 72 let p2p_multiaddr::Protocol::P2p(peer_id) = protocol else { 73 return None; 74 }; 75 76 Some(peer_id) 77 }) else { 78 continue; 79 }; 80 81 kad.add_address(&peer_id, addr.clone()); 82 } 83 84 let ip = ip::Behaviour::new(move |peer_id| { 85 if !peers.contains(peer_id) { 86 return Err(p2p_swarm::ConnectionDenied::new(format!( 87 "peer '{peer_id}' is not in the peer list" 88 ))); 89 } 90 91 Ok(()) 92 }); 93 94 Behaviour { 95 identify, 96 relay, 97 dcutr, 98 kad, 99 ip, 100 } 101 } 102 103 #[expect( 104 clippy::missing_asserts_for_indexing, 105 reason = "clippy is too dumb to see if guards" 106 )] 107 fn ip_of(packet: &[u8]) -> Option<net::IpAddr> { 108 Some(match packet.first()? >> 4 { 109 4 => { 110 if packet.len() < 20 { 111 return None; 112 } 113 114 let slice = &packet[16..20]; 115 116 net::IpAddr::V4(net::Ipv4Addr::new(slice[0], slice[1], slice[2], slice[3])) 117 }, 118 119 6 => { 120 if packet.len() < 40 { 121 return None; 122 } 123 124 let slice = &packet[24..40]; 125 126 net::IpAddr::V6(net::Ipv6Addr::from( 127 <[u8; 16]>::try_from(slice).expect("asdasd"), 128 )) 129 }, 130 131 _ => return None, 132 }) 133 } 134 135 pub async fn run(config: Config) -> cyn::Result<()> { 136 let mut swarm = p2p::SwarmBuilder::with_existing_identity(config.keypair.clone().into()) 137 .with_tokio() 138 .with_tcp( 139 p2p_tcp::Config::default(), 140 p2p_noise::Config::new, 141 p2p_yamux::Config::default, 142 ) 143 .chain_err("failed to create tcp transport layer")? 144 .with_quic() 145 .with_behaviour(|keypair| { 146 new( 147 keypair.clone(), 148 config.bootstrap.iter(), 149 config.peers.iter().map(|peer| peer.id).collect(), 150 ) 151 }) 152 .unwrap() 153 .build(); 154 155 swarm 156 .listen_on(p2p::Multiaddr::from_str("/ip6/::/tcp/0").expect("literal is valid")) 157 .chain_err("failed to listen on local port")?; 158 159 swarm 160 .behaviour_mut() 161 .kad 162 .bootstrap() 163 .chain_err("failed to start DHT bootstrap")?; 164 165 let mut tun_buffer = vec![0_u8; MTU as usize]; 166 let mut tun_interface = Interface::create( 167 &config.interface, 168 address::generate_v4(&config.id), 169 address::generate_v6(&config.id), 170 )?; 171 172 let mut address_map = address::Map::new(); 173 174 for peer in &config.peers { 175 address_map.v4_of(peer.id); 176 address_map.v6_of(peer.id); 177 } 178 179 loop { 180 select! { 181 swarm_event = swarm.select_next_some() => { 182 match swarm_event { 183 p2p_swarm::SwarmEvent::NewListenAddr { address, .. } => { 184 tracing::info!("Listening on {address:?}."); 185 }, 186 187 p2p_swarm::SwarmEvent::Behaviour(BehaviourEvent::Ip(packet)) => { 188 tracing::trace!("Got packet: {packet:?}"); 189 190 if let Err(error) = tun_interface.write_all(&packet).await { 191 tracing::warn!("Failed to write packet to TUN interface: {error}"); 192 } 193 }, 194 195 other => tracing::debug!("Other swarm event: {other:?}."), 196 } 197 }, 198 199 tun_result = tun_interface.read(&mut tun_buffer) => { 200 let Ok(packet_len) = tun_result.inspect_err(|error| { 201 tracing::warn!("Failed to read from TUN interface: {error}"); 202 }) else { 203 continue; 204 }; 205 206 let packet = &tun_buffer[..packet_len]; 207 208 tracing::warn!("Got tun packet: {packet:?}"); 209 210 let Some(ip) = ip_of(packet) else { 211 tracing::warn!("Ignoring invalid tun packet (could not determine ip) {packet:?}"); 212 continue; 213 }; 214 215 let peer_id = match ip { 216 net::IpAddr::V4(v4) => address_map.peer_of_v4(&v4), 217 net::IpAddr::V6(v6) => address_map.peer_of_v6(&v6), 218 }; 219 220 let Some(peer_id) = peer_id else { 221 tracing::warn!("Tried to send packet to ip {ip} not in peer map, dropping."); 222 continue; 223 }; 224 225 // Send packet to peer 226 let packet = ip::Packet::new(packet.to_vec()); 227 swarm.behaviour_mut().ip.send(&peer_id, packet); 228 }, 229 } 230 } 231 }