diff options
author | Justus Winter <justus@sequoia-pgp.org> | 2020-03-24 15:30:35 +0100 |
---|---|---|
committer | Justus Winter <justus@sequoia-pgp.org> | 2020-03-24 17:27:45 +0100 |
commit | cf9ccd7d618d46423e666ad13034173a25a83faa (patch) | |
tree | b2a97d21375257e86b6ad9299d01ee97568efc45 | |
parent | 6567949b23139f0bfb72a49ab07defc53808a94c (diff) |
wip zero copy transformer (gorgeous-zero-copy-transformer)
-rw-r--r-- | openpgp/src/parse/stream.rs | 98 |
1 files changed, 91 insertions, 7 deletions
diff --git a/openpgp/src/parse/stream.rs b/openpgp/src/parse/stream.rs index af8a54c6..933b49d0 100644 --- a/openpgp/src/parse/stream.rs +++ b/openpgp/src/parse/stream.rs @@ -663,7 +663,8 @@ struct Transformer<'a> { #[derive(PartialEq, Debug)] enum TransformationState { - Data, + Buffer(bool), // Are we done reading from the source? + PassThru(bool, usize), // Done?, cap to this size. Sigs, Done, } @@ -720,7 +721,7 @@ impl<'a> Transformer<'a> { // Copy the data, then proceed directly to the signatures. buf.extend_from_slice(data_prefix); - TransformationState::Sigs + TransformationState::Buffer(true) } else { // Produce a Literal Data Packet header with partial // length encoding. @@ -735,7 +736,7 @@ impl<'a> Transformer<'a> { // Copy the prefix up to the first chunk, then keep in the // data state. buf.extend_from_slice(&data_prefix[..512 - HEADER_LEN]); - TransformationState::Data + TransformationState::Buffer(false) } }; @@ -751,6 +752,52 @@ impl<'a> Transformer<'a> { fn data_helper(&mut self, amount: usize, hard: bool, and_consume: bool) -> io::Result<&[u8]> { + match self.state { + TransformationState::PassThru(_, _) + if self.buffer.len() - self.cursor == 0 => + { + // Happy path, no buffering. + self.data_helper_pass_thru(amount, hard, and_consume) + }, + _ => + self.data_helper_buffer(amount, hard, and_consume), + } + } + + fn data_helper_pass_thru(&mut self, amount: usize, hard: bool, + and_consume: bool) + -> io::Result<&[u8]> { + assert_eq!(self.buffer.len() - self.cursor, 0); + let (done, cap) = + if let TransformationState::PassThru(done, cap) = self.state { + (done, cap) + } else { + unreachable!("other states handled in data_helper_pass_thru") + }; + + if and_consume { + // We will consume `amount` bytes, so reduce the cap + // accordingly. + let new_cap = cap - amount; + self.state = if new_cap == 0 { + // Need to synthesize a new length byte. 
+ TransformationState::Buffer(done) + } else { + TransformationState::PassThru(done, new_cap) + }; + } + + match (hard, and_consume) { + (false, false) => self.reader.data(amount), + (false, true) => self.reader.data_consume(amount), + (true, false) => self.reader.data_hard(amount), + (true, true) => self.reader.data_consume_hard(amount), + }.map(|buf| &buf[..cap]) + } + + fn data_helper_buffer(&mut self, amount: usize, hard: bool, + and_consume: bool) + -> io::Result<&[u8]> { assert!(self.cursor <= self.buffer.len()); let amount_buffered = self.buffer.len() - self.cursor; @@ -760,8 +807,43 @@ impl<'a> Transformer<'a> { let mut new_buffer = Vec::with_capacity(want); new_buffer.extend_from_slice(&self.buffer[self.cursor..]); + const MAX_CHUNK_SIZE: usize = 1 << 22; // 4 megabytes. self.state = match self.state { - TransformationState::Data => { + TransformationState::Buffer(done) if ! done && want == 1 => + { + // This is an opportunity to get on the happy path + // of passing data through without buffering it. + + // First, peek to see how much is available. + let pass_thru_chunk_size = MAX_CHUNK_SIZE; + let mut data = self.reader.data(pass_thru_chunk_size)?; + + // Short read? The end is nigh. + let short_read = data.len() < pass_thru_chunk_size; + let len = if short_read { + BodyLength::Full(data.len() as u32) + } else { + // Find the largest power of two equal or + // smaller than the size of data, and cap data + // to that size. + let mut s = data.len().next_power_of_two(); + if ! data.len().is_power_of_two() { + s >>= 1; + } + data = &data[..s]; + + BodyLength::Partial(data.len() as u32) + }; + + len.serialize(&mut new_buffer) + .expect("representable; write to buffer is infallible"); + + TransformationState::PassThru(short_read, data.len()) + }, + + TransformationState::Buffer(done) + | TransformationState::PassThru(done, _) if ! done => + { // Find the largest power of two equal or smaller // than the size of buf. 
let mut s = want.next_power_of_two(); @@ -770,7 +852,6 @@ impl<'a> Transformer<'a> { } // Cap it. Drop once we avoid the copies below. - const MAX_CHUNK_SIZE: usize = 1 << 22; // 4 megabytes. if s > MAX_CHUNK_SIZE { s = MAX_CHUNK_SIZE; } @@ -801,11 +882,14 @@ impl<'a> Transformer<'a> { if short_read { TransformationState::Sigs } else { - TransformationState::Data + TransformationState::Buffer(short_read) } }, - TransformationState::Sigs => { + TransformationState::Buffer(_) + | TransformationState::PassThru(_, _) + | TransformationState::Sigs => + { for sig in self.sigs.drain(..) { Packet::Signature(sig).serialize(&mut new_buffer) .map_err(|e| { |