diff options
-rw-r--r-- | flake.nix | 15 | ||||
-rw-r--r-- | src/main.rs | 21 | ||||
-rw-r--r-- | src/mattermost/client.rs | 10 |
3 files changed, 25 insertions, 21 deletions
diff --git a/flake.nix b/flake.nix index 2326526..6d49a6f 100644 --- a/flake.nix +++ b/flake.nix @@ -36,14 +36,15 @@ }; cargoArtifacts = craneLib.buildDepsOnly commonArgs; bin = craneLib.buildPackage (commonArgs // { inherit cargoArtifacts; }); - dockerImage = pkgs.dockerTools.streamLayeredImage { - name = "vavbot"; - tag = "latest"; - contents = [ bin pkgs.cacert ]; - config = { - Cmd = [ "${bin}/bin/vavbot" ]; + dockerImage = pkgs.dockerTools.streamLayeredImage + { + name = "vavbot"; + tag = "latest"; + contents = [ bin pkgs.cacert ]; + config = { + Cmd = [ "${bin}/bin/vavbot" ]; + }; }; - }; in with pkgs; { diff --git a/src/main.rs b/src/main.rs index 6ea162b..1dc8e63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -113,24 +113,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut client = mattermost::Client::new(auth, "https://mattermost.continuum.ii.uni.wroc.pl"); client.update_bearer_token().await?; { - let (shutdown_send, shutdown_recv) = tokio::sync::mpsc::unbounded_channel(); + let (shutdown_send, shutdown_recv) = tokio::sync::oneshot::channel(); + let (on_task_finish_send, on_task_finish_recv) = tokio::sync::oneshot::channel(); let websocket_task = tokio::spawn(async move { - client + let result = client .handle_websocket_stream(Vav::new(db), shutdown_recv) - .await + .await; + on_task_finish_send.send(result).unwrap(); }); { let mut sigterm = unix::signal(unix::SignalKind::terminate())?; let mut sigint = unix::signal(unix::SignalKind::interrupt())?; let mut sigquit = unix::signal(unix::SignalKind::quit())?; tokio::select! { - signal = sigterm.recv() => signal, - signal = sigint.recv() => signal, - signal = sigquit.recv() => signal, + _ = sigterm.recv() => {}, + _ = sigint.recv() => {}, + _ = sigquit.recv() => {}, + task_result = on_task_finish_recv => { + warn!("Task closed with result {:?}",task_result); + }, }; } - shutdown_send.send(())?; - websocket_task.await??; + _ = shutdown_send.send(()); + websocket_task.await?; } Ok(()) } diff --git a/src/mattermost/client.rs b/src/mattermost/client.rs index 83aa5f6..861e0a8 100644 --- a/src/mattermost/client.rs +++ b/src/mattermost/client.rs @@ -78,7 +78,7 @@ impl Client { pub(crate) async fn handle_websocket_stream<T: Handler>( &mut self, handler: T, - mut shutdown: tokio::sync::mpsc::UnboundedReceiver<()>, + mut shutdown: tokio::sync::oneshot::Receiver<()>, ) -> Result<(), anyhow::Error> { let mut ws_stream = self.get_working_ws_stream().await?; loop { @@ -110,17 +110,15 @@ impl Client { debug!("Websocket message: {message:?}"); }, Some(Err(err)) =>{ - warn!("Error while reading message: {err} "); - if matches!(err,tungstenite::Error::Protocol(_)){ - ws_stream = self.get_working_ws_stream().await? - } + return Err(err.into()) } None => { + debug!("Stream closed, restarting"); ws_stream = self.get_working_ws_stream().await? }, } } - _ = shutdown.recv() => { + _ = &mut shutdown => { debug!("Shutting down gracefully"); return Ok(()) } |