diff options
author | Paweł Dybiec <pawel@dybiec.info> | 2022-12-27 17:47:24 +0000 |
---|---|---|
committer | Paweł Dybiec <pawel@dybiec.info> | 2022-12-27 17:50:29 +0000 |
commit | c23b1ecb97ee16d4b2ecb2d9a97b39778d30f2ef (patch) | |
tree | aee90b17c0e4274ba4f9afbf6d67eac0d60fba76 | |
parent | Handle shutdown gracefully (diff) |
format
-rw-r--r-- | src/main.rs | 124 | ||||
-rw-r--r-- | src/mattermost/client.rs | 7 |
2 files changed, 72 insertions, 59 deletions
diff --git a/src/main.rs b/src/main.rs index da396c2..bc3b8b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,25 +6,28 @@ use tokio::{self}; use tracing::{debug, warn}; struct Vav { - db_connection: Option<sqlite::ConnectionWithFullMutex> + db_connection: Option<sqlite::ConnectionWithFullMutex>, } -impl Vav{ - fn new<T:AsRef<Path>>(path:Option<T>) -> Self { - let ret = Self{ - db_connection:path.and_then(|path|sqlite::Connection::open_with_full_mutex(path).ok()), +impl Vav { + fn new<T: AsRef<Path>>(path: Option<T>) -> Self { + let ret = Self { + db_connection: path + .and_then(|path| sqlite::Connection::open_with_full_mutex(path).ok()), }; - if let Some(connection) = &ret.db_connection{ - let create_table_res=connection.execute("CREATE TABLE keyval (key text NOT NULL PRIMARY KEY, value text NOT NULL)"); - if let Err(err) = create_table_res{ + if let Some(connection) = &ret.db_connection { + let create_table_res = connection.execute( + "CREATE TABLE keyval (key text NOT NULL PRIMARY KEY, value text NOT NULL)", + ); + if let Err(err) = create_table_res { warn!("Error while creating db {err}"); - } + } } ret } } -const INSERT_STATEMENT:&str = "INSERT INTO keyval (key,value) VALUES (?,?)"; -const SELECT_ONE:&str = "SELECT key,value FROM keyval where key=?"; -const SELECT_ALL:&str = "SELECT key,value FROM keyval"; +const INSERT_STATEMENT: &str = "INSERT INTO keyval (key,value) VALUES (?,?)"; +const SELECT_ONE: &str = "SELECT key,value FROM keyval where key=?"; +const SELECT_ALL: &str = "SELECT key,value FROM keyval"; #[async_trait::async_trait] impl mattermost::Handler for Vav { async fn handle( @@ -42,52 +45,56 @@ impl mattermost::Handler for Vav { if !message.starts_with('!') { return Ok(()); } - let message=message.strip_prefix('!').unwrap(); - let (message,rest) = message.split_once(' ').unwrap_or((message,"")); - match message{ - "store" =>{ + let message = message.strip_prefix('!').unwrap(); + let (message, rest) = message.split_once(' ').unwrap_or((message, "")); + match message { + "store" => { let message = rest; - let (name,value) =message.split_once(' ').ok_or(anyhow::anyhow!("missing value in command store"))?; - if let Some(connection) = &self.db_connection{ + let (name, value) = message + .split_once(' ') + .ok_or(anyhow::anyhow!("missing value in command store"))?; + if let Some(connection) = &self.db_connection { let mut statement = connection.prepare(INSERT_STATEMENT)?; - statement.bind((1,name))?; - statement.bind((2,value))?; - if let Err(err) = statement.next(){ + statement.bind((1, name))?; + statement.bind((2, value))?; + if let Err(err) = statement.next() { warn!("Error while writing to db {err}"); - } + } } - } - "lookup" =>{ + "lookup" => { let name = rest; - let response =if let Some(connection) = &self.db_connection{ + let response = if let Some(connection) = &self.db_connection { let mut statement = connection.prepare(SELECT_ONE)?; - statement.bind((1,name))?; - match statement.next(){ - Ok(sqlite::State::Done) => { - "no entry under that name".to_owned() - }, - Ok(sqlite::State::Row) => { - statement.read::<String,_>(1)? - }, + statement.bind((1, name))?; + match statement.next() { + Ok(sqlite::State::Done) => "no entry under that name".to_owned(), + Ok(sqlite::State::Row) => statement.read::<String, _>(1)?, Err(err) => { - warn!("Error while writing to db {err}"); - return Err(err.into())}, + warn!("Error while writing to db {err}"); + return Err(err.into()); + } } - } else {"uggh, no db".to_owned()}; - client.reply_to(posted.post,response).await?; + } else { + "uggh, no db".to_owned() + }; + client.reply_to(posted.post, response).await?; } - "list" =>{ - let response =if let Some(connection) = &self.db_connection{ + "list" => { + let response = if let Some(connection) = &self.db_connection { let mut statement = connection.prepare(SELECT_ALL)?; - let mut res =vec!["Stored keys:".to_owned()]; - while let Ok(result) = statement.next(){ - if result == sqlite::State::Done{ break;} - res.push(statement.read::<String,_>(0)?) + let mut res = vec!["Stored keys:".to_owned()]; + while let Ok(result) = statement.next() { + if result == sqlite::State::Done { + break; + } + res.push(statement.read::<String, _>(0)?) } res.join("\n") - } else {"uggh, no db".to_owned()}; - client.reply_to(posted.post,response).await?; + } else { + "uggh, no db".to_owned() + }; + client.reply_to(posted.post, response).await?; } _ => return Err(anyhow!("Unrecognized command {message}")), } @@ -105,18 +112,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut client = mattermost::Client::new(auth, "https://mattermost.continuum.ii.uni.wroc.pl"); client.update_bearer_token().await?; { - let (shutdown_send, shutdown_recv) = tokio::sync::mpsc::unbounded_channel(); - let websocket_task =tokio::spawn( async move { - client.handle_websocket_stream(Vav::new(db),shutdown_recv).await}); - match tokio::signal::ctrl_c().await { - Ok(()) => {}, - Err(err) => { - eprintln!("Unable to listen for shutdown signal: {}", err); - // we also shut down in case of error - }, - } - shutdown_send.send(())?; - websocket_task.await??; + let (shutdown_send, shutdown_recv) = tokio::sync::mpsc::unbounded_channel(); + let websocket_task = tokio::spawn(async move { + client + .handle_websocket_stream(Vav::new(db), shutdown_recv) + .await + }); + match tokio::signal::ctrl_c().await { + Ok(()) => {} + Err(err) => { + eprintln!("Unable to listen for shutdown signal: {}", err); + // we also shut down in case of error + } + } + shutdown_send.send(())?; + websocket_task.await??; } Ok(()) } diff --git a/src/mattermost/client.rs b/src/mattermost/client.rs index 6025970..3d59837 100644 --- a/src/mattermost/client.rs +++ b/src/mattermost/client.rs @@ -56,7 +56,10 @@ impl Client { } async fn get_working_ws_stream( &self, - ) -> Result<async_tungstenite::WebSocketStream<async_tungstenite::tokio::ConnectStream>, anyhow::Error> { + ) -> Result< + async_tungstenite::WebSocketStream<async_tungstenite::tokio::ConnectStream>, + anyhow::Error, + > { let url = format!("{}/api/v4/websocket", self.url.replacen("http", "ws", 1)); let token = self .bearer_token @@ -76,7 +79,7 @@ impl Client { handler: T, mut shutdown: tokio::sync::mpsc::UnboundedReceiver<()>, ) -> Result<(), anyhow::Error> { - let mut ws_stream=self.get_working_ws_stream().await?; + let mut ws_stream = self.get_working_ws_stream().await?; loop { tokio::select! { message = ws_stream.next() => { |