diff options
author | Vladimir Feinberg <vladimir.feinberg@gmail.com> | 2021-07-17 21:22:36 +0000 |
---|---|---|
committer | Vladimir Feinberg <vladimir.feinberg@gmail.com> | 2021-07-17 21:22:36 +0000 |
commit | b1e52b120318bb156eb4a9907ac165a7d5b0c2ab (patch) | |
tree | 6853a2c3f4afc5dd808e2569490356a8d40aa4ce | |
parent | 5c9d4b4f8b74eccaea370d8f6a79960c78ff75b4 (diff) |
slb 0.2.4: updated statistics logging and TODOs
-rw-r--r-- | slb/Cargo.toml | 2 | ||||
-rw-r--r-- | slb/src/main.rs | 73 |
2 files changed, 57 insertions, 18 deletions
diff --git a/slb/Cargo.toml b/slb/Cargo.toml index f6ded9a..5aaa090 100644 --- a/slb/Cargo.toml +++ b/slb/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "slb" -version = "0.2.3" +version = "0.2.4" authors = ["Vladimir Feinberg <vladimir.feinberg@gmail.com>"] edition = "2018" description = "Sharded load balancing text-streaming Unix tool" diff --git a/slb/src/main.rs b/slb/src/main.rs index 954b479..3f7ebdc 100644 --- a/slb/src/main.rs +++ b/slb/src/main.rs @@ -6,8 +6,8 @@ use std::ops::Deref; use std::path::PathBuf; use std::process::{Command, Stdio}; -use std::sync::mpsc::sync_channel; -use std::sync::Arc; +use std::sync::mpsc::{sync_channel, TrySendError}; +use std::sync::{Arc, Mutex}; use std::thread; use structopt::StructOpt; @@ -90,20 +90,22 @@ struct Opt { #[structopt(long)] outprefix: PathBuf, - /// Buffer size in KB for reading chunks of input, parameter - /// shared between both mapper and folder right now. + /// Buffer size in KB for reading chunks of input. /// /// Note memory usage is O(bufsize * nthreads) #[structopt(long)] bufsize: Option<usize>, - // TODO consider sort-like KEYDEF -k --key which wouldn't hash if n (numeric) flag set + // TODO: consider sort-like KEYDEF -k --key which wouldn't hash if n (numeric) flag set /// Print debug information to stderr. #[structopt(long)] verbose: Option<bool>, - /// Number of parallel threads per stage to launch: the total - /// number of live threads could be a multiple of this becuase + // TODO: this isn't very useful as an option, consider removing entirely + // or allowing a max_mappers and max_folders which controls maximum + // concurrency + /// Approximate target number of parallel threads per stage to launch: + /// the total number of live threads could be a multiple of this becuase /// of concurrent mapper and folder stages. /// /// Defaults to num CPUs. @@ -113,7 +115,7 @@ struct Opt { fn main() { let opt = Opt::from_args(); - let _verbose = opt.verbose.unwrap_or(false); + let verbose = opt.verbose.unwrap_or(false); let nthreads = opt.nthreads.unwrap_or(num_cpus::get_physical()); let mapper_cmd = opt.mapper.as_deref().unwrap_or("cat"); let folder_cmd = &opt.folder; @@ -121,15 +123,30 @@ fn main() { let queuesize = 256; assert!(!opt.infile.is_empty()); - // TODO: could play with mapper/folder parallelism, bufsize, - // and queuesize being tuned. Ideally could do tuning automatically. - - // TODO: very crude chunking below probably hurts in the presence of stragglers, - // finer grained work-stealing would be ideal. + // TODO: could play with queuesize and mapper:folder ratio tuning. + // For map-constrained tasks, reducing folders past 1:1 ratio + // probably doesn't help since folders sitting idle don't hurt anyone. + // However, for fold-constrained tasks lower mapper ratios like 1:2, 1:4, + // and etc. are interesting since memory usage and block time could be + // reduced after dynamic tuning. Then for a given ideal mapper:folder + // ratio, which could be derived with Little's law, and a given + // variance in mapper speed (normalized by reducer speed), after + // assuming the hash is uniform, one can compute variance in queue + // lengths given a Poisson process setup. This means that computing + // statistics about mapper/folder speeds is enough to back out + // the ideal mapper:folder ratio and queue size (queue size chosen such + // that blocking is avoided with 99% probability at any fixed steady-state + // time). + // + // Above approach can work simply assuming mapper/folder speeds are constant + // over time quanta holding past trends. Otherwise, a control theory + // approach could be used. + // + // This would be fun to investigate more deeply, but I have yet to encounter + // a folder-constrained task IRL to test this on. // Allow enough chunks for parallelism but not so few the chunksize // is small. - let chunks = fileblocks::chunkify_multiple(&opt.infile, nthreads, bufsize); let nthreads = chunks.len(); // smaller b/c of min bufsize assert!(nthreads >= 1); @@ -161,20 +178,36 @@ fn main() { .map(|(input, chunk)| thread::spawn(move || chunk.dump(input))) .collect(); - // TODO: wrap these with atomic ints to log capacity / P(capacity=max) let (txs, rxs): (Vec<_>, Vec<_>) = (0..nthreads).map(|_| sync_channel(queuesize)).unzip(); + let lines_sent = vec![0usize; nthreads]; + let lines_blocking = vec![0usize; nthreads]; + let stats = Arc::new(Mutex::new((lines_sent, lines_blocking))); let txs_ref = Arc::new(txs); let mapper_output_threads: Vec<_> = mapper_outputs .into_iter() .map(|output| { let txs_ref_clone = Arc::clone(&txs_ref); + let stats = Arc::clone(&stats); thread::spawn(move || { let output = BufReader::new(output); let txs_ref_local = txs_ref_clone.deref(); + let mut lines_sent = vec![0usize; nthreads]; + let mut lines_blocking = vec![0usize; nthreads]; sharder::shard(output, nthreads, bufsize, |ix, buf| { - txs_ref_local[ix].send(buf).expect("send"); - }) + lines_sent[ix] += 1; + if let Err(TrySendError::Full(buf)) = txs_ref_local[ix].try_send(buf) { + lines_blocking[ix] += 1; + txs_ref_local[ix].send(buf).expect("send"); + } + }); + let mut guard = stats.lock().unwrap(); + for i in 0..nthreads { + let ref mut sends = guard.0; + sends[i] += lines_sent[i]; + let ref mut blocks = guard.1; + blocks[i] += lines_blocking[i]; + } }) }) .collect(); @@ -234,4 +267,10 @@ fn main() { folder_input_output_threads .into_iter() .for_each(|handle| handle.join().expect("fold join")); + + let stats = Arc::try_unwrap(stats).expect("final reference"); + let (lines_sent, lines_blocking) = stats.into_inner().unwrap(); + if verbose { + println!("sent {:?}\nblock {:?}", lines_sent, lines_blocking); + } } |