summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorPaweł Dybiec <pawel@dybiec.info>2022-12-27 17:43:38 +0000
committerPaweł Dybiec <pawel@dybiec.info>2022-12-27 17:50:24 +0000
commit82e78ef563dc2744c1e135ce0675361960747d4f (patch)
tree1ea041b958537872628e62eeeca111779d18d45c /src
parentTreat missing mentions as empty list (diff)
Handle shutdown gracefully
Diffstat (limited to 'src')
-rw-r--r--src/main.rs15
-rw-r--r--src/mattermost/client.rs67
2 files changed, 55 insertions, 27 deletions
diff --git a/src/main.rs b/src/main.rs
index c2d2be3..da396c2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -104,6 +104,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let auth = mattermost::AuthData { login, password };
     let mut client = mattermost::Client::new(auth, "https://mattermost.continuum.ii.uni.wroc.pl");
     client.update_bearer_token().await?;
-    client.handle_websocket_stream(Vav::new(db)).await?;
+    {
+    let (shutdown_send, shutdown_recv) = tokio::sync::mpsc::unbounded_channel();
+    let websocket_task =tokio::spawn( async move {
+    client.handle_websocket_stream(Vav::new(db),shutdown_recv).await});
+    match tokio::signal::ctrl_c().await {
+        Ok(()) => {},
+        Err(err) => {
+            eprintln!("Unable to listen for shutdown signal: {}", err);
+            // we also shut down in case of error
+        },
+    }
+    shutdown_send.send(())?;
+    websocket_task.await??;
+    }
     Ok(())
 }
diff --git a/src/mattermost/client.rs b/src/mattermost/client.rs
index abdf1ba..6025970 100644
--- a/src/mattermost/client.rs
+++ b/src/mattermost/client.rs
@@ -54,49 +54,64 @@ impl Client {
         debug!("Successfully updated bearer token");
         Ok(())
     }
-    pub(crate) async fn handle_websocket_stream<T: Handler>(
-        &mut self,
-        handler: T,
-    ) -> Result<(), anyhow::Error> {
+    async fn get_working_ws_stream(
+        &self,
+    ) -> Result<async_tungstenite::WebSocketStream<async_tungstenite::tokio::ConnectStream>, anyhow::Error> {
         let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1));
-        let (mut ws_stream, _) = connect_async(url).await?;
         let token = self
             .bearer_token
             .clone()
             .ok_or(anyhow!("Missing bearer token"))?;
+        let (mut ws_stream, _) = connect_async(url).await?;
         ws_stream
             .send(Message::Text(
                 json!({"seq":1,"action":"authentication_challenge","data":{"token":token}})
                     .to_string(),
             ))
             .await?;
-        while let Some(message) = ws_stream.next().await {
-            match message {
-                Ok(message) => match message {
-                    Message::Text(text) => {
-                        let json: Result<WebsocketMessage, _> = serde_json::from_str(&text);
-                        match json {
-                            Ok(websocketmessage) => {
-                                if let WebsocketMessage::Update(update) = websocketmessage {
-                                    if let Err(err) = handler.handle(update, &self).await {
-                                        warn!("Handler returned error: {err}");
+        Ok(ws_stream)
+    }
+    pub(crate) async fn handle_websocket_stream<T: Handler>(
+        &mut self,
+        handler: T,
+        mut shutdown: tokio::sync::mpsc::UnboundedReceiver<()>,
+    ) -> Result<(), anyhow::Error> {
+        let mut ws_stream=self.get_working_ws_stream().await?;
+        loop {
+            tokio::select! {
+                message = ws_stream.next() => {
+                    match message {
+                        Some(Ok(Message::Text(text))) => {
+                            let json: Result<WebsocketMessage, _> = serde_json::from_str(&text);
+                            match json {
+                                Ok(websocketmessage) => {
+                                    if let WebsocketMessage::Update(update) = websocketmessage {
+                                        if let Err(err) = handler.handle(update, &self).await {
+                                            warn!("Handler returned error: {err}");
+                                        }
+                                    } else {
+                                        debug!("Got only websocket reply. {websocketmessage:?}");
                                     }
-                                } else {
-                                    debug!("Got only websocket reply. {websocketmessage:?}");
                                 }
+                                Err(err) => warn!("Error while deserializing: {err} {text}"),
                             }
-                            Err(err) => warn!("Error while deserializing: {err} {text}"),
-                        }
-                    }
-                    Message::Close(_) => break,
-                    _ => {
-                        debug!("Message: {message:?}");
+                        },
+                        Some(Ok(Message::Close(_))) => {},
+                        Some(Ok(_)) => {
+                            debug!("Websocket message: {message:?}");
+                        },
+                        Some(Err(err)) => warn!("Error while reading message: {err} "),
+                        None => {
+                            ws_stream = self.get_working_ws_stream().await?
+                        },
                     }
-                },
-                Err(err) => warn!("Error {err} while reading message"),
+                }
+                _ = shutdown.recv() => {
+                    debug!("Shutting down gracefully");
+                    return Ok(())
+                }
             }
         }
-        Ok(())
     }
     #[allow(unreachable_code, unused_variables)] //NOCOMMIT: remove
     pub(crate) async fn reply_to(