diff options
author | Paweł Dybiec <pawel@dybiec.info> | 2022-12-26 20:26:37 +0000 |
---|---|---|
committer | Paweł Dybiec <pawel@dybiec.info> | 2022-12-26 22:34:24 +0000 |
commit | 73dbbabb69278f7784251737b28c4d2f19f93b58 (patch) | |
tree | ffe1a63ff8c7e5bd49298d75c0a3389fc68152f1 | |
parent | Add dockerfile and build/publish step for it (diff) |
Add basic model for messages + parsing
-rw-r--r-- | src/main.rs | 45 | ||||
-rw-r--r-- | src/mattermost/client.rs (renamed from src/mattermost.rs) | 45 | ||||
-rw-r--r-- | src/mattermost/mod.rs | 4 | ||||
-rw-r--r-- | src/mattermost/model.rs | 299 |
4 files changed, 331 insertions, 62 deletions
diff --git a/src/main.rs b/src/main.rs index 36dfbb8..b735100 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,54 +1,29 @@ mod mattermost; - use anyhow::anyhow; use tokio::{self}; -use tracing::{debug, info}; +use tracing::debug; struct Vav {} #[async_trait::async_trait] impl mattermost::Handler for Vav { async fn handle( &self, - value: serde_json::Value, + update: mattermost::model::WebsocketUpdate, client: &mattermost::Client, ) -> Result<(), anyhow::Error> { - debug!("{value:?}"); - let object = value - .as_object() - .ok_or(anyhow!("Response from mattermost should be object"))?; - if object - .get("event") - .ok_or(anyhow!("missing event type"))? - .as_str() - .ok_or(anyhow!("event type should be a string"))? - != "posted" - { - return Ok(()); - } - let post: serde_json::Value = serde_json::from_str( - object - .get("data") - .unwrap() - .get("post") - .ok_or(anyhow!("missing post field"))? - .as_str() - .ok_or(anyhow!("post should be string"))?, - )?; - debug!("Post field {post:?}"); - let object = post - .as_object() - .ok_or(anyhow!("Post field should represent legit object"))?; - let message = object - .get("message") - .ok_or(anyhow!("missing message"))? - .as_str() - .ok_or(anyhow!("message should be string"))?; + debug!("as json: {update:?}"); + + let posted = match update.update { + mattermost::model::Update::Posted(posted) => posted, + _ => return Ok(()), + }; + let message = &posted.post.message; if !message.starts_with('!') { return Ok(()); } let message = message.strip_prefix('!').unwrap(); match message{ - "adres" => client.reply_to(&post, "Uniwersytet Wrocławski Instytut Informatyki; pl. Frederyka Joliot-Curie 15; 50-383 Wrocław".to_owned()).await?, + "adres" => client.reply_to(posted.post, "Uniwersytet Wrocławski Instytut Informatyki; pl. Frederyka Joliot-Curie 15; 50-383 Wrocław".to_owned()).await?, _ => return Err(anyhow!("Unrecognized command {message}")), } Ok(()) diff --git a/src/mattermost.rs b/src/mattermost/client.rs index c8759c3..abdf1ba 100644 --- a/src/mattermost.rs +++ b/src/mattermost/client.rs @@ -3,6 +3,8 @@ use async_tungstenite::{tokio::connect_async, tungstenite::Message}; use futures_util::{SinkExt, StreamExt}; use serde_json::json; use tracing::{debug, warn}; + +use super::WebsocketMessage; pub struct AuthData { pub(crate) login: String, pub(crate) password: String, @@ -18,7 +20,7 @@ pub struct Client { pub trait Handler { async fn handle( &self, - message: serde_json::Value, + message: super::model::WebsocketUpdate, client: &Client, ) -> Result<(), anyhow::Error>; } @@ -72,15 +74,18 @@ impl Client { match message { Ok(message) => match message { Message::Text(text) => { - let json: Result<serde_json::Value, _> = serde_json::from_str(&text); + let json: Result<WebsocketMessage, _> = serde_json::from_str(&text); match json { - Ok(json) => { - // debug!("Got json: {json}"); - if let Err(err) = handler.handle(json, &self).await { - warn!("Handler returned error: {err}"); - }; + Ok(websocketmessage) => { + if let WebsocketMessage::Update(update) = websocketmessage { + if let Err(err) = handler.handle(update, &self).await { + warn!("Handler returned error: {err}"); + } + } else { + debug!("Got only websocket reply. {websocketmessage:?}"); + } } - Err(err) => warn!("Error while deserializing: {err}"), + Err(err) => warn!("Error while deserializing: {err} {text}"), } } Message::Close(_) => break, @@ -93,34 +98,20 @@ impl Client { } Ok(()) } + #[allow(unreachable_code, unused_variables)] //NOCOMMIT: remove pub(crate) async fn reply_to( &self, - post: &serde_json::Value, + post: super::model::Post, reply: String, ) -> Result<(), anyhow::Error> { let token = self .bearer_token .clone() .ok_or(anyhow!("Missing bearer token"))?; - let object = post - .as_object() - .ok_or(anyhow!("Response from mattermost should be object"))?; - let id = object - .get("id") - .ok_or(anyhow!("missing id"))? - .as_str() - .ok_or(anyhow!("id should be a string"))?; - let root_id = object - .get("root_id") - .ok_or(anyhow!("missing root_id"))? - .as_str() - .ok_or(anyhow!("root_id should be a string"))?; + let id = post.id; + let root_id = post.root_id; let post_id = if root_id == "" { id } else { root_id }; - let channel_id = object - .get("channel_id") - .ok_or(anyhow!("missing channel_id"))? - .as_str() - .ok_or(anyhow!("channel_id should be a string"))?; + let channel_id = post.channel_id; debug!("Sending response"); let response = self diff --git a/src/mattermost/mod.rs b/src/mattermost/mod.rs new file mode 100644 index 0000000..68c4465 --- /dev/null +++ b/src/mattermost/mod.rs @@ -0,0 +1,4 @@ +pub mod client; +pub mod model; +pub use client::{AuthData, Client, Handler}; +pub use model::WebsocketMessage; diff --git a/src/mattermost/model.rs b/src/mattermost/model.rs new file mode 100644 index 0000000..9d4d78e --- /dev/null +++ b/src/mattermost/model.rs @@ -0,0 +1,299 @@ +use serde::Deserialize; +type PostId = String; +type UserId = String; + +#[derive(Deserialize, Debug, PartialEq, Eq)] +#[serde(untagged)] +pub enum WebsocketMessage { + Update(WebsocketUpdate), + Reply(WebsocketReply), +} +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct WebsocketReply { + pub status: String, + pub seq_reply: u32, +} +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct Broadcast { + pub channel_id: String, + pub connection_id: String, + pub omit_connection_id: String, + // omit_users: + pub team_id: String, + pub user_id: String, +} +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct WebsocketUpdateStuff { + pub broadcast: Broadcast, + pub seq: u32, +} +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct WebsocketUpdate { + #[serde(flatten)] + pub stuff: WebsocketUpdateStuff, + #[serde(flatten)] + pub update: Update, +} +#[derive(Deserialize, Debug, PartialEq, Eq)] +#[serde(tag = "event", content = "data")] +pub enum Update { + #[serde(rename = "posted")] + Posted(Posted), + #[serde(rename = "hello")] + Hello(Hello), + #[serde(rename = "status_change")] + StatusChange {}, + #[serde(rename = "typing")] + Typing {}, + #[serde(rename = "channel_viewed")] + ChannelViewed {}, + #[serde(other)] + Other, +} + +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct Posted { + pub channel_display_name: String, + pub channel_name: String, + pub channel_type: String, + #[serde(deserialize_with = "deserialize_json_field")] + pub mentions: Vec<String>, + #[serde(deserialize_with = "deserialize_json_field")] + pub post: Post, +} +fn deserialize_json_field<'de, D, T>(deserializer: D) -> Result<T, D::Error> +where + D: serde::Deserializer<'de>, + T: serde::de::DeserializeOwned, +{ + let buf: String = String::deserialize(deserializer)?; + serde_json::from_str(&buf).map_err(|x| { + serde::de::Error::custom(format!("Error while deserializing json encoded field {x}")) + }) +} + +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct Post { + pub id: String, + pub create_at: u64, + pub update_at: u64, + pub edit_at: u64, + pub delete_at: u64, + pub is_pinned: bool, + pub user_id: String, + pub channel_id: String, + // pub hashtags: String, + // pub last_reply_at: u64, + pub message: String, + // pub metadata:, + // pub original_id: String, + // pub participants:, + // pub pending_post_id: String, + // pub preops + // pub reply_count: u64, + pub root_id: String, + #[serde(rename = "type")] + pub ttype: String, +} + +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct Hello { + pub connection_id: String, + pub server_version: String, +} +#[cfg(test)] +mod tests { + use serde_json::Value; + + use crate::mattermost::model::{ + Broadcast, Post, Update, WebsocketUpdate, WebsocketUpdateStuff, + }; + #[test] + fn parse_post() { + let post = Post { + id: "a".to_owned(), + create_at: 1, + update_at: 2, + edit_at: 3, + delete_at: 4, + is_pinned: true, + user_id: "b".to_owned(), + channel_id: "c".to_owned(), + message: "d".to_owned(), + root_id: "e".to_owned(), + ttype: "f".to_owned(), + }; + let post_str = r#"{ + "id":"a", + "create_at":1, + "update_at":2, + "edit_at":3, + "delete_at":4, + "is_pinned":true, + "user_id":"b", + "channel_id":"c", + "message":"d", + "root_id":"e", + "type":"f" + }"#; + let d: Result<Post, _> = serde_json::from_str(post_str); + assert_eq!(d.unwrap(), post); + let data = format!( + "\"{}\"", + post_str.replace('\n', "\\n").replace('\"', "\\\"") + ); + let x: Value = serde_json::from_str(&data).unwrap(); + let s = x.as_str().unwrap(); + let d: Result<Post, _> = serde_json::from_str(s); + assert_eq!(d.unwrap(), post); + } + + #[test] + fn parse_update() { + let post = Post { + id: "a".to_owned(), + create_at: 1, + update_at: 2, + edit_at: 3, + delete_at: 4, + is_pinned: true, + user_id: "b".to_owned(), + channel_id: "c".to_owned(), + message: "d".to_owned(), + root_id: "e".to_owned(), + ttype: "f".to_owned(), + }; + let data = r#" + { + "data":{ + "connection_id": "aaaa", + "server_version": "2137" + }, + "event":"hello" + }"#; + let update: Result<Update, _> = serde_json::from_str(data); + assert_eq!( + update.unwrap(), + Update::Hello(super::Hello { + connection_id: "aaaa".to_owned(), + server_version: "2137".to_owned() + }) + ); + let data = r#" + { + "data":{ + "channel_display_name":"a", + "channel_name":"b", + "channel_type":"c", + "mentions":"[\"d\"]", + "post": "{\"id\":\"a\",\"create_at\":1,\"update_at\":2,\"edit_at\":3,\"delete_at\":4,\"is_pinned\":true,\"user_id\":\"b\",\"channel_id\":\"c\",\"message\":\"d\",\"root_id\":\"e\",\"type\":\"f\"}" + }, + "event":"posted" + }"#; + let update: Result<Update, _> = serde_json::from_str(&data); + assert_eq!( + update.unwrap(), + Update::Posted(super::Posted { + channel_display_name: "a".to_owned(), + channel_name: "b".to_owned(), + channel_type: "c".to_owned(), + mentions: vec!["d".to_owned()], + post: post + }) + ); + } + #[test] + fn parse_websocketupdate() { + let post = Post { + id: "a".to_owned(), + create_at: 1, + update_at: 2, + edit_at: 3, + delete_at: 4, + is_pinned: true, + user_id: "b".to_owned(), + channel_id: "c".to_owned(), + message: "d".to_owned(), + root_id: "e".to_owned(), + ttype: "f".to_owned(), + }; + let data = r#" + { + "broadcast":{ + "channel_id":"a", + "connection_id":"b", + "omit_connection_id":"c", + "team_id":"d", + "user_id":"e" + }, + "data":{ + "connection_id": "aaaa", + "server_version": "2137" + }, + "event":"hello", + "seq":2137 + }"#; + let update: Result<WebsocketUpdate, _> = serde_json::from_str(data); + assert_eq!( + update.unwrap(), + WebsocketUpdate { + stuff: WebsocketUpdateStuff { + broadcast: Broadcast { + channel_id: "a".to_owned(), + connection_id: "b".to_owned(), + omit_connection_id: "c".to_owned(), + team_id: "d".to_owned(), + user_id: "e".to_owned() + }, + seq: 2137, + }, + update: Update::Hello(super::Hello { + connection_id: "aaaa".to_owned(), + server_version: "2137".to_owned() + }) + } + ); + let data = r#" + { + "broadcast":{ + "channel_id":"a", + "connection_id":"b", + "omit_connection_id":"c", + "team_id":"d", + "user_id":"e" + }, + "data":{ + "channel_display_name":"a", + "channel_name":"b", + "channel_type":"c", + "mentions":"[\"d\"]", + "post": "{\"id\":\"a\",\"create_at\":1,\"update_at\":2,\"edit_at\":3,\"delete_at\":4,\"is_pinned\":true,\"user_id\":\"b\",\"channel_id\":\"c\",\"message\":\"d\",\"root_id\":\"e\",\"type\":\"f\"}" + }, + "event":"posted", + "seq": 2137 + }"#; + let update: Result<WebsocketUpdate, _> = serde_json::from_str(&data); + assert_eq!( + update.unwrap(), + WebsocketUpdate { + stuff: WebsocketUpdateStuff { + broadcast: Broadcast { + channel_id: "a".to_owned(), + connection_id: "b".to_owned(), + omit_connection_id: "c".to_owned(), + team_id: "d".to_owned(), + user_id: "e".to_owned() + }, + seq: 2137, + }, + update: Update::Posted(super::Posted { + channel_display_name: "a".to_owned(), + channel_name: "b".to_owned(), + channel_type: "c".to_owned(), + mentions: vec!["d".to_owned()], + post: post + }) + } + ); + } +} |