summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Aaron Murphy <mmstickman@gmail.com>2016-08-26 17:51:47 -0400
committerMichael Aaron Murphy <mmstickman@gmail.com>2016-08-26 17:51:47 -0400
commitac3d3c40b450077172b099a019c1f3cdfa147e47 (patch)
tree985dca4af043661a394f26ff1277965a50d18fb5
parent98712c9d604c94f5752437034014ebef8d9479dd (diff)
Group Stdout/Stderr Buffers By Default0.0.1
-rw-r--r--README.md1
-rw-r--r--src/command.rs25
-rw-r--r--src/main.rs127
-rw-r--r--src/parser.rs99
4 files changed, 175 insertions, 77 deletions
diff --git a/README.md b/README.md
index 1d4802d..5872779 100644
--- a/README.md
+++ b/README.md
@@ -37,6 +37,7 @@ parallel ::: "echo 1" "echo 2" "echo 3" // If no command is supplied, the input
In addition to the command syntax, there are also some options that you can use to configure the load balancer:
- **-j**: Defines the number of jobs/threads to run in parallel.
+- **--ungroup**: By default, stdout/stderr buffers are grouped in the order that they are received.
Available syntax options for the placeholders values are:
- **{}**: Each occurrence will be replaced with the name of the input.
diff --git a/src/command.rs b/src/command.rs
index 19e799e..a6af78f 100644
--- a/src/command.rs
+++ b/src/command.rs
@@ -1,21 +1,23 @@
use std::io::{StderrLock, Write};
-use std::process::Command;
+use std::process::{Command, Output};
use tokenizer::Token;
pub enum CommandErr {
- Failed(String, Vec<String>)
+ Failed(String, Vec<String>, String)
}
impl CommandErr {
pub fn handle(self, stderr: &mut StderrLock) {
let _ = stderr.write(b"parallel: command error: ");
match self {
- CommandErr::Failed(command, arguments) => {
+ CommandErr::Failed(command, arguments, error) => {
let _ = stderr.write(command.as_bytes());
for arg in &arguments {
let _ = stderr.write(b" ");
let _ = stderr.write(arg.as_bytes());
}
+ let _ = stderr.write(b": ");
+ let _ = stderr.write(error.as_bytes());
let _ = stderr.write(b"\n");
}
}
@@ -24,7 +26,7 @@ impl CommandErr {
/// Builds the command and executes it
pub fn exec(input: &str, command: &str, arg_tokens: &[Token], slot_id: &str, job_id: &str,
- job_total :&str) -> Result<(), CommandErr>
+ job_total :&str, grouping: bool) -> Result<Option<Output>, CommandErr>
{
// First the arguments will be generated based on the tokens and input.
let mut arguments = Vec::new();
@@ -42,12 +44,17 @@ pub fn exec(input: &str, command: &str, arg_tokens: &[Token], slot_id: &str, job
arguments.push(String::from(input));
}
- // Attempt to execute the command with the generated arguments.
- if let Err(_) = Command::new(&command).args(&arguments).status() {
- // If an error status is returned, return it to be printed.
- return Err(CommandErr::Failed(String::from(command), arguments));
+ if grouping {
+ Command::new(&command).args(&arguments).output()
+ .map(|x| Some(x)).map_err(|why| {
+ CommandErr::Failed(String::from(command), arguments, why.to_string())
+ })
+ } else {
+ match Command::new(&command).args(&arguments).status() {
+ Ok(_) => Ok(None),
+ Err(why) => Err(CommandErr::Failed(String::from(command), arguments, why.to_string()))
+ }
}
- Ok(())
}
/// Builds arguments using the `tokens` template with the current `input` value.
diff --git a/src/main.rs b/src/main.rs
index 8b5d29f..7f9cab8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,10 +5,13 @@ mod parser; // Collects the input arguments given to the program.
use std::io::{self, Write, BufRead};
use std::process::{Command, exit};
+
use std::thread::{self, JoinHandle};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
-use parser::{ParseErr, parse_arguments};
+use std::sync::mpsc::channel;
+
+use parser::{Args, ParseErr};
/* TODO: Functionality can be increased to accept the following syntaxes from GNU Parallel:
- Stdin support is currently missing.
@@ -17,22 +20,30 @@ use parser::{ParseErr, parse_arguments};
- paralllel command ::: a b c :::+ 1 2 3 ::: d e f :::+ 4 5 6
*/
+struct JobOutput {
+ id: usize,
+ stdout: Vec<u8>,
+ stderr: Vec<u8>,
+}
+
fn main() {
// Obtain a handle to standard error's buffer so we can write directly to it.
let stderr = io::stderr();
- // The `num_cpus` crate allows conveniently obtaining the number of CPU cores in the system.
- // This number will be used to determine how many threads to run in parallel.
- let mut ncores = num_cpus::get();
-
- // Initialize mutable vectors to store data that will be collected from input arguments.
- let mut command = String::new();
- let mut argument_tokens = Vec::new();
- let mut inputs = Vec::new();
+ let mut args = Args {
+ // The `num_cpus` crate allows conveniently obtaining the number of CPU cores in the system.
+ // This number will be used to determine how many threads to run in parallel.
+ ncores: num_cpus::get(),
+ // Defines whether stdout/stderr buffers should be printed in order.
+ grouped: true,
+ command: String::new(),
+ arguments: Vec::new(),
+ inputs: Vec::new()
+ };
// Let's collect all parameters that we need from the program's arguments.
// If an error is returned, this will handle that error as efficiently as possible.
- if let Err(why) = parse_arguments(&mut ncores, &mut command, &mut argument_tokens, &mut inputs) {
+ if let Err(why) = args.parse() {
// Always lock an output buffer before using it.
let mut stderr = stderr.lock();
let _ = stderr.write(b"parallel: parsing error: ");
@@ -48,43 +59,51 @@ fn main() {
}
// If no inputs are provided, read from stdin instead.
- if inputs.is_empty() {
+ if args.inputs.is_empty() {
let stdin = io::stdin();
for line in stdin.lock().lines() {
if let Ok(line) = line {
- inputs.push(line)
+ args.inputs.push(line)
}
}
}
// If no command was given, then the inputs are actually commands themselves.
- let input_is_command = command.is_empty();
+ let input_is_command = args.command.is_empty();
// It will be useful to know the number of inputs, to know when to quit.
- let num_inputs = inputs.len();
+ let num_inputs = args.inputs.len();
// Keeps track of the current step in the input queue.
// All threads will share this counter without stepping on each other's toes.
let shared_counter = Arc::new(AtomicUsize::new(0));
// We will share the same list of inputs with each thread.
- let shared_input = Arc::new(inputs);
+ let shared_input = Arc::new(args.inputs);
+
+ // If grouping is enabled, stdout and stderr will be buffered.
+ let (output_tx, input_rx) = channel::<JobOutput>();
// First we will create as many threads as `ncores` specifies.
// The `threads` vector will contain the thread handles needed to
// know when to quit the program.
- let mut threads: Vec<JoinHandle<()>> = Vec::with_capacity(ncores);
- for slot in 1..ncores+1 {
+ let mut threads: Vec<JoinHandle<()>> = Vec::with_capacity(args.ncores);
+ for slot in 1..args.ncores+1 {
// The command that each input variable will be sent to.
- let command = command.clone();
+ let command = args.command.clone();
// The base command template that each thread will use.
- let argument_tokens = argument_tokens.clone();
+ let argument_tokens = args.arguments.clone();
// Allow the thread to gain access to the list of inputs.
let input = shared_input.clone();
// Allow the thread to access the input counter
let counter = shared_counter.clone();
// Allow the thread to know when it's time to stop.
let num_inputs = num_inputs;
+ // If grouped is set to true, stdout/stderr buffers will be collected.
+ let grouped = args.grouped;
+ // Each thread will receive it's own sender for sending stderr/stdout buffers.
+ let output_tx = output_tx.clone();
+
// The actual thread where the work will happen on incoming data.
let handle: JoinHandle<()> = thread::spawn(move || {
@@ -108,7 +127,7 @@ fn main() {
break
} else {
// Obtain the Nth input as well as the job ID
- (&input[counter], (counter + 1).to_string())
+ (&input[counter], (counter + 1))
}
};
@@ -127,11 +146,21 @@ fn main() {
} else {
// Build a command by merging the command template with the input,
// and then execute that command.
- if let Err(cmd_err) = command::exec(input_var, &command, &argument_tokens,
- &slot, &job_id, &job_total)
+ match command::exec(&input_var, &command, &argument_tokens, &slot,
+ &job_id.to_string(), &job_total, grouped)
{
- let mut stderr = stderr.lock();
- cmd_err.handle(&mut stderr);
+ Ok(Some(ref output)) if grouped => {
+ output_tx.send(JobOutput{
+ id: job_id,
+ stdout: output.stdout.clone(),
+ stderr: output.stderr.clone(),
+ }).unwrap();
+ },
+ Ok(_) => (),
+ Err(cmd_err) => {
+ let mut stderr = stderr.lock();
+ cmd_err.handle(&mut stderr);
+ }
}
}
}
@@ -142,6 +171,56 @@ fn main() {
threads.push(handle);
}
+ if args.grouped {
+ let stdout = io::stdout();
+ let mut stdout = stdout.lock();
+ let mut stderr = stderr.lock();
+ let mut counter = 1;
+ let mut buffer = Vec::new();
+ while counter != num_inputs + 1 {
+ // Block and wait until a new buffer is received.
+ let output = input_rx.recv().unwrap();
+
+ // If the buffer ID is the next in line, print it, else add it to the buffer.
+ if output.id == counter {
+ let _ = stdout.write(&output.stdout);
+ let _ = stderr.write(&output.stderr);
+ counter += 1;
+ } else {
+ buffer.push(output);
+ }
+
+ // Check to see if there are any stored buffers that can now be printed.
+ // Items in the buffer will be removed after they are used.
+ 'outer: loop {
+ let mut changed = false;
+ let mut drop = Vec::new();
+
+ // Loop through the list of buffers and print buffers with the next ID in line.
+ for (id, output) in buffer.iter().enumerate() {
+ if output.id == counter {
+ let _ = stdout.write(&output.stdout);
+ let _ = stderr.write(&output.stderr);
+ counter += 1;
+ changed = true;
+ drop.push(id);
+ }
+ }
+
+ // Drop the buffers that were used.
+ if !drop.is_empty() {
+ drop.sort();
+ for id in drop.iter().rev() {
+ let _ = buffer.remove(*id);
+ }
+ }
+
+ // If no change is made during a loop, it's time to give up searching.
+ if !changed { break 'outer }
+ }
+ }
+ }
+
// Wait for each thread to complete before quitting the program.
for thread in threads.into_iter() { thread.join().unwrap(); }
}
diff --git a/src/parser.rs b/src/parser.rs
index 6221f49..2048ae0 100644
--- a/src/parser.rs
+++ b/src/parser.rs
@@ -1,60 +1,71 @@
use std::env;
use tokenizer::{Token, tokenize};
+pub struct Args {
+ pub ncores: usize,
+ pub grouped: bool,
+ pub command: String,
+ pub arguments: Vec<Token>,
+ pub inputs: Vec<String>
+}
+
pub enum ParseErr {
JobsNaN(String),
JobsNoValue,
}
-// Parses input arguments and stores their values into their associated variabless.
-pub fn parse_arguments(ncores: &mut usize, command: &mut String, arg_tokens: &mut Vec<Token>,
- input_variables: &mut Vec<String>) -> Result<(), ParseErr>
-{
- let mut parsing_arguments = true;
- let mut command_is_set = false;
- let mut raw_args = env::args().skip(1).peekable();
- let mut comm = String::new();
- while let Some(argument) = raw_args.next() {
- if parsing_arguments {
- match argument.as_str() {
- // Defines the number of jobs to run in parallel.
- "-j" => {
- match raw_args.peek() {
- Some(val) => match val.parse::<usize>() {
- Ok(val) => *ncores = val,
- Err(_) => return Err(ParseErr::JobsNaN(val.clone()))
- },
- None => return Err(ParseErr::JobsNoValue)
- }
- let _ = raw_args.next();
- },
- // Arguments after `:::` are input values.
- ":::" => parsing_arguments = false,
- _ => {
- if command_is_set {
- comm.push(' ');
- comm.push_str(&argument);
- } else {
- comm.push_str(&argument);
- command_is_set = true;
+
+impl Args {
+ pub fn parse(&mut self) -> Result<(), ParseErr> {
+ let mut parsing_arguments = true;
+ let mut command_is_set = false;
+ let mut raw_args = env::args().skip(1).peekable();
+ let mut comm = String::new();
+ while let Some(argument) = raw_args.next() {
+ if parsing_arguments {
+ match argument.as_str() {
+ // Defines the number of jobs to run in parallel.
+ "-j" if !command_is_set => {
+ match raw_args.peek() {
+ Some(val) => match val.parse::<usize>() {
+ Ok(val) => self.ncores = val,
+ Err(_) => return Err(ParseErr::JobsNaN(val.clone()))
+ },
+ None => return Err(ParseErr::JobsNoValue)
+ }
+ let _ = raw_args.next();
+ },
+ "--ungroup" if !command_is_set => {
+ self.grouped = false;
}
+ // Arguments after `:::` are input values.
+ ":::" => parsing_arguments = false,
+ _ => {
+ if command_is_set {
+ comm.push(' ');
+ comm.push_str(&argument);
+ } else {
+ comm.push_str(&argument);
+ command_is_set = true;
+ }
+ }
}
+ } else {
+ self.inputs.push(argument);
}
- } else {
- input_variables.push(argument);
}
- }
- // This will fill in command and argument information needed by the threads.
- // If there is a space in the argument, then the command has arguments
- match comm.chars().position(|x| x == ' ') {
- Some(pos) => {
- *command = String::from(&comm[0..pos]);
- *arg_tokens = tokenize(&comm[pos+1..]);
- },
- None => *command = comm
- }
+ // This will fill in command and argument information needed by the threads.
+ // If there is a space in the argument, then the command has arguments
+ match comm.chars().position(|x| x == ' ') {
+ Some(pos) => {
+ self.command = String::from(&comm[0..pos]);
+ self.arguments = tokenize(&comm[pos+1..]);
+ },
+ None => self.command = comm
+ }
- Ok(())
+ Ok(())
+ }
}