use anyhow::anyhow;
use async_tungstenite::{tokio::connect_async, tungstenite};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tracing::{debug, warn};
use super::WebsocketMessage;
/// Credentials used to obtain a bearer token from the server.
///
/// The login request sends `login` as `login_id` and `password` as `password`.
pub struct AuthData {
    /// Login identifier of the account.
    pub(crate) login: String,
    /// Password of the account.
    pub(crate) password: String,
}
/// Client for one server: logs in over HTTP, listens on the websocket and posts replies.
pub struct Client {
    /// Credentials sent when requesting a bearer token.
    pub(crate) auth: AuthData,
    /// Base URL of the server; API paths such as `/api/v4/posts` are appended to it.
    pub(crate) url: String,
    /// Token obtained by the last successful login; `None` until one has succeeded.
    bearer_token: Option<String>,
    /// HTTP client shared by every request this `Client` makes.
    client: reqwest::Client,
}
// Hand-written so that only the URL is printed; the credentials and the
// bearer token never end up in debug output.
impl std::fmt::Debug for Client {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Client");
        repr.field("url", &self.url);
        repr.finish()
    }
}
/// Callback invoked for every update received on the websocket stream.
#[async_trait::async_trait]
pub trait Handler {
    /// Processes a single websocket update.
    ///
    /// `client` is the [`Client`] that received the update, so an
    /// implementation can use it to answer (for example via `reply_to`).
    ///
    /// # Errors
    ///
    /// An error returned here is logged by the receive loop as a warning and
    /// does not stop the loop.
    async fn handle(
        &self,
        message: super::model::WebsocketUpdate,
        client: &Client,
    ) -> anyhow::Result<()>;
}
impl Client {
/// Creates a client for the server at `url` using the given credentials.
///
/// No request is made here: the bearer token starts out as `None` and is
/// only filled in by `update_bearer_token`.
pub(crate) fn new(auth: AuthData, url: String) -> Self {
    let client = reqwest::Client::new();
    let bearer_token = None;
    Self {
        auth,
        url,
        bearer_token,
        client,
    }
}
/// Logs in with the stored credentials and caches the returned bearer token.
///
/// Sends a POST to `{url}/api/v4/users/login` with `login_id` and `password`
/// in the JSON body, then reads the token from the `token` response header
/// and stores it in `self.bearer_token`.
///
/// # Errors
///
/// Returns an error if the request cannot be sent, if the server answers
/// with a non-success status, if the `token` header is missing, or if its
/// value is not valid visible ASCII.
pub(crate) async fn update_bearer_token(&mut self) -> Result<(), anyhow::Error> {
    debug!("Fetching bearer token from api");
    let credentials = json!({
        "login_id": self.auth.login,
        "password": self.auth.password,
    });
    let response = self
        .client
        .post(format!("{}/api/v4/users/login", self.url))
        .json(&credentials)
        .send()
        .await?;
    // Check the status first: a rejected login otherwise surfaces only as a
    // confusing "missing header" error further down.
    let status = response.status();
    if !status.is_success() {
        return Err(anyhow!("Login request failed with status {status}"));
    }
    let token = response
        .headers()
        .get("token")
        // Build the error lazily so the success path does not allocate it.
        .ok_or_else(|| anyhow!("Haven't received bearer token in headers"))?
        .to_str()?;
    self.bearer_token = Some(token.to_owned());
    debug!("Successfully updated bearer token");
    Ok(())
}
/// Opens a websocket connection to the server and sends the authentication message.
///
/// The websocket URL is derived from `self.url` by turning a leading `http`
/// into `ws` (so `https://…` becomes `wss://…`) and appending
/// `/api/v4/websocket`. After connecting, an `authentication_challenge`
/// message carrying the cached bearer token is sent; the server's answer is
/// not awaited here.
///
/// # Errors
///
/// Returns an error if no bearer token has been cached yet, if the
/// connection cannot be established, or if sending the authentication
/// message fails.
async fn get_working_ws_stream(
    &self,
) -> Result<
    async_tungstenite::WebSocketStream<async_tungstenite::tokio::ConnectStream>,
    anyhow::Error,
> {
    // Borrow the token instead of cloning it; the error is built lazily.
    let token = self
        .bearer_token
        .as_deref()
        .ok_or_else(|| anyhow!("Missing bearer token"))?;
    // Only rewrite the scheme prefix. Replacing the first "http" anywhere in
    // the string would corrupt a URL that merely contains "http" later on.
    let url = match self.url.strip_prefix("http") {
        Some(rest) => format!("ws{rest}/api/v4/websocket"),
        None => format!("{}/api/v4/websocket", self.url),
    };
    let challenge =
        json!({"seq":1,"action":"authentication_challenge","data":{"token":token}}).to_string();
    let (mut ws_stream, _) = connect_async(url).await?;
    ws_stream.send(tungstenite::Message::Text(challenge)).await?;
    debug!("Started websocket stream");
    Ok(ws_stream)
}
/// Runs the websocket receive loop until shutdown is signalled or a fatal error occurs.
///
/// Each text frame is parsed as a `WebsocketMessage`. `Update` messages are
/// passed to `handler`; every other parsed message is only logged. A handler
/// error or a parse failure is logged as a warning and the loop continues.
/// Ping frames are answered with a Pong carrying the same payload. When the
/// stream ends, a new connection is opened via `get_working_ws_stream`.
///
/// Returns `Ok(())` once the `shutdown` receiver resolves.
///
/// # Errors
///
/// Returns an error if the initial connection or a reconnect fails, if the
/// stream yields an error, or if sending a Pong fails.
pub(crate) async fn handle_websocket_stream<T: Handler>(
    &mut self,
    handler: T,
    mut shutdown: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), anyhow::Error> {
    let mut ws_stream = self.get_working_ws_stream().await?;
    loop {
        tokio::select! {
            message = ws_stream.next() => {
                match message {
                    Some(Ok(tungstenite::Message::Text(text))) => {
                        match serde_json::from_str::<WebsocketMessage>(&text) {
                            Ok(WebsocketMessage::Update(update)) => {
                                debug!("Handling update {:?}", update);
                                // The handler only needs shared access to the client.
                                if let Err(err) = handler.handle(update, &*self).await {
                                    warn!("Handler returned error: {err}");
                                }
                            }
                            Ok(websocketmessage) => {
                                debug!("Got only websocket reply. {websocketmessage:?}");
                            }
                            Err(err) => warn!("Error while deserializing: {err} {text}"),
                        }
                    }
                    // Nothing to do for a close frame here; the end of the
                    // stream is handled by the `None` arm below.
                    Some(Ok(tungstenite::Message::Close(_))) => {}
                    Some(Ok(tungstenite::Message::Ping(vec))) => {
                        debug!("got Ping");
                        ws_stream.send(tungstenite::Message::Pong(vec)).await?;
                        debug!("sent Pong");
                    }
                    Some(Ok(_)) => {
                        debug!("Websocket message: {message:?}");
                    }
                    Some(Err(err)) => {
                        return Err(err.into());
                    }
                    None => {
                        debug!("Stream closed, restarting");
                        ws_stream = self.get_working_ws_stream().await?;
                    }
                }
            }
            _ = &mut shutdown => {
                debug!("Shutting down gracefully");
                return Ok(());
            }
        }
    }
}
/// Posts `reply` into the thread that `post` belongs to.
///
/// The new post is sent to `{url}/api/v4/posts` in `post`'s channel. Its
/// `root_id` is `post.root_id` when that is non-empty, otherwise `post.id`.
///
/// # Errors
///
/// Returns an error if no bearer token has been cached yet, if the request
/// cannot be sent, or if the server answers with a non-success status (the
/// response body is then logged as a warning).
pub(crate) async fn reply_to(
    &self,
    post: super::model::Post,
    reply: String,
) -> Result<(), anyhow::Error> {
    // Borrow the token instead of cloning it; the error is built lazily.
    let token = self
        .bearer_token
        .as_deref()
        .ok_or_else(|| anyhow!("Missing bearer token"))?;
    let id = post.id;
    let root_id = post.root_id;
    let channel_id = post.channel_id;
    // Use the existing thread root if there is one, else this post's own id.
    let thread_root = if root_id.is_empty() { id } else { root_id };
    debug!("Sending response");
    let response = self
        .client
        .post(format!("{}/api/v4/posts", self.url))
        .json(&json!({
            "channel_id": channel_id,
            "root_id": thread_root,
            "message": reply,
        }))
        .header("Authorization", format!("Bearer {token}"))
        .send()
        .await?;
    // The status must be captured first because `text()` consumes the response.
    let status = response.status();
    let resp = response.text().await;
    if !status.is_success() {
        warn!("{:?}", resp);
        return Err(anyhow!("Error {}", status));
    }
    debug!("Sent response");
    Ok(())
}
}