summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Justus Winter <justus@sequoia-pgp.org> 2021-11-04 10:15:11 +0100
committer Justus Winter <justus@sequoia-pgp.org> 2021-11-04 10:29:39 +0100
commit 62be3957d51efb2191b56c97c732583d02caf1ae (patch)
tree a7866bc027d03bd757f6668d1c66287bcfbb45e0
parent 11e408c7b7b644f5b94af06dd54598d1f8ff58fc (diff)
openpgp: Improve the streaming Decryptor's buffer strategy.
- Previously, for a read of X bytes, we'd request X + buffer_size bytes from the underlying buffered reader, satisfy the read request, and then request the next X + buffer_size bytes for the next read. This requires the underlying reader to copy buffer_size bytes for each read. In a typical scenario, we'd copy 25 megabytes (the default buffer size) for every 8 kilobytes read (std::io::copy's default buffer size). This incurs a linear cost with a very high factor.
- Improve this by requesting 2 * buffer_size bytes and satisfying the reads from the first half of that buffer, consuming the first half only once it has been exhausted. Only then do we request the next 2 * buffer_size bytes, at which point the underlying buffered reader has to copy the data to a new buffer.
- See #771.
-rw-r--r-- openpgp/src/parse/stream.rs 51
1 files changed, 40 insertions, 11 deletions
diff --git a/openpgp/src/parse/stream.rs b/openpgp/src/parse/stream.rs
index abd9e061..da21b320 100644
--- a/openpgp/src/parse/stream.rs
+++ b/openpgp/src/parse/stream.rs
@@ -102,7 +102,7 @@
//! # Ok(()) }
//! ```
use std::cmp;
-use std::io::{self, Read};
+use std::io;
use std::path::Path;
use std::time;
@@ -143,6 +143,9 @@ use crate::parse::{
/// Whether to trace execution by default (on stderr).
const TRACE : bool = false;
+/// Indentation level for tracing in this module.
+const TRACE_INDENT: isize = 5;
+
/// How much data to buffer before giving it to the caller.
///
/// Signature verification and detection of ciphertext tampering
@@ -2278,7 +2281,7 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> {
-> Result<Decryptor<'a, H>>
where T: Into<Option<time::SystemTime>>
{
- tracer!(TRACE, "Decryptor::from_buffered_reader", 0);
+ tracer!(TRACE, "Decryptor::from_buffered_reader", TRACE_INDENT);
let time = time.into();
let tolerance = time
@@ -2529,13 +2532,17 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> {
//
// Note: once this call succeeds, you may not call it again.
fn finish_maybe(&mut self) -> Result<()> {
+ tracer!(TRACE, "Decryptor::finish_maybe", TRACE_INDENT);
if let Some(PacketParserResult::Some(mut pp)) = self.oppr.take() {
// Check if we hit EOF.
let data_len = pp.data(self.buffer_size + 1)?.len();
- if data_len <= self.buffer_size {
+ if data_len - self.cursor <= self.buffer_size {
// Stash the reserve.
- self.reserve = Some(pp.steal_eof()?);
+ t!("Hit eof with {} bytes of the current buffer consumed.",
+ self.cursor);
+ pp.consume(self.cursor);
self.cursor = 0;
+ self.reserve = Some(pp.steal_eof()?);
// Process the rest of the packets.
let mut ppr = PacketParserResult::Some(pp);
@@ -2602,6 +2609,7 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> {
self.verify_signatures()
} else {
+ t!("Didn't hit EOF.");
self.oppr = Some(PacketParserResult::Some(pp));
Ok(())
}
@@ -2613,7 +2621,7 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> {
/// Verifies the signatures.
#[allow(clippy::blocks_in_if_conditions)]
fn verify_signatures(&mut self) -> Result<()> {
- tracer!(TRACE, "Decryptor::verify_signatures", 0);
+ tracer!(TRACE, "Decryptor::verify_signatures", TRACE_INDENT);
t!("called");
self.certs = self.helper.get_certs(&self.issuers)?;
@@ -2819,6 +2827,9 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> {
/// Like `io::Read::read()`, but returns our `Result`.
fn read_helper(&mut self, buf: &mut [u8]) -> Result<usize> {
+ tracer!(TRACE, "Decryptor::read_helper", TRACE_INDENT);
+ t!("read(buf of {} bytes)", buf.len());
+
if buf.is_empty() {
return Ok(0);
}
@@ -2826,6 +2837,7 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> {
if let Some(ref mut reserve) = self.reserve {
// The message has been verified. We can now drain the
// reserve.
+ t!("Message verified, draining reserve.");
assert!(self.oppr.is_none());
assert!(self.cursor <= reserve.len());
let n = cmp::min(buf.len(), reserve.len() - self.cursor);
@@ -2838,17 +2850,33 @@ impl<'a, H: VerificationHelper + DecryptionHelper> Decryptor<'a, H> {
// Read the data from the Literal data packet.
if let Some(PacketParserResult::Some(mut pp)) = self.oppr.take() {
// Be careful to not read from the reserve.
- let data_len = pp.data(self.buffer_size + buf.len())?.len();
- if data_len <= self.buffer_size {
+ if self.cursor >= self.buffer_size {
+ // Consume the active part of the buffer.
+ t!("Consuming first part of the buffer.");
+ pp.consume(self.buffer_size);
+ self.cursor -= self.buffer_size;
+ }
+
+ // We request two times what our buffer size is, the first
+ // part is the one we give out, the second part is the one
+ // we hold back.
+ let data_len = pp.data(2 * self.buffer_size)?.len();
+ t!("Read {} bytes.", data_len);
+ if data_len - self.cursor <= self.buffer_size {
self.oppr = Some(PacketParserResult::Some(pp));
self.finish_maybe()?;
self.read_helper(buf)
} else {
- let n = cmp::min(buf.len(), data_len - self.buffer_size);
- let buf = &mut buf[..n];
- let result = pp.read(buf);
+ let data = pp.data(2 * self.buffer_size - self.cursor)?;
+ assert_eq!(data.len(), data_len);
+
+ let n =
+ buf.len().min(data_len - self.buffer_size - self.cursor);
+ buf[..n].copy_from_slice(&data[self.cursor..self.cursor + n]);
+ self.cursor += n;
self.oppr = Some(PacketParserResult::Some(pp));
- Ok(result?)
+ t!("Copied {} bytes from buffer, cursor is {}.", n, self.cursor);
+ Ok(n)
}
} else {
panic!("No ppr.");
@@ -2873,6 +2901,7 @@ impl<'a, H: VerificationHelper + DecryptionHelper> io::Read for Decryptor<'a, H>
#[cfg(test)]
mod test {
+ use std::io::Read;
use super::*;
use std::convert::TryFrom;
use crate::parse::Parse;