summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJustus Winter <justus@sequoia-pgp.org>2020-03-24 15:30:35 +0100
committerJustus Winter <justus@sequoia-pgp.org>2020-03-24 17:27:45 +0100
commitcf9ccd7d618d46423e666ad13034173a25a83faa (patch)
treeb2a97d21375257e86b6ad9299d01ee97568efc45
parent6567949b23139f0bfb72a49ab07defc53808a94c (diff)
wip zero copy transformergorgeous-zero-copy-transformer
-rw-r--r--openpgp/src/parse/stream.rs98
1 files changed, 91 insertions, 7 deletions
diff --git a/openpgp/src/parse/stream.rs b/openpgp/src/parse/stream.rs
index af8a54c6..933b49d0 100644
--- a/openpgp/src/parse/stream.rs
+++ b/openpgp/src/parse/stream.rs
@@ -663,7 +663,8 @@ struct Transformer<'a> {
#[derive(PartialEq, Debug)]
enum TransformationState {
- Data,
+ Buffer(bool), // Are we done reading from the source?
+ PassThru(bool, usize), // Done?, cap to this size.
Sigs,
Done,
}
@@ -720,7 +721,7 @@ impl<'a> Transformer<'a> {
// Copy the data, then proceed directly to the signatures.
buf.extend_from_slice(data_prefix);
- TransformationState::Sigs
+ TransformationState::Buffer(true)
} else {
// Produce a Literal Data Packet header with partial
// length encoding.
@@ -735,7 +736,7 @@ impl<'a> Transformer<'a> {
// Copy the prefix up to the first chunk, then keep in the
// data state.
buf.extend_from_slice(&data_prefix[..512 - HEADER_LEN]);
- TransformationState::Data
+ TransformationState::Buffer(false)
}
};
@@ -751,6 +752,52 @@ impl<'a> Transformer<'a> {
fn data_helper(&mut self, amount: usize, hard: bool, and_consume: bool)
-> io::Result<&[u8]> {
+ match self.state {
+ TransformationState::PassThru(_, _)
+ if self.buffer.len() - self.cursor == 0 =>
+ {
+ // Happy path, no buffering.
+ self.data_helper_pass_thru(amount, hard, and_consume)
+ },
+ _ =>
+ self.data_helper_buffer(amount, hard, and_consume),
+ }
+ }
+
+ fn data_helper_pass_thru(&mut self, amount: usize, hard: bool,
+ and_consume: bool)
+ -> io::Result<&[u8]> {
+ assert_eq!(self.buffer.len() - self.cursor, 0);
+ let (done, cap) =
+ if let TransformationState::PassThru(done, cap) = self.state {
+ (done, cap)
+ } else {
+ unreachable!("other states handled in data_helper_pass_thru")
+ };
+
+ if and_consume {
+ // We will consume `amount` bytes, so reduce the cap
+ // accordingly.
+ let new_cap = cap - amount;
+ self.state = if new_cap == 0 {
+ // Need to synthesize a new length byte.
+ TransformationState::Buffer(done)
+ } else {
+ TransformationState::PassThru(done, new_cap)
+ };
+ }
+
+ match (hard, and_consume) {
+ (false, false) => self.reader.data(amount),
+ (false, true) => self.reader.data_consume(amount),
+ (true, false) => self.reader.data_hard(amount),
+ (true, true) => self.reader.data_consume_hard(amount),
+ }.map(|buf| &buf[..cap])
+ }
+
+ fn data_helper_buffer(&mut self, amount: usize, hard: bool,
+ and_consume: bool)
+ -> io::Result<&[u8]> {
assert!(self.cursor <= self.buffer.len());
let amount_buffered = self.buffer.len() - self.cursor;
@@ -760,8 +807,43 @@ impl<'a> Transformer<'a> {
let mut new_buffer = Vec::with_capacity(want);
new_buffer.extend_from_slice(&self.buffer[self.cursor..]);
+ const MAX_CHUNK_SIZE: usize = 1 << 22; // 4 megabytes.
self.state = match self.state {
- TransformationState::Data => {
+ TransformationState::Buffer(done) if ! done && want == 1 =>
+ {
+ // This is an opportunity to get on the happy path
+ // of passing data through without buffering it.
+
+ // First, peek to see how much is available.
+ let pass_thru_chunk_size = MAX_CHUNK_SIZE;
+ let mut data = self.reader.data(pass_thru_chunk_size)?;
+
+ // Short read? The end is nigh.
+ let short_read = data.len() < pass_thru_chunk_size;
+ let len = if short_read {
+ BodyLength::Full(data.len() as u32)
+ } else {
+ // Find the largest power of two equal or
+ // smaller than the size of data, and cap data
+ // to that size.
+ let mut s = data.len().next_power_of_two();
+ if ! data.len().is_power_of_two() {
+ s >>= 1;
+ }
+ data = &data[..s];
+
+ BodyLength::Partial(data.len() as u32)
+ };
+
+ len.serialize(&mut new_buffer)
+ .expect("representable; write to buffer is infallible");
+
+ TransformationState::PassThru(short_read, data.len())
+ },
+
+ TransformationState::Buffer(done)
+ | TransformationState::PassThru(done, _) if ! done =>
+ {
// Find the largest power of two equal or smaller
// than the size of buf.
let mut s = want.next_power_of_two();
@@ -770,7 +852,6 @@ impl<'a> Transformer<'a> {
}
// Cap it. Drop once we avoid the copies below.
- const MAX_CHUNK_SIZE: usize = 1 << 22; // 4 megabytes.
if s > MAX_CHUNK_SIZE {
s = MAX_CHUNK_SIZE;
}
@@ -801,11 +882,14 @@ impl<'a> Transformer<'a> {
if short_read {
TransformationState::Sigs
} else {
- TransformationState::Data
+ TransformationState::Buffer(short_read)
}
},
- TransformationState::Sigs => {
+ TransformationState::Buffer(_)
+ | TransformationState::PassThru(_, _)
+ | TransformationState::Sigs =>
+ {
for sig in self.sigs.drain(..) {
Packet::Signature(sig).serialize(&mut new_buffer)
.map_err(|e| {