1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
use anyhow::anyhow;
use async_tungstenite::{tokio::connect_async, tungstenite::Message};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tracing::{debug, warn};
use super::WebsocketMessage;
/// Credentials used to log in and obtain a bearer token.
pub struct AuthData {
    /// Sent as `login_id` in the login request body.
    pub(crate) login: String,
    /// Sent as `password` in the login request body.
    pub(crate) password: String,
}
/// API client bundling the credentials, the server base URL, the current
/// bearer token and the underlying HTTP client.
pub struct Client {
    /// Credentials sent when requesting a bearer token.
    pub(crate) auth: AuthData,
    /// Base URL of the server; `/api/v4/...` paths are appended to it.
    pub(crate) url: String,
    /// Token stored by `update_bearer_token`; `None` until a login succeeded.
    bearer_token: Option<String>,
    /// `reqwest` client used for every HTTP request made by this type.
    client: reqwest::Client,
}
/// Callback invoked by `Client::handle_websocket_stream` for every websocket
/// message that deserializes to an update.
#[async_trait::async_trait]
pub trait Handler {
    /// Processes a single update.
    ///
    /// `client` is the client that received the update, so an implementation
    /// can use it to issue further requests.
    ///
    /// # Errors
    ///
    /// An error returned from here is logged by the websocket loop; it does
    /// not stop the loop.
    async fn handle(
        &self,
        message: super::model::WebsocketUpdate,
        client: &Client,
    ) -> Result<(), anyhow::Error>;
}
impl Client {
pub(crate) fn new(auth: AuthData, url: &str) -> Self {
Self {
auth,
url: url.to_owned(),
bearer_token: None,
client: reqwest::Client::new(),
}
}
pub(crate) async fn update_bearer_token(&mut self) -> Result<(), anyhow::Error> {
debug!("Fetching bearer token from api");
let response = self
.client
.post(format!("{}/api/v4/users/login", self.url))
.json(&json!({
"login_id":self.auth.login,
"password":self.auth.password,
}))
.send()
.await?;
let token = response
.headers()
.get("token")
.ok_or(anyhow!("Haven't received bearer token in headers"))?
.to_str()?;
self.bearer_token = Some(token.to_owned());
debug!("Successfully updated bearer token");
Ok(())
}
pub(crate) async fn handle_websocket_stream<T: Handler>(
&mut self,
handler: T,
) -> Result<(), anyhow::Error> {
let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1));
let (mut ws_stream, _) = connect_async(url).await?;
let token = self
.bearer_token
.clone()
.ok_or(anyhow!("Missing bearer token"))?;
ws_stream
.send(Message::Text(
json!({"seq":1,"action":"authentication_challenge","data":{"token":token}})
.to_string(),
))
.await?;
while let Some(message) = ws_stream.next().await {
match message {
Ok(message) => match message {
Message::Text(text) => {
let json: Result<WebsocketMessage, _> = serde_json::from_str(&text);
match json {
Ok(websocketmessage) => {
if let WebsocketMessage::Update(update) = websocketmessage {
if let Err(err) = handler.handle(update, &self).await {
warn!("Handler returned error: {err}");
}
} else {
debug!("Got only websocket reply. {websocketmessage:?}");
}
}
Err(err) => warn!("Error while deserializing: {err} {text}"),
}
}
Message::Close(_) => break,
_ => {
debug!("Message: {message:?}");
}
},
Err(err) => warn!("Error {err} while reading message"),
}
}
Ok(())
}
#[allow(unreachable_code, unused_variables)] //NOCOMMIT: remove
pub(crate) async fn reply_to(
&self,
post: super::model::Post,
reply: String,
) -> Result<(), anyhow::Error> {
let token = self
.bearer_token
.clone()
.ok_or(anyhow!("Missing bearer token"))?;
let id = post.id;
let root_id = post.root_id;
let post_id = if root_id == "" { id } else { root_id };
let channel_id = post.channel_id;
debug!("Sending response");
let response = self
.client
.post(format!("{}/api/v4/posts", self.url))
.json(&json!({
"channel_id":channel_id,
"root_id":post_id,
"message":&reply,
}))
.header("Authorization", format!("Bearer {token}"))
.send()
.await?;
let status = response.status();
let resp = response.text().await;
if !status.is_success() {
warn!("{:?}", resp);
return Err(anyhow!("Error {}", status));
}
debug!("Sent response");
Ok(())
}
}
|