author	Vladimir Feinberg <vladimir.feinberg@gmail.com>	2021-07-17 23:08:47 +0000
committer	Vladimir Feinberg <vladimir.feinberg@gmail.com>	2021-07-17 23:08:47 +0000
commit	f08613399ee167b69eaba84edbf2304262eeb362 (patch)
tree	92995cd1e5df11c93aa647dd7acbede0aac9dd1f
parent	42bc43754ebef25226e2da8f1f2b5687b9b889b7 (diff)
v0.3.0: quadratic to linear memory usage in mappers and --verbose bool flag
-rw-r--r--	slb/Cargo.toml	2
-rw-r--r--	slb/src/main.rs	21
-rw-r--r--	slb/src/sharder.rs	38
3 files changed, 39 insertions, 22 deletions
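
Aside: a back-of-the-envelope check of the headline change. Under the old scheme each mapper kept a bufsize-sized buffer per folder, so buffered memory was bounded by bufsize * nthreads^2; the shared counter introduced in sharder.rs below caps each mapper at bufsize, for bufsize * nthreads overall. The sketch plugs in the new 64 KiB default with a hypothetical 16 threads; the numbers are illustrative, not from the commit.

fn main() {
    let nthreads: usize = 16; // hypothetical thread count, not from the commit
    let bufsize: usize = 64 * 1024; // new default from the diff below: 64 KiB

    // Old scheme: one bufsize buffer per mapper/folder pair.
    let old_bound = bufsize * nthreads * nthreads; // 16 MiB
    // New scheme: a single bufsize cap shared across a mapper's buffers.
    let new_bound = bufsize * nthreads; // 1 MiB

    println!(
        "old bound: {} MiB, new bound: {} MiB",
        old_bound / (1024 * 1024),
        new_bound / (1024 * 1024)
    );
}
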
diff --git a/slb/Cargo.toml b/slb/Cargo.toml
index b933a1c..99d32ff 100644
--- a/slb/Cargo.toml
+++ b/slb/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "slb"
-version = "0.2.5"
+version = "0.3.0"
authors = ["Vladimir Feinberg <vladimir.feinberg@gmail.com>"]
edition = "2018"
description = "Sharded load balancing text-streaming Unix tool"
diff --git a/slb/src/main.rs b/slb/src/main.rs
index 7ad1816..154f2ce 100644
--- a/slb/src/main.rs
+++ b/slb/src/main.rs
@@ -90,19 +90,17 @@ struct Opt {
#[structopt(long)]
outprefix: PathBuf,
- /// Buffer size in KB for buffering output before it's sent to a
- /// corresponding folder from a mapper.
+ /// Buffer size in KB for buffering output before it's sent to
+ /// folders from a mapper.
///
- /// Note memory usage is O(bufsize * nthreads^2) since such a buffer
- /// is maintained for each mapper/folder pair, but there's a TODO to
- /// fix this.
+ /// Note memory usage is O(bufsize * nthreads).
#[structopt(long)]
bufsize: Option<usize>,
// TODO: consider sort-like KEYDEF -k --key which wouldn't hash if n (numeric) flag set
/// Print debug information to stderr.
#[structopt(long)]
- verbose: Option<bool>,
+ verbose: bool,
// TODO: this isn't very useful as an option, consider removing entirely
// or allowing a max_mappers and max_folders which controls maximum
@@ -118,15 +116,17 @@ struct Opt {
fn main() {
let opt = Opt::from_args();
- let verbose = opt.verbose.unwrap_or(false);
+ let verbose = opt.verbose;
let nthreads = opt.nthreads.unwrap_or(num_cpus::get_physical());
let mapper_cmd = opt.mapper.as_deref().unwrap_or("cat");
let folder_cmd = &opt.folder;
- let bufsize = opt.bufsize.unwrap_or(16) * 1024;
+ let bufsize = opt.bufsize.unwrap_or(64) * 1024;
let queuesize = 256;
assert!(!opt.infile.is_empty());
- // TODO: could play with queuesize and mapper:folder ratio tuning.
+ // TODO: Assume bufsize is fixed due to memory constraints.
+ //
+ // We could play with queuesize and mapper:folder ratio tuning.
// For map-constrained tasks, reducing folders past 1:1 ratio
// probably doesn't help since folders sitting idle don't hurt anyone.
// However, for fold-constrained tasks lower mapper ratios like 1:2, 1:4,
@@ -150,7 +150,8 @@ fn main() {
// Allow enough chunks for parallelism but not so few the chunksize
// is small.
- let chunks = fileblocks::chunkify_multiple(&opt.infile, nthreads, 16 * 1024);
+ let read_chunk_size = 16 * 1024;
+ let chunks = fileblocks::chunkify_multiple(&opt.infile, nthreads, read_chunk_size);
let nthreads = chunks.len(); // smaller b/c of min bufsize
assert!(nthreads >= 1);
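
Aside: a minimal sketch of why the Option<bool> -> bool change above matters for the command line. With structopt, an Option<bool> field becomes an option that takes an explicit value (--verbose true), while a plain bool field becomes a presence flag (--verbose). The Demo struct and argv below are hypothetical, for illustration only.

use structopt::StructOpt;

#[derive(Debug, StructOpt)]
struct Demo {
    /// Print debug information to stderr.
    #[structopt(long)]
    verbose: bool,
}

fn main() {
    // Presence of the flag sets the field to true; omitting it yields false.
    let with_flag = Demo::from_iter(vec!["demo", "--verbose"]);
    assert!(with_flag.verbose);

    let without_flag = Demo::from_iter(vec!["demo"]);
    assert!(!without_flag.verbose);
}
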
diff --git a/slb/src/sharder.rs b/slb/src/sharder.rs
index f167224..295c92e 100644
--- a/slb/src/sharder.rs
+++ b/slb/src/sharder.rs
@@ -22,26 +22,42 @@ where
R: BufRead,
F: FnMut(usize, Vec<u8>),
{
- // TODO: currently memory usage here is O(bufsize * npartitions^2)
- // we should still buffer as much as we can but keep global usage
- // below some new `bufsize` given from command line.
- //
- // Can experiment with flushing only half the used bytes, prioritizing
- // minimizing sends (so sending the largest bufs first).
- let mut bufs = vec![Vec::with_capacity(bufsize * 2); npartitions];
+ let mut used_space = 0;
+ let mut bufs = vec![Vec::new(); npartitions];
let npartitions: u64 = npartitions.try_into().unwrap();
r.for_byte_line_with_terminator(|line| {
let key = hash_key(line, npartitions);
+ used_space += line.len();
bufs[key].extend_from_slice(line);
- if bufs[key].len() >= bufsize {
- f(key, mem::take(&mut bufs[key]));
- bufs[key].reserve(bufsize * 2);
+ if used_space >= bufsize {
+ // You might be tempted to ask, why not just send the largest
+ // few buffers to avoid communication overhead? It turns out
+ // this really does not help, at least if we can view
+ // line sizes as constant (or with standard deviation much
+ // smaller than `bufsize`).
+ //
+ // The size of the largest bucket of a hash table with n keys
+ // is lg(n) on average (up to lg(lg(n)) factors). So flushing
+ // the top-k largest buffers at most gets rid of about k*lg(n)
+ // keys. With k set to asymptotically anything less than n
+ // (up to lg(n) factors), we'd be increasing the net number
+ // of flushes (calls to f) we perform.
+ //
+ // Thus, we may as well flush every buffer.
+ for (i, buf) in bufs.iter_mut().enumerate() {
+ if buf.len() > 0 {
+ f(i, mem::take(buf));
+ }
+ }
+ used_space = 0;
}
Ok(true)
})
.expect("successful byte line read");
for (i, buf) in bufs.into_iter().enumerate() {
- f(i, buf)
+ if buf.len() > 0 {
+ f(i, buf)
+ }
}
}
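
Aside: a tiny simulation of the claim in the comment above that the largest of n buckets holds only on the order of lg(n) of n hashed items, so flushing just the biggest buffers would free very little space. It uses std's DefaultHasher as a stand-in for slb's hash_key and an arbitrary item count; both are assumptions for illustration.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn main() {
    let n: usize = 1 << 16; // 65,536 items hashed into 65,536 buckets
    let mut buckets = vec![0u32; n];
    for i in 0..n as u64 {
        let mut h = DefaultHasher::new();
        i.hash(&mut h);
        buckets[(h.finish() as usize) % n] += 1;
    }
    let max = buckets.iter().max().unwrap();
    // The largest bucket typically holds fewer than a dozen items, i.e. a
    // vanishing fraction of the n buffered items.
    println!("largest bucket: {} of {} items", max, n);
}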