client.rs
1 use crate::{ 2 RUNTIME, 3 api::ClientProperties, 4 gateway::{ 5 compression::Decompressor, 6 opcode::OpCode, 7 payload::{Payload, incoming::hello::Hello}, 8 }, 9 }; 10 use async_tungstenite::{tokio::connect_async, tungstenite::Message}; 11 use futures_util::StreamExt; 12 use gpui::*; 13 use tokio::sync::mpsc; 14 15 pub struct Client { 16 client_properties: ClientProperties, 17 } 18 19 impl Global for Client {} 20 21 impl Client { 22 pub fn start_with(cx: &mut App, token: String) { 23 let (to_ws_tx, mut task_rx) = mpsc::channel::<Message>(32); 24 25 cx.spawn(|cx: &mut AsyncApp| { 26 let cx = cx.clone(); 27 28 async move { 29 let (cx_tx, cx_rx) = smol::channel::unbounded::<Message>(); 30 31 RUNTIME.spawn(async move { 32 let url = 33 "wss://gateway.discord.gg/?encoding=etf&v=9&compress=zstd-stream"; 34 println!("ws: connecting to {url}"); 35 36 let (ws_stream, _) = 37 connect_async(url).await.expect("ws: connection failed"); 38 39 println!("ws: connected"); 40 41 let (mut write, mut read) = ws_stream.split(); 42 43 loop { 44 tokio::select! { 45 Some(msg) = task_rx.recv() => { 46 println!("ws: sending {:?}", msg); 47 if write.send(msg).await.is_err() { 48 break; 49 } 50 } 51 52 incoming = read.next() => { 53 match incoming { 54 Some(Ok(msg)) => { 55 cx_tx.send(msg).await.ok(); 56 } 57 Some(Err(err)) => { 58 eprintln!("ws: {err}"); 59 break; 60 } 61 None => break, 62 } 63 } 64 } 65 } 66 67 println!("ws: disconnected"); 68 }); 69 70 // Send a test message 71 // ws_tx 72 // .send(Message::Text(r#"{"hello":"world"}"#.into())) 73 // .await 74 // .ok(); 75 76 let mut decompressor = Decompressor::new(); 77 78 while let Ok(msg) = cx_rx.recv().await { 79 match msg { 80 Message::Binary(msg) => { 81 let msg = decompressor.decompress(&msg)?; 82 let payload = erltf_serde::from_bytes::<Payload>(&msg) 83 .expect("decoding error"); 84 85 match payload.op { 86 OpCode::Hello => { 87 let payload: Hello = erltf_serde::from_term(&payload.d)?; 88 println!("{payload:#?}"); 89 } 90 _ => println!("ws: received unknown opcode {payload:#?}"), 91 } 92 } 93 _ => println!("ws: received {:?}", msg), 94 } 95 } 96 97 anyhow::Ok(()) 98 } 99 }) 100 .detach(); 101 } 102 } 103 104 pub fn init(cx: &mut App, client_properties: ClientProperties) { 105 cx.set_global(Client { client_properties }); 106 }