summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Matuszewski <igor@sequoia-pgp.org>2020-10-19 17:59:19 +0200
committerIgor Matuszewski <igor@sequoia-pgp.org>2020-10-23 12:05:54 +0200
commitd30e05a43655a3884c6119e282e0ac58a9c723f7 (patch)
treeeeff82e5a4d40b933a9bd7849396812013eacf39
parenteb324f60bbd4184057797f72cc3db34e6160497d (diff)
ipc: Migrate to std::futures
-rw-r--r--Cargo.lock146
-rw-r--r--ipc/Cargo.toml11
-rw-r--r--ipc/examples/assuan-client.rs25
-rw-r--r--ipc/examples/gpg-agent-client.rs25
-rw-r--r--ipc/src/assuan/mod.rs126
-rw-r--r--ipc/src/gnupg.rs268
-rw-r--r--ipc/src/lib.rs114
-rw-r--r--ipc/tests/gpg-agent.rs20
8 files changed, 400 insertions, 335 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 41f9c0ff..b7272c19 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -292,17 +292,33 @@ version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b867c15d8ff93c4d81b69c89280840f877331ef2a1fccbaf947afecc68b51a9e"
dependencies = [
- "futures",
+ "futures 0.1.30",
]
[[package]]
+name = "capnp"
+version = "0.13.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "394f9ad87764d43d41c8d3ea270fd03def2f455011f3ada86c9f01d88592105d"
+
+[[package]]
name = "capnp-futures"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa07b8de7e06c61c287fb5a03a644e2439fec4fe17e206d4658ac09aeec4b161"
dependencies = [
- "capnp",
- "futures",
+ "capnp 0.10.3",
+ "futures 0.1.30",
+]
+
+[[package]]
+name = "capnp-futures"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e9f9ff1dae086de0d7ecbc147fee21aed8b3ad64468f0f991c98da06fb8c8459"
+dependencies = [
+ "capnp 0.13.5",
+ "futures 0.3.6",
]
[[package]]
@@ -311,10 +327,21 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "568eecd404ea80e98d506b922be2de5e1013ac8f9b170242a53068affc79ddc8"
dependencies = [
- "capnp",
- "capnp-futures",
+ "capnp 0.10.3",
+ "capnp-futures 0.10.1",
"capnpc",
- "futures",
+ "futures 0.1.30",
+]
+
+[[package]]
+name = "capnp-rpc"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "37998522d42bbe4a1d266f418b1a053b679a338e904e55afd5ff22333df0e09e"
+dependencies = [
+ "capnp 0.13.5",
+ "capnp-futures 0.13.1",
+ "futures 0.3.6",
]
[[package]]
@@ -323,7 +350,7 @@ version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2afedfc194b01c6804ad0a10c7139024b99ee3df6a39bb09bdf759067ababff"
dependencies = [
- "capnp",
+ "capnp 0.10.3",
]
[[package]]
@@ -823,12 +850,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c7e4c2612746b0df8fed4ce0c69156021b704c9aefa360311c04e6e9e002eed"
[[package]]
+name = "futures"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5d8e3078b7b2a8a671cb7a3d17b4760e4181ea243227776ba83fd043b4ca034e"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-channel"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a4d35f7401e948629c9c3d6638fb9bf94e0b2121e96c3b428cc4e631f3eb74"
dependencies = [
"futures-core",
+ "futures-sink",
]
[[package]]
@@ -838,6 +881,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d674eaa0056896d5ada519900dbf97ead2e46a7b6621e8160d79e2f2e1e2784b"
[[package]]
+name = "futures-executor"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc709ca1da6f66143b8c9bec8e6260181869893714e9b5a490b169b0414144ab"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5fc94b64bb39543b4e432f1790b6bf18e3ee3b74653c5449f63310e9a74b123c"
+
+[[package]]
name = "futures-macro"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -870,9 +930,13 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a894a0acddba51a2d49a6f4263b1e64b8c579ece8af50fa86503d52cd1eea34"
dependencies = [
+ "futures-channel",
"futures-core",
+ "futures-io",
"futures-macro",
+ "futures-sink",
"futures-task",
+ "memchr",
"pin-project",
"pin-utils",
"proc-macro-hack",
@@ -1492,20 +1556,17 @@ dependencies = [
[[package]]
name = "parity-tokio-ipc"
-version = "0.4.0"
+version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1e57fea504fea33f9fbb5f49f378359030e7e026a6ab849bb9e8f0787376f1bf"
+checksum = "c1d417ba1ab454723ff2271bf999fd700027dc48759a13d43e488cc8ca38b87f"
dependencies = [
- "bytes 0.4.12",
- "futures",
+ "futures 0.3.6",
"libc",
"log",
"mio-named-pipes",
"miow 0.3.5",
"rand",
- "tokio 0.1.22",
- "tokio-named-pipes",
- "tokio-uds",
+ "tokio 0.2.22",
"winapi 0.3.9",
]
@@ -1973,11 +2034,11 @@ version = "0.20.0"
dependencies = [
"anyhow",
"buffered-reader",
- "capnp-rpc",
+ "capnp-rpc 0.13.1",
"clap",
"ctor",
"fs2",
- "futures",
+ "futures 0.3.6",
"lalrpop",
"lalrpop-util",
"lazy_static",
@@ -1991,9 +2052,8 @@ dependencies = [
"socket2",
"tempfile",
"thiserror",
- "tokio 0.1.22",
- "tokio-core",
- "tokio-io",
+ "tokio 0.2.22",
+ "tokio-util",
"winapi 0.3.9",
]
@@ -2113,10 +2173,10 @@ name = "sequoia-store"
version = "0.20.0"
dependencies = [
"anyhow",
- "capnp",
- "capnp-rpc",
+ "capnp 0.10.3",
+ "capnp-rpc 0.10.0",
"capnpc",
- "futures",
+ "futures 0.1.30",
"rand",
"rusqlite",
"sequoia-core",
@@ -2416,7 +2476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6"
dependencies = [
"bytes 0.4.12",
- "futures",
+ "futures 0.1.30",
"mio",
"num_cpus",
"tokio-codec",
@@ -2464,7 +2524,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b"
dependencies = [
"bytes 0.4.12",
- "futures",
+ "futures 0.1.30",
"tokio-io",
]
@@ -2475,7 +2535,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71"
dependencies = [
"bytes 0.4.12",
- "futures",
+ "futures 0.1.30",
"iovec",
"log",
"mio",
@@ -2493,7 +2553,7 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1de0e32a83f131e002238d7ccde18211c0a5397f60cbfffcb112868c2e0e20e"
dependencies = [
- "futures",
+ "futures 0.1.30",
"tokio-executor",
]
@@ -2504,7 +2564,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671"
dependencies = [
"crossbeam-utils",
- "futures",
+ "futures 0.1.30",
]
[[package]]
@@ -2513,7 +2573,7 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "297a1206e0ca6302a0eed35b700d292b275256f596e2f3fea7729d5e629b6ff4"
dependencies = [
- "futures",
+ "futures 0.1.30",
"tokio-io",
"tokio-threadpool",
]
@@ -2525,7 +2585,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674"
dependencies = [
"bytes 0.4.12",
- "futures",
+ "futures 0.1.30",
"log",
]
@@ -2541,26 +2601,13 @@ dependencies = [
]
[[package]]
-name = "tokio-named-pipes"
-version = "0.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d282d483052288b2308ba5ee795f5673b159c9bdf63c385a05609da782a5eae"
-dependencies = [
- "bytes 0.4.12",
- "futures",
- "mio",
- "mio-named-pipes",
- "tokio 0.1.22",
-]
-
-[[package]]
name = "tokio-reactor"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351"
dependencies = [
"crossbeam-utils",
- "futures",
+ "futures 0.1.30",
"lazy_static",
"log",
"mio",
@@ -2579,7 +2626,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edfe50152bc8164fcc456dab7891fa9bf8beaf01c5ee7e1dd43a397c3cf87dee"
dependencies = [
"fnv",
- "futures",
+ "futures 0.1.30",
]
[[package]]
@@ -2589,7 +2636,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98df18ed66e3b72e742f185882a9e201892407957e45fbff8da17ae7a7c51f72"
dependencies = [
"bytes 0.4.12",
- "futures",
+ "futures 0.1.30",
"iovec",
"mio",
"tokio-io",
@@ -2605,7 +2652,7 @@ dependencies = [
"crossbeam-deque",
"crossbeam-queue",
"crossbeam-utils",
- "futures",
+ "futures 0.1.30",
"lazy_static",
"log",
"num_cpus",
@@ -2620,7 +2667,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296"
dependencies = [
"crossbeam-utils",
- "futures",
+ "futures 0.1.30",
"slab",
"tokio-executor",
]
@@ -2642,7 +2689,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82"
dependencies = [
"bytes 0.4.12",
- "futures",
+ "futures 0.1.30",
"log",
"mio",
"tokio-codec",
@@ -2657,7 +2704,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab57a4ac4111c8c9dbcf70779f6fc8bc35ae4b2454809febac840ad19bd7e4e0"
dependencies = [
"bytes 0.4.12",
- "futures",
+ "futures 0.1.30",
"iovec",
"libc",
"log",
@@ -2676,6 +2723,7 @@ checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499"
dependencies = [
"bytes 0.5.6",
"futures-core",
+ "futures-io",
"futures-sink",
"log",
"pin-project-lite",
diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml
index 49b2d63e..dc680257 100644
--- a/ipc/Cargo.toml
+++ b/ipc/Cargo.toml
@@ -25,9 +25,9 @@ sequoia-core = { path = "../core", version = "0.20" }
anyhow = "1"
buffered-reader = { path = "../buffered-reader", version = "0.20", default-features = false }
-capnp-rpc = "0.10"
+capnp-rpc = "0.13"
fs2 = "0.4.2"
-futures = "0.1"
+futures = "0.3"
lalrpop-util = "0.19"
lazy_static = "1.3"
libc = "0.2.33"
@@ -35,10 +35,9 @@ memsec = { version = "0.6", default-features = false }
rand = { version = "0.7", default-features = false }
tempfile = "3.0"
thiserror = "1"
-tokio = "0.1"
-tokio-core = "0.1"
-tokio-io = "0.1.4"
-parity-tokio-ipc = "0.4"
+tokio = { version = "0.2", features = ["rt-core", "rt-util", "tcp"] }
+tokio-util = { version = "0.3", features = ["compat"] }
+parity-tokio-ipc = "0.7"
socket2 = "0.3.11"
[target.'cfg(windows)'.dependencies]
diff --git a/ipc/examples/assuan-client.rs b/ipc/examples/assuan-client.rs
index 6d790363..16c6ed41 100644
--- a/ipc/examples/assuan-client.rs
+++ b/ipc/examples/assuan-client.rs
@@ -1,6 +1,4 @@
-use futures;
-use futures::future::Future;
-use futures::stream::Stream;
+use futures::StreamExt;
use clap;
use sequoia_ipc as ipc;
use crate::ipc::assuan::Client;
@@ -18,14 +16,15 @@ fn main() {
.help("Commands to send to the server"))
.get_matches();
- let mut c = Client::connect(matches.value_of("server").unwrap())
- .wait().unwrap();
- for command in matches.values_of("commands").unwrap() {
- eprintln!("> {}", command);
- c.send(command).unwrap();
- c.by_ref().for_each(|response| {
- eprintln!("< {:?}", response);
- Ok(())
- }).wait().unwrap();
- }
+ let mut rt = tokio::runtime::Runtime::new().unwrap();
+ rt.block_on(async {
+ let mut c = Client::connect(matches.value_of("server").unwrap()).await.unwrap();
+ for command in matches.values_of("commands").unwrap() {
+ eprintln!("> {}", command);
+ c.send(command).unwrap();
+ while let Some(response) = c.next().await {
+ eprintln!("< {:?}", response);
+ }
+ }
+ });
}
diff --git a/ipc/examples/gpg-agent-client.rs b/ipc/examples/gpg-agent-client.rs
index 61a29e84..64b60234 100644
--- a/ipc/examples/gpg-agent-client.rs
+++ b/ipc/examples/gpg-agent-client.rs
@@ -1,8 +1,6 @@
/// Connects to and sends commands to gpg-agent.
-use futures;
-use futures::future::Future;
-use futures::stream::Stream;
+use futures::StreamExt;
use clap;
use sequoia_ipc as ipc;
use crate::ipc::gnupg::{Context, Agent};
@@ -25,14 +23,17 @@ fn main() {
} else {
Context::new().unwrap()
};
- let mut agent = Agent::connect(&ctx).wait().unwrap();
- for command in matches.values_of("commands").unwrap() {
- eprintln!("> {}", command);
- agent.send(command).unwrap();
- agent.by_ref().for_each(|response| {
- eprintln!("< {:?}", response);
- Ok(())
- }).wait().unwrap();
- }
+ let mut rt = tokio::runtime::Runtime::new().unwrap();
+ rt.block_on(async {
+ let mut agent = Agent::connect(&ctx).await.unwrap();
+
+ for command in matches.values_of("commands").unwrap() {
+ eprintln!("> {}", command);
+ agent.send(command).unwrap();
+ while let Some(response) = agent.next().await {
+ eprintln!("< {:?}", response);
+ }
+ }
+ });
}
diff --git a/ipc/src/assuan/mod.rs b/ipc/src/assuan/mod.rs
index 73fb1734..510893ff 100644
--- a/ipc/src/assuan/mod.rs
+++ b/ipc/src/assuan/mod.rs
@@ -3,16 +3,18 @@
#![warn(missing_docs)]
use std::cmp;
-use std::io::{Write, BufReader};
+use std::io::Write;
use std::mem;
use std::path::Path;
+use std::pin::Pin;
+use std::task::{Poll, Context};
use lalrpop_util::ParseError;
-use futures::{future, Async, Future, Stream};
-use parity_tokio_ipc::IpcConnection;
-use tokio_io::io;
-use tokio_io::AsyncRead;
+use futures::{Future, Stream, StreamExt};
+use parity_tokio_ipc::Connection;
+use tokio::io::{BufReader, ReadHalf, WriteHalf};
+use tokio::io::{AsyncRead, AsyncWriteExt};
use crate::openpgp;
@@ -58,30 +60,26 @@ lalrpop_util::lalrpop_mod!(
/// [`Connection::data()`]: #method.data
/// [`Connection::cancel()`]: #method.cancel
pub struct Client {
- r: BufReader<io::ReadHalf<IpcConnection>>, // xxx: abstract over
+ r: BufReader<ReadHalf<Connection>>, // xxx: abstract over
buffer: Vec<u8>,
done: bool,
w: WriteState,
}
enum WriteState {
- Ready(io::WriteHalf<IpcConnection>),
- Sending(future::FromErr<io::WriteAll<io::WriteHalf<IpcConnection>, Vec<u8>>, anyhow::Error>),
+ Ready(WriteHalf<Connection>),
+ Sending(Pin<Box<dyn Future<Output = Result<WriteHalf<Connection>, anyhow::Error>>>>),
Transitioning,
Dead,
}
impl Client {
/// Connects to the server.
- pub fn connect<P>(path: P)
- -> impl Future<Item = Client, Error = anyhow::Error>
- where P: AsRef<Path>
- {
+ pub async fn connect<P>(path: P) -> Result<Client> where P: AsRef<Path> {
// XXX: Implement Windows support using TCP + nonce approach used upstream
// https://gnupg.org/documentation/manuals/assuan.pdf#Socket%20wrappers
- future::result(IpcConnection::connect(path, &Default::default()))
- .map_err(Into::into)
- .and_then(ConnectionFuture::new)
+ let connection = parity_tokio_ipc::Endpoint::connect(path).await?;
+ Ok(ConnectionFuture::new(connection).await?)
}
/// Lazily sends a command to the server.
@@ -118,13 +116,16 @@ impl Client {
self.w =
match mem::replace(&mut self.w, WriteState::Transitioning)
{
- WriteState::Ready(sink) => {
+ WriteState::Ready(mut sink) => {
let command = command.as_ref();
let mut c = command.to_vec();
if ! c.ends_with(b"\n") {
c.push(0x0a);
}
- WriteState::Sending(io::write_all(sink, c).from_err())
+ WriteState::Sending(Box::pin(async move {
+ sink.write_all(&c).await?;
+ Ok(sink)
+ }))
},
_ => unreachable!(),
};
@@ -192,8 +193,8 @@ impl Client {
struct ConnectionFuture(Option<Client>);
impl ConnectionFuture {
- fn new(c: IpcConnection) -> Self {
- let (r, w) = c.split();
+ fn new(c: Connection) -> Self {
+ let (r, w) = tokio::io::split(c);
let buffer = Vec::with_capacity(MAX_LINE_LENGTH);
Self(Some(Client {
r: BufReader::new(r), buffer, done: false,
@@ -203,19 +204,19 @@ impl ConnectionFuture {
}
impl Future for ConnectionFuture {
- type Item = Client;
- type Error = anyhow::Error;
+ type Output = Result<Client>;
- fn poll(&mut self) -> std::result::Result<Async<Self::Item>, Self::Error> {
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Consume the initial message from the server.
- match self.0.as_mut().expect("future polled after completion")
- .by_ref().collect().poll()?
- {
- Async::Ready(response) => {
- match response.iter().last() {
- Some(Response::Ok { .. }) =>
- Ok(Async::Ready(self.0.take().unwrap())),
- Some(Response::Error { code, message }) =>
+ let client: &mut Client = self.0.as_mut().expect("future polled after completion");
+ let mut responses = client.by_ref().collect::<Vec<_>>();
+
+ match Pin::new(&mut responses).poll(cx) {
+ Poll::Ready(response) => {
+ Poll::Ready(match response.iter().last() {
+ Some(Ok(Response::Ok { .. })) =>
+ Ok(self.0.take().unwrap()),
+ Some(Ok(Response::Error { code, message })) =>
Err(Error::HandshakeFailed(
format!("Error {}: {:?}", code, message)).into()),
l @ Some(_) =>
@@ -225,25 +226,22 @@ impl Future for ConnectionFuture {
None => // XXX does that happen?
Err(Error::HandshakeFailed(
"No data received from server".into()).into()),
- }
+ })
},
- Async::NotReady => Ok(Async::NotReady),
+ Poll::Pending => Poll::Pending,
}
}
}
impl Stream for Client {
- type Item = Response;
- type Error = anyhow::Error;
+ type Item = Result<Response>;
/// Attempt to pull out the next value of this stream, returning
/// None if the stream is finished.
///
/// Note: It _is_ safe to call this again after the stream
/// finished, i.e. returned `Ready(None)`.
- fn poll(&mut self)
- -> std::result::Result<Async<Option<Self::Item>>, Self::Error>
- {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// First, handle sending of the command.
match self.w {
WriteState::Ready(_) =>
@@ -252,12 +250,12 @@ impl Stream for Client {
self.w = if let WriteState::Sending(mut f) =
mem::replace(&mut self.w, WriteState::Transitioning)
{
- match f.poll() {
- Ok(Async::Ready((sink, _))) => WriteState::Ready(sink),
- Ok(Async::NotReady) => WriteState::Sending(f),
- Err(e) => {
+ match f.as_mut().poll(cx) {
+ Poll::Ready(Ok(sink)) => WriteState::Ready(sink),
+ Poll::Pending => WriteState::Sending(f),
+ Poll::Ready(Err(e)) => {
self.w = WriteState::Dead;
- return Err(e);
+ return Poll::Ready(Some(Err(e)));
},
}
} else {
@@ -272,7 +270,7 @@ impl Stream for Client {
// Recheck if we are still sending the command.
if let WriteState::Sending(_) = self.w {
- return Ok(Async::NotReady);
+ return Poll::Pending;
}
// Check if the previous response was one of ok, error, or
@@ -280,51 +278,55 @@ impl Stream for Client {
if self.done {
// If so, we signal end of stream here.
self.done = false;
- return Ok(Async::Ready(None));
+ return Poll::Ready(None);
}
+ // The compiler is not smart enough to figure out disjoint borrows
+ // through Pin via DerefMut (which wholly borrows `self`), so unwrap it
+ let Self { buffer, done, r, .. } = Pin::into_inner(self);
+ let mut reader = Pin::new(r);
loop {
// Try to yield a line from the buffer. For that, try to
// find linebreaks.
- if let Some(p) = self.buffer.iter().position(|&b| b == 0x0a) {
- let line: Vec<u8> = self.buffer.drain(..p+1).collect();
+ if let Some(p) = buffer.iter().position(|&b| b == 0x0a) {
+ let line: Vec<u8> = buffer.drain(..p+1).collect();
// xxx: rtrim linebreak even more? crlf maybe?
let r = Response::parse(&line[..line.len()-1])?;
// If this response is one of ok, error, or inquire,
// we want to surrender control to the client next
// time she asks for an item.
- self.done = r.is_done();
- return Ok(Async::Ready(Some(r)));
+ *done = r.is_done();
+ return Poll::Ready(Some(Ok(r)));
}
// No more linebreaks in the buffer. We need to get more.
// First, grow the buffer.
- let buffer_len = self.buffer.len();
- self.buffer.resize(buffer_len + MAX_LINE_LENGTH, 0);
+ let buffer_len = buffer.len();
+ buffer.resize(buffer_len + MAX_LINE_LENGTH, 0);
- match self.r.poll_read(&mut self.buffer[buffer_len..])? {
- Async::Ready(n_read) if n_read == 0 => {
+ match reader.as_mut().poll_read(cx, &mut buffer[buffer_len..])? {
+ Poll::Ready(n_read) if n_read == 0 => {
// EOF.
- self.buffer.resize(buffer_len, 0);
- if ! self.buffer.is_empty() {
+ buffer.resize(buffer_len, 0);
+ if ! buffer.is_empty() {
// Incomplete server response.
- return Err(Error::ConnectionClosed(
- self.buffer.clone()).into());
+ return Poll::Ready(Some(Err(Error::ConnectionClosed(
+ buffer.clone()).into())));
}
// End of stream.
- return Ok(Async::Ready(None));
+ return Poll::Ready(None);
},
- Async::Ready(n_read) => {
- self.buffer.resize(buffer_len + n_read, 0);
+ Poll::Ready(n_read) => {
+ buffer.resize(buffer_len + n_read, 0);
continue;
},
- Async::NotReady => {
- self.buffer.resize(buffer_len, 0);
- return Ok(Async::NotReady);
+ Poll::Pending => {
+ buffer.resize(buffer_len, 0);
+ return Poll::Pending;
},
}
}
diff --git a/ipc/src/gnupg.rs b/ipc/src/gnupg.rs
index 3d581477..fc81a07b 100644
--- a/ipc/src/gnupg.rs
+++ b/ipc/src/gnupg.rs
@@ -8,7 +8,10 @@ use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::process::Command;
-use futures::{Async, Future, Stream};
+use futures::{Future, Stream};
+
+use std::task::{Poll, self};
+use std::pin::Pin;
use sequoia_openpgp as openpgp;
use openpgp::types::HashAlgorithm;
@@ -241,18 +244,15 @@ impl DerefMut for Agent {
}
impl Stream for Agent {
- type Item = assuan::Response;
- type Error = anyhow::Error;
+ type Item = Result<assuan::Response>;
/// Attempt to pull out the next value of this stream, returning
/// None if the stream is finished.
///
/// Note: It _is_ safe to call this again after the stream
/// finished, i.e. returned `Ready(None)`.
- fn poll(&mut self)
- -> std::result::Result<Async<Option<Self::Item>>, Self::Error>
- {
- self.c.poll()
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
+ Pin::new(&mut self.c).poll_next(cx)
}
}
@@ -262,11 +262,9 @@ impl Agent {
/// Note: This function does not try to start the server. If no
/// server is running for the given context, this operation will
/// fail.
- pub fn connect<'c>(ctx: &'c Context)
- -> impl Future<Item = Self, Error = anyhow::Error> + 'c
- {
- futures::lazy(move || ctx.socket("agent"))
- .and_then(Self::connect_to)
+ pub async fn connect<'c>(ctx: &'c Context) -> Result<Self> {
+ let path = ctx.socket("agent")?;
+ Self::connect_to(path).await
}
/// Connects to the agent at the given path.
@@ -274,36 +272,32 @@ impl Agent {
/// Note: This function does not try to start the server. If no
/// server is running for the given context, this operation will
/// fail.
- pub fn connect_to<P>(path: P)
- -> impl Future<Item = Self, Error = anyhow::Error>
+ pub async fn connect_to<P>(path: P) -> Result<Self>
where P: AsRef<Path>
{
- assuan::Client::connect(path)
- .and_then(|c| Ok(Agent { c }))
+ Ok(Agent { c: assuan::Client::connect(path).await? })
}
/// Creates a signature over the `digest` produced by `algo` using
/// `key` with the secret bits managed by the agent.
- pub fn sign<'a, R>(&'a mut self,
+ pub async fn sign<'a, R>(&'a mut self,