dht.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use std::sync::Arc; 20 21 use async_trait::async_trait; 22 use num_bigint::BigUint; 23 use rand::{rngs::OsRng, Rng}; 24 use tracing::debug; 25 use url::Url; 26 27 use darkfi::{ 28 dht::{impl_dht_node_defaults, Dht, DhtHandler, DhtLookupReply, DhtNode}, 29 geode::hash_to_string, 30 net::ChannelPtr, 31 util::time::Timestamp, 32 Error, Result, 33 }; 34 use darkfi_sdk::crypto::schnorr::SchnorrPublic; 35 use darkfi_serial::{SerialDecodable, SerialEncodable}; 36 37 use crate::{ 38 pow::VerifiableNodeData, 39 proto::{ 40 FudAnnounce, FudFindNodesReply, FudFindNodesRequest, FudFindSeedersReply, 41 FudFindSeedersRequest, FudPingReply, FudPingRequest, 42 }, 43 Fud, 44 }; 45 46 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 47 pub struct FudNode { 48 pub data: VerifiableNodeData, 49 pub addresses: Vec<Url>, 50 } 51 impl_dht_node_defaults!(FudNode); 52 53 impl DhtNode for FudNode { 54 fn id(&self) -> blake3::Hash { 55 self.data.id() 56 } 57 fn addresses(&self) -> Vec<Url> { 58 self.addresses.clone() 59 } 60 } 61 62 /// The values of the DHT are `Vec<FudSeeder>`, mapping resource hashes to lists of [`FudSeeder`]s 63 #[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)] 64 pub struct FudSeeder { 65 /// Resource that this seeder provides 66 pub key: blake3::Hash, 67 /// Seeder's node data 68 pub node: FudNode, 69 /// When this [`FudSeeder`] was added to our hash table. 70 /// This is not sent to other nodes. 71 #[skip_serialize] 72 pub timestamp: u64, 73 } 74 75 impl PartialEq for FudSeeder { 76 fn eq(&self, other: &Self) -> bool { 77 self.key == other.key && self.node.id() == other.node.id() 78 } 79 } 80 81 /// [`DhtHandler`] implementation for fud 82 #[async_trait] 83 impl DhtHandler for Fud { 84 type Value = Vec<FudSeeder>; 85 type Node = FudNode; 86 87 fn dht(&self) -> Arc<Dht<Self>> { 88 self.dht.clone() 89 } 90 91 async fn node(&self) -> FudNode { 92 FudNode { 93 data: self.node_data.read().await.clone(), 94 addresses: self 95 .p2p 96 .clone() 97 .hosts() 98 .external_addrs() 99 .await 100 .iter() 101 .filter(|addr| !addr.to_string().contains("[::]")) 102 .cloned() 103 .collect(), 104 } 105 } 106 107 async fn ping(&self, channel: ChannelPtr) -> Result<FudNode> { 108 debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id); 109 let msg_subsystem = channel.message_subsystem(); 110 msg_subsystem.add_dispatch::<FudPingReply>().await; 111 let msg_subscriber = channel.subscribe_msg::<FudPingReply>().await.unwrap(); 112 113 // Send `FudPingRequest` 114 let mut rng = OsRng; 115 let request = FudPingRequest { random: rng.gen() }; 116 channel.send(&request).await?; 117 118 // Wait for `FudPingReply` 119 let reply = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await; 120 msg_subscriber.unsubscribe().await; 121 let reply = reply?; 122 123 // Verify the signature 124 if !reply.node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) { 125 channel.ban().await; 126 return Err(Error::InvalidSignature) 127 } 128 129 // Verify PoW 130 if let Err(e) = self.pow.write().await.verify_node(&reply.node.data).await { 131 channel.ban().await; 132 return Err(e) 133 } 134 135 Ok(reply.node.clone()) 136 } 137 138 // TODO: Optimize this 139 async fn on_new_node(&self, node: &FudNode) -> Result<()> { 140 debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id())); 141 142 // If this is the first node we know about, then bootstrap and announce our files 143 if !self.dht.is_bootstrapped().await { 144 let _ = self.init().await; 145 } 146 147 // Send keys that are closer to this node than we are 148 let self_id = self.node_data.read().await.id(); 149 let channel = self.dht.get_channel(node, None).await?; 150 for (key, seeders) in self.dht.hash_table.read().await.iter() { 151 let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id())); 152 let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id)); 153 if node_distance <= self_distance { 154 let _ = channel.send(&FudAnnounce { key: *key, seeders: seeders.clone() }).await; 155 } 156 } 157 self.dht.cleanup_channel(channel).await; 158 159 Ok(()) 160 } 161 162 async fn find_nodes(&self, node: &FudNode, key: &blake3::Hash) -> Result<Vec<FudNode>> { 163 debug!(target: "fud::DhtHandler::find_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id())); 164 165 let channel = self.dht.get_channel(node, None).await?; 166 let msg_subsystem = channel.message_subsystem(); 167 msg_subsystem.add_dispatch::<FudFindNodesReply>().await; 168 let msg_subscriber_nodes = channel.subscribe_msg::<FudFindNodesReply>().await.unwrap(); 169 170 let request = FudFindNodesRequest { key: *key }; 171 channel.send(&request).await?; 172 173 let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().settings.timeout).await; 174 175 msg_subscriber_nodes.unsubscribe().await; 176 self.dht.cleanup_channel(channel).await; 177 178 Ok(reply?.nodes.clone()) 179 } 180 181 async fn find_value( 182 &self, 183 node: &FudNode, 184 key: &blake3::Hash, 185 ) -> Result<DhtLookupReply<FudNode, Vec<FudSeeder>>> { 186 debug!(target: "fud::DhtHandler::find_value()", "Fetching value {} from node {}", hash_to_string(key), hash_to_string(&node.id())); 187 188 let channel = self.dht.get_channel(node, None).await?; 189 let msg_subsystem = channel.message_subsystem(); 190 msg_subsystem.add_dispatch::<FudFindSeedersReply>().await; 191 let msg_subscriber = channel.subscribe_msg::<FudFindSeedersReply>().await.unwrap(); 192 193 let request = FudFindSeedersRequest { key: *key }; 194 channel.send(&request).await?; 195 196 let recv = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await; 197 198 msg_subscriber.unsubscribe().await; 199 self.dht.cleanup_channel(channel).await; 200 201 let rep = recv?; 202 Ok(DhtLookupReply::NodesAndValue(rep.nodes.clone(), rep.seeders.clone())) 203 } 204 205 async fn add_value(&self, key: &blake3::Hash, value: &Vec<FudSeeder>) { 206 let mut seeders = value.clone(); 207 208 // Remove seeders with no external addresses 209 seeders.retain(|item| !item.node.addresses().is_empty()); 210 211 // Set all seeders' timestamp. They are not sent to others nodes so they default to 0. 212 let timestamp = Timestamp::current_time().inner(); 213 for seeder in &mut seeders { 214 seeder.timestamp = timestamp; 215 } 216 217 debug!(target: "fud::DhtHandler::add_value()", "Inserting {} seeders for resource {}", seeders.len(), hash_to_string(key)); 218 219 let mut seeders_write = self.dht.hash_table.write().await; 220 let existing_seeders = seeders_write.get_mut(key); 221 222 if let Some(existing_seeders) = existing_seeders { 223 existing_seeders.retain(|it| !seeders.contains(it)); 224 existing_seeders.extend(seeders.clone()); 225 } else { 226 let mut vec = Vec::new(); 227 vec.extend(seeders.clone()); 228 seeders_write.insert(*key, vec); 229 } 230 } 231 232 fn key_to_string(key: &blake3::Hash) -> String { 233 hash_to_string(key) 234 } 235 }