summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladimir Feinberg <vladimir.feinberg@gmail.com>2021-06-29 07:19:25 -0700
committerVladimir Feinberg <vladimir.feinberg@gmail.com>2021-06-29 07:19:25 -0700
commitbd6a889bf7b910f53d8bb5d606876f4c38c2c91d (patch)
tree7bb87dbff2a14b73039d69ed31da8786ff8f670f
parentb5ee9056a82a15ea8de5d940513f3d484d31f997 (diff)
write to file directly
-rw-r--r--slb/Cargo.toml2
-rw-r--r--slb/src/main.rs33
2 files changed, 12 insertions, 23 deletions
diff --git a/slb/Cargo.toml b/slb/Cargo.toml
index eeb2de8..cb1283e 100644
--- a/slb/Cargo.toml
+++ b/slb/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "slb"
-version = "0.2.0"
+version = "0.2.1"
authors = ["Vladimir Feinberg <vladimir.feinberg@gmail.com>"]
edition = "2018"
description = "Sharded load balancing text-streaming Unix tool"
diff --git a/slb/src/main.rs b/slb/src/main.rs
index 5036a3e..954b479 100644
--- a/slb/src/main.rs
+++ b/slb/src/main.rs
@@ -1,7 +1,7 @@
//! `slb` main executable
use std::fs::File;
-use std::io::{BufReader, BufWriter, Write};
+use std::io::{BufReader, Write};
use std::ops::Deref;
use std::path::PathBuf;
use std::process::{Command, Stdio};
@@ -10,7 +10,6 @@ use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;
-use bstr::io::BufReadExt;
use structopt::StructOpt;
use slb::{fileblocks, sharder};
@@ -182,11 +181,19 @@ fn main() {
let folder_processes: Vec<_> = (0..nthreads)
.map(|i| {
+ let outprefix = opt.outprefix.clone();
+ let width = format!("{}", nthreads - 1).len();
+ let suffix = format!("{:0>width$}", i, width = width);
+ let mut fname = outprefix.file_name().expect("file name").to_owned();
+ fname.push(&suffix);
+ let path = outprefix.with_file_name(fname);
+ let file = File::create(&path).expect("write file");
+
Command::new("/bin/bash")
.arg("-c")
.arg(folder_cmd)
.stdin(Stdio::piped())
- .stdout(Stdio::piped())
+ .stdout(file)
.spawn()
.unwrap_or_else(|err| panic!("error spawn fold child {}: {}", i, err))
})
@@ -197,9 +204,7 @@ fn main() {
let folder_input_output_threads: Vec<_> = folder_processes
.into_iter()
.zip(rxs.into_iter())
- .enumerate()
- .map(|(i, (mut child, rx))| {
- let outprefix = opt.outprefix.clone();
+ .map(|(mut child, rx)| {
thread::spawn(move || {
let mut child_stdin = child.stdin.take().expect("child stdin");
while let Ok(lines) = rx.recv() {
@@ -207,23 +212,7 @@ fn main() {
}
drop(child_stdin);
- let child_stdout = child.stdout.take().expect("child_stdout");
- let child_stdout = BufReader::new(child_stdout);
-
- let width = format!("{}", nthreads - 1).len();
- let suffix = format!("{:0>width$}", i, width = width);
- let mut fname = outprefix.file_name().expect("file name").to_owned();
- fname.push(&suffix);
- let path = outprefix.with_file_name(fname);
- // TODO: why am I copying this stream manually for no reason? This file should
- // just be handed to the folder output stdout() directly...
- let file = File::create(&path).expect("write file");
- let mut file = BufWriter::new(file);
- child_stdout
- .for_byte_line_with_terminator(|line: &[u8]| file.write_all(line).map(|_| true))
- .expect("write");
assert!(child.wait().expect("wait").success());
- file.flush().expect("flush");
})
})
.collect();