ping_pong.rs
1 use std::sync::Arc; 2 use std::time::Duration; 3 4 use anyhow::{anyhow, Result}; 5 use async_trait::async_trait; 6 use tracing::{info, warn}; 7 8 use royksopp::actor::{Actor, ActorContext, ActorSignal, Addr}; 9 use royksopp::registry::Registry; 10 use royksopp::supervisor::{ 11 ChildRestart, ChildShutdown, ChildSpec, RestartStrategy, Service, ServiceEvent, Supervisor, 12 }; 13 14 #[derive(Debug)] 15 enum PingMsg { 16 Pong(u64), 17 PongEvent(ServiceEvent), 18 } 19 20 #[derive(Debug)] 21 enum PongMsg { 22 Ping { reply_to: Addr<PingMsg> }, 23 } 24 25 struct Ping { 26 registry: Arc<Registry>, 27 pong: Service<PongMsg>, 28 tick_every: Duration, 29 } 30 31 #[async_trait] 32 impl Actor for Ping { 33 type Msg = PingMsg; 34 35 fn tick_interval(&self) -> Option<Duration> { 36 Some(self.tick_every) 37 } 38 39 async fn started(&mut self, ctx: &mut ActorContext<Self::Msg>) -> Result<()> { 40 // Monitor Pong via service events, forward into our mailbox. 41 let mut ev_rx = self.pong.subscribe(); 42 let myself = ctx.myself.clone(); 43 44 tokio::spawn(async move { 45 loop { 46 match ev_rx.recv().await { 47 Ok(ev) => { 48 if myself.send(PingMsg::PongEvent(ev)).await.is_err() { 49 break; 50 } 51 } 52 Err(tokio::sync::broadcast::error::RecvError::Closed) => break, 53 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, 54 } 55 } 56 }); 57 58 Ok(()) 59 } 60 61 async fn tick(&mut self, ctx: &mut ActorContext<Self::Msg>) -> Result<ActorSignal> { 62 // Demonstrate whereis/registry (optional but useful). 63 let pong = self 64 .registry 65 .whereis::<PongMsg>("pong") 66 .unwrap_or_else(|| self.pong.clone()); 67 68 let reply_to = ctx.myself.clone(); 69 if let Err(e) = pong.send(PongMsg::Ping { reply_to }).await { 70 warn!(ping = %ctx.name, "pong not ready yet: {:?}", e); 71 } 72 Ok(ActorSignal::Continue) 73 } 74 75 async fn handle( 76 &mut self, 77 msg: Self::Msg, 78 ctx: &mut ActorContext<Self::Msg>, 79 ) -> Result<ActorSignal> { 80 match msg { 81 PingMsg::Pong(n) => { 82 info!(ping = %ctx.name, "got pong #{n}"); 83 Ok(ActorSignal::Continue) 84 } 85 PingMsg::PongEvent(ev) => { 86 match ev { 87 ServiceEvent::Up => info!(ping = %ctx.name, "pong is UP"), 88 ServiceEvent::Down(reason) => { 89 warn!(ping = %ctx.name, reason = ?reason, "pong is DOWN"); 90 // "link"-ish behavior: crash if pong dies abnormally 91 if reason.is_abnormal() { 92 return Err(anyhow!("linked pong went down: {:?}", reason)); 93 } 94 } 95 } 96 Ok(ActorSignal::Continue) 97 } 98 } 99 } 100 } 101 102 struct Pong { 103 count: u64, 104 crash_every: u64, 105 } 106 107 #[async_trait] 108 impl Actor for Pong { 109 type Msg = PongMsg; 110 111 async fn handle( 112 &mut self, 113 msg: Self::Msg, 114 _ctx: &mut ActorContext<Self::Msg>, 115 ) -> Result<ActorSignal> { 116 match msg { 117 PongMsg::Ping { reply_to } => { 118 self.count += 1; 119 120 if self.crash_every != 0 && self.count % self.crash_every == 0 { 121 return Err(anyhow!("intentional crash at count={}", self.count)); 122 } 123 124 let _ = reply_to.send(PingMsg::Pong(self.count)).await; 125 Ok(ActorSignal::Continue) 126 } 127 } 128 } 129 } 130 131 #[tokio::main] 132 async fn main() -> Result<()> { 133 tracing_subscriber::fmt().init(); 134 135 let registry = Arc::new(Registry::new()); 136 137 let (pong_service, pong_pub) = Service::new(); 138 registry.register("pong", pong_service.clone()).ok(); 139 140 let pong_spec = ChildSpec::actor_with_service( 141 "pong", 142 Arc::new(|| Pong { 143 count: 0, 144 crash_every: 5, 145 }), 146 pong_pub, 147 ) 148 .restart(ChildRestart::Permanent) 149 .shutdown(ChildShutdown::Timeout(Duration::from_millis(200))); 150 151 let reg2 = registry.clone(); 152 let pong2 = pong_service.clone(); 153 let ping_spec = ChildSpec::actor( 154 "ping", 155 Arc::new(move || Ping { 156 registry: reg2.clone(), 157 pong: pong2.clone(), 158 tick_every: Duration::from_millis(250), 159 }), 160 ) 161 .restart(ChildRestart::Permanent) 162 .shutdown(ChildShutdown::Timeout(Duration::from_millis(200))); 163 164 let _root = Supervisor::new("root") 165 .strategy(RestartStrategy::OneForOne) 166 .child(pong_spec) 167 .child(ping_spec) 168 .spawn(); 169 170 tokio::time::sleep(Duration::from_secs(8)).await; 171 Ok(()) 172 }