From 82e78ef563dc2744c1e135ce0675361960747d4f Mon Sep 17 00:00:00 2001 From: Paweł Dybiec Date: Tue, 27 Dec 2022 17:43:38 +0000 Subject: Handle shutdown gracefully --- src/main.rs | 15 ++++++++++- src/mattermost/client.rs | 67 +++++++++++++++++++++++++++++------------------- 2 files changed, 55 insertions(+), 27 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index c2d2be3..da396c2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -104,6 +104,19 @@ async fn main() -> Result<(), Box> { let auth = mattermost::AuthData { login, password }; let mut client = mattermost::Client::new(auth, "https://mattermost.continuum.ii.uni.wroc.pl"); client.update_bearer_token().await?; - client.handle_websocket_stream(Vav::new(db)).await?; + { + let (shutdown_send, shutdown_recv) = tokio::sync::mpsc::unbounded_channel(); + let websocket_task =tokio::spawn( async move { + client.handle_websocket_stream(Vav::new(db),shutdown_recv).await}); + match tokio::signal::ctrl_c().await { + Ok(()) => {}, + Err(err) => { + eprintln!("Unable to listen for shutdown signal: {}", err); + // we also shut down in case of error + }, + } + shutdown_send.send(())?; + websocket_task.await??; + } Ok(()) } diff --git a/src/mattermost/client.rs b/src/mattermost/client.rs index abdf1ba..6025970 100644 --- a/src/mattermost/client.rs +++ b/src/mattermost/client.rs @@ -54,49 +54,64 @@ impl Client { debug!("Successfully updated bearer token"); Ok(()) } - pub(crate) async fn handle_websocket_stream( - &mut self, - handler: T, - ) -> Result<(), anyhow::Error> { + async fn get_working_ws_stream( + &self, + ) -> Result, anyhow::Error> { let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1)); - let (mut ws_stream, _) = connect_async(url).await?; let token = self .bearer_token .clone() .ok_or(anyhow!("Missing bearer token"))?; + let (mut ws_stream, _) = connect_async(url).await?; ws_stream .send(Message::Text( json!({"seq":1,"action":"authentication_challenge","data":{"token":token}}) .to_string(), )) .await?; - while let Some(message) = ws_stream.next().await { - match message { - Ok(message) => match message { - Message::Text(text) => { - let json: Result = serde_json::from_str(&text); - match json { - Ok(websocketmessage) => { - if let WebsocketMessage::Update(update) = websocketmessage { - if let Err(err) = handler.handle(update, &self).await { - warn!("Handler returned error: {err}"); + Ok(ws_stream) + } + pub(crate) async fn handle_websocket_stream( + &mut self, + handler: T, + mut shutdown: tokio::sync::mpsc::UnboundedReceiver<()>, + ) -> Result<(), anyhow::Error> { + let mut ws_stream=self.get_working_ws_stream().await?; + loop { + tokio::select! { + message = ws_stream.next() => { + match message { + Some(Ok(Message::Text(text))) => { + let json: Result = serde_json::from_str(&text); + match json { + Ok(websocketmessage) => { + if let WebsocketMessage::Update(update) = websocketmessage { + if let Err(err) = handler.handle(update, &self).await { + warn!("Handler returned error: {err}"); + } + } else { + debug!("Got only websocket reply. {websocketmessage:?}"); } - } else { - debug!("Got only websocket reply. {websocketmessage:?}"); } + Err(err) => warn!("Error while deserializing: {err} {text}"), } - Err(err) => warn!("Error while deserializing: {err} {text}"), - } - } - Message::Close(_) => break, - _ => { - debug!("Message: {message:?}"); + }, + Some(Ok(Message::Close(_))) => {}, + Some(Ok(_)) => { + debug!("Websocket message: {message:?}"); + }, + Some(Err(err)) => warn!("Error while reading message: {err} "), + None => { + ws_stream = self.get_working_ws_stream().await? + }, } - }, - Err(err) => warn!("Error {err} while reading message"), + } + _ = shutdown.recv() => { + debug!("Shutting down gracefully"); + return Ok(()) + } } } - Ok(()) } #[allow(unreachable_code, unused_variables)] //NOCOMMIT: remove pub(crate) async fn reply_to( -- cgit 1.4.1