protocol_dchat.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 // ANCHOR: protocol_dchat 20 use async_trait::async_trait; 21 use darkfi::{net, Result}; 22 use smol::Executor; 23 use std::sync::Arc; 24 use tracing::debug; 25 26 use crate::dchatmsg::{DchatMsg, DchatMsgsBuffer}; 27 28 pub struct ProtocolDchat { 29 jobsman: net::ProtocolJobsManagerPtr, 30 msg_sub: net::MessageSubscription<DchatMsg>, 31 msgs: DchatMsgsBuffer, 32 } 33 // ANCHOR_END: protocol_dchat 34 35 // ANCHOR: constructor 36 impl ProtocolDchat { 37 pub async fn init(channel: net::ChannelPtr, msgs: DchatMsgsBuffer) -> net::ProtocolBasePtr { 38 debug!(target: "dchat", "ProtocolDchat::init() [START]"); 39 let message_subsytem = channel.message_subsystem(); 40 message_subsytem.add_dispatch::<DchatMsg>().await; 41 42 let msg_sub = 43 channel.subscribe_msg::<DchatMsg>().await.expect("Missing DchatMsg dispatcher!"); 44 45 Arc::new(Self { 46 jobsman: net::ProtocolJobsManager::new("ProtocolDchat", channel.clone()), 47 msg_sub, 48 msgs, 49 }) 50 } 51 // ANCHOR_END: constructor 52 53 // ANCHOR: receive 54 async fn handle_receive_msg(self: Arc<Self>) -> Result<()> { 55 debug!(target: "dchat", "ProtocolDchat::handle_receive_msg() [START]"); 56 while let Ok(msg) = self.msg_sub.receive().await { 57 let msg = (*msg).to_owned(); 58 self.msgs.lock().await.push(msg); 59 } 60 61 Ok(()) 62 } 63 // ANCHOR_END: receive 64 } 65 66 #[async_trait] 67 impl net::ProtocolBase for ProtocolDchat { 68 // ANCHOR: start 69 async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> { 70 debug!(target: "dchat", "ProtocolDchat::ProtocolBase::start() [START]"); 71 self.jobsman.clone().start(executor.clone()); 72 self.jobsman.clone().spawn(self.clone().handle_receive_msg(), executor.clone()).await; 73 debug!(target: "dchat", "ProtocolDchat::ProtocolBase::start() [STOP]"); 74 Ok(()) 75 } 76 // ANCHOR_END: start 77 78 fn name(&self) -> &'static str { 79 "ProtocolDchat" 80 } 81 }