diff options
author | Justus Winter <justus@sequoia-pgp.org> | 2019-06-20 14:21:45 +0200 |
---|---|---|
committer | Justus Winter <justus@sequoia-pgp.org> | 2019-06-24 19:00:37 +0200 |
commit | 0ec5511b235b6844eca0094505e32e6d911cd142 (patch) | |
tree | f9eaf899fb72e83b326d46cee0cd1a37edb54b79 | |
parent | 7b2f7e842c359475f9044c969f16527311c4009b (diff) |
ipc: Lazily execute commands.
-rw-r--r-- | ipc/examples/assuan-client.rs | 7 | ||||
-rw-r--r-- | ipc/src/assuan/mod.rs | 151 | ||||
-rw-r--r-- | ipc/src/lib.rs | 1 |
3 files changed, 127 insertions, 32 deletions
diff --git a/ipc/examples/assuan-client.rs b/ipc/examples/assuan-client.rs index 67286e43..d92e0593 100644 --- a/ipc/examples/assuan-client.rs +++ b/ipc/examples/assuan-client.rs @@ -22,9 +22,10 @@ fn main() { .wait().unwrap(); for command in matches.values_of("commands").unwrap() { eprintln!("> {}", command); - c.send(command).wait().unwrap(); - for response in c.by_ref().wait() { + c.send(command).unwrap(); + c.by_ref().for_each(|response| { eprintln!("< {:?}", response); - } + Ok(()) + }).wait().unwrap(); } } diff --git a/ipc/src/assuan/mod.rs b/ipc/src/assuan/mod.rs index 8b704846..e250b542 100644 --- a/ipc/src/assuan/mod.rs +++ b/ipc/src/assuan/mod.rs @@ -4,15 +4,18 @@ use std::cmp; use std::io::{Write, BufReader}; +use std::mem; use std::path::Path; use lalrpop_util::ParseError; -use futures::{Async, Future, Stream}; +use futures::{future, Async, Future, Stream}; use tokio::net::UnixStream; use tokio_io::io; use tokio_io::AsyncRead; +use openpgp; + use Error; use Result; @@ -27,20 +30,26 @@ lalrpop_util::lalrpop_mod!( /// A connection to an Assuan server. /// -/// Commands may be issued using [`Connection::send`]. +/// Commands may be issued using [`Connection::send`]. Note that the +/// command is sent lazily, i.e. it is only send if you poll for the +/// responses. /// /// [`Connection::send`]: #method.send /// /// `Client` implements [`Stream`] to return all server responses -/// until the first `Response::Ok`, `Response::Error`, or -/// `Response::Inquire`. +/// until the first [`Response::Ok`], [`Response::Error`], or +/// [`Response::Inquire`]. /// /// [`Stream`]: #impl-Stream +/// [`Response::Ok`]: enum.Response.html#variant.Ok +/// [`Response::Error`]: enum.Response.html#variant.Error +/// [`Response::Inquire`]: enum.Response.html#variant.Inquire /// -/// `Ok` and `Error` indicate success and failure. `Inquire` means -/// that the server requires more information to complete the request. -/// This information may be provided using [`Connection::data()`], or -/// the operation may be canceled using [`Connection::cancel()`]. +/// [`Response::Ok`] and [`Response::Error`] indicate success and +/// failure. [`Response::Inquire`] means that the server requires +/// more information to complete the request. This information may be +/// provided using [`Connection::data()`], or the operation may be +/// canceled using [`Connection::cancel()`]. /// /// [`Connection::data()`]: #method.data /// [`Connection::cancel()`]: #method.cancel @@ -48,7 +57,14 @@ pub struct Client { r: BufReader<io::ReadHalf<UnixStream>>, // xxx: abstract over buffer: Vec<u8>, done: bool, - w: io::WriteHalf<UnixStream>, + w: WriteState, +} + +enum WriteState { + Ready(io::WriteHalf<UnixStream>), + Sending(future::FromErr<io::WriteAll<io::WriteHalf<tokio::net::UnixStream>, Vec<u8>>, failure::Error>), + Transitioning, + Dead, } impl Client { @@ -61,37 +77,80 @@ impl Client { .and_then(ConnectionFuture::new) } - /// Sends a command to the server. + /// Lazily sends a command to the server. + /// + /// For the command to be actually executed, stream the responses + /// using this objects [`Stream`] implementation. + /// + /// [`Stream`]: #impl-Stream + /// + /// The response stream ends in either a [`Response::Ok`], + /// [`Response::Error`], or [`Response::Inquire`]. `Ok` and + /// `Error` indicate success and failure of the current operation. + /// `Inquire` means that the server requires more information to + /// complete the request. This information may be provided using + /// [`Connection::data()`], or the operation may be canceled using + /// [`Connection::cancel()`]. + /// + /// [`Response::Ok`]: enum.Response.html#variant.Ok + /// [`Response::Error`]: enum.Response.html#variant.Error + /// [`Response::Inquire`]: enum.Response.html#variant.Inquire + /// [`Connection::data()`]: #method.data + /// [`Connection::cancel()`]: #method.cancel /// /// Note: `command` is passed as-is. Control characters, like /// `%`, must be %-escaped. - pub fn send<'a, C: 'a>(&'a mut self, command: C) - -> impl Future<Item = (), Error = failure::Error> + 'a + pub fn send<'a, C: 'a>(&'a mut self, command: C) -> Result<()> where C: AsRef<[u8]> { - let command = command.as_ref(); - let mut c = command.to_vec(); - if ! c.ends_with(b"\n") { - c.push(0x0a); + if let WriteState::Sending(_) = self.w { + return Err(openpgp::Error::InvalidOperation( + "Busy, poll responses first".into()).into()); } - io::write_all(&mut self.w, c).map(|_| ()).from_err() + + self.w = + match mem::replace(&mut self.w, WriteState::Transitioning) + { + WriteState::Ready(sink) => { + let command = command.as_ref(); + let mut c = command.to_vec(); + if ! c.ends_with(b"\n") { + c.push(0x0a); + } + WriteState::Sending(io::write_all(sink, c).from_err()) + }, + _ => unreachable!(), + }; + + Ok(()) } - /// Cancels a pending operation. - pub fn cancel<'a>(&'a mut self) - -> impl Future<Item = (), Error = failure::Error> + 'a - { + /// Lazily cancels a pending operation. + /// + /// For the command to be actually executed, stream the responses + /// using this objects [`Stream`] implementation. + /// + /// [`Stream`]: #impl-Stream + pub fn cancel<'a>(&'a mut self) -> Result<()> { self.send("CAN") } - /// Sends data in response to an inquire. + /// Lazily sends data in response to an inquire. + /// + /// For the command to be actually executed, stream the responses + /// using this objects [`Stream`] implementation. /// - /// The response is either a `Response::Ok`, `Response::Error`, or - /// another `Response::Inquire`. `Ok` and `Error` indicate - /// success and failure of the original operation that lead to the - /// current inquiry. - pub fn data<'a, C: 'a>(&'a mut self, data: C) - -> impl Future<Item = (), Error = failure::Error> + 'a + /// [`Stream`]: #impl-Stream + /// + /// The response stream ends in either a [`Response::Ok`], + /// [`Response::Error`], or another [`Response::Inquire`]. `Ok` + /// and `Error` indicate success and failure of the original + /// operation that lead to the current inquiry. + /// + /// [`Response::Ok`]: enum.Response.html#variant.Ok + /// [`Response::Error`]: enum.Response.html#variant.Error + /// [`Response::Inquire`]: enum.Response.html#variant.Inquire + pub fn data<'a, C: 'a>(&'a mut self, data: C) -> Result<()> where C: AsRef<[u8]> { let mut data = data.as_ref(); @@ -129,7 +188,10 @@ impl ConnectionFuture { fn new(c: UnixStream) -> Self { let (r, w) = c.split(); let buffer = Vec::with_capacity(MAX_LINE_LENGTH); - Self(Some(Client { r: BufReader::new(r), buffer, done: false, w })) + Self(Some(Client { + r: BufReader::new(r), buffer, done: false, + w: WriteState::Ready(w) + })) } } @@ -175,6 +237,37 @@ impl Stream for Client { fn poll(&mut self) -> std::result::Result<Async<Option<Self::Item>>, Self::Error> { + // First, handle sending of the command. + match self.w { + WriteState::Ready(_) => + (), // Nothing to do, poll for responses below. + WriteState::Sending(_) => { + self.w = if let WriteState::Sending(mut f) = + mem::replace(&mut self.w, WriteState::Transitioning) + { + match f.poll() { + Ok(Async::Ready((sink, _))) => WriteState::Ready(sink), + Ok(Async::NotReady) => WriteState::Sending(f), + Err(e) => { + self.w = WriteState::Dead; + return Err(e); + }, + } + } else { + unreachable!() + }; + }, + WriteState::Transitioning => + unreachable!(), + WriteState::Dead => + (), // Nothing left to do, poll for responses below. + } + + // Recheck if we are still sending the command. + if let WriteState::Sending(_) = self.w { + return Ok(Async::NotReady); + } + // Check if the previous response was one of ok, error, or // inquire. if self.done { diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs index 7afbcfea..eee2ba69 100644 --- a/ipc/src/lib.rs +++ b/ipc/src/lib.rs @@ -74,6 +74,7 @@ use std::os::unix::io::AsRawFd; use std::thread; extern crate sequoia_core; +extern crate sequoia_openpgp as openpgp; use sequoia_core as core; |