summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPaweł Dybiec <pawel@dybiec.info>2022-12-26 20:26:37 +0000
committerPaweł Dybiec <pawel@dybiec.info>2022-12-26 22:34:24 +0000
commit73dbbabb69278f7784251737b28c4d2f19f93b58 (patch)
treeffe1a63ff8c7e5bd49298d75c0a3389fc68152f1 /src
parentAdd dockerfile and build/publish step for it (diff)
Add basic model for messages + parsing
Diffstat (limited to 'src')
-rw-r--r--src/main.rs45
-rw-r--r--src/mattermost/client.rs (renamed from src/mattermost.rs)45
-rw-r--r--src/mattermost/mod.rs4
-rw-r--r--src/mattermost/model.rs299
4 files changed, 331 insertions, 62 deletions
diff --git a/src/main.rs b/src/main.rs
index 36dfbb8..b735100 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,54 +1,29 @@
mod mattermost;
-
use anyhow::anyhow;
use tokio::{self};
-use tracing::{debug, info};
+use tracing::debug;
struct Vav {}
#[async_trait::async_trait]
impl mattermost::Handler for Vav {
async fn handle(
&self,
- value: serde_json::Value,
+ update: mattermost::model::WebsocketUpdate,
client: &mattermost::Client,
) -> Result<(), anyhow::Error> {
- debug!("{value:?}");
- let object = value
- .as_object()
- .ok_or(anyhow!("Response from mattermost should be object"))?;
- if object
- .get("event")
- .ok_or(anyhow!("missing event type"))?
- .as_str()
- .ok_or(anyhow!("event type should be a string"))?
- != "posted"
- {
- return Ok(());
- }
- let post: serde_json::Value = serde_json::from_str(
- object
- .get("data")
- .unwrap()
- .get("post")
- .ok_or(anyhow!("missing post field"))?
- .as_str()
- .ok_or(anyhow!("post should be string"))?,
- )?;
- debug!("Post field {post:?}");
- let object = post
- .as_object()
- .ok_or(anyhow!("Post field should represent legit object"))?;
- let message = object
- .get("message")
- .ok_or(anyhow!("missing message"))?
- .as_str()
- .ok_or(anyhow!("message should be string"))?;
+ debug!("as json: {update:?}");
+
+ let posted = match update.update {
+ mattermost::model::Update::Posted(posted) => posted,
+ _ => return Ok(()),
+ };
+ let message = &posted.post.message;
if !message.starts_with('!') {
return Ok(());
}
let message = message.strip_prefix('!').unwrap();
match message{
- "adres" => client.reply_to(&post, "Uniwersytet Wrocławski Instytut Informatyki; pl. Frederyka Joliot-Curie 15; 50-383 Wrocław".to_owned()).await?,
+ "adres" => client.reply_to(posted.post, "Uniwersytet Wrocławski Instytut Informatyki; pl. Frederyka Joliot-Curie 15; 50-383 Wrocław".to_owned()).await?,
_ => return Err(anyhow!("Unrecognized command {message}")),
}
Ok(())
diff --git a/src/mattermost.rs b/src/mattermost/client.rs
index c8759c3..abdf1ba 100644
--- a/src/mattermost.rs
+++ b/src/mattermost/client.rs
@@ -3,6 +3,8 @@ use async_tungstenite::{tokio::connect_async, tungstenite::Message};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tracing::{debug, warn};
+
+use super::WebsocketMessage;
pub struct AuthData {
pub(crate) login: String,
pub(crate) password: String,
@@ -18,7 +20,7 @@ pub struct Client {
pub trait Handler {
async fn handle(
&self,
- message: serde_json::Value,
+ message: super::model::WebsocketUpdate,
client: &Client,
) -> Result<(), anyhow::Error>;
}
@@ -72,15 +74,18 @@ impl Client {
match message {
Ok(message) => match message {
Message::Text(text) => {
- let json: Result<serde_json::Value, _> = serde_json::from_str(&text);
+ let json: Result<WebsocketMessage, _> = serde_json::from_str(&text);
match json {
- Ok(json) => {
- // debug!("Got json: {json}");
- if let Err(err) = handler.handle(json, &self).await {
- warn!("Handler returned error: {err}");
- };
+ Ok(websocketmessage) => {
+ if let WebsocketMessage::Update(update) = websocketmessage {
+ if let Err(err) = handler.handle(update, &self).await {
+ warn!("Handler returned error: {err}");
+ }
+ } else {
+ debug!("Got only websocket reply. {websocketmessage:?}");
+ }
}
- Err(err) => warn!("Error while deserializing: {err}"),
+ Err(err) => warn!("Error while deserializing: {err} {text}"),
}
}
Message::Close(_) => break,
@@ -93,34 +98,20 @@ impl Client {
}
Ok(())
}
+ #[allow(unreachable_code, unused_variables)] //NOCOMMIT: remove
pub(crate) async fn reply_to(
&self,
- post: &serde_json::Value,
+ post: super::model::Post,
reply: String,
) -> Result<(), anyhow::Error> {
let token = self
.bearer_token
.clone()
.ok_or(anyhow!("Missing bearer token"))?;
- let object = post
- .as_object()
- .ok_or(anyhow!("Response from mattermost should be object"))?;
- let id = object
- .get("id")
- .ok_or(anyhow!("missing id"))?
- .as_str()
- .ok_or(anyhow!("id should be a string"))?;
- let root_id = object
- .get("root_id")
- .ok_or(anyhow!("missing root_id"))?
- .as_str()
- .ok_or(anyhow!("root_id should be a string"))?;
+ let id = post.id;
+ let root_id = post.root_id;
let post_id = if root_id == "" { id } else { root_id };
- let channel_id = object
- .get("channel_id")
- .ok_or(anyhow!("missing channel_id"))?
- .as_str()
- .ok_or(anyhow!("channel_id should be a string"))?;
+ let channel_id = post.channel_id;
debug!("Sending response");
let response = self
diff --git a/src/mattermost/mod.rs b/src/mattermost/mod.rs
new file mode 100644
index 0000000..68c4465
--- /dev/null
+++ b/src/mattermost/mod.rs
@@ -0,0 +1,4 @@
+pub mod client;
+pub mod model;
+pub use client::{AuthData, Client, Handler};
+pub use model::WebsocketMessage;
diff --git a/src/mattermost/model.rs b/src/mattermost/model.rs
new file mode 100644
index 0000000..9d4d78e
--- /dev/null
+++ b/src/mattermost/model.rs
@@ -0,0 +1,299 @@
+use serde::Deserialize;
+type PostId = String;
+type UserId = String;
+
+#[derive(Deserialize, Debug, PartialEq, Eq)]
+#[serde(untagged)]
+pub enum WebsocketMessage {
+ Update(WebsocketUpdate),
+ Reply(WebsocketReply),
+}
+#[derive(Deserialize, Debug, PartialEq, Eq)]
+pub struct WebsocketReply {
+ pub status: String,
+ pub seq_reply: u32,
+}
+#[derive(Deserialize, Debug, PartialEq, Eq)]
+pub struct Broadcast {
+ pub channel_id: String,
+ pub connection_id: String,
+ pub omit_connection_id: String,
+ // omit_users:
+ pub team_id: String,
+ pub user_id: String,
+}
+#[derive(Deserialize, Debug, PartialEq, Eq)]
+pub struct WebsocketUpdateStuff {
+ pub broadcast: Broadcast,
+ pub seq: u32,
+}
+#[derive(Deserialize, Debug, PartialEq, Eq)]
+pub struct WebsocketUpdate {
+ #[serde(flatten)]
+ pub stuff: WebsocketUpdateStuff,
+ #[serde(flatten)]
+ pub update: Update,
+}
+#[derive(Deserialize, Debug, PartialEq, Eq)]
+#[serde(tag = "event", content = "data")]
+pub enum Update {
+ #[serde(rename = "posted")]
+ Posted(Posted),
+ #[serde(rename = "hello")]
+ Hello(Hello),
+ #[serde(rename = "status_change")]
+ StatusChange {},
+ #[serde(rename = "typing")]
+ Typing {},
+ #[serde(rename = "channel_viewed")]
+ ChannelViewed {},
+ #[serde(other)]
+ Other,
+}
+
+#[derive(Deserialize, Debug, PartialEq, Eq)]
+pub struct Posted {
+ pub channel_display_name: String,
+ pub channel_name: String,
+ pub channel_type: String,
+ #[serde(deserialize_with = "deserialize_json_field")]
+ pub mentions: Vec<String>,
+ #[serde(deserialize_with = "deserialize_json_field")]
+ pub post: Post,
+}
+fn deserialize_json_field<'de, D, T>(deserializer: D) -> Result<T, D::Error>
+where
+ D: serde::Deserializer<'de>,
+ T: serde::de::DeserializeOwned,
+{
+ let buf: String = String::deserialize(deserializer)?;
+ serde_json::from_str(&buf).map_err(|x| {
+ serde::de::Error::custom(format!("Error while deserializing json encoded field {x}"))
+ })
+}
+
+#[derive(Deserialize, Debug, PartialEq, Eq)]
+pub struct Post {
+ pub id: String,
+ pub create_at: u64,
+ pub update_at: u64,
+ pub edit_at: u64,
+ pub delete_at: u64,
+ pub is_pinned: bool,
+ pub user_id: String,
+ pub channel_id: String,
+ // pub hashtags: String,
+ // pub last_reply_at: u64,
+ pub message: String,
+ // pub metadata:,
+ // pub original_id: String,
+ // pub participants:,
+ // pub pending_post_id: String,
+ // pub preops
+ // pub reply_count: u64,
+ pub root_id: String,
+ #[serde(rename = "type")]
+ pub ttype: String,
+}
+
+#[derive(Deserialize, Debug, PartialEq, Eq)]
+pub struct Hello {
+ pub connection_id: String,
+ pub server_version: String,
+}
+#[cfg(test)]
+mod tests {
+ use serde_json::Value;
+
+ use crate::mattermost::model::{
+ Broadcast, Post, Update, WebsocketUpdate, WebsocketUpdateStuff,
+ };
+ #[test]
+ fn parse_post() {
+ let post = Post {
+ id: "a".to_owned(),
+ create_at: 1,
+ update_at: 2,
+ edit_at: 3,
+ delete_at: 4,
+ is_pinned: true,
+ user_id: "b".to_owned(),
+ channel_id: "c".to_owned(),
+ message: "d".to_owned(),
+ root_id: "e".to_owned(),
+ ttype: "f".to_owned(),
+ };
+ let post_str = r#"{
+ "id":"a",
+ "create_at":1,
+ "update_at":2,
+ "edit_at":3,
+ "delete_at":4,
+ "is_pinned":true,
+ "user_id":"b",
+ "channel_id":"c",
+ "message":"d",
+ "root_id":"e",
+ "type":"f"
+ }"#;
+ let d: Result<Post, _> = serde_json::from_str(post_str);
+ assert_eq!(d.unwrap(), post);
+ let data = format!(
+ "\"{}\"",
+ post_str.replace('\n', "\\n").replace('\"', "\\\"")
+ );
+ let x: Value = serde_json::from_str(&data).unwrap();
+ let s = x.as_str().unwrap();
+ let d: Result<Post, _> = serde_json::from_str(s);
+ assert_eq!(d.unwrap(), post);
+ }
+
+ #[test]
+ fn parse_update() {
+ let post = Post {
+ id: "a".to_owned(),
+ create_at: 1,
+ update_at: 2,
+ edit_at: 3,
+ delete_at: 4,
+ is_pinned: true,
+ user_id: "b".to_owned(),
+ channel_id: "c".to_owned(),
+ message: "d".to_owned(),
+ root_id: "e".to_owned(),
+ ttype: "f".to_owned(),
+ };
+ let data = r#"
+ {
+ "data":{
+ "connection_id": "aaaa",
+ "server_version": "2137"
+ },
+ "event":"hello"
+ }"#;
+ let update: Result<Update, _> = serde_json::from_str(data);
+ assert_eq!(
+ update.unwrap(),
+ Update::Hello(super::Hello {
+ connection_id: "aaaa".to_owned(),
+ server_version: "2137".to_owned()
+ })
+ );
+ let data = r#"
+ {
+ "data":{
+ "channel_display_name":"a",
+ "channel_name":"b",
+ "channel_type":"c",
+ "mentions":"[\"d\"]",
+ "post": "{\"id\":\"a\",\"create_at\":1,\"update_at\":2,\"edit_at\":3,\"delete_at\":4,\"is_pinned\":true,\"user_id\":\"b\",\"channel_id\":\"c\",\"message\":\"d\",\"root_id\":\"e\",\"type\":\"f\"}"
+ },
+ "event":"posted"
+ }"#;
+ let update: Result<Update, _> = serde_json::from_str(&data);
+ assert_eq!(
+ update.unwrap(),
+ Update::Posted(super::Posted {
+ channel_display_name: "a".to_owned(),
+ channel_name: "b".to_owned(),
+ channel_type: "c".to_owned(),
+ mentions: vec!["d".to_owned()],
+ post: post
+ })
+ );
+ }
+ #[test]
+ fn parse_websocketupdate() {
+ let post = Post {
+ id: "a".to_owned(),
+ create_at: 1,
+ update_at: 2,
+ edit_at: 3,
+ delete_at: 4,
+ is_pinned: true,
+ user_id: "b".to_owned(),
+ channel_id: "c".to_owned(),
+ message: "d".to_owned(),
+ root_id: "e".to_owned(),
+ ttype: "f".to_owned(),
+ };
+ let data = r#"
+ {
+ "broadcast":{
+ "channel_id":"a",
+ "connection_id":"b",
+ "omit_connection_id":"c",
+ "team_id":"d",
+ "user_id":"e"
+ },
+ "data":{
+ "connection_id": "aaaa",
+ "server_version": "2137"
+ },
+ "event":"hello",
+ "seq":2137
+ }"#;
+ let update: Result<WebsocketUpdate, _> = serde_json::from_str(data);
+ assert_eq!(
+ update.unwrap(),
+ WebsocketUpdate {
+ stuff: WebsocketUpdateStuff {
+ broadcast: Broadcast {
+ channel_id: "a".to_owned(),
+ connection_id: "b".to_owned(),
+ omit_connection_id: "c".to_owned(),
+ team_id: "d".to_owned(),
+ user_id: "e".to_owned()
+ },
+ seq: 2137,
+ },
+ update: Update::Hello(super::Hello {
+ connection_id: "aaaa".to_owned(),
+ server_version: "2137".to_owned()
+ })
+ }
+ );
+ let data = r#"
+ {
+ "broadcast":{
+ "channel_id":"a",
+ "connection_id":"b",
+ "omit_connection_id":"c",
+ "team_id":"d",
+ "user_id":"e"
+ },
+ "data":{
+ "channel_display_name":"a",
+ "channel_name":"b",
+ "channel_type":"c",
+ "mentions":"[\"d\"]",
+ "post": "{\"id\":\"a\",\"create_at\":1,\"update_at\":2,\"edit_at\":3,\"delete_at\":4,\"is_pinned\":true,\"user_id\":\"b\",\"channel_id\":\"c\",\"message\":\"d\",\"root_id\":\"e\",\"type\":\"f\"}"
+ },
+ "event":"posted",
+ "seq": 2137
+ }"#;
+ let update: Result<WebsocketUpdate, _> = serde_json::from_str(&data);
+ assert_eq!(
+ update.unwrap(),
+ WebsocketUpdate {
+ stuff: WebsocketUpdateStuff {
+ broadcast: Broadcast {
+ channel_id: "a".to_owned(),
+ connection_id: "b".to_owned(),
+ omit_connection_id: "c".to_owned(),
+ team_id: "d".to_owned(),
+ user_id: "e".to_owned()
+ },
+ seq: 2137,
+ },
+ update: Update::Posted(super::Posted {
+ channel_display_name: "a".to_owned(),
+ channel_name: "b".to_owned(),
+ channel_type: "c".to_owned(),
+ mentions: vec!["d".to_owned()],
+ post: post
+ })
+ }
+ );
+ }
+}