/ src / gateway / client.rs
client.rs
use crate::{
  RUNTIME,
  api::ClientProperties,
  gateway::{
    compression::Decompressor,
    opcode::OpCode,
    payload::{Payload, incoming::hello::Hello},
  },
};
use async_tungstenite::{tokio::connect_async, tungstenite::Message};
use futures_util::{SinkExt, StreamExt};
use gpui::*;
use tokio::sync::mpsc;
 14  
 15  pub struct Client {
 16    client_properties: ClientProperties,
 17  }
 18  
 19  impl Global for Client {}
 20  
 21  impl Client {
 22    pub fn start_with(cx: &mut App, token: String) {
 23      let (to_ws_tx, mut task_rx) = mpsc::channel::<Message>(32);
 24  
 25      cx.spawn(|cx: &mut AsyncApp| {
 26        let cx = cx.clone();
 27  
 28        async move {
 29          let (cx_tx, cx_rx) = smol::channel::unbounded::<Message>();
 30  
 31          RUNTIME.spawn(async move {
 32            let url =
 33              "wss://gateway.discord.gg/?encoding=etf&v=9&compress=zstd-stream";
 34            println!("ws: connecting to {url}");
 35  
 36            let (ws_stream, _) =
 37              connect_async(url).await.expect("ws: connection failed");
 38  
 39            println!("ws: connected");
 40  
 41            let (mut write, mut read) = ws_stream.split();
 42  
 43            loop {
 44              tokio::select! {
 45                  Some(msg) = task_rx.recv() => {
 46                      println!("ws: sending {:?}", msg);
 47                      if write.send(msg).await.is_err() {
 48                          break;
 49                      }
 50                  }
 51  
 52                  incoming = read.next() => {
 53                      match incoming {
 54                          Some(Ok(msg)) => {
 55                              cx_tx.send(msg).await.ok();
 56                          }
 57                          Some(Err(err)) => {
 58                              eprintln!("ws: {err}");
 59                              break;
 60                          }
 61                          None => break,
 62                      }
 63                  }
 64              }
 65            }
 66  
 67            println!("ws: disconnected");
 68          });
 69  
 70          // Send a test message
 71          // ws_tx
 72          //   .send(Message::Text(r#"{"hello":"world"}"#.into()))
 73          //   .await
 74          //   .ok();
 75  
 76          let mut decompressor = Decompressor::new();
 77  
 78          while let Ok(msg) = cx_rx.recv().await {
 79            match msg {
 80              Message::Binary(msg) => {
 81                let msg = decompressor.decompress(&msg)?;
 82                let payload = erltf_serde::from_bytes::<Payload>(&msg)
 83                  .expect("decoding error");
 84  
 85                match payload.op {
 86                  OpCode::Hello => {
 87                    let payload: Hello = erltf_serde::from_term(&payload.d)?;
 88                    println!("{payload:#?}");
 89                  }
 90                  _ => println!("ws: received unknown opcode {payload:#?}"),
 91                }
 92              }
 93              _ => println!("ws: received {:?}", msg),
 94            }
 95          }
 96  
 97          anyhow::Ok(())
 98        }
 99      })
100      .detach();
101    }
102  }
103  
104  pub fn init(cx: &mut App, client_properties: ClientProperties) {
105    cx.set_global(Client { client_properties });
106  }