summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Aaron Murphy <mmstickman@gmail.com>2017-01-11 22:55:28 -0500
committerMichael Aaron Murphy <mmstickman@gmail.com>2017-01-11 22:55:28 -0500
commit8d9cd3991f20242d69cbfd23095e1efa96d86924 (patch)
treeb8f8ba5625cf3fd1a5f550f7aa5957f38d28f190
parentb2a2997d08df4407bbd0ef778c4be805c1cf9845 (diff)
0.10.0: New Release0.10.0
- Reading from standard input now prints a notification - Fixed an issue with reading inputs from standard input that are commands - Some refactoring and a version bump
-rw-r--r--Cargo.lock2
-rw-r--r--Cargo.toml4
-rw-r--r--README.md3
-rw-r--r--src/arguments/man.rs5
-rw-r--r--src/arguments/mod.rs22
-rw-r--r--src/execute/child.rs29
-rw-r--r--src/execute/exec_commands.rs28
-rw-r--r--src/execute/exec_inputs.rs26
-rw-r--r--src/execute/mod.rs1
-rw-r--r--src/main.rs18
10 files changed, 68 insertions, 70 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 171be53..b3fd117 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,6 +1,6 @@
[root]
name = "parallel"
-version = "0.9.0"
+version = "0.10.0"
dependencies = [
"arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/Cargo.toml b/Cargo.toml
index c51ce6b..b4afc7f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "parallel"
-version = "0.9.0"
+version = "0.10.0"
authors = ["Michael Aaron Murphy <mmstickman@gmail.com>"]
license = "MIT"
description = "Command-line CPU load balancer for executing jobs in parallel"
@@ -20,7 +20,7 @@ wait-timeout = "0.1"
[profile.release]
opt-level = 3
-debug = true
+lto = true
[package.metadata.deb]
maintainer = "Michael Aaron Murphy <mmstickman@gmail.com>"
diff --git a/README.md b/README.md
index 0de2784..ecec8ca 100644
--- a/README.md
+++ b/README.md
@@ -251,6 +251,7 @@ operates:
- **--eta**: Prints the estimated time to complete based on average runtime of running processes.
- **-h**, **--help**: Prints the manual for the application (recommended to pipe it to `less`).
- **-j**, **--jobs**: Defines the number of jobs/threads to run in parallel.
+- **--joblog**: Logs job statistics to a designated file as they are completed.
- **--memfree**: Defines the minimum amount of memory available before starting the next job.
- **-n**, **--max-args**: Groups up to a certain number of arguments together in the same command line.
- **--num-cpu-cores**: Prints the number of CPU cores in the system and exits.
@@ -258,7 +259,7 @@ operates:
instead supply the arguments directly to the standard input of each child process.
- **-s**, **--silent**, **--quiet**: Disables printing the standard output of running processes.
- **--shebang**: Grants ability to utilize the parallel command as an interpreter via calling it within a shebang line.
-- **--shellquote**: Expands upon quote mode by escaping a wide variety of special characters.
+- **--shellquote**: Prints commands that will be executed, with the commands quoted.
- **--timeout**: If a command runs for longer than a specified number of seconds, it will be killed with a SIGKILL.
- **-v**, **--verbose**: Prints information about running processes.
- **--version**: Prints the current version of the application and it's dependencies.
diff --git a/src/arguments/man.rs b/src/arguments/man.rs
index 2d78b2f..5148d63 100644
--- a/src/arguments/man.rs
+++ b/src/arguments/man.rs
@@ -91,6 +91,9 @@ OPTIONS
Values may be written as a number (12) or as a percent (150%).
The default value is the number of CPU cores in the system.
+ --joblog:
+ Logs job statistics to a designated file as they are completed.
+
--memfree:
Defines the minimum amount of memory available before starting the next job.
@@ -116,7 +119,7 @@ OPTIONS
calling it within a shebang line.
--shellquote:
- Expands upon quote mode by escaping a wide variety of special characters.
+ Prints commands that will be executed, with the commands quoted.
--timeout:
If a command runs for longer than a specified number of seconds, it will be
diff --git a/src/arguments/mod.rs b/src/arguments/mod.rs
index ed66a83..0557d44 100644
--- a/src/arguments/mod.rs
+++ b/src/arguments/mod.rs
@@ -85,9 +85,9 @@ impl Args {
if env::args().len() > 1 {
// The first argument defines which `mode` to shift into and which argument `index` to start from.
let (mut mode, mut index) = match arguments[1].as_str() {
- ":::" | ":::+" => { self.flags |= INPUTS_ARE_COMMANDS; (Mode::Inputs, 2) },
- "::::" | "::::+" => { self.flags |= INPUTS_ARE_COMMANDS; (Mode::Files, 2) },
- _ => (Mode::Arguments, 1)
+ ":::" | ":::+" => (Mode::Inputs, 2),
+ "::::" | "::::+" => (Mode::Files, 2),
+ _ => (Mode::Arguments, 1)
};
// If the `--shebang` parameter was passed, this will be set to `true`.
@@ -175,7 +175,7 @@ impl Args {
}
"verbose" => self.flags |= VERBOSE_MODE,
"version" => {
- println!("MIT/Rust Parallel 0.9.0\n");
+ println!("MIT/Rust Parallel 0.10.0\n");
exit(0);
}
_ if &argument[2..9] == "shebang" => {
@@ -188,14 +188,8 @@ impl Args {
}
} else {
match argument.as_str() {
- ":::" => {
- mode = Mode::Inputs;
- self.flags |= INPUTS_ARE_COMMANDS;
- },
- "::::" => {
- mode = Mode::Files;
- self.flags |= INPUTS_ARE_COMMANDS;
- }
+ ":::" => mode = Mode::Inputs,
+ "::::" => mode = Mode::Files,
_ => {
// The command has been supplied, and argument parsing is over.
comm.push_str(argument);
@@ -238,7 +232,6 @@ impl Args {
number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?;
} else if let Some(path) = redirection::input_was_redirected() {
- self.flags |= INPUTS_ARE_COMMANDS;
file_parse(&mut current_inputs, path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?)?;
number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?;
}
@@ -252,6 +245,8 @@ impl Args {
// Flush the contents of the buffer to the disk before tokenizing the command argument.
let _ = disk_buffer.flush();
+ if comm.is_empty() { self.flags |= INPUTS_ARE_COMMANDS; }
+
Ok(number_of_arguments)
}
}
@@ -260,6 +255,7 @@ impl Args {
fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unprocessed_path: &Path)
-> Result<usize, ParseErr>
{
+ println!("parallel: reading inputs from standard input");
let mut number_of_arguments = 0;
let stdin = io::stdin();
diff --git a/src/execute/child.rs b/src/execute/child.rs
new file mode 100644
index 0000000..50a21a9
--- /dev/null
+++ b/src/execute/child.rs
@@ -0,0 +1,29 @@
+use arguments::QUIET_MODE;
+use std::process::Child;
+use std::sync::mpsc::Sender;
+use std::time::Duration;
+use wait_timeout::ChildExt;
+use time::{get_time, Timespec};
+use super::signals;
+use super::pipe::disk::output as pipe_output;
+use super::pipe::disk::State;
+
+pub fn handle_child(mut child: Child, output: &Sender<State>, flags: u16, job_id: usize, input: String,
+ has_timeout: bool, timeout: Duration) -> (Timespec, Timespec, i32, i32)
+{
+ let start_time = get_time();
+ if has_timeout && child.wait_timeout(timeout).unwrap().is_none() {
+ let _ = child.kill();
+ pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0);
+ (start_time, get_time(), -1, 15)
+ } else {
+ pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0);
+ match child.wait() {
+ Ok(status) => match status.code() {
+ Some(exit) => (start_time, get_time(), exit, 0),
+ None => (start_time, get_time(), -1, signals::get(status))
+ },
+ Err(_) => (start_time, get_time(), -1, 0),
+ }
+ }
+}
diff --git a/src/execute/exec_commands.rs b/src/execute/exec_commands.rs
index 59cec66..dc5feb2 100644
--- a/src/execute/exec_commands.rs
+++ b/src/execute/exec_commands.rs
@@ -1,15 +1,13 @@
-use arguments::{QUIET_MODE, VERBOSE_MODE, JOBLOG};
+use arguments::{VERBOSE_MODE, JOBLOG};
use execute::command::{self, CommandErr};
use input_iterator::InputsLock;
use misc::NumToA;
use time::{self, Timespec};
use tokenizer::Token;
-use wait_timeout::ChildExt;
use verbose;
-use super::pipe::disk::output as pipe_output;
use super::pipe::disk::State;
use super::job_log::JobLog;
-use super::signals;
+use super::child::handle_child;
use std::io::{self, Write};
use std::sync::mpsc::Sender;
@@ -57,25 +55,11 @@ impl<'a> ExecCommands<'a> {
command_template: self.arguments,
flags: self.flags
};
-
+
command_buffer.clear();
let (start_time, end_time, exit_value, signal) = match command.exec(command_buffer) {
- Ok(mut child) => {
- let start_time = time::get_time();
- if has_timeout && child.wait_timeout(self.timeout).unwrap().is_none() {
- let _ = child.kill();
- pipe_output(&mut child, job_id, input.clone(), &self.output_tx, self.flags & QUIET_MODE != 0);
- (start_time, time::get_time(), -1, 15)
- } else {
- pipe_output(&mut child, job_id, input.clone(), &self.output_tx, self.flags & QUIET_MODE != 0);
- match child.wait() {
- Ok(status) => match status.code() {
- Some(exit) => (start_time, time::get_time(), exit, 0),
- None => (start_time, time::get_time(), -1, signals::get(status))
- },
- Err(_) => (start_time, time::get_time(), -1, 0),
- }
- }
+ Ok(child) => {
+ handle_child(child, &self.output_tx, self.flags, job_id, input.clone(), has_timeout, self.timeout)
},
Err(cmd_err) => {
let mut stderr = stderr.lock();
@@ -92,7 +76,7 @@ impl<'a> ExecCommands<'a> {
};
if self.flags & JOBLOG != 0 {
- let runtime = end_time - start_time;
+ let runtime: time::Duration = end_time - start_time;
let _ = self.output_tx.send(State::JobLog(JobLog {
job_id: job_id,
start_time: start_time,
diff --git a/src/execute/exec_inputs.rs b/src/execute/exec_inputs.rs
index 31ee5a6..440a2f7 100644
--- a/src/execute/exec_inputs.rs
+++ b/src/execute/exec_inputs.rs
@@ -1,14 +1,12 @@
-use arguments::{self, JOBLOG, QUIET_MODE};
+use arguments::{self, JOBLOG};
use execute::command;
use input_iterator::InputsLock;
use shell;
-use time::{self, Timespec};
+use time::Timespec;
use verbose;
-use wait_timeout::ChildExt;
use super::job_log::JobLog;
-use super::pipe::disk::output as pipe_output;
use super::pipe::disk::State;
-use super::signals;
+use super::child::handle_child;
use std::u16;
use std::time::Duration;
@@ -45,22 +43,8 @@ impl ExecInputs {
}
let (start_time, end_time, exit_value, signal) = match command::get_command_output(&input, flags) {
- Ok(mut child) => {
- let start_time = time::get_time();
- if has_timeout && child.wait_timeout(self.timeout).unwrap().is_none() {
- let _ = child.kill();
- pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & QUIET_MODE != 0);
- (start_time, time::get_time(), -1, 15)
- } else {
- pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & QUIET_MODE != 0);
- match child.wait() {
- Ok(status) => match status.code() {
- Some(exit) => (start_time, time::get_time(), exit, 0),
- None => (start_time, time::get_time(), -1, signals::get(status))
- },
- Err(_) => (start_time, time::get_time(), -1, 0),
- }
- }
+ Ok(child) => {
+ handle_child(child, &self.output_tx, flags, job_id, input.clone(), has_timeout, self.timeout)
},
Err(why) => {
let mut stderr = stderr.lock();
diff --git a/src/execute/mod.rs b/src/execute/mod.rs
index b5b8a38..09c928f 100644
--- a/src/execute/mod.rs
+++ b/src/execute/mod.rs
@@ -1,3 +1,4 @@
+mod child;
mod dry;
mod exec_commands;
mod exec_inputs;
diff --git a/src/main.rs b/src/main.rs
index ca9f1d2..0bf937d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,4 @@
-// #![deny(dead_code)]
-// #![deny(unused_imports)]
+#![deny(dead_code)]
#![allow(unknown_lints)]
#![feature(loop_break_value)]
#![feature(alloc_system)]
@@ -39,13 +38,14 @@ use execute::pipe::disk::State;
use input_iterator::{InputIterator, InputsLock};
use tokenizer::{Token, TokenErr, tokenize};
-/// Coercing the `command` `String` into a `&'static str` is required to share it among all threads.
/// The command string needs to be available in memory for the entirety of the application, so this
-/// is achievable by leaking the memory so it attains a `'static` lifetime.
-unsafe fn leak_command(comm: String) -> &'static str {
- let static_comm = mem::transmute(&comm as &str);
+/// is achievable by transmuting the lifetime of the reference into a static lifetime. To guarantee
+/// that this is perfectly safe, and that the reference will live outside the scope, the value will
+/// also be leaked so that it is forced to remain in memory for the remainder of the application.
+unsafe fn leak_string(comm: String) -> &'static str {
+ let new_comm = mem::transmute(&comm as &str);
mem::forget(comm);
- static_comm
+ new_comm
}
unsafe fn static_arg(args: &[Token]) -> &'static [Token] {
@@ -73,10 +73,10 @@ fn main() {
// Coerce the `comm` `String` into a `&'static str` so that it may be shared by all threads.
// This is safe because the original `comm` may no longer be modified due to shadowing rules.
// It is also safe because `comm` lives to the end of the program.
- let comm = unsafe { leak_command(comm) };
+ let static_comm = unsafe { leak_string(comm) };
// Attempt to tokenize the command argument into simple primitive placeholders.
- if let Err(error) = tokenize(&mut args.arguments, comm, &unprocessed_path, args.ninputs) {
+ if let Err(error) = tokenize(&mut args.arguments, static_comm, &unprocessed_path, args.ninputs) {
let mut stderr = stderr.lock();
match error {
TokenErr::File(why) => {