diff options
author | Igor Matuszewski <igor@sequoia-pgp.org> | 2020-10-19 17:59:19 +0200 |
---|---|---|
committer | Igor Matuszewski <igor@sequoia-pgp.org> | 2020-10-23 12:05:54 +0200 |
commit | d30e05a43655a3884c6119e282e0ac58a9c723f7 (patch) | |
tree | eeff82e5a4d40b933a9bd7849396812013eacf39 | |
parent | eb324f60bbd4184057797f72cc3db34e6160497d (diff) |
ipc: Migrate to std::futures
-rw-r--r-- | Cargo.lock | 146 | ||||
-rw-r--r-- | ipc/Cargo.toml | 11 | ||||
-rw-r--r-- | ipc/examples/assuan-client.rs | 25 | ||||
-rw-r--r-- | ipc/examples/gpg-agent-client.rs | 25 | ||||
-rw-r--r-- | ipc/src/assuan/mod.rs | 126 | ||||
-rw-r--r-- | ipc/src/gnupg.rs | 268 | ||||
-rw-r--r-- | ipc/src/lib.rs | 114 | ||||
-rw-r--r-- | ipc/tests/gpg-agent.rs | 20 |
8 files changed, 400 insertions, 335 deletions
@@ -292,17 +292,33 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b867c15d8ff93c4d81b69c89280840f877331ef2a1fccbaf947afecc68b51a9e" dependencies = [ - "futures", + "futures 0.1.30", ] [[package]] +name = "capnp" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "394f9ad87764d43d41c8d3ea270fd03def2f455011f3ada86c9f01d88592105d" + +[[package]] name = "capnp-futures" version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa07b8de7e06c61c287fb5a03a644e2439fec4fe17e206d4658ac09aeec4b161" dependencies = [ - "capnp", - "futures", + "capnp 0.10.3", + "futures 0.1.30", +] + +[[package]] +name = "capnp-futures" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9f9ff1dae086de0d7ecbc147fee21aed8b3ad64468f0f991c98da06fb8c8459" +dependencies = [ + "capnp 0.13.5", + "futures 0.3.6", ] [[package]] @@ -311,10 +327,21 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "568eecd404ea80e98d506b922be2de5e1013ac8f9b170242a53068affc79ddc8" dependencies = [ - "capnp", - "capnp-futures", + "capnp 0.10.3", + "capnp-futures 0.10.1", "capnpc", - "futures", + "futures 0.1.30", +] + +[[package]] +name = "capnp-rpc" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37998522d42bbe4a1d266f418b1a053b679a338e904e55afd5ff22333df0e09e" +dependencies = [ + "capnp 0.13.5", + "capnp-futures 0.13.1", + "futures 0.3.6", ] [[package]] @@ -323,7 +350,7 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2afedfc194b01c6804ad0a10c7139024b99ee3df6a39bb09bdf759067ababff" dependencies = [ - "capnp", + "capnp 0.10.3", ] [[package]] @@ -823,12 +850,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c7e4c2612746b0df8fed4ce0c69156021b704c9aefa360311c04e6e9e002eed" [[package]] +name = "futures" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d8e3078b7b2a8a671cb7a3d17b4760e4181ea243227776ba83fd043b4ca034e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] name = "futures-channel" version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a4d35f7401e948629c9c3d6638fb9bf94e0b2121e96c3b428cc4e631f3eb74" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -838,6 +881,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d674eaa0056896d5ada519900dbf97ead2e46a7b6621e8160d79e2f2e1e2784b" [[package]] +name = "futures-executor" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc709ca1da6f66143b8c9bec8e6260181869893714e9b5a490b169b0414144ab" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc94b64bb39543b4e432f1790b6bf18e3ee3b74653c5449f63310e9a74b123c" + +[[package]] name = "futures-macro" version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -870,9 +930,13 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a894a0acddba51a2d49a6f4263b1e64b8c579ece8af50fa86503d52cd1eea34" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project", "pin-utils", "proc-macro-hack", @@ -1492,20 +1556,17 @@ dependencies = [ [[package]] name = "parity-tokio-ipc" -version = "0.4.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e57fea504fea33f9fbb5f49f378359030e7e026a6ab849bb9e8f0787376f1bf" +checksum = "c1d417ba1ab454723ff2271bf999fd700027dc48759a13d43e488cc8ca38b87f" dependencies = [ - "bytes 0.4.12", - "futures", + "futures 0.3.6", "libc", "log", "mio-named-pipes", "miow 0.3.5", "rand", - "tokio 0.1.22", - "tokio-named-pipes", - "tokio-uds", + "tokio 0.2.22", "winapi 0.3.9", ] @@ -1973,11 +2034,11 @@ version = "0.20.0" dependencies = [ "anyhow", "buffered-reader", - "capnp-rpc", + "capnp-rpc 0.13.1", "clap", "ctor", "fs2", - "futures", + "futures 0.3.6", "lalrpop", "lalrpop-util", "lazy_static", @@ -1991,9 +2052,8 @@ dependencies = [ "socket2", "tempfile", "thiserror", - "tokio 0.1.22", - "tokio-core", - "tokio-io", + "tokio 0.2.22", + "tokio-util", "winapi 0.3.9", ] @@ -2113,10 +2173,10 @@ name = "sequoia-store" version = "0.20.0" dependencies = [ "anyhow", - "capnp", - "capnp-rpc", + "capnp 0.10.3", + "capnp-rpc 0.10.0", "capnpc", - "futures", + "futures 0.1.30", "rand", "rusqlite", "sequoia-core", @@ -2416,7 +2476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.30", "mio", "num_cpus", "tokio-codec", @@ -2464,7 +2524,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.30", "tokio-io", ] @@ -2475,7 +2535,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.30", "iovec", "log", "mio", @@ -2493,7 +2553,7 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1de0e32a83f131e002238d7ccde18211c0a5397f60cbfffcb112868c2e0e20e" dependencies = [ - "futures", + "futures 0.1.30", "tokio-executor", ] @@ -2504,7 +2564,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671" dependencies = [ "crossbeam-utils", - "futures", + "futures 0.1.30", ] [[package]] @@ -2513,7 +2573,7 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "297a1206e0ca6302a0eed35b700d292b275256f596e2f3fea7729d5e629b6ff4" dependencies = [ - "futures", + "futures 0.1.30", "tokio-io", "tokio-threadpool", ] @@ -2525,7 +2585,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.30", "log", ] @@ -2541,26 +2601,13 @@ dependencies = [ ] [[package]] -name = "tokio-named-pipes" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d282d483052288b2308ba5ee795f5673b159c9bdf63c385a05609da782a5eae" -dependencies = [ - "bytes 0.4.12", - "futures", - "mio", - "mio-named-pipes", - "tokio 0.1.22", -] - -[[package]] name = "tokio-reactor" version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351" dependencies = [ "crossbeam-utils", - "futures", + "futures 0.1.30", "lazy_static", "log", "mio", @@ -2579,7 +2626,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edfe50152bc8164fcc456dab7891fa9bf8beaf01c5ee7e1dd43a397c3cf87dee" dependencies = [ "fnv", - "futures", + "futures 0.1.30", ] [[package]] @@ -2589,7 +2636,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98df18ed66e3b72e742f185882a9e201892407957e45fbff8da17ae7a7c51f72" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.30", "iovec", "mio", "tokio-io", @@ -2605,7 +2652,7 @@ dependencies = [ "crossbeam-deque", "crossbeam-queue", "crossbeam-utils", - "futures", + "futures 0.1.30", "lazy_static", "log", "num_cpus", @@ -2620,7 +2667,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296" dependencies = [ "crossbeam-utils", - "futures", + "futures 0.1.30", "slab", "tokio-executor", ] @@ -2642,7 +2689,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.30", "log", "mio", "tokio-codec", @@ -2657,7 +2704,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab57a4ac4111c8c9dbcf70779f6fc8bc35ae4b2454809febac840ad19bd7e4e0" dependencies = [ "bytes 0.4.12", - "futures", + "futures 0.1.30", "iovec", "libc", "log", @@ -2676,6 +2723,7 @@ checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ "bytes 0.5.6", "futures-core", + "futures-io", "futures-sink", "log", "pin-project-lite", diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index 49b2d63e..dc680257 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -25,9 +25,9 @@ sequoia-core = { path = "../core", version = "0.20" } anyhow = "1" buffered-reader = { path = "../buffered-reader", version = "0.20", default-features = false } -capnp-rpc = "0.10" +capnp-rpc = "0.13" fs2 = "0.4.2" -futures = "0.1" +futures = "0.3" lalrpop-util = "0.19" lazy_static = "1.3" libc = "0.2.33" @@ -35,10 +35,9 @@ memsec = { version = "0.6", default-features = false } rand = { version = "0.7", default-features = false } tempfile = "3.0" thiserror = "1" -tokio = "0.1" -tokio-core = "0.1" -tokio-io = "0.1.4" -parity-tokio-ipc = "0.4" +tokio = { version = "0.2", features = ["rt-core", "rt-util", "tcp"] } +tokio-util = { version = "0.3", features = ["compat"] } +parity-tokio-ipc = "0.7" socket2 = "0.3.11" [target.'cfg(windows)'.dependencies] diff --git a/ipc/examples/assuan-client.rs b/ipc/examples/assuan-client.rs index 6d790363..16c6ed41 100644 --- a/ipc/examples/assuan-client.rs +++ b/ipc/examples/assuan-client.rs @@ -1,6 +1,4 @@ -use futures; -use futures::future::Future; -use futures::stream::Stream; +use futures::StreamExt; use clap; use sequoia_ipc as ipc; use crate::ipc::assuan::Client; @@ -18,14 +16,15 @@ fn main() { .help("Commands to send to the server")) .get_matches(); - let mut c = Client::connect(matches.value_of("server").unwrap()) - .wait().unwrap(); - for command in matches.values_of("commands").unwrap() { - eprintln!("> {}", command); - c.send(command).unwrap(); - c.by_ref().for_each(|response| { - eprintln!("< {:?}", response); - Ok(()) - }).wait().unwrap(); - } + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let mut c = Client::connect(matches.value_of("server").unwrap()).await.unwrap(); + for command in matches.values_of("commands").unwrap() { + eprintln!("> {}", command); + c.send(command).unwrap(); + while let Some(response) = c.next().await { + eprintln!("< {:?}", response); + } + } + }); } diff --git a/ipc/examples/gpg-agent-client.rs b/ipc/examples/gpg-agent-client.rs index 61a29e84..64b60234 100644 --- a/ipc/examples/gpg-agent-client.rs +++ b/ipc/examples/gpg-agent-client.rs @@ -1,8 +1,6 @@ /// Connects to and sends commands to gpg-agent. -use futures; -use futures::future::Future; -use futures::stream::Stream; +use futures::StreamExt; use clap; use sequoia_ipc as ipc; use crate::ipc::gnupg::{Context, Agent}; @@ -25,14 +23,17 @@ fn main() { } else { Context::new().unwrap() }; - let mut agent = Agent::connect(&ctx).wait().unwrap(); - for command in matches.values_of("commands").unwrap() { - eprintln!("> {}", command); - agent.send(command).unwrap(); - agent.by_ref().for_each(|response| { - eprintln!("< {:?}", response); - Ok(()) - }).wait().unwrap(); - } + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let mut agent = Agent::connect(&ctx).await.unwrap(); + + for command in matches.values_of("commands").unwrap() { + eprintln!("> {}", command); + agent.send(command).unwrap(); + while let Some(response) = agent.next().await { + eprintln!("< {:?}", response); + } + } + }); } diff --git a/ipc/src/assuan/mod.rs b/ipc/src/assuan/mod.rs index 73fb1734..510893ff 100644 --- a/ipc/src/assuan/mod.rs +++ b/ipc/src/assuan/mod.rs @@ -3,16 +3,18 @@ #![warn(missing_docs)] use std::cmp; -use std::io::{Write, BufReader}; +use std::io::Write; use std::mem; use std::path::Path; +use std::pin::Pin; +use std::task::{Poll, Context}; use lalrpop_util::ParseError; -use futures::{future, Async, Future, Stream}; -use parity_tokio_ipc::IpcConnection; -use tokio_io::io; -use tokio_io::AsyncRead; +use futures::{Future, Stream, StreamExt}; +use parity_tokio_ipc::Connection; +use tokio::io::{BufReader, ReadHalf, WriteHalf}; +use tokio::io::{AsyncRead, AsyncWriteExt}; use crate::openpgp; @@ -58,30 +60,26 @@ lalrpop_util::lalrpop_mod!( /// [`Connection::data()`]: #method.data /// [`Connection::cancel()`]: #method.cancel pub struct Client { - r: BufReader<io::ReadHalf<IpcConnection>>, // xxx: abstract over + r: BufReader<ReadHalf<Connection>>, // xxx: abstract over buffer: Vec<u8>, done: bool, w: WriteState, } enum WriteState { - Ready(io::WriteHalf<IpcConnection>), - Sending(future::FromErr<io::WriteAll<io::WriteHalf<IpcConnection>, Vec<u8>>, anyhow::Error>), + Ready(WriteHalf<Connection>), + Sending(Pin<Box<dyn Future<Output = Result<WriteHalf<Connection>, anyhow::Error>>>>), Transitioning, Dead, } impl Client { /// Connects to the server. - pub fn connect<P>(path: P) - -> impl Future<Item = Client, Error = anyhow::Error> - where P: AsRef<Path> - { + pub async fn connect<P>(path: P) -> Result<Client> where P: AsRef<Path> { // XXX: Implement Windows support using TCP + nonce approach used upstream // https://gnupg.org/documentation/manuals/assuan.pdf#Socket%20wrappers - future::result(IpcConnection::connect(path, &Default::default())) - .map_err(Into::into) - .and_then(ConnectionFuture::new) + let connection = parity_tokio_ipc::Endpoint::connect(path).await?; + Ok(ConnectionFuture::new(connection).await?) } /// Lazily sends a command to the server. @@ -118,13 +116,16 @@ impl Client { self.w = match mem::replace(&mut self.w, WriteState::Transitioning) { - WriteState::Ready(sink) => { + WriteState::Ready(mut sink) => { let command = command.as_ref(); let mut c = command.to_vec(); if ! c.ends_with(b"\n") { c.push(0x0a); } - WriteState::Sending(io::write_all(sink, c).from_err()) + WriteState::Sending(Box::pin(async move { + sink.write_all(&c).await?; + Ok(sink) + })) }, _ => unreachable!(), }; @@ -192,8 +193,8 @@ impl Client { struct ConnectionFuture(Option<Client>); impl ConnectionFuture { - fn new(c: IpcConnection) -> Self { - let (r, w) = c.split(); + fn new(c: Connection) -> Self { + let (r, w) = tokio::io::split(c); let buffer = Vec::with_capacity(MAX_LINE_LENGTH); Self(Some(Client { r: BufReader::new(r), buffer, done: false, @@ -203,19 +204,19 @@ impl ConnectionFuture { } impl Future for ConnectionFuture { - type Item = Client; - type Error = anyhow::Error; + type Output = Result<Client>; - fn poll(&mut self) -> std::result::Result<Async<Self::Item>, Self::Error> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Consume the initial message from the server. - match self.0.as_mut().expect("future polled after completion") - .by_ref().collect().poll()? - { - Async::Ready(response) => { - match response.iter().last() { - Some(Response::Ok { .. }) => - Ok(Async::Ready(self.0.take().unwrap())), - Some(Response::Error { code, message }) => + let client: &mut Client = self.0.as_mut().expect("future polled after completion"); + let mut responses = client.by_ref().collect::<Vec<_>>(); + + match Pin::new(&mut responses).poll(cx) { + Poll::Ready(response) => { + Poll::Ready(match response.iter().last() { + Some(Ok(Response::Ok { .. })) => + Ok(self.0.take().unwrap()), + Some(Ok(Response::Error { code, message })) => Err(Error::HandshakeFailed( format!("Error {}: {:?}", code, message)).into()), l @ Some(_) => @@ -225,25 +226,22 @@ impl Future for ConnectionFuture { None => // XXX does that happen? Err(Error::HandshakeFailed( "No data received from server".into()).into()), - } + }) }, - Async::NotReady => Ok(Async::NotReady), + Poll::Pending => Poll::Pending, } } } impl Stream for Client { - type Item = Response; - type Error = anyhow::Error; + type Item = Result<Response>; /// Attempt to pull out the next value of this stream, returning /// None if the stream is finished. /// /// Note: It _is_ safe to call this again after the stream /// finished, i.e. returned `Ready(None)`. - fn poll(&mut self) - -> std::result::Result<Async<Option<Self::Item>>, Self::Error> - { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // First, handle sending of the command. match self.w { WriteState::Ready(_) => @@ -252,12 +250,12 @@ impl Stream for Client { self.w = if let WriteState::Sending(mut f) = mem::replace(&mut self.w, WriteState::Transitioning) { - match f.poll() { - Ok(Async::Ready((sink, _))) => WriteState::Ready(sink), - Ok(Async::NotReady) => WriteState::Sending(f), - Err(e) => { + match f.as_mut().poll(cx) { + Poll::Ready(Ok(sink)) => WriteState::Ready(sink), + Poll::Pending => WriteState::Sending(f), + Poll::Ready(Err(e)) => { self.w = WriteState::Dead; - return Err(e); + return Poll::Ready(Some(Err(e))); }, } } else { @@ -272,7 +270,7 @@ impl Stream for Client { // Recheck if we are still sending the command. if let WriteState::Sending(_) = self.w { - return Ok(Async::NotReady); + return Poll::Pending; } // Check if the previous response was one of ok, error, or @@ -280,51 +278,55 @@ impl Stream for Client { if self.done { // If so, we signal end of stream here. self.done = false; - return Ok(Async::Ready(None)); + return Poll::Ready(None); } + // The compiler is not smart enough to figure out disjoint borrows + // through Pin via DerefMut (which wholly borrows `self`), so unwrap it + let Self { buffer, done, r, .. } = Pin::into_inner(self); + let mut reader = Pin::new(r); loop { // Try to yield a line from the buffer. For that, try to // find linebreaks. - if let Some(p) = self.buffer.iter().position(|&b| b == 0x0a) { - let line: Vec<u8> = self.buffer.drain(..p+1).collect(); + if let Some(p) = buffer.iter().position(|&b| b == 0x0a) { + let line: Vec<u8> = buffer.drain(..p+1).collect(); // xxx: rtrim linebreak even more? crlf maybe? let r = Response::parse(&line[..line.len()-1])?; // If this response is one of ok, error, or inquire, // we want to surrender control to the client next // time she asks for an item. - self.done = r.is_done(); - return Ok(Async::Ready(Some(r))); + *done = r.is_done(); + return Poll::Ready(Some(Ok(r))); } // No more linebreaks in the buffer. We need to get more. // First, grow the buffer. - let buffer_len = self.buffer.len(); - self.buffer.resize(buffer_len + MAX_LINE_LENGTH, 0); + let buffer_len = buffer.len(); + buffer.resize(buffer_len + MAX_LINE_LENGTH, 0); - match self.r.poll_read(&mut self.buffer[buffer_len..])? { - Async::Ready(n_read) if n_read == 0 => { + match reader.as_mut().poll_read(cx, &mut buffer[buffer_len..])? { + Poll::Ready(n_read) if n_read == 0 => { // EOF. - self.buffer.resize(buffer_len, 0); - if ! self.buffer.is_empty() { + buffer.resize(buffer_len, 0); + if ! buffer.is_empty() { // Incomplete server response. - return Err(Error::ConnectionClosed( - self.buffer.clone()).into()); + return Poll::Ready(Some(Err(Error::ConnectionClosed( + buffer.clone()).into()))); } // End of stream. - return Ok(Async::Ready(None)); + return Poll::Ready(None); }, - Async::Ready(n_read) => { - self.buffer.resize(buffer_len + n_read, 0); + Poll::Ready(n_read) => { + buffer.resize(buffer_len + n_read, 0); continue; }, - Async::NotReady => { - self.buffer.resize(buffer_len, 0); - return Ok(Async::NotReady); + Poll::Pending => { + buffer.resize(buffer_len, 0); + return Poll::Pending; }, } } diff --git a/ipc/src/gnupg.rs b/ipc/src/gnupg.rs index 3d581477..fc81a07b 100644 --- a/ipc/src/gnupg.rs +++ b/ipc/src/gnupg.rs @@ -8,7 +8,10 @@ use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; use std::process::Command; -use futures::{Async, Future, Stream}; +use futures::{Future, Stream}; + +use std::task::{Poll, self}; +use std::pin::Pin; use sequoia_openpgp as openpgp; use openpgp::types::HashAlgorithm; @@ -241,18 +244,15 @@ impl DerefMut for Agent { } impl Stream for Agent { - type Item = assuan::Response; - type Error = anyhow::Error; + type Item = Result<assuan::Response>; /// Attempt to pull out the next value of this stream, returning /// None if the stream is finished. /// /// Note: It _is_ safe to call this again after the stream /// finished, i.e. returned `Ready(None)`. - fn poll(&mut self) - -> std::result::Result<Async<Option<Self::Item>>, Self::Error> - { - self.c.poll() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { + Pin::new(&mut self.c).poll_next(cx) } } @@ -262,11 +262,9 @@ impl Agent { /// Note: This function does not try to start the server. If no /// server is running for the given context, this operation will /// fail. - pub fn connect<'c>(ctx: &'c Context) - -> impl Future<Item = Self, Error = anyhow::Error> + 'c - { - futures::lazy(move || ctx.socket("agent")) - .and_then(Self::connect_to) + pub async fn connect<'c>(ctx: &'c Context) -> Result<Self> { + let path = ctx.socket("agent")?; + Self::connect_to(path).await } /// Connects to the agent at the given path. @@ -274,36 +272,32 @@ impl Agent { /// Note: This function does not try to start the server. If no /// server is running for the given context, this operation will /// fail. - pub fn connect_to<P>(path: P) - -> impl Future<Item = Self, Error = anyhow::Error> + pub async fn connect_to<P>(path: P) -> Result<Self> where P: AsRef<Path> { - assuan::Client::connect(path) - .and_then(|c| Ok(Agent { c })) + Ok(Agent { c: assuan::Client::connect(path).await? }) } /// Creates a signature over the `digest` produced by `algo` using /// `key` with the secret bits managed by the agent. - pub fn sign<'a, R>(&'a mut self, + pub async fn sign<'a, R>(&'a mut self, |