use anyhow::anyhow; use async_tungstenite::{tokio::connect_async, tungstenite}; use futures_util::{SinkExt, StreamExt}; use serde_json::json; use tracing::{debug, warn}; use super::WebsocketMessage; pub struct AuthData { pub(crate) login: String, pub(crate) password: String, } pub struct Client { pub(crate) auth: AuthData, pub(crate) url: String, bearer_token: Option, client: reqwest::Client, } #[async_trait::async_trait] pub trait Handler { async fn handle( &self, message: super::model::WebsocketUpdate, client: &Client, ) -> Result<(), anyhow::Error>; } impl Client { pub(crate) fn new(auth: AuthData, url: &str) -> Self { Self { auth, url: url.to_owned(), bearer_token: None, client: reqwest::Client::new(), } } pub(crate) async fn update_bearer_token(&mut self) -> Result<(), anyhow::Error> { debug!("Fetching bearer token from api"); let response = self .client .post(format!("{}/api/v4/users/login", self.url)) .json(&json!({ "login_id":self.auth.login, "password":self.auth.password, })) .send() .await?; let token = response .headers() .get("token") .ok_or(anyhow!("Haven't received bearer token in headers"))? .to_str()?; self.bearer_token = Some(token.to_owned()); debug!("Successfully updated bearer token"); Ok(()) } async fn get_working_ws_stream( &self, ) -> Result< async_tungstenite::WebSocketStream, anyhow::Error, > { let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1)); let token = self .bearer_token .clone() .ok_or(anyhow!("Missing bearer token"))?; let (mut ws_stream, _) = connect_async(url).await?; ws_stream .send(tungstenite::Message::Text( json!({"seq":1,"action":"authentication_challenge","data":{"token":token}}) .to_string(), )) .await?; debug!("Started websocket stream"); Ok(ws_stream) } pub(crate) async fn handle_websocket_stream( &mut self, handler: T, mut shutdown: tokio::sync::oneshot::Receiver<()>, ) -> Result<(), anyhow::Error> { let mut ws_stream = self.get_working_ws_stream().await?; loop { tokio::select! { message = ws_stream.next() => { match message { Some(Ok(tungstenite::Message::Text(text))) => { let json: Result = serde_json::from_str(&text); match json { Ok(websocketmessage) => { if let WebsocketMessage::Update(update) = websocketmessage { if let Err(err) = handler.handle(update, &self).await { warn!("Handler returned error: {err}"); } } else { debug!("Got only websocket reply. {websocketmessage:?}"); } } Err(err) => warn!("Error while deserializing: {err} {text}"), } }, Some(Ok(tungstenite::Message::Close(_))) => {}, Some(Ok(tungstenite::Message::Ping(vec))) => { debug!("got Ping"); ws_stream.send(tungstenite::Message::Pong(vec)).await?; debug!("sent Pong"); }, Some(Ok(_)) => { debug!("Websocket message: {message:?}"); }, Some(Err(err)) =>{ return Err(err.into()) } None => { debug!("Stream closed, restarting"); ws_stream = self.get_working_ws_stream().await? }, } } _ = &mut shutdown => { debug!("Shutting down gracefully"); return Ok(()) } } } } pub(crate) async fn reply_to( &self, post: super::model::Post, reply: String, ) -> Result<(), anyhow::Error> { let token = self .bearer_token .clone() .ok_or(anyhow!("Missing bearer token"))?; let id = post.id; let root_id = post.root_id; let post_id = if root_id == "" { id } else { root_id }; let channel_id = post.channel_id; debug!("Sending response"); let response = self .client .post(format!("{}/api/v4/posts", self.url)) .json(&json!({ "channel_id":channel_id, "root_id":post_id, "message":&reply, })) .header("Authorization", format!("Bearer {token}")) .send() .await?; let status = response.status(); let resp = response.text().await; if !status.is_success() { warn!("{:?}", resp); return Err(anyhow!("Error {}", status)); } debug!("Sent response"); Ok(()) } }