summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJustus Winter <justus@sequoia-pgp.org>2019-06-19 16:10:09 +0200
committerJustus Winter <justus@sequoia-pgp.org>2019-06-24 19:00:37 +0200
commit7b2f7e842c359475f9044c969f16527311c4009b (patch)
treed863f49019474e34dfaea3514a1b4c4b58d8ca14
parentd1c9bd08e23aa8901028dceb1e63cb8fd7ae5ddc (diff)
ipc: Simplify the Assuan RPC interface.
- Turn the client into a stream of responses.
-rw-r--r--ipc/examples/assuan-client.rs5
-rw-r--r--ipc/src/assuan/mod.rs195
2 files changed, 101 insertions, 99 deletions
diff --git a/ipc/examples/assuan-client.rs b/ipc/examples/assuan-client.rs
index 5b0b3e14..67286e43 100644
--- a/ipc/examples/assuan-client.rs
+++ b/ipc/examples/assuan-client.rs
@@ -1,5 +1,6 @@
extern crate futures;
use futures::future::Future;
+use futures::stream::Stream;
extern crate clap;
extern crate sequoia_ipc as ipc;
use ipc::assuan::Client;
@@ -21,8 +22,8 @@ fn main() {
.wait().unwrap();
for command in matches.values_of("commands").unwrap() {
eprintln!("> {}", command);
- let responses = c.send(command).wait().unwrap();
- for response in responses {
+ c.send(command).wait().unwrap();
+ for response in c.by_ref().wait() {
eprintln!("< {:?}", response);
}
}
diff --git a/ipc/src/assuan/mod.rs b/ipc/src/assuan/mod.rs
index a24c48e8..8b704846 100644
--- a/ipc/src/assuan/mod.rs
+++ b/ipc/src/assuan/mod.rs
@@ -4,7 +4,6 @@
use std::cmp;
use std::io::{Write, BufReader};
-use std::mem;
use std::path::Path;
use lalrpop_util::ParseError;
@@ -27,8 +26,28 @@ lalrpop_util::lalrpop_mod!(
#[allow(missing_docs)] grammar, "/assuan/grammar.rs");
/// A connection to an Assuan server.
+///
+/// Commands may be issued using [`Connection::send`].
+///
+/// [`Connection::send`]: #method.send
+///
+/// `Client` implements [`Stream`] to return all server responses
+/// until the first `Response::Ok`, `Response::Error`, or
+/// `Response::Inquire`.
+///
+/// [`Stream`]: #impl-Stream
+///
+/// `Ok` and `Error` indicate success and failure. `Inquire` means
+/// that the server requires more information to complete the request.
+/// This information may be provided using [`Connection::data()`], or
+/// the operation may be canceled using [`Connection::cancel()`].
+///
+/// [`Connection::data()`]: #method.data
+/// [`Connection::cancel()`]: #method.cancel
pub struct Client {
- r: ResponseStream,
+ r: BufReader<io::ReadHalf<UnixStream>>, // xxx: abstract over
+ buffer: Vec<u8>,
+ done: bool,
w: io::WriteHalf<UnixStream>,
}
@@ -44,22 +63,10 @@ impl Client {
/// Sends a command to the server.
///
- /// Returns all server responses until the first `Response::Ok`,
- /// `Response::Error`, or `Response::Inquire`. `Ok` and `Error`
- /// indicate success and failure.
- ///
- /// `Inquire` means that the server requires more information to
- /// complete the request. This information may be provided using
- /// [`Connection::data()`], or the operation may be canceled using
- /// [`Connection::cancel()`].
- ///
- /// [`Connection::data()`]: #method.data
- /// [`Connection::cancel()`]: #method.cancel
- ///
/// Note: `command` is passed as-is. Control characters, like
/// `%`, must be %-escaped.
pub fn send<'a, C: 'a>(&'a mut self, command: C)
- -> impl Future<Item = Vec<Response>, Error = failure::Error> + 'a
+ -> impl Future<Item = (), Error = failure::Error> + 'a
where C: AsRef<[u8]>
{
let command = command.as_ref();
@@ -67,15 +74,12 @@ impl Client {
if ! c.ends_with(b"\n") {
c.push(0x0a);
}
- let w = &mut self.w;
- let r = &mut self.r;
- io::write_all(w, c).from_err()
- .and_then(move |_| ResponseFuture(r))
+ io::write_all(&mut self.w, c).map(|_| ()).from_err()
}
/// Cancels a pending operation.
pub fn cancel<'a>(&'a mut self)
- -> impl Future<Item = Vec<Response>, Error = failure::Error> + 'a
+ -> impl Future<Item = (), Error = failure::Error> + 'a
{
self.send("CAN")
}
@@ -87,7 +91,7 @@ impl Client {
/// success and failure of the original operation that lead to the
/// current inquiry.
pub fn data<'a, C: 'a>(&'a mut self, data: C)
- -> impl Future<Item = Vec<Response>, Error = failure::Error> + 'a
+ -> impl Future<Item = (), Error = failure::Error> + 'a
where C: AsRef<[u8]>
{
let mut data = data.as_ref();
@@ -124,7 +128,8 @@ struct ConnectionFuture(Option<Client>);
impl ConnectionFuture {
fn new(c: UnixStream) -> Self {
let (r, w) = c.split();
- Self(Some(Client { r: ResponseStream::new(BufReader::new(r)), w }))
+ let buffer = Vec::with_capacity(MAX_LINE_LENGTH);
+ Self(Some(Client { r: BufReader::new(r), buffer, done: false, w }))
}
}
@@ -134,12 +139,11 @@ impl Future for ConnectionFuture {
fn poll(&mut self) -> std::result::Result<Async<Self::Item>, Self::Error> {
// Consume the initial message from the server.
- match {
- let c = self.0.as_mut().expect("future polled after completion");
- ResponseFuture(&mut c.r).poll()?
- } {
- Async::Ready(r) => {
- match r.last() {
+ match self.0.as_mut().expect("future polled after completion")
+ .by_ref().collect().poll()?
+ {
+ Async::Ready(response) => {
+ match response.iter().last() {
Some(Response::Ok { .. }) =>
Ok(Async::Ready(self.0.take().unwrap())),
Some(Response::Error { code, message }) =>
@@ -159,68 +163,38 @@ impl Future for ConnectionFuture {
}
}
-/// A future that will resolve to a `Vec<Response>`.
-struct ResponseFuture<'a>(&'a mut ResponseStream);
-
-impl<'a> Future for ResponseFuture<'a> {
- type Item = Vec<Response>;
+impl Stream for Client {
+ type Item = Response;
type Error = failure::Error;
- fn poll(&mut self) -> std::result::Result<Async<Self::Item>, Self::Error> {
- match self.0.poll()? {
- Async::Ready(Some(r)) => Ok(Async::Ready(r)),
- Async::Ready(None) =>
- Err(Error::ConnectionClosed(Vec::new()).into()),
- Async::NotReady => Ok(Async::NotReady),
+ /// Attempt to pull out the next value of this stream, returning
+ /// None if the stream is finished.
+ ///
+ /// Note: It _is_ safe to call this again after the stream
+ /// finished, i.e. returned `Ready(None)`.
+ fn poll(&mut self)
+ -> std::result::Result<Async<Option<Self::Item>>, Self::Error>
+ {
+ // Check if the previous response was one of ok, error, or
+ // inquire.
+ if self.done {
+ // If so, we signal end of stream here.
+ self.done = false;
+ return Ok(Async::Ready(None));
}
- }
-}
-
-struct ResponseStream {
- r: BufReader<io::ReadHalf<UnixStream>>, // xxx: abstract over
- buffer: Vec<u8>,
- responses: Vec<Response>,
-}
-
-impl ResponseStream {
- fn new(r: BufReader<io::ReadHalf<UnixStream>>) -> Self {
- Self { r, buffer: Vec::new(), responses: Vec::new() }
- }
-}
-impl Stream for ResponseStream {
- type Item = Vec<Response>;
- type Error = failure::Error;
-
- fn poll(&mut self) -> std::result::Result<Async<Option<Self::Item>>,
- Self::Error> {
loop {
// Try to yield a line from the buffer. For that, try to
// find linebreaks.
- while let Some(p) = self.buffer.iter().position(|&b| b == 0x0a) {
+ if let Some(p) = self.buffer.iter().position(|&b| b == 0x0a) {
let line: Vec<u8> = self.buffer.drain(..p+1).collect();
// xxx: rtrim linebreak even more? crlf maybe?
let r = Response::parse(&line[..line.len()-1])?;
-
- let done = match r {
- // All server responses end in either OK or ERR.
- Response::Ok { .. } | Response::Error { .. } => true,
- // However, the server may inquire more
- // information. We also surrender control to the
- // caller by yielding the responses we have seen
- // so far, and allow her to respond to the
- // inquiry.
- Response::Inquire { .. } => true,
- _ => false,
- };
- self.responses.push(r);
- if done {
- return Ok(Async::Ready(Some(
- mem::replace(&mut self.responses, Vec::new()))));
- }
-
- // We found a line, but it was not the last line in
- // this server response. Try to find another line.
+ // If this response is one of ok, error, or inquire,
+ // we want to surrender control to the client next
+ // time she asks for an item.
+ self.done = r.is_done();
+ return Ok(Async::Ready(Some(r)));
}
// No more linebreaks in the buffer. We need to get more.
@@ -232,26 +206,17 @@ impl Stream for ResponseStream {
Async::Ready(n_read) if n_read == 0 => {
// EOF.
self.buffer.resize(buffer_len, 0);
+ if ! self.buffer.is_empty() {
+ // Incomplete server response.
+ return Err(Error::ConnectionClosed(
+ self.buffer.clone()).into());
- if self.responses.is_empty() {
- if ! self.buffer.is_empty() {
- // Incomplete server response.
- return Err(Error::ConnectionClosed(
- mem::replace(&mut self.buffer, Vec::new())
- ).into());
- }
-
- // End of stream.
- return Ok(Async::Ready(None));
- } else {
- // There is an incomplete server response,
- // yield that and let the caller figure it
- // out.
- return Ok(Async::Ready(Some(
- mem::replace(&mut self.responses, Vec::new())
- )));
}
+
+ // End of stream.
+ return Ok(Async::Ready(None));
},
+
Async::Ready(n_read) => {
self.buffer.resize(buffer_len + n_read, 0);
continue;
@@ -347,6 +312,42 @@ impl Response {
},
}
}
+
+ /// Returns true if this message indicates success.
+ pub fn is_ok(&self) -> bool {
+ match self {
+ Response::Ok { .. } => true,
+ _ => false,
+ }
+ }
+
+ /// Returns true if this message indicates an error.
+ pub fn is_err(&self) -> bool {
+ match self {
+ Response::Error { .. } => true,
+ _ => false,
+ }
+ }
+
+ /// Returns true if this message is an inquiry.
+ pub fn is_inquire(&self) -> bool {
+ match self {
+ Response::Inquire { .. } => true,
+ _ => false,
+ }
+ }
+
+ /// Returns true if this response concludes the server's response.
+ pub fn is_done(&self) -> bool {
+ // All server responses end in either OK or ERR.
+ self.is_ok() || self.is_err()
+ // However, the server may inquire more
+ // information. We also surrender control to the
+ // caller by yielding the responses we have seen
+ // so far, and allow her to respond to the
+ // inquiry.
+ || self.is_inquire()
+ }
}
#[cfg(test)]