1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
use anyhow::anyhow;
use async_tungstenite::{tokio::connect_async, tungstenite::Message};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tracing::{debug, warn};
pub struct AuthData {
pub(crate) login: String,
pub(crate) password: String,
}
pub struct Client {
pub(crate) auth: AuthData,
pub(crate) url: String,
bearer_token: Option<String>,
client: reqwest::Client,
}
#[async_trait::async_trait]
pub trait Handler {
async fn handle(
&self,
message: serde_json::Value,
client: &Client,
) -> Result<(), anyhow::Error>;
}
impl Client {
pub(crate) fn new(auth: AuthData, url: &str) -> Self {
Self {
auth,
url: url.to_owned(),
bearer_token: None,
client: reqwest::Client::new(),
}
}
pub(crate) async fn update_bearer_token(&mut self) -> Result<(), anyhow::Error> {
debug!("Fetching bearer token from api");
let response = self
.client
.post(format!("{}/api/v4/users/login", self.url))
.json(&json!({
"login_id":self.auth.login,
"password":self.auth.password,
}))
.send()
.await?;
let token = response
.headers()
.get("token")
.ok_or(anyhow!("Haven't received bearer token in headers"))?
.to_str()?;
self.bearer_token = Some(token.to_owned());
debug!("Successfully updated bearer token");
Ok(())
}
pub(crate) async fn handle_websocket_stream<T: Handler>(
&mut self,
handler: T,
) -> Result<(), anyhow::Error> {
let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1));
let (mut ws_stream, _) = connect_async(url).await?;
let token = self
.bearer_token
.clone()
.ok_or(anyhow!("Missing bearer token"))?;
ws_stream
.send(Message::Text(
json!({"seq":1,"action":"authentication_challenge","data":{"token":token}})
.to_string(),
))
.await?;
while let Some(message) = ws_stream.next().await {
match message {
Ok(message) => match message {
Message::Text(text) => {
let json: Result<serde_json::Value, _> = serde_json::from_str(&text);
match json {
Ok(json) => {
// debug!("Got json: {json}");
if let Err(err) = handler.handle(json, &self).await {
warn!("Handler returned error: {err}");
};
}
Err(err) => warn!("Error while deserializing: {err}"),
}
}
Message::Close(_) => break,
_ => {
debug!("Message: {message:?}");
}
},
Err(err) => warn!("Error {err} while reading message"),
}
}
Ok(())
}
pub(crate) async fn reply_to(
&self,
post: &serde_json::Value,
reply: String,
) -> Result<(), anyhow::Error> {
let token = self
.bearer_token
.clone()
.ok_or(anyhow!("Missing bearer token"))?;
let object = post
.as_object()
.ok_or(anyhow!("Response from mattermost should be object"))?;
let id = object
.get("id")
.ok_or(anyhow!("missing id"))?
.as_str()
.ok_or(anyhow!("id should be a string"))?;
let root_id = object
.get("root_id")
.ok_or(anyhow!("missing root_id"))?
.as_str()
.ok_or(anyhow!("root_id should be a string"))?;
let post_id = if root_id == "" { id } else { root_id };
let channel_id = object
.get("channel_id")
.ok_or(anyhow!("missing channel_id"))?
.as_str()
.ok_or(anyhow!("channel_id should be a string"))?;
debug!("Sending response");
let response = self
.client
.post(format!("{}/api/v4/posts", self.url))
.json(&json!({
"channel_id":channel_id,
"root_id":post_id,
"message":&reply,
}))
.header("Authorization", format!("Bearer {token}"))
.send()
.await?;
let status = response.status();
let resp = response.text().await;
if !status.is_success() {
warn!("{:?}", resp);
return Err(anyhow!("Error {}", status));
}
debug!("Sent response");
Ok(())
}
}
|