-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnode.rs
More file actions
59 lines (53 loc) · 1.8 KB
/
node.rs
File metadata and controls
59 lines (53 loc) · 1.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//! This module contains an implementation of a single node.
//! The node keeps a state, which can be updated by TCP requests.
use anyhow::Result;
use lib::command::CommandResult;
use lib::{command::ClientCommand, store::Store};
use log::error;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
/// A single node, which owns the key-value store that client commands act on.
#[derive(Clone)]
pub struct Node {
    /// Store the node reads from and writes to when handling commands.
    pub store: Store,
}
impl Node {
    /// Creates a node whose store is built with `Store::new(".db_single_node")`.
    ///
    /// # Panics
    ///
    /// Panics if `Store::new` returns an error, because this constructor has no
    /// way to report the failure to its caller.
    pub fn new() -> Self {
        Self {
            store: Store::new(".db_single_node")
                .expect("failed to create the node store at .db_single_node"),
        }
    }

    /// Runs the node, processing the client commands arriving on `client_receiver`.
    ///
    /// Every received item pairs a command with a oneshot sender. The command is
    /// handed to [`Node::handle_msg`] and its outcome (with any error rendered as
    /// a string) is sent back through that sender. The loop ends once the
    /// receiver yields `None`.
    pub async fn run(
        &mut self,
        mut client_receiver: Receiver<(ClientCommand, oneshot::Sender<CommandResult>)>,
    ) {
        while let Some((message, reply_sender)) = client_receiver.recv().await {
            // `handle_msg` consumes the command, so a copy is kept for the log line below.
            let result = self
                .handle_msg(message.clone())
                .await
                .map_err(|e| e.to_string());
            // A failed send is only logged so the node keeps serving later commands.
            if let Err(error) = reply_sender.send(result) {
                error!("failed to send message {:?} response {:?}", message, error);
            }
        }
    }

    /// Applies a single client command to the store.
    ///
    /// * `Set` writes `value` under `key` and returns `Ok(Some(value))`.
    /// * `Get` reads `key` and returns the stored bytes decoded as UTF-8, or
    ///   `Ok(None)` when the store yields no value for that key.
    ///
    /// # Errors
    ///
    /// Returns an error when the store write or read fails, or when the bytes
    /// read back are not valid UTF-8.
    pub async fn handle_msg(&mut self, message: ClientCommand) -> Result<Option<String>> {
        match message {
            ClientCommand::Set { key, value } => {
                // `value` is cloned because it is also returned to the caller.
                self.store.write(key.into(), value.clone().into()).await?;
                Ok(Some(value))
            }
            ClientCommand::Get { key } => {
                // Propagate read failures instead of reporting them as a missing key.
                match self.store.read(key.into()).await? {
                    Some(bytes) => Ok(Some(String::from_utf8(bytes)?)),
                    None => Ok(None),
                }
            }
        }
    }
}