1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
use anyhow::anyhow;
use async_tungstenite::{tokio::connect_async, tungstenite};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tracing::{debug, warn};
use super::WebsocketMessage;
/// Credentials used to log in to the chat server.
///
/// Both values are sent in the JSON body of the `/api/v4/users/login` request.
pub struct AuthData {
    /// Account identifier, submitted as the `login_id` field of the login request.
    pub(crate) login: String,
    /// Password for the account, submitted as the `password` field of the login request.
    pub(crate) password: String,
}
/// HTTP and websocket client for the chat server's `/api/v4` endpoints.
pub struct Client {
    /// Credentials used when requesting a bearer token.
    pub(crate) auth: AuthData,
    /// Base URL of the server. Endpoint paths such as `/api/v4/posts` are
    /// appended to it verbatim, so it should not end with a slash.
    pub(crate) url: String,
    /// Session token; `None` until a login has succeeded.
    bearer_token: Option<String>,
    /// Underlying HTTP client used for the REST calls.
    client: reqwest::Client,
}
/// Debug representation of [`Client`].
///
/// Only the `url` field is printed; `auth`, `bearer_token` and `client` are
/// left out, which keeps credentials and tokens out of debug output.
impl std::fmt::Debug for Client {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Client");
        repr.field("url", &self.url);
        repr.finish()
    }
}
/// Callback interface for websocket updates.
///
/// `Client::handle_websocket_stream` calls [`Handler::handle`] once for every
/// message that deserializes to `WebsocketMessage::Update`.
#[async_trait::async_trait]
pub trait Handler {
    /// Processes a single update.
    ///
    /// `client` is the client that received the update, so an implementation
    /// can use it to talk back to the server (for example via `reply_to`).
    ///
    /// # Errors
    ///
    /// An `Err` is logged as a warning by the websocket loop; it does not
    /// terminate the loop.
    async fn handle(
        &self,
        message: super::model::WebsocketUpdate,
        client: &Client,
    ) -> Result<(), anyhow::Error>;
}
impl Client {
    /// Creates a client for the server at `url` that authenticates with `auth`.
    ///
    /// No request is sent here; the bearer token starts out unset and is
    /// obtained by [`Client::update_bearer_token`].
    pub(crate) fn new(auth: AuthData, url: String) -> Self {
        Self {
            auth,
            url,
            bearer_token: None,
            client: reqwest::Client::new(),
        }
    }

    /// Logs in via `POST {url}/api/v4/users/login` and stores the token that
    /// the server returns in the `token` response header.
    ///
    /// # Errors
    ///
    /// Fails when the request cannot be sent, when the server answers with a
    /// non-success status, when the `token` header is missing, or when the
    /// header value cannot be read as a string.
    pub(crate) async fn update_bearer_token(&mut self) -> Result<(), anyhow::Error> {
        debug!("Fetching bearer token from api");
        let response = self
            .client
            .post(format!("{}/api/v4/users/login", self.url))
            .json(&json!({
                "login_id": self.auth.login,
                "password": self.auth.password,
            }))
            .send()
            .await?;

        // Report a rejected login as such instead of letting it surface as a
        // confusing "missing header" error further down.
        let status = response.status();
        if !status.is_success() {
            return Err(anyhow!("Login request failed with status {}", status));
        }

        let token = response
            .headers()
            .get("token")
            .ok_or_else(|| anyhow!("Haven't received bearer token in headers"))?
            .to_str()?;
        self.bearer_token = Some(token.to_owned());
        debug!("Successfully updated bearer token");
        Ok(())
    }

    /// Opens a websocket to `{url}/api/v4/websocket` and sends the
    /// `authentication_challenge` message carrying the current bearer token.
    ///
    /// # Errors
    ///
    /// Fails when no bearer token has been stored yet, when the connection
    /// cannot be established, or when the authentication message cannot be
    /// sent.
    async fn get_working_ws_stream(
        &self,
    ) -> Result<
        async_tungstenite::WebSocketStream<async_tungstenite::tokio::ConnectStream>,
        anyhow::Error,
    > {
        let token = self
            .bearer_token
            .as_deref()
            .ok_or_else(|| anyhow!("Missing bearer token"))?;

        // Map http -> ws and https -> wss. Only a leading scheme is rewritten,
        // so an "http" that appears later in the URL (e.g. in a host name) is
        // left untouched.
        let ws_base = match self.url.strip_prefix("http") {
            Some(rest) => format!("ws{rest}"),
            None => self.url.clone(),
        };
        let url = format!("{ws_base}/api/v4/websocket");

        let (mut ws_stream, _) = connect_async(url).await?;
        ws_stream
            .send(tungstenite::Message::Text(
                json!({"seq":1,"action":"authentication_challenge","data":{"token":token}})
                    .to_string(),
            ))
            .await?;
        debug!("Started websocket stream");
        Ok(ws_stream)
    }

    /// Reads websocket messages until `shutdown` fires, passing every update
    /// to `handler`.
    ///
    /// * Text frames are deserialized as `WebsocketMessage`; updates go to the
    ///   handler, anything else is only logged.
    /// * Ping frames are answered with a Pong carrying the same payload.
    /// * When the stream ends (`None`), a new authenticated stream is opened.
    ///
    /// `shutdown` also resolves when its sender is dropped, which stops the
    /// loop as well.
    ///
    /// # Errors
    ///
    /// Fails when the initial connection or a reconnect fails, when the stream
    /// yields an error, or when a Pong cannot be sent. Handler errors and
    /// deserialization errors are logged and do not stop the loop.
    pub(crate) async fn handle_websocket_stream<T: Handler>(
        &mut self,
        handler: T,
        mut shutdown: tokio::sync::oneshot::Receiver<()>,
    ) -> Result<(), anyhow::Error> {
        let mut ws_stream = self.get_working_ws_stream().await?;
        loop {
            tokio::select! {
                message = ws_stream.next() => {
                    match message {
                        Some(Ok(tungstenite::Message::Text(text))) => {
                            match serde_json::from_str::<WebsocketMessage>(&text) {
                                Ok(WebsocketMessage::Update(update)) => {
                                    debug!("Handling update {:?}", update);
                                    if let Err(err) = handler.handle(update, &*self).await {
                                        warn!("Handler returned error: {err}");
                                    }
                                }
                                Ok(websocketmessage) => {
                                    debug!("Got only websocket reply. {websocketmessage:?}");
                                }
                                Err(err) => warn!("Error while deserializing: {err} {text}"),
                            }
                        },
                        // Nothing to do for a close frame; the loop keeps polling
                        // and reconnects once the stream reports its end.
                        Some(Ok(tungstenite::Message::Close(_))) => {},
                        Some(Ok(tungstenite::Message::Ping(vec))) => {
                            debug!("got Ping");
                            ws_stream.send(tungstenite::Message::Pong(vec)).await?;
                            debug!("sent Pong");
                        },
                        Some(Ok(_)) => {
                            debug!("Websocket message: {message:?}");
                        },
                        Some(Err(err)) => {
                            return Err(err.into())
                        }
                        None => {
                            debug!("Stream closed, restarting");
                            ws_stream = self.get_working_ws_stream().await?
                        },
                    }
                }
                _ = &mut shutdown => {
                    debug!("Shutting down gracefully");
                    return Ok(())
                }
            }
        }
    }

    /// Posts `reply` into the thread that `post` belongs to.
    ///
    /// The new post's `root_id` is `post.root_id` when that is set, and
    /// otherwise `post.id` (i.e. `post` itself becomes the thread root).
    ///
    /// # Errors
    ///
    /// Fails when no bearer token has been stored yet, when the request cannot
    /// be sent, or when the server answers with a non-success status (the
    /// response body is logged as a warning in that case).
    pub(crate) async fn reply_to(
        &self,
        post: super::model::Post,
        reply: String,
    ) -> Result<(), anyhow::Error> {
        let token = self
            .bearer_token
            .as_deref()
            .ok_or_else(|| anyhow!("Missing bearer token"))?;

        let thread_root = if post.root_id.is_empty() {
            post.id
        } else {
            post.root_id
        };
        let channel_id = post.channel_id;

        debug!("Sending response");
        let response = self
            .client
            .post(format!("{}/api/v4/posts", self.url))
            .json(&json!({
                "channel_id": channel_id,
                "root_id": thread_root,
                "message": &reply,
            }))
            .header("Authorization", format!("Bearer {token}"))
            .send()
            .await?;

        // Capture the status first: reading the body consumes the response.
        let status = response.status();
        let resp = response.text().await;
        if !status.is_success() {
            warn!("{:?}", resp);
            return Err(anyhow!("Error {}", status));
        }
        debug!("Sent response");
        Ok(())
    }
}
|