From 73dbbabb69278f7784251737b28c4d2f19f93b58 Mon Sep 17 00:00:00 2001 From: Paweł Dybiec Date: Mon, 26 Dec 2022 20:26:37 +0000 Subject: Add basic model for messages + parsing --- src/mattermost/client.rs | 137 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 src/mattermost/client.rs (limited to 'src/mattermost/client.rs') diff --git a/src/mattermost/client.rs b/src/mattermost/client.rs new file mode 100644 index 0000000..abdf1ba --- /dev/null +++ b/src/mattermost/client.rs @@ -0,0 +1,137 @@ +use anyhow::anyhow; +use async_tungstenite::{tokio::connect_async, tungstenite::Message}; +use futures_util::{SinkExt, StreamExt}; +use serde_json::json; +use tracing::{debug, warn}; + +use super::WebsocketMessage; +pub struct AuthData { + pub(crate) login: String, + pub(crate) password: String, +} +pub struct Client { + pub(crate) auth: AuthData, + pub(crate) url: String, + bearer_token: Option, + client: reqwest::Client, +} + +#[async_trait::async_trait] +pub trait Handler { + async fn handle( + &self, + message: super::model::WebsocketUpdate, + client: &Client, + ) -> Result<(), anyhow::Error>; +} + +impl Client { + pub(crate) fn new(auth: AuthData, url: &str) -> Self { + Self { + auth, + url: url.to_owned(), + bearer_token: None, + client: reqwest::Client::new(), + } + } + pub(crate) async fn update_bearer_token(&mut self) -> Result<(), anyhow::Error> { + debug!("Fetching bearer token from api"); + let response = self + .client + .post(format!("{}/api/v4/users/login", self.url)) + .json(&json!({ + "login_id":self.auth.login, + "password":self.auth.password, + })) + .send() + .await?; + let token = response + .headers() + .get("token") + .ok_or(anyhow!("Haven't received bearer token in headers"))? + .to_str()?; + self.bearer_token = Some(token.to_owned()); + debug!("Successfully updated bearer token"); + Ok(()) + } + pub(crate) async fn handle_websocket_stream( + &mut self, + handler: T, + ) -> Result<(), anyhow::Error> { + let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1)); + let (mut ws_stream, _) = connect_async(url).await?; + let token = self + .bearer_token + .clone() + .ok_or(anyhow!("Missing bearer token"))?; + ws_stream + .send(Message::Text( + json!({"seq":1,"action":"authentication_challenge","data":{"token":token}}) + .to_string(), + )) + .await?; + while let Some(message) = ws_stream.next().await { + match message { + Ok(message) => match message { + Message::Text(text) => { + let json: Result = serde_json::from_str(&text); + match json { + Ok(websocketmessage) => { + if let WebsocketMessage::Update(update) = websocketmessage { + if let Err(err) = handler.handle(update, &self).await { + warn!("Handler returned error: {err}"); + } + } else { + debug!("Got only websocket reply. {websocketmessage:?}"); + } + } + Err(err) => warn!("Error while deserializing: {err} {text}"), + } + } + Message::Close(_) => break, + _ => { + debug!("Message: {message:?}"); + } + }, + Err(err) => warn!("Error {err} while reading message"), + } + } + Ok(()) + } + #[allow(unreachable_code, unused_variables)] //NOCOMMIT: remove + pub(crate) async fn reply_to( + &self, + post: super::model::Post, + reply: String, + ) -> Result<(), anyhow::Error> { + let token = self + .bearer_token + .clone() + .ok_or(anyhow!("Missing bearer token"))?; + let id = post.id; + let root_id = post.root_id; + let post_id = if root_id == "" { id } else { root_id }; + let channel_id = post.channel_id; + + debug!("Sending response"); + let response = self + .client + .post(format!("{}/api/v4/posts", self.url)) + .json(&json!({ + "channel_id":channel_id, + "root_id":post_id, + "message":&reply, + })) + .header("Authorization", format!("Bearer {token}")) + .send() + .await?; + let status = response.status(); + let resp = response.text().await; + if !status.is_success() { + warn!("{:?}", resp); + return Err(anyhow!("Error {}", status)); + } + debug!("Sent response"); + Ok(()) + } +} -- cgit 1.4.1