summaryrefslogtreecommitdiffstats
path: root/store/src/backend/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'store/src/backend/mod.rs')
-rw-r--r--store/src/backend/mod.rs213
1 files changed, 87 insertions, 126 deletions
diff --git a/store/src/backend/mod.rs b/store/src/backend/mod.rs
index d9a88373..2a18f9a0 100644
--- a/store/src/backend/mod.rs
+++ b/store/src/backend/mod.rs
@@ -10,8 +10,6 @@ use capnp::capability::Promise;
use capnp;
use capnp_rpc::rpc_twoparty_capnp::Side;
use capnp_rpc::{self, RpcSystem, twoparty};
-use futures::Future;
-use futures::future::{self, loop_fn, Loop};
use rand::distributions::{Distribution, Uniform};
use rand::thread_rng;
use rusqlite::{
@@ -20,9 +18,7 @@ use rusqlite::{
NO_PARAMS,
types::ToSql,
};
-use tokio_core::reactor::{Handle, Timeout};
-use tokio_core;
-use tokio_io::io::ReadHalf;
+use tokio_util::compat::Compat;
use crate::openpgp::{self, Cert, KeyID, Fingerprint};
use crate::openpgp::parse::Parse;
@@ -66,29 +62,24 @@ fn random_duration(d: Duration) -> Duration {
/* Entry point. */
/// Makes backends.
-pub fn factory(descriptor: ipc::Descriptor, handle: Handle)
- -> Result<Box<dyn ipc::Handler>> {
- Backend::new(descriptor, handle)
- .map(|b| -> Box<dyn ipc::Handler> { Box::new(b) })
+pub fn factory(
+ descriptor: ipc::Descriptor,
+ local: &tokio::task::LocalSet
+) -> Result<Box<dyn ipc::Handler>> {
+ let store = capnp_rpc::new_client(NodeServer::new(descriptor, local)?);
+
+ Ok(Box::new(Backend { store }))
}
struct Backend {
store: node::Client,
}
-impl Backend {
- fn new(descriptor: ipc::Descriptor, handle: Handle) -> Result<Self> {
- Ok(Backend {
- store: node::ToClient::new(NodeServer::new(descriptor, handle)?)
- .into_client::<capnp_rpc::Server>(),
- })
- }
-}
-
impl ipc::Handler for Backend {
- fn handle(&self,
- network: twoparty::VatNetwork<ReadHalf<tokio_core::net::TcpStream>>)
- -> RpcSystem<Side> {
+ fn handle(
+ &self,
+ network: twoparty::VatNetwork<Compat<tokio::net::tcp::OwnedReadHalf>>
+ ) -> RpcSystem<Side> {
RpcSystem::new(Box::new(network), Some(self.store.clone().client))
}
}
@@ -101,7 +92,7 @@ struct NodeServer {
}
impl NodeServer {
- fn new(descriptor: ipc::Descriptor, handle: Handle) -> Result<Self> {
+ fn new(descriptor: ipc::Descriptor, local: &tokio::task::LocalSet) -> Result<Self> {
let mut db_path = descriptor.context().home().to_path_buf();
db_path.push("public-key-store.sqlite");
@@ -114,7 +105,8 @@ impl NodeServer {
};
server.init()?;
- KeyServer::start_housekeeping(server.c.clone(), handle)?;
+ local.spawn_local(KeyServer::start_housekeeping(server.c.clone()));
+
Ok(server)
}
@@ -152,8 +144,8 @@ impl node::Server for NodeServer {
pry!(params.get_realm()),
pry!(params.get_network_policy()).into(),
pry!(params.get_name())));
- pry!(pry!(results.get().get_result()).set_ok(
- node::mapping::ToClient::new(mapping).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::mapping::Client>(
+ capnp_rpc::new_client(mapping)));
Promise::ok(())
}
@@ -164,8 +156,8 @@ impl node::Server for NodeServer {
bind_results!(results);
let prefix = pry!(pry!(params.get()).get_realm_prefix());
let iter = MappingIterServer::new(self.c.clone(), prefix);
- pry!(pry!(results.get().get_result()).set_ok(
- node::mapping_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::mapping_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
@@ -175,8 +167,8 @@ impl node::Server for NodeServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = KeyIterServer::new(self.c.clone());
- pry!(pry!(results.get().get_result()).set_ok(
- node::key_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::key_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
@@ -186,8 +178,8 @@ impl node::Server for NodeServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = log::IterServer::new(self.c.clone(), log::Selector::All);
- pry!(pry!(results.get().get_result()).set_ok(
- node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
@@ -202,8 +194,7 @@ impl node::Server for NodeServer {
let key = KeyServer::new(self.c.clone(), key_id);
sry!(key.merge(new));
pry!(pry!(results.get().get_result())
- .set_ok(node::key::ToClient::new(key)
- .into_client::<capnp_rpc::Server>()));
+ .set_ok::<node::key::Client>(capnp_rpc::new_client(key)));
Promise::ok(())
}
@@ -216,10 +207,9 @@ impl node::Server for NodeServer {
let keyid = KeyID::new(keyid);
let key_id = sry!(KeyServer::lookup_by_id(&self.c, &keyid));
- pry!(pry!(results.get().get_result()).set_ok(
- node::key::ToClient::new(
- KeyServer::new(self.c.clone(), key_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>(
+ capnp_rpc::new_client(
+ KeyServer::new(self.c.clone(), key_id))));
Promise::ok(())
}
@@ -232,10 +222,9 @@ impl node::Server for NodeServer {
let fingerprint: openpgp::Fingerprint = sry!(fingerprint.parse());
let key_id = sry!(KeyServer::lookup(&self.c, &fingerprint));
- pry!(pry!(results.get().get_result()).set_ok(
- node::key::ToClient::new(
- KeyServer::new(self.c.clone(), key_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>(
+ capnp_rpc::new_client(
+ KeyServer::new(self.c.clone(), key_id))));
Promise::ok(())
}
@@ -252,10 +241,9 @@ impl node::Server for NodeServer {
WHERE key_by_keyid.keyid = ?1",
&[&(keyid as i64)], |row| row.get(0)));
- pry!(pry!(results.get().get_result()).set_ok(
- node::key::ToClient::new(
- KeyServer::new(self.c.clone(), key_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>(
+ capnp_rpc::new_client(
+ KeyServer::new(self.c.clone(), key_id))));
Promise::ok(())
}
}
@@ -351,10 +339,9 @@ impl node::mapping::Server for MappingServer {
}
- pry!(pry!(results.get().get_result()).set_ok(
- node::binding::ToClient::new(
- BindingServer::new(self.c.clone(), binding_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::binding::Client>(
+ capnp_rpc::new_client(
+ BindingServer::new(self.c.clone(), binding_id))));
Promise::ok(())
}
@@ -370,10 +357,9 @@ impl node::mapping::Server for MappingServer {
"SELECT id FROM bindings WHERE mapping = ?1 AND label = ?2",
&[&self.id as &dyn ToSql, &label], |row| row.get(0)));
- pry!(pry!(results.get().get_result()).set_ok(
- node::binding::ToClient::new(
- BindingServer::new(self.c.clone(), binding_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::binding::Client>(
+ capnp_rpc::new_client(
+ BindingServer::new(self.c.clone(), binding_id))));
Promise::ok(())
}
@@ -391,10 +377,9 @@ impl node::mapping::Server for MappingServer {
WHERE key_by_keyid.keyid = ?1",
&[&(keyid as i64)], |row| row.get(0)));
- pry!(pry!(results.get().get_result()).set_ok(
- node::binding::ToClient::new(
- BindingServer::new(self.c.clone(), binding_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::binding::Client>(
+ capnp_rpc::new_client(
+ BindingServer::new(self.c.clone(), binding_id))));
Promise::ok(())
}
@@ -414,8 +399,8 @@ impl node::mapping::Server for MappingServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = BundleIterServer::new(self.c.clone(), self.id);
- pry!(pry!(results.get().get_result()).set_ok(
- node::binding_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::binding_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
@@ -425,8 +410,8 @@ impl node::mapping::Server for MappingServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = log::IterServer::new(self.c.clone(), log::Selector::Mapping(self.id));
- pry!(pry!(results.get().get_result()).set_ok(
- node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
}
@@ -541,9 +526,9 @@ impl node::binding::Server for BindingServer {
bind_results!(results);
let key = sry!(self.key_id());
- pry!(pry!(results.get().get_result()).set_ok(
- node::key::ToClient::new(
- KeyServer::new(self.c.clone(), key)).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>(
+ capnp_rpc::new_client(
+ KeyServer::new(self.c.clone(), key))));
Promise::ok(())
}
@@ -678,8 +663,8 @@ impl node::binding::Server for BindingServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = log::IterServer::new(self.c.clone(), log::Selector::Binding(self.id));
- pry!(pry!(results.get().get_result()).set_ok(
- node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
@@ -917,71 +902,47 @@ impl KeyServer {
}
/// Updates the key that was least recently updated.
- fn update(c: &Rc<Connection>,
+ async fn update(c: &Rc<Connection>,
network_policy: core::NetworkPolicy)
- -> Box<dyn Future<Item=Duration, Error=anyhow::Error> + 'static> {
- let (key, id, mut keyserver)
- = match Self::update_helper(c, network_policy) {
- Ok((key, id, keyserver)) => (key, id, keyserver),
- Err(e) => return Box::new(future::err(e.into())),
- };
+ -> Result<Duration> {
+ let (key, id, mut keyserver) = Self::update_helper(c, network_policy)?;
- let c = c.clone();
let now = Timestamp::now();
let at = Self::next_update_at(&c, network_policy)
.unwrap_or(now + min_sleep_time());
if at <= now {
- Box::new(
- keyserver.get(&id)
- .then(move |cert| {
- let next = Self::need_update(&c, network_policy)
- .map(|c| refresh_interval() / c)
- .unwrap_or(min_sleep_time());
-
- if let Err(e) = cert.map(|t| key.merge(t)) {
- key.error("Update unsuccessful",
- &format!("{:?}", e), next / 2)
- .unwrap_or(());
- } else {
- key.success("Update successful", next)
- .unwrap_or(());
- }
+ let cert = keyserver.get(&id).await;
- future::ok(next)
- }))
+ let next = Self::need_update(&c, network_policy)
+ .map(|c| refresh_interval() / c)
+ .unwrap_or(min_sleep_time());
+
+ if let Err(e) = cert.map(|t| key.merge(t)) {
+ key.error("Update unsuccessful",
+ &format!("{:?}", e), next / 2)
+ .unwrap_or(());
+ } else {
+ key.success("Update successful", next)
+ .unwrap_or(());
+ }
+
+ Ok(next)
} else {
assert!(at > now);
- Box::new(future::ok(cmp::max(min_sleep_time(),
- now.duration_since(at).unwrap())))
+
+ Ok(cmp::max(min_sleep_time(), now.duration_since(at).unwrap()))
}
}
- /// Starts the periodic housekeeping.
- fn start_housekeeping(c: Rc<Connection>, handle: Handle) -> Result<()> {
- let h0 = handle.clone();
-
- let forever = loop_fn(0, move |_| {
- // For now, we only update keys with this network policy.
- let network_policy = core::NetworkPolicy::Encrypted;
-
- let h1 = h0.clone();
- Self::update(&c, network_policy)
- .then(move |d| {
- let d = d.unwrap_or(min_sleep_time());
- Timeout::new(random_duration(d), &h1)
- .unwrap() // XXX: May fail if the eventloop expired.
- .then(move |timeout| {
- if timeout.is_ok() {
- Ok(Loop::Continue(0))
- } else {
- Ok(Loop::Break(()))
- }
- })
- })
- });
- handle.spawn(forever);
- Ok(())
+ /// Perform periodic housekeeping.
+ async fn start_housekeeping(c: Rc<Connection>) {
+ loop {
+ let duration = Self::update(&c, core::NetworkPolicy::Encrypted).await;
+
+ let duration = duration.unwrap_or(min_sleep_time());
+ tokio::time::delay_for(random_duration(duration)).await;
+ }
}
}
@@ -1052,8 +1013,8 @@ impl node::key::Server for KeyServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = log::IterServer::new(self.c.clone(), log::Selector::Key(self.id));
- pry!(pry!(results.get().get_result()).set_ok(
- node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
}
@@ -1159,8 +1120,8 @@ impl node::mapping_iter::Server for MappingIterServer {
entry.set_realm(&realm);
entry.set_name(&name);
entry.set_network_policy(network_policy.into());
- entry.set_mapping(node::mapping::ToClient::new(
- MappingServer::new(self.c.clone(), id)).into_client::<capnp_rpc::Server>());
+ entry.set_mapping(capnp_rpc::new_client(
+ MappingServer::new(self.c.clone(), id)));
self.n = id;
Promise::ok(())
}
@@ -1196,8 +1157,8 @@ impl node::binding_iter::Server for BundleIterServer {
let mut entry = pry!(results.get().get_result()).init_ok();
entry.set_label(&label);
entry.set_fingerprint(&fingerprint);
- entry.set_binding(node::binding::ToClient::new(
- BindingServer::new(self.c.clone(), id)).into_client::<capnp_rpc::Server>());
+ entry.set_binding(capnp_rpc::new_client(
+ BindingServer::new(self.c.clone(), id)));
self.n = id;
Promise::ok(())
}
@@ -1230,8 +1191,8 @@ impl node::key_iter::Server for KeyIterServer {
let mut entry = pry!(results.get().get_result()).init_ok();
entry.set_fingerprint(&fingerprint);
- entry.set_key(node::key::ToClient::new(
- KeyServer::new(self.c.clone(), id)).into_client::<capnp_rpc::Server>());
+ entry.set_key(capnp_rpc::new_client(
+ KeyServer::new(self.c.clone(), id)));
self.n = id;
Promise::ok(())
}