1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
mod mattermost;
use std::path::Path;
use anyhow::anyhow;
use tokio::signal::unix;
use tokio::{self};
use tracing::{debug, warn};
struct Vav {
db_connection: Option<sqlite::ConnectionWithFullMutex>,
}
impl Vav {
fn new<T: AsRef<Path>>(path: Option<T>) -> Self {
let ret = Self {
db_connection: path
.and_then(|path| sqlite::Connection::open_with_full_mutex(path).ok()),
};
if let Some(connection) = &ret.db_connection {
let create_table_res = connection.execute(
"CREATE TABLE keyval (key text NOT NULL PRIMARY KEY, value text NOT NULL)",
);
if let Err(err) = create_table_res {
warn!("Error while creating db {err}");
}
}
ret
}
}
const INSERT_STATEMENT: &str = "INSERT INTO keyval (key,value) VALUES (?,?)";
const SELECT_ONE: &str = "SELECT key,value FROM keyval where key=?";
const SELECT_ALL: &str = "SELECT key,value FROM keyval";
#[async_trait::async_trait]
impl mattermost::Handler for Vav {
async fn handle(
&self,
update: mattermost::model::WebsocketUpdate,
client: &mattermost::Client,
) -> Result<(), anyhow::Error> {
debug!("as json: {update:?}");
let posted = match update.update {
mattermost::model::Update::Posted(posted) => posted,
_ => return Ok(()),
};
let message = &posted.post.message;
if !message.starts_with('!') {
return Ok(());
}
let message = message.strip_prefix('!').unwrap();
let (message, rest) = message.split_once(' ').unwrap_or((message, ""));
match message {
"store" => {
let message = rest;
let (name, value) = message
.split_once(' ')
.ok_or(anyhow::anyhow!("missing value in command store"))?;
if let Some(connection) = &self.db_connection {
let mut statement = connection.prepare(INSERT_STATEMENT)?;
statement.bind((1, name))?;
statement.bind((2, value))?;
if let Err(err) = statement.next() {
warn!("Error while writing to db {err}");
}
}
}
"lookup" => {
let name = rest;
let response = if let Some(connection) = &self.db_connection {
let mut statement = connection.prepare(SELECT_ONE)?;
statement.bind((1, name))?;
match statement.next() {
Ok(sqlite::State::Done) => "no entry under that name".to_owned(),
Ok(sqlite::State::Row) => statement.read::<String, _>(1)?,
Err(err) => {
warn!("Error while writing to db {err}");
return Err(err.into());
}
}
} else {
"uggh, no db".to_owned()
};
client.reply_to(posted.post, response).await?;
}
"list" => {
let response = if let Some(connection) = &self.db_connection {
let mut statement = connection.prepare(SELECT_ALL)?;
let mut res = vec!["Stored keys:".to_owned()];
while let Ok(result) = statement.next() {
if result == sqlite::State::Done {
break;
}
res.push(statement.read::<String, _>(0)?)
}
res.join("\n")
} else {
"uggh, no db".to_owned()
};
client.reply_to(posted.post, response).await?;
}
_ => return Err(anyhow!("Unrecognized command {message}")),
}
Ok(())
}
}
#[tokio::main(worker_threads = 2)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let login = std::env::var("USER_MAIL")?;
let password = std::env::var("USER_PASSWORD")?;
let db = std::env::var("SQLITE_PATH").ok();
let auth = mattermost::AuthData { login, password };
let mut client = mattermost::Client::new(auth, "https://mattermost.continuum.ii.uni.wroc.pl");
client.update_bearer_token().await?;
{
let (shutdown_send, shutdown_recv) = tokio::sync::oneshot::channel();
let (on_task_finish_send, on_task_finish_recv) = tokio::sync::oneshot::channel();
let websocket_task = tokio::spawn(async move {
let result = client
.handle_websocket_stream(Vav::new(db), shutdown_recv)
.await;
on_task_finish_send.send(result).unwrap();
});
{
let mut sigterm = unix::signal(unix::SignalKind::terminate())?;
let mut sigint = unix::signal(unix::SignalKind::interrupt())?;
let mut sigquit = unix::signal(unix::SignalKind::quit())?;
tokio::select! {
_ = sigterm.recv() => {},
_ = sigint.recv() => {},
_ = sigquit.recv() => {},
task_result = on_task_finish_recv => {
warn!("Task closed with result {:?}",task_result);
},
};
}
_ = shutdown_send.send(());
websocket_task.await?;
}
Ok(())
}
|