diff options
Diffstat (limited to 'store/src/lib.rs')
-rw-r--r-- | store/src/lib.rs | 73 |
1 files changed, 46 insertions, 27 deletions
diff --git a/store/src/lib.rs b/store/src/lib.rs index 5a718128..d91bd59c 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -51,18 +51,15 @@ use capnp; #[macro_use] extern crate capnp_rpc; -use futures; -use tokio_core; use std::cell::RefCell; use std::fmt; +use std::future::Future; use std::rc::Rc; +use std::time; use capnp::capability::Promise; use capnp_rpc::rpc_twoparty_capnp::Side; -use futures::{Future}; -use std::time; -use tokio_core::reactor::Core; use sequoia_openpgp as openpgp; #[allow(unused_imports)] @@ -86,6 +83,19 @@ use crate::store_protocol_capnp::node; pub(crate) mod backend; +/// Single-threaded Tokio runtime that drives the Cap'n Proto `RpcSystem`. +struct RpcRuntime { + runtime: tokio::runtime::Runtime, + rpc_task: tokio::task::LocalSet, +} + +impl RpcRuntime { + /// Blocks until a `fut` is finished, concurrently driving the RPC system. + fn block_on<F: Future>(&mut self, fut: F) -> F::Output { + self.rpc_task.block_on(&mut self.runtime, fut) + } +} + /// Returns the service descriptor. #[doc(hidden)] pub fn descriptor(c: &Context) -> ipc::Descriptor { @@ -111,17 +121,26 @@ pub struct Store { impl Store { /// Establishes a connection to the backend. - fn connect(c: &Context) -> Result<(Core, node::Client)> { + fn connect(c: &Context) -> Result<(RpcRuntime, node::Client)> { let descriptor = descriptor(c); - let core = Core::new()?; - let handle = core.handle(); - let mut rpc_system = descriptor.connect(&handle)?; + let rt = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_io() + .enable_time() + .build()?; + // Need to enter Tokio context due to Tokio TcpStream creation binding + // eagerly to an I/O reactor + let mut rpc_system = rt.enter(|| descriptor.connect())?; let client: node::Client = rpc_system.bootstrap(Side::Server); - handle.spawn(rpc_system.map_err(|_e| ())); - Ok((core, client)) + // Since RpcSystem is explicitly `!Send`, we need to spawn it on + // a `LocalSet` that needs to be exclusively driven by `RpcRuntime` + let rpc_task = tokio::task::LocalSet::new(); + rpc_task.spawn_local(rpc_system); + + Ok((RpcRuntime { runtime: rt, rpc_task }, client)) } /// Imports a key into the common key pool. @@ -151,7 +170,7 @@ impl Store { let mut blob = vec![]; cert.serialize(&mut blob)?; - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let mut request = client.import_request(); request.get().set_key(&blob); let key = make_request!(&mut core, request)?; @@ -183,7 +202,7 @@ impl Store { /// # } /// ``` pub fn lookup(c: &Context, fp: &Fingerprint) -> Result<Key> { - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let mut request = client.lookup_by_fingerprint_request(); let fp = format!("{:X}", fp); request.get().set_fingerprint(&fp); @@ -216,7 +235,7 @@ impl Store { /// # } /// ``` pub fn lookup_by_keyid(c: &Context, keyid: &KeyID) -> Result<Key> { - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let mut request = client.lookup_by_keyid_request(); request.get().set_keyid(keyid.as_u64()?); let key = make_request!(&mut core, request)?; @@ -265,7 +284,7 @@ impl Store { /// # } /// ``` pub fn lookup_by_subkeyid(c: &Context, keyid: &KeyID) -> Result<Key> { - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let mut request = client.lookup_by_subkeyid_request(); request.get().set_keyid(keyid.as_u64()?); let key = make_request!(&mut core, request)?; @@ -274,7 +293,7 @@ impl Store { /// Lists all keys in the common key pool. pub fn list_keys(c: &Context) -> Result<KeyIter> { - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let request = client.iter_keys_request(); let iter = make_request!(&mut core, request)?; Ok(KeyIter{core: Rc::new(RefCell::new(core)), iter: iter}) @@ -282,7 +301,7 @@ impl Store { /// Lists all log entries. pub fn server_log(c: &Context) -> Result<LogIter> { - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let request = client.log_request(); let iter = make_request!(&mut core, request)?; Ok(LogIter{core: Rc::new(RefCell::new(core)), iter: iter}) @@ -292,7 +311,7 @@ impl Store { /// A public key store. pub struct Mapping { name: String, - core: Rc<RefCell<Core>>, + core: Rc<RefCell<RpcRuntime>>, mapping: node::mapping::Client, } @@ -327,7 +346,7 @@ impl Mapping { Ok(Self::new(Rc::new(RefCell::new(core)), name, mapping)) } - fn new(core: Rc<RefCell<Core>>, name: &str, mapping: node::mapping::Client) -> Self { + fn new(core: Rc<RefCell<RpcRuntime>>, name: &str, mapping: node::mapping::Client) -> Self { Mapping{core: core, name: name.into(), mapping: mapping} } @@ -553,7 +572,7 @@ macro_rules! make_stats_request { /// with these pairs. pub struct Binding { label: Option<String>, - core: Rc<RefCell<Core>>, + core: Rc<RefCell<RpcRuntime>>, binding: node::binding::Client, } @@ -564,7 +583,7 @@ impl fmt::Debug for Binding { } impl Binding { - fn new(core: Rc<RefCell<Core>>, + fn new(core: Rc<RefCell<RpcRuntime>>, label: Option<&str>, binding: node::binding::Client) -> Self { Binding{label: label.map(|l| l.into()), core: core, binding: binding} @@ -798,7 +817,7 @@ impl Binding { /// A `Key` is a handle to a stored Cert. We make this explicit /// because we associate metadata with Certs. pub struct Key { - core: Rc<RefCell<Core>>, + core: Rc<RefCell<RpcRuntime>>, key: node::key::Client, } @@ -809,7 +828,7 @@ impl fmt::Debug for Key { } impl Key { - fn new(core: Rc<RefCell<Core>>, key: node::key::Client) -> Self { + fn new(core: Rc<RefCell<RpcRuntime>>, key: node::key::Client) -> Self { Key{core: core, key: key} } @@ -1012,7 +1031,7 @@ impl Stamps { /// Iterates over mappings. pub struct MappingIter { - core: Rc<RefCell<Core>>, + core: Rc<RefCell<RpcRuntime>>, iter: node::mapping_iter::Client, } @@ -1037,7 +1056,7 @@ impl Iterator for MappingIter { /// Iterates over bindings in a mapping. pub struct BundleIter { - core: Rc<RefCell<Core>>, + core: Rc<RefCell<RpcRuntime>>, iter: node::binding_iter::Client, } @@ -1061,7 +1080,7 @@ impl Iterator for BundleIter { /// Iterates over keys in the common key pool. pub struct KeyIter { - core: Rc<RefCell<Core>>, + core: Rc<RefCell<RpcRuntime>>, iter: node::key_iter::Client, } @@ -1083,7 +1102,7 @@ impl Iterator for KeyIter { /// Iterates over logs. pub struct LogIter { - core: Rc<RefCell<Core>>, + core: Rc<RefCell<RpcRuntime>>, iter: node::log_iter::Client, } |