use anyhow::anyhow; use async_tungstenite::{tokio::connect_async, tungstenite::Message}; use futures_util::{SinkExt, StreamExt}; use serde_json::json; use tracing::{debug, warn}; use super::WebsocketMessage; pub struct AuthData { pub(crate) login: String, pub(crate) password: String, } pub struct Client { pub(crate) auth: AuthData, pub(crate) url: String, bearer_token: Option, client: reqwest::Client, } #[async_trait::async_trait] pub trait Handler { async fn handle( &self, message: super::model::WebsocketUpdate, client: &Client, ) -> Result<(), anyhow::Error>; } impl Client { pub(crate) fn new(auth: AuthData, url: &str) -> Self { Self { auth, url: url.to_owned(), bearer_token: None, client: reqwest::Client::new(), } } pub(crate) async fn update_bearer_token(&mut self) -> Result<(), anyhow::Error> { debug!("Fetching bearer token from api"); let response = self .client .post(format!("{}/api/v4/users/login", self.url)) .json(&json!({ "login_id":self.auth.login, "password":self.auth.password, })) .send() .await?; let token = response .headers() .get("token") .ok_or(anyhow!("Haven't received bearer token in headers"))? .to_str()?; self.bearer_token = Some(token.to_owned()); debug!("Successfully updated bearer token"); Ok(()) } pub(crate) async fn handle_websocket_stream( &mut self, handler: T, ) -> Result<(), anyhow::Error> { let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1)); let (mut ws_stream, _) = connect_async(url).await?; let token = self .bearer_token .clone() .ok_or(anyhow!("Missing bearer token"))?; ws_stream .send(Message::Text( json!({"seq":1,"action":"authentication_challenge","data":{"token":token}}) .to_string(), )) .await?; while let Some(message) = ws_stream.next().await { match message { Ok(message) => match message { Message::Text(text) => { let json: Result = serde_json::from_str(&text); match json { Ok(websocketmessage) => { if let WebsocketMessage::Update(update) = websocketmessage { if let Err(err) = handler.handle(update, &self).await { warn!("Handler returned error: {err}"); } } else { debug!("Got only websocket reply. {websocketmessage:?}"); } } Err(err) => warn!("Error while deserializing: {err} {text}"), } } Message::Close(_) => break, _ => { debug!("Message: {message:?}"); } }, Err(err) => warn!("Error {err} while reading message"), } } Ok(()) } #[allow(unreachable_code, unused_variables)] //NOCOMMIT: remove pub(crate) async fn reply_to( &self, post: super::model::Post, reply: String, ) -> Result<(), anyhow::Error> { let token = self .bearer_token .clone() .ok_or(anyhow!("Missing bearer token"))?; let id = post.id; let root_id = post.root_id; let post_id = if root_id == "" { id } else { root_id }; let channel_id = post.channel_id; debug!("Sending response"); let response = self .client .post(format!("{}/api/v4/posts", self.url)) .json(&json!({ "channel_id":channel_id, "root_id":post_id, "message":&reply, })) .header("Authorization", format!("Bearer {token}")) .send() .await?; let status = response.status(); let resp = response.text().await; if !status.is_success() { warn!("{:?}", resp); return Err(anyhow!("Error {}", status)); } debug!("Sent response"); Ok(()) } }