diff options
author | Michael Aaron Murphy <mmstickman@gmail.com> | 2016-08-26 17:51:47 -0400 |
---|---|---|
committer | Michael Aaron Murphy <mmstickman@gmail.com> | 2016-08-26 17:51:47 -0400 |
commit | ac3d3c40b450077172b099a019c1f3cdfa147e47 (patch) | |
tree | 985dca4af043661a394f26ff1277965a50d18fb5 | |
parent | 98712c9d604c94f5752437034014ebef8d9479dd (diff) |
Group Stdout/Stderr Buffers By Default (tag: 0.0.1)
-rw-r--r-- | README.md | 1 | ||||
-rw-r--r-- | src/command.rs | 25 | ||||
-rw-r--r-- | src/main.rs | 127 | ||||
-rw-r--r-- | src/parser.rs | 99 |
4 files changed, 175 insertions, 77 deletions
@@ -37,6 +37,7 @@ parallel ::: "echo 1" "echo 2" "echo 3" // If no command is supplied, the input In addition to the command syntax, there are also some options that you can use to configure the load balancer: - **-j**: Defines the number of jobs/threads to run in parallel. +- **--ungroup**: By default, stdout/stderr buffers are grouped in the order that they are received. Available syntax options for the placeholders values are: - **{}**: Each occurrence will be replaced with the name of the input. diff --git a/src/command.rs b/src/command.rs index 19e799e..a6af78f 100644 --- a/src/command.rs +++ b/src/command.rs @@ -1,21 +1,23 @@ use std::io::{StderrLock, Write}; -use std::process::Command; +use std::process::{Command, Output}; use tokenizer::Token; pub enum CommandErr { - Failed(String, Vec<String>) + Failed(String, Vec<String>, String) } impl CommandErr { pub fn handle(self, stderr: &mut StderrLock) { let _ = stderr.write(b"parallel: command error: "); match self { - CommandErr::Failed(command, arguments) => { + CommandErr::Failed(command, arguments, error) => { let _ = stderr.write(command.as_bytes()); for arg in &arguments { let _ = stderr.write(b" "); let _ = stderr.write(arg.as_bytes()); } + let _ = stderr.write(b": "); + let _ = stderr.write(error.as_bytes()); let _ = stderr.write(b"\n"); } } @@ -24,7 +26,7 @@ impl CommandErr { /// Builds the command and executes it pub fn exec(input: &str, command: &str, arg_tokens: &[Token], slot_id: &str, job_id: &str, - job_total :&str) -> Result<(), CommandErr> + job_total :&str, grouping: bool) -> Result<Option<Output>, CommandErr> { // First the arguments will be generated based on the tokens and input. let mut arguments = Vec::new(); @@ -42,12 +44,17 @@ pub fn exec(input: &str, command: &str, arg_tokens: &[Token], slot_id: &str, job arguments.push(String::from(input)); } - // Attempt to execute the command with the generated arguments. 
- if let Err(_) = Command::new(&command).args(&arguments).status() { - // If an error status is returned, return it to be printed. - return Err(CommandErr::Failed(String::from(command), arguments)); + if grouping { + Command::new(&command).args(&arguments).output() + .map(|x| Some(x)).map_err(|why| { + CommandErr::Failed(String::from(command), arguments, why.to_string()) + }) + } else { + match Command::new(&command).args(&arguments).status() { + Ok(_) => Ok(None), + Err(why) => Err(CommandErr::Failed(String::from(command), arguments, why.to_string())) + } } - Ok(()) } /// Builds arguments using the `tokens` template with the current `input` value. diff --git a/src/main.rs b/src/main.rs index 8b5d29f..7f9cab8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,10 +5,13 @@ mod parser; // Collects the input arguments given to the program. use std::io::{self, Write, BufRead}; use std::process::{Command, exit}; + use std::thread::{self, JoinHandle}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use parser::{ParseErr, parse_arguments}; +use std::sync::mpsc::channel; + +use parser::{Args, ParseErr}; /* TODO: Functionality can be increased to accept the following syntaxes from GNU Parallel: - Stdin support is currently missing. @@ -17,22 +20,30 @@ use parser::{ParseErr, parse_arguments}; - paralllel command ::: a b c :::+ 1 2 3 ::: d e f :::+ 4 5 6 */ +struct JobOutput { + id: usize, + stdout: Vec<u8>, + stderr: Vec<u8>, +} + fn main() { // Obtain a handle to standard error's buffer so we can write directly to it. let stderr = io::stderr(); - // The `num_cpus` crate allows conveniently obtaining the number of CPU cores in the system. - // This number will be used to determine how many threads to run in parallel. - let mut ncores = num_cpus::get(); - - // Initialize mutable vectors to store data that will be collected from input arguments. 
- let mut command = String::new(); - let mut argument_tokens = Vec::new(); - let mut inputs = Vec::new(); + let mut args = Args { + // The `num_cpus` crate allows conveniently obtaining the number of CPU cores in the system. + // This number will be used to determine how many threads to run in parallel. + ncores: num_cpus::get(), + // Defines whether stdout/stderr buffers should be printed in order. + grouped: true, + command: String::new(), + arguments: Vec::new(), + inputs: Vec::new() + }; // Let's collect all parameters that we need from the program's arguments. // If an error is returned, this will handle that error as efficiently as possible. - if let Err(why) = parse_arguments(&mut ncores, &mut command, &mut argument_tokens, &mut inputs) { + if let Err(why) = args.parse() { // Always lock an output buffer before using it. let mut stderr = stderr.lock(); let _ = stderr.write(b"parallel: parsing error: "); @@ -48,43 +59,51 @@ fn main() { } // If no inputs are provided, read from stdin instead. - if inputs.is_empty() { + if args.inputs.is_empty() { let stdin = io::stdin(); for line in stdin.lock().lines() { if let Ok(line) = line { - inputs.push(line) + args.inputs.push(line) } } } // If no command was given, then the inputs are actually commands themselves. - let input_is_command = command.is_empty(); + let input_is_command = args.command.is_empty(); // It will be useful to know the number of inputs, to know when to quit. - let num_inputs = inputs.len(); + let num_inputs = args.inputs.len(); // Keeps track of the current step in the input queue. // All threads will share this counter without stepping on each other's toes. let shared_counter = Arc::new(AtomicUsize::new(0)); // We will share the same list of inputs with each thread. - let shared_input = Arc::new(inputs); + let shared_input = Arc::new(args.inputs); + + // If grouping is enabled, stdout and stderr will be buffered. 
+ let (output_tx, input_rx) = channel::<JobOutput>(); // First we will create as many threads as `ncores` specifies. // The `threads` vector will contain the thread handles needed to // know when to quit the program. - let mut threads: Vec<JoinHandle<()>> = Vec::with_capacity(ncores); - for slot in 1..ncores+1 { + let mut threads: Vec<JoinHandle<()>> = Vec::with_capacity(args.ncores); + for slot in 1..args.ncores+1 { // The command that each input variable will be sent to. - let command = command.clone(); + let command = args.command.clone(); // The base command template that each thread will use. - let argument_tokens = argument_tokens.clone(); + let argument_tokens = args.arguments.clone(); // Allow the thread to gain access to the list of inputs. let input = shared_input.clone(); // Allow the thread to access the input counter let counter = shared_counter.clone(); // Allow the thread to know when it's time to stop. let num_inputs = num_inputs; + // If grouped is set to true, stdout/stderr buffers will be collected. + let grouped = args.grouped; + // Each thread will receive it's own sender for sending stderr/stdout buffers. + let output_tx = output_tx.clone(); + // The actual thread where the work will happen on incoming data. let handle: JoinHandle<()> = thread::spawn(move || { @@ -108,7 +127,7 @@ fn main() { break } else { // Obtain the Nth input as well as the job ID - (&input[counter], (counter + 1).to_string()) + (&input[counter], (counter + 1)) } }; @@ -127,11 +146,21 @@ fn main() { } else { // Build a command by merging the command template with the input, // and then execute that command. 
- if let Err(cmd_err) = command::exec(input_var, &command, &argument_tokens, - &slot, &job_id, &job_total) + match command::exec(&input_var, &command, &argument_tokens, &slot, + &job_id.to_string(), &job_total, grouped) { - let mut stderr = stderr.lock(); - cmd_err.handle(&mut stderr); + Ok(Some(ref output)) if grouped => { + output_tx.send(JobOutput{ + id: job_id, + stdout: output.stdout.clone(), + stderr: output.stderr.clone(), + }).unwrap(); + }, + Ok(_) => (), + Err(cmd_err) => { + let mut stderr = stderr.lock(); + cmd_err.handle(&mut stderr); + } } } } @@ -142,6 +171,56 @@ fn main() { threads.push(handle); } + if args.grouped { + let stdout = io::stdout(); + let mut stdout = stdout.lock(); + let mut stderr = stderr.lock(); + let mut counter = 1; + let mut buffer = Vec::new(); + while counter != num_inputs + 1 { + // Block and wait until a new buffer is received. + let output = input_rx.recv().unwrap(); + + // If the buffer ID is the next in line, print it, else add it to the buffer. + if output.id == counter { + let _ = stdout.write(&output.stdout); + let _ = stderr.write(&output.stderr); + counter += 1; + } else { + buffer.push(output); + } + + // Check to see if there are any stored buffers that can now be printed. + // Items in the buffer will be removed after they are used. + 'outer: loop { + let mut changed = false; + let mut drop = Vec::new(); + + // Loop through the list of buffers and print buffers with the next ID in line. + for (id, output) in buffer.iter().enumerate() { + if output.id == counter { + let _ = stdout.write(&output.stdout); + let _ = stderr.write(&output.stderr); + counter += 1; + changed = true; + drop.push(id); + } + } + + // Drop the buffers that were used. + if !drop.is_empty() { + drop.sort(); + for id in drop.iter().rev() { + let _ = buffer.remove(*id); + } + } + + // If no change is made during a loop, it's time to give up searching. 
+ if !changed { break 'outer } + } + } + } + // Wait for each thread to complete before quitting the program. for thread in threads.into_iter() { thread.join().unwrap(); } } diff --git a/src/parser.rs b/src/parser.rs index 6221f49..2048ae0 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -1,60 +1,71 @@ use std::env; use tokenizer::{Token, tokenize}; +pub struct Args { + pub ncores: usize, + pub grouped: bool, + pub command: String, + pub arguments: Vec<Token>, + pub inputs: Vec<String> +} + pub enum ParseErr { JobsNaN(String), JobsNoValue, } -// Parses input arguments and stores their values into their associated variabless. -pub fn parse_arguments(ncores: &mut usize, command: &mut String, arg_tokens: &mut Vec<Token>, - input_variables: &mut Vec<String>) -> Result<(), ParseErr> -{ - let mut parsing_arguments = true; - let mut command_is_set = false; - let mut raw_args = env::args().skip(1).peekable(); - let mut comm = String::new(); - while let Some(argument) = raw_args.next() { - if parsing_arguments { - match argument.as_str() { - // Defines the number of jobs to run in parallel. - "-j" => { - match raw_args.peek() { - Some(val) => match val.parse::<usize>() { - Ok(val) => *ncores = val, - Err(_) => return Err(ParseErr::JobsNaN(val.clone())) - }, - None => return Err(ParseErr::JobsNoValue) - } - let _ = raw_args.next(); - }, - // Arguments after `:::` are input values. - ":::" => parsing_arguments = false, - _ => { - if command_is_set { - comm.push(' '); - comm.push_str(&argument); - } else { - comm.push_str(&argument); - command_is_set = true; + +impl Args { + pub fn parse(&mut self) -> Result<(), ParseErr> { + let mut parsing_arguments = true; + let mut command_is_set = false; + let mut raw_args = env::args().skip(1).peekable(); + let mut comm = String::new(); + while let Some(argument) = raw_args.next() { + if parsing_arguments { + match argument.as_str() { + // Defines the number of jobs to run in parallel. 
+ "-j" if !command_is_set => { + match raw_args.peek() { + Some(val) => match val.parse::<usize>() { + Ok(val) => self.ncores = val, + Err(_) => return Err(ParseErr::JobsNaN(val.clone())) + }, + None => return Err(ParseErr::JobsNoValue) + } + let _ = raw_args.next(); + }, + "--ungroup" if !command_is_set => { + self.grouped = false; } + // Arguments after `:::` are input values. + ":::" => parsing_arguments = false, + _ => { + if command_is_set { + comm.push(' '); + comm.push_str(&argument); + } else { + comm.push_str(&argument); + command_is_set = true; + } + } } + } else { + self.inputs.push(argument); } - } else { - input_variables.push(argument); } - } - // This will fill in command and argument information needed by the threads. - // If there is a space in the argument, then the command has arguments - match comm.chars().position(|x| x == ' ') { - Some(pos) => { - *command = String::from(&comm[0..pos]); - *arg_tokens = tokenize(&comm[pos+1..]); - }, - None => *command = comm - } + // This will fill in command and argument information needed by the threads. + // If there is a space in the argument, then the command has arguments + match comm.chars().position(|x| x == ' ') { + Some(pos) => { + self.command = String::from(&comm[0..pos]); + self.arguments = tokenize(&comm[pos+1..]); + }, + None => self.command = comm + } - Ok(()) + Ok(()) + } } |