diff options
author | Paweł Dybiec <pawel@dybiec.info> | 2022-12-27 17:43:38 +0000 |
---|---|---|
committer | Paweł Dybiec <pawel@dybiec.info> | 2022-12-27 17:50:24 +0000 |
commit | 82e78ef563dc2744c1e135ce0675361960747d4f (patch) | |
tree | 1ea041b958537872628e62eeeca111779d18d45c /src/mattermost | |
parent | Treat missing mentions as empty list (diff) |
Handle shutdown gracefully
Diffstat (limited to 'src/mattermost')
-rw-r--r-- | src/mattermost/client.rs | 67 |
1 files changed, 41 insertions, 26 deletions
diff --git a/src/mattermost/client.rs b/src/mattermost/client.rs index abdf1ba..6025970 100644 --- a/src/mattermost/client.rs +++ b/src/mattermost/client.rs @@ -54,49 +54,64 @@ impl Client { debug!("Successfully updated bearer token"); Ok(()) } - pub(crate) async fn handle_websocket_stream<T: Handler>( - &mut self, - handler: T, - ) -> Result<(), anyhow::Error> { + async fn get_working_ws_stream( + &self, + ) -> Result<async_tungstenite::WebSocketStream<async_tungstenite::tokio::ConnectStream>, anyhow::Error> { let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1)); - let (mut ws_stream, _) = connect_async(url).await?; let token = self .bearer_token .clone() .ok_or(anyhow!("Missing bearer token"))?; + let (mut ws_stream, _) = connect_async(url).await?; ws_stream .send(Message::Text( json!({"seq":1,"action":"authentication_challenge","data":{"token":token}}) .to_string(), )) .await?; - while let Some(message) = ws_stream.next().await { - match message { - Ok(message) => match message { - Message::Text(text) => { - let json: Result<WebsocketMessage, _> = serde_json::from_str(&text); - match json { - Ok(websocketmessage) => { - if let WebsocketMessage::Update(update) = websocketmessage { - if let Err(err) = handler.handle(update, &self).await { - warn!("Handler returned error: {err}"); + Ok(ws_stream) + } + pub(crate) async fn handle_websocket_stream<T: Handler>( + &mut self, + handler: T, + mut shutdown: tokio::sync::mpsc::UnboundedReceiver<()>, + ) -> Result<(), anyhow::Error> { + let mut ws_stream=self.get_working_ws_stream().await?; + loop { + tokio::select! { + message = ws_stream.next() => { + match message { + Some(Ok(Message::Text(text))) => { + let json: Result<WebsocketMessage, _> = serde_json::from_str(&text); + match json { + Ok(websocketmessage) => { + if let WebsocketMessage::Update(update) = websocketmessage { + if let Err(err) = handler.handle(update, &self).await { + warn!("Handler returned error: {err}"); + } + } else { + debug!("Got only websocket reply. {websocketmessage:?}"); } - } else { - debug!("Got only websocket reply. {websocketmessage:?}"); } + Err(err) => warn!("Error while deserializing: {err} {text}"), } - Err(err) => warn!("Error while deserializing: {err} {text}"), - } - } - Message::Close(_) => break, - _ => { - debug!("Message: {message:?}"); + }, + Some(Ok(Message::Close(_))) => {}, + Some(Ok(_)) => { + debug!("Websocket message: {message:?}"); + }, + Some(Err(err)) => warn!("Error while reading message: {err} "), + None => { + ws_stream = self.get_working_ws_stream().await? + }, } - }, - Err(err) => warn!("Error {err} while reading message"), + } + _ = shutdown.recv() => { + debug!("Shutting down gracefully"); + return Ok(()) + } } } - Ok(()) } #[allow(unreachable_code, unused_variables)] //NOCOMMIT: remove pub(crate) async fn reply_to( |