summary refs log tree commit diff
diff options
context:
space:
mode:
authorPaweł Dybiec <pawel@dybiec.info>2023-08-27 12:50:31 +0100
committerPaweł Dybiec <pawel@dybiec.info>2023-08-27 12:50:31 +0100
commit29e78ee652a23da403385959dd618719e21a5c27 (patch)
treec0d35a9c7235bd54a114f38c486f749187f250a8
parentUse nix for docker builds (diff)
Terminate on connection error
-rw-r--r--flake.nix15
-rw-r--r--src/main.rs21
-rw-r--r--src/mattermost/client.rs10
3 files changed, 25 insertions, 21 deletions
diff --git a/flake.nix b/flake.nix
index 2326526..6d49a6f 100644
--- a/flake.nix
+++ b/flake.nix
@@ -36,14 +36,15 @@
           };
           cargoArtifacts = craneLib.buildDepsOnly commonArgs;
           bin = craneLib.buildPackage (commonArgs // { inherit cargoArtifacts; });
-          dockerImage = pkgs.dockerTools.streamLayeredImage {
-            name = "vavbot";
-            tag = "latest";
-            contents = [ bin pkgs.cacert ];
-            config = {
-              Cmd = [ "${bin}/bin/vavbot" ];
+          dockerImage = pkgs.dockerTools.streamLayeredImage
+            {
+              name = "vavbot";
+              tag = "latest";
+              contents = [ bin pkgs.cacert ];
+              config = {
+                Cmd = [ "${bin}/bin/vavbot" ];
+              };
             };
-          };
         in
         with pkgs;
         {
diff --git a/src/main.rs b/src/main.rs
index 6ea162b..1dc8e63 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -113,24 +113,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let mut client = mattermost::Client::new(auth, "https://mattermost.continuum.ii.uni.wroc.pl");
     client.update_bearer_token().await?;
     {
-        let (shutdown_send, shutdown_recv) = tokio::sync::mpsc::unbounded_channel();
+        let (shutdown_send, shutdown_recv) = tokio::sync::oneshot::channel();
+        let (on_task_finish_send, on_task_finish_recv) = tokio::sync::oneshot::channel();
         let websocket_task = tokio::spawn(async move {
-            client
+            let result = client
                 .handle_websocket_stream(Vav::new(db), shutdown_recv)
-                .await
+                .await;
+            on_task_finish_send.send(result).unwrap();
         });
         {
             let mut sigterm = unix::signal(unix::SignalKind::terminate())?;
             let mut sigint = unix::signal(unix::SignalKind::interrupt())?;
             let mut sigquit = unix::signal(unix::SignalKind::quit())?;
             tokio::select! {
-                signal = sigterm.recv() => signal,
-                signal = sigint.recv() => signal,
-                signal = sigquit.recv() => signal,
+                _ = sigterm.recv() => {},
+                _ = sigint.recv() => {},
+                _ = sigquit.recv() => {},
+                task_result = on_task_finish_recv => {
+                    warn!("Task closed with result {:?}",task_result);
+                },
             };
         }
-        shutdown_send.send(())?;
-        websocket_task.await??;
+        _ = shutdown_send.send(());
+        websocket_task.await?;
     }
     Ok(())
 }
diff --git a/src/mattermost/client.rs b/src/mattermost/client.rs
index 83aa5f6..861e0a8 100644
--- a/src/mattermost/client.rs
+++ b/src/mattermost/client.rs
@@ -78,7 +78,7 @@ impl Client {
     pub(crate) async fn handle_websocket_stream<T: Handler>(
         &mut self,
         handler: T,
-        mut shutdown: tokio::sync::mpsc::UnboundedReceiver<()>,
+        mut shutdown: tokio::sync::oneshot::Receiver<()>,
     ) -> Result<(), anyhow::Error> {
         let mut ws_stream = self.get_working_ws_stream().await?;
         loop {
@@ -110,17 +110,15 @@ impl Client {
                             debug!("Websocket message: {message:?}");
                         },
                         Some(Err(err)) =>{
-                            warn!("Error while reading message: {err} ");
-                            if matches!(err,tungstenite::Error::Protocol(_)){
-                                ws_stream = self.get_working_ws_stream().await?
-                            }
+                            return Err(err.into())
                          }
                         None => {
+                            debug!("Stream closed, restarting");
                             ws_stream = self.get_working_ws_stream().await?
                         },
                     }
                 }
-                _ = shutdown.recv() => {
+                _ = &mut shutdown => {
                     debug!("Shutting down gracefully");
                     return Ok(())
                 }