author    Vladimir Feinberg <vladimir.feinberg@gmail.com>    2021-07-17 21:22:36 +0000
committer Vladimir Feinberg <vladimir.feinberg@gmail.com>    2021-07-17 21:22:36 +0000
commit    b1e52b120318bb156eb4a9907ac165a7d5b0c2ab (patch)
tree      6853a2c3f4afc5dd808e2569490356a8d40aa4ce
parent    5c9d4b4f8b74eccaea370d8f6a79960c78ff75b4 (diff)
slb 0.2.4: updated statistics logging and TODOs
-rw-r--r--  slb/Cargo.toml   2
-rw-r--r--  slb/src/main.rs  73
2 files changed, 57 insertions, 18 deletions
diff --git a/slb/Cargo.toml b/slb/Cargo.toml
index f6ded9a..5aaa090 100644
--- a/slb/Cargo.toml
+++ b/slb/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "slb"
-version = "0.2.3"
+version = "0.2.4"
authors = ["Vladimir Feinberg <vladimir.feinberg@gmail.com>"]
edition = "2018"
description = "Sharded load balancing text-streaming Unix tool"
diff --git a/slb/src/main.rs b/slb/src/main.rs
index 954b479..3f7ebdc 100644
--- a/slb/src/main.rs
+++ b/slb/src/main.rs
@@ -6,8 +6,8 @@ use std::ops::Deref;
use std::path::PathBuf;
use std::process::{Command, Stdio};
-use std::sync::mpsc::sync_channel;
-use std::sync::Arc;
+use std::sync::mpsc::{sync_channel, TrySendError};
+use std::sync::{Arc, Mutex};
use std::thread;
use structopt::StructOpt;
@@ -90,20 +90,22 @@ struct Opt {
#[structopt(long)]
outprefix: PathBuf,
- /// Buffer size in KB for reading chunks of input, parameter
- /// shared between both mapper and folder right now.
+ /// Buffer size in KB for reading chunks of input.
///
/// Note memory usage is O(bufsize * nthreads)
#[structopt(long)]
bufsize: Option<usize>,
- // TODO consider sort-like KEYDEF -k --key which wouldn't hash if n (numeric) flag set
+ // TODO: consider sort-like KEYDEF -k --key which wouldn't hash if n (numeric) flag set
/// Print debug information to stderr.
#[structopt(long)]
verbose: Option<bool>,
- /// Number of parallel threads per stage to launch: the total
- /// number of live threads could be a multiple of this because
+ // TODO: this isn't very useful as an option, consider removing entirely
+ // or allowing a max_mappers and max_folders which controls maximum
+ // concurrency
+ /// Approximate target number of parallel threads per stage to launch:
+ /// the total number of live threads could be a multiple of this because
/// of concurrent mapper and folder stages.
///
/// Defaults to num CPUs.
@@ -113,7 +115,7 @@ struct Opt {
fn main() {
let opt = Opt::from_args();
- let _verbose = opt.verbose.unwrap_or(false);
+ let verbose = opt.verbose.unwrap_or(false);
let nthreads = opt.nthreads.unwrap_or(num_cpus::get_physical());
let mapper_cmd = opt.mapper.as_deref().unwrap_or("cat");
let folder_cmd = &opt.folder;
@@ -121,15 +123,30 @@ fn main() {
let queuesize = 256;
assert!(!opt.infile.is_empty());
- // TODO: could play with mapper/folder parallelism, bufsize,
- // and queuesize being tuned. Ideally could do tuning automatically.
-
- // TODO: very crude chunking below probably hurts in the presence of stragglers,
- // finer grained work-stealing would be ideal.
+ // TODO: could play with tuning queuesize and the mapper:folder ratio.
+ // For map-constrained tasks, reducing folders below a 1:1 ratio
+ // probably doesn't help, since folders sitting idle don't hurt anyone.
+ // However, for fold-constrained tasks, lower mapper ratios like 1:2,
+ // 1:4, etc. are interesting, since memory usage and block time could
+ // be reduced by dynamic tuning. Then, given an ideal mapper:folder
+ // ratio, which could be derived from Little's law, and a given
+ // variance in mapper speed (normalized by folder speed), and
+ // assuming the hash is uniform, one can compute the variance in queue
+ // lengths under a Poisson process model. This means that computing
+ // statistics about mapper/folder speeds is enough to back out
+ // the ideal mapper:folder ratio and queue size (the queue size chosen
+ // such that blocking is avoided with 99% probability at any fixed
+ // steady-state time).
+ //
+ // The above approach can work if mapper/folder speeds are assumed
+ // constant over time quanta, extrapolating past trends. Otherwise, a
+ // control theory approach could be used.
+ //
+ // This would be fun to investigate more deeply, but I have yet to
+ // encounter a folder-constrained task IRL to test this on.
// Allow enough chunks for parallelism but not so many that the chunksize
// is small.
-
let chunks = fileblocks::chunkify_multiple(&opt.infile, nthreads, bufsize);
let nthreads = chunks.len(); // smaller b/c of min bufsize
assert!(nthreads >= 1);
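
The queue-sizing idea in the TODO above can be made concrete. Under an M/M/1 model with arrival rate lambda (lines/sec sharded to one folder) and service rate mu (lines/sec the folder drains), the steady-state queue length satisfies P(len >= n) = rho^n for rho = lambda/mu, so the smallest capacity q with rho^q <= 0.01 keeps an arriving line's chance of finding a full buffer under 1% (approximating the bounded queue by the unbounded one). A minimal sketch of that computation; queue_capacity is a hypothetical helper, not part of slb:

    // Hypothetical sketch, not slb code: size a bounded channel from
    // measured mapper/folder rates under an M/M/1 assumption.
    fn queue_capacity(lambda: f64, mu: f64, max_block_prob: f64) -> Option<usize> {
        let rho = lambda / mu;
        if rho >= 1.0 {
            return None; // unstable queue: no finite capacity suffices
        }
        // Smallest q with rho^q <= max_block_prob,
        // i.e. q >= ln(max_block_prob) / ln(rho).
        let q = (max_block_prob.ln() / rho.ln()).ceil() as usize;
        Some(q.max(1))
    }

    fn main() {
        // e.g. mappers shard 90k lines/sec to a folder draining 100k/sec
        assert_eq!(queue_capacity(90_000.0, 100_000.0, 0.01), Some(44));
    }

The per-shard sent/blocked counters added later in this commit are a first step toward estimating these rates at runtime.
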
@@ -161,20 +178,36 @@ fn main() {
.map(|(input, chunk)| thread::spawn(move || chunk.dump(input)))
.collect();
- // TODO: wrap these with atomic ints to log capacity / P(capacity=max)
let (txs, rxs): (Vec<_>, Vec<_>) = (0..nthreads).map(|_| sync_channel(queuesize)).unzip();
+ let lines_sent = vec![0usize; nthreads];
+ let lines_blocking = vec![0usize; nthreads];
+ let stats = Arc::new(Mutex::new((lines_sent, lines_blocking)));
let txs_ref = Arc::new(txs);
let mapper_output_threads: Vec<_> = mapper_outputs
.into_iter()
.map(|output| {
let txs_ref_clone = Arc::clone(&txs_ref);
+ let stats = Arc::clone(&stats);
thread::spawn(move || {
let output = BufReader::new(output);
let txs_ref_local = txs_ref_clone.deref();
+ let mut lines_sent = vec![0usize; nthreads];
+ let mut lines_blocking = vec![0usize; nthreads];
sharder::shard(output, nthreads, bufsize, |ix, buf| {
- txs_ref_local[ix].send(buf).expect("send");
- })
+ lines_sent[ix] += 1;
+ if let Err(TrySendError::Full(buf)) = txs_ref_local[ix].try_send(buf) {
+ lines_blocking[ix] += 1;
+ txs_ref_local[ix].send(buf).expect("send");
+ }
+ });
+ let mut guard = stats.lock().unwrap();
+ for i in 0..nthreads {
+ let sends = &mut guard.0;
+ sends[i] += lines_sent[i];
+ let blocks = &mut guard.1;
+ blocks[i] += lines_blocking[i];
+ }
})
})
.collect();
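
In isolation, the instrumentation pattern above amounts to the following self-contained sketch (illustrative, not slb code): attempt a non-blocking try_send first, and only fall back to a blocking send when the bounded channel is full, tallying each stall as backpressure.

    use std::sync::mpsc::{sync_channel, TrySendError};
    use std::thread;

    fn main() {
        let (tx, rx) = sync_channel::<u64>(4); // tiny queue to force stalls
        let producer = thread::spawn(move || {
            let mut blocked = 0usize;
            for i in 0..1_000u64 {
                match tx.try_send(i) {
                    Ok(()) => {}
                    Err(TrySendError::Full(v)) => {
                        blocked += 1;
                        tx.send(v).expect("send"); // block until space frees
                    }
                    Err(TrySendError::Disconnected(_)) => panic!("receiver gone"),
                }
            }
            blocked
        });
        let consumer = thread::spawn(move || rx.iter().sum::<u64>());
        println!("blocked sends: {}", producer.join().unwrap());
        println!("sum received: {}", consumer.join().unwrap());
    }

One subtlety: the if-let in the hunk above silently drops the buffer when try_send fails with Disconnected, where the old unconditional send would have panicked; the explicit match in the sketch preserves that panic.
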
@@ -234,4 +267,10 @@ fn main() {
folder_input_output_threads
.into_iter()
.for_each(|handle| handle.join().expect("fold join"));
+
+ let stats = Arc::try_unwrap(stats).expect("final reference");
+ let (lines_sent, lines_blocking) = stats.into_inner().unwrap();
+ if verbose {
+ println!("sent {:?}\nblock {:?}", lines_sent, lines_blocking);
+ }
}
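
The shutdown sequence relies on a standard Rust pattern: once every worker thread has been joined, the stats Arc has a single owner, so Arc::try_unwrap followed by Mutex::into_inner recovers the aggregated counters without taking the lock. A standalone sketch (illustrative, not slb code):

    use std::sync::{Arc, Mutex};
    use std::thread;

    fn main() {
        let stats = Arc::new(Mutex::new(vec![0usize; 4]));
        let handles: Vec<_> = (0..4)
            .map(|i| {
                let stats = Arc::clone(&stats);
                thread::spawn(move || stats.lock().unwrap()[i] += 1)
            })
            .collect();
        handles.into_iter().for_each(|h| h.join().expect("join"));
        // Every clone died with its thread, so the sole remaining
        // reference can be unwrapped with no further synchronization.
        let stats = Arc::try_unwrap(stats).expect("final reference");
        println!("{:?}", stats.into_inner().unwrap()); // [1, 1, 1, 1]
    }
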