summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJustus Winter <justus@sequoia-pgp.org>2019-06-20 14:21:45 +0200
committerJustus Winter <justus@sequoia-pgp.org>2019-06-24 19:00:37 +0200
commit0ec5511b235b6844eca0094505e32e6d911cd142 (patch)
treef9eaf899fb72e83b326d46cee0cd1a37edb54b79
parent7b2f7e842c359475f9044c969f16527311c4009b (diff)
ipc: Lazily execute commands.
-rw-r--r--ipc/examples/assuan-client.rs7
-rw-r--r--ipc/src/assuan/mod.rs151
-rw-r--r--ipc/src/lib.rs1
3 files changed, 127 insertions, 32 deletions
diff --git a/ipc/examples/assuan-client.rs b/ipc/examples/assuan-client.rs
index 67286e43..d92e0593 100644
--- a/ipc/examples/assuan-client.rs
+++ b/ipc/examples/assuan-client.rs
@@ -22,9 +22,10 @@ fn main() {
.wait().unwrap();
for command in matches.values_of("commands").unwrap() {
eprintln!("> {}", command);
- c.send(command).wait().unwrap();
- for response in c.by_ref().wait() {
+ c.send(command).unwrap();
+ c.by_ref().for_each(|response| {
eprintln!("< {:?}", response);
- }
+ Ok(())
+ }).wait().unwrap();
}
}
diff --git a/ipc/src/assuan/mod.rs b/ipc/src/assuan/mod.rs
index 8b704846..e250b542 100644
--- a/ipc/src/assuan/mod.rs
+++ b/ipc/src/assuan/mod.rs
@@ -4,15 +4,18 @@
use std::cmp;
use std::io::{Write, BufReader};
+use std::mem;
use std::path::Path;
use lalrpop_util::ParseError;
-use futures::{Async, Future, Stream};
+use futures::{future, Async, Future, Stream};
use tokio::net::UnixStream;
use tokio_io::io;
use tokio_io::AsyncRead;
+use openpgp;
+
use Error;
use Result;
@@ -27,20 +30,26 @@ lalrpop_util::lalrpop_mod!(
/// A connection to an Assuan server.
///
-/// Commands may be issued using [`Connection::send`].
+/// Commands may be issued using [`Connection::send`]. Note that the
+/// command is sent lazily, i.e. it is only send if you poll for the
+/// responses.
///
/// [`Connection::send`]: #method.send
///
/// `Client` implements [`Stream`] to return all server responses
-/// until the first `Response::Ok`, `Response::Error`, or
-/// `Response::Inquire`.
+/// until the first [`Response::Ok`], [`Response::Error`], or
+/// [`Response::Inquire`].
///
/// [`Stream`]: #impl-Stream
+/// [`Response::Ok`]: enum.Response.html#variant.Ok
+/// [`Response::Error`]: enum.Response.html#variant.Error
+/// [`Response::Inquire`]: enum.Response.html#variant.Inquire
///
-/// `Ok` and `Error` indicate success and failure. `Inquire` means
-/// that the server requires more information to complete the request.
-/// This information may be provided using [`Connection::data()`], or
-/// the operation may be canceled using [`Connection::cancel()`].
+/// [`Response::Ok`] and [`Response::Error`] indicate success and
+/// failure. [`Response::Inquire`] means that the server requires
+/// more information to complete the request. This information may be
+/// provided using [`Connection::data()`], or the operation may be
+/// canceled using [`Connection::cancel()`].
///
/// [`Connection::data()`]: #method.data
/// [`Connection::cancel()`]: #method.cancel
@@ -48,7 +57,14 @@ pub struct Client {
r: BufReader<io::ReadHalf<UnixStream>>, // xxx: abstract over
buffer: Vec<u8>,
done: bool,
- w: io::WriteHalf<UnixStream>,
+ w: WriteState,
+}
+
+enum WriteState {
+ Ready(io::WriteHalf<UnixStream>),
+ Sending(future::FromErr<io::WriteAll<io::WriteHalf<tokio::net::UnixStream>, Vec<u8>>, failure::Error>),
+ Transitioning,
+ Dead,
}
impl Client {
@@ -61,37 +77,80 @@ impl Client {
.and_then(ConnectionFuture::new)
}
- /// Sends a command to the server.
+ /// Lazily sends a command to the server.
+ ///
+ /// For the command to be actually executed, stream the responses
+ /// using this objects [`Stream`] implementation.
+ ///
+ /// [`Stream`]: #impl-Stream
+ ///
+ /// The response stream ends in either a [`Response::Ok`],
+ /// [`Response::Error`], or [`Response::Inquire`]. `Ok` and
+ /// `Error` indicate success and failure of the current operation.
+ /// `Inquire` means that the server requires more information to
+ /// complete the request. This information may be provided using
+ /// [`Connection::data()`], or the operation may be canceled using
+ /// [`Connection::cancel()`].
+ ///
+ /// [`Response::Ok`]: enum.Response.html#variant.Ok
+ /// [`Response::Error`]: enum.Response.html#variant.Error
+ /// [`Response::Inquire`]: enum.Response.html#variant.Inquire
+ /// [`Connection::data()`]: #method.data
+ /// [`Connection::cancel()`]: #method.cancel
///
/// Note: `command` is passed as-is. Control characters, like
/// `%`, must be %-escaped.
- pub fn send<'a, C: 'a>(&'a mut self, command: C)
- -> impl Future<Item = (), Error = failure::Error> + 'a
+ pub fn send<'a, C: 'a>(&'a mut self, command: C) -> Result<()>
where C: AsRef<[u8]>
{
- let command = command.as_ref();
- let mut c = command.to_vec();
- if ! c.ends_with(b"\n") {
- c.push(0x0a);
+ if let WriteState::Sending(_) = self.w {
+ return Err(openpgp::Error::InvalidOperation(
+ "Busy, poll responses first".into()).into());
}
- io::write_all(&mut self.w, c).map(|_| ()).from_err()
+
+ self.w =
+ match mem::replace(&mut self.w, WriteState::Transitioning)
+ {
+ WriteState::Ready(sink) => {
+ let command = command.as_ref();
+ let mut c = command.to_vec();
+ if ! c.ends_with(b"\n") {
+ c.push(0x0a);
+ }
+ WriteState::Sending(io::write_all(sink, c).from_err())
+ },
+ _ => unreachable!(),
+ };
+
+ Ok(())
}
- /// Cancels a pending operation.
- pub fn cancel<'a>(&'a mut self)
- -> impl Future<Item = (), Error = failure::Error> + 'a
- {
+ /// Lazily cancels a pending operation.
+ ///
+ /// For the command to be actually executed, stream the responses
+ /// using this objects [`Stream`] implementation.
+ ///
+ /// [`Stream`]: #impl-Stream
+ pub fn cancel<'a>(&'a mut self) -> Result<()> {
self.send("CAN")
}
- /// Sends data in response to an inquire.
+ /// Lazily sends data in response to an inquire.
+ ///
+ /// For the command to be actually executed, stream the responses
+ /// using this objects [`Stream`] implementation.
///
- /// The response is either a `Response::Ok`, `Response::Error`, or
- /// another `Response::Inquire`. `Ok` and `Error` indicate
- /// success and failure of the original operation that lead to the
- /// current inquiry.
- pub fn data<'a, C: 'a>(&'a mut self, data: C)
- -> impl Future<Item = (), Error = failure::Error> + 'a
+ /// [`Stream`]: #impl-Stream
+ ///
+ /// The response stream ends in either a [`Response::Ok`],
+ /// [`Response::Error`], or another [`Response::Inquire`]. `Ok`
+ /// and `Error` indicate success and failure of the original
+ /// operation that lead to the current inquiry.
+ ///
+ /// [`Response::Ok`]: enum.Response.html#variant.Ok
+ /// [`Response::Error`]: enum.Response.html#variant.Error
+ /// [`Response::Inquire`]: enum.Response.html#variant.Inquire
+ pub fn data<'a, C: 'a>(&'a mut self, data: C) -> Result<()>
where C: AsRef<[u8]>
{
let mut data = data.as_ref();
@@ -129,7 +188,10 @@ impl ConnectionFuture {
fn new(c: UnixStream) -> Self {
let (r, w) = c.split();
let buffer = Vec::with_capacity(MAX_LINE_LENGTH);
- Self(Some(Client { r: BufReader::new(r), buffer, done: false, w }))
+ Self(Some(Client {
+ r: BufReader::new(r), buffer, done: false,
+ w: WriteState::Ready(w)
+ }))
}
}
@@ -175,6 +237,37 @@ impl Stream for Client {
fn poll(&mut self)
-> std::result::Result<Async<Option<Self::Item>>, Self::Error>
{
+ // First, handle sending of the command.
+ match self.w {
+ WriteState::Ready(_) =>
+ (), // Nothing to do, poll for responses below.
+ WriteState::Sending(_) => {
+ self.w = if let WriteState::Sending(mut f) =
+ mem::replace(&mut self.w, WriteState::Transitioning)
+ {
+ match f.poll() {
+ Ok(Async::Ready((sink, _))) => WriteState::Ready(sink),
+ Ok(Async::NotReady) => WriteState::Sending(f),
+ Err(e) => {
+ self.w = WriteState::Dead;
+ return Err(e);
+ },
+ }
+ } else {
+ unreachable!()
+ };
+ },
+ WriteState::Transitioning =>
+ unreachable!(),
+ WriteState::Dead =>
+ (), // Nothing left to do, poll for responses below.
+ }
+
+ // Recheck if we are still sending the command.
+ if let WriteState::Sending(_) = self.w {
+ return Ok(Async::NotReady);
+ }
+
// Check if the previous response was one of ok, error, or
// inquire.
if self.done {
diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs
index 7afbcfea..eee2ba69 100644
--- a/ipc/src/lib.rs
+++ b/ipc/src/lib.rs
@@ -74,6 +74,7 @@ use std::os::unix::io::AsRawFd;
use std::thread;
extern crate sequoia_core;
+extern crate sequoia_openpgp as openpgp;
use sequoia_core as core;