summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Aaron Murphy <mmstickman@gmail.com>2017-01-13 02:27:59 -0500
committerMichael Aaron Murphy <mmstickman@gmail.com>2017-01-13 15:57:04 -0500
commit7dc92f645fa9b106a9ea351ad6717b83aa51d376 (patch)
tree67be50da7a6c9837e70001784958366a5c7ec480
parent7fd49aa8f6c5df1aca7e942f268ca5ee2a953b3f (diff)
0.10.1: Implement Tempdir Parameter0.10.1
- Implements the tempdir parameter which specifies where temp files will be placed (inputs, errors, jobs). - Much filepath code has been removed as it is no longer required
-rw-r--r--Cargo.lock2
-rw-r--r--Cargo.toml2
-rw-r--r--README.md1
-rw-r--r--TODO.md4
-rw-r--r--src/arguments/errors.rs24
-rw-r--r--src/arguments/man.rs3
-rw-r--r--src/arguments/mod.rs106
-rw-r--r--src/execute/child.rs6
-rw-r--r--src/execute/exec_commands.rs12
-rw-r--r--src/execute/exec_inputs.rs4
-rw-r--r--src/execute/pipe.rs6
-rw-r--r--src/execute/receive.rs6
-rw-r--r--src/filepaths.rs87
-rw-r--r--src/init.rs73
-rw-r--r--src/main.rs62
15 files changed, 137 insertions, 261 deletions
diff --git a/Cargo.lock b/Cargo.lock
index b3fd117..9daffe5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,6 +1,6 @@
[root]
name = "parallel"
-version = "0.10.0"
+version = "0.10.1"
dependencies = [
"arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/Cargo.toml b/Cargo.toml
index b4afc7f..eab8684 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "parallel"
-version = "0.10.0"
+version = "0.10.1"
authors = ["Michael Aaron Murphy <mmstickman@gmail.com>"]
license = "MIT"
description = "Command-line CPU load balancer for executing jobs in parallel"
diff --git a/README.md b/README.md
index ecec8ca..d0fc877 100644
--- a/README.md
+++ b/README.md
@@ -260,6 +260,7 @@ operates:
- **-s**, **--silent**, **--quiet**: Disables printing the standard output of running processes.
- **--shebang**: Grants ability to utilize the parallel command as an interpreter via calling it within a shebang line.
- **--shellquote**: Prints commands that will be executed, with the commands quoted.
+- **--tmpdir**: Defines the directory to use for temporary files
- **--timeout**: If a command runs for longer than a specified number of seconds, it will be killed with a SIGKILL.
- **-v**, **--verbose**: Prints information about running processes.
- **--version**: Prints the current version of the application and it's dependencies.
diff --git a/TODO.md b/TODO.md
index 9d3feb8..f352212 100644
--- a/TODO.md
+++ b/TODO.md
@@ -5,9 +5,7 @@ The list is actively updated with each successful pull request.
- Re-integrate the original memory-buffered implementation
- Implement `retries`, `resume`, `resume-failed`, and `retry-failed`
- Will require comparing the processed and unprocessed files
- - Generate a new unprocessed file and remove the originals\
-- Implement `workdir` and `tempdir`
- - Allow the ability to change the default location of the temp and work directories
+ - Generate a new unprocessed file and remove the originals
- Fix quoting issues
- `shell-quote` should infer `dry-run`
- input tokens should be quoted
diff --git a/src/arguments/errors.rs b/src/arguments/errors.rs
index f3480d7..29500f3 100644
--- a/src/arguments/errors.rs
+++ b/src/arguments/errors.rs
@@ -5,13 +5,8 @@ use std::process::exit;
/// A list of all the possible errors that may happen when working with files.
#[derive(Debug)]
pub enum FileErr {
- DirectoryCreate(PathBuf, io::Error),
- DirectoryRead(PathBuf, io::Error),
- Create(PathBuf, io::Error),
Open(PathBuf, io::Error),
Read(PathBuf, io::Error),
- Remove(PathBuf, io::Error),
- Path,
Write(PathBuf, io::Error),
}
@@ -46,6 +41,7 @@ pub enum ParseErr {
RedirFile(PathBuf),
TimeoutNaN(usize),
TimeoutNoValue,
+ WorkDirNoValue,
}
impl From<FileErr> for ParseErr {
@@ -62,27 +58,12 @@ impl ParseErr {
let _ = stderr.write(b"parallel: parsing error: ");
match self {
ParseErr::File(file_err) => match file_err {
- FileErr::Create(path, why) => {
- let _ = write!(stderr, "unable to create file: {:?}: {}\n", path, why);
- },
- FileErr::DirectoryCreate(path, why) => {
- let _ = write!(stderr, "unable to create directory: {:?}: {}\n", path, why);
- },
- FileErr::DirectoryRead(path, why) => {
- let _ = write!(stderr, "unable to create directory: {:?}: {}\n", path, why);
- },
FileErr::Open(file, why) => {
let _ = write!(stderr, "unable to open file: {:?}: {}\n", file, why);
},
FileErr::Read(file, why) => {
let _ = write!(stderr, "unable to read file: {:?}: {}\n", file, why);
},
- FileErr::Remove(file, why) => {
- let _ = write!(stderr, "unable to remove file: {:?}: {}\n", file, why);
- },
- FileErr::Path => {
- let _ = write!(stderr, "unable to obtain input paths\n");
- },
FileErr::Write(file, why) => {
let _ = write!(stderr, "unable to write to file: {:?}: {}\n", file, why);
},
@@ -128,6 +109,9 @@ impl ParseErr {
},
ParseErr::TimeoutNoValue => {
let _ = stderr.write(b"no timeout parameter was defined.\n");
+ },
+ ParseErr::WorkDirNoValue => {
+ let _ = stderr.write(b"no workdir parameter was defined.\n");
}
};
let _ = stdout.write(b"For help on command-line usage, execute `parallel -h`\n");
diff --git a/src/arguments/man.rs b/src/arguments/man.rs
index 5148d63..a701022 100644
--- a/src/arguments/man.rs
+++ b/src/arguments/man.rs
@@ -121,6 +121,9 @@ OPTIONS
--shellquote:
Prints commands that will be executed, with the commands quoted.
+ --tmpdir:
+ Defines the directory to use for temporary files.
+
--timeout:
If a command runs for longer than a specified number of seconds, it will be
killed with a SIGKILL.
diff --git a/src/arguments/mod.rs b/src/arguments/mod.rs
index 0557d44..205d87e 100644
--- a/src/arguments/mod.rs
+++ b/src/arguments/mod.rs
@@ -5,7 +5,7 @@ mod man;
mod redirection;
use std::env;
-use std::fs::{self, File};
+use std::fs;
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::num::ParseIntError;
use std::path::{Path, PathBuf};
@@ -46,6 +46,7 @@ pub struct Args {
pub timeout: Duration,
pub arguments: ArrayVec<[Token; 128]>,
pub joblog: Option<String>,
+ pub tempdir: Option<PathBuf>,
}
impl Args {
@@ -59,18 +60,13 @@ impl Args {
delay: Duration::from_millis(0),
timeout: Duration::from_millis(0),
joblog: None,
+ tempdir: None,
}
}
/// Performs all the work related to parsing program arguments
- pub fn parse(&mut self, comm: &mut String, arguments: &[String], unprocessed_path: &Path)
- -> Result<usize, ParseErr>
- {
- // Create a write buffer that automatically writes data to the disk when the buffer is full.
- let disk_buffer = fs::OpenOptions::new().write(true).append(true).open(unprocessed_path)
- .map_err(|why| ParseErr::File(FileErr::Open(unprocessed_path.to_owned(), why)))?;
- let mut disk_buffer = BufWriter::new(disk_buffer);
-
+ pub fn parse(&mut self, comm: &mut String, arguments: &[String], base_path: &mut PathBuf)
+ -> Result<usize, ParseErr> {
// Each list will consist of a series of input arguments
let mut lists: Vec<Vec<String>> = Vec::new();
// The `current_inputs` variable will contain all the inputs that have been collected for the first list.
@@ -158,12 +154,12 @@ impl Args {
let val = arguments.get(index).ok_or(ParseErr::MaxArgsNoValue)?;
max_args = val.parse::<usize>().map_err(|_| ParseErr::MaxArgsNaN(index))?;
index += 1;
- }
+ },
"mem-free" => {
let val = arguments.get(index).ok_or(ParseErr::MemNoValue)?;
self.memory = parse_memory(val).map_err(|_| ParseErr::MemInvalid(index))?;
index += 1;
- }
+ },
"pipe" => self.flags |= PIPE_IS_ENABLED,
"quiet" | "silent" => self.flags |= QUIET_MODE,
"shellquote" => self.flags |= DRY_RUN + SHELL_QUOTE,
@@ -172,11 +168,15 @@ impl Args {
let seconds = val.parse::<f64>().map_err(|_| ParseErr::TimeoutNaN(index))?;
self.timeout = Duration::from_millis((seconds * 1000f64) as u64);
index += 1;
- }
+ },
"verbose" => self.flags |= VERBOSE_MODE,
"version" => {
- println!("MIT/Rust Parallel 0.10.0\n");
+ println!("MIT/Rust Parallel 0.10.1\n");
exit(0);
+ },
+ "tmpdir" | "tempdir" => {
+ *base_path = PathBuf::from(arguments.get(index).ok_or(ParseErr::WorkDirNoValue)?);
+ index += 1;
}
_ if &argument[2..9] == "shebang" => {
shebang = true;
@@ -188,7 +188,7 @@ impl Args {
}
} else {
match argument.as_str() {
- ":::" => mode = Mode::Inputs,
+ ":::" => mode = Mode::Inputs,
"::::" => mode = Mode::Files,
_ => {
// The command has been supplied, and argument parsing is over.
@@ -208,7 +208,7 @@ impl Args {
index += 1;
match argument.as_str() {
// Arguments after `:::` are input values.
- ":::" | ":::+" => mode = Mode::Inputs,
+ ":::" | ":::+" => mode = Mode::Inputs,
// Arguments after `::::` are files with inputs.
"::::" | "::::+" => mode = Mode::Files,
// All other arguments are command arguments.
@@ -230,21 +230,18 @@ impl Args {
parse_inputs(arguments, index, &mut current_inputs, &mut lists, &mut mode)?;
}
- number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?;
+ number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, base_path.clone())?;
} else if let Some(path) = redirection::input_was_redirected() {
file_parse(&mut current_inputs, path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?)?;
- number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?;
+ number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, base_path.clone())?;
}
if number_of_arguments == 0 {
- number_of_arguments = write_stdin_to_disk(&mut disk_buffer, max_args, unprocessed_path)?;
+ number_of_arguments = write_stdin_to_disk(max_args, base_path.clone())?;
}
if number_of_arguments == 0 { return Err(ParseErr::NoArguments); }
- // Flush the contents of the buffer to the disk before tokenizing the command argument.
- let _ = disk_buffer.flush();
-
if comm.is_empty() { self.flags |= INPUTS_ARE_COMMANDS; }
Ok(number_of_arguments)
@@ -252,10 +249,12 @@ impl Args {
}
/// Write all arguments from standard input to the disk, recording the number of arguments that were read.
-fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unprocessed_path: &Path)
- -> Result<usize, ParseErr>
-{
+fn write_stdin_to_disk(max_args: usize, mut unprocessed_path: PathBuf) -> Result<usize, ParseErr> {
println!("parallel: reading inputs from standard input");
+ unprocessed_path.push("unprocessed");
+ let disk_buffer = fs::OpenOptions::new().write(true).create(true).open(&unprocessed_path)
+ .map_err(|why| ParseErr::File(FileErr::Open(unprocessed_path.clone(), why)))?;
+ let mut disk_buffer = BufWriter::new(disk_buffer);
let mut number_of_arguments = 0;
let stdin = io::stdin();
@@ -263,7 +262,7 @@ fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unpro
for line in stdin.lock().lines() {
if let Ok(line) = line {
disk_buffer.write(line.as_bytes()).and_then(|_| disk_buffer.write(b"\n"))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
number_of_arguments += 1;
}
}
@@ -275,24 +274,24 @@ fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unpro
max_args_index -= 1;
number_of_arguments += 1;
disk_buffer.write(line.as_bytes())
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
} else if max_args_index == 1 {
max_args_index = max_args;
disk_buffer.write(b" ")
.and_then(|_| disk_buffer.write(line.as_bytes()))
.and_then(|_| disk_buffer.write(b"\n"))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
} else {
max_args_index -= 1;
disk_buffer.write(b" ")
.and_then(|_| disk_buffer.write(line.as_bytes()))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
}
}
if max_args_index != max_args {
disk_buffer.write(b"\n")
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
}
@@ -301,7 +300,12 @@ fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unpro
/// Write all input arguments buffered in memory to the disk, recording the number of arguments that were read.
fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, max_args: usize,
- disk_buffer: &mut BufWriter<File>, unprocessed_path: &Path) -> Result<usize, ParseErr> {
+ mut unprocessed_path: PathBuf) -> Result<usize, ParseErr>
+{
+ unprocessed_path.push("unprocessed");
+ let disk_buffer = fs::OpenOptions::new().write(true).create(true).open(&unprocessed_path)
+ .map_err(|why| ParseErr::File(FileErr::Open(unprocessed_path.to_owned(), why)))?;
+ let mut disk_buffer = BufWriter::new(disk_buffer);
let mut number_of_arguments = 0;
if lists.len() > 1 {
@@ -321,27 +325,27 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma
{
let mut iter = permutation_buffer.iter();
disk_buffer.write(iter.next().unwrap().as_bytes())
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
for element in iter {
disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
number_of_arguments += 1;
}
if max_args < 2 {
- disk_buffer.write(b"\n").map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ disk_buffer.write(b"\n").map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
while let Ok(true) = permutator.next_with_buffer(&mut permutation_buffer) {
let mut iter = permutation_buffer.iter();
disk_buffer.write(iter.next().unwrap().as_bytes())
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
for element in iter {
disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
disk_buffer.write(b"\n")
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
number_of_arguments += 1;
}
} else {
@@ -353,34 +357,34 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma
number_of_arguments += 1;
disk_buffer.write(iter.next().unwrap().as_bytes())
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
for element in iter {
disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
} else if max_args_index == 1 {
max_args_index = max_args;
disk_buffer.write(b" ")
.and_then(|_| disk_buffer.write(iter.next().unwrap().as_bytes()))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
for element in iter {
disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
disk_buffer.write(b"\n")
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
} else {
max_args_index -= 1;
disk_buffer.write(b" ")
.and_then(|_| disk_buffer.write(iter.next().unwrap().as_bytes()))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
for element in iter {
disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
}
}
@@ -389,7 +393,7 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma
for input in current_inputs {
disk_buffer.write(input.as_bytes())
.and_then(|_| disk_buffer.write(b"\n"))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
number_of_arguments += 1;
}
} else {
@@ -401,12 +405,12 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma
while index != max_index {
disk_buffer.write(chunk[index].as_bytes())
.and_then(|_| disk_buffer.write(b" "))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
index += 1;
}
disk_buffer.write(chunk[max_index].as_bytes())
.and_then(|_| disk_buffer.write(b"\n"))
- .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
+ .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
}
Ok(number_of_arguments)
@@ -444,9 +448,9 @@ fn parse_inputs(arguments: &[String], mut index: usize, current_inputs: &mut Vec
index += 1;
match argument.as_str() {
// `:::` denotes that the next set of inputs will be added to a new list.
- ":::" => switch_mode!(Mode::Inputs),
+ ":::" => switch_mode!(Mode::Inputs),
// `:::+` denotes that the next set of inputs will be added to the current list.
- ":::+" => switch_mode!(append Mode::InputsAppend),
+ ":::+" => switch_mode!(append Mode::InputsAppend),
// `::::` denotes that the next set of inputs will be added to a new list.
"::::" => switch_mode!(Mode::Files),
// `:::+` denotes that the next set of inputs will be added to the current list.
@@ -513,7 +517,7 @@ fn parse_memory(input: &str) -> Result<u64, ParseIntError> {
'T' => &input[..input.len()-1].parse::<u64>()? * 1_099_511_627_776,
'p' => &input[..input.len()-1].parse::<u64>()? * 1_000_000_000_000_000,
'P' => &input[..input.len()-1].parse::<u64>()? * 1_125_899_906_842_624,
- _ => input.parse::<u64>()?
+ _ => input.parse::<u64>()?
};
Ok(result)
}
@@ -532,8 +536,8 @@ fn parse_jobs(argument: &str, next_argument: Option<&String>, index: &mut usize)
/// Attempts to open an input argument and adds each line to the `inputs` list.
fn file_parse<P: AsRef<Path>>(inputs: &mut Vec<String>, path: P) -> Result<(), ParseErr> {
- let path = path.as_ref();
- let file = fs::File::open(path).map_err(|err| ParseErr::File(FileErr::Open(path.to_owned(), err)))?;
+ let path = path.as_ref();
+ let file = fs::File::open(path).map_err(|err| ParseErr::File(FileErr::Open(path.to_owned(), err)))?;
let mut buffer = BufReader::new(file).lines();
if let Some(line) = buffer.next() {
if let Ok(line) = line {
diff --git a/src/execute/child.rs b/src/execute/child.rs
index 50a21a9..b44170b 100644
--- a/src/execute/child.rs
+++ b/src/execute/child.rs
@@ -9,15 +9,15 @@ use super::pipe::disk::output as pipe_output;
use super::pipe::disk::State;
pub fn handle_child(mut child: Child, output: &Sender<State>, flags: u16, job_id: usize, input: String,
- has_timeout: bool, timeout: Duration) -> (Timespec, Timespec, i32, i32)
+ has_timeout: bool, timeout: Duration, base: &str) -> (Timespec, Timespec, i32, i32)
{
let start_time = get_time();
if has_timeout && child.wait_timeout(timeout).unwrap().is_none() {
let _ = child.kill();
- pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0);
+ pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0, base);
(start_time, get_time(), -1, 15)
} else {
- pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0);
+ pipe_output(&mut child, job_id, input, output, flags & QUIET_MODE != 0, base);
match child.wait() {
Ok(status) => match status.code() {
Some(exit) => (start_time, get_time(), exit, 0),
diff --git a/src/execute/exec_commands.rs b/src/execute/exec_commands.rs
index dc5feb2..a7f1bf8 100644
--- a/src/execute/exec_commands.rs
+++ b/src/execute/exec_commands.rs
@@ -16,17 +16,18 @@ use std::time::Duration;
/// Contains all the required data needed for executing commands in parallel.
/// Commands will be generated based on a template of argument tokens combined
/// with the current input argument.
-pub struct ExecCommands<'a> {
+pub struct ExecCommands {
pub slot: usize,
pub num_inputs: usize,
pub flags: u16,
pub timeout: Duration,
pub inputs: InputsLock,
pub output_tx: Sender<State>,
- pub arguments: &'a [Token],
+ pub arguments: &'static [Token],
+ pub tempdir: String,
}
-impl<'a> ExecCommands<'a> {
+impl ExecCommands {
pub fn run(&mut self) {
let stdout = io::stdout();
let stderr = io::stderr();
@@ -55,11 +56,12 @@ impl<'a> ExecCommands<'a> {
command_template: self.arguments,
flags: self.flags
};
-
+
command_buffer.clear();
let (start_time, end_time, exit_value, signal) = match command.exec(command_buffer) {
Ok(child) => {
- handle_child(child, &self.output_tx, self.flags, job_id, input.clone(), has_timeout, self.timeout)
+ handle_child(child, &self.output_tx, self.flags, job_id, input.clone(), has_timeout, self.timeout,
+ &self.tempdir)
},
Err(cmd_err) => {
let mut stderr = stderr.lock();
diff --git a/src/execute/exec_inputs.rs b/src/execute/exec_inputs.rs
index 440a2f7..ab38a64 100644
--- a/src/execute/exec_inputs.rs
+++ b/src/execute/exec_inputs.rs
@@ -20,6 +20,7 @@ pub struct ExecInputs {
pub timeout: Duration,
pub inputs: InputsLock,
pub output_tx: Sender<State>,
+ pub tempdir: String,
}
impl ExecInputs {
@@ -44,7 +45,8 @@ impl ExecInputs {
let (start_time, end_time, exit_value, signal) = match command::get_command_output(&input, flags) {
Ok(child) => {
- handle_child(child, &self.output_tx, flags, job_id, input.clone(), has_timeout, self.timeout)
+ handle_child(child, &self.output_tx, flags, job_id, input.clone(), has_timeout, self.timeout,
+ &self.tempdir)
},
Err(why) => {
let mut stderr = stderr.lock();
diff --git a/src/execute/pipe.rs b/src/execute/pipe.rs
index 497c09d..e76f18d 100644
--- a/src/execute/pipe.rs
+++ b/src/execute/pipe.rs
@@ -19,8 +19,10 @@ pub mod disk {
/// Sends messages received by a `Child` process's standard output and error and sends them
/// to be handled by the grouped output channel.
- pub fn output(child: &mut Child, job_id: usize, name: String, output_tx: &Sender<State>, quiet: bool) {
- let (_, stdout_path, stderr_path) = filepaths::new_job(job_id);
+ pub fn output(child: &mut Child, job_id: usize, name: String, output_tx: &Sender<State>, quiet: bool,
+ base: &str)
+ {
+ let (_, stdout_path, stderr_path) = filepaths::new_job(base, job_id);
let mut stdout_file = File::create(stdout_path).expect("unable to create job stdout file");
let mut stderr_file = File::create(stderr_path).expect("unable to create job stderr file");
diff --git a/src/execute/receive.rs b/src/execute/receive.rs
index a76ad89..b2b9174 100644
--- a/src/execute/receive.rs
+++ b/src/execute/receive.rs
@@ -69,7 +69,9 @@ macro_rules! append_to_processed {
#[allow(cyclomatic_complexity)]
/// Tail and print the standard output and error of each process in the correct order
-pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: &Path, errors_path: &Path) {
+pub fn receive_messages(input_rx: Receiver<State>, args: Args, base: &str, processed_path: &Path,
+ errors_path: &Path)
+{
let stdout = io::stdout();
let stderr = io::stderr();
@@ -98,7 +100,7 @@ pub fn receive_messages(input_rx: Receiver<State>, args: Args, processed_path: &
// A buffer for converting job ID's into a byte array representation of a string.
let mut id_buffer = [0u8; 64];
// Generates the stdout and stderr paths, along with a truncation value to truncate the job ID from the paths.
- let (truncate_size, mut stdout_path, mut stderr_path) = filepaths::new_job(counter);
+ let (truncate_size, mut stdout_path, mut stderr_path) = filepaths::new_job(base, counter);
// If the joblog parameter was passed, open the file for writing.
let mut joblog = args.joblog.map(|path| {
job_counter = 0;
diff --git a/src/filepaths.rs b/src/filepaths.rs
index 4bb135b..f85f86f 100644
--- a/src/filepaths.rs
+++ b/src/filepaths.rs
@@ -1,106 +1,31 @@
-use std::env::home_dir;
use std::path::PathBuf;
use misc::NumToA;
#[cfg(not(windows))]
pub fn base() -> Option<PathBuf> {
- home_dir().map(|mut path| {
- path.push(".local/share/parallel");
- path
- })
+ Some(PathBuf::from("/tmp/parallel"))
}
#[cfg(windows)]
pub fn base() -> Option<PathBuf> {
+ use std::env::home_dir;
home_dir().map(|mut path| {
path.push("AppData/Local/Temp/parallel");
path
})
}
-#[cfg(not(windows))]
-pub fn processed() -> Option<PathBuf> {
- home_dir().map(|mut path| {
- path.push(".local/share/parallel/processed");
- path
- })
-}
-
-#[cfg(windows)]
-pub fn processed() -> Option<PathBuf> {
- home_dir().map(|mut path| {
- path.push("AppData/Local/Temp/parallel/processed");
- path
- })
-}
-
-#[cfg(not(windows))]
-pub fn unprocessed() -> Option<PathBuf> {
- home_dir().map(|mut path| {
- path.push(".local/share/parallel/unprocessed");
- path
- })
-}
-
-#[cfg(windows)]
-pub fn unprocessed() -> Option<PathBuf> {
- home_dir().map(|mut path| {
- path.push("AppData/Local/Temp/parallel/unprocessed");
- path
- })
-}
-
-
-#[cfg(not(windows))]
-pub fn errors() -> Option<PathBuf> {
- home_dir().map(|mut path| {
- path.push(".local/share/parallel/errors");
- path
- })
-}
-
-#[cfg(windows)]
-pub fn errors() -> Option<PathBuf> {
- home_dir().map(|mut path| {
- path.push("AppData/Local/Temp/parallel/errors");
- path
- })
-}
-
-pub fn outputs_path() -> PathBuf {
- PathBuf::from("/tmp/parallel/")
-}
-
-#[cfg(not(windows))]
-pub fn new_job(id: usize) -> (usize, String, String) {
+pub fn new_job(base: &str, id: usize) -> (usize, String, String) {
let id = id.to_string();
- let mut stdout = String::from("/tmp/parallel/stdout_");
- let mut stderr = String::from("/tmp/parallel/stderr_");
+ let mut stdout = String::from(base) + "/stdout_";
+ let mut stderr = String::from(base) + "/stderr_";
let truncate_value = stdout.len();
stdout.push_str(&id);
stderr.push_str(&id);
+ println!("stdout: '{}'", stdout);
(truncate_value, stdout, stderr)
}
-#[cfg(windows)]
-pub fn new_job(id: usize, buffer: &mut [u8; 64]) -> (usize, String, String) {
- home_dir().map(|home| {
- let mut stdout = home.to_str().unwrap().to_owned();
- let mut stderr = stdout.clone();
- stdout.push_str("AppData/Local/Temp/parallel/stdout_");
- stderr.push_str("AppData/Local/Temp/parallel/stderr_");
- let truncate_value = stdout.len();
-
- let length = itoa(buffer, id, 10);
- for byte in &buffer[0..length] {
- stdout.push(*byte as char);
- stderr.push(*byte as char);
- }
-
- (truncate_value, stdout, stderr)
- }).expect("parallel: unable to open home folder")
-}
-
pub fn next_job_path(id: usize, truncate: usize, buffer: &mut [u8; 64], stdout: &mut String, stderr: &mut String) {
stdout.truncate(truncate);
stderr.truncate(truncate);
diff --git a/src/init.rs b/src/init.rs
deleted file mode 100644
index 6a8e4c4..0000000
--- a/src/init.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-use std::fs::{read_dir, create_dir_all, remove_file, remove_dir, File};
-use std::io::{StderrLock, Write};
-use std::path::{Path, PathBuf};
-use std::process::exit;
-
-use super::arguments::{FileErr, Args};
-use super::filepaths;
-
-fn remove_preexisting_files() -> Result<(PathBuf, PathBuf, PathBuf), FileErr> {
- // Initialize the base directories of the unprocessed and processed files.
- let path = filepaths::base().ok_or(FileErr::Path)?;
-
- // Create the directories that are required for storing input files.
- create_dir_all(&path).map_err(|why| FileErr::DirectoryCreate(path.clone(), why))?;
- if cfg!(not(windows)) {
- let outputs_path = filepaths::outputs_path();
- create_dir_all(&outputs_path).map_err(|why| FileErr::DirectoryCreate(outputs_path, why))?;
- }
-
- // Attempt to obtain a listing of all the directories and files within the base directory.
- let directory = read_dir(&path).map_err(|why| FileErr::DirectoryRead(path.clone(), why))?;
- for entry in directory {
- let entry = entry.map_err(|why| FileErr::DirectoryRead(path.clone(), why))?;
- let entry_is_file = entry.file_type().ok().map_or(true, |x| !x.is_dir());
- if entry_is_file {
- remove_file(entry.path()).map_err(|why| FileErr::Remove(path.clone(), why))?;
- } else {
- remove_dir(entry.path()).map_err(|why| FileErr::Remove(path.clone(), why))?;
- }
- }
-
- // Create empty logs ahead of time
- let errors = filepaths::errors().ok_or(FileErr::Path)?;
- let unprocessed = filepaths::unprocessed().ok_or(FileErr::Path)?;
- let processed = filepaths::processed().ok_or(FileErr::Path)?;
- File::create(&errors).map_err(|why| FileErr::Create(errors.clone(), why))?;
- File::create(&unprocessed).map_err(|why| FileErr::Create(unprocessed.clone(), why))?;
- File::create(&processed).map_err(|why| FileErr::Create(processed.clone(), why))?;
-
- Ok((unprocessed, processed, errors))
-}
-
-pub fn cleanup(stderr: &mut StderrLock) -> (PathBuf, PathBuf, PathBuf) {
- match remove_preexisting_files() {
- Ok(values) => values,
- Err(why) => {
- let _ = stderr.write(b"parallel: initialization error: I/O error: ");
- match why {
- FileErr::Create(path, why) => {
- let _ = write!(stderr, "unable to create file: {:?}: {}", path, why);