diff options
author | Justus Winter <justus@sequoia-pgp.org> | 2019-06-19 16:10:09 +0200 |
---|---|---|
committer | Justus Winter <justus@sequoia-pgp.org> | 2019-06-24 19:00:37 +0200 |
commit | 7b2f7e842c359475f9044c969f16527311c4009b (patch) | |
tree | d863f49019474e34dfaea3514a1b4c4b58d8ca14 /ipc | |
parent | d1c9bd08e23aa8901028dceb1e63cb8fd7ae5ddc (diff) |
ipc: Simplify the Assuan RPC interface.
- Turn the client into a stream of responses.
Diffstat (limited to 'ipc')
-rw-r--r-- | ipc/examples/assuan-client.rs | 5 | ||||
-rw-r--r-- | ipc/src/assuan/mod.rs | 195 |
2 files changed, 101 insertions, 99 deletions
diff --git a/ipc/examples/assuan-client.rs b/ipc/examples/assuan-client.rs index 5b0b3e14..67286e43 100644 --- a/ipc/examples/assuan-client.rs +++ b/ipc/examples/assuan-client.rs @@ -1,5 +1,6 @@ extern crate futures; use futures::future::Future; +use futures::stream::Stream; extern crate clap; extern crate sequoia_ipc as ipc; use ipc::assuan::Client; @@ -21,8 +22,8 @@ fn main() { .wait().unwrap(); for command in matches.values_of("commands").unwrap() { eprintln!("> {}", command); - let responses = c.send(command).wait().unwrap(); - for response in responses { + c.send(command).wait().unwrap(); + for response in c.by_ref().wait() { eprintln!("< {:?}", response); } } diff --git a/ipc/src/assuan/mod.rs b/ipc/src/assuan/mod.rs index a24c48e8..8b704846 100644 --- a/ipc/src/assuan/mod.rs +++ b/ipc/src/assuan/mod.rs @@ -4,7 +4,6 @@ use std::cmp; use std::io::{Write, BufReader}; -use std::mem; use std::path::Path; use lalrpop_util::ParseError; @@ -27,8 +26,28 @@ lalrpop_util::lalrpop_mod!( #[allow(missing_docs)] grammar, "/assuan/grammar.rs"); /// A connection to an Assuan server. +/// +/// Commands may be issued using [`Connection::send`]. +/// +/// [`Connection::send`]: #method.send +/// +/// `Client` implements [`Stream`] to return all server responses +/// until the first `Response::Ok`, `Response::Error`, or +/// `Response::Inquire`. +/// +/// [`Stream`]: #impl-Stream +/// +/// `Ok` and `Error` indicate success and failure. `Inquire` means +/// that the server requires more information to complete the request. +/// This information may be provided using [`Connection::data()`], or +/// the operation may be canceled using [`Connection::cancel()`]. +/// +/// [`Connection::data()`]: #method.data +/// [`Connection::cancel()`]: #method.cancel pub struct Client { - r: ResponseStream, + r: BufReader<io::ReadHalf<UnixStream>>, // xxx: abstract over + buffer: Vec<u8>, + done: bool, w: io::WriteHalf<UnixStream>, } @@ -44,22 +63,10 @@ impl Client { /// Sends a command to the server. /// - /// Returns all server responses until the first `Response::Ok`, - /// `Response::Error`, or `Response::Inquire`. `Ok` and `Error` - /// indicate success and failure. - /// - /// `Inquire` means that the server requires more information to - /// complete the request. This information may be provided using - /// [`Connection::data()`], or the operation may be canceled using - /// [`Connection::cancel()`]. - /// - /// [`Connection::data()`]: #method.data - /// [`Connection::cancel()`]: #method.cancel - /// /// Note: `command` is passed as-is. Control characters, like /// `%`, must be %-escaped. pub fn send<'a, C: 'a>(&'a mut self, command: C) - -> impl Future<Item = Vec<Response>, Error = failure::Error> + 'a + -> impl Future<Item = (), Error = failure::Error> + 'a where C: AsRef<[u8]> { let command = command.as_ref(); @@ -67,15 +74,12 @@ impl Client { if ! c.ends_with(b"\n") { c.push(0x0a); } - let w = &mut self.w; - let r = &mut self.r; - io::write_all(w, c).from_err() - .and_then(move |_| ResponseFuture(r)) + io::write_all(&mut self.w, c).map(|_| ()).from_err() } /// Cancels a pending operation. pub fn cancel<'a>(&'a mut self) - -> impl Future<Item = Vec<Response>, Error = failure::Error> + 'a + -> impl Future<Item = (), Error = failure::Error> + 'a { self.send("CAN") } @@ -87,7 +91,7 @@ impl Client { /// success and failure of the original operation that lead to the /// current inquiry. pub fn data<'a, C: 'a>(&'a mut self, data: C) - -> impl Future<Item = Vec<Response>, Error = failure::Error> + 'a + -> impl Future<Item = (), Error = failure::Error> + 'a where C: AsRef<[u8]> { let mut data = data.as_ref(); @@ -124,7 +128,8 @@ struct ConnectionFuture(Option<Client>); impl ConnectionFuture { fn new(c: UnixStream) -> Self { let (r, w) = c.split(); - Self(Some(Client { r: ResponseStream::new(BufReader::new(r)), w })) + let buffer = Vec::with_capacity(MAX_LINE_LENGTH); + Self(Some(Client { r: BufReader::new(r), buffer, done: false, w })) } } @@ -134,12 +139,11 @@ impl Future for ConnectionFuture { fn poll(&mut self) -> std::result::Result<Async<Self::Item>, Self::Error> { // Consume the initial message from the server. - match { - let c = self.0.as_mut().expect("future polled after completion"); - ResponseFuture(&mut c.r).poll()? - } { - Async::Ready(r) => { - match r.last() { + match self.0.as_mut().expect("future polled after completion") + .by_ref().collect().poll()? + { + Async::Ready(response) => { + match response.iter().last() { Some(Response::Ok { .. }) => Ok(Async::Ready(self.0.take().unwrap())), Some(Response::Error { code, message }) => @@ -159,68 +163,38 @@ impl Future for ConnectionFuture { } } -/// A future that will resolve to a `Vec<Response>`. -struct ResponseFuture<'a>(&'a mut ResponseStream); - -impl<'a> Future for ResponseFuture<'a> { - type Item = Vec<Response>; +impl Stream for Client { + type Item = Response; type Error = failure::Error; - fn poll(&mut self) -> std::result::Result<Async<Self::Item>, Self::Error> { - match self.0.poll()? { - Async::Ready(Some(r)) => Ok(Async::Ready(r)), - Async::Ready(None) => - Err(Error::ConnectionClosed(Vec::new()).into()), - Async::NotReady => Ok(Async::NotReady), + /// Attempt to pull out the next value of this stream, returning + /// None if the stream is finished. + /// + /// Note: It _is_ safe to call this again after the stream + /// finished, i.e. returned `Ready(None)`. + fn poll(&mut self) + -> std::result::Result<Async<Option<Self::Item>>, Self::Error> + { + // Check if the previous response was one of ok, error, or + // inquire. + if self.done { + // If so, we signal end of stream here. + self.done = false; + return Ok(Async::Ready(None)); } - } -} - -struct ResponseStream { - r: BufReader<io::ReadHalf<UnixStream>>, // xxx: abstract over - buffer: Vec<u8>, - responses: Vec<Response>, -} - -impl ResponseStream { - fn new(r: BufReader<io::ReadHalf<UnixStream>>) -> Self { - Self { r, buffer: Vec::new(), responses: Vec::new() } - } -} -impl Stream for ResponseStream { - type Item = Vec<Response>; - type Error = failure::Error; - - fn poll(&mut self) -> std::result::Result<Async<Option<Self::Item>>, - Self::Error> { loop { // Try to yield a line from the buffer. For that, try to // find linebreaks. - while let Some(p) = self.buffer.iter().position(|&b| b == 0x0a) { + if let Some(p) = self.buffer.iter().position(|&b| b == 0x0a) { let line: Vec<u8> = self.buffer.drain(..p+1).collect(); // xxx: rtrim linebreak even more? crlf maybe? let r = Response::parse(&line[..line.len()-1])?; - - let done = match r { - // All server responses end in either OK or ERR. - Response::Ok { .. } | Response::Error { .. } => true, - // However, the server may inquire more - // information. We also surrender control to the - // caller by yielding the responses we have seen - // so far, and allow her to respond to the - // inquiry. - Response::Inquire { .. } => true, - _ => false, - }; - self.responses.push(r); - if done { - return Ok(Async::Ready(Some( - mem::replace(&mut self.responses, Vec::new())))); - } - - // We found a line, but it was not the last line in - // this server response. Try to find another line. + // If this response is one of ok, error, or inquire, + // we want to surrender control to the client next + // time she asks for an item. + self.done = r.is_done(); + return Ok(Async::Ready(Some(r))); } // No more linebreaks in the buffer. We need to get more. @@ -232,26 +206,17 @@ impl Stream for ResponseStream { Async::Ready(n_read) if n_read == 0 => { // EOF. self.buffer.resize(buffer_len, 0); + if ! self.buffer.is_empty() { + // Incomplete server response. + return Err(Error::ConnectionClosed( + self.buffer.clone()).into()); - if self.responses.is_empty() { - if ! self.buffer.is_empty() { - // Incomplete server response. - return Err(Error::ConnectionClosed( - mem::replace(&mut self.buffer, Vec::new()) - ).into()); - } - - // End of stream. - return Ok(Async::Ready(None)); - } else { - // There is an incomplete server response, - // yield that and let the caller figure it - // out. - return Ok(Async::Ready(Some( - mem::replace(&mut self.responses, Vec::new()) - ))); } + + // End of stream. + return Ok(Async::Ready(None)); }, + Async::Ready(n_read) => { self.buffer.resize(buffer_len + n_read, 0); continue; @@ -347,6 +312,42 @@ impl Response { }, } } + + /// Returns true if this message indicates success. + pub fn is_ok(&self) -> bool { + match self { + Response::Ok { .. } => true, + _ => false, + } + } + + /// Returns true if this message indicates an error. + pub fn is_err(&self) -> bool { + match self { + Response::Error { .. } => true, + _ => false, + } + } + + /// Returns true if this message is an inquiry. + pub fn is_inquire(&self) -> bool { + match self { + Response::Inquire { .. } => true, + _ => false, + } + } + + /// Returns true if this response concludes the server's response. + pub fn is_done(&self) -> bool { + // All server responses end in either OK or ERR. + self.is_ok() || self.is_err() + // However, the server may inquire more + // information. We also surrender control to the + // caller by yielding the responses we have seen + // so far, and allow her to respond to the + // inquiry. + || self.is_inquire() + } } #[cfg(test)] |