summaryrefslogtreecommitdiffstats
path: root/store/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'store/src/lib.rs')
-rw-r--r--store/src/lib.rs73
1 files changed, 46 insertions, 27 deletions
diff --git a/store/src/lib.rs b/store/src/lib.rs
index 5a718128..d91bd59c 100644
--- a/store/src/lib.rs
+++ b/store/src/lib.rs
@@ -51,18 +51,15 @@
use capnp;
#[macro_use]
extern crate capnp_rpc;
-use futures;
-use tokio_core;
use std::cell::RefCell;
use std::fmt;
+use std::future::Future;
use std::rc::Rc;
+use std::time;
use capnp::capability::Promise;
use capnp_rpc::rpc_twoparty_capnp::Side;
-use futures::{Future};
-use std::time;
-use tokio_core::reactor::Core;
use sequoia_openpgp as openpgp;
#[allow(unused_imports)]
@@ -86,6 +83,19 @@ use crate::store_protocol_capnp::node;
pub(crate) mod backend;
+/// Single-threaded Tokio runtime that drives the Cap'n Proto `RpcSystem`.
+struct RpcRuntime {
+ runtime: tokio::runtime::Runtime,
+ rpc_task: tokio::task::LocalSet,
+}
+
+impl RpcRuntime {
+ /// Blocks until a `fut` is finished, concurrently driving the RPC system.
+ fn block_on<F: Future>(&mut self, fut: F) -> F::Output {
+ self.rpc_task.block_on(&mut self.runtime, fut)
+ }
+}
+
/// Returns the service descriptor.
#[doc(hidden)]
pub fn descriptor(c: &Context) -> ipc::Descriptor {
@@ -111,17 +121,26 @@ pub struct Store {
impl Store {
/// Establishes a connection to the backend.
- fn connect(c: &Context) -> Result<(Core, node::Client)> {
+ fn connect(c: &Context) -> Result<(RpcRuntime, node::Client)> {
let descriptor = descriptor(c);
- let core = Core::new()?;
- let handle = core.handle();
- let mut rpc_system = descriptor.connect(&handle)?;
+ let rt = tokio::runtime::Builder::new()
+ .basic_scheduler()
+ .enable_io()
+ .enable_time()
+ .build()?;
+ // Need to enter Tokio context due to Tokio TcpStream creation binding
+ // eagerly to an I/O reactor
+ let mut rpc_system = rt.enter(|| descriptor.connect())?;
let client: node::Client = rpc_system.bootstrap(Side::Server);
- handle.spawn(rpc_system.map_err(|_e| ()));
- Ok((core, client))
+ // Since RpcSystem is explicitly `!Send`, we need to spawn it on
+ // a `LocalSet` that needs to be exclusively driven by `RpcRuntime`
+ let rpc_task = tokio::task::LocalSet::new();
+ rpc_task.spawn_local(rpc_system);
+
+ Ok((RpcRuntime { runtime: rt, rpc_task }, client))
}
/// Imports a key into the common key pool.
@@ -151,7 +170,7 @@ impl Store {
let mut blob = vec![];
cert.serialize(&mut blob)?;
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let mut request = client.import_request();
request.get().set_key(&blob);
let key = make_request!(&mut core, request)?;
@@ -183,7 +202,7 @@ impl Store {
/// # }
/// ```
pub fn lookup(c: &Context, fp: &Fingerprint) -> Result<Key> {
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let mut request = client.lookup_by_fingerprint_request();
let fp = format!("{:X}", fp);
request.get().set_fingerprint(&fp);
@@ -216,7 +235,7 @@ impl Store {
/// # }
/// ```
pub fn lookup_by_keyid(c: &Context, keyid: &KeyID) -> Result<Key> {
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let mut request = client.lookup_by_keyid_request();
request.get().set_keyid(keyid.as_u64()?);
let key = make_request!(&mut core, request)?;
@@ -265,7 +284,7 @@ impl Store {
/// # }
/// ```
pub fn lookup_by_subkeyid(c: &Context, keyid: &KeyID) -> Result<Key> {
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let mut request = client.lookup_by_subkeyid_request();
request.get().set_keyid(keyid.as_u64()?);
let key = make_request!(&mut core, request)?;
@@ -274,7 +293,7 @@ impl Store {
/// Lists all keys in the common key pool.
pub fn list_keys(c: &Context) -> Result<KeyIter> {
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let request = client.iter_keys_request();
let iter = make_request!(&mut core, request)?;
Ok(KeyIter{core: Rc::new(RefCell::new(core)), iter: iter})
@@ -282,7 +301,7 @@ impl Store {
/// Lists all log entries.
pub fn server_log(c: &Context) -> Result<LogIter> {
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let request = client.log_request();
let iter = make_request!(&mut core, request)?;
Ok(LogIter{core: Rc::new(RefCell::new(core)), iter: iter})
@@ -292,7 +311,7 @@ impl Store {
/// A public key store.
pub struct Mapping {
name: String,
- core: Rc<RefCell<Core>>,
+ core: Rc<RefCell<RpcRuntime>>,
mapping: node::mapping::Client,
}
@@ -327,7 +346,7 @@ impl Mapping {
Ok(Self::new(Rc::new(RefCell::new(core)), name, mapping))
}
- fn new(core: Rc<RefCell<Core>>, name: &str, mapping: node::mapping::Client) -> Self {
+ fn new(core: Rc<RefCell<RpcRuntime>>, name: &str, mapping: node::mapping::Client) -> Self {
Mapping{core: core, name: name.into(), mapping: mapping}
}
@@ -553,7 +572,7 @@ macro_rules! make_stats_request {
/// with these pairs.
pub struct Binding {
label: Option<String>,
- core: Rc<RefCell<Core>>,
+ core: Rc<RefCell<RpcRuntime>>,
binding: node::binding::Client,
}
@@ -564,7 +583,7 @@ impl fmt::Debug for Binding {
}
impl Binding {
- fn new(core: Rc<RefCell<Core>>,
+ fn new(core: Rc<RefCell<RpcRuntime>>,
label: Option<&str>,
binding: node::binding::Client) -> Self {
Binding{label: label.map(|l| l.into()), core: core, binding: binding}
@@ -798,7 +817,7 @@ impl Binding {
/// A `Key` is a handle to a stored Cert. We make this explicit
/// because we associate metadata with Certs.
pub struct Key {
- core: Rc<RefCell<Core>>,
+ core: Rc<RefCell<RpcRuntime>>,
key: node::key::Client,
}
@@ -809,7 +828,7 @@ impl fmt::Debug for Key {
}
impl Key {
- fn new(core: Rc<RefCell<Core>>, key: node::key::Client) -> Self {
+ fn new(core: Rc<RefCell<RpcRuntime>>, key: node::key::Client) -> Self {
Key{core: core, key: key}
}
@@ -1012,7 +1031,7 @@ impl Stamps {
/// Iterates over mappings.
pub struct MappingIter {
- core: Rc<RefCell<Core>>,
+ core: Rc<RefCell<RpcRuntime>>,
iter: node::mapping_iter::Client,
}
@@ -1037,7 +1056,7 @@ impl Iterator for MappingIter {
/// Iterates over bindings in a mapping.
pub struct BundleIter {
- core: Rc<RefCell<Core>>,
+ core: Rc<RefCell<RpcRuntime>>,
iter: node::binding_iter::Client,
}
@@ -1061,7 +1080,7 @@ impl Iterator for BundleIter {
/// Iterates over keys in the common key pool.
pub struct KeyIter {
- core: Rc<RefCell<Core>>,
+ core: Rc<RefCell<RpcRuntime>>,
iter: node::key_iter::Client,
}
@@ -1083,7 +1102,7 @@ impl Iterator for KeyIter {
/// Iterates over logs.
pub struct LogIter {
- core: Rc<RefCell<Core>>,
+ core: Rc<RefCell<RpcRuntime>>,
iter: node::log_iter::Client,
}