diff options
Diffstat (limited to 'store/src/backend/mod.rs')
-rw-r--r-- | store/src/backend/mod.rs | 213 |
1 files changed, 87 insertions, 126 deletions
diff --git a/store/src/backend/mod.rs b/store/src/backend/mod.rs index d9a88373..2a18f9a0 100644 --- a/store/src/backend/mod.rs +++ b/store/src/backend/mod.rs @@ -10,8 +10,6 @@ use capnp::capability::Promise; use capnp; use capnp_rpc::rpc_twoparty_capnp::Side; use capnp_rpc::{self, RpcSystem, twoparty}; -use futures::Future; -use futures::future::{self, loop_fn, Loop}; use rand::distributions::{Distribution, Uniform}; use rand::thread_rng; use rusqlite::{ @@ -20,9 +18,7 @@ use rusqlite::{ NO_PARAMS, types::ToSql, }; -use tokio_core::reactor::{Handle, Timeout}; -use tokio_core; -use tokio_io::io::ReadHalf; +use tokio_util::compat::Compat; use crate::openpgp::{self, Cert, KeyID, Fingerprint}; use crate::openpgp::parse::Parse; @@ -66,29 +62,24 @@ fn random_duration(d: Duration) -> Duration { /* Entry point. */ /// Makes backends. -pub fn factory(descriptor: ipc::Descriptor, handle: Handle) - -> Result<Box<dyn ipc::Handler>> { - Backend::new(descriptor, handle) - .map(|b| -> Box<dyn ipc::Handler> { Box::new(b) }) +pub fn factory( + descriptor: ipc::Descriptor, + local: &tokio::task::LocalSet +) -> Result<Box<dyn ipc::Handler>> { + let store = capnp_rpc::new_client(NodeServer::new(descriptor, local)?); + + Ok(Box::new(Backend { store })) } struct Backend { store: node::Client, } -impl Backend { - fn new(descriptor: ipc::Descriptor, handle: Handle) -> Result<Self> { - Ok(Backend { - store: node::ToClient::new(NodeServer::new(descriptor, handle)?) - .into_client::<capnp_rpc::Server>(), - }) - } -} - impl ipc::Handler for Backend { - fn handle(&self, - network: twoparty::VatNetwork<ReadHalf<tokio_core::net::TcpStream>>) - -> RpcSystem<Side> { + fn handle( + &self, + network: twoparty::VatNetwork<Compat<tokio::net::tcp::OwnedReadHalf>> + ) -> RpcSystem<Side> { RpcSystem::new(Box::new(network), Some(self.store.clone().client)) } } @@ -101,7 +92,7 @@ struct NodeServer { } impl NodeServer { - fn new(descriptor: ipc::Descriptor, handle: Handle) -> Result<Self> { + fn new(descriptor: ipc::Descriptor, local: &tokio::task::LocalSet) -> Result<Self> { let mut db_path = descriptor.context().home().to_path_buf(); db_path.push("public-key-store.sqlite"); @@ -114,7 +105,8 @@ impl NodeServer { }; server.init()?; - KeyServer::start_housekeeping(server.c.clone(), handle)?; + local.spawn_local(KeyServer::start_housekeeping(server.c.clone())); + Ok(server) } @@ -152,8 +144,8 @@ impl node::Server for NodeServer { pry!(params.get_realm()), pry!(params.get_network_policy()).into(), pry!(params.get_name()))); - pry!(pry!(results.get().get_result()).set_ok( - node::mapping::ToClient::new(mapping).into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::mapping::Client>( + capnp_rpc::new_client(mapping))); Promise::ok(()) } @@ -164,8 +156,8 @@ impl node::Server for NodeServer { bind_results!(results); let prefix = pry!(pry!(params.get()).get_realm_prefix()); let iter = MappingIterServer::new(self.c.clone(), prefix); - pry!(pry!(results.get().get_result()).set_ok( - node::mapping_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::mapping_iter::Client>( + capnp_rpc::new_client(iter))); Promise::ok(()) } @@ -175,8 +167,8 @@ impl node::Server for NodeServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = KeyIterServer::new(self.c.clone()); - pry!(pry!(results.get().get_result()).set_ok( - node::key_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::key_iter::Client>( + capnp_rpc::new_client(iter))); Promise::ok(()) } @@ -186,8 +178,8 @@ impl node::Server for NodeServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = log::IterServer::new(self.c.clone(), log::Selector::All); - pry!(pry!(results.get().get_result()).set_ok( - node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>( + capnp_rpc::new_client(iter))); Promise::ok(()) } @@ -202,8 +194,7 @@ impl node::Server for NodeServer { let key = KeyServer::new(self.c.clone(), key_id); sry!(key.merge(new)); pry!(pry!(results.get().get_result()) - .set_ok(node::key::ToClient::new(key) - .into_client::<capnp_rpc::Server>())); + .set_ok::<node::key::Client>(capnp_rpc::new_client(key))); Promise::ok(()) } @@ -216,10 +207,9 @@ impl node::Server for NodeServer { let keyid = KeyID::new(keyid); let key_id = sry!(KeyServer::lookup_by_id(&self.c, &keyid)); - pry!(pry!(results.get().get_result()).set_ok( - node::key::ToClient::new( - KeyServer::new(self.c.clone(), key_id)) - .into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>( + capnp_rpc::new_client( + KeyServer::new(self.c.clone(), key_id)))); Promise::ok(()) } @@ -232,10 +222,9 @@ impl node::Server for NodeServer { let fingerprint: openpgp::Fingerprint = sry!(fingerprint.parse()); let key_id = sry!(KeyServer::lookup(&self.c, &fingerprint)); - pry!(pry!(results.get().get_result()).set_ok( - node::key::ToClient::new( - KeyServer::new(self.c.clone(), key_id)) - .into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>( + capnp_rpc::new_client( + KeyServer::new(self.c.clone(), key_id)))); Promise::ok(()) } @@ -252,10 +241,9 @@ impl node::Server for NodeServer { WHERE key_by_keyid.keyid = ?1", &[&(keyid as i64)], |row| row.get(0))); - pry!(pry!(results.get().get_result()).set_ok( - node::key::ToClient::new( - KeyServer::new(self.c.clone(), key_id)) - .into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>( + capnp_rpc::new_client( + KeyServer::new(self.c.clone(), key_id)))); Promise::ok(()) } } @@ -351,10 +339,9 @@ impl node::mapping::Server for MappingServer { } - pry!(pry!(results.get().get_result()).set_ok( - node::binding::ToClient::new( - BindingServer::new(self.c.clone(), binding_id)) - .into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::binding::Client>( + capnp_rpc::new_client( + BindingServer::new(self.c.clone(), binding_id)))); Promise::ok(()) } @@ -370,10 +357,9 @@ impl node::mapping::Server for MappingServer { "SELECT id FROM bindings WHERE mapping = ?1 AND label = ?2", &[&self.id as &dyn ToSql, &label], |row| row.get(0))); - pry!(pry!(results.get().get_result()).set_ok( - node::binding::ToClient::new( - BindingServer::new(self.c.clone(), binding_id)) - .into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::binding::Client>( + capnp_rpc::new_client( + BindingServer::new(self.c.clone(), binding_id)))); Promise::ok(()) } @@ -391,10 +377,9 @@ impl node::mapping::Server for MappingServer { WHERE key_by_keyid.keyid = ?1", &[&(keyid as i64)], |row| row.get(0))); - pry!(pry!(results.get().get_result()).set_ok( - node::binding::ToClient::new( - BindingServer::new(self.c.clone(), binding_id)) - .into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::binding::Client>( + capnp_rpc::new_client( + BindingServer::new(self.c.clone(), binding_id)))); Promise::ok(()) } @@ -414,8 +399,8 @@ impl node::mapping::Server for MappingServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = BundleIterServer::new(self.c.clone(), self.id); - pry!(pry!(results.get().get_result()).set_ok( - node::binding_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::binding_iter::Client>( + capnp_rpc::new_client(iter))); Promise::ok(()) } @@ -425,8 +410,8 @@ impl node::mapping::Server for MappingServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = log::IterServer::new(self.c.clone(), log::Selector::Mapping(self.id)); - pry!(pry!(results.get().get_result()).set_ok( - node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>( + capnp_rpc::new_client(iter))); Promise::ok(()) } } @@ -541,9 +526,9 @@ impl node::binding::Server for BindingServer { bind_results!(results); let key = sry!(self.key_id()); - pry!(pry!(results.get().get_result()).set_ok( - node::key::ToClient::new( - KeyServer::new(self.c.clone(), key)).into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>( + capnp_rpc::new_client( + KeyServer::new(self.c.clone(), key)))); Promise::ok(()) } @@ -678,8 +663,8 @@ impl node::binding::Server for BindingServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = log::IterServer::new(self.c.clone(), log::Selector::Binding(self.id)); - pry!(pry!(results.get().get_result()).set_ok( - node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>( + capnp_rpc::new_client(iter))); Promise::ok(()) } @@ -917,71 +902,47 @@ impl KeyServer { } /// Updates the key that was least recently updated. - fn update(c: &Rc<Connection>, + async fn update(c: &Rc<Connection>, network_policy: core::NetworkPolicy) - -> Box<dyn Future<Item=Duration, Error=anyhow::Error> + 'static> { - let (key, id, mut keyserver) - = match Self::update_helper(c, network_policy) { - Ok((key, id, keyserver)) => (key, id, keyserver), - Err(e) => return Box::new(future::err(e.into())), - }; + -> Result<Duration> { + let (key, id, mut keyserver) = Self::update_helper(c, network_policy)?; - let c = c.clone(); let now = Timestamp::now(); let at = Self::next_update_at(&c, network_policy) .unwrap_or(now + min_sleep_time()); if at <= now { - Box::new( - keyserver.get(&id) - .then(move |cert| { - let next = Self::need_update(&c, network_policy) - .map(|c| refresh_interval() / c) - .unwrap_or(min_sleep_time()); - - if let Err(e) = cert.map(|t| key.merge(t)) { - key.error("Update unsuccessful", - &format!("{:?}", e), next / 2) - .unwrap_or(()); - } else { - key.success("Update successful", next) - .unwrap_or(()); - } + let cert = keyserver.get(&id).await; - future::ok(next) - })) + let next = Self::need_update(&c, network_policy) + .map(|c| refresh_interval() / c) + .unwrap_or(min_sleep_time()); + + if let Err(e) = cert.map(|t| key.merge(t)) { + key.error("Update unsuccessful", + &format!("{:?}", e), next / 2) + .unwrap_or(()); + } else { + key.success("Update successful", next) + .unwrap_or(()); + } + + Ok(next) } else { assert!(at > now); - Box::new(future::ok(cmp::max(min_sleep_time(), - now.duration_since(at).unwrap()))) + + Ok(cmp::max(min_sleep_time(), now.duration_since(at).unwrap())) } } - /// Starts the periodic housekeeping. - fn start_housekeeping(c: Rc<Connection>, handle: Handle) -> Result<()> { - let h0 = handle.clone(); - - let forever = loop_fn(0, move |_| { - // For now, we only update keys with this network policy. - let network_policy = core::NetworkPolicy::Encrypted; - - let h1 = h0.clone(); - Self::update(&c, network_policy) - .then(move |d| { - let d = d.unwrap_or(min_sleep_time()); - Timeout::new(random_duration(d), &h1) - .unwrap() // XXX: May fail if the eventloop expired. - .then(move |timeout| { - if timeout.is_ok() { - Ok(Loop::Continue(0)) - } else { - Ok(Loop::Break(())) - } - }) - }) - }); - handle.spawn(forever); - Ok(()) + /// Perform periodic housekeeping. + async fn start_housekeeping(c: Rc<Connection>) { + loop { + let duration = Self::update(&c, core::NetworkPolicy::Encrypted).await; + + let duration = duration.unwrap_or(min_sleep_time()); + tokio::time::delay_for(random_duration(duration)).await; + } } } @@ -1052,8 +1013,8 @@ impl node::key::Server for KeyServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = log::IterServer::new(self.c.clone(), log::Selector::Key(self.id)); - pry!(pry!(results.get().get_result()).set_ok( - node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>())); + pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>( + capnp_rpc::new_client(iter))); Promise::ok(()) } } @@ -1159,8 +1120,8 @@ impl node::mapping_iter::Server for MappingIterServer { entry.set_realm(&realm); entry.set_name(&name); entry.set_network_policy(network_policy.into()); - entry.set_mapping(node::mapping::ToClient::new( - MappingServer::new(self.c.clone(), id)).into_client::<capnp_rpc::Server>()); + entry.set_mapping(capnp_rpc::new_client( + MappingServer::new(self.c.clone(), id))); self.n = id; Promise::ok(()) } @@ -1196,8 +1157,8 @@ impl node::binding_iter::Server for BundleIterServer { let mut entry = pry!(results.get().get_result()).init_ok(); entry.set_label(&label); entry.set_fingerprint(&fingerprint); - entry.set_binding(node::binding::ToClient::new( - BindingServer::new(self.c.clone(), id)).into_client::<capnp_rpc::Server>()); + entry.set_binding(capnp_rpc::new_client( + BindingServer::new(self.c.clone(), id))); self.n = id; Promise::ok(()) } @@ -1230,8 +1191,8 @@ impl node::key_iter::Server for KeyIterServer { let mut entry = pry!(results.get().get_result()).init_ok(); entry.set_fingerprint(&fingerprint); - entry.set_key(node::key::ToClient::new( - KeyServer::new(self.c.clone(), id)).into_client::<capnp_rpc::Server>()); + entry.set_key(capnp_rpc::new_client( + KeyServer::new(self.c.clone(), id))); self.n = id; Promise::ok(()) } |