/ bin / fud / fud / src / dht.rs
dht.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use std::sync::Arc;
 20  
 21  use async_trait::async_trait;
 22  use num_bigint::BigUint;
 23  use rand::{rngs::OsRng, Rng};
 24  use tracing::debug;
 25  use url::Url;
 26  
 27  use darkfi::{
 28      dht::{impl_dht_node_defaults, Dht, DhtHandler, DhtLookupReply, DhtNode},
 29      geode::hash_to_string,
 30      net::ChannelPtr,
 31      util::time::Timestamp,
 32      Error, Result,
 33  };
 34  use darkfi_sdk::crypto::schnorr::SchnorrPublic;
 35  use darkfi_serial::{SerialDecodable, SerialEncodable};
 36  
 37  use crate::{
 38      pow::VerifiableNodeData,
 39      proto::{
 40          FudAnnounce, FudFindNodesReply, FudFindNodesRequest, FudFindSeedersReply,
 41          FudFindSeedersRequest, FudPingReply, FudPingRequest,
 42      },
 43      Fud,
 44  };
 45  
/// A fud node as handled by the DHT: its verifiable data plus the addresses
/// it advertises.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudNode {
    /// Verifiable node data. The node identifier returned by [`DhtNode::id`]
    /// is computed from it, and its `public_key` is what `ping` verifies
    /// reply signatures against.
    pub data: VerifiableNodeData,
    /// Addresses advertised by this node.
    pub addresses: Vec<Url>,
}
// NOTE(review): presumably generates the boilerplate trait impls the DHT
// requires of a node type -- confirm against `darkfi::dht`.
impl_dht_node_defaults!(FudNode);
 52  
 53  impl DhtNode for FudNode {
 54      fn id(&self) -> blake3::Hash {
 55          self.data.id()
 56      }
 57      fn addresses(&self) -> Vec<Url> {
 58          self.addresses.clone()
 59      }
 60  }
 61  
/// A node known to provide a given resource.
///
/// The values of the DHT are `Vec<FudSeeder>`, mapping resource hashes to lists of [`FudSeeder`]s.
///
/// Equality is implemented by hand and only compares `key` and the node id,
/// so two entries for the same node and resource are equal regardless of
/// their `timestamp`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
pub struct FudSeeder {
    /// Resource that this seeder provides
    pub key: blake3::Hash,
    /// Seeder's node data
    pub node: FudNode,
    /// When this [`FudSeeder`] was added to our hash table.
    /// This is not sent to other nodes; `add_value` overwrites it with the
    /// current time on insertion.
    #[skip_serialize]
    pub timestamp: u64,
}
 74  
 75  impl PartialEq for FudSeeder {
 76      fn eq(&self, other: &Self) -> bool {
 77          self.key == other.key && self.node.id() == other.node.id()
 78      }
 79  }
 80  
 81  /// [`DhtHandler`] implementation for fud
 82  #[async_trait]
 83  impl DhtHandler for Fud {
 84      type Value = Vec<FudSeeder>;
 85      type Node = FudNode;
 86  
 87      fn dht(&self) -> Arc<Dht<Self>> {
 88          self.dht.clone()
 89      }
 90  
 91      async fn node(&self) -> FudNode {
 92          FudNode {
 93              data: self.node_data.read().await.clone(),
 94              addresses: self
 95                  .p2p
 96                  .clone()
 97                  .hosts()
 98                  .external_addrs()
 99                  .await
100                  .iter()
101                  .filter(|addr| !addr.to_string().contains("[::]"))
102                  .cloned()
103                  .collect(),
104          }
105      }
106  
107      async fn ping(&self, channel: ChannelPtr) -> Result<FudNode> {
108          debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id);
109          let msg_subsystem = channel.message_subsystem();
110          msg_subsystem.add_dispatch::<FudPingReply>().await;
111          let msg_subscriber = channel.subscribe_msg::<FudPingReply>().await.unwrap();
112  
113          // Send `FudPingRequest`
114          let mut rng = OsRng;
115          let request = FudPingRequest { random: rng.gen() };
116          channel.send(&request).await?;
117  
118          // Wait for `FudPingReply`
119          let reply = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await;
120          msg_subscriber.unsubscribe().await;
121          let reply = reply?;
122  
123          // Verify the signature
124          if !reply.node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) {
125              channel.ban().await;
126              return Err(Error::InvalidSignature)
127          }
128  
129          // Verify PoW
130          if let Err(e) = self.pow.write().await.verify_node(&reply.node.data).await {
131              channel.ban().await;
132              return Err(e)
133          }
134  
135          Ok(reply.node.clone())
136      }
137  
138      // TODO: Optimize this
139      async fn on_new_node(&self, node: &FudNode) -> Result<()> {
140          debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id()));
141  
142          // If this is the first node we know about, then bootstrap and announce our files
143          if !self.dht.is_bootstrapped().await {
144              let _ = self.init().await;
145          }
146  
147          // Send keys that are closer to this node than we are
148          let self_id = self.node_data.read().await.id();
149          let channel = self.dht.get_channel(node, None).await?;
150          for (key, seeders) in self.dht.hash_table.read().await.iter() {
151              let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id()));
152              let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id));
153              if node_distance <= self_distance {
154                  let _ = channel.send(&FudAnnounce { key: *key, seeders: seeders.clone() }).await;
155              }
156          }
157          self.dht.cleanup_channel(channel).await;
158  
159          Ok(())
160      }
161  
162      async fn find_nodes(&self, node: &FudNode, key: &blake3::Hash) -> Result<Vec<FudNode>> {
163          debug!(target: "fud::DhtHandler::find_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id()));
164  
165          let channel = self.dht.get_channel(node, None).await?;
166          let msg_subsystem = channel.message_subsystem();
167          msg_subsystem.add_dispatch::<FudFindNodesReply>().await;
168          let msg_subscriber_nodes = channel.subscribe_msg::<FudFindNodesReply>().await.unwrap();
169  
170          let request = FudFindNodesRequest { key: *key };
171          channel.send(&request).await?;
172  
173          let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().settings.timeout).await;
174  
175          msg_subscriber_nodes.unsubscribe().await;
176          self.dht.cleanup_channel(channel).await;
177  
178          Ok(reply?.nodes.clone())
179      }
180  
181      async fn find_value(
182          &self,
183          node: &FudNode,
184          key: &blake3::Hash,
185      ) -> Result<DhtLookupReply<FudNode, Vec<FudSeeder>>> {
186          debug!(target: "fud::DhtHandler::find_value()", "Fetching value {} from node {}", hash_to_string(key), hash_to_string(&node.id()));
187  
188          let channel = self.dht.get_channel(node, None).await?;
189          let msg_subsystem = channel.message_subsystem();
190          msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
191          let msg_subscriber = channel.subscribe_msg::<FudFindSeedersReply>().await.unwrap();
192  
193          let request = FudFindSeedersRequest { key: *key };
194          channel.send(&request).await?;
195  
196          let recv = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await;
197  
198          msg_subscriber.unsubscribe().await;
199          self.dht.cleanup_channel(channel).await;
200  
201          let rep = recv?;
202          Ok(DhtLookupReply::NodesAndValue(rep.nodes.clone(), rep.seeders.clone()))
203      }
204  
205      async fn add_value(&self, key: &blake3::Hash, value: &Vec<FudSeeder>) {
206          let mut seeders = value.clone();
207  
208          // Remove seeders with no external addresses
209          seeders.retain(|item| !item.node.addresses().is_empty());
210  
211          // Set all seeders' timestamp. They are not sent to others nodes so they default to 0.
212          let timestamp = Timestamp::current_time().inner();
213          for seeder in &mut seeders {
214              seeder.timestamp = timestamp;
215          }
216  
217          debug!(target: "fud::DhtHandler::add_value()", "Inserting {} seeders for resource {}", seeders.len(), hash_to_string(key));
218  
219          let mut seeders_write = self.dht.hash_table.write().await;
220          let existing_seeders = seeders_write.get_mut(key);
221  
222          if let Some(existing_seeders) = existing_seeders {
223              existing_seeders.retain(|it| !seeders.contains(it));
224              existing_seeders.extend(seeders.clone());
225          } else {
226              let mut vec = Vec::new();
227              vec.extend(seeders.clone());
228              seeders_write.insert(*key, vec);
229          }
230      }
231  
    /// Renders a DHT key as a string by delegating to [`hash_to_string`].
    fn key_to_string(key: &blake3::Hash) -> String {
        hash_to_string(key)
    }
235  }