diff options
author | Justus Winter <justus@sequoia-pgp.org> | 2021-11-04 10:15:11 +0100 |
---|---|---|
committer | Justus Winter <justus@sequoia-pgp.org> | 2021-11-04 10:29:39 +0100 |
commit | 62be3957d51efb2191b56c97c732583d02caf1ae (patch) | |
tree | a7866bc027d03bd757f6668d1c66287bcfbb45e0 | |
parent | 11e408c7b7b644f5b94af06dd54598d1f8ff58fc (diff) |
openpgp: Improve the streaming Decryptor's buffer strategy.
- Previously, for a read of X bytes, we'd request X + buffer_size
from the underlying buffered reader, then satisfy the read
request, after which we'd request the next X + buffer_size bytes
for the next read. This requires the underlying reader to copy
buffer_size bytes for each read. In a typical scenario, we'd copy
25 megabytes (the default buffer size) for every 8 kilobytes
read (std::io::copy's default buffer size). This incurs a linear
cost with a very high factor.
- Improve this by requesting 2 * buffer_size, then satisfying the
reads from the first half of that buffer, and consuming that half
from the underlying reader only once it has been exhausted. Then, we
request the next 2 * buffer_size, at which point the underlying
buffered reader has to copy the data to a new buffer.
- See #771.
-rw-r--r-- | openpgp/src/parse/stream.rs | 51 |
1 files changed, 40 insertions, 11 deletions
diff --git a/openpgp/src/parse/stream.rs b/openpgp/src/parse/stream.rs index abd9e061..da21b320 100644 --- a/openpgp/src/parse/stream.rs +++ b/openpgp/src/parse/stream.rs @@ -102,7 +102,7 @@ //! # Ok(()) } //! ``` use std::cmp; -use std::io::{self, Read}; +use std::io; use std::path::Path; use std::time; @@ -143,6 +143,9 @@ use crate::parse::{ /// Whether to trace execution by default (on stderr). const TRACE : bool = false; +/// Indentation level for tracing in this module. +const TRACE_INDENT: isize = 5; + /// How much data to buffer before giving it to the caller. /// /// Signature verification and detection of ciphertext tampering @@ -2278,7 +2281,7 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> { -> Result<Decryptor<'a, H>> where T: Into<Option<time::SystemTime>> { - tracer!(TRACE, "Decryptor::from_buffered_reader", 0); + tracer!(TRACE, "Decryptor::from_buffered_reader", TRACE_INDENT); let time = time.into(); let tolerance = time @@ -2529,13 +2532,17 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> { // // Note: once this call succeeds, you may not call it again. fn finish_maybe(&mut self) -> Result<()> { + tracer!(TRACE, "Decryptor::finish_maybe", TRACE_INDENT); if let Some(PacketParserResult::Some(mut pp)) = self.oppr.take() { // Check if we hit EOF. let data_len = pp.data(self.buffer_size + 1)?.len(); - if data_len <= self.buffer_size { + if data_len - self.cursor <= self.buffer_size { // Stash the reserve. - self.reserve = Some(pp.steal_eof()?); + t!("Hit eof with {} bytes of the current buffer consumed.", + self.cursor); + pp.consume(self.cursor); self.cursor = 0; + self.reserve = Some(pp.steal_eof()?); // Process the rest of the packets. 
let mut ppr = PacketParserResult::Some(pp); @@ -2602,6 +2609,7 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> { self.verify_signatures() } else { + t!("Didn't hit EOF."); self.oppr = Some(PacketParserResult::Some(pp)); Ok(()) } @@ -2613,7 +2621,7 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> { /// Verifies the signatures. #[allow(clippy::blocks_in_if_conditions)] fn verify_signatures(&mut self) -> Result<()> { - tracer!(TRACE, "Decryptor::verify_signatures", 0); + tracer!(TRACE, "Decryptor::verify_signatures", TRACE_INDENT); t!("called"); self.certs = self.helper.get_certs(&self.issuers)?; @@ -2819,6 +2827,9 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> { /// Like `io::Read::read()`, but returns our `Result`. fn read_helper(&mut self, buf: &mut [u8]) -> Result<usize> { + tracer!(TRACE, "Decryptor::read_helper", TRACE_INDENT); + t!("read(buf of {} bytes)", buf.len()); + if buf.is_empty() { return Ok(0); } @@ -2826,6 +2837,7 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> { if let Some(ref mut reserve) = self.reserve { // The message has been verified. We can now drain the // reserve. + t!("Message verified, draining reserve."); assert!(self.oppr.is_none()); assert!(self.cursor <= reserve.len()); let n = cmp::min(buf.len(), reserve.len() - self.cursor); @@ -2838,17 +2850,33 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> { // Read the data from the Literal data packet. if let Some(PacketParserResult::Some(mut pp)) = self.oppr.take() { // Be careful to not read from the reserve. - let data_len = pp.data(self.buffer_size + buf.len())?.len(); - if data_len <= self.buffer_size { + if self.cursor >= self.buffer_size { + // Consume the active part of the buffer. 
+ t!("Consuming first part of the buffer."); + pp.consume(self.buffer_size); + self.cursor -= self.buffer_size; + } + + // We request two times what our buffer size is, the first + // part is the one we give out, the second part is the one + // we hold back. + let data_len = pp.data(2 * self.buffer_size)?.len(); + t!("Read {} bytes.", data_len); + if data_len - self.cursor <= self.buffer_size { self.oppr = Some(PacketParserResult::Some(pp)); self.finish_maybe()?; self.read_helper(buf) } else { - let n = cmp::min(buf.len(), data_len - self.buffer_size); - let buf = &mut buf[..n]; - let result = pp.read(buf); + let data = pp.data(2 * self.buffer_size - self.cursor)?; + assert_eq!(data.len(), data_len); + + let n = + buf.len().min(data_len - self.buffer_size - self.cursor); + buf[..n].copy_from_slice(&data[self.cursor..self.cursor + n]); + self.cursor += n; self.oppr = Some(PacketParserResult::Some(pp)); - Ok(result?) + t!("Copied {} bytes from buffer, cursor is {}.", n, self.cursor); + Ok(n) } } else { panic!("No ppr."); @@ -2873,6 +2901,7 @@ impl<'a, H: VerificationHelper + DecryptionHelper> io::Read for Decryptor<'a, H> #[cfg(test)] mod test { + use std::io::Read; use super::*; use std::convert::TryFrom; use crate::parse::Parse; |