diff options
author | Michael Aaron Murphy <mmstickman@gmail.com> | 2017-01-13 02:27:59 -0500 |
---|---|---|
committer | Michael Aaron Murphy <mmstickman@gmail.com> | 2017-01-13 15:57:04 -0500 |
commit | 7dc92f645fa9b106a9ea351ad6717b83aa51d376 (patch) | |
tree | 67be50da7a6c9837e70001784958366a5c7ec480 | |
parent | 7fd49aa8f6c5df1aca7e942f268ca5ee2a953b3f (diff) |
0.10.1: Implement Tempdir Parameter0.10.1
- Implements the tempdir parameter which specifies where temp files
will be placed (inputs, errors, jobs).
- Much filepath code has been removed as it is no longer required
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | README.md | 1 | ||||
-rw-r--r-- | TODO.md | 4 | ||||
-rw-r--r-- | src/arguments/errors.rs | 24 | ||||
-rw-r--r-- | src/arguments/man.rs | 3 | ||||
-rw-r--r-- | src/arguments/mod.rs | 106 | ||||
-rw-r--r-- | src/execute/child.rs | 6 | ||||
-rw-r--r-- | src/execute/exec_commands.rs | 12 | ||||
-rw-r--r-- | src/execute/exec_inputs.rs | 4 | ||||
-rw-r--r-- | src/execute/pipe.rs | 6 | ||||
-rw-r--r-- | src/execute/receive.rs | 6 | ||||
-rw-r--r-- | src/filepaths.rs | 87 | ||||
-rw-r--r-- | src/init.rs | 73 | ||||
-rw-r--r-- | src/main.rs | 62 |
15 files changed, 137 insertions, 261 deletions
@@ -1,6 +1,6 @@ [root] name = "parallel" -version = "0.10.0" +version = "0.10.1" dependencies = [ "arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", "itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1,6 +1,6 @@ [package] name = "parallel" -version = "0.10.0" +version = "0.10.1" authors = ["Michael Aaron Murphy <mmstickman@gmail.com>"] license = "MIT" description = "Command-line CPU load balancer for executing jobs in parallel" @@ -260,6 +260,7 @@ operates: - **-s**, **--silent**, **--quiet**: Disables printing the standard output of running processes. - **--shebang**: Grants ability to utilize the parallel command as an interpreter via calling it within a shebang line. - **--shellquote**: Prints commands that will be executed, with the commands quoted. +- **--tmpdir**: Defines the directory to use for temporary files - **--timeout**: If a command runs for longer than a specified number of seconds, it will be killed with a SIGKILL. - **-v**, **--verbose**: Prints information about running processes. - **--version**: Prints the current version of the application and it's dependencies. @@ -5,9 +5,7 @@ The list is actively updated with each successful pull request. - Re-integrate the original memory-buffered implementation - Implement `retries`, `resume`, `resume-failed`, and `retry-failed` - Will require comparing the processed and unprocessed files - - Generate a new unprocessed file and remove the originals\ -- Implement `workdir` and `tempdir` - - Allow the ability to change the default location of the temp and work directories + - Generate a new unprocessed file and remove the originals - Fix quoting issues - `shell-quote` should infer `dry-run` - input tokens should be quoted diff --git a/src/arguments/errors.rs b/src/arguments/errors.rs index f3480d7..29500f3 100644 --- a/src/arguments/errors.rs +++ b/src/arguments/errors.rs @@ -5,13 +5,8 @@ use std::process::exit; /// A list of all the possible errors that may happen when working with files. #[derive(Debug)] pub enum FileErr { - DirectoryCreate(PathBuf, io::Error), - DirectoryRead(PathBuf, io::Error), - Create(PathBuf, io::Error), Open(PathBuf, io::Error), Read(PathBuf, io::Error), - Remove(PathBuf, io::Error), - Path, Write(PathBuf, io::Error), } @@ -46,6 +41,7 @@ pub enum ParseErr { RedirFile(PathBuf), TimeoutNaN(usize), TimeoutNoValue, + WorkDirNoValue, } impl From<FileErr> for ParseErr { @@ -62,27 +58,12 @@ impl ParseErr { let _ = stderr.write(b"parallel: parsing error: "); match self { ParseErr::File(file_err) => match file_err { - FileErr::Create(path, why) => { - let _ = write!(stderr, "unable to create file: {:?}: {}\n", path, why); - }, - FileErr::DirectoryCreate(path, why) => { - let _ = write!(stderr, "unable to create directory: {:?}: {}\n", path, why); - }, - FileErr::DirectoryRead(path, why) => { - let _ = write!(stderr, "unable to create directory: {:?}: {}\n", path, why); - }, FileErr::Open(file, why) => { let _ = write!(stderr, "unable to open file: {:?}: {}\n", file, why); }, FileErr::Read(file, why) => { let _ = write!(stderr, "unable to read file: {:?}: {}\n", file, why); }, - FileErr::Remove(file, why) => { - let _ = write!(stderr, "unable to remove file: {:?}: {}\n", file, why); - }, - FileErr::Path => { - let _ = write!(stderr, "unable to obtain input paths\n"); - }, FileErr::Write(file, why) => { let _ = write!(stderr, "unable to write to file: {:?}: {}\n", file, why); }, @@ -128,6 +109,9 @@ impl ParseErr { }, ParseErr::TimeoutNoValue => { let _ = stderr.write(b"no timeout parameter was defined.\n"); + }, + ParseErr::WorkDirNoValue => { + let _ = stderr.write(b"no workdir parameter was defined.\n"); } }; let _ = stdout.write(b"For help on command-line usage, execute `parallel -h`\n"); diff --git a/src/arguments/man.rs b/src/arguments/man.rs index 5148d63..a701022 100644 --- a/src/arguments/man.rs +++ b/src/arguments/man.rs @@ -121,6 +121,9 @@ OPTIONS --shellquote: Prints commands that will be executed, with the commands quoted. + --tmpdir: + Defines the directory to use for temporary files. + --timeout: If a command runs for longer than a specified number of seconds, it will be killed with a SIGKILL. diff --git a/src/arguments/mod.rs b/src/arguments/mod.rs index 0557d44..205d87e 100644 --- a/src/arguments/mod.rs +++ b/src/arguments/mod.rs @@ -5,7 +5,7 @@ mod man; mod redirection; use std::env; -use std::fs::{self, File}; +use std::fs; use std::io::{self, BufRead, BufReader, BufWriter, Write}; use std::num::ParseIntError; use std::path::{Path, PathBuf}; @@ -46,6 +46,7 @@ pub struct Args { pub timeout: Duration, pub arguments: ArrayVec<[Token; 128]>, pub joblog: Option<String>, + pub tempdir: Option<PathBuf>, } impl Args { @@ -59,18 +60,13 @@ impl Args { delay: Duration::from_millis(0), timeout: Duration::from_millis(0), joblog: None, + tempdir: None, } } /// Performs all the work related to parsing program arguments - pub fn parse(&mut self, comm: &mut String, arguments: &[String], unprocessed_path: &Path) - -> Result<usize, ParseErr> - { - // Create a write buffer that automatically writes data to the disk when the buffer is full. - let disk_buffer = fs::OpenOptions::new().write(true).append(true).open(unprocessed_path) - .map_err(|why| ParseErr::File(FileErr::Open(unprocessed_path.to_owned(), why)))?; - let mut disk_buffer = BufWriter::new(disk_buffer); - + pub fn parse(&mut self, comm: &mut String, arguments: &[String], base_path: &mut PathBuf) + -> Result<usize, ParseErr> { // Each list will consist of a series of input arguments let mut lists: Vec<Vec<String>> = Vec::new(); // The `current_inputs` variable will contain all the inputs that have been collected for the first list. @@ -158,12 +154,12 @@ impl Args { let val = arguments.get(index).ok_or(ParseErr::MaxArgsNoValue)?; max_args = val.parse::<usize>().map_err(|_| ParseErr::MaxArgsNaN(index))?; index += 1; - } + }, "mem-free" => { let val = arguments.get(index).ok_or(ParseErr::MemNoValue)?; self.memory = parse_memory(val).map_err(|_| ParseErr::MemInvalid(index))?; index += 1; - } + }, "pipe" => self.flags |= PIPE_IS_ENABLED, "quiet" | "silent" => self.flags |= QUIET_MODE, "shellquote" => self.flags |= DRY_RUN + SHELL_QUOTE, @@ -172,11 +168,15 @@ impl Args { let seconds = val.parse::<f64>().map_err(|_| ParseErr::TimeoutNaN(index))?; self.timeout = Duration::from_millis((seconds * 1000f64) as u64); index += 1; - } + }, "verbose" => self.flags |= VERBOSE_MODE, "version" => { - println!("MIT/Rust Parallel 0.10.0\n"); + println!("MIT/Rust Parallel 0.10.1\n"); exit(0); + }, + "tmpdir" | "tempdir" => { + *base_path = PathBuf::from(arguments.get(index).ok_or(ParseErr::WorkDirNoValue)?); + index += 1; } _ if &argument[2..9] == "shebang" => { shebang = true; @@ -188,7 +188,7 @@ impl Args { } } else { match argument.as_str() { - ":::" => mode = Mode::Inputs, + ":::" => mode = Mode::Inputs, "::::" => mode = Mode::Files, _ => { // The command has been supplied, and argument parsing is over. @@ -208,7 +208,7 @@ impl Args { index += 1; match argument.as_str() { // Arguments after `:::` are input values. - ":::" | ":::+" => mode = Mode::Inputs, + ":::" | ":::+" => mode = Mode::Inputs, // Arguments after `::::` are files with inputs. "::::" | "::::+" => mode = Mode::Files, // All other arguments are command arguments. @@ -230,21 +230,18 @@ impl Args { parse_inputs(arguments, index, &mut current_inputs, &mut lists, &mut mode)?; } - number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?; + number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, base_path.clone())?; } else if let Some(path) = redirection::input_was_redirected() { file_parse(&mut current_inputs, path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?)?; - number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?; + number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, base_path.clone())?; } if number_of_arguments == 0 { - number_of_arguments = write_stdin_to_disk(&mut disk_buffer, max_args, unprocessed_path)?; + number_of_arguments = write_stdin_to_disk(max_args, base_path.clone())?; } if number_of_arguments == 0 { return Err(ParseErr::NoArguments); } - // Flush the contents of the buffer to the disk before tokenizing the command argument. - let _ = disk_buffer.flush(); - if comm.is_empty() { self.flags |= INPUTS_ARE_COMMANDS; } Ok(number_of_arguments) @@ -252,10 +249,12 @@ impl Args { } /// Write all arguments from standard input to the disk, recording the number of arguments that were read. -fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unprocessed_path: &Path) - -> Result<usize, ParseErr> -{ +fn write_stdin_to_disk(max_args: usize, mut unprocessed_path: PathBuf) -> Result<usize, ParseErr> { println!("parallel: reading inputs from standard input"); + unprocessed_path.push("unprocessed"); + let disk_buffer = fs::OpenOptions::new().write(true).create(true).open(&unprocessed_path) + .map_err(|why| ParseErr::File(FileErr::Open(unprocessed_path.clone(), why)))?; + let mut disk_buffer = BufWriter::new(disk_buffer); let mut number_of_arguments = 0; let stdin = io::stdin(); @@ -263,7 +262,7 @@ fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unpro for line in stdin.lock().lines() { if let Ok(line) = line { disk_buffer.write(line.as_bytes()).and_then(|_| disk_buffer.write(b"\n")) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; number_of_arguments += 1; } } @@ -275,24 +274,24 @@ fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unpro max_args_index -= 1; number_of_arguments += 1; disk_buffer.write(line.as_bytes()) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; } else if max_args_index == 1 { max_args_index = max_args; disk_buffer.write(b" ") .and_then(|_| disk_buffer.write(line.as_bytes())) .and_then(|_| disk_buffer.write(b"\n")) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; } else { max_args_index -= 1; disk_buffer.write(b" ") .and_then(|_| disk_buffer.write(line.as_bytes())) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; } } } if max_args_index != max_args { disk_buffer.write(b"\n") - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; } } @@ -301,7 +300,12 @@ fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unpro /// Write all input arguments buffered in memory to the disk, recording the number of arguments that were read. fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, max_args: usize, - disk_buffer: &mut BufWriter<File>, unprocessed_path: &Path) -> Result<usize, ParseErr> { + mut unprocessed_path: PathBuf) -> Result<usize, ParseErr> +{ + unprocessed_path.push("unprocessed"); + let disk_buffer = fs::OpenOptions::new().write(true).create(true).open(&unprocessed_path) + .map_err(|why| ParseErr::File(FileErr::Open(unprocessed_path.to_owned(), why)))?; + let mut disk_buffer = BufWriter::new(disk_buffer); let mut number_of_arguments = 0; if lists.len() > 1 { @@ -321,27 +325,27 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma { let mut iter = permutation_buffer.iter(); disk_buffer.write(iter.next().unwrap().as_bytes()) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; for element in iter { disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes())) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; } number_of_arguments += 1; } if max_args < 2 { - disk_buffer.write(b"\n").map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + disk_buffer.write(b"\n").map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; while let Ok(true) = permutator.next_with_buffer(&mut permutation_buffer) { let mut iter = permutation_buffer.iter(); disk_buffer.write(iter.next().unwrap().as_bytes()) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; for element in iter { disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes())) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; } disk_buffer.write(b"\n") - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; number_of_arguments += 1; } } else { @@ -353,34 +357,34 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma number_of_arguments += 1; disk_buffer.write(iter.next().unwrap().as_bytes()) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; for element in iter { disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes())) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; } } else if max_args_index == 1 { max_args_index = max_args; disk_buffer.write(b" ") .and_then(|_| disk_buffer.write(iter.next().unwrap().as_bytes())) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; for element in iter { disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes())) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; } disk_buffer.write(b"\n") - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; } else { max_args_index -= 1; disk_buffer.write(b" ") .and_then(|_| disk_buffer.write(iter.next().unwrap().as_bytes())) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; for element in iter { disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes())) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; } } } @@ -389,7 +393,7 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma for input in current_inputs { disk_buffer.write(input.as_bytes()) .and_then(|_| disk_buffer.write(b"\n")) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; number_of_arguments += 1; } } else { @@ -401,12 +405,12 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma while index != max_index { disk_buffer.write(chunk[index].as_bytes()) .and_then(|_| disk_buffer.write(b" ")) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; index += 1; } disk_buffer.write(chunk[max_index].as_bytes()) .and_then(|_| disk_buffer.write(b"\n")) - .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; + .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?; } } Ok(number_of_arguments) @@ -444,9 +448,9 @@ fn parse_inputs(arguments: &[String], mut index: usize, current_inputs: &mut Vec index += 1; match argument.as_str() { // `:::` denotes that the next set of inputs will be added to a new list. - ":::" => switch_mode!(Mode::Inputs), + ":::" => switch_mode!(Mode::Inputs), // `:::+` denotes that the next set of inputs will be added to the current list. - ":::+" => switch_mode!(append Mode::InputsAppend), + ":::+" => switch_mode!(append Mode::InputsAppend), // `::::` denotes that the next set of inputs will be added to a new list. "::::" => switch_mode!(Mode::Files), // `:::+` denotes that the next set of inputs will be added to the current list. @@ -513,7 +517,7 @@ fn parse_memory(input: &str) -> Result<u64, ParseIntError> { 'T' => &input[..input.len()-1].parse::<u64>()? * 1_099_511_627_776, 'p' => &input[..input.len()-1].parse::<u64>()? * 1_000_000_000_000_000, 'P' => &input[..input.len()-1].parse::<u64>()? * 1_125_899_906_842_624, - _ => input.parse::<u64>()? + _ => input.parse::<u64>()? }; Ok(result) } @@ -532,8 +536,8 @@ fn parse_jobs(argument: &str, next_argument: Option<&String>, index: &mut usize) /// Attempts to open an input argument and adds each line to the `inputs` list. fn file_parse<P: AsRef<Path>>(inputs: &mut Vec<String>, path: P) -> Result<(), ParseErr> { - let path = path.as_ref(); - let file = fs::File::open(path).map_err(|err| ParseErr::File(FileErr::Open(path.to_owned(), err)))?; + let path = path.as_ref(); + let file = fs::File::open(path).map_err(|err| ParseErr::File(FileErr::Open(path.to_owned(), err)))?; let mut buffer = BufReader::new(file).lines(); if let Some(line) = buffer.next() { if let Ok(line) = line { diff --git a/src/execute/child.rs b/src/execute/child.rs index 50a21a9..b44170b 100644 --- a/src/execute/child.rs +++ b/src/execute/child.rs @@ -9,15 +9,15 @@ use super::pipe::disk::output as pipe_output; use super::pipe::disk::State; pub fn handle_child(mut child: Child, output: &Sender<State>, flags: u16, job_id: usize, input: String, - has_timeout: bool, timeout: Duration) -> (Timespec, Timespec, i32, i32) + has_timeout: bool, timeout: Duration, base: &str) -> (Timespec, Timespec, i32, i32) { let start_time = get_time(); if has_timeout && child.wait_timeout(timeout).unwrap().is_none() { let _ = child.kill(); - pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0); + pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0, base); (start_time, get_time(), -1, 15) } else { - pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0); + pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0, base); match child.wait() { Ok(status) => match status.code() { Some(exit) => (start_time, get_time(), exit, 0), diff --git a/src/execute/exec_commands.rs b/src/execute/exec_commands.rs index dc5feb2..a7f1bf8 100644 --- a/src/execute/exec_commands.rs +++ b/src/execute/exec_commands.rs @@ -16,17 +16,18 @@ use std::time::Duration; /// Contains all the required data needed for executing commands in parallel. /// Commands will be generated based on a template of argument tokens combined /// with the current input argument. -pub struct ExecCommands<'a> { +pub struct ExecCommands { pub slot: usize, pub num_inputs: usize, pub flags: u16, pub timeout: Duration, pub inputs: InputsLock, pub output_tx: Sender<State>, - pub arguments: &'a [Token], + pub arguments: &'static [Token], + pub tempdir: String, } -impl<'a> ExecCommands<'a> { +impl ExecCommands { pub fn run(&mut self) { let stdout = io::stdout(); let stderr = io::stderr(); @@ -55,11 +56,12 @@ impl<'a> ExecCommands<'a> { command_template: self.arguments, flags: self.flags }; - + command_buffer.clear(); let (start_time, end_time, exit_value, signal) = match command.exec(command_buffer) { Ok(child) => { - handle_child(child, &self.output_tx, self.flags, job_id, input.clone(), has_timeout, self.timeout) + handle_child(child, &self.output_tx, self.flags, job_id, input.clone(), has_timeout, self.timeout, + &self.tempdir) }, Err(cmd_err) => { let mut stderr = stderr.lock(); diff --git a/src/execute/exec_inputs.rs b/src/execute/exec_inputs.rs index 440a2f7..ab38a64 100644 --- a/src/execute/exec_inputs.rs +++ b/src/execute/exec_inputs.rs @@ -20,6 +20,7 @@ pub struct ExecInputs { pub timeout: Duration, pub inputs: InputsLock, pub output_tx: Sender<State>, + pub tempdir: String, } impl ExecInputs { @@ -44,7 +45,8 @@ impl ExecInputs { let (start_time, end_time, exit_value, signal) = match command::get_command_output(&input, flags) { Ok(child) => { - handle_child(child, &self.output_tx, flags, job_id, input.clone(), has_timeout, self.timeout) + handle_child(child, &self.output_tx, flags, job_id, input.clone(), has_timeout, self.timeout, + &self.tempdir) }, Err(why) => { let mut stderr = stderr.lock(); diff --git a/src/execute/pipe.rs b/src/execute/pipe.rs index 497c09d..e76f18d 100644 --- a/src/execute/pipe.rs +++ b/src/execute/pipe.rs @@ -19,8 +19,10 @@ pub mod disk { /// Sends messages received by a `Child` process's standard output and error and sends them /// to be handled by the grouped output channel. - pub fn output(child: &mut Child, job_id: usize, name: String, output_tx: &Sender<State>, quiet: bool) { - let (_, stdout_path, stderr_path) = filepaths::new_job(job_id); + pub fn output(child: &mut Child, job_id: usize, name: String, output_tx: &Sender<State>, quiet: bool, + base: &str) + { + let (_, stdout_path, stderr_path) = filepaths::new_job(base, job_id); let mut stdout_file = File::create(stdout_path).expect("unable to create job stdout file"); let mut stderr_file = File::create(stderr_path).expect("unable to create job stderr file"); diff --git a/src/execute/receive.rs b/src/execute/receive.rs index a76ad89..b2b9174 100644 --- a/src/execute/receive.rs +++ b/src/execute/receive.rs @@ -69,7 +69,9 @@ macro_rules! append_to_processed { #[allow(cyclomatic_complexity)] /// Tail and print the standard output and error of each process in the correct order -pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: &Path, errors_path: &Path) { +pub fn receive_messages(input_rx: Receiver<State>, args: Args, base: &str, processed_path: &Path, + errors_path: &Path) +{ let stdout = io::stdout(); let stderr = io::stderr(); @@ -98,7 +100,7 @@ pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: & // A buffer for converting job ID's into a byte array representation of a string. let mut id_buffer = [0u8; 64]; // Generates the stdout and stderr paths, along with a truncation value to truncate the job ID from the paths. - let (truncate_size, mut stdout_path, mut stderr_path) = filepaths::new_job(counter); + let (truncate_size, mut stdout_path, mut stderr_path) = filepaths::new_job(base, counter); // If the joblog parameter was passed, open the file for writing. let mut joblog = args.joblog.map(|path| { job_counter = 0; diff --git a/src/filepaths.rs b/src/filepaths.rs index 4bb135b..f85f86f 100644 --- a/src/filepaths.rs +++ b/src/filepaths.rs @@ -1,106 +1,31 @@ -use std::env::home_dir; use std::path::PathBuf; use misc::NumToA; #[cfg(not(windows))] pub fn base() -> Option<PathBuf> { - home_dir().map(|mut path| { - path.push(".local/share/parallel"); - path - }) + Some(PathBuf::from("/tmp/parallel")) } #[cfg(windows)] pub fn base() -> Option<PathBuf> { + use std::env::home_dir; home_dir().map(|mut path| { path.push("AppData/Local/Temp/parallel"); path }) } -#[cfg(not(windows))] -pub fn processed() -> Option<PathBuf> { - home_dir().map(|mut path| { - path.push(".local/share/parallel/processed"); - path - }) -} - -#[cfg(windows)] -pub fn processed() -> Option<PathBuf> { - home_dir().map(|mut path| { - path.push("AppData/Local/Temp/parallel/processed"); - path - }) -} - -#[cfg(not(windows))] -pub fn unprocessed() -> Option<PathBuf> { - home_dir().map(|mut path| { - path.push(".local/share/parallel/unprocessed"); - path - }) -} - -#[cfg(windows)] -pub fn unprocessed() -> Option<PathBuf> { - home_dir().map(|mut path| { - path.push("AppData/Local/Temp/parallel/unprocessed"); - path - }) -} - - -#[cfg(not(windows))] -pub fn errors() -> Option<PathBuf> { - home_dir().map(|mut path| { - path.push(".local/share/parallel/errors"); - path - }) -} - -#[cfg(windows)] -pub fn errors() -> Option<PathBuf> { - home_dir().map(|mut path| { - path.push("AppData/Local/Temp/parallel/errors"); - path - }) -} - -pub fn outputs_path() -> PathBuf { - PathBuf::from("/tmp/parallel/") -} - -#[cfg(not(windows))] -pub fn new_job(id: usize) -> (usize, String, String) { +pub fn new_job(base: &str, id: usize) -> (usize, String, String) { let id = id.to_string(); - let mut stdout = String::from("/tmp/parallel/stdout_"); - let mut stderr = String::from("/tmp/parallel/stderr_"); + let mut stdout = String::from(base) + "/stdout_"; + let mut stderr = String::from(base) + "/stderr_"; let truncate_value = stdout.len(); stdout.push_str(&id); stderr.push_str(&id); + println!("stdout: '{}'", stdout); (truncate_value, stdout, stderr) } -#[cfg(windows)] -pub fn new_job(id: usize, buffer: &mut [u8; 64]) -> (usize, String, String) { - home_dir().map(|home| { - let mut stdout = home.to_str().unwrap().to_owned(); - let mut stderr = stdout.clone(); - stdout.push_str("AppData/Local/Temp/parallel/stdout_"); - stderr.push_str("AppData/Local/Temp/parallel/stderr_"); - let truncate_value = stdout.len(); - - let length = itoa(buffer, id, 10); - for byte in &buffer[0..length] { - stdout.push(*byte as char); - stderr.push(*byte as char); - } - - (truncate_value, stdout, stderr) - }).expect("parallel: unable to open home folder") -} - pub fn next_job_path(id: usize, truncate: usize, buffer: &mut [u8; 64], stdout: &mut String, stderr: &mut String) { stdout.truncate(truncate); stderr.truncate(truncate); diff --git a/src/init.rs b/src/init.rs deleted file mode 100644 index 6a8e4c4..0000000 --- a/src/init.rs +++ /dev/null @@ -1,73 +0,0 @@ -use std::fs::{read_dir, create_dir_all, remove_file, remove_dir, File}; -use std::io::{StderrLock, Write}; -use std::path::{Path, PathBuf}; -use std::process::exit; - -use super::arguments::{FileErr, Args}; -use super::filepaths; - -fn remove_preexisting_files() -> Result<(PathBuf, PathBuf, PathBuf), FileErr> { - // Initialize the base directories of the unprocessed and processed files. - let path = filepaths::base().ok_or(FileErr::Path)?; - - // Create the directories that are required for storing input files. - create_dir_all(&path).map_err(|why| FileErr::DirectoryCreate(path.clone(), why))?; - if cfg!(not(windows)) { - let outputs_path = filepaths::outputs_path(); - create_dir_all(&outputs_path).map_err(|why| FileErr::DirectoryCreate(outputs_path, why))?; - } - - // Attempt to obtain a listing of all the directories and files within the base directory. - let directory = read_dir(&path).map_err(|why| FileErr::DirectoryRead(path.clone(), why))?; - for entry in directory { - let entry = entry.map_err(|why| FileErr::DirectoryRead(path.clone(), why))?; - let entry_is_file = entry.file_type().ok().map_or(true, |x| !x.is_dir()); - if entry_is_file { - remove_file(entry.path()).map_err(|why| FileErr::Remove(path.clone(), why))?; - } else { - remove_dir(entry.path()).map_err(|why| FileErr::Remove(path.clone(), why))?; - } - } - - // Create empty logs ahead of time - let errors = filepaths::errors().ok_or(FileErr::Path)?; - let unprocessed = filepaths::unprocessed().ok_or(FileErr::Path)?; - let processed = filepaths::processed().ok_or(FileErr::Path)?; - File::create(&errors).map_err(|why| FileErr::Create(errors.clone(), why))?; - File::create(&unprocessed).map_err(|why| FileErr::Create(unprocessed.clone(), why))?; - File::create(&processed).map_err(|why| FileErr::Create(processed.clone(), why))?; - - Ok((unprocessed, processed, errors)) -} - -pub fn cleanup(stderr: &mut StderrLock) -> (PathBuf, PathBuf, PathBuf) { - match remove_preexisting_files() { - Ok(values) => values, - Err(why) => { - let _ = stderr.write(b"parallel: initialization error: I/O error: "); - match why { - FileErr::Create(path, why) => { - let _ = write!(stderr, "unable to create file: {:?}: {}", path, why); |