summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Aaron Murphy <mmstickman@gmail.com>2017-01-06 01:54:45 -0500
committerMichael Aaron Murphy <mmstickman@gmail.com>2017-01-06 01:54:45 -0500
commite72344268ba46cc26610d1a548ef14863df38ef3 (patch)
tree0ed3bad972f63be9c09d4820d8432cb594a46e44
parentefe870e1ccee20f84273319c7dffb7408b50ac09 (diff)
Optimizations, Inputs as Commands Fix, & Input Redirection Detection
- Performed a number of integer to string optimizations - Fixed a missing codepath that caused inputs as commands to not work - On UNIX systems, input redirection will be detected and accounted for - This means that `parallel < inputs.list` is equivalent to `parallel :::: inputs.list`
-rw-r--r--Cargo.lock7
-rw-r--r--Cargo.toml2
-rw-r--r--README.md8
-rw-r--r--src/arguments/errors.rs4
-rw-r--r--src/arguments/mod.rs17
-rw-r--r--src/arguments/redirection.rs13
-rw-r--r--src/execute/exec_commands.rs4
-rw-r--r--src/execute/exec_inputs.rs4
-rw-r--r--src/execute/receive.rs7
-rw-r--r--src/filepaths.rs23
-rw-r--r--src/itoa_array.rs33
-rw-r--r--src/main.rs2
-rw-r--r--src/verbose.rs13
13 files changed, 110 insertions, 27 deletions
diff --git a/Cargo.lock b/Cargo.lock
index d924f7e..e5ad79a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3,6 +3,7 @@ name = "parallel"
version = "0.9.0"
dependencies = [
"arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)",
+ "itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"permutate 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sys-info 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -25,6 +26,11 @@ version = "0.3.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "itoa"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -103,6 +109,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata]
"checksum arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d89f1b0e242270b5b797778af0c8d182a1a2ccac5d8d6fadf414223cc0fab096"
"checksum gcc 0.3.41 (registry+https://github.com/rust-lang/crates.io-index)" = "3689e1982a563af74960ae3a4758aa632bb8fd984cfc3cc3b60ee6109477ab6e"
+"checksum itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ae3088ea4baeceb0284ee9eea42f591226e6beaecf65373e41b38d95a1b8e7a1"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "a51822fc847e7a8101514d1d44e354ba2ffa7d4c194dcab48870740e327cac70"
"checksum nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbbadd3f4c98dea0bd3d9b4be4c0cdaf1ab57035cb2e41fce3983db5add7cc5"
diff --git a/Cargo.toml b/Cargo.toml
index 7871103..57c8436 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ keywords = ["cli", "parallel", "cpu", "load", "balancer"]
readme = "README.md"
[dependencies]
+itoa = "0.1"
num_cpus = "1.2"
permutate = "0.2"
arrayvec = "0.3"
@@ -17,7 +18,6 @@ sys-info = "0.4"
wait-timeout = "0.1"
[profile.release]
-lto = true
opt-level = 3
[package.metadata.deb]
diff --git a/README.md b/README.md
index 9a48dc7..2c72304 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,10 @@
-# Parallel: A Command-line CPU Load Balancer Written in Rust [![](https://tokei.rs/b1/github/mmstick/parallel)](https://github.com/mmstick/parallel)
+# MIT/Rust Parallel: A Command-line CPU Load Balancer Written in Rust
+[![Crates.io](https://img.shields.io/crates/v/parallel.svg)](https://github.com/mmstick/parallel)
+[![Tokei SLoC Count](https://tokei.rs/b1/github/mmstick/parallel)](https://github.com/mmstic
+ k/parallel)
+[![OpenHub Statistics](https://www.openhub.net/p/rust-parallel/widgets/project_thin_badge?format=gif&ref=Thin+badge)](https://www.openhub.net/p/rust-parallel/)
+[![AUR](https://img.shields.io/aur/version/parallel-rust.svg)](https://github.com/mmstick/parallel)
+
This is an attempt at recreating the functionality of [GNU Parallel](https://www.gnu.org/software/parallel/), a work-stealer for the command-line, in Rust under a MIT license. The end goal will be to support much of the functionality of `GNU Parallel` and then to extend the functionality further for the next generation of command-line utilities written in Rust. While functionality is important, with the application being developed in Rust, the goal is to also be as fast and efficient as possible.
## Note
diff --git a/src/arguments/errors.rs b/src/arguments/errors.rs
index 6923095..ed6e367 100644
--- a/src/arguments/errors.rs
+++ b/src/arguments/errors.rs
@@ -42,6 +42,7 @@ pub enum ParseErr {
MemNoValue,
/// No arguments were given, so no action can be taken.
NoArguments,
+ RedirFile(PathBuf),
TimeoutNaN(usize),
TimeoutNoValue,
}
@@ -115,6 +116,9 @@ impl ParseErr {
ParseErr::NoArguments => {
let _ = write!(stderr, "no input arguments were given.\n");
},
+ ParseErr::RedirFile(path) => {
+ let _ = write!(stderr, "an error occurred while redirecting file: {:?}\n", path);
+ },
ParseErr::TimeoutNaN(index) => {
let _ = write!(stderr, "invalid timeout value: {}\n", arguments[index]);
},
diff --git a/src/arguments/mod.rs b/src/arguments/mod.rs
index beb0582..ef9e774 100644
--- a/src/arguments/mod.rs
+++ b/src/arguments/mod.rs
@@ -3,6 +3,7 @@ pub mod errors;
mod jobs;
mod man;
mod quote;
+mod redirection;
use std::env;
use std::fs;
@@ -206,7 +207,9 @@ impl Args {
}
}
- if let Mode::Command = mode {
+ if let Some(path) = redirection::input_was_redirected() {
+ file_parse(&mut current_inputs, path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?)?;
+ } else if let Mode::Command = mode {
while let Some(argument) = arguments.get(index) {
index += 1;
match argument.as_str() {
@@ -223,15 +226,21 @@ impl Args {
}
break
}
- }
- if shebang {
- file_parse(&mut current_inputs, &arguments.last().unwrap())?;
+ if shebang {
+ file_parse(&mut current_inputs, &arguments.last().unwrap())?;
+ } else {
+ parse_inputs(arguments, index, &mut current_inputs, &mut lists, &mut mode)?;
+ }
} else {
parse_inputs(arguments, index, &mut current_inputs, &mut lists, &mut mode)?;
}
number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer)?;
+ } else if let Some(path) = redirection::input_was_redirected() {
+ self.flags |= INPUTS_ARE_COMMANDS;
+ file_parse(&mut current_inputs, path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?)?;
+ number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer)?;
}
if disk_buffer.is_empty() {
diff --git a/src/arguments/redirection.rs b/src/arguments/redirection.rs
new file mode 100644
index 0000000..bc5e4a4
--- /dev/null
+++ b/src/arguments/redirection.rs
@@ -0,0 +1,13 @@
+use std::fs;
+use std::path::PathBuf;
+
+#[cfg(not(unix))]
+pub fn input_was_redirected() -> Option<PathBuf> { None }
+
+#[cfg(unix)]
+pub fn input_was_redirected() -> Option<PathBuf> {
+ if let Ok(link) = fs::read_link("/proc/self/fd/0") {
+ if !link.to_string_lossy().starts_with("/dev/pts") { return Some(link) }
+ }
+ None
+}
diff --git a/src/execute/exec_commands.rs b/src/execute/exec_commands.rs
index b5ae0ad..b0d91aa 100644
--- a/src/execute/exec_commands.rs
+++ b/src/execute/exec_commands.rs
@@ -34,7 +34,7 @@ impl<'a> ExecCommands<'a> {
while let Some((job_id, _)) = self.inputs.try_next(&mut input) {
if self.flags & arguments::VERBOSE_MODE != 0 {
- verbose::processing_task(&stdout, &job_id.to_string(), job_total, &input);
+ verbose::processing_task(&stdout, job_id+1, job_total, &input);
}
let command = command::ParallelCommand {
@@ -70,7 +70,7 @@ impl<'a> ExecCommands<'a> {
}
if self.flags & arguments::VERBOSE_MODE != 0 {
- verbose::task_complete(&stdout, &job_id.to_string(), job_total, &input);
+ verbose::task_complete(&stdout, job_id, job_total, &input);
}
}
}
diff --git a/src/execute/exec_inputs.rs b/src/execute/exec_inputs.rs
index 1f79fff..92e9144 100644
--- a/src/execute/exec_inputs.rs
+++ b/src/execute/exec_inputs.rs
@@ -29,7 +29,7 @@ impl ExecInputs {
while let Some((job_id, _)) = self.inputs.try_next(&mut input) {
if flags & arguments::VERBOSE_MODE != 0 {
- verbose::processing_task(&stdout, &job_id.to_string(), job_total, &input);
+ verbose::processing_task(&stdout, job_id+1, job_total, &input);
}
if shell::required(shell::Kind::Input(&input)) {
@@ -59,7 +59,7 @@ impl ExecInputs {
}
if flags & arguments::VERBOSE_MODE != 0 {
- verbose::task_complete(&stdout, &job_id.to_string(), job_total, &input);
+ verbose::task_complete(&stdout, job_id, job_total, &input);
}
}
}
diff --git a/src/execute/receive.rs b/src/execute/receive.rs
index c448d95..e7171f5 100644
--- a/src/execute/receive.rs
+++ b/src/execute/receive.rs
@@ -78,6 +78,7 @@ pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: &
let mut error_file = DiskBuffer::new(errors_path).write().unwrap();
// A buffer for buffering the outputs of temporary files on disk.
let mut read_buffer = [0u8; 8192];
+ let mut id_buffer = [0u8; 64];
let (truncate_size, mut stdout_path, mut stderr_path) = filepaths::new_job(counter);
@@ -90,7 +91,7 @@ pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: &
if id == counter {
let mut stdout = stdout.lock();
let mut stderr = stderr.lock();
- filepaths::next_job_path(counter, truncate_size, &mut stdout_path, &mut stderr_path);
+ filepaths::next_job_path(counter, truncate_size, &mut id_buffer, &mut stdout_path, &mut stderr_path);
let (mut stdout_file, mut stderr_file) = open_job_files!(stdout_path, stderr_path);
append_to_processed!(processed_file, name, stderr);
read_outputs!(stdout_file, stderr_file, read_buffer, stdout, stderr);
@@ -115,7 +116,7 @@ pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: &
}
if tail_next {
- filepaths::next_job_path(counter, truncate_size, &mut stdout_path, &mut stderr_path);
+ filepaths::next_job_path(counter, truncate_size, &mut id_buffer, &mut stdout_path, &mut stderr_path);
let (mut stdout_file, mut stderr_file) = open_job_files!(stdout_path, stderr_path);
loop {
@@ -166,7 +167,7 @@ pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: &
State::Completed(id, ref name) if id == counter => {
let mut stdout = stdout.lock();
let mut stderr = stderr.lock();
- filepaths::next_job_path(counter, truncate_size, &mut stdout_path, &mut stderr_path);
+ filepaths::next_job_path(counter, truncate_size, &mut id_buffer, &mut stdout_path, &mut stderr_path);
let (mut stdout_file, mut stderr_file) = open_job_files!(stdout_path, stderr_path);
append_to_processed!(processed_file, name, stderr);
read_outputs!(stdout_file, stderr_file, read_buffer, stdout, stderr);
diff --git a/src/filepaths.rs b/src/filepaths.rs
index 9ac10c6..98cddd7 100644
--- a/src/filepaths.rs
+++ b/src/filepaths.rs
@@ -1,5 +1,6 @@
use std::env::home_dir;
use std::path::PathBuf;
+use itoa_array::itoa;
#[cfg(not(windows))]
pub fn base() -> Option<PathBuf> {
@@ -82,24 +83,30 @@ pub fn new_job(id: usize) -> (usize, String, String) {
}
#[cfg(windows)]
-pub fn new_job(id: usize) -> (usize, String, String) {
+pub fn new_job(id: usize, buffer: &mut [u8; 64]) -> (usize, String, String) {
home_dir().map(|home| {
let mut stdout = home.to_str().unwrap().to_owned();
let mut stderr = stdout.clone();
stdout.push_str("AppData/Local/Temp/parallel/stdout_");
stderr.push_str("AppData/Local/Temp/parallel/stderr_");
let truncate_value = stdout.len();
- let id = id.to_string();
- stdout.push_str(&id);
- stderr.push_str(&id);
+
+ let length = itoa(buffer, id, 10);
+ for byte in &buffer[0..length] {
+ stdout.push(*byte as char);
+ stderr.push(*byte as char);
+ }
+
(truncate_value, stdout, stderr)
}).expect("parallel: unable to open home folder")
}
-pub fn next_job_path(id: usize, truncate: usize, stdout: &mut String, stderr: &mut String) {
- let id = id.to_string();
+pub fn next_job_path(id: usize, truncate: usize, buffer: &mut [u8; 64], stdout: &mut String, stderr: &mut String) {
stdout.truncate(truncate);
- stdout.push_str(&id);
stderr.truncate(truncate);
- stderr.push_str(&id);
+ let length = itoa(buffer, id, 10);
+ for byte in &buffer[0..length] {
+ stdout.push(*byte as char);
+ stderr.push(*byte as char);
+ }
}
diff --git a/src/itoa_array.rs b/src/itoa_array.rs
new file mode 100644
index 0000000..4f6ceac
--- /dev/null
+++ b/src/itoa_array.rs
@@ -0,0 +1,33 @@
+use std::ptr::swap;
+
+pub fn reverse(string: &mut [u8], length: usize) {
+ let mut start = 0isize;
+ let mut end = length as isize - 1;
+ while start < end {
+ unsafe {
+ let x = string.as_mut_ptr().offset(start);
+ let y = string.as_mut_ptr().offset(end);
+ swap(x, y);
+ }
+ start += 1;
+ end -= 1;
+ }
+}
+
+pub fn itoa(string: &mut [u8], mut number: usize, base: usize) -> usize {
+ if number == 0 {
+ string[0] = b'0';
+ return 1;
+ }
+
+ let mut index = 0;
+ while number != 0 {
+ let rem = (number % base) as u8;
+ string[index] = if rem > 9 { (rem - 10) + b'a' } else { (rem + b'0') };
+ index += 1;
+ number /= base;
+ }
+
+ reverse(string, index);
+ index
+}
diff --git a/src/main.rs b/src/main.rs
index 90df7c4..bdfd946 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,6 +5,7 @@
#![feature(alloc_system)]
extern crate alloc_system;
extern crate arrayvec;
+extern crate itoa;
extern crate num_cpus;
extern crate permutate;
extern crate sys_info;
@@ -17,6 +18,7 @@ mod execute;
mod filepaths;
mod init;
mod input_iterator;
+mod itoa_array;
mod tokenizer;
mod shell;
mod verbose;
diff --git a/src/verbose.rs b/src/verbose.rs
index 24e305c..9447fc0 100644
--- a/src/verbose.rs
+++ b/src/verbose.rs
@@ -1,18 +1,19 @@
use std::io::{Stdout, Write};
+use itoa;
pub fn total_inputs(stdout: &Stdout, threads: usize, inputs: usize) {
let mut stdout = stdout.lock();
let _ = stdout.write(b"parallel: processing ");
- let _ = stdout.write(inputs.to_string().as_bytes());
+ let _ = itoa::write(&mut stdout, inputs);
let _ = stdout.write(b" inputs on ");
- let _ = stdout.write(threads.to_string().as_bytes());
+ let _ = itoa::write(&mut stdout, threads);
let _ = stdout.write(b" threads\n");
}
-pub fn processing_task(stdout: &Stdout, job: &str, total: &str, input: &str) {
+pub fn processing_task(stdout: &Stdout, job: usize, total: &str, input: &str) {
let mut stdout = stdout.lock();
let _ = stdout.write(b"parallel: processing task #");
- let _ = stdout.write(job.as_bytes());
+ let _ = itoa::write(&mut stdout, job);
let _ = stdout.write(b" of ");
let _ = stdout.write(total.as_bytes());
let _ = stdout.write(b": '");
@@ -20,10 +21,10 @@ pub fn processing_task(stdout: &Stdout, job: &str, total: &str, input: &str) {
let _ = stdout.write(b"'\n");
}
-pub fn task_complete(stdout: &Stdout, job: &str, total: &str, input: &str) {
+pub fn task_complete(stdout: &Stdout, job: usize, total: &str, input: &str) {
let mut stdout = stdout.lock();
let _ = stdout.write(b"parallel: completed task #");
- let _ = stdout.write(job.as_bytes());
+ let _ = itoa::write(&mut stdout, job);
let _ = stdout.write(b" of ");
let _ = stdout.write(total.as_bytes());
let _ = stdout.write(b": '");