summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Matuszewski <igor@sequoia-pgp.org>2020-10-20 18:40:21 +0200
committerIgor Matuszewski <igor@sequoia-pgp.org>2020-10-23 12:05:54 +0200
commit3fd21d2f10b9d2946c7c6fd5af8cc38a69402996 (patch)
treec135754ead003b8844b07eaeba2905cd082614cc
parentd30e05a43655a3884c6119e282e0ac58a9c723f7 (diff)
store: Migrate to std::futures
-rw-r--r--Cargo.lock55
-rw-r--r--store/Cargo.toml12
-rw-r--r--store/src/backend/log.rs15
-rw-r--r--store/src/backend/mod.rs213
-rw-r--r--store/src/lib.rs73
-rw-r--r--store/src/macros.rs6
6 files changed, 161 insertions, 213 deletions
diff --git a/Cargo.lock b/Cargo.lock
index b7272c19..e46cafda 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -288,69 +288,38 @@ dependencies = [
[[package]]
name = "capnp"
-version = "0.10.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b867c15d8ff93c4d81b69c89280840f877331ef2a1fccbaf947afecc68b51a9e"
-dependencies = [
- "futures 0.1.30",
-]
-
-[[package]]
-name = "capnp"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "394f9ad87764d43d41c8d3ea270fd03def2f455011f3ada86c9f01d88592105d"
[[package]]
name = "capnp-futures"
-version = "0.10.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa07b8de7e06c61c287fb5a03a644e2439fec4fe17e206d4658ac09aeec4b161"
-dependencies = [
- "capnp 0.10.3",
- "futures 0.1.30",
-]
-
-[[package]]
-name = "capnp-futures"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9f9ff1dae086de0d7ecbc147fee21aed8b3ad64468f0f991c98da06fb8c8459"
dependencies = [
- "capnp 0.13.5",
+ "capnp",
"futures 0.3.6",
]
[[package]]
name = "capnp-rpc"
-version = "0.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "568eecd404ea80e98d506b922be2de5e1013ac8f9b170242a53068affc79ddc8"
-dependencies = [
- "capnp 0.10.3",
- "capnp-futures 0.10.1",
- "capnpc",
- "futures 0.1.30",
-]
-
-[[package]]
-name = "capnp-rpc"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37998522d42bbe4a1d266f418b1a053b679a338e904e55afd5ff22333df0e09e"
dependencies = [
- "capnp 0.13.5",
- "capnp-futures 0.13.1",
+ "capnp",
+ "capnp-futures",
"futures 0.3.6",
]
[[package]]
name = "capnpc"
-version = "0.10.2"
+version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d2afedfc194b01c6804ad0a10c7139024b99ee3df6a39bb09bdf759067ababff"
+checksum = "81855cee80548f7a2ee549d3bc2e55ed5f7cabe469e85614046e5475712f75c1"
dependencies = [
- "capnp 0.10.3",
+ "capnp",
]
[[package]]
@@ -2034,7 +2003,7 @@ version = "0.20.0"
dependencies = [
"anyhow",
"buffered-reader",
- "capnp-rpc 0.13.1",
+ "capnp-rpc",
"clap",
"ctor",
"fs2",
@@ -2173,10 +2142,10 @@ name = "sequoia-store"
version = "0.20.0"
dependencies = [
"anyhow",
- "capnp 0.10.3",
- "capnp-rpc 0.10.0",
+ "capnp",
+ "capnp-rpc",
"capnpc",
- "futures 0.1.30",
+ "futures-util",
"rand",
"rusqlite",
"sequoia-core",
@@ -2184,8 +2153,8 @@ dependencies = [
"sequoia-net",
"sequoia-openpgp",
"thiserror",
- "tokio-core",
- "tokio-io",
+ "tokio 0.2.22",
+ "tokio-util",
]
[[package]]
diff --git a/store/Cargo.toml b/store/Cargo.toml
index 7cff7319..c5e1c452 100644
--- a/store/Cargo.toml
+++ b/store/Cargo.toml
@@ -37,21 +37,21 @@ sequoia-core = { path = "../core", version = "0.20" }
sequoia-ipc = { path = "../ipc", version = "0.20", default-features = false }
sequoia-net = { path = "../net", version = "0.20", default-features = false }
anyhow = "1"
-capnp = "0.10"
-capnp-rpc = "0.10"
-futures = "0.1.17"
+capnp = "0.13"
+capnp-rpc = "0.13"
+futures-util = "0.3"
rand = { version = "0.7", default-features = false }
rusqlite = "0.24"
thiserror = "1"
-tokio-core = "0.1.10"
-tokio-io = "0.1.4"
+tokio = { version = "0.2", features = ["rt-core", "tcp", "io-driver", "time"] }
+tokio-util = { version = "0.3", features = ["compat"] }
[target.'cfg(target_os="android")'.dependencies.rusqlite]
version = "0.24.0"
features = ["bundled"]
[build-dependencies]
-capnpc = "0.10"
+capnpc = "0.13"
[lib]
name = "sequoia_store"
diff --git a/store/src/backend/log.rs b/store/src/backend/log.rs
index 42defd02..65606c2d 100644
--- a/store/src/backend/log.rs
+++ b/store/src/backend/log.rs
@@ -167,21 +167,18 @@ impl node::log_iter::Server for IterServer {
entry.set_timestamp(timestamp.unix());
if let Some(mapping) = mapping {
- entry.set_mapping(node::mapping::ToClient::new(
- MappingServer::new(self.c.clone(), mapping))
- .into_client::<capnp_rpc::Server>());
+ entry.set_mapping(capnp_rpc::new_client(
+ MappingServer::new(self.c.clone(), mapping)));
}
if let Some(binding) = binding {
- entry.set_binding(node::binding::ToClient::new(
- BindingServer::new(self.c.clone(), binding))
- .into_client::<capnp_rpc::Server>());
+ entry.set_binding(capnp_rpc::new_client(
+ BindingServer::new(self.c.clone(), binding)));
}
if let Some(key) = key {
- entry.set_key(node::key::ToClient::new(
- KeyServer::new(self.c.clone(), key))
- .into_client::<capnp_rpc::Server>());
+ entry.set_key(capnp_rpc::new_client(
+ KeyServer::new(self.c.clone(), key)));
}
entry.set_slug(&slug);
diff --git a/store/src/backend/mod.rs b/store/src/backend/mod.rs
index d9a88373..2a18f9a0 100644
--- a/store/src/backend/mod.rs
+++ b/store/src/backend/mod.rs
@@ -10,8 +10,6 @@ use capnp::capability::Promise;
use capnp;
use capnp_rpc::rpc_twoparty_capnp::Side;
use capnp_rpc::{self, RpcSystem, twoparty};
-use futures::Future;
-use futures::future::{self, loop_fn, Loop};
use rand::distributions::{Distribution, Uniform};
use rand::thread_rng;
use rusqlite::{
@@ -20,9 +18,7 @@ use rusqlite::{
NO_PARAMS,
types::ToSql,
};
-use tokio_core::reactor::{Handle, Timeout};
-use tokio_core;
-use tokio_io::io::ReadHalf;
+use tokio_util::compat::Compat;
use crate::openpgp::{self, Cert, KeyID, Fingerprint};
use crate::openpgp::parse::Parse;
@@ -66,29 +62,24 @@ fn random_duration(d: Duration) -> Duration {
/* Entry point. */
/// Makes backends.
-pub fn factory(descriptor: ipc::Descriptor, handle: Handle)
- -> Result<Box<dyn ipc::Handler>> {
- Backend::new(descriptor, handle)
- .map(|b| -> Box<dyn ipc::Handler> { Box::new(b) })
+pub fn factory(
+ descriptor: ipc::Descriptor,
+ local: &tokio::task::LocalSet
+) -> Result<Box<dyn ipc::Handler>> {
+ let store = capnp_rpc::new_client(NodeServer::new(descriptor, local)?);
+
+ Ok(Box::new(Backend { store }))
}
struct Backend {
store: node::Client,
}
-impl Backend {
- fn new(descriptor: ipc::Descriptor, handle: Handle) -> Result<Self> {
- Ok(Backend {
- store: node::ToClient::new(NodeServer::new(descriptor, handle)?)
- .into_client::<capnp_rpc::Server>(),
- })
- }
-}
-
impl ipc::Handler for Backend {
- fn handle(&self,
- network: twoparty::VatNetwork<ReadHalf<tokio_core::net::TcpStream>>)
- -> RpcSystem<Side> {
+ fn handle(
+ &self,
+ network: twoparty::VatNetwork<Compat<tokio::net::tcp::OwnedReadHalf>>
+ ) -> RpcSystem<Side> {
RpcSystem::new(Box::new(network), Some(self.store.clone().client))
}
}
@@ -101,7 +92,7 @@ struct NodeServer {
}
impl NodeServer {
- fn new(descriptor: ipc::Descriptor, handle: Handle) -> Result<Self> {
+ fn new(descriptor: ipc::Descriptor, local: &tokio::task::LocalSet) -> Result<Self> {
let mut db_path = descriptor.context().home().to_path_buf();
db_path.push("public-key-store.sqlite");
@@ -114,7 +105,8 @@ impl NodeServer {
};
server.init()?;
- KeyServer::start_housekeeping(server.c.clone(), handle)?;
+ local.spawn_local(KeyServer::start_housekeeping(server.c.clone()));
+
Ok(server)
}
@@ -152,8 +144,8 @@ impl node::Server for NodeServer {
pry!(params.get_realm()),
pry!(params.get_network_policy()).into(),
pry!(params.get_name())));
- pry!(pry!(results.get().get_result()).set_ok(
- node::mapping::ToClient::new(mapping).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::mapping::Client>(
+ capnp_rpc::new_client(mapping)));
Promise::ok(())
}
@@ -164,8 +156,8 @@ impl node::Server for NodeServer {
bind_results!(results);
let prefix = pry!(pry!(params.get()).get_realm_prefix());
let iter = MappingIterServer::new(self.c.clone(), prefix);
- pry!(pry!(results.get().get_result()).set_ok(
- node::mapping_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::mapping_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
@@ -175,8 +167,8 @@ impl node::Server for NodeServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = KeyIterServer::new(self.c.clone());
- pry!(pry!(results.get().get_result()).set_ok(
- node::key_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::key_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
@@ -186,8 +178,8 @@ impl node::Server for NodeServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = log::IterServer::new(self.c.clone(), log::Selector::All);
- pry!(pry!(results.get().get_result()).set_ok(
- node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
@@ -202,8 +194,7 @@ impl node::Server for NodeServer {
let key = KeyServer::new(self.c.clone(), key_id);
sry!(key.merge(new));
pry!(pry!(results.get().get_result())
- .set_ok(node::key::ToClient::new(key)
- .into_client::<capnp_rpc::Server>()));
+ .set_ok::<node::key::Client>(capnp_rpc::new_client(key)));
Promise::ok(())
}
@@ -216,10 +207,9 @@ impl node::Server for NodeServer {
let keyid = KeyID::new(keyid);
let key_id = sry!(KeyServer::lookup_by_id(&self.c, &keyid));
- pry!(pry!(results.get().get_result()).set_ok(
- node::key::ToClient::new(
- KeyServer::new(self.c.clone(), key_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>(
+ capnp_rpc::new_client(
+ KeyServer::new(self.c.clone(), key_id))));
Promise::ok(())
}
@@ -232,10 +222,9 @@ impl node::Server for NodeServer {
let fingerprint: openpgp::Fingerprint = sry!(fingerprint.parse());
let key_id = sry!(KeyServer::lookup(&self.c, &fingerprint));
- pry!(pry!(results.get().get_result()).set_ok(
- node::key::ToClient::new(
- KeyServer::new(self.c.clone(), key_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>(
+ capnp_rpc::new_client(
+ KeyServer::new(self.c.clone(), key_id))));
Promise::ok(())
}
@@ -252,10 +241,9 @@ impl node::Server for NodeServer {
WHERE key_by_keyid.keyid = ?1",
&[&(keyid as i64)], |row| row.get(0)));
- pry!(pry!(results.get().get_result()).set_ok(
- node::key::ToClient::new(
- KeyServer::new(self.c.clone(), key_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>(
+ capnp_rpc::new_client(
+ KeyServer::new(self.c.clone(), key_id))));
Promise::ok(())
}
}
@@ -351,10 +339,9 @@ impl node::mapping::Server for MappingServer {
}
- pry!(pry!(results.get().get_result()).set_ok(
- node::binding::ToClient::new(
- BindingServer::new(self.c.clone(), binding_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::binding::Client>(
+ capnp_rpc::new_client(
+ BindingServer::new(self.c.clone(), binding_id))));
Promise::ok(())
}
@@ -370,10 +357,9 @@ impl node::mapping::Server for MappingServer {
"SELECT id FROM bindings WHERE mapping = ?1 AND label = ?2",
&[&self.id as &dyn ToSql, &label], |row| row.get(0)));
- pry!(pry!(results.get().get_result()).set_ok(
- node::binding::ToClient::new(
- BindingServer::new(self.c.clone(), binding_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::binding::Client>(
+ capnp_rpc::new_client(
+ BindingServer::new(self.c.clone(), binding_id))));
Promise::ok(())
}
@@ -391,10 +377,9 @@ impl node::mapping::Server for MappingServer {
WHERE key_by_keyid.keyid = ?1",
&[&(keyid as i64)], |row| row.get(0)));
- pry!(pry!(results.get().get_result()).set_ok(
- node::binding::ToClient::new(
- BindingServer::new(self.c.clone(), binding_id))
- .into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::binding::Client>(
+ capnp_rpc::new_client(
+ BindingServer::new(self.c.clone(), binding_id))));
Promise::ok(())
}
@@ -414,8 +399,8 @@ impl node::mapping::Server for MappingServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = BundleIterServer::new(self.c.clone(), self.id);
- pry!(pry!(results.get().get_result()).set_ok(
- node::binding_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::binding_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
@@ -425,8 +410,8 @@ impl node::mapping::Server for MappingServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = log::IterServer::new(self.c.clone(), log::Selector::Mapping(self.id));
- pry!(pry!(results.get().get_result()).set_ok(
- node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
}
@@ -541,9 +526,9 @@ impl node::binding::Server for BindingServer {
bind_results!(results);
let key = sry!(self.key_id());
- pry!(pry!(results.get().get_result()).set_ok(
- node::key::ToClient::new(
- KeyServer::new(self.c.clone(), key)).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::key::Client>(
+ capnp_rpc::new_client(
+ KeyServer::new(self.c.clone(), key))));
Promise::ok(())
}
@@ -678,8 +663,8 @@ impl node::binding::Server for BindingServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = log::IterServer::new(self.c.clone(), log::Selector::Binding(self.id));
- pry!(pry!(results.get().get_result()).set_ok(
- node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
@@ -917,71 +902,47 @@ impl KeyServer {
}
/// Updates the key that was least recently updated.
- fn update(c: &Rc<Connection>,
+ async fn update(c: &Rc<Connection>,
network_policy: core::NetworkPolicy)
- -> Box<dyn Future<Item=Duration, Error=anyhow::Error> + 'static> {
- let (key, id, mut keyserver)
- = match Self::update_helper(c, network_policy) {
- Ok((key, id, keyserver)) => (key, id, keyserver),
- Err(e) => return Box::new(future::err(e.into())),
- };
+ -> Result<Duration> {
+ let (key, id, mut keyserver) = Self::update_helper(c, network_policy)?;
- let c = c.clone();
let now = Timestamp::now();
let at = Self::next_update_at(&c, network_policy)
.unwrap_or(now + min_sleep_time());
if at <= now {
- Box::new(
- keyserver.get(&id)
- .then(move |cert| {
- let next = Self::need_update(&c, network_policy)
- .map(|c| refresh_interval() / c)
- .unwrap_or(min_sleep_time());
-
- if let Err(e) = cert.map(|t| key.merge(t)) {
- key.error("Update unsuccessful",
- &format!("{:?}", e), next / 2)
- .unwrap_or(());
- } else {
- key.success("Update successful", next)
- .unwrap_or(());
- }
+ let cert = keyserver.get(&id).await;
- future::ok(next)
- }))
+ let next = Self::need_update(&c, network_policy)
+ .map(|c| refresh_interval() / c)
+ .unwrap_or(min_sleep_time());
+
+ if let Err(e) = cert.map(|t| key.merge(t)) {
+ key.error("Update unsuccessful",
+ &format!("{:?}", e), next / 2)
+ .unwrap_or(());
+ } else {
+ key.success("Update successful", next)
+ .unwrap_or(());
+ }
+
+ Ok(next)
} else {
assert!(at > now);
- Box::new(future::ok(cmp::max(min_sleep_time(),
- now.duration_since(at).unwrap())))
+
+ Ok(cmp::max(min_sleep_time(), now.duration_since(at).unwrap()))
}
}
- /// Starts the periodic housekeeping.
- fn start_housekeeping(c: Rc<Connection>, handle: Handle) -> Result<()> {
- let h0 = handle.clone();
-
- let forever = loop_fn(0, move |_| {
- // For now, we only update keys with this network policy.
- let network_policy = core::NetworkPolicy::Encrypted;
-
- let h1 = h0.clone();
- Self::update(&c, network_policy)
- .then(move |d| {
- let d = d.unwrap_or(min_sleep_time());
- Timeout::new(random_duration(d), &h1)
- .unwrap() // XXX: May fail if the eventloop expired.
- .then(move |timeout| {
- if timeout.is_ok() {
- Ok(Loop::Continue(0))
- } else {
- Ok(Loop::Break(()))
- }
- })
- })
- });
- handle.spawn(forever);
- Ok(())
+ /// Perform periodic housekeeping.
+ async fn start_housekeeping(c: Rc<Connection>) {
+ loop {
+ let duration = Self::update(&c, core::NetworkPolicy::Encrypted).await;
+
+ let duration = duration.unwrap_or(min_sleep_time());
+ tokio::time::delay_for(random_duration(duration)).await;
+ }
}
}
@@ -1052,8 +1013,8 @@ impl node::key::Server for KeyServer {
-> Promise<(), capnp::Error> {
bind_results!(results);
let iter = log::IterServer::new(self.c.clone(), log::Selector::Key(self.id));
- pry!(pry!(results.get().get_result()).set_ok(
- node::log_iter::ToClient::new(iter).into_client::<capnp_rpc::Server>()));
+ pry!(pry!(results.get().get_result()).set_ok::<node::log_iter::Client>(
+ capnp_rpc::new_client(iter)));
Promise::ok(())
}
}
@@ -1159,8 +1120,8 @@ impl node::mapping_iter::Server for MappingIterServer {
entry.set_realm(&realm);
entry.set_name(&name);
entry.set_network_policy(network_policy.into());
- entry.set_mapping(node::mapping::ToClient::new(
- MappingServer::new(self.c.clone(), id)).into_client::<capnp_rpc::Server>());
+ entry.set_mapping(capnp_rpc::new_client(
+ MappingServer::new(self.c.clone(), id)));
self.n = id;
Promise::ok(())
}
@@ -1196,8 +1157,8 @@ impl node::binding_iter::Server for BundleIterServer {
let mut entry = pry!(results.get().get_result()).init_ok();
entry.set_label(&label);
entry.set_fingerprint(&fingerprint);
- entry.set_binding(node::binding::ToClient::new(
- BindingServer::new(self.c.clone(), id)).into_client::<capnp_rpc::Server>());
+ entry.set_binding(capnp_rpc::new_client(
+ BindingServer::new(self.c.clone(), id)));
self.n = id;
Promise::ok(())
}
@@ -1230,8 +1191,8 @@ impl node::key_iter::Server for KeyIterServer {
let mut entry = pry!(results.get().get_result()).init_ok();
entry.set_fingerprint(&fingerprint);
- entry.set_key(node::key::ToClient::new(
- KeyServer::new(self.c.clone(), id)).into_client::<capnp_rpc::Server>());
+ entry.set_key(capnp_rpc::new_client(
+ KeyServer::new(self.c.clone(), id)));
self.n = id;
Promise::ok(())
}
diff --git a/store/src/lib.rs b/store/src/lib.rs
index 5a718128..d91bd59c 100644
--- a/store/src/lib.rs
+++ b/store/src/lib.rs
@@ -51,18 +51,15 @@
use capnp;
#[macro_use]
extern crate capnp_rpc;
-use futures;
-use tokio_core;
use std::cell::RefCell;
use std::fmt;
+use std::future::Future;
use std::rc::Rc;
+use std::time;
use capnp::capability::Promise;
use capnp_rpc::rpc_twoparty_capnp::Side;
-use futures::{Future};
-use std::time;
-use tokio_core::reactor::Core;
use sequoia_openpgp as openpgp;
#[allow(unused_imports)]
@@ -86,6 +83,19 @@ use crate::store_protocol_capnp::node;
pub(crate) mod backend;
+/// Single-threaded Tokio runtime that drives the Cap'n Proto `RpcSystem`.
+struct RpcRuntime {
+ runtime: tokio::runtime::Runtime,
+ rpc_task: tokio::task::LocalSet,
+}
+
+impl RpcRuntime {
+ /// Blocks until a `fut` is finished, concurrently driving the RPC system.
+ fn block_on<F: Future>(&mut self, fut: F) -> F::Output {
+ self.rpc_task.block_on(&mut self.runtime, fut)
+ }
+}
+
/// Returns the service descriptor.
#[doc(hidden)]
pub fn descriptor(c: &Context) -> ipc::Descriptor {
@@ -111,17 +121,26 @@ pub struct Store {
impl Store {
/// Establishes a connection to the backend.
- fn connect(c: &Context) -> Result<(Core, node::Client)> {
+ fn connect(c: &Context) -> Result<(RpcRuntime, node::Client)> {
let descriptor = descriptor(c);
- let core = Core::new()?;
- let handle = core.handle();
- let mut rpc_system = descriptor.connect(&handle)?;
+ let rt = tokio::runtime::Builder::new()
+ .basic_scheduler()
+ .enable_io()
+ .enable_time()
+ .build()?;
+ // Need to enter Tokio context due to Tokio TcpStream creation binding
+ // eagerly to an I/O reactor
+ let mut rpc_system = rt.enter(|| descriptor.connect())?;
let client: node::Client = rpc_system.bootstrap(Side::Server);
- handle.spawn(rpc_system.map_err(|_e| ()));
- Ok((core, client))
+ // Since RpcSystem is explicitly `!Send`, we need to spawn it on
+ // a `LocalSet` that needs to be exclusively driven by `RpcRuntime`
+ let rpc_task = tokio::task::LocalSet::new();
+ rpc_task.spawn_local(rpc_system);
+
+ Ok((RpcRuntime { runtime: rt, rpc_task }, client))
}
/// Imports a key into the common key pool.
@@ -151,7 +170,7 @@ impl Store {
let mut blob = vec![];
cert.serialize(&mut blob)?;
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let mut request = client.import_request();
request.get().set_key(&blob);
let key = make_request!(&mut core, request)?;
@@ -183,7 +202,7 @@ impl Store {
/// # }
/// ```
pub fn lookup(c: &Context, fp: &Fingerprint) -> Result<Key> {
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let mut request = client.lookup_by_fingerprint_request();
let fp = format!("{:X}", fp);
request.get().set_fingerprint(&fp);
@@ -216,7 +235,7 @@ impl Store {
/// # }
/// ```
pub fn lookup_by_keyid(c: &Context, keyid: &KeyID) -> Result<Key> {
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let mut request = client.lookup_by_keyid_request();
request.get().set_keyid(keyid.as_u64()?);
let key = make_request!(&mut core, request)?;
@@ -265,7 +284,7 @@ impl Store {
/// # }
/// ```
pub fn lookup_by_subkeyid(c: &Context, keyid: &KeyID) -> Result<Key> {
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let mut request = client.lookup_by_subkeyid_request();
request.get().set_keyid(keyid.as_u64()?);
let key = make_request!(&mut core, request)?;
@@ -274,7 +293,7 @@ impl Store {
/// Lists all keys in the common key pool.
pub fn list_keys(c: &Context) -> Result<KeyIter> {
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let request = client.iter_keys_request();
let iter = make_request!(&mut core, request)?;
Ok(KeyIter{core: Rc::new(RefCell::new(core)), iter: iter})
@@ -282,7 +301,7 @@ impl Store {
/// Lists all log entries.
pub fn server_log(c: &Context) -> Result<LogIter> {
- let (mut core, client) = Self::connect(c)?;
+ let (mut core, client) = Store::connect(c)?;
let request = client.log_request();
let iter = make_request!(&mut core, request)?;
Ok(LogIter{core: Rc::new(RefCell::new(core)), iter: iter})
@@ -292,7 +311,7 @@ impl Store {
/// A public key store.
pub struct Mapping {
name: String,
- core: Rc<RefCell<Core>>,
+ core: Rc<RefCell<RpcRuntime>>,
mapping: node::mapping::Client,
}
@@ -327,7 +346,7 @@ impl Mapping {
Ok(Self::new(Rc::new(RefCell::new(core)), name, mapping))
}
- fn new(core: Rc<RefCell<Core>>, name: &str, mapping: node::mapping::Client) -> Self {
+ fn new(core: Rc<RefCell<RpcRuntime>>, name: &str, mapping: node::mapping::Client) -> Self {
Mapping{core: core, name: name.into(), mapping: mapping}
}
@@ -553,7 +572,7 @@ macro_rules! make_stats_request {
/// with these pairs.
pub struct Binding {
label: Option<String>,
- core: Rc<RefCell<Core>>,
+ core: Rc<RefCell<RpcRuntime>>,
binding: node::binding::Client,
}
@@ -564,7 +583,7 @@ impl fmt::Debug for Binding {
}
impl Binding {
- fn new(core: Rc<RefCell<Core>>,
+ fn new(core: Rc<RefCell<RpcRuntime>>,
lab