summaryrefslogblamecommitdiff
path: root/src/mattermost/client.rs
blob: 861e0a8e043a0c2c4e22bef2786e7e249c53ed3d (plain) (tree)
1
2
3
4
5
6
7
                   
                                                           

                                       
                           

                            









                                 


                           

                    
                                               

                                   

 




























                                                                                     

                                   



                                                                                    
                                                                                     



                                                     
                                                           
                 
                                             



                                                                                           
                                           




                                                            
                                                         
                                    
                                                                



                                               
                                                                       








                                                                                                 
                                     
                                 
                                                                                             
                             
                          





                                                                                   


                                                                     
                                          
                                                  
                          
                                 
                                                                

                                                                           
                     
                 
                                      


                                                       
             
         
     

                                 
                                 





                                                     

                                   
                                                               
                                         





















                                                               
 
use anyhow::anyhow;
use async_tungstenite::{tokio::connect_async, tungstenite};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tracing::{debug, warn};

use super::WebsocketMessage;
pub struct AuthData {
    pub(crate) login: String,
    pub(crate) password: String,
}
pub struct Client {
    pub(crate) auth: AuthData,
    pub(crate) url: String,
    bearer_token: Option<String>,
    client: reqwest::Client,
}

#[async_trait::async_trait]
pub trait Handler {
    async fn handle(
        &self,
        message: super::model::WebsocketUpdate,
        client: &Client,
    ) -> Result<(), anyhow::Error>;
}

impl Client {
    pub(crate) fn new(auth: AuthData, url: &str) -> Self {
        Self {
            auth,
            url: url.to_owned(),
            bearer_token: None,
            client: reqwest::Client::new(),
        }
    }
    pub(crate) async fn update_bearer_token(&mut self) -> Result<(), anyhow::Error> {
        debug!("Fetching bearer token from api");
        let response = self
            .client
            .post(format!("{}/api/v4/users/login", self.url))
            .json(&json!({
                "login_id":self.auth.login,
                "password":self.auth.password,
            }))
            .send()
            .await?;
        let token = response
            .headers()
            .get("token")
            .ok_or(anyhow!("Haven't received bearer token in headers"))?
            .to_str()?;
        self.bearer_token = Some(token.to_owned());
        debug!("Successfully updated bearer token");
        Ok(())
    }
    async fn get_working_ws_stream(
        &self,
    ) -> Result<
        async_tungstenite::WebSocketStream<async_tungstenite::tokio::ConnectStream>,
        anyhow::Error,
    > {
        let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1));
        let token = self
            .bearer_token
            .clone()
            .ok_or(anyhow!("Missing bearer token"))?;
        let (mut ws_stream, _) = connect_async(url).await?;
        ws_stream
            .send(tungstenite::Message::Text(
                json!({"seq":1,"action":"authentication_challenge","data":{"token":token}})
                    .to_string(),
            ))
            .await?;
        debug!("Started websocket stream");
        Ok(ws_stream)
    }
    pub(crate) async fn handle_websocket_stream<T: Handler>(
        &mut self,
        handler: T,
        mut shutdown: tokio::sync::oneshot::Receiver<()>,
    ) -> Result<(), anyhow::Error> {
        let mut ws_stream = self.get_working_ws_stream().await?;
        loop {
            tokio::select! {
                message = ws_stream.next() => {
                    match message {
                        Some(Ok(tungstenite::Message::Text(text))) => {
                            let json: Result<WebsocketMessage, _> = serde_json::from_str(&text);
                            match json {
                                Ok(websocketmessage) => {
                                    if let WebsocketMessage::Update(update) = websocketmessage {
                                        if let Err(err) = handler.handle(update, &self).await {
                                            warn!("Handler returned error: {err}");
                                        }
                                    } else {
                                        debug!("Got only websocket reply. {websocketmessage:?}");
                                    }
                                }
                                Err(err) => warn!("Error while deserializing: {err} {text}"),
                            }
                        },
                        Some(Ok(tungstenite::Message::Close(_))) => {},
                        Some(Ok(tungstenite::Message::Ping(vec))) => {
                            debug!("got Ping");
                            ws_stream.send(tungstenite::Message::Pong(vec)).await?;
                            debug!("sent Pong");
                        },
                        Some(Ok(_)) => {
                            debug!("Websocket message: {message:?}");
                        },
                        Some(Err(err)) =>{
                            return Err(err.into())
                         }
                        None => {
                            debug!("Stream closed, restarting");
                            ws_stream = self.get_working_ws_stream().await?
                        },
                    }
                }
                _ = &mut shutdown => {
                    debug!("Shutting down gracefully");
                    return Ok(())
                }
            }
        }
    }
    pub(crate) async fn reply_to(
        &self,
        post: super::model::Post,
        reply: String,
    ) -> Result<(), anyhow::Error> {
        let token = self
            .bearer_token
            .clone()
            .ok_or(anyhow!("Missing bearer token"))?;
        let id = post.id;
        let root_id = post.root_id;
        let post_id = if root_id == "" { id } else { root_id };
        let channel_id = post.channel_id;

        debug!("Sending response");
        let response = self
            .client
            .post(format!("{}/api/v4/posts", self.url))
            .json(&json!({
                "channel_id":channel_id,
                "root_id":post_id,
                "message":&reply,
            }))
            .header("Authorization", format!("Bearer {token}"))
            .send()
            .await?;
        let status = response.status();
        let resp = response.text().await;
        if !status.is_success() {
            warn!("{:?}", resp);
            return Err(anyhow!("Error {}", status));
        }
        debug!("Sent response");
        Ok(())
    }
}