diff options
author | Michael Aaron Murphy <mmstickman@gmail.com> | 2017-01-06 01:54:45 -0500 |
---|---|---|
committer | Michael Aaron Murphy <mmstickman@gmail.com> | 2017-01-06 01:54:45 -0500 |
commit | e72344268ba46cc26610d1a548ef14863df38ef3 (patch) | |
tree | 0ed3bad972f63be9c09d4820d8432cb594a46e44 | |
parent | efe870e1ccee20f84273319c7dffb7408b50ac09 (diff) |
Optimizations, Inputs as Commands Fix, & Input Redirection Detection
- Performed a number of integer to string optimizations
- Fixed a missing codepath that caused inputs as commands to not work
- On UNIX systems, input redirection will be detected and accounted for
- This means that `parallel < inputs.list` is equivalent to `parallel :::: inputs.list`
-rw-r--r-- | Cargo.lock | 7 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | README.md | 8 | ||||
-rw-r--r-- | src/arguments/errors.rs | 4 | ||||
-rw-r--r-- | src/arguments/mod.rs | 17 | ||||
-rw-r--r-- | src/arguments/redirection.rs | 13 | ||||
-rw-r--r-- | src/execute/exec_commands.rs | 4 | ||||
-rw-r--r-- | src/execute/exec_inputs.rs | 4 | ||||
-rw-r--r-- | src/execute/receive.rs | 7 | ||||
-rw-r--r-- | src/filepaths.rs | 23 | ||||
-rw-r--r-- | src/itoa_array.rs | 33 | ||||
-rw-r--r-- | src/main.rs | 2 | ||||
-rw-r--r-- | src/verbose.rs | 13 |
13 files changed, 110 insertions, 27 deletions
@@ -3,6 +3,7 @@ name = "parallel" version = "0.9.0" dependencies = [ "arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", + "itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "permutate 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "sys-info 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -25,6 +26,11 @@ version = "0.3.41" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "itoa" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "kernel32-sys" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -103,6 +109,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [metadata] "checksum arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d89f1b0e242270b5b797778af0c8d182a1a2ccac5d8d6fadf414223cc0fab096" "checksum gcc 0.3.41 (registry+https://github.com/rust-lang/crates.io-index)" = "3689e1982a563af74960ae3a4758aa632bb8fd984cfc3cc3b60ee6109477ab6e" +"checksum itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ae3088ea4baeceb0284ee9eea42f591226e6beaecf65373e41b38d95a1b8e7a1" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "a51822fc847e7a8101514d1d44e354ba2ffa7d4c194dcab48870740e327cac70" "checksum nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbbadd3f4c98dea0bd3d9b4be4c0cdaf1ab57035cb2e41fce3983db5add7cc5" @@ -9,6 +9,7 @@ keywords = ["cli", "parallel", "cpu", "load", "balancer"] readme = "README.md" [dependencies] +itoa = "0.1" num_cpus = "1.2" permutate = "0.2" arrayvec = "0.3" @@ -17,7 +18,6 @@ sys-info = "0.4" wait-timeout = "0.1" [profile.release] -lto = true opt-level = 3 [package.metadata.deb] @@ -1,4 +1,10 @@ -# Parallel: A Command-line CPU Load Balancer Written in Rust [![](https://tokei.rs/b1/github/mmstick/parallel)](https://github.com/mmstick/parallel) +# MIT/Rust Parallel: A Command-line CPU Load Balancer Written in Rust +[![Crates.io](https://img.shields.io/crates/v/parallel.svg)](https://github.com/mmstick/parallel) +[![Tokei SLoC Count](https://tokei.rs/b1/github/mmstick/parallel)](https://github.com/mmstic + k/parallel) +[![OpenHub Statistics](https://www.openhub.net/p/rust-parallel/widgets/project_thin_badge?format=gif&ref=Thin+badge)](https://www.openhub.net/p/rust-parallel/) +[![AUR](https://img.shields.io/aur/version/parallel-rust.svg)](https://github.com/mmstick/parallel) + This is an attempt at recreating the functionality of [GNU Parallel](https://www.gnu.org/software/parallel/), a work-stealer for the command-line, in Rust under a MIT license. The end goal will be to support much of the functionality of `GNU Parallel` and then to extend the functionality further for the next generation of command-line utilities written in Rust. While functionality is important, with the application being developed in Rust, the goal is to also be as fast and efficient as possible. ## Note diff --git a/src/arguments/errors.rs b/src/arguments/errors.rs index 6923095..ed6e367 100644 --- a/src/arguments/errors.rs +++ b/src/arguments/errors.rs @@ -42,6 +42,7 @@ pub enum ParseErr { MemNoValue, /// No arguments were given, so no action can be taken. NoArguments, + RedirFile(PathBuf), TimeoutNaN(usize), TimeoutNoValue, } @@ -115,6 +116,9 @@ impl ParseErr { ParseErr::NoArguments => { let _ = write!(stderr, "no input arguments were given.\n"); }, + ParseErr::RedirFile(path) => { + let _ = write!(stderr, "an error occurred while redirecting file: {:?}\n", path); + }, ParseErr::TimeoutNaN(index) => { let _ = write!(stderr, "invalid timeout value: {}\n", arguments[index]); }, diff --git a/src/arguments/mod.rs b/src/arguments/mod.rs index beb0582..ef9e774 100644 --- a/src/arguments/mod.rs +++ b/src/arguments/mod.rs @@ -3,6 +3,7 @@ pub mod errors; mod jobs; mod man; mod quote; +mod redirection; use std::env; use std::fs; @@ -206,7 +207,9 @@ impl Args { } } - if let Mode::Command = mode { + if let Some(path) = redirection::input_was_redirected() { + file_parse(&mut current_inputs, path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?)?; + } else if let Mode::Command = mode { while let Some(argument) = arguments.get(index) { index += 1; match argument.as_str() { @@ -223,15 +226,21 @@ impl Args { } break } - } - if shebang { - file_parse(&mut current_inputs, &arguments.last().unwrap())?; + if shebang { + file_parse(&mut current_inputs, &arguments.last().unwrap())?; + } else { + parse_inputs(arguments, index, &mut current_inputs, &mut lists, &mut mode)?; + } } else { parse_inputs(arguments, index, &mut current_inputs, &mut lists, &mut mode)?; } number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer)?; + } else if let Some(path) = redirection::input_was_redirected() { + self.flags |= INPUTS_ARE_COMMANDS; + file_parse(&mut current_inputs, path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?)?; + number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer)?; } if disk_buffer.is_empty() { diff --git a/src/arguments/redirection.rs b/src/arguments/redirection.rs new file mode 100644 index 0000000..bc5e4a4 --- /dev/null +++ b/src/arguments/redirection.rs @@ -0,0 +1,13 @@ +use std::fs; +use std::path::PathBuf; + +#[cfg(not(unix))] +pub fn input_was_redirected() -> Option<PathBuf> { None } + +#[cfg(unix)] +pub fn input_was_redirected() -> Option<PathBuf> { + if let Ok(link) = fs::read_link("/proc/self/fd/0") { + if !link.to_string_lossy().starts_with("/dev/pts") { return Some(link) } + } + None +} diff --git a/src/execute/exec_commands.rs b/src/execute/exec_commands.rs index b5ae0ad..b0d91aa 100644 --- a/src/execute/exec_commands.rs +++ b/src/execute/exec_commands.rs @@ -34,7 +34,7 @@ impl<'a> ExecCommands<'a> { while let Some((job_id, _)) = self.inputs.try_next(&mut input) { if self.flags & arguments::VERBOSE_MODE != 0 { - verbose::processing_task(&stdout, &job_id.to_string(), job_total, &input); + verbose::processing_task(&stdout, job_id+1, job_total, &input); } let command = command::ParallelCommand { @@ -70,7 +70,7 @@ impl<'a> ExecCommands<'a> { } if self.flags & arguments::VERBOSE_MODE != 0 { - verbose::task_complete(&stdout, &job_id.to_string(), job_total, &input); + verbose::task_complete(&stdout, job_id, job_total, &input); } } } diff --git a/src/execute/exec_inputs.rs b/src/execute/exec_inputs.rs index 1f79fff..92e9144 100644 --- a/src/execute/exec_inputs.rs +++ b/src/execute/exec_inputs.rs @@ -29,7 +29,7 @@ impl ExecInputs { while let Some((job_id, _)) = self.inputs.try_next(&mut input) { if flags & arguments::VERBOSE_MODE != 0 { - verbose::processing_task(&stdout, &job_id.to_string(), job_total, &input); + verbose::processing_task(&stdout, job_id+1, job_total, &input); } if shell::required(shell::Kind::Input(&input)) { @@ -59,7 +59,7 @@ impl ExecInputs { } if flags & arguments::VERBOSE_MODE != 0 { - verbose::task_complete(&stdout, &job_id.to_string(), job_total, &input); + verbose::task_complete(&stdout, job_id, job_total, &input); } } } diff --git a/src/execute/receive.rs b/src/execute/receive.rs index c448d95..e7171f5 100644 --- a/src/execute/receive.rs +++ b/src/execute/receive.rs @@ -78,6 +78,7 @@ pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: & let mut error_file = DiskBuffer::new(errors_path).write().unwrap(); // A buffer for buffering the outputs of temporary files on disk. let mut read_buffer = [0u8; 8192]; + let mut id_buffer = [0u8; 64]; let (truncate_size, mut stdout_path, mut stderr_path) = filepaths::new_job(counter); @@ -90,7 +91,7 @@ pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: & if id == counter { let mut stdout = stdout.lock(); let mut stderr = stderr.lock(); - filepaths::next_job_path(counter, truncate_size, &mut stdout_path, &mut stderr_path); + filepaths::next_job_path(counter, truncate_size, &mut id_buffer, &mut stdout_path, &mut stderr_path); let (mut stdout_file, mut stderr_file) = open_job_files!(stdout_path, stderr_path); append_to_processed!(processed_file, name, stderr); read_outputs!(stdout_file, stderr_file, read_buffer, stdout, stderr); @@ -115,7 +116,7 @@ pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: & } if tail_next { - filepaths::next_job_path(counter, truncate_size, &mut stdout_path, &mut stderr_path); + filepaths::next_job_path(counter, truncate_size, &mut id_buffer, &mut stdout_path, &mut stderr_path); let (mut stdout_file, mut stderr_file) = open_job_files!(stdout_path, stderr_path); loop { @@ -166,7 +167,7 @@ pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: & State::Completed(id, ref name) if id == counter => { let mut stdout = stdout.lock(); let mut stderr = stderr.lock(); - filepaths::next_job_path(counter, truncate_size, &mut stdout_path, &mut stderr_path); + filepaths::next_job_path(counter, truncate_size, &mut id_buffer, &mut stdout_path, &mut stderr_path); let (mut stdout_file, mut stderr_file) = open_job_files!(stdout_path, stderr_path); append_to_processed!(processed_file, name, stderr); read_outputs!(stdout_file, stderr_file, read_buffer, stdout, stderr); diff --git a/src/filepaths.rs b/src/filepaths.rs index 9ac10c6..98cddd7 100644 --- a/src/filepaths.rs +++ b/src/filepaths.rs @@ -1,5 +1,6 @@ use std::env::home_dir; use std::path::PathBuf; +use itoa_array::itoa; #[cfg(not(windows))] pub fn base() -> Option<PathBuf> { @@ -82,24 +83,30 @@ pub fn new_job(id: usize) -> (usize, String, String) { } #[cfg(windows)] -pub fn new_job(id: usize) -> (usize, String, String) { +pub fn new_job(id: usize, buffer: &mut [u8; 64]) -> (usize, String, String) { home_dir().map(|home| { let mut stdout = home.to_str().unwrap().to_owned(); let mut stderr = stdout.clone(); stdout.push_str("AppData/Local/Temp/parallel/stdout_"); stderr.push_str("AppData/Local/Temp/parallel/stderr_"); let truncate_value = stdout.len(); - let id = id.to_string(); - stdout.push_str(&id); - stderr.push_str(&id); + + let length = itoa(buffer, id, 10); + for byte in &buffer[0..length] { + stdout.push(*byte as char); + stderr.push(*byte as char); + } + (truncate_value, stdout, stderr) }).expect("parallel: unable to open home folder") } -pub fn next_job_path(id: usize, truncate: usize, stdout: &mut String, stderr: &mut String) { - let id = id.to_string(); +pub fn next_job_path(id: usize, truncate: usize, buffer: &mut [u8; 64], stdout: &mut String, stderr: &mut String) { stdout.truncate(truncate); - stdout.push_str(&id); stderr.truncate(truncate); - stderr.push_str(&id); + let length = itoa(buffer, id, 10); + for byte in &buffer[0..length] { + stdout.push(*byte as char); + stderr.push(*byte as char); + } } diff --git a/src/itoa_array.rs b/src/itoa_array.rs new file mode 100644 index 0000000..4f6ceac --- /dev/null +++ b/src/itoa_array.rs @@ -0,0 +1,33 @@ +use std::ptr::swap; + +pub fn reverse(string: &mut [u8], length: usize) { + let mut start = 0isize; + let mut end = length as isize - 1; + while start < end { + unsafe { + let x = string.as_mut_ptr().offset(start); + let y = string.as_mut_ptr().offset(end); + swap(x, y); + } + start += 1; + end -= 1; + } +} + +pub fn itoa(string: &mut [u8], mut number: usize, base: usize) -> usize { + if number == 0 { + string[0] = b'0'; + return 1; + } + + let mut index = 0; + while number != 0 { + let rem = (number % base) as u8; + string[index] = if rem > 9 { (rem - 10) + b'a' } else { (rem + b'0') }; + index += 1; + number /= base; + } + + reverse(string, index); + index +} diff --git a/src/main.rs b/src/main.rs index 90df7c4..bdfd946 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ #![feature(alloc_system)] extern crate alloc_system; extern crate arrayvec; +extern crate itoa; extern crate num_cpus; extern crate permutate; extern crate sys_info; @@ -17,6 +18,7 @@ mod execute; mod filepaths; mod init; mod input_iterator; +mod itoa_array; mod tokenizer; mod shell; mod verbose; diff --git a/src/verbose.rs b/src/verbose.rs index 24e305c..9447fc0 100644 --- a/src/verbose.rs +++ b/src/verbose.rs @@ -1,18 +1,19 @@ use std::io::{Stdout, Write}; +use itoa; pub fn total_inputs(stdout: &Stdout, threads: usize, inputs: usize) { let mut stdout = stdout.lock(); let _ = stdout.write(b"parallel: processing "); - let _ = stdout.write(inputs.to_string().as_bytes()); + let _ = itoa::write(&mut stdout, inputs); let _ = stdout.write(b" inputs on "); - let _ = stdout.write(threads.to_string().as_bytes()); + let _ = itoa::write(&mut stdout, threads); let _ = stdout.write(b" threads\n"); } -pub fn processing_task(stdout: &Stdout, job: &str, total: &str, input: &str) { +pub fn processing_task(stdout: &Stdout, job: usize, total: &str, input: &str) { let mut stdout = stdout.lock(); let _ = stdout.write(b"parallel: processing task #"); - let _ = stdout.write(job.as_bytes()); + let _ = itoa::write(&mut stdout, job); let _ = stdout.write(b" of "); let _ = stdout.write(total.as_bytes()); let _ = stdout.write(b": '"); @@ -20,10 +21,10 @@ pub fn processing_task(stdout: &Stdout, job: &str, total: &str, input: &str) { let _ = stdout.write(b"'\n"); } -pub fn task_complete(stdout: &Stdout, job: &str, total: &str, input: &str) { +pub fn task_complete(stdout: &Stdout, job: usize, total: &str, input: &str) { let mut stdout = stdout.lock(); let _ = stdout.write(b"parallel: completed task #"); - let _ = stdout.write(job.as_bytes()); + let _ = itoa::write(&mut stdout, job); let _ = stdout.write(b" of "); let _ = stdout.write(total.as_bytes()); let _ = stdout.write(b": '"); |