summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Aaron Murphy <mmstickman@gmail.com>2017-01-10 20:46:40 -0500
committerMichael Aaron Murphy <mmstickman@gmail.com>2017-01-11 17:03:03 -0500
commitb2a2997d08df4407bbd0ef778c4be805c1cf9845 (patch)
treeaaaefa94ae8cccf3d4e1bcb9d89ffe200af38082
parentee95fbd68e85c2ee81352a0fd6f3fc0723f4cd18 (diff)
Implement JobLog Parameter & Refactoring Work
- The **--joblog** parameter is now successfully implemented - Replaced all instances of DiskBufferWriter with BufWriter - Moved the `itoa` related functions into a new `misc` module - Converted the `itoa` functions into a `NumToA` trait - Added a `Digits` function to count the number of digits in a number
-rw-r--r--Cargo.lock10
-rw-r--r--src/arguments/errors.rs4
-rw-r--r--src/arguments/mod.rs118
-rw-r--r--src/disk_buffer/mod.rs73
-rw-r--r--src/execute/dry.rs6
-rw-r--r--src/execute/exec_commands.rs38
-rw-r--r--src/execute/exec_inputs.rs36
-rw-r--r--src/execute/job_log.rs99
-rw-r--r--src/execute/mod.rs2
-rw-r--r--src/execute/pipe.rs3
-rw-r--r--src/execute/receive.rs152
-rw-r--r--src/execute/signals.rs12
-rw-r--r--src/filepaths.rs4
-rw-r--r--src/init.rs8
-rw-r--r--src/input_iterator/iterator.rs7
-rw-r--r--src/input_iterator/lock.rs8
-rw-r--r--src/itoa_array.rs33
-rw-r--r--src/main.rs22
-rw-r--r--src/misc/digits.rs17
-rw-r--r--src/misc/mod.rs10
-rw-r--r--src/misc/numtoa.rs94
-rw-r--r--src/shell.rs1
22 files changed, 501 insertions, 256 deletions
diff --git a/Cargo.lock b/Cargo.lock
index dcdbe4f..171be53 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -42,7 +42,7 @@ dependencies = [
[[package]]
name = "libc"
-version = "0.2.18"
+version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@@ -58,7 +58,7 @@ name = "num_cpus"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
- "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -90,7 +90,7 @@ version = "0.1.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -99,7 +99,7 @@ name = "wait-timeout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
- "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -117,7 +117,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum gcc 0.3.41 (registry+https://github.com/rust-lang/crates.io-index)" = "3689e1982a563af74960ae3a4758aa632bb8fd984cfc3cc3b60ee6109477ab6e"
"checksum itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ae3088ea4baeceb0284ee9eea42f591226e6beaecf65373e41b38d95a1b8e7a1"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
-"checksum libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "a51822fc847e7a8101514d1d44e354ba2ffa7d4c194dcab48870740e327cac70"
+"checksum libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)" = "9e030dc72013ed68994d1b2cbf36a94dd0e58418ba949c4b0db7eeb70a7a6352"
"checksum nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbbadd3f4c98dea0bd3d9b4be4c0cdaf1ab57035cb2e41fce3983db5add7cc5"
"checksum num_cpus 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a225d1e2717567599c24f88e49f00856c6e825a12125181ee42c4257e3688d39"
"checksum odds 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)" = "c3df9b730298cea3a1c3faa90b7e2f9df3a9c400d0936d6015e6165734eefcba"
diff --git a/src/arguments/errors.rs b/src/arguments/errors.rs
index ed6e367..f3480d7 100644
--- a/src/arguments/errors.rs
+++ b/src/arguments/errors.rs
@@ -28,6 +28,7 @@ pub enum ParseErr {
DelayNoValue,
/// An error occurred with accessing the unprocessed file.
File(FileErr),
+ JoblogNoValue,
/// The value of jobs was not set to a number.
JobsNaN(String),
/// No value was provided for the jobs flag.
@@ -92,6 +93,9 @@ impl ParseErr {
ParseErr::DelayNoValue => {
let _ = stderr.write(b"no delay parameter was defined.\n");
},
+ ParseErr::JoblogNoValue => {
+ let _ = stderr.write(b"no joblog parameter was defined.\n");
+ },
ParseErr::JobsNaN(value) => {
let _ = write!(stderr, "jobs parameter, '{}', is not a number.\n", value);
},
diff --git a/src/arguments/mod.rs b/src/arguments/mod.rs
index d93e39c..ed66a83 100644
--- a/src/arguments/mod.rs
+++ b/src/arguments/mod.rs
@@ -5,16 +5,14 @@ mod man;
mod redirection;
use std::env;
-use std::fs;
-use std::io::{self, BufRead, BufReader};
+use std::fs::{self, File};
+use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::num::ParseIntError;
-use std::path::Path;
+use std::path::{Path, PathBuf};
use std::process::exit;
use std::time::Duration;
use arrayvec::ArrayVec;
-use disk_buffer::{self, DiskBufferTrait, DiskBufferWriter};
-use input_iterator::InputIterator;
use permutate::Permutator;
use tokenizer::Token;
use num_cpus;
@@ -35,6 +33,7 @@ pub const DASH_EXISTS: u16 = 32;
pub const DRY_RUN: u16 = 64;
pub const SHELL_QUOTE: u16 = 128;
pub const ETA: u16 = 256;
+pub const JOBLOG: u16 = 512;
/// `Args` is a collection of critical options and arguments that were collected at
/// startup of the application.
@@ -46,6 +45,7 @@ pub struct Args {
pub delay: Duration,
pub timeout: Duration,
pub arguments: ArrayVec<[Token; 128]>,
+ pub joblog: Option<String>,
}
impl Args {
@@ -58,16 +58,18 @@ impl Args {
memory: 0,
delay: Duration::from_millis(0),
timeout: Duration::from_millis(0),
+ joblog: None,
}
}
/// Performs all the work related to parsing program arguments
pub fn parse(&mut self, comm: &mut String, arguments: &[String], unprocessed_path: &Path)
- -> Result<InputIterator, ParseErr>
+ -> Result<usize, ParseErr>
{
// Create a write buffer that automatically writes data to the disk when the buffer is full.
- let mut disk_buffer = disk_buffer::DiskBuffer::new(unprocessed_path).write()
+ let disk_buffer = fs::OpenOptions::new().write(true).append(true).open(unprocessed_path)
.map_err(|why| ParseErr::File(FileErr::Open(unprocessed_path.to_owned(), why)))?;
+ let mut disk_buffer = BufWriter::new(disk_buffer);
// Each list will consist of a series of input arguments
let mut lists: Vec<Vec<String>> = Vec::new();
@@ -137,6 +139,12 @@ impl Args {
println!("{}", man::MAN_PAGE);
exit(0);
},
+ "joblog" => {
+ let file = arguments.get(index).ok_or(ParseErr::JoblogNoValue)?;
+ self.joblog = Some(file.to_owned());
+ index += 1;
+ self.flags |= JOBLOG;
+ },
"jobs" => {
let val = arguments.get(index).ok_or(ParseErr::JobsNoValue)?;
self.ncores = jobs::parse(val)?;
@@ -228,38 +236,38 @@ impl Args {
parse_inputs(arguments, index, &mut current_inputs, &mut lists, &mut mode)?;
}
- number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer)?;
+ number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?;
} else if let Some(path) = redirection::input_was_redirected() {
self.flags |= INPUTS_ARE_COMMANDS;
file_parse(&mut current_inputs, path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?)?;
- number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer)?;
+ number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?;
}
- if disk_buffer.is_empty() {
- number_of_arguments = write_stdin_to_disk(&mut disk_buffer, max_args)?;
+ if number_of_arguments == 0 {
+ number_of_arguments = write_stdin_to_disk(&mut disk_buffer, max_args, unprocessed_path)?;
}
if number_of_arguments == 0 { return Err(ParseErr::NoArguments); }
// Flush the contents of the buffer to the disk before tokenizing the command argument.
- disk_buffer.flush().map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ let _ = disk_buffer.flush();
- // Return an `InputIterator` of the arguments contained within the unprocessed file.
- let inputs = InputIterator::new(unprocessed_path, number_of_arguments).map_err(ParseErr::File)?;
- Ok(inputs)
+ Ok(number_of_arguments)
}
}
/// Write all arguments from standard input to the disk, recording the number of arguments that were read.
-fn write_stdin_to_disk(disk_buffer: &mut DiskBufferWriter, max_args: usize) -> Result<usize, ParseErr> {
+fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unprocessed_path: &Path)
+ -> Result<usize, ParseErr>
+{
let mut number_of_arguments = 0;
let stdin = io::stdin();
if max_args < 2 {
for line in stdin.lock().lines() {
if let Ok(line) = line {
- disk_buffer.write(line.as_bytes()).and_then(|_| disk_buffer.write_byte(b'\n'))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ disk_buffer.write(line.as_bytes()).and_then(|_| disk_buffer.write(b"\n"))
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
number_of_arguments += 1;
}
}
@@ -271,24 +279,24 @@ fn write_stdin_to_disk(disk_buffer: &mut DiskBufferWriter, max_args: usize) -> R
max_args_index -= 1;
number_of_arguments += 1;
disk_buffer.write(line.as_bytes())
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
} else if max_args_index == 1 {
max_args_index = max_args;
- disk_buffer.write_byte(b' ')
+ disk_buffer.write(b" ")
.and_then(|_| disk_buffer.write(line.as_bytes()))
- .and_then(|_| disk_buffer.write_byte(b'\n'))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ .and_then(|_| disk_buffer.write(b"\n"))
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
} else {
max_args_index -= 1;
- disk_buffer.write_byte(b' ')
+ disk_buffer.write(b" ")
.and_then(|_| disk_buffer.write(line.as_bytes()))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
}
}
}
if max_args_index != max_args {
- disk_buffer.write_byte(b'\n')
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ disk_buffer.write(b"\n")
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
}
}
@@ -297,7 +305,7 @@ fn write_stdin_to_disk(disk_buffer: &mut DiskBufferWriter, max_args: usize) -> R
/// Write all input arguments buffered in memory to the disk, recording the number of arguments that were read.
fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, max_args: usize,
- disk_buffer: &mut DiskBufferWriter) -> Result<usize, ParseErr> {
+ disk_buffer: &mut BufWriter<File>, unprocessed_path: &Path) -> Result<usize, ParseErr> {
let mut number_of_arguments = 0;
if lists.len() > 1 {
@@ -317,27 +325,27 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma
{
let mut iter = permutation_buffer.iter();
disk_buffer.write(iter.next().unwrap().as_bytes())
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
for element in iter {
- disk_buffer.write_byte(b' ').and_then(|_| disk_buffer.write(element.as_bytes()))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
}
number_of_arguments += 1;
}
if max_args < 2 {
- disk_buffer.write_byte(b'\n').map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ disk_buffer.write(b"\n").map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
while let Ok(true) = permutator.next_with_buffer(&mut permutation_buffer) {
let mut iter = permutation_buffer.iter();
disk_buffer.write(iter.next().unwrap().as_bytes())
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
for element in iter {
- disk_buffer.write_byte(b' ').and_then(|_| disk_buffer.write(element.as_bytes()))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
}
- disk_buffer.write_byte(b'\n')
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ disk_buffer.write(b"\n")
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
number_of_arguments += 1;
}
} else {
@@ -349,34 +357,34 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma
number_of_arguments += 1;
disk_buffer.write(iter.next().unwrap().as_bytes())
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
for element in iter {
- disk_buffer.write_byte(b' ').and_then(|_| disk_buffer.write(element.as_bytes()))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
}
} else if max_args_index == 1 {
max_args_index = max_args;
- disk_buffer.write_byte(b' ')
+ disk_buffer.write(b" ")
.and_then(|_| disk_buffer.write(iter.next().unwrap().as_bytes()))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
for element in iter {
- disk_buffer.write_byte(b' ').and_then(|_| disk_buffer.write(element.as_bytes()))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
}
- disk_buffer.write_byte(b'\n')
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ disk_buffer.write(b"\n")
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
} else {
max_args_index -= 1;
- disk_buffer.write_byte(b' ')
+ disk_buffer.write(b" ")
.and_then(|_| disk_buffer.write(iter.next().unwrap().as_bytes()))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
for element in iter {
- disk_buffer.write_byte(b' ').and_then(|_| disk_buffer.write(element.as_bytes()))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
}
}
}
@@ -384,8 +392,8 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma
} else if max_args < 2 {
for input in current_inputs {
disk_buffer.write(input.as_bytes())
- .and_then(|_| disk_buffer.write_byte(b'\n'))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ .and_then(|_| disk_buffer.write(b"\n"))
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
number_of_arguments += 1;
}
} else {
@@ -396,13 +404,13 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma
while index != max_index {
disk_buffer.write(chunk[index].as_bytes())
- .and_then(|_| disk_buffer.write_byte(b' '))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ .and_then(|_| disk_buffer.write(b" "))
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
index += 1;
}
disk_buffer.write(chunk[max_index].as_bytes())
- .and_then(|_| disk_buffer.write_byte(b'\n'))
- .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?;
+ .and_then(|_| disk_buffer.write(b"\n"))
+ .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?;
}
}
Ok(number_of_arguments)
diff --git a/src/disk_buffer/mod.rs b/src/disk_buffer/mod.rs
index 7976230..901d594 100644
--- a/src/disk_buffer/mod.rs
+++ b/src/disk_buffer/mod.rs
@@ -1,6 +1,6 @@
use std::path::{Path, PathBuf};
use std::fs;
-use std::io::{Error, Read, Write};
+use std::io::{Error, Read};
/// Controls the size of the buffers for reading/writing to files.
pub const BUFFER_SIZE: usize = 8 * 1024; // 8K seems to be the best buffer size.
@@ -25,16 +25,8 @@ pub struct DiskBuffer {
impl DiskBuffer {
// Create a new `DiskBuffer` from a `Path`.
- pub fn new(path: &Path) -> DiskBuffer { DiskBuffer { path: path.to_owned() } }
-
- /// Transform the `DiskBuffer` into a `DiskBufferWriter`.
- pub fn write(self) -> Result<DiskBufferWriter, Error> {
- fs::OpenOptions::new().create(true).write(true).open(&self.path)?;
- Ok(DiskBufferWriter {
- data: [b'\0'; BUFFER_SIZE],
- capacity: 0,
- path: self.path,
- })
+ pub fn new<P: AsRef<Path>>(path: P) -> DiskBuffer {
+ DiskBuffer { path: path.as_ref().to_owned() }
}
/// Transform the `DiskBuffer` into a `DiskBufferReader`
@@ -81,65 +73,6 @@ impl DiskBufferReader {
}
}
-/// A `DiskBufferWriter` only contains methods for buffering writes to a file.
-pub struct DiskBufferWriter {
- pub data: [u8; BUFFER_SIZE],
- pub capacity: usize,
- pub path: PathBuf,
-}
-
-impl DiskBufferTrait for DiskBufferWriter {
- fn clear(&mut self) { self.capacity = 0; }
- fn get_ref(&self) -> &[u8] { &self.data[0..self.capacity] }
- fn is_empty(&self) -> bool { self.capacity == 0 }
-}
-
-impl DiskBufferWriter {
- /// Write a byte slice to the buffer and mark the new size.
- pub fn write(&mut self, data: &[u8]) -> Result<(), Error> {
- let cap = data.len();
-
- // If the input `data`'s capacity would overrun the internal buffer, append the internal buffer's data
- // to the file and clear it before adding the input's data into the buffer.
- if cap + self.capacity > BUFFER_SIZE {
- fs::OpenOptions::new().write(true).append(true).open(&self.path)
- .and_then(|mut file| file.write(self.get_ref()))?;
- self.clear();
- }
-
- // Copy the input's data into the internal buffer and mark the new size of the internal buffer.
- self.data[self.capacity..self.capacity + cap].clone_from_slice(data);
- self.capacity += cap;
- Ok(())
- }
-
- /// Append an individual byte to the buffer, typically a space or newline.
- pub fn write_byte(&mut self, data: u8) -> Result<(), Error> {
- // If the input `data`'s capacity would overrun the internal buffer, append the internal buffer's data
- // to the file and clear it before adding the input's data into the buffer.
- if self.capacity + 1 > BUFFER_SIZE {
- fs::OpenOptions::new().write(true).append(true).open(&self.path)
- .and_then(|mut file| file.write(self.get_ref()))?;
- self.clear();
- }
-
- // Copy the input's data into the internal buffer and mark the new size of the internal buffer.
- self.data[self.capacity] = data;
- self.capacity += 1;
- Ok(())
- }
-
- /// If data has not yet been written to the disk, flush it's contents.
- pub fn flush(&mut self) -> Result<usize, Error> {
- if !self.is_empty() {
- return fs::OpenOptions::new().write(true).append(true).open(&self.path)
- .and_then(|mut file| file.write(self.get_ref()));
- }
- self.capacity = 0;
- Ok(0)
- }
-}
-
#[test]
fn test_disk_buffer_reader_simple() {
let file = include_bytes!("../../tests/buffer.dat");
diff --git a/src/execute/dry.rs b/src/execute/dry.rs
index ddd72cd..d7baddb 100644
--- a/src/execute/dry.rs
+++ b/src/execute/dry.rs
@@ -1,8 +1,8 @@
use input_iterator::InputIterator;
-use itoa_array::itoa;
use tokenizer::Token;
use arguments::{self, InputIteratorErr};
use execute::command;
+use misc::NumToA;
use std::io::{self, StdoutLock, Write};
@@ -18,7 +18,7 @@ pub fn dry_run(flags: u16, inputs: InputIterator, arguments: &[Token]) {
let pipe = flags & arguments::PIPE_IS_ENABLED != 0;
let mut id_buffer = [0u8; 64];
let mut total_buffer = [0u8; 64];
- let truncate = itoa(&mut total_buffer, inputs.total_arguments, 10);
+ let truncate = inputs.total_arguments.numtoa(10, &mut total_buffer);
let job_total = &total_buffer[0..truncate];
// If `SHELL_QUOTE` is enabled then the quoted command will be printed, otherwise the command will be
@@ -40,7 +40,7 @@ pub fn dry_run(flags: u16, inputs: InputIterator, arguments: &[Token]) {
for (job_id, input) in inputs.enumerate() {
match input {
Ok(input) => {
- let truncate = itoa(&mut id_buffer, job_id, 10);
+ let truncate = job_id.numtoa(10, &mut id_buffer);
let command = command::ParallelCommand {
slot_no: slot,
job_no: &id_buffer[0..truncate],
diff --git a/src/execute/exec_commands.rs b/src/execute/exec_commands.rs
index e395433..59cec66 100644
--- a/src/execute/exec_commands.rs
+++ b/src/execute/exec_commands.rs
@@ -1,12 +1,15 @@
-use arguments::{QUIET_MODE, VERBOSE_MODE};
+use arguments::{QUIET_MODE, VERBOSE_MODE, JOBLOG};
use execute::command::{self, CommandErr};
use input_iterator::InputsLock;
-use itoa_array::itoa;
+use misc::NumToA;
+use time::{self, Timespec};
use tokenizer::Token;
use wait_timeout::ChildExt;
use verbose;
use super::pipe::disk::output as pipe_output;
use super::pipe::disk::State;
+use super::job_log::JobLog;
+use super::signals;
use std::io::{self, Write};
use std::sync::mpsc::Sender;
@@ -37,15 +40,15 @@ impl<'a> ExecCommands<'a> {
let mut id_buffer = [0u8; 64];
let mut total_buffer = [0u8; 64];
- let truncate = itoa(&mut total_buffer, self.num_inputs, 10);
+ let truncate = self.num_inputs.numtoa(10, &mut total_buffer);
let job_total = &total_buffer[0..truncate];
- while let Some((job_id, _)) = self.inputs.try_next(&mut input) {
+ while let Some(job_id) = self.inputs.try_next(&mut input) {
if self.flags & VERBOSE_MODE != 0 {
verbose::processing_task(&stdout, job_id+1, self.num_inputs, &input);
}
- let truncate = itoa(&mut id_buffer, job_id+1, 10);
+ let truncate = (job_id+1).numtoa(10, &mut id_buffer);
let command = command::ParallelCommand {
slot_no: slot,
job_no: &id_buffer[0..truncate],
@@ -56,14 +59,22 @@ impl<'a> ExecCommands<'a> {
};
command_buffer.clear();
- match command.exec(command_buffer) {
+ let (start_time, end_time, exit_value, signal) = match command.exec(command_buffer) {
Ok(mut child) => {
+ let start_time = time::get_time();
if has_timeout && child.wait_timeout(self.timeout).unwrap().is_none() {
let _ = child.kill();
pipe_output(&mut child, job_id, input.clone(), &self.output_tx, self.flags & QUIET_MODE != 0);
+ (start_time, time::get_time(), -1, 15)
} else {
pipe_output(&mut child, job_id, input.clone(), &self.output_tx, self.flags & QUIET_MODE != 0);
- let _ = child.wait();
+ match child.wait() {
+ Ok(status) => match status.code() {
+ Some(exit) => (start_time, time::get_time(), exit, 0),
+ None => (start_time, time::get_time(), -1, signals::get(status))
+ },
+ Err(_) => (start_time, time::get_time(), -1, 0),
+ }
}
},
Err(cmd_err) => {
@@ -76,7 +87,20 @@ impl<'a> ExecCommands<'a> {
let _ = stderr.write(message.as_bytes());
let message = format!("{}: {}: {}", job_id+1, command.input, message);
let _ = self.output_tx.send(State::Error(job_id, message));
+ (Timespec::new(0, 0), Timespec::new(0, 0), -1, 0)
}
+ };
+
+ if self.flags & JOBLOG != 0 {
+ let runtime = end_time - start_time;
+ let _ = self.output_tx.send(State::JobLog(JobLog {
+ job_id: job_id,
+ start_time: start_time,
+ runtime: runtime.num_nanoseconds().unwrap_or(0) as u64,
+ exit_value: exit_value,
+ signal: signal,
+ command: command_buffer.clone(),
+ }));
}
if self.flags & VERBOSE_MODE != 0 {
diff --git a/src/execute/exec_inputs.rs b/src/execute/exec_inputs.rs
index 204967b..31ee5a6 100644
--- a/src/execute/exec_inputs.rs
+++ b/src/execute/exec_inputs.rs
@@ -1,11 +1,14 @@
-use arguments;
+use arguments::{self, JOBLOG, QUIET_MODE};
use execute::command;
use input_iterator::InputsLock;
use shell;
+use time::{self, Timespec};
use verbose;
use wait_timeout::ChildExt;
+use super::job_log::JobLog;
use super::pipe::disk::output as pipe_output;
use super::pipe::disk::State;
+use super::signals;
use std::u16;
use std::time::Duration;
@@ -29,7 +32,7 @@ impl ExecInputs {
let has_timeout = self.timeout != Duration::from_millis(0);
let mut input = String::with_capacity(64);
- while let Some((job_id, _)) = self.inputs.try_next(&mut input) {
+ while let Some(job_id) = self.inputs.try_next(&mut input) {
if flags & arguments::VERBOSE_MODE != 0 {
verbose::processing_task(&stdout, job_id+1, self.num_inputs, &input);
}
@@ -41,14 +44,22 @@ impl ExecInputs {
flags &= u16::MAX ^ arguments::SHELL_ENABLED;
}
- match command::get_command_output(&input, flags) {
+ let (start_time, end_time, exit_value, signal) = match command::get_command_output(&input, flags) {
Ok(mut child) => {
+ let start_time = time::get_time();
if has_timeout && child.wait_timeout(self.timeout).unwrap().is_none() {
let _ = child.kill();
- pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & arguments::QUIET_MODE != 0);
+ pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & QUIET_MODE != 0);
+ (start_time, time::get_time(), -1, 15)
} else {
- pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & arguments::QUIET_MODE != 0);
- let _ = child.wait();
+ pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & QUIET_MODE != 0);
+ match child.wait() {
+ Ok(status) => match status.code() {
+ Some(exit) => (start_time, time::get_time(), exit, 0),
+ None => (start_time, time::get_time(), -1, signals::get(status))
+ },
+ Err(_) => (start_time, time::get_time(), -1, 0),
+ }
}
},
Err(why) => {
@@ -56,7 +67,20 @@ impl ExecInputs {
let _ = write!(&mut stderr, "parallel: command error: {}: {}\n", input, why);
let message = format!("{}: {}: {}\n", job_id, input, why);
let _ = self.output_tx.send(State::Error(job_id, message));
+ (Timespec::new(0, 0), Timespec::new(0, 0), -1, 0)
}
+ };
+
+ if flags & JOBLOG != 0 {
+ let runtime = end_time - start_time;
+ let _ = self.output_tx.send(State::JobLog(JobLog {
+ job_id: job_id,
+ start_time: start_time,
+ runtime: runtime.num_nanoseconds().unwrap_or(0) as u64,
+ exit_value: exit_value,
+ signal: signal,
+ command: input.clone(),
+ }));
}
if flags & arguments::VERBOSE_MODE != 0 {
diff --git a/src/execute/job_log.rs b/src/execute/job_log.rs
new file mode 100644
index 0000000..a07617a
--- /dev/null
+++ b/src/execute/job_log.rs
@@ -0,0 +1,99 @@
+use misc::NumToA;
+use std::fs::File;
+use std::io::{Write, BufWriter};
+use time::Timespec;
+
+pub struct JobLog {
+ pub job_id: usize,
+ pub start_time: Timespec,
+ pub runtime: u64,
+ pub exit_value: i32,
+ pub signal: i32,
+ pub command: String
+}
+
+impl JobLog {
+ pub fn write_entry(&self, joblog: &mut File, id_buffer: &mut [u8], pad: usize) {
+ // 1: JobID
+ let mut joblog = BufWriter::new(joblog);
+ let bytes_written = (self.job_id + 1).numtoa(10, id_buffer);
+ let _ = joblog.write(&id_buffer[0..bytes_written]);
+ for _ in 0..pad-bytes_written {
+ let _ = joblog.write(b" ");
+ }
+
+ // 2: StartTime in seconds
+ let bytes_written = (self.start_time.sec as u64).numtoa(10, id_buffer);
+ let _ = joblog.write(&id_buffer[0..bytes_written]);
+ let _ = joblog.write(b".");
+ let decimal = (self.start_time.nsec as u64 % 1_000_000_000) / 1_000_000;
+ if decimal == 0 {
+ let _ = joblog.write(b"000");
+ } else {
+ let bytes_written = decimal.numtoa(10, id_buffer);
+ match bytes_written {
+ 1 => { let _ = joblog.write(b"00"); },
+ 2 => { let _ = joblog.write(b"0"); },
+ _ => (),
+ };
+ let _ = joblog.write(&id_buffer[0..bytes_written]);
+ }
+ for _ in 0..16-(bytes_written+4) {
+ let _ = joblog.write(b" ");
+ }
+
+ // 3: Runtime in seconds
+ let bytes_written = (self.runtime / 1_000_000_000).numtoa(10, id_buffer);
+ for _ in 0..10-(bytes_written + 4) {
+ let _ = joblog.write(b" ");
+ }
+ let _ = joblog.write(&id_buffer[0..bytes_written]);
+ let _ = joblog.write(b".");
+ let decimal = (self.runtime % 1_000_000_000) / 1_000_000;
+ if decimal == 0 {
+ let _ = joblog.write(b"000");
+ } else {
+ let bytes_written = decimal.numtoa(10, id_buffer);
+ match bytes_written {
+ 1 => { let _ = joblog.write(b"00"); },
+ 2 => { let _ = joblog.write(b"0"); },
+ _ => (),
+ };