diff options
author | Michael Aaron Murphy <mmstickman@gmail.com> | 2017-01-11 22:55:28 -0500 |
---|---|---|
committer | Michael Aaron Murphy <mmstickman@gmail.com> | 2017-01-11 22:55:28 -0500 |
commit | 8d9cd3991f20242d69cbfd23095e1efa96d86924 (patch) | |
tree | b8f8ba5625cf3fd1a5f550f7aa5957f38d28f190 | |
parent | b2a2997d08df4407bbd0ef778c4be805c1cf9845 (diff) |
0.10.0: New Release0.10.0
- Reading from standard input now prints a notification
- Fixed an issue with reading inputs from standard input that are commands
- Some refactoring and a version bump
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | Cargo.toml | 4 | ||||
-rw-r--r-- | README.md | 3 | ||||
-rw-r--r-- | src/arguments/man.rs | 5 | ||||
-rw-r--r-- | src/arguments/mod.rs | 22 | ||||
-rw-r--r-- | src/execute/child.rs | 29 | ||||
-rw-r--r-- | src/execute/exec_commands.rs | 28 | ||||
-rw-r--r-- | src/execute/exec_inputs.rs | 26 | ||||
-rw-r--r-- | src/execute/mod.rs | 1 | ||||
-rw-r--r-- | src/main.rs | 18 |
10 files changed, 68 insertions, 70 deletions
@@ -1,6 +1,6 @@ [root] name = "parallel" -version = "0.9.0" +version = "0.10.0" dependencies = [ "arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", "itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1,6 +1,6 @@ [package] name = "parallel" -version = "0.9.0" +version = "0.10.0" authors = ["Michael Aaron Murphy <mmstickman@gmail.com>"] license = "MIT" description = "Command-line CPU load balancer for executing jobs in parallel" @@ -20,7 +20,7 @@ wait-timeout = "0.1" [profile.release] opt-level = 3 -debug = true +lto = true [package.metadata.deb] maintainer = "Michael Aaron Murphy <mmstickman@gmail.com>" @@ -251,6 +251,7 @@ operates: - **--eta**: Prints the estimated time to complete based on average runtime of running processes. - **-h**, **--help**: Prints the manual for the application (recommended to pipe it to `less`). - **-j**, **--jobs**: Defines the number of jobs/threads to run in parallel. +- **--joblog**: Logs job statistics to a designated file as they are completed. - **--memfree**: Defines the minimum amount of memory available before starting the next job. - **-n**, **--max-args**: Groups up to a certain number of arguments together in the same command line. - **--num-cpu-cores**: Prints the number of CPU cores in the system and exits. @@ -258,7 +259,7 @@ operates: instead supply the arguments directly to the standard input of each child process. - **-s**, **--silent**, **--quiet**: Disables printing the standard output of running processes. - **--shebang**: Grants ability to utilize the parallel command as an interpreter via calling it within a shebang line. -- **--shellquote**: Expands upon quote mode by escaping a wide variety of special characters. +- **--shellquote**: Prints commands that will be executed, with the commands quoted. - **--timeout**: If a command runs for longer than a specified number of seconds, it will be killed with a SIGKILL. - **-v**, **--verbose**: Prints information about running processes. - **--version**: Prints the current version of the application and it's dependencies. diff --git a/src/arguments/man.rs b/src/arguments/man.rs index 2d78b2f..5148d63 100644 --- a/src/arguments/man.rs +++ b/src/arguments/man.rs @@ -91,6 +91,9 @@ OPTIONS Values may be written as a number (12) or as a percent (150%). The default value is the number of CPU cores in the system. + --joblog: + Logs job statistics to a designated file as they are completed. + --memfree: Defines the minimum amount of memory available before starting the next job. @@ -116,7 +119,7 @@ OPTIONS calling it within a shebang line. --shellquote: - Expands upon quote mode by escaping a wide variety of special characters. + Prints commands that will be executed, with the commands quoted. --timeout: If a command runs for longer than a specified number of seconds, it will be diff --git a/src/arguments/mod.rs b/src/arguments/mod.rs index ed66a83..0557d44 100644 --- a/src/arguments/mod.rs +++ b/src/arguments/mod.rs @@ -85,9 +85,9 @@ impl Args { if env::args().len() > 1 { // The first argument defines which `mode` to shift into and which argument `index` to start from. let (mut mode, mut index) = match arguments[1].as_str() { - ":::" | ":::+" => { self.flags |= INPUTS_ARE_COMMANDS; (Mode::Inputs, 2) }, - "::::" | "::::+" => { self.flags |= INPUTS_ARE_COMMANDS; (Mode::Files, 2) }, - _ => (Mode::Arguments, 1) + ":::" | ":::+" => (Mode::Inputs, 2), + "::::" | "::::+" => (Mode::Files, 2), + _ => (Mode::Arguments, 1) }; // If the `--shebang` parameter was passed, this will be set to `true`. @@ -175,7 +175,7 @@ impl Args { } "verbose" => self.flags |= VERBOSE_MODE, "version" => { - println!("MIT/Rust Parallel 0.9.0\n"); + println!("MIT/Rust Parallel 0.10.0\n"); exit(0); } _ if &argument[2..9] == "shebang" => { @@ -188,14 +188,8 @@ impl Args { } } else { match argument.as_str() { - ":::" => { - mode = Mode::Inputs; - self.flags |= INPUTS_ARE_COMMANDS; - }, - "::::" => { - mode = Mode::Files; - self.flags |= INPUTS_ARE_COMMANDS; - } + ":::" => mode = Mode::Inputs, + "::::" => mode = Mode::Files, _ => { // The command has been supplied, and argument parsing is over. comm.push_str(argument); @@ -238,7 +232,6 @@ impl Args { number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?; } else if let Some(path) = redirection::input_was_redirected() { - self.flags |= INPUTS_ARE_COMMANDS; file_parse(&mut current_inputs, path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?)?; number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?; } @@ -252,6 +245,8 @@ impl Args { // Flush the contents of the buffer to the disk before tokenizing the command argument. let _ = disk_buffer.flush(); + if comm.is_empty() { self.flags |= INPUTS_ARE_COMMANDS; } + Ok(number_of_arguments) } } @@ -260,6 +255,7 @@ impl Args { fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unprocessed_path: &Path) -> Result<usize, ParseErr> { + println!("parallel: reading inputs from standard input"); let mut number_of_arguments = 0; let stdin = io::stdin(); diff --git a/src/execute/child.rs b/src/execute/child.rs new file mode 100644 index 0000000..50a21a9 --- /dev/null +++ b/src/execute/child.rs @@ -0,0 +1,29 @@ +use arguments::QUIET_MODE; +use std::process::Child; +use std::sync::mpsc::Sender; +use std::time::Duration; +use wait_timeout::ChildExt; +use time::{get_time, Timespec}; +use super::signals; +use super::pipe::disk::output as pipe_output; +use super::pipe::disk::State; + +pub fn handle_child(mut child: Child, output: &Sender<State>, flags: u16, job_id: usize, input: String, + has_timeout: bool, timeout: Duration) -> (Timespec, Timespec, i32, i32) +{ + let start_time = get_time(); + if has_timeout && child.wait_timeout(timeout).unwrap().is_none() { + let _ = child.kill(); + pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0); + (start_time, get_time(), -1, 15) + } else { + pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0); + match child.wait() { + Ok(status) => match status.code() { + Some(exit) => (start_time, get_time(), exit, 0), + None => (start_time, get_time(), -1, signals::get(status)) + }, + Err(_) => (start_time, get_time(), -1, 0), + } + } +} diff --git a/src/execute/exec_commands.rs b/src/execute/exec_commands.rs index 59cec66..dc5feb2 100644 --- a/src/execute/exec_commands.rs +++ b/src/execute/exec_commands.rs @@ -1,15 +1,13 @@ -use arguments::{QUIET_MODE, VERBOSE_MODE, JOBLOG}; +use arguments::{VERBOSE_MODE, JOBLOG}; use execute::command::{self, CommandErr}; use input_iterator::InputsLock; use misc::NumToA; use time::{self, Timespec}; use tokenizer::Token; -use wait_timeout::ChildExt; use verbose; -use super::pipe::disk::output as pipe_output; use super::pipe::disk::State; use super::job_log::JobLog; -use super::signals; +use super::child::handle_child; use std::io::{self, Write}; use std::sync::mpsc::Sender; @@ -57,25 +55,11 @@ impl<'a> ExecCommands<'a> { command_template: self.arguments, flags: self.flags }; - + command_buffer.clear(); let (start_time, end_time, exit_value, signal) = match command.exec(command_buffer) { - Ok(mut child) => { - let start_time = time::get_time(); - if has_timeout && child.wait_timeout(self.timeout).unwrap().is_none() { - let _ = child.kill(); - pipe_output(&mut child, job_id, input.clone(), &self.output_tx, self.flags & QUIET_MODE != 0); - (start_time, time::get_time(), -1, 15) - } else { - pipe_output(&mut child, job_id, input.clone(), &self.output_tx, self.flags & QUIET_MODE != 0); - match child.wait() { - Ok(status) => match status.code() { - Some(exit) => (start_time, time::get_time(), exit, 0), - None => (start_time, time::get_time(), -1, signals::get(status)) - }, - Err(_) => (start_time, time::get_time(), -1, 0), - } - } + Ok(child) => { + handle_child(child, &self.output_tx, self.flags, job_id, input.clone(), has_timeout, self.timeout) }, Err(cmd_err) => { let mut stderr = stderr.lock(); @@ -92,7 +76,7 @@ impl<'a> ExecCommands<'a> { }; if self.flags & JOBLOG != 0 { - let runtime = end_time - start_time; + let runtime: time::Duration = end_time - start_time; let _ = self.output_tx.send(State::JobLog(JobLog { job_id: job_id, start_time: start_time, diff --git a/src/execute/exec_inputs.rs b/src/execute/exec_inputs.rs index 31ee5a6..440a2f7 100644 --- a/src/execute/exec_inputs.rs +++ b/src/execute/exec_inputs.rs @@ -1,14 +1,12 @@ -use arguments::{self, JOBLOG, QUIET_MODE}; +use arguments::{self, JOBLOG}; use execute::command; use input_iterator::InputsLock; use shell; -use time::{self, Timespec}; +use time::Timespec; use verbose; -use wait_timeout::ChildExt; use super::job_log::JobLog; -use super::pipe::disk::output as pipe_output; use super::pipe::disk::State; -use super::signals; +use super::child::handle_child; use std::u16; use std::time::Duration; @@ -45,22 +43,8 @@ impl ExecInputs { } let (start_time, end_time, exit_value, signal) = match command::get_command_output(&input, flags) { - Ok(mut child) => { - let start_time = time::get_time(); - if has_timeout && child.wait_timeout(self.timeout).unwrap().is_none() { - let _ = child.kill(); - pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & QUIET_MODE != 0); - (start_time, time::get_time(), -1, 15) - } else { - pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & QUIET_MODE != 0); - match child.wait() { - Ok(status) => match status.code() { - Some(exit) => (start_time, time::get_time(), exit, 0), - None => (start_time, time::get_time(), -1, signals::get(status)) - }, - Err(_) => (start_time, time::get_time(), -1, 0), - } - } + Ok(child) => { + handle_child(child, &self.output_tx, flags, job_id, input.clone(), has_timeout, self.timeout) }, Err(why) => { let mut stderr = stderr.lock(); diff --git a/src/execute/mod.rs b/src/execute/mod.rs index b5b8a38..09c928f 100644 --- a/src/execute/mod.rs +++ b/src/execute/mod.rs @@ -1,3 +1,4 @@ +mod child; mod dry; mod exec_commands; mod exec_inputs; diff --git a/src/main.rs b/src/main.rs index ca9f1d2..0bf937d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,4 @@ -// #![deny(dead_code)] -// #![deny(unused_imports)] +#![deny(dead_code)] #![allow(unknown_lints)] #![feature(loop_break_value)] #![feature(alloc_system)] @@ -39,13 +38,14 @@ use execute::pipe::disk::State; use input_iterator::{InputIterator, InputsLock}; use tokenizer::{Token, TokenErr, tokenize}; -/// Coercing the `command` `String` into a `&'static str` is required to share it among all threads. /// The command string needs to be available in memory for the entirety of the application, so this -/// is achievable by leaking the memory so it attains a `'static` lifetime. -unsafe fn leak_command(comm: String) -> &'static str { - let static_comm = mem::transmute(&comm as &str); +/// is achievable by transmuting the lifetime of the reference into a static lifetime. To guarantee +/// that this is perfectly safe, and that the reference will live outside the scope, the value will +/// also be leaked so that it is forced to remain in memory for the remainder of the application. +unsafe fn leak_string(comm: String) -> &'static str { + let new_comm = mem::transmute(&comm as &str); mem::forget(comm); - static_comm + new_comm } unsafe fn static_arg(args: &[Token]) -> &'static [Token] { @@ -73,10 +73,10 @@ fn main() { // Coerce the `comm` `String` into a `&'static str` so that it may be shared by all threads. // This is safe because the original `comm` may no longer be modified due to shadowing rules. // It is also safe because `comm` lives to the end of the program. - let comm = unsafe { leak_command(comm) }; + let static_comm = unsafe { leak_string(comm) }; // Attempt to tokenize the command argument into simple primitive placeholders. - if let Err(error) = tokenize(&mut args.arguments, comm, &unprocessed_path, args.ninputs) { + if let Err(error) = tokenize(&mut args.arguments, static_comm, &unprocessed_path, args.ninputs) { let mut stderr = stderr.lock(); match error { TokenErr::File(why) => { |