/ examples / ping_pong.rs
ping_pong.rs
  1  use std::sync::Arc;
  2  use std::time::Duration;
  3  
  4  use anyhow::{anyhow, Result};
  5  use async_trait::async_trait;
  6  use tracing::{info, warn};
  7  
  8  use royksopp::actor::{Actor, ActorContext, ActorSignal, Addr};
  9  use royksopp::registry::Registry;
 10  use royksopp::supervisor::{
 11      ChildRestart, ChildShutdown, ChildSpec, RestartStrategy, Service, ServiceEvent, Supervisor,
 12  };
 13  
 14  #[derive(Debug)]
 15  enum PingMsg {
 16      Pong(u64),
 17      PongEvent(ServiceEvent),
 18  }
 19  
 20  #[derive(Debug)]
 21  enum PongMsg {
 22      Ping { reply_to: Addr<PingMsg> },
 23  }
 24  
 25  struct Ping {
 26      registry: Arc<Registry>,
 27      pong: Service<PongMsg>,
 28      tick_every: Duration,
 29  }
 30  
 31  #[async_trait]
 32  impl Actor for Ping {
 33      type Msg = PingMsg;
 34  
 35      fn tick_interval(&self) -> Option<Duration> {
 36          Some(self.tick_every)
 37      }
 38  
 39      async fn started(&mut self, ctx: &mut ActorContext<Self::Msg>) -> Result<()> {
 40          // Monitor Pong via service events, forward into our mailbox.
 41          let mut ev_rx = self.pong.subscribe();
 42          let myself = ctx.myself.clone();
 43  
 44          tokio::spawn(async move {
 45              loop {
 46                  match ev_rx.recv().await {
 47                      Ok(ev) => {
 48                          if myself.send(PingMsg::PongEvent(ev)).await.is_err() {
 49                              break;
 50                          }
 51                      }
 52                      Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
 53                      Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
 54                  }
 55              }
 56          });
 57  
 58          Ok(())
 59      }
 60  
 61      async fn tick(&mut self, ctx: &mut ActorContext<Self::Msg>) -> Result<ActorSignal> {
 62          // Demonstrate whereis/registry (optional but useful).
 63          let pong = self
 64              .registry
 65              .whereis::<PongMsg>("pong")
 66              .unwrap_or_else(|| self.pong.clone());
 67  
 68          let reply_to = ctx.myself.clone();
 69          if let Err(e) = pong.send(PongMsg::Ping { reply_to }).await {
 70              warn!(ping = %ctx.name, "pong not ready yet: {:?}", e);
 71          }
 72          Ok(ActorSignal::Continue)
 73      }
 74  
 75      async fn handle(
 76          &mut self,
 77          msg: Self::Msg,
 78          ctx: &mut ActorContext<Self::Msg>,
 79      ) -> Result<ActorSignal> {
 80          match msg {
 81              PingMsg::Pong(n) => {
 82                  info!(ping = %ctx.name, "got pong #{n}");
 83                  Ok(ActorSignal::Continue)
 84              }
 85              PingMsg::PongEvent(ev) => {
 86                  match ev {
 87                      ServiceEvent::Up => info!(ping = %ctx.name, "pong is UP"),
 88                      ServiceEvent::Down(reason) => {
 89                          warn!(ping = %ctx.name, reason = ?reason, "pong is DOWN");
 90                          // "link"-ish behavior: crash if pong dies abnormally
 91                          if reason.is_abnormal() {
 92                              return Err(anyhow!("linked pong went down: {:?}", reason));
 93                          }
 94                      }
 95                  }
 96                  Ok(ActorSignal::Continue)
 97              }
 98          }
 99      }
100  }
101  
102  struct Pong {
103      count: u64,
104      crash_every: u64,
105  }
106  
107  #[async_trait]
108  impl Actor for Pong {
109      type Msg = PongMsg;
110  
111      async fn handle(
112          &mut self,
113          msg: Self::Msg,
114          _ctx: &mut ActorContext<Self::Msg>,
115      ) -> Result<ActorSignal> {
116          match msg {
117              PongMsg::Ping { reply_to } => {
118                  self.count += 1;
119  
120                  if self.crash_every != 0 && self.count % self.crash_every == 0 {
121                      return Err(anyhow!("intentional crash at count={}", self.count));
122                  }
123  
124                  let _ = reply_to.send(PingMsg::Pong(self.count)).await;
125                  Ok(ActorSignal::Continue)
126              }
127          }
128      }
129  }
130  
131  #[tokio::main]
132  async fn main() -> Result<()> {
133      tracing_subscriber::fmt().init();
134  
135      let registry = Arc::new(Registry::new());
136  
137      let (pong_service, pong_pub) = Service::new();
138      registry.register("pong", pong_service.clone()).ok();
139  
140      let pong_spec = ChildSpec::actor_with_service(
141          "pong",
142          Arc::new(|| Pong {
143              count: 0,
144              crash_every: 5,
145          }),
146          pong_pub,
147      )
148      .restart(ChildRestart::Permanent)
149      .shutdown(ChildShutdown::Timeout(Duration::from_millis(200)));
150  
151      let reg2 = registry.clone();
152      let pong2 = pong_service.clone();
153      let ping_spec = ChildSpec::actor(
154          "ping",
155          Arc::new(move || Ping {
156              registry: reg2.clone(),
157              pong: pong2.clone(),
158              tick_every: Duration::from_millis(250),
159          }),
160      )
161      .restart(ChildRestart::Permanent)
162      .shutdown(ChildShutdown::Timeout(Duration::from_millis(200)));
163  
164      let _root = Supervisor::new("root")
165          .strategy(RestartStrategy::OneForOne)
166          .child(pong_spec)
167          .child(ping_spec)
168          .spawn();
169  
170      tokio::time::sleep(Duration::from_secs(8)).await;
171      Ok(())
172  }