summary refs log tree commit diff
diff options
context:
space:
mode:
authorPaweł Dybiec <pawel@dybiec.info>2023-08-22 21:24:27 +0100
committerPaweł Dybiec <pawel@dybiec.info>2023-08-22 21:24:27 +0100
commitaeec9a347d1d6157a9d584dc34ae58e06a94c63a (patch)
tree7ee8fc8721c238a501cc83378df3445ebeb7b156
parentbump versions (diff)
Handle pings and protocol errors
-rw-r--r--src/mattermost/client.rs22
1 files changed, 16 insertions, 6 deletions
diff --git a/src/mattermost/client.rs b/src/mattermost/client.rs
index 3d59837..83aa5f6 100644
--- a/src/mattermost/client.rs
+++ b/src/mattermost/client.rs
@@ -1,5 +1,5 @@
 use anyhow::anyhow;
-use async_tungstenite::{tokio::connect_async, tungstenite::Message};
+use async_tungstenite::{tokio::connect_async, tungstenite};
 use futures_util::{SinkExt, StreamExt};
 use serde_json::json;
 use tracing::{debug, warn};
@@ -67,11 +67,12 @@ impl Client {
             .ok_or(anyhow!("Missing bearer token"))?;
         let (mut ws_stream, _) = connect_async(url).await?;
         ws_stream
-            .send(Message::Text(
+            .send(tungstenite::Message::Text(
                 json!({"seq":1,"action":"authentication_challenge","data":{"token":token}})
                     .to_string(),
             ))
             .await?;
+        debug!("Started websocket stream");
         Ok(ws_stream)
     }
     pub(crate) async fn handle_websocket_stream<T: Handler>(
@@ -84,7 +85,7 @@ impl Client {
             tokio::select! {
                 message = ws_stream.next() => {
                     match message {
-                        Some(Ok(Message::Text(text))) => {
+                        Some(Ok(tungstenite::Message::Text(text))) => {
                             let json: Result<WebsocketMessage, _> = serde_json::from_str(&text);
                             match json {
                                 Ok(websocketmessage) => {
@@ -99,11 +100,21 @@ impl Client {
                                 Err(err) => warn!("Error while deserializing: {err} {text}"),
                             }
                         },
-                        Some(Ok(Message::Close(_))) => {},
+                        Some(Ok(tungstenite::Message::Close(_))) => {},
+                        Some(Ok(tungstenite::Message::Ping(vec))) => {
+                            debug!("got Ping");
+                            ws_stream.send(tungstenite::Message::Pong(vec)).await?;
+                            debug!("sent Pong");
+                        },
                         Some(Ok(_)) => {
                             debug!("Websocket message: {message:?}");
                         },
-                        Some(Err(err)) => warn!("Error while reading message: {err} "),
+                        Some(Err(err)) =>{
+                            warn!("Error while reading message: {err} ");
+                            if matches!(err,tungstenite::Error::Protocol(_)){
+                                ws_stream = self.get_working_ws_stream().await?
+                            }
+                         }
                         None => {
                             ws_stream = self.get_working_ws_stream().await?
                         },
@@ -116,7 +127,6 @@ impl Client {
             }
         }
     }
-    #[allow(unreachable_code, unused_variables)] //NOCOMMIT: remove
     pub(crate) async fn reply_to(
         &self,
         post: super::model::Post,