diff options
Diffstat (limited to 'src/mattermost')
-rw-r--r-- | src/mattermost/client.rs | 137 | ||||
-rw-r--r-- | src/mattermost/mod.rs | 4 | ||||
-rw-r--r-- | src/mattermost/model.rs | 299 |
3 files changed, 440 insertions, 0 deletions
diff --git a/src/mattermost/client.rs b/src/mattermost/client.rs new file mode 100644 index 0000000..abdf1ba --- /dev/null +++ b/src/mattermost/client.rs @@ -0,0 +1,137 @@ +use anyhow::anyhow; +use async_tungstenite::{tokio::connect_async, tungstenite::Message}; +use futures_util::{SinkExt, StreamExt}; +use serde_json::json; +use tracing::{debug, warn}; + +use super::WebsocketMessage; +pub struct AuthData { + pub(crate) login: String, + pub(crate) password: String, +} +pub struct Client { + pub(crate) auth: AuthData, + pub(crate) url: String, + bearer_token: Option<String>, + client: reqwest::Client, +} + +#[async_trait::async_trait] +pub trait Handler { + async fn handle( + &self, + message: super::model::WebsocketUpdate, + client: &Client, + ) -> Result<(), anyhow::Error>; +} + +impl Client { + pub(crate) fn new(auth: AuthData, url: &str) -> Self { + Self { + auth, + url: url.to_owned(), + bearer_token: None, + client: reqwest::Client::new(), + } + } + pub(crate) async fn update_bearer_token(&mut self) -> Result<(), anyhow::Error> { + debug!("Fetching bearer token from api"); + let response = self + .client + .post(format!("{}/api/v4/users/login", self.url)) + .json(&json!({ + "login_id":self.auth.login, + "password":self.auth.password, + })) + .send() + .await?; + let token = response + .headers() + .get("token") + .ok_or(anyhow!("Haven't received bearer token in headers"))? + .to_str()?; + self.bearer_token = Some(token.to_owned()); + debug!("Successfully updated bearer token"); + Ok(()) + } + pub(crate) async fn handle_websocket_stream<T: Handler>( + &mut self, + handler: T, + ) -> Result<(), anyhow::Error> { + let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1)); + let (mut ws_stream, _) = connect_async(url).await?; + let token = self + .bearer_token + .clone() + .ok_or(anyhow!("Missing bearer token"))?; + ws_stream + .send(Message::Text( + json!({"seq":1,"action":"authentication_challenge","data":{"token":token}}) + .to_string(), + )) + .await?; + while let Some(message) = ws_stream.next().await { + match message { + Ok(message) => match message { + Message::Text(text) => { + let json: Result<WebsocketMessage, _> = serde_json::from_str(&text); + match json { + Ok(websocketmessage) => { + if let WebsocketMessage::Update(update) = websocketmessage { + if let Err(err) = handler.handle(update, &self).await { + warn!("Handler returned error: {err}"); + } + } else { + debug!("Got only websocket reply. {websocketmessage:?}"); + } + } + Err(err) => warn!("Error while deserializing: {err} {text}"), + } + } + Message::Close(_) => break, + _ => { + debug!("Message: {message:?}"); + } + }, + Err(err) => warn!("Error {err} while reading message"), + } + } + Ok(()) + } + #[allow(unreachable_code, unused_variables)] //NOCOMMIT: remove + pub(crate) async fn reply_to( + &self, + post: super::model::Post, + reply: String, + ) -> Result<(), anyhow::Error> { + let token = self + .bearer_token + .clone() + .ok_or(anyhow!("Missing bearer token"))?; + let id = post.id; + let root_id = post.root_id; + let post_id = if root_id == "" { id } else { root_id }; + let channel_id = post.channel_id; + + debug!("Sending response"); + let response = self + .client + .post(format!("{}/api/v4/posts", self.url)) + .json(&json!({ + "channel_id":channel_id, + "root_id":post_id, + "message":&reply, + })) + .header("Authorization", format!("Bearer {token}")) + .send() + .await?; + let status = response.status(); + let resp = response.text().await; + if !status.is_success() { + warn!("{:?}", resp); + return Err(anyhow!("Error {}", status)); + } + debug!("Sent response"); + Ok(()) + } +} diff --git a/src/mattermost/mod.rs b/src/mattermost/mod.rs new file mode 100644 index 0000000..68c4465 --- /dev/null +++ b/src/mattermost/mod.rs @@ -0,0 +1,4 @@ +pub mod client; +pub mod model; +pub use client::{AuthData, Client, Handler}; +pub use model::WebsocketMessage; diff --git a/src/mattermost/model.rs b/src/mattermost/model.rs new file mode 100644 index 0000000..9d4d78e --- /dev/null +++ b/src/mattermost/model.rs @@ -0,0 +1,299 @@ +use serde::Deserialize; +type PostId = String; +type UserId = String; + +#[derive(Deserialize, Debug, PartialEq, Eq)] +#[serde(untagged)] +pub enum WebsocketMessage { + Update(WebsocketUpdate), + Reply(WebsocketReply), +} +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct WebsocketReply { + pub status: String, + pub seq_reply: u32, +} +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct Broadcast { + pub channel_id: String, + pub connection_id: String, + pub omit_connection_id: String, + // omit_users: + pub team_id: String, + pub user_id: String, +} +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct WebsocketUpdateStuff { + pub broadcast: Broadcast, + pub seq: u32, +} +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct WebsocketUpdate { + #[serde(flatten)] + pub stuff: WebsocketUpdateStuff, + #[serde(flatten)] + pub update: Update, +} +#[derive(Deserialize, Debug, PartialEq, Eq)] +#[serde(tag = "event", content = "data")] +pub enum Update { + #[serde(rename = "posted")] + Posted(Posted), + #[serde(rename = "hello")] + Hello(Hello), + #[serde(rename = "status_change")] + StatusChange {}, + #[serde(rename = "typing")] + Typing {}, + #[serde(rename = "channel_viewed")] + ChannelViewed {}, + #[serde(other)] + Other, +} + +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct Posted { + pub channel_display_name: String, + pub channel_name: String, + pub channel_type: String, + #[serde(deserialize_with = "deserialize_json_field")] + pub mentions: Vec<String>, + #[serde(deserialize_with = "deserialize_json_field")] + pub post: Post, +} +fn deserialize_json_field<'de, D, T>(deserializer: D) -> Result<T, D::Error> +where + D: serde::Deserializer<'de>, + T: serde::de::DeserializeOwned, +{ + let buf: String = String::deserialize(deserializer)?; + serde_json::from_str(&buf).map_err(|x| { + serde::de::Error::custom(format!("Error while deserializing json encoded field {x}")) + }) +} + +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct Post { + pub id: String, + pub create_at: u64, + pub update_at: u64, + pub edit_at: u64, + pub delete_at: u64, + pub is_pinned: bool, + pub user_id: String, + pub channel_id: String, + // pub hashtags: String, + // pub last_reply_at: u64, + pub message: String, + // pub metadata:, + // pub original_id: String, + // pub participants:, + // pub pending_post_id: String, + // pub preops + // pub reply_count: u64, + pub root_id: String, + #[serde(rename = "type")] + pub ttype: String, +} + +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct Hello { + pub connection_id: String, + pub server_version: String, +} +#[cfg(test)] +mod tests { + use serde_json::Value; + + use crate::mattermost::model::{ + Broadcast, Post, Update, WebsocketUpdate, WebsocketUpdateStuff, + }; + #[test] + fn parse_post() { + let post = Post { + id: "a".to_owned(), + create_at: 1, + update_at: 2, + edit_at: 3, + delete_at: 4, + is_pinned: true, + user_id: "b".to_owned(), + channel_id: "c".to_owned(), + message: "d".to_owned(), + root_id: "e".to_owned(), + ttype: "f".to_owned(), + }; + let post_str = r#"{ + "id":"a", + "create_at":1, + "update_at":2, + "edit_at":3, + "delete_at":4, + "is_pinned":true, + "user_id":"b", + "channel_id":"c", + "message":"d", + "root_id":"e", + "type":"f" + }"#; + let d: Result<Post, _> = serde_json::from_str(post_str); + assert_eq!(d.unwrap(), post); + let data = format!( + "\"{}\"", + post_str.replace('\n', "\\n").replace('\"', "\\\"") + ); + let x: Value = serde_json::from_str(&data).unwrap(); + let s = x.as_str().unwrap(); + let d: Result<Post, _> = serde_json::from_str(s); + assert_eq!(d.unwrap(), post); + } + + #[test] + fn parse_update() { + let post = Post { + id: "a".to_owned(), + create_at: 1, + update_at: 2, + edit_at: 3, + delete_at: 4, + is_pinned: true, + user_id: "b".to_owned(), + channel_id: "c".to_owned(), + message: "d".to_owned(), + root_id: "e".to_owned(), + ttype: "f".to_owned(), + }; + let data = r#" + { + "data":{ + "connection_id": "aaaa", + "server_version": "2137" + }, + "event":"hello" + }"#; + let update: Result<Update, _> = serde_json::from_str(data); + assert_eq!( + update.unwrap(), + Update::Hello(super::Hello { + connection_id: "aaaa".to_owned(), + server_version: "2137".to_owned() + }) + ); + let data = r#" + { + "data":{ + "channel_display_name":"a", + "channel_name":"b", + "channel_type":"c", + "mentions":"[\"d\"]", + "post": "{\"id\":\"a\",\"create_at\":1,\"update_at\":2,\"edit_at\":3,\"delete_at\":4,\"is_pinned\":true,\"user_id\":\"b\",\"channel_id\":\"c\",\"message\":\"d\",\"root_id\":\"e\",\"type\":\"f\"}" + }, + "event":"posted" + }"#; + let update: Result<Update, _> = serde_json::from_str(&data); + assert_eq!( + update.unwrap(), + Update::Posted(super::Posted { + channel_display_name: "a".to_owned(), + channel_name: "b".to_owned(), + channel_type: "c".to_owned(), + mentions: vec!["d".to_owned()], + post: post + }) + ); + } + #[test] + fn parse_websocketupdate() { + let post = Post { + id: "a".to_owned(), + create_at: 1, + update_at: 2, + edit_at: 3, + delete_at: 4, + is_pinned: true, + user_id: "b".to_owned(), + channel_id: "c".to_owned(), + message: "d".to_owned(), + root_id: "e".to_owned(), + ttype: "f".to_owned(), + }; + let data = r#" + { + "broadcast":{ + "channel_id":"a", + "connection_id":"b", + "omit_connection_id":"c", + "team_id":"d", + "user_id":"e" + }, + "data":{ + "connection_id": "aaaa", + "server_version": "2137" + }, + "event":"hello", + "seq":2137 + }"#; + let update: Result<WebsocketUpdate, _> = serde_json::from_str(data); + assert_eq!( + update.unwrap(), + WebsocketUpdate { + stuff: WebsocketUpdateStuff { + broadcast: Broadcast { + channel_id: "a".to_owned(), + connection_id: "b".to_owned(), + omit_connection_id: "c".to_owned(), + team_id: "d".to_owned(), + user_id: "e".to_owned() + }, + seq: 2137, + }, + update: Update::Hello(super::Hello { + connection_id: "aaaa".to_owned(), + server_version: "2137".to_owned() + }) + } + ); + let data = r#" + { + "broadcast":{ + "channel_id":"a", + "connection_id":"b", + "omit_connection_id":"c", + "team_id":"d", + "user_id":"e" + }, + "data":{ + "channel_display_name":"a", + "channel_name":"b", + "channel_type":"c", + "mentions":"[\"d\"]", + "post": "{\"id\":\"a\",\"create_at\":1,\"update_at\":2,\"edit_at\":3,\"delete_at\":4,\"is_pinned\":true,\"user_id\":\"b\",\"channel_id\":\"c\",\"message\":\"d\",\"root_id\":\"e\",\"type\":\"f\"}" + }, + "event":"posted", + "seq": 2137 + }"#; + let update: Result<WebsocketUpdate, _> = serde_json::from_str(&data); + assert_eq!( + update.unwrap(), + WebsocketUpdate { + stuff: WebsocketUpdateStuff { + broadcast: Broadcast { + channel_id: "a".to_owned(), + connection_id: "b".to_owned(), + omit_connection_id: "c".to_owned(), + team_id: "d".to_owned(), + user_id: "e".to_owned() + }, + seq: 2137, + }, + update: Update::Posted(super::Posted { + channel_display_name: "a".to_owned(), + channel_name: "b".to_owned(), + channel_type: "c".to_owned(), + mentions: vec!["d".to_owned()], + post: post + }) + } + ); + } +} |