author     Vladimir Feinberg <vladimir.feinberg@gmail.com>   2021-07-17 22:13:50 +0000
committer  Vladimir Feinberg <vladimir.feinberg@gmail.com>   2021-07-17 22:54:12 +0000
commit     42bc43754ebef25226e2da8f1f2b5687b9b889b7 (patch)
tree       d7d6f66a4fd474242c01c18cc23a62e2ba39cafe
parent     b1e52b120318bb156eb4a9907ac165a7d5b0c2ab (diff)
v0.2.5: avoid needless pipe construction
-rw-r--r--  slb/Cargo.toml        |  2
-rw-r--r--  slb/src/fileblocks.rs | 13
-rw-r--r--  slb/src/main.rs       | 40
-rw-r--r--  slb/src/sharder.rs    |  6
4 files changed, 38 insertions(+), 23 deletions(-)
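The change in a nutshell: previously each mapper child was given a stdin pipe, fed by a dedicated thread that dumped the chunk into it. Now the child's stdin is the input file itself, pre-seeked to the chunk start, with head -c bounding the read to the chunk length, so no pipe and no writer thread are needed. A minimal standalone sketch of the new pattern (the file name and the wc -l stand-in mapper are hypothetical, not the exact slb code):

    use std::fs::File;
    use std::io::{Seek, SeekFrom};
    use std::process::{Command, Stdio};

    fn main() {
        // Hypothetical chunk bounds; in slb these come from a FileChunk.
        let (start, nbytes) = (4096u64, 1024u64);
        let mut input = File::open("input.txt").expect("file available");
        input.seek(SeekFrom::Start(start)).expect("seek");

        // File implements Into<Stdio>, so the child reads the seeked
        // descriptor directly: no pipe and no writer thread on our side.
        let mut child = Command::new("/bin/bash")
            .arg("-c")
            .arg(format!("head -c {} | wc -l", nbytes))
            .stdin(input)
            .stdout(Stdio::piped())
            .spawn()
            .expect("spawn");
        assert!(child.wait().expect("wait").success());
    }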
diff --git a/slb/Cargo.toml b/slb/Cargo.toml
index 5aaa090..b933a1c 100644
--- a/slb/Cargo.toml
+++ b/slb/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "slb"
-version = "0.2.4"
+version = "0.2.5"
authors = ["Vladimir Feinberg <vladimir.feinberg@gmail.com>"]
edition = "2018"
description = "Sharded load balancing text-streaming Unix tool"
diff --git a/slb/src/fileblocks.rs b/slb/src/fileblocks.rs
index 136f201..39f2108 100644
--- a/slb/src/fileblocks.rs
+++ b/slb/src/fileblocks.rs
@@ -24,6 +24,19 @@ pub struct FileChunk {
}
impl FileChunk {
+ /// Prepare a pre-seeked file for this chunk.
+ pub fn file(&self) -> File {
+ let mut file = File::open(&self.path).expect("file available");
+ file.seek(SeekFrom::Start(self.start.try_into().unwrap()))
+ .expect("seek");
+ file
+ }
+
+ /// Return the number of bytes to read for this chunk.
+ pub fn nbytes(&self) -> usize {
+ self.stop - self.start
+ }
+
/// Iterates over just those lines the file chunk refers to.
pub fn dump<W: Write>(&self, mut w: W) {
let mut file = File::open(&self.path).expect("file available");
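Together the two new methods let a caller consume exactly one chunk without copying it through a pipe. A hypothetical usage sketch (read_chunk is not part of the crate, and it assumes FileChunk is importable as a library item):

    use std::io::Read;

    use slb::fileblocks::FileChunk;

    /// Illustration only: read exactly the bytes a chunk covers.
    fn read_chunk(chunk: &FileChunk) -> Vec<u8> {
        let mut buf = Vec::with_capacity(chunk.nbytes());
        chunk
            .file()                      // already seeked to chunk.start
            .take(chunk.nbytes() as u64) // stop at chunk.stop
            .read_to_end(&mut buf)
            .expect("read chunk");
        buf
    }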
diff --git a/slb/src/main.rs b/slb/src/main.rs
index 3f7ebdc..7ad1816 100644
--- a/slb/src/main.rs
+++ b/slb/src/main.rs
@@ -90,9 +90,12 @@ struct Opt {
#[structopt(long)]
outprefix: PathBuf,
- /// Buffer size in KB for reading chunks of input.
+ /// Buffer size in KB for buffering output before it's sent to a
+ /// corresponding folder from a mapper.
///
- /// Note memory usage is O(bufsize * nthreads)
+ /// Note memory usage is O(bufsize * nthreads^2) since such a buffer
+ /// is maintained for each mapper/folder pair, but there's a TODO to
+ /// fix this.
#[structopt(long)]
bufsize: Option<usize>,
@@ -147,35 +150,31 @@ fn main() {
// Allow enough chunks for parallelism but not so many that the
// chunksize is small.
- let chunks = fileblocks::chunkify_multiple(&opt.infile, nthreads, bufsize);
+ let chunks = fileblocks::chunkify_multiple(&opt.infile, nthreads, 16 * 1024);
let nthreads = chunks.len(); // smaller b/c of min bufsize
assert!(nthreads >= 1);
- let mut mapper_processes: Vec<_> = (0..nthreads)
- .map(|i| {
+ let mut mapper_processes: Vec<_> = chunks
+ .iter()
+ .enumerate()
+ .map(|(i, chunk)| {
Command::new("/bin/bash")
.arg("-c")
- .arg(mapper_cmd)
- .stdin(Stdio::piped())
+ .arg(format!(
+ "/bin/bash -c 'head -c {} | {}'",
+ chunk.nbytes(),
+ mapper_cmd
+ ))
+ .stdin(chunk.file())
.stdout(Stdio::piped())
.spawn()
.unwrap_or_else(|err| panic!("error spawn map child {}: {}", i, err))
})
.collect();
- let (mapper_inputs, mapper_outputs): (Vec<_>, Vec<_>) = mapper_processes
+ let mapper_outputs: Vec<_> = mapper_processes
.iter_mut()
- .map(|child| {
- let stdin = child.stdin.take().unwrap();
- let stdout = child.stdout.take().unwrap();
- (stdin, stdout)
- })
- .unzip();
-
- let mapper_input_threads: Vec<_> = mapper_inputs
- .into_iter()
- .zip(chunks.into_iter())
- .map(|(input, chunk)| thread::spawn(move || chunk.dump(input)))
+ .map(|child| child.stdout.take().unwrap())
.collect();
let (txs, rxs): (Vec<_>, Vec<_>) = (0..nthreads).map(|_| sync_channel(queuesize)).unzip();
@@ -250,9 +249,6 @@ fn main() {
})
.collect();
- mapper_input_threads
- .into_iter()
- .for_each(|handle| handle.join().expect("map input join"));
mapper_processes
.into_iter()
.for_each(|mut child| assert!(child.wait().expect("wait").success()));
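For contrast, the deleted lines above implemented the pattern this commit removes: a stdin pipe per mapper plus a thread calling chunk.dump(input) to fill it. A self-contained sketch of that old shape (a literal write stands in for chunk.dump, and wc -l for mapper_cmd):

    use std::io::Write;
    use std::process::{Command, Stdio};
    use std::thread;

    fn main() {
        let mut child = Command::new("/bin/bash")
            .arg("-c")
            .arg("wc -l") // stand-in for mapper_cmd
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()
            .expect("spawn map child");
        // In slb each chunk was dumped from its own thread so that
        // all mappers could be fed concurrently.
        let mut stdin = child.stdin.take().unwrap();
        let writer = thread::spawn(move || {
            stdin.write_all(b"one line\n").expect("write");
        });
        writer.join().expect("map input join");
        assert!(child.wait().expect("wait").success());
    }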
diff --git a/slb/src/sharder.rs b/slb/src/sharder.rs
index f6331fc..f167224 100644
--- a/slb/src/sharder.rs
+++ b/slb/src/sharder.rs
@@ -22,6 +22,12 @@ where
R: BufRead,
F: FnMut(usize, Vec<u8>),
{
+ // TODO: currently memory usage here is O(bufsize * npartitions^2);
+ // we should still buffer as much as we can but keep global usage
+ // below some new `bufsize` given from command line.
+ //
+ // Can experiment with flushing only half the used bytes, prioritizing
+ // minimizing sends (so sending the largest bufs first).
let mut bufs = vec![Vec::with_capacity(bufsize * 2); npartitions];
let npartitions: u64 = npartitions.try_into().unwrap();
r.for_byte_line_with_terminator(|line| {
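A rough sketch of the flush policy the TODO floats: when total buffered bytes exceed a global budget, flush the largest buffers first until half the used bytes are freed. The helper below is hypothetical (flush_half and its parameters are made up, not part of this commit):

    use std::cmp::Reverse;
    use std::mem;

    /// Hypothetical: free at least half of `used` bytes, flushing the
    /// largest buffers first to minimize the number of sends.
    fn flush_half<F: FnMut(usize, Vec<u8>)>(
        bufs: &mut [Vec<u8>],
        used: &mut usize,
        send: &mut F,
    ) {
        let target = *used / 2;
        let mut order: Vec<usize> = (0..bufs.len()).collect();
        order.sort_by_key(|&i| Reverse(bufs[i].len()));
        for i in order {
            if *used <= target {
                break;
            }
            let buf = mem::take(&mut bufs[i]);
            *used -= buf.len();
            send(i, buf);
        }
    }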