From 3fd21d2f10b9d2946c7c6fd5af8cc38a69402996 Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Tue, 20 Oct 2020 18:40:21 +0200 Subject: store: Migrate to std::futures --- store/Cargo.toml | 12 +-- store/src/backend/log.rs | 15 ++-- store/src/backend/mod.rs | 213 +++++++++++++++++++---------------------------- store/src/lib.rs | 73 ++++++++++------ store/src/macros.rs | 6 +- 5 files changed, 149 insertions(+), 170 deletions(-) (limited to 'store') diff --git a/store/Cargo.toml b/store/Cargo.toml index 7cff7319..c5e1c452 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -37,21 +37,21 @@ sequoia-core = { path = "../core", version = "0.20" } sequoia-ipc = { path = "../ipc", version = "0.20", default-features = false } sequoia-net = { path = "../net", version = "0.20", default-features = false } anyhow = "1" -capnp = "0.10" -capnp-rpc = "0.10" -futures = "0.1.17" +capnp = "0.13" +capnp-rpc = "0.13" +futures-util = "0.3" rand = { version = "0.7", default-features = false } rusqlite = "0.24" thiserror = "1" -tokio-core = "0.1.10" -tokio-io = "0.1.4" +tokio = { version = "0.2", features = ["rt-core", "tcp", "io-driver", "time"] } +tokio-util = { version = "0.3", features = ["compat"] } [target.'cfg(target_os="android")'.dependencies.rusqlite] version = "0.24.0" features = ["bundled"] [build-dependencies] -capnpc = "0.10" +capnpc = "0.13" [lib] name = "sequoia_store" diff --git a/store/src/backend/log.rs b/store/src/backend/log.rs index 42defd02..65606c2d 100644 --- a/store/src/backend/log.rs +++ b/store/src/backend/log.rs @@ -167,21 +167,18 @@ impl node::log_iter::Server for IterServer { entry.set_timestamp(timestamp.unix()); if let Some(mapping) = mapping { - entry.set_mapping(node::mapping::ToClient::new( - MappingServer::new(self.c.clone(), mapping)) - .into_client::()); + entry.set_mapping(capnp_rpc::new_client( + MappingServer::new(self.c.clone(), mapping))); } if let Some(binding) = binding { - entry.set_binding(node::binding::ToClient::new( - BindingServer::new(self.c.clone(), binding)) - .into_client::()); + entry.set_binding(capnp_rpc::new_client( + BindingServer::new(self.c.clone(), binding))); } if let Some(key) = key { - entry.set_key(node::key::ToClient::new( - KeyServer::new(self.c.clone(), key)) - .into_client::()); + entry.set_key(capnp_rpc::new_client( + KeyServer::new(self.c.clone(), key))); } entry.set_slug(&slug); diff --git a/store/src/backend/mod.rs b/store/src/backend/mod.rs index d9a88373..2a18f9a0 100644 --- a/store/src/backend/mod.rs +++ b/store/src/backend/mod.rs @@ -10,8 +10,6 @@ use capnp::capability::Promise; use capnp; use capnp_rpc::rpc_twoparty_capnp::Side; use capnp_rpc::{self, RpcSystem, twoparty}; -use futures::Future; -use futures::future::{self, loop_fn, Loop}; use rand::distributions::{Distribution, Uniform}; use rand::thread_rng; use rusqlite::{ @@ -20,9 +18,7 @@ use rusqlite::{ NO_PARAMS, types::ToSql, }; -use tokio_core::reactor::{Handle, Timeout}; -use tokio_core; -use tokio_io::io::ReadHalf; +use tokio_util::compat::Compat; use crate::openpgp::{self, Cert, KeyID, Fingerprint}; use crate::openpgp::parse::Parse; @@ -66,29 +62,24 @@ fn random_duration(d: Duration) -> Duration { /* Entry point. */ /// Makes backends. -pub fn factory(descriptor: ipc::Descriptor, handle: Handle) - -> Result> { - Backend::new(descriptor, handle) - .map(|b| -> Box { Box::new(b) }) +pub fn factory( + descriptor: ipc::Descriptor, + local: &tokio::task::LocalSet +) -> Result> { + let store = capnp_rpc::new_client(NodeServer::new(descriptor, local)?); + + Ok(Box::new(Backend { store })) } struct Backend { store: node::Client, } -impl Backend { - fn new(descriptor: ipc::Descriptor, handle: Handle) -> Result { - Ok(Backend { - store: node::ToClient::new(NodeServer::new(descriptor, handle)?) - .into_client::(), - }) - } -} - impl ipc::Handler for Backend { - fn handle(&self, - network: twoparty::VatNetwork>) - -> RpcSystem { + fn handle( + &self, + network: twoparty::VatNetwork> + ) -> RpcSystem { RpcSystem::new(Box::new(network), Some(self.store.clone().client)) } } @@ -101,7 +92,7 @@ struct NodeServer { } impl NodeServer { - fn new(descriptor: ipc::Descriptor, handle: Handle) -> Result { + fn new(descriptor: ipc::Descriptor, local: &tokio::task::LocalSet) -> Result { let mut db_path = descriptor.context().home().to_path_buf(); db_path.push("public-key-store.sqlite"); @@ -114,7 +105,8 @@ impl NodeServer { }; server.init()?; - KeyServer::start_housekeeping(server.c.clone(), handle)?; + local.spawn_local(KeyServer::start_housekeeping(server.c.clone())); + Ok(server) } @@ -152,8 +144,8 @@ impl node::Server for NodeServer { pry!(params.get_realm()), pry!(params.get_network_policy()).into(), pry!(params.get_name()))); - pry!(pry!(results.get().get_result()).set_ok( - node::mapping::ToClient::new(mapping).into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client(mapping))); Promise::ok(()) } @@ -164,8 +156,8 @@ impl node::Server for NodeServer { bind_results!(results); let prefix = pry!(pry!(params.get()).get_realm_prefix()); let iter = MappingIterServer::new(self.c.clone(), prefix); - pry!(pry!(results.get().get_result()).set_ok( - node::mapping_iter::ToClient::new(iter).into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client(iter))); Promise::ok(()) } @@ -175,8 +167,8 @@ impl node::Server for NodeServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = KeyIterServer::new(self.c.clone()); - pry!(pry!(results.get().get_result()).set_ok( - node::key_iter::ToClient::new(iter).into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client(iter))); Promise::ok(()) } @@ -186,8 +178,8 @@ impl node::Server for NodeServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = log::IterServer::new(self.c.clone(), log::Selector::All); - pry!(pry!(results.get().get_result()).set_ok( - node::log_iter::ToClient::new(iter).into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client(iter))); Promise::ok(()) } @@ -202,8 +194,7 @@ impl node::Server for NodeServer { let key = KeyServer::new(self.c.clone(), key_id); sry!(key.merge(new)); pry!(pry!(results.get().get_result()) - .set_ok(node::key::ToClient::new(key) - .into_client::())); + .set_ok::(capnp_rpc::new_client(key))); Promise::ok(()) } @@ -216,10 +207,9 @@ impl node::Server for NodeServer { let keyid = KeyID::new(keyid); let key_id = sry!(KeyServer::lookup_by_id(&self.c, &keyid)); - pry!(pry!(results.get().get_result()).set_ok( - node::key::ToClient::new( - KeyServer::new(self.c.clone(), key_id)) - .into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client( + KeyServer::new(self.c.clone(), key_id)))); Promise::ok(()) } @@ -232,10 +222,9 @@ impl node::Server for NodeServer { let fingerprint: openpgp::Fingerprint = sry!(fingerprint.parse()); let key_id = sry!(KeyServer::lookup(&self.c, &fingerprint)); - pry!(pry!(results.get().get_result()).set_ok( - node::key::ToClient::new( - KeyServer::new(self.c.clone(), key_id)) - .into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client( + KeyServer::new(self.c.clone(), key_id)))); Promise::ok(()) } @@ -252,10 +241,9 @@ impl node::Server for NodeServer { WHERE key_by_keyid.keyid = ?1", &[&(keyid as i64)], |row| row.get(0))); - pry!(pry!(results.get().get_result()).set_ok( - node::key::ToClient::new( - KeyServer::new(self.c.clone(), key_id)) - .into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client( + KeyServer::new(self.c.clone(), key_id)))); Promise::ok(()) } } @@ -351,10 +339,9 @@ impl node::mapping::Server for MappingServer { } - pry!(pry!(results.get().get_result()).set_ok( - node::binding::ToClient::new( - BindingServer::new(self.c.clone(), binding_id)) - .into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client( + BindingServer::new(self.c.clone(), binding_id)))); Promise::ok(()) } @@ -370,10 +357,9 @@ impl node::mapping::Server for MappingServer { "SELECT id FROM bindings WHERE mapping = ?1 AND label = ?2", &[&self.id as &dyn ToSql, &label], |row| row.get(0))); - pry!(pry!(results.get().get_result()).set_ok( - node::binding::ToClient::new( - BindingServer::new(self.c.clone(), binding_id)) - .into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client( + BindingServer::new(self.c.clone(), binding_id)))); Promise::ok(()) } @@ -391,10 +377,9 @@ impl node::mapping::Server for MappingServer { WHERE key_by_keyid.keyid = ?1", &[&(keyid as i64)], |row| row.get(0))); - pry!(pry!(results.get().get_result()).set_ok( - node::binding::ToClient::new( - BindingServer::new(self.c.clone(), binding_id)) - .into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client( + BindingServer::new(self.c.clone(), binding_id)))); Promise::ok(()) } @@ -414,8 +399,8 @@ impl node::mapping::Server for MappingServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = BundleIterServer::new(self.c.clone(), self.id); - pry!(pry!(results.get().get_result()).set_ok( - node::binding_iter::ToClient::new(iter).into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client(iter))); Promise::ok(()) } @@ -425,8 +410,8 @@ impl node::mapping::Server for MappingServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = log::IterServer::new(self.c.clone(), log::Selector::Mapping(self.id)); - pry!(pry!(results.get().get_result()).set_ok( - node::log_iter::ToClient::new(iter).into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client(iter))); Promise::ok(()) } } @@ -541,9 +526,9 @@ impl node::binding::Server for BindingServer { bind_results!(results); let key = sry!(self.key_id()); - pry!(pry!(results.get().get_result()).set_ok( - node::key::ToClient::new( - KeyServer::new(self.c.clone(), key)).into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client( + KeyServer::new(self.c.clone(), key)))); Promise::ok(()) } @@ -678,8 +663,8 @@ impl node::binding::Server for BindingServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = log::IterServer::new(self.c.clone(), log::Selector::Binding(self.id)); - pry!(pry!(results.get().get_result()).set_ok( - node::log_iter::ToClient::new(iter).into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client(iter))); Promise::ok(()) } @@ -917,71 +902,47 @@ impl KeyServer { } /// Updates the key that was least recently updated. - fn update(c: &Rc, + async fn update(c: &Rc, network_policy: core::NetworkPolicy) - -> Box + 'static> { - let (key, id, mut keyserver) - = match Self::update_helper(c, network_policy) { - Ok((key, id, keyserver)) => (key, id, keyserver), - Err(e) => return Box::new(future::err(e.into())), - }; + -> Result { + let (key, id, mut keyserver) = Self::update_helper(c, network_policy)?; - let c = c.clone(); let now = Timestamp::now(); let at = Self::next_update_at(&c, network_policy) .unwrap_or(now + min_sleep_time()); if at <= now { - Box::new( - keyserver.get(&id) - .then(move |cert| { - let next = Self::need_update(&c, network_policy) - .map(|c| refresh_interval() / c) - .unwrap_or(min_sleep_time()); - - if let Err(e) = cert.map(|t| key.merge(t)) { - key.error("Update unsuccessful", - &format!("{:?}", e), next / 2) - .unwrap_or(()); - } else { - key.success("Update successful", next) - .unwrap_or(()); - } + let cert = keyserver.get(&id).await; - future::ok(next) - })) + let next = Self::need_update(&c, network_policy) + .map(|c| refresh_interval() / c) + .unwrap_or(min_sleep_time()); + + if let Err(e) = cert.map(|t| key.merge(t)) { + key.error("Update unsuccessful", + &format!("{:?}", e), next / 2) + .unwrap_or(()); + } else { + key.success("Update successful", next) + .unwrap_or(()); + } + + Ok(next) } else { assert!(at > now); - Box::new(future::ok(cmp::max(min_sleep_time(), - now.duration_since(at).unwrap()))) + + Ok(cmp::max(min_sleep_time(), now.duration_since(at).unwrap())) } } - /// Starts the periodic housekeeping. - fn start_housekeeping(c: Rc, handle: Handle) -> Result<()> { - let h0 = handle.clone(); - - let forever = loop_fn(0, move |_| { - // For now, we only update keys with this network policy. - let network_policy = core::NetworkPolicy::Encrypted; - - let h1 = h0.clone(); - Self::update(&c, network_policy) - .then(move |d| { - let d = d.unwrap_or(min_sleep_time()); - Timeout::new(random_duration(d), &h1) - .unwrap() // XXX: May fail if the eventloop expired. - .then(move |timeout| { - if timeout.is_ok() { - Ok(Loop::Continue(0)) - } else { - Ok(Loop::Break(())) - } - }) - }) - }); - handle.spawn(forever); - Ok(()) + /// Perform periodic housekeeping. + async fn start_housekeeping(c: Rc) { + loop { + let duration = Self::update(&c, core::NetworkPolicy::Encrypted).await; + + let duration = duration.unwrap_or(min_sleep_time()); + tokio::time::delay_for(random_duration(duration)).await; + } } } @@ -1052,8 +1013,8 @@ impl node::key::Server for KeyServer { -> Promise<(), capnp::Error> { bind_results!(results); let iter = log::IterServer::new(self.c.clone(), log::Selector::Key(self.id)); - pry!(pry!(results.get().get_result()).set_ok( - node::log_iter::ToClient::new(iter).into_client::())); + pry!(pry!(results.get().get_result()).set_ok::( + capnp_rpc::new_client(iter))); Promise::ok(()) } } @@ -1159,8 +1120,8 @@ impl node::mapping_iter::Server for MappingIterServer { entry.set_realm(&realm); entry.set_name(&name); entry.set_network_policy(network_policy.into()); - entry.set_mapping(node::mapping::ToClient::new( - MappingServer::new(self.c.clone(), id)).into_client::()); + entry.set_mapping(capnp_rpc::new_client( + MappingServer::new(self.c.clone(), id))); self.n = id; Promise::ok(()) } @@ -1196,8 +1157,8 @@ impl node::binding_iter::Server for BundleIterServer { let mut entry = pry!(results.get().get_result()).init_ok(); entry.set_label(&label); entry.set_fingerprint(&fingerprint); - entry.set_binding(node::binding::ToClient::new( - BindingServer::new(self.c.clone(), id)).into_client::()); + entry.set_binding(capnp_rpc::new_client( + BindingServer::new(self.c.clone(), id))); self.n = id; Promise::ok(()) } @@ -1230,8 +1191,8 @@ impl node::key_iter::Server for KeyIterServer { let mut entry = pry!(results.get().get_result()).init_ok(); entry.set_fingerprint(&fingerprint); - entry.set_key(node::key::ToClient::new( - KeyServer::new(self.c.clone(), id)).into_client::()); + entry.set_key(capnp_rpc::new_client( + KeyServer::new(self.c.clone(), id))); self.n = id; Promise::ok(()) } diff --git a/store/src/lib.rs b/store/src/lib.rs index 5a718128..d91bd59c 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -51,18 +51,15 @@ use capnp; #[macro_use] extern crate capnp_rpc; -use futures; -use tokio_core; use std::cell::RefCell; use std::fmt; +use std::future::Future; use std::rc::Rc; +use std::time; use capnp::capability::Promise; use capnp_rpc::rpc_twoparty_capnp::Side; -use futures::{Future}; -use std::time; -use tokio_core::reactor::Core; use sequoia_openpgp as openpgp; #[allow(unused_imports)] @@ -86,6 +83,19 @@ use crate::store_protocol_capnp::node; pub(crate) mod backend; +/// Single-threaded Tokio runtime that drives the Cap'n Proto `RpcSystem`. +struct RpcRuntime { + runtime: tokio::runtime::Runtime, + rpc_task: tokio::task::LocalSet, +} + +impl RpcRuntime { + /// Blocks until a `fut` is finished, concurrently driving the RPC system. + fn block_on(&mut self, fut: F) -> F::Output { + self.rpc_task.block_on(&mut self.runtime, fut) + } +} + /// Returns the service descriptor. #[doc(hidden)] pub fn descriptor(c: &Context) -> ipc::Descriptor { @@ -111,17 +121,26 @@ pub struct Store { impl Store { /// Establishes a connection to the backend. - fn connect(c: &Context) -> Result<(Core, node::Client)> { + fn connect(c: &Context) -> Result<(RpcRuntime, node::Client)> { let descriptor = descriptor(c); - let core = Core::new()?; - let handle = core.handle(); - let mut rpc_system = descriptor.connect(&handle)?; + let rt = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_io() + .enable_time() + .build()?; + // Need to enter Tokio context due to Tokio TcpStream creation binding + // eagerly to an I/O reactor + let mut rpc_system = rt.enter(|| descriptor.connect())?; let client: node::Client = rpc_system.bootstrap(Side::Server); - handle.spawn(rpc_system.map_err(|_e| ())); - Ok((core, client)) + // Since RpcSystem is explicitly `!Send`, we need to spawn it on + // a `LocalSet` that needs to be exclusively driven by `RpcRuntime` + let rpc_task = tokio::task::LocalSet::new(); + rpc_task.spawn_local(rpc_system); + + Ok((RpcRuntime { runtime: rt, rpc_task }, client)) } /// Imports a key into the common key pool. @@ -151,7 +170,7 @@ impl Store { let mut blob = vec![]; cert.serialize(&mut blob)?; - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let mut request = client.import_request(); request.get().set_key(&blob); let key = make_request!(&mut core, request)?; @@ -183,7 +202,7 @@ impl Store { /// # } /// ``` pub fn lookup(c: &Context, fp: &Fingerprint) -> Result { - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let mut request = client.lookup_by_fingerprint_request(); let fp = format!("{:X}", fp); request.get().set_fingerprint(&fp); @@ -216,7 +235,7 @@ impl Store { /// # } /// ``` pub fn lookup_by_keyid(c: &Context, keyid: &KeyID) -> Result { - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let mut request = client.lookup_by_keyid_request(); request.get().set_keyid(keyid.as_u64()?); let key = make_request!(&mut core, request)?; @@ -265,7 +284,7 @@ impl Store { /// # } /// ``` pub fn lookup_by_subkeyid(c: &Context, keyid: &KeyID) -> Result { - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let mut request = client.lookup_by_subkeyid_request(); request.get().set_keyid(keyid.as_u64()?); let key = make_request!(&mut core, request)?; @@ -274,7 +293,7 @@ impl Store { /// Lists all keys in the common key pool. pub fn list_keys(c: &Context) -> Result { - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let request = client.iter_keys_request(); let iter = make_request!(&mut core, request)?; Ok(KeyIter{core: Rc::new(RefCell::new(core)), iter: iter}) @@ -282,7 +301,7 @@ impl Store { /// Lists all log entries. pub fn server_log(c: &Context) -> Result { - let (mut core, client) = Self::connect(c)?; + let (mut core, client) = Store::connect(c)?; let request = client.log_request(); let iter = make_request!(&mut core, request)?; Ok(LogIter{core: Rc::new(RefCell::new(core)), iter: iter}) @@ -292,7 +311,7 @@ impl Store { /// A public key store. pub struct Mapping { name: String, - core: Rc>, + core: Rc>, mapping: node::mapping::Client, } @@ -327,7 +346,7 @@ impl Mapping { Ok(Self::new(Rc::new(RefCell::new(core)), name, mapping)) } - fn new(core: Rc>, name: &str, mapping: node::mapping::Client) -> Self { + fn new(core: Rc>, name: &str, mapping: node::mapping::Client) -> Self { Mapping{core: core, name: name.into(), mapping: mapping} } @@ -553,7 +572,7 @@ macro_rules! make_stats_request { /// with these pairs. pub struct Binding { label: Option, - core: Rc>, + core: Rc>, binding: node::binding::Client, } @@ -564,7 +583,7 @@ impl fmt::Debug for Binding { } impl Binding { - fn new(core: Rc>, + fn new(core: Rc>, label: Option<&str>, binding: node::binding::Client) -> Self { Binding{label: label.map(|l| l.into()), core: core, binding: binding} @@ -798,7 +817,7 @@ impl Binding { /// A `Key` is a handle to a stored Cert. We make this explicit /// because we associate metadata with Certs. pub struct Key { - core: Rc>, + core: Rc>, key: node::key::Client, } @@ -809,7 +828,7 @@ impl fmt::Debug for Key { } impl Key { - fn new(core: Rc>, key: node::key::Client) -> Self { + fn new(core: Rc>, key: node::key::Client) -> Self { Key{core: core, key: key} } @@ -1012,7 +1031,7 @@ impl Stamps { /// Iterates over mappings. pub struct MappingIter { - core: Rc>, + core: Rc>, iter: node::mapping_iter::Client, } @@ -1037,7 +1056,7 @@ impl Iterator for MappingIter { /// Iterates over bindings in a mapping. pub struct BundleIter { - core: Rc>, + core: Rc>, iter: node::binding_iter::Client, } @@ -1061,7 +1080,7 @@ impl Iterator for BundleIter { /// Iterates over keys in the common key pool. pub struct KeyIter { - core: Rc>, + core: Rc>, iter: node::key_iter::Client, } @@ -1083,7 +1102,7 @@ impl Iterator for KeyIter { /// Iterates over logs. pub struct LogIter { - core: Rc>, + core: Rc>, iter: node::log_iter::Client, } diff --git a/store/src/macros.rs b/store/src/macros.rs index 7d69ff54..144a8d0d 100644 --- a/store/src/macros.rs +++ b/store/src/macros.rs @@ -12,9 +12,10 @@ // Sends the given request and decodes the result. macro_rules! make_request { ( $core: expr, $request: expr ) => {{ + use futures_util::TryFutureExt; use crate::node::result::Which; - let r: std::result::Result, capnp::Error> = $core.run( + let r: std::result::Result, capnp::Error> = $core.block_on( $request.send().promise .and_then(|response| -> Promise, capnp::Error> { let r = pry!(pry!(pry!(response.get()).get_result()).which()); @@ -34,9 +35,10 @@ macro_rules! make_request { macro_rules! make_request_map { ( $core: expr, $request: expr, $map: expr ) => {{ + use futures_util::TryFutureExt; use crate::node::result::Which; - let r: std::result::Result, capnp::Error> = $core.run( + let r: std::result::Result, capnp::Error> = $core.block_on( $request.send().promise .and_then(|response| -> Promise, capnp::Error> { let r = pry!(pry!(pry!(response.get()).get_result()).which()); -- cgit v1.2.3