diff options
author | Vladimir Feinberg <vladimir.feinberg@gmail.com> | 2021-07-17 22:13:50 +0000 |
---|---|---|
committer | Vladimir Feinberg <vladimir.feinberg@gmail.com> | 2021-07-17 22:54:12 +0000 |
commit | 42bc43754ebef25226e2da8f1f2b5687b9b889b7 (patch) | |
tree | d7d6f66a4fd474242c01c18cc23a62e2ba39cafe | |
parent | b1e52b120318bb156eb4a9907ac165a7d5b0c2ab (diff) |
v0.2.5: avoid needless pipe construction
-rw-r--r-- | slb/Cargo.toml | 2 | ||||
-rw-r--r-- | slb/src/fileblocks.rs | 13 | ||||
-rw-r--r-- | slb/src/main.rs | 40 | ||||
-rw-r--r-- | slb/src/sharder.rs | 6 |
4 files changed, 38 insertions, 23 deletions
diff --git a/slb/Cargo.toml b/slb/Cargo.toml index 5aaa090..b933a1c 100644 --- a/slb/Cargo.toml +++ b/slb/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "slb" -version = "0.2.4" +version = "0.2.5" authors = ["Vladimir Feinberg <vladimir.feinberg@gmail.com>"] edition = "2018" description = "Sharded load balancing text-streaming Unix tool" diff --git a/slb/src/fileblocks.rs b/slb/src/fileblocks.rs index 136f201..39f2108 100644 --- a/slb/src/fileblocks.rs +++ b/slb/src/fileblocks.rs @@ -24,6 +24,19 @@ pub struct FileChunk { } impl FileChunk { + /// Prepare a pre-seeked file for this chunk. + pub fn file(&self) -> File { + let mut file = File::open(&self.path).expect("file available"); + file.seek(SeekFrom::Start(self.start.try_into().unwrap())) + .expect("seek"); + file + } + + /// Return the number of bytes to read for this chunk. + pub fn nbytes(&self) -> usize { + self.stop - self.start + } + /// Iterates over just those lines the file chunk refers to. pub fn dump<W: Write>(&self, mut w: W) { let mut file = File::open(&self.path).expect("file available"); diff --git a/slb/src/main.rs b/slb/src/main.rs index 3f7ebdc..7ad1816 100644 --- a/slb/src/main.rs +++ b/slb/src/main.rs @@ -90,9 +90,12 @@ struct Opt { #[structopt(long)] outprefix: PathBuf, - /// Buffer size in KB for reading chunks of input. + /// Buffer size in KB for buffering output before it's sent to a + /// corresponding folder from a mapper. /// - /// Note memory usage is O(bufsize * nthreads) + /// Note memory usage is O(bufsize * nthreads^2) since such a buffer + /// is maintained for each mapper/folder pair, but there's a TODO to + /// fix this. #[structopt(long)] bufsize: Option<usize>, @@ -147,35 +150,31 @@ fn main() { // Allow enough chunks for parallelism but not so few the chunksize // is small. - let chunks = fileblocks::chunkify_multiple(&opt.infile, nthreads, bufsize); + let chunks = fileblocks::chunkify_multiple(&opt.infile, nthreads, 16 * 1024); let nthreads = chunks.len(); // smaller b/c of min bufsize assert!(nthreads >= 1); - let mut mapper_processes: Vec<_> = (0..nthreads) - .map(|i| { + let mut mapper_processes: Vec<_> = chunks + .iter() + .enumerate() + .map(|(i, chunk)| { Command::new("/bin/bash") .arg("-c") - .arg(mapper_cmd) - .stdin(Stdio::piped()) + .arg(format!( + "/bin/bash -c 'head -c {} | {}'", + chunk.nbytes(), + mapper_cmd + )) + .stdin(chunk.file()) .stdout(Stdio::piped()) .spawn() .unwrap_or_else(|err| panic!("error spawn map child {}: {}", i, err)) }) .collect(); - let (mapper_inputs, mapper_outputs): (Vec<_>, Vec<_>) = mapper_processes + let mapper_outputs: Vec<_> = mapper_processes .iter_mut() - .map(|child| { - let stdin = child.stdin.take().unwrap(); - let stdout = child.stdout.take().unwrap(); - (stdin, stdout) - }) - .unzip(); - - let mapper_input_threads: Vec<_> = mapper_inputs - .into_iter() - .zip(chunks.into_iter()) - .map(|(input, chunk)| thread::spawn(move || chunk.dump(input))) + .map(|child| child.stdout.take().unwrap()) .collect(); let (txs, rxs): (Vec<_>, Vec<_>) = (0..nthreads).map(|_| sync_channel(queuesize)).unzip(); @@ -250,9 +249,6 @@ fn main() { }) .collect(); - mapper_input_threads - .into_iter() - .for_each(|handle| handle.join().expect("map input join")); mapper_processes .into_iter() .for_each(|mut child| assert!(child.wait().expect("wait").success())); diff --git a/slb/src/sharder.rs b/slb/src/sharder.rs index f6331fc..f167224 100644 --- a/slb/src/sharder.rs +++ b/slb/src/sharder.rs @@ -22,6 +22,12 @@ where R: BufRead, F: FnMut(usize, Vec<u8>), { + // TODO: currently memory usage here is O(bufsize * npartitions^2) + // we should still buffer as much as we can but keep global usage + // below some new `bufsize` given from command line. + // + // Can experiment with flushing only half the used bytes, prioritizing + // minimizing sends (so sending the largest bufs first). let mut bufs = vec![Vec::with_capacity(bufsize * 2); npartitions]; let npartitions: u64 = npartitions.try_into().unwrap(); r.for_byte_line_with_terminator(|line| { |