summary refs log tree commit diff
path: root/src/mattermost/client.rs
blob: 6025970651a4904a932017a5372bd30fa5ea5cf5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use anyhow::anyhow;
use async_tungstenite::{tokio::connect_async, tungstenite::Message};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tracing::{debug, warn};

use super::WebsocketMessage;
/// Credentials used to log in to the server's REST API.
pub struct AuthData {
    /// Account identifier; sent as `login_id` in the login request.
    pub(crate) login: String,
    /// Account password; sent as `password` in the login request.
    pub(crate) password: String,
}
/// Client that talks to the server over HTTP (`/api/v4/...`) and over the
/// websocket endpoint.
pub struct Client {
    /// Credentials used by `update_bearer_token` to log in.
    pub(crate) auth: AuthData,
    /// Base URL of the server; API paths such as `/api/v4/posts` are appended to it.
    pub(crate) url: String,
    /// Session token stored by `update_bearer_token`; `None` until it has been fetched.
    bearer_token: Option<String>,
    /// `reqwest` client reused for every HTTP request made by this client.
    client: reqwest::Client,
}

/// Callback invoked for every websocket update received by
/// `Client::handle_websocket_stream`.
#[async_trait::async_trait]
pub trait Handler {
    /// Processes a single websocket update.
    ///
    /// `client` is the client that received the update, so that the handler can
    /// issue follow-up requests (for example `Client::reply_to`).
    ///
    /// An `Err` returned from here is only logged by the websocket loop; it
    /// does not stop the loop.
    async fn handle(
        &self,
        message: super::model::WebsocketUpdate,
        client: &Client,
    ) -> Result<(), anyhow::Error>;
}

impl Client {
    /// Creates a client for the server at `url` using the given credentials.
    ///
    /// No network traffic happens here: the bearer token starts out as `None`
    /// and has to be fetched with `update_bearer_token` before any
    /// authenticated call is made.
    pub(crate) fn new(auth: AuthData, url: &str) -> Self {
        let url = String::from(url);
        let client = reqwest::Client::new();
        Self {
            auth,
            url,
            bearer_token: None,
            client,
        }
    }
    /// Logs in with the stored credentials and caches the returned session token.
    ///
    /// Sends `POST {url}/api/v4/users/login` with `login_id` and `password` in
    /// the JSON body, reads the `token` response header and stores its value in
    /// `self.bearer_token`, replacing any previously stored token.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be sent, if the server answers
    /// with a non-success status (e.g. rejected credentials), if the `token`
    /// header is missing, or if the header value cannot be converted to a
    /// string.
    pub(crate) async fn update_bearer_token(&mut self) -> Result<(), anyhow::Error> {
        debug!("Fetching bearer token from api");
        let response = self
            .client
            .post(format!("{}/api/v4/users/login", self.url))
            .json(&json!({
                "login_id": self.auth.login,
                "password": self.auth.password,
            }))
            .send()
            .await?
            // Report a rejected login as the HTTP error it is, instead of the
            // less helpful "missing token header" error further down.
            .error_for_status()?;
        let token = response
            .headers()
            .get("token")
            // Build the error lazily: it is only needed when the header is absent.
            .ok_or_else(|| anyhow!("Haven't received bearer token in headers"))?
            .to_str()?;
        self.bearer_token = Some(token.to_owned());
        debug!("Successfully updated bearer token");
        Ok(())
    }
    /// Opens a websocket connection and sends the authentication challenge.
    ///
    /// The websocket URL is derived from `self.url` by replacing the first
    /// occurrence of `http` with `ws` (so `http://` becomes `ws://` and
    /// `https://` becomes `wss://`) and appending `/api/v4/websocket`.
    /// After connecting, an `authentication_challenge` message carrying the
    /// bearer token is sent; the server's reply is not awaited here.
    ///
    /// # Errors
    ///
    /// Returns an error if no bearer token has been stored yet, if the
    /// connection cannot be established, or if sending the challenge fails.
    async fn get_working_ws_stream(
        &self,
    ) -> Result<async_tungstenite::WebSocketStream<async_tungstenite::tokio::ConnectStream>, anyhow::Error> {
        // Borrow the token instead of cloning it; fail before touching the network.
        let token = self
            .bearer_token
            .as_deref()
            .ok_or_else(|| anyhow!("Missing bearer token"))?;
        let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1));
        let (mut ws_stream, _) = connect_async(url).await?;
        let challenge =
            json!({"seq":1,"action":"authentication_challenge","data":{"token":token}});
        ws_stream.send(Message::Text(challenge.to_string())).await?;
        Ok(ws_stream)
    }
    /// Runs the websocket receive loop until a shutdown signal arrives.
    ///
    /// Each text frame is deserialized into a `WebsocketMessage`:
    /// * `WebsocketMessage::Update` values are passed to `handler`; an error
    ///   returned by the handler is logged and the loop continues.
    /// * Any other successfully parsed message is logged at debug level.
    /// * Frames that fail to deserialize are logged as warnings.
    ///
    /// Close frames are ignored, other frame kinds are logged at debug level
    /// and read errors are logged as warnings. When the stream ends, a new
    /// connection is opened with `get_working_ws_stream`.
    ///
    /// # Errors
    ///
    /// Returns an error if the initial connection or a later reconnection
    /// fails. Returns `Ok(())` once `shutdown.recv()` completes.
    pub(crate) async fn handle_websocket_stream<T: Handler>(
        &mut self,
        handler: T,
        mut shutdown: tokio::sync::mpsc::UnboundedReceiver<()>,
    ) -> Result<(), anyhow::Error> {
        let mut ws_stream = self.get_working_ws_stream().await?;
        loop {
            tokio::select! {
                message = ws_stream.next() => match message {
                    Some(Ok(Message::Text(text))) => {
                        match serde_json::from_str::<WebsocketMessage>(&text) {
                            Ok(WebsocketMessage::Update(update)) => {
                                // Handler failures must not tear down the connection.
                                if let Err(err) = handler.handle(update, &*self).await {
                                    warn!("Handler returned error: {err}");
                                }
                            }
                            Ok(websocketmessage) => {
                                debug!("Got only websocket reply. {websocketmessage:?}");
                            }
                            Err(err) => warn!("Error while deserializing: {err} {text}"),
                        }
                    }
                    // Nothing to do on a close frame; the end of the stream
                    // is handled by the `None` arm below.
                    Some(Ok(Message::Close(_))) => {}
                    Some(Ok(_)) => debug!("Websocket message: {message:?}"),
                    Some(Err(err)) => warn!("Error while reading message: {err} "),
                    // Stream ended: open a fresh connection and keep going.
                    None => ws_stream = self.get_working_ws_stream().await?,
                },
                _ = shutdown.recv() => {
                    debug!("Shutting down gracefully");
                    return Ok(());
                }
            }
        }
    }
    /// Posts `reply` in the thread that `post` belongs to.
    ///
    /// If `post` is itself part of a thread (its `root_id` is non-empty) the
    /// reply is attached to that root; otherwise `post` becomes the root of
    /// the new thread. The message is created with
    /// `POST {url}/api/v4/posts` in the channel of `post`.
    ///
    /// # Errors
    ///
    /// Returns an error if no bearer token has been stored yet, if the request
    /// cannot be sent, or if the server answers with a non-success status (the
    /// response body is logged as a warning in that case).
    pub(crate) async fn reply_to(
        &self,
        post: super::model::Post,
        reply: String,
    ) -> Result<(), anyhow::Error> {
        // Borrow the token instead of cloning it.
        let token = self
            .bearer_token
            .as_deref()
            .ok_or_else(|| anyhow!("Missing bearer token"))?;
        // Replies always hang off the thread root, never off another reply.
        let root_id = if post.root_id.is_empty() {
            post.id
        } else {
            post.root_id
        };
        let channel_id = post.channel_id;

        debug!("Sending response");
        let response = self
            .client
            .post(format!("{}/api/v4/posts", self.url))
            .json(&json!({
                "channel_id": channel_id,
                "root_id": root_id,
                "message": reply,
            }))
            .header("Authorization", format!("Bearer {token}"))
            .send()
            .await?;
        let status = response.status();
        if !status.is_success() {
            // The body is only of interest for diagnosing a failure.
            let resp = response.text().await;
            warn!("{:?}", resp);
            return Err(anyhow!("Error {}", status));
        }
        debug!("Sent response");
        Ok(())
    }
}