summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main.rs15
-rw-r--r--src/mattermost/client.rs67
2 files changed, 55 insertions, 27 deletions
diff --git a/src/main.rs b/src/main.rs
index c2d2be3..da396c2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -104,6 +104,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let auth = mattermost::AuthData { login, password };
let mut client = mattermost::Client::new(auth, "https://mattermost.continuum.ii.uni.wroc.pl");
client.update_bearer_token().await?;
- client.handle_websocket_stream(Vav::new(db)).await?;
+ {
+ let (shutdown_send, shutdown_recv) = tokio::sync::mpsc::unbounded_channel();
+ let websocket_task =tokio::spawn( async move {
+ client.handle_websocket_stream(Vav::new(db),shutdown_recv).await});
+ match tokio::signal::ctrl_c().await {
+ Ok(()) => {},
+ Err(err) => {
+ eprintln!("Unable to listen for shutdown signal: {}", err);
+ // we also shut down in case of error
+ },
+ }
+ shutdown_send.send(())?;
+ websocket_task.await??;
+ }
Ok(())
}
diff --git a/src/mattermost/client.rs b/src/mattermost/client.rs
index abdf1ba..6025970 100644
--- a/src/mattermost/client.rs
+++ b/src/mattermost/client.rs
@@ -54,49 +54,64 @@ impl Client {
debug!("Successfully updated bearer token");
Ok(())
}
- pub(crate) async fn handle_websocket_stream<T: Handler>(
- &mut self,
- handler: T,
- ) -> Result<(), anyhow::Error> {
+ async fn get_working_ws_stream(
+ &self,
+ ) -> Result<async_tungstenite::WebSocketStream<async_tungstenite::tokio::ConnectStream>, anyhow::Error> {
let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1));
- let (mut ws_stream, _) = connect_async(url).await?;
let token = self
.bearer_token
.clone()
.ok_or(anyhow!("Missing bearer token"))?;
+ let (mut ws_stream, _) = connect_async(url).await?;
ws_stream
.send(Message::Text(
json!({"seq":1,"action":"authentication_challenge","data":{"token":token}})
.to_string(),
))
.await?;
- while let Some(message) = ws_stream.next().await {
- match message {
- Ok(message) => match message {
- Message::Text(text) => {
- let json: Result<WebsocketMessage, _> = serde_json::from_str(&text);
- match json {
- Ok(websocketmessage) => {
- if let WebsocketMessage::Update(update) = websocketmessage {
- if let Err(err) = handler.handle(update, &self).await {
- warn!("Handler returned error: {err}");
+ Ok(ws_stream)
+ }
+ pub(crate) async fn handle_websocket_stream<T: Handler>(
+ &mut self,
+ handler: T,
+ mut shutdown: tokio::sync::mpsc::UnboundedReceiver<()>,
+ ) -> Result<(), anyhow::Error> {
+ let mut ws_stream=self.get_working_ws_stream().await?;
+ loop {
+ tokio::select! {
+ message = ws_stream.next() => {
+ match message {
+ Some(Ok(Message::Text(text))) => {
+ let json: Result<WebsocketMessage, _> = serde_json::from_str(&text);
+ match json {
+ Ok(websocketmessage) => {
+ if let WebsocketMessage::Update(update) = websocketmessage {
+ if let Err(err) = handler.handle(update, &self).await {
+ warn!("Handler returned error: {err}");
+ }
+ } else {
+ debug!("Got only websocket reply. {websocketmessage:?}");
}
- } else {
- debug!("Got only websocket reply. {websocketmessage:?}");
}
+ Err(err) => warn!("Error while deserializing: {err} {text}"),
}
- Err(err) => warn!("Error while deserializing: {err} {text}"),
- }
- }
- Message::Close(_) => break,
- _ => {
- debug!("Message: {message:?}");
+ },
+ Some(Ok(Message::Close(_))) => {},
+ Some(Ok(_)) => {
+ debug!("Websocket message: {message:?}");
+ },
+ Some(Err(err)) => warn!("Error while reading message: {err} "),
+ None => {
+ ws_stream = self.get_working_ws_stream().await?
+ },
}
- },
- Err(err) => warn!("Error {err} while reading message"),
+ }
+ _ = shutdown.recv() => {
+ debug!("Shutting down gracefully");
+ return Ok(())
+ }
}
}
- Ok(())
}
#[allow(unreachable_code, unused_variables)] //NOCOMMIT: remove
pub(crate) async fn reply_to(