diff options
author | Vladimir Feinberg <vladimir.feinberg@gmail.com> | 2021-07-17 23:08:47 +0000 |
---|---|---|
committer | Vladimir Feinberg <vladimir.feinberg@gmail.com> | 2021-07-17 23:08:47 +0000 |
commit | f08613399ee167b69eaba84edbf2304262eeb362 (patch) | |
tree | 92995cd1e5df11c93aa647dd7acbede0aac9dd1f | |
parent | 42bc43754ebef25226e2da8f1f2b5687b9b889b7 (diff) |
v0.3.0: quadratic to linear memory usage in mappers and --verbose bool flag
-rw-r--r-- | slb/Cargo.toml | 2 |
-rw-r--r-- | slb/src/main.rs | 21 |
-rw-r--r-- | slb/src/sharder.rs | 38 |
3 files changed, 39 insertions, 22 deletions
diff --git a/slb/Cargo.toml b/slb/Cargo.toml
index b933a1c..99d32ff 100644
--- a/slb/Cargo.toml
+++ b/slb/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "slb"
-version = "0.2.5"
+version = "0.3.0"
 authors = ["Vladimir Feinberg <vladimir.feinberg@gmail.com>"]
 edition = "2018"
 description = "Sharded load balancing text-streaming Unix tool"
diff --git a/slb/src/main.rs b/slb/src/main.rs
index 7ad1816..154f2ce 100644
--- a/slb/src/main.rs
+++ b/slb/src/main.rs
@@ -90,19 +90,17 @@ struct Opt {
     #[structopt(long)]
     outprefix: PathBuf,
 
-    /// Buffer size in KB for buffering output before it's sent to a
-    /// corresponding folder from a mapper.
+    /// Buffer size in KB for buffering output before it's sent to
+    /// folders from a mapper.
     ///
-    /// Note memory usage is O(bufsize * nthreads^2) since such a buffer
-    /// is maintained for each mapper/folder pair, but there's a TODO to
-    /// fix this.
+    /// Note memory usage is O(bufsize * nthreads).
     #[structopt(long)]
     bufsize: Option<usize>,
 
     // TODO: consider sort-like KEYDEF -k --key which wouldn't hash if n (numeric) flag set
     /// Print debug information to stderr.
     #[structopt(long)]
-    verbose: Option<bool>,
+    verbose: bool,
 
     // TODO: this isn't very useful as an option, consider removing entirely
     // or allowing a max_mappers and max_folders which controls maximum
@@ -118,15 +116,17 @@ struct Opt {
 
 fn main() {
     let opt = Opt::from_args();
-    let verbose = opt.verbose.unwrap_or(false);
+    let verbose = opt.verbose;
     let nthreads = opt.nthreads.unwrap_or(num_cpus::get_physical());
     let mapper_cmd = opt.mapper.as_deref().unwrap_or("cat");
     let folder_cmd = &opt.folder;
-    let bufsize = opt.bufsize.unwrap_or(16) * 1024;
+    let bufsize = opt.bufsize.unwrap_or(64) * 1024;
     let queuesize = 256;
     assert!(!opt.infile.is_empty());
 
-    // TODO: could play with queuesize and mapper:folder ratio tuning.
+    // TODO: Assume bufsize is fixed due to memory constraints.
+    //
+    // We could play with queuesize and mapper:folder ratio tuning.
     // For map-constrained tasks, reducing folders past 1:1 ratio
     // probably doesn't help since folders sitting idle don't hurt anyone.
     // However, for fold-constrained tasks lower mapper ratios like 1:2, 1:4,
@@ -150,7 +150,8 @@ fn main() {
 
     // Allow enough chunks for parallelism but not so few the chunksize
     // is small.
-    let chunks = fileblocks::chunkify_multiple(&opt.infile, nthreads, 16 * 1024);
+    let read_chunk_size = 16 * 1024;
+    let chunks = fileblocks::chunkify_multiple(&opt.infile, nthreads, read_chunk_size);
     let nthreads = chunks.len(); // smaller b/c of min bufsize
     assert!(nthreads >= 1);
 
diff --git a/slb/src/sharder.rs b/slb/src/sharder.rs
index f167224..295c92e 100644
--- a/slb/src/sharder.rs
+++ b/slb/src/sharder.rs
@@ -22,26 +22,42 @@ where
     R: BufRead,
     F: FnMut(usize, Vec<u8>),
 {
-    // TODO: currently memory usage here is O(bufsize * npartitions^2)
-    // we should still buffer as much as we can but keep global usage
-    // below some new `bufsize` given from command line.
-    //
-    // Can experiment with flushing only half the used bytes, prioritizing
-    // minimizing sends (so sending the largest bufs first).
-    let mut bufs = vec![Vec::with_capacity(bufsize * 2); npartitions];
+    let mut used_space = 0;
+    let mut bufs = vec![Vec::new(); npartitions];
     let npartitions: u64 = npartitions.try_into().unwrap();
     r.for_byte_line_with_terminator(|line| {
         let key = hash_key(line, npartitions);
+        used_space += line.len();
         bufs[key].extend_from_slice(line);
-        if bufs[key].len() >= bufsize {
-            f(key, mem::take(&mut bufs[key]));
-            bufs[key].reserve(bufsize * 2);
+        if used_space >= bufsize {
+            // You might be tempted to ask, why not just send the largest
+            // few buffers to avoid communication overhead? It turns out
+            // this really does not help, at least if we can view
+            // line sizes as constant (or with standard deviation much
+            // smaller than `bufsize`).
+            //
+            // The size of the largest bucket of a hash table with n keys
+            // is lg(n) on average (up to lg(lg(n)) factors). So flushing
+            // the top-k largest buffers at most gets rid of about k*lg(n)
+            // keys. With k set to asymptotically anything less than n
+            // (up to lg(n) factors), we'd be increasing the net number
+            // of flushes (calls to f) we perform.
+            //
+            // Thus, we may as well flush every buffer.
+            for (i, buf) in bufs.iter_mut().enumerate() {
+                if buf.len() > 0 {
+                    f(i, mem::take(buf));
+                }
+            }
+            used_space = 0;
         }
         Ok(true)
     })
     .expect("successful byte line read");
 
     for (i, buf) in bufs.into_iter().enumerate() {
-        f(i, buf)
+        if buf.len() > 0 {
+            f(i, buf)
+        }
     }
 }