diff options
author | Igor Matuszewski <igor@sequoia-pgp.org> | 2020-10-19 17:59:19 +0200 |
---|---|---|
committer | Igor Matuszewski <igor@sequoia-pgp.org> | 2020-10-23 12:05:54 +0200 |
commit | d30e05a43655a3884c6119e282e0ac58a9c723f7 (patch) | |
tree | eeff82e5a4d40b933a9bd7849396812013eacf39 /ipc | |
parent | eb324f60bbd4184057797f72cc3db34e6160497d (diff) |
ipc: Migrate to std::futures
Diffstat (limited to 'ipc')
-rw-r--r-- | ipc/Cargo.toml | 11 | ||||
-rw-r--r-- | ipc/examples/assuan-client.rs | 25 | ||||
-rw-r--r-- | ipc/examples/gpg-agent-client.rs | 25 | ||||
-rw-r--r-- | ipc/src/assuan/mod.rs | 126 | ||||
-rw-r--r-- | ipc/src/gnupg.rs | 268 | ||||
-rw-r--r-- | ipc/src/lib.rs | 114 | ||||
-rw-r--r-- | ipc/tests/gpg-agent.rs | 20 |
7 files changed, 303 insertions, 286 deletions
diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index 49b2d63e..dc680257 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -25,9 +25,9 @@ sequoia-core = { path = "../core", version = "0.20" } anyhow = "1" buffered-reader = { path = "../buffered-reader", version = "0.20", default-features = false } -capnp-rpc = "0.10" +capnp-rpc = "0.13" fs2 = "0.4.2" -futures = "0.1" +futures = "0.3" lalrpop-util = "0.19" lazy_static = "1.3" libc = "0.2.33" @@ -35,10 +35,9 @@ memsec = { version = "0.6", default-features = false } rand = { version = "0.7", default-features = false } tempfile = "3.0" thiserror = "1" -tokio = "0.1" -tokio-core = "0.1" -tokio-io = "0.1.4" -parity-tokio-ipc = "0.4" +tokio = { version = "0.2", features = ["rt-core", "rt-util", "tcp"] } +tokio-util = { version = "0.3", features = ["compat"] } +parity-tokio-ipc = "0.7" socket2 = "0.3.11" [target.'cfg(windows)'.dependencies] diff --git a/ipc/examples/assuan-client.rs b/ipc/examples/assuan-client.rs index 6d790363..16c6ed41 100644 --- a/ipc/examples/assuan-client.rs +++ b/ipc/examples/assuan-client.rs @@ -1,6 +1,4 @@ -use futures; -use futures::future::Future; -use futures::stream::Stream; +use futures::StreamExt; use clap; use sequoia_ipc as ipc; use crate::ipc::assuan::Client; @@ -18,14 +16,15 @@ fn main() { .help("Commands to send to the server")) .get_matches(); - let mut c = Client::connect(matches.value_of("server").unwrap()) - .wait().unwrap(); - for command in matches.values_of("commands").unwrap() { - eprintln!("> {}", command); - c.send(command).unwrap(); - c.by_ref().for_each(|response| { - eprintln!("< {:?}", response); - Ok(()) - }).wait().unwrap(); - } + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let mut c = Client::connect(matches.value_of("server").unwrap()).await.unwrap(); + for command in matches.values_of("commands").unwrap() { + eprintln!("> {}", command); + c.send(command).unwrap(); + while let Some(response) = c.next().await { + eprintln!("< {:?}", response); + } + } + }); } diff --git a/ipc/examples/gpg-agent-client.rs b/ipc/examples/gpg-agent-client.rs index 61a29e84..64b60234 100644 --- a/ipc/examples/gpg-agent-client.rs +++ b/ipc/examples/gpg-agent-client.rs @@ -1,8 +1,6 @@ /// Connects to and sends commands to gpg-agent. -use futures; -use futures::future::Future; -use futures::stream::Stream; +use futures::StreamExt; use clap; use sequoia_ipc as ipc; use crate::ipc::gnupg::{Context, Agent}; @@ -25,14 +23,17 @@ fn main() { } else { Context::new().unwrap() }; - let mut agent = Agent::connect(&ctx).wait().unwrap(); - for command in matches.values_of("commands").unwrap() { - eprintln!("> {}", command); - agent.send(command).unwrap(); - agent.by_ref().for_each(|response| { - eprintln!("< {:?}", response); - Ok(()) - }).wait().unwrap(); - } + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let mut agent = Agent::connect(&ctx).await.unwrap(); + + for command in matches.values_of("commands").unwrap() { + eprintln!("> {}", command); + agent.send(command).unwrap(); + while let Some(response) = agent.next().await { + eprintln!("< {:?}", response); + } + } + }); } diff --git a/ipc/src/assuan/mod.rs b/ipc/src/assuan/mod.rs index 73fb1734..510893ff 100644 --- a/ipc/src/assuan/mod.rs +++ b/ipc/src/assuan/mod.rs @@ -3,16 +3,18 @@ #![warn(missing_docs)] use std::cmp; -use std::io::{Write, BufReader}; +use std::io::Write; use std::mem; use std::path::Path; +use std::pin::Pin; +use std::task::{Poll, Context}; use lalrpop_util::ParseError; -use futures::{future, Async, Future, Stream}; -use parity_tokio_ipc::IpcConnection; -use tokio_io::io; -use tokio_io::AsyncRead; +use futures::{Future, Stream, StreamExt}; +use parity_tokio_ipc::Connection; +use tokio::io::{BufReader, ReadHalf, WriteHalf}; +use tokio::io::{AsyncRead, AsyncWriteExt}; use crate::openpgp; @@ -58,30 +60,26 @@ lalrpop_util::lalrpop_mod!( /// [`Connection::data()`]: #method.data /// [`Connection::cancel()`]: #method.cancel pub struct Client { - r: BufReader<io::ReadHalf<IpcConnection>>, // xxx: abstract over + r: BufReader<ReadHalf<Connection>>, // xxx: abstract over buffer: Vec<u8>, done: bool, w: WriteState, } enum WriteState { - Ready(io::WriteHalf<IpcConnection>), - Sending(future::FromErr<io::WriteAll<io::WriteHalf<IpcConnection>, Vec<u8>>, anyhow::Error>), + Ready(WriteHalf<Connection>), + Sending(Pin<Box<dyn Future<Output = Result<WriteHalf<Connection>, anyhow::Error>>>>), Transitioning, Dead, } impl Client { /// Connects to the server. - pub fn connect<P>(path: P) - -> impl Future<Item = Client, Error = anyhow::Error> - where P: AsRef<Path> - { + pub async fn connect<P>(path: P) -> Result<Client> where P: AsRef<Path> { // XXX: Implement Windows support using TCP + nonce approach used upstream // https://gnupg.org/documentation/manuals/assuan.pdf#Socket%20wrappers - future::result(IpcConnection::connect(path, &Default::default())) - .map_err(Into::into) - .and_then(ConnectionFuture::new) + let connection = parity_tokio_ipc::Endpoint::connect(path).await?; + Ok(ConnectionFuture::new(connection).await?) } /// Lazily sends a command to the server. @@ -118,13 +116,16 @@ impl Client { self.w = match mem::replace(&mut self.w, WriteState::Transitioning) { - WriteState::Ready(sink) => { + WriteState::Ready(mut sink) => { let command = command.as_ref(); let mut c = command.to_vec(); if ! c.ends_with(b"\n") { c.push(0x0a); } - WriteState::Sending(io::write_all(sink, c).from_err()) + WriteState::Sending(Box::pin(async move { + sink.write_all(&c).await?; + Ok(sink) + })) }, _ => unreachable!(), }; @@ -192,8 +193,8 @@ impl Client { struct ConnectionFuture(Option<Client>); impl ConnectionFuture { - fn new(c: IpcConnection) -> Self { - let (r, w) = c.split(); + fn new(c: Connection) -> Self { + let (r, w) = tokio::io::split(c); let buffer = Vec::with_capacity(MAX_LINE_LENGTH); Self(Some(Client { r: BufReader::new(r), buffer, done: false, @@ -203,19 +204,19 @@ impl ConnectionFuture { } impl Future for ConnectionFuture { - type Item = Client; - type Error = anyhow::Error; + type Output = Result<Client>; - fn poll(&mut self) -> std::result::Result<Async<Self::Item>, Self::Error> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Consume the initial message from the server. - match self.0.as_mut().expect("future polled after completion") - .by_ref().collect().poll()? - { - Async::Ready(response) => { - match response.iter().last() { - Some(Response::Ok { .. }) => - Ok(Async::Ready(self.0.take().unwrap())), - Some(Response::Error { code, message }) => + let client: &mut Client = self.0.as_mut().expect("future polled after completion"); + let mut responses = client.by_ref().collect::<Vec<_>>(); + + match Pin::new(&mut responses).poll(cx) { + Poll::Ready(response) => { + Poll::Ready(match response.iter().last() { + Some(Ok(Response::Ok { .. })) => + Ok(self.0.take().unwrap()), + Some(Ok(Response::Error { code, message })) => Err(Error::HandshakeFailed( format!("Error {}: {:?}", code, message)).into()), l @ Some(_) => @@ -225,25 +226,22 @@ impl Future for ConnectionFuture { None => // XXX does that happen? Err(Error::HandshakeFailed( "No data received from server".into()).into()), - } + }) }, - Async::NotReady => Ok(Async::NotReady), + Poll::Pending => Poll::Pending, } } } impl Stream for Client { - type Item = Response; - type Error = anyhow::Error; + type Item = Result<Response>; /// Attempt to pull out the next value of this stream, returning /// None if the stream is finished. /// /// Note: It _is_ safe to call this again after the stream /// finished, i.e. returned `Ready(None)`. - fn poll(&mut self) - -> std::result::Result<Async<Option<Self::Item>>, Self::Error> - { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // First, handle sending of the command. match self.w { WriteState::Ready(_) => @@ -252,12 +250,12 @@ impl Stream for Client { self.w = if let WriteState::Sending(mut f) = mem::replace(&mut self.w, WriteState::Transitioning) { - match f.poll() { - Ok(Async::Ready((sink, _))) => WriteState::Ready(sink), - Ok(Async::NotReady) => WriteState::Sending(f), - Err(e) => { + match f.as_mut().poll(cx) { + Poll::Ready(Ok(sink)) => WriteState::Ready(sink), + Poll::Pending => WriteState::Sending(f), + Poll::Ready(Err(e)) => { self.w = WriteState::Dead; - return Err(e); + return Poll::Ready(Some(Err(e))); }, } } else { @@ -272,7 +270,7 @@ impl Stream for Client { // Recheck if we are still sending the command. if let WriteState::Sending(_) = self.w { - return Ok(Async::NotReady); + return Poll::Pending; } // Check if the previous response was one of ok, error, or @@ -280,51 +278,55 @@ impl Stream for Client { if self.done { // If so, we signal end of stream here. self.done = false; - return Ok(Async::Ready(None)); + return Poll::Ready(None); } + // The compiler is not smart enough to figure out disjoint borrows + // through Pin via DerefMut (which wholly borrows `self`), so unwrap it + let Self { buffer, done, r, .. } = Pin::into_inner(self); + let mut reader = Pin::new(r); loop { // Try to yield a line from the buffer. For that, try to // find linebreaks. - if let Some(p) = self.buffer.iter().position(|&b| b == 0x0a) { - let line: Vec<u8> = self.buffer.drain(..p+1).collect(); + if let Some(p) = buffer.iter().position(|&b| b == 0x0a) { + let line: Vec<u8> = buffer.drain(..p+1).collect(); // xxx: rtrim linebreak even more? crlf maybe? let r = Response::parse(&line[..line.len()-1])?; // If this response is one of ok, error, or inquire, // we want to surrender control to the client next // time she asks for an item. - self.done = r.is_done(); - return Ok(Async::Ready(Some(r))); + *done = r.is_done(); + return Poll::Ready(Some(Ok(r))); } // No more linebreaks in the buffer. We need to get more. // First, grow the buffer. - let buffer_len = self.buffer.len(); - self.buffer.resize(buffer_len + MAX_LINE_LENGTH, 0); + let buffer_len = buffer.len(); + buffer.resize(buffer_len + MAX_LINE_LENGTH, 0); - match self.r.poll_read(&mut self.buffer[buffer_len..])? { - Async::Ready(n_read) if n_read == 0 => { + match reader.as_mut().poll_read(cx, &mut buffer[buffer_len..])? { + Poll::Ready(n_read) if n_read == 0 => { // EOF. - self.buffer.resize(buffer_len, 0); - if ! self.buffer.is_empty() { + buffer.resize(buffer_len, 0); + if ! buffer.is_empty() { // Incomplete server response. - return Err(Error::ConnectionClosed( - self.buffer.clone()).into()); + return Poll::Ready(Some(Err(Error::ConnectionClosed( + buffer.clone()).into()))); } // End of stream. - return Ok(Async::Ready(None)); + return Poll::Ready(None); }, - Async::Ready(n_read) => { - self.buffer.resize(buffer_len + n_read, 0); + Poll::Ready(n_read) => { + buffer.resize(buffer_len + n_read, 0); continue; }, - Async::NotReady => { - self.buffer.resize(buffer_len, 0); - return Ok(Async::NotReady); + Poll::Pending => { + buffer.resize(buffer_len, 0); + return Poll::Pending; }, } } diff --git a/ipc/src/gnupg.rs b/ipc/src/gnupg.rs index 3d581477..fc81a07b 100644 --- a/ipc/src/gnupg.rs +++ b/ipc/src/gnupg.rs @@ -8,7 +8,10 @@ use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; use std::process::Command; -use futures::{Async, Future, Stream}; +use futures::{Future, Stream}; + +use std::task::{Poll, self}; +use std::pin::Pin; use sequoia_openpgp as openpgp; use openpgp::types::HashAlgorithm; @@ -241,18 +244,15 @@ impl DerefMut for Agent { } impl Stream for Agent { - type Item = assuan::Response; - type Error = anyhow::Error; + type Item = Result<assuan::Response>; /// Attempt to pull out the next value of this stream, returning /// None if the stream is finished. /// /// Note: It _is_ safe to call this again after the stream /// finished, i.e. returned `Ready(None)`. - fn poll(&mut self) - -> std::result::Result<Async<Option<Self::Item>>, Self::Error> - { - self.c.poll() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { + Pin::new(&mut self.c).poll_next(cx) } } @@ -262,11 +262,9 @@ impl Agent { /// Note: This function does not try to start the server. If no /// server is running for the given context, this operation will /// fail. - pub fn connect<'c>(ctx: &'c Context) - -> impl Future<Item = Self, Error = anyhow::Error> + 'c - { - futures::lazy(move || ctx.socket("agent")) - .and_then(Self::connect_to) + pub async fn connect<'c>(ctx: &'c Context) -> Result<Self> { + let path = ctx.socket("agent")?; + Self::connect_to(path).await } /// Connects to the agent at the given path. @@ -274,36 +272,32 @@ impl Agent { /// Note: This function does not try to start the server. If no /// server is running for the given context, this operation will /// fail. - pub fn connect_to<P>(path: P) - -> impl Future<Item = Self, Error = anyhow::Error> + pub async fn connect_to<P>(path: P) -> Result<Self> where P: AsRef<Path> { - assuan::Client::connect(path) - .and_then(|c| Ok(Agent { c })) + Ok(Agent { c: assuan::Client::connect(path).await? }) } /// Creates a signature over the `digest` produced by `algo` using /// `key` with the secret bits managed by the agent. - pub fn sign<'a, R>(&'a mut self, + pub async fn sign<'a, R>(&'a mut self, key: &'a Key<key::PublicParts, R>, algo: HashAlgorithm, digest: &'a [u8]) - -> impl Future<Item = crypto::mpi::Signature, - Error = anyhow::Error> + 'a + -> Result<crypto::mpi::Signature> where R: key::KeyRole { - SigningRequest::new(&mut self.c, key, algo, digest) + SigningRequest::new(&mut self.c, key, algo, digest).await } /// Decrypts `ciphertext` using `key` with the secret bits managed /// by the agent. - pub fn decrypt<'a, R>(&'a mut self, + pub async fn decrypt<'a, R>(&'a mut self, key: &'a Key<key::PublicParts, R>, ciphertext: &'a crypto::mpi::Ciphertext) - -> impl Future<Item = crypto::SessionKey, - Error = anyhow::Error> + 'a + -> Result<crypto::SessionKey> where R: key::KeyRole { - DecryptionRequest::new(&mut self.c, key, ciphertext) + DecryptionRequest::new(&mut self.c, key, ciphertext).await } /// Computes options that we want to communicate. @@ -407,109 +401,110 @@ fn protocol_error<T>(response: &assuan::Response) -> Result<T> { impl<'a, 'b, 'c, R> Future for SigningRequest<'a, 'b, 'c, R> where R: key::KeyRole { - type Item = crypto::mpi::Signature; - type Error = anyhow::Error; + type Output = Result<crypto::mpi::Signature>; - fn poll(&mut self) -> std::result::Result<Async<Self::Item>, Self::Error> { + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { use self::SigningRequestState::*; + // The compiler is not smart enough to figure out disjoint borrows + // through Pin via DerefMut (which wholly borrows `self`), so unwrap it + let Self { c, state, key, options, algo, digest } = Pin::into_inner(self); + let mut client = Pin::new(c); + loop { - match self.state { + match state { Start => { - if self.options.is_empty() { - self.c.send(format!("SIGKEY {}", - Keygrip::of(self.key.mpis())?))?; - self.state = SigKey; + if options.is_empty() { + let grip = Keygrip::of(key.mpis())?; + client.send(format!("SIGKEY {}", grip))?; + *state = SigKey; } else { - self.c.send(self.options.pop().unwrap())?; - self.state = Options; + let opts = options.pop().unwrap(); + client.send(opts)?; + *state = Options; } }, - Options => match self.c.poll()? { - Async::Ready(Some(r)) => match r { + Options => match client.as_mut().poll_next(cx)? { + Poll::Ready(Some(r)) => match r { assuan::Response::Ok { .. } | assuan::Response::Comment { .. } | assuan::Response::Status { .. } => (), // Ignore. assuan::Response::Error { ref message, .. } => - return operation_failed(message), + return Poll::Ready(operation_failed(message)), _ => - return protocol_error(&r), + return Poll::Ready(protocol_error(&r)), }, - Async::Ready(None) => { - if let Some(option) = self.options.pop() { - self.c.send(option)?; + Poll::Ready(None) => { + if let Some(option) = options.pop() { + client.send(option)?; } else { - self.c.send(format!("SIGKEY {}", - Keygrip::of(self.key.mpis())?))?; - self.state = SigKey; + let grip = Keygrip::of(key.mpis())?; + client.send(format!("SIGKEY {}", grip))?; + *state = SigKey; } }, - Async::NotReady => - return Ok(Async::NotReady), + Poll::Pending => return Poll::Pending, }, - SigKey => match self.c.poll()? { - Async::Ready(Some(r)) => match r { + SigKey => match client.as_mut().poll_next(cx)? { + Poll::Ready(Some(r)) => match r { assuan::Response::Ok { .. } | assuan::Response::Comment { .. } | assuan::Response::Status { .. } => (), // Ignore. assuan::Response::Error { ref message, .. } => - return operation_failed(message), + return Poll::Ready(operation_failed(message)), _ => - return protocol_error(&r), + return Poll::Ready(protocol_error(&r)), }, - Async::Ready(None) => { - self.c.send(format!("SETHASH {} {}", - u8::from(self.algo), - hex::encode(&self.digest)))?; - self.state = SetHash; + Poll::Ready(None) => { + let algo = u8::from(*algo); + let digest = hex::encode(&digest); + client.send(format!("SETHASH {} {}", algo, digest))?; + *state = SetHash; }, - Async::NotReady => - return Ok(Async::NotReady), + Poll::Pending => return Poll::Pending, }, - SetHash => match self.c.poll()? { - Async::Ready(Some(r)) => match r { + SetHash => match client.as_mut().poll_next(cx)? { + Poll::Ready(Some(r)) => match r { assuan::Response::Ok { .. } | assuan::Response::Comment { .. } | assuan::Response::Status { .. } => (), // Ignore. assuan::Response::Error { ref message, .. } => - return operation_failed(message), + return Poll::Ready(operation_failed(message)), _ => - return protocol_error(&r), + return Poll::Ready(protocol_error(&r)), }, - Async::Ready(None) => { - self.c.send("PKSIGN")?; - self.state = PkSign(Vec::new()); + Poll::Ready(None) => { + client.send("PKSIGN")?; + *state = PkSign(Vec::new()); }, - Async::NotReady => - return Ok(Async::NotReady), + Poll::Pending => return Poll::Pending, }, - PkSign(ref mut data) => match self.c.poll()? { - Async::Ready(Some(r)) => match r { + PkSign(ref mut data) => match client.as_mut().poll_next(cx)? { + Poll::Ready(Some(r)) => match r { assuan::Response::Ok { .. } | assuan::Response::Comment { .. } | assuan::Response::Status { .. } => (), // Ignore. assuan::Response::Error { ref message, .. } => - return operation_failed(message), + return Poll::Ready(operation_failed(message)), assuan::Response::Data { ref partial } => data.extend_from_slice(partial), _ => - return protocol_error(&r), + return Poll::Ready(protocol_error(&r)), }, - Async::Ready(None) => { - return Ok(Async::Ready( - Sexp::from_bytes(&data)?.to_signature()?)); + Poll::Ready(None) => { + return Poll::Ready( + Sexp::from_bytes(&data)?.to_signature()); }, - Async::NotReady => - return Ok(Async::NotReady), + Poll::Pending => return Poll::Pending, }, } } @@ -555,70 +550,73 @@ enum DecryptionRequestState { impl<'a, 'b, 'c, R> Future for DecryptionRequest<'a, 'b, 'c, R> where R: key::KeyRole { - type Item = crypto::SessionKey; - type Error = anyhow::Error; + type Output = Result<crypto::SessionKey>; - fn poll(&mut self) -> std::result::Result<Async<Self::Item>, Self::Error> { + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { use self::DecryptionRequestState::*; + // The compiler is not smart enough to figure out disjoint borrows + // through Pin via DerefMut (which wholly borrows `self`), so unwrap it + let Self { c, state, key, ciphertext, options } = self.deref_mut(); + let mut client = Pin::new(c); + loop { - match self.state { + match state { Start => { - if self.options.is_empty() { - self.c.send(format!("SETKEY {}", - Keygrip::of(self.key.mpis())?))?; - self.state = SetKey; + |