diff options
author | Michael Aaron Murphy <mmstickman@gmail.com> | 2017-01-10 20:46:40 -0500 |
---|---|---|
committer | Michael Aaron Murphy <mmstickman@gmail.com> | 2017-01-11 17:03:03 -0500 |
commit | b2a2997d08df4407bbd0ef778c4be805c1cf9845 (patch) | |
tree | aaaefa94ae8cccf3d4e1bcb9d89ffe200af38082 | |
parent | ee95fbd68e85c2ee81352a0fd6f3fc0723f4cd18 (diff) |
Implement JobLog Parameter & Refactoring Work
- The **--joblog** parameter is now successfully implemented
- Replaced all instances of DiskBufferWriter with BufWriter
- Moved the `itoa` related functions into a new `misc` module
- Converted the `itoa` functions into a `NumToA` trait
- Added a `Digits` function to count the number of digits in a number
-rw-r--r-- | Cargo.lock | 10 | ||||
-rw-r--r-- | src/arguments/errors.rs | 4 | ||||
-rw-r--r-- | src/arguments/mod.rs | 118 | ||||
-rw-r--r-- | src/disk_buffer/mod.rs | 73 | ||||
-rw-r--r-- | src/execute/dry.rs | 6 | ||||
-rw-r--r-- | src/execute/exec_commands.rs | 38 | ||||
-rw-r--r-- | src/execute/exec_inputs.rs | 36 | ||||
-rw-r--r-- | src/execute/job_log.rs | 99 | ||||
-rw-r--r-- | src/execute/mod.rs | 2 | ||||
-rw-r--r-- | src/execute/pipe.rs | 3 | ||||
-rw-r--r-- | src/execute/receive.rs | 152 | ||||
-rw-r--r-- | src/execute/signals.rs | 12 | ||||
-rw-r--r-- | src/filepaths.rs | 4 | ||||
-rw-r--r-- | src/init.rs | 8 | ||||
-rw-r--r-- | src/input_iterator/iterator.rs | 7 | ||||
-rw-r--r-- | src/input_iterator/lock.rs | 8 | ||||
-rw-r--r-- | src/itoa_array.rs | 33 | ||||
-rw-r--r-- | src/main.rs | 22 | ||||
-rw-r--r-- | src/misc/digits.rs | 17 | ||||
-rw-r--r-- | src/misc/mod.rs | 10 | ||||
-rw-r--r-- | src/misc/numtoa.rs | 94 | ||||
-rw-r--r-- | src/shell.rs | 1 |
22 files changed, 501 insertions, 256 deletions
@@ -42,7 +42,7 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.18" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -58,7 +58,7 @@ name = "num_cpus" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -90,7 +90,7 @@ version = "0.1.35" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -99,7 +99,7 @@ name = "wait-timeout" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -117,7 +117,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum gcc 0.3.41 (registry+https://github.com/rust-lang/crates.io-index)" = "3689e1982a563af74960ae3a4758aa632bb8fd984cfc3cc3b60ee6109477ab6e" "checksum itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ae3088ea4baeceb0284ee9eea42f591226e6beaecf65373e41b38d95a1b8e7a1" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" -"checksum libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "a51822fc847e7a8101514d1d44e354ba2ffa7d4c194dcab48870740e327cac70" +"checksum libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)" = "9e030dc72013ed68994d1b2cbf36a94dd0e58418ba949c4b0db7eeb70a7a6352" "checksum nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbbadd3f4c98dea0bd3d9b4be4c0cdaf1ab57035cb2e41fce3983db5add7cc5" "checksum num_cpus 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a225d1e2717567599c24f88e49f00856c6e825a12125181ee42c4257e3688d39" "checksum odds 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)" = "c3df9b730298cea3a1c3faa90b7e2f9df3a9c400d0936d6015e6165734eefcba" diff --git a/src/arguments/errors.rs b/src/arguments/errors.rs index ed6e367..f3480d7 100644 --- a/src/arguments/errors.rs +++ b/src/arguments/errors.rs @@ -28,6 +28,7 @@ pub enum ParseErr { DelayNoValue, /// An error occurred with accessing the unprocessed file. File(FileErr), + JoblogNoValue, /// The value of jobs was not set to a number. JobsNaN(String), /// No value was provided for the jobs flag. @@ -92,6 +93,9 @@ impl ParseErr { ParseErr::DelayNoValue => { let _ = stderr.write(b"no delay parameter was defined.\n"); }, + ParseErr::JoblogNoValue => { + let _ = stderr.write(b"no joblog parameter was defined.\n"); + }, ParseErr::JobsNaN(value) => { let _ = write!(stderr, "jobs parameter, '{}', is not a number.\n", value); }, diff --git a/src/arguments/mod.rs b/src/arguments/mod.rs index d93e39c..ed66a83 100644 --- a/src/arguments/mod.rs +++ b/src/arguments/mod.rs @@ -5,16 +5,14 @@ mod man; mod redirection; use std::env; -use std::fs; -use std::io::{self, BufRead, BufReader}; +use std::fs::{self, File}; +use std::io::{self, BufRead, BufReader, BufWriter, Write}; use std::num::ParseIntError; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::process::exit; use std::time::Duration; use arrayvec::ArrayVec; -use disk_buffer::{self, DiskBufferTrait, DiskBufferWriter}; -use input_iterator::InputIterator; use permutate::Permutator; use tokenizer::Token; use num_cpus; @@ -35,6 +33,7 @@ pub const DASH_EXISTS: u16 = 32; pub const DRY_RUN: u16 = 64; pub const SHELL_QUOTE: u16 = 128; pub const ETA: u16 = 256; +pub const JOBLOG: u16 = 512; /// `Args` is a collection of critical options and arguments that were collected at /// startup of the application. @@ -46,6 +45,7 @@ pub struct Args { pub delay: Duration, pub timeout: Duration, pub arguments: ArrayVec<[Token; 128]>, + pub joblog: Option<String>, } impl Args { @@ -58,16 +58,18 @@ impl Args { memory: 0, delay: Duration::from_millis(0), timeout: Duration::from_millis(0), + joblog: None, } } /// Performs all the work related to parsing program arguments pub fn parse(&mut self, comm: &mut String, arguments: &[String], unprocessed_path: &Path) - -> Result<InputIterator, ParseErr> + -> Result<usize, ParseErr> { // Create a write buffer that automatically writes data to the disk when the buffer is full. - let mut disk_buffer = disk_buffer::DiskBuffer::new(unprocessed_path).write() + let disk_buffer = fs::OpenOptions::new().write(true).append(true).open(unprocessed_path) .map_err(|why| ParseErr::File(FileErr::Open(unprocessed_path.to_owned(), why)))?; + let mut disk_buffer = BufWriter::new(disk_buffer); // Each list will consist of a series of input arguments let mut lists: Vec<Vec<String>> = Vec::new(); @@ -137,6 +139,12 @@ impl Args { println!("{}", man::MAN_PAGE); exit(0); }, + "joblog" => { + let file = arguments.get(index).ok_or(ParseErr::JoblogNoValue)?; + self.joblog = Some(file.to_owned()); + index += 1; + self.flags |= JOBLOG; + }, "jobs" => { let val = arguments.get(index).ok_or(ParseErr::JobsNoValue)?; self.ncores = jobs::parse(val)?; @@ -228,38 +236,38 @@ impl Args { parse_inputs(arguments, index, &mut current_inputs, &mut lists, &mut mode)?; } - number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer)?; + number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?; } else if let Some(path) = redirection::input_was_redirected() { self.flags |= INPUTS_ARE_COMMANDS; file_parse(&mut current_inputs, path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?)?; - number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer)?; + number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, &mut disk_buffer, unprocessed_path)?; } - if disk_buffer.is_empty() { - number_of_arguments = write_stdin_to_disk(&mut disk_buffer, max_args)?; + if number_of_arguments == 0 { + number_of_arguments = write_stdin_to_disk(&mut disk_buffer, max_args, unprocessed_path)?; } if number_of_arguments == 0 { return Err(ParseErr::NoArguments); } // Flush the contents of the buffer to the disk before tokenizing the command argument. - disk_buffer.flush().map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + let _ = disk_buffer.flush(); - // Return an `InputIterator` of the arguments contained within the unprocessed file. - let inputs = InputIterator::new(unprocessed_path, number_of_arguments).map_err(ParseErr::File)?; - Ok(inputs) + Ok(number_of_arguments) } } /// Write all arguments from standard input to the disk, recording the number of arguments that were read. -fn write_stdin_to_disk(disk_buffer: &mut DiskBufferWriter, max_args: usize) -> Result<usize, ParseErr> { +fn write_stdin_to_disk(disk_buffer: &mut BufWriter<File>, max_args: usize, unprocessed_path: &Path) + -> Result<usize, ParseErr> +{ let mut number_of_arguments = 0; let stdin = io::stdin(); if max_args < 2 { for line in stdin.lock().lines() { if let Ok(line) = line { - disk_buffer.write(line.as_bytes()).and_then(|_| disk_buffer.write_byte(b'\n')) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + disk_buffer.write(line.as_bytes()).and_then(|_| disk_buffer.write(b"\n")) + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; number_of_arguments += 1; } } @@ -271,24 +279,24 @@ fn write_stdin_to_disk(disk_buffer: &mut DiskBufferWriter, max_args: usize) -> R max_args_index -= 1; number_of_arguments += 1; disk_buffer.write(line.as_bytes()) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; } else if max_args_index == 1 { max_args_index = max_args; - disk_buffer.write_byte(b' ') + disk_buffer.write(b" ") .and_then(|_| disk_buffer.write(line.as_bytes())) - .and_then(|_| disk_buffer.write_byte(b'\n')) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + .and_then(|_| disk_buffer.write(b"\n")) + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; } else { max_args_index -= 1; - disk_buffer.write_byte(b' ') + disk_buffer.write(b" ") .and_then(|_| disk_buffer.write(line.as_bytes())) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; } } } if max_args_index != max_args { - disk_buffer.write_byte(b'\n') - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + disk_buffer.write(b"\n") + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; } } @@ -297,7 +305,7 @@ fn write_stdin_to_disk(disk_buffer: &mut DiskBufferWriter, max_args: usize) -> R /// Write all input arguments buffered in memory to the disk, recording the number of arguments that were read. fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, max_args: usize, - disk_buffer: &mut DiskBufferWriter) -> Result<usize, ParseErr> { + disk_buffer: &mut BufWriter<File>, unprocessed_path: &Path) -> Result<usize, ParseErr> { let mut number_of_arguments = 0; if lists.len() > 1 { @@ -317,27 +325,27 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma { let mut iter = permutation_buffer.iter(); disk_buffer.write(iter.next().unwrap().as_bytes()) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; for element in iter { - disk_buffer.write_byte(b' ').and_then(|_| disk_buffer.write(element.as_bytes())) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes())) + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; } number_of_arguments += 1; } if max_args < 2 { - disk_buffer.write_byte(b'\n').map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + disk_buffer.write(b"\n").map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; while let Ok(true) = permutator.next_with_buffer(&mut permutation_buffer) { let mut iter = permutation_buffer.iter(); disk_buffer.write(iter.next().unwrap().as_bytes()) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; for element in iter { - disk_buffer.write_byte(b' ').and_then(|_| disk_buffer.write(element.as_bytes())) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes())) + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; } - disk_buffer.write_byte(b'\n') - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + disk_buffer.write(b"\n") + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; number_of_arguments += 1; } } else { @@ -349,34 +357,34 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma number_of_arguments += 1; disk_buffer.write(iter.next().unwrap().as_bytes()) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; for element in iter { - disk_buffer.write_byte(b' ').and_then(|_| disk_buffer.write(element.as_bytes())) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes())) + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; } } else if max_args_index == 1 { max_args_index = max_args; - disk_buffer.write_byte(b' ') + disk_buffer.write(b" ") .and_then(|_| disk_buffer.write(iter.next().unwrap().as_bytes())) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; for element in iter { - disk_buffer.write_byte(b' ').and_then(|_| disk_buffer.write(element.as_bytes())) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes())) + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; } - disk_buffer.write_byte(b'\n') - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + disk_buffer.write(b"\n") + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; } else { max_args_index -= 1; - disk_buffer.write_byte(b' ') + disk_buffer.write(b" ") .and_then(|_| disk_buffer.write(iter.next().unwrap().as_bytes())) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; for element in iter { - disk_buffer.write_byte(b' ').and_then(|_| disk_buffer.write(element.as_bytes())) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes())) + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; } } } @@ -384,8 +392,8 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma } else if max_args < 2 { for input in current_inputs { disk_buffer.write(input.as_bytes()) - .and_then(|_| disk_buffer.write_byte(b'\n')) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + .and_then(|_| disk_buffer.write(b"\n")) + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; number_of_arguments += 1; } } else { @@ -396,13 +404,13 @@ fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, ma while index != max_index { disk_buffer.write(chunk[index].as_bytes()) - .and_then(|_| disk_buffer.write_byte(b' ')) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + .and_then(|_| disk_buffer.write(b" ")) + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; index += 1; } disk_buffer.write(chunk[max_index].as_bytes()) - .and_then(|_| disk_buffer.write_byte(b'\n')) - .map_err(|why| FileErr::Write(disk_buffer.path.clone(), why))?; + .and_then(|_| disk_buffer.write(b"\n")) + .map_err(|why| FileErr::Write(PathBuf::from(unprocessed_path), why))?; } } Ok(number_of_arguments) diff --git a/src/disk_buffer/mod.rs b/src/disk_buffer/mod.rs index 7976230..901d594 100644 --- a/src/disk_buffer/mod.rs +++ b/src/disk_buffer/mod.rs @@ -1,6 +1,6 @@ use std::path::{Path, PathBuf}; use std::fs; -use std::io::{Error, Read, Write}; +use std::io::{Error, Read}; /// Controls the size of the buffers for reading/writing to files. pub const BUFFER_SIZE: usize = 8 * 1024; // 8K seems to be the best buffer size. @@ -25,16 +25,8 @@ pub struct DiskBuffer { impl DiskBuffer { // Create a new `DiskBuffer` from a `Path`. - pub fn new(path: &Path) -> DiskBuffer { DiskBuffer { path: path.to_owned() } } - - /// Transform the `DiskBuffer` into a `DiskBufferWriter`. - pub fn write(self) -> Result<DiskBufferWriter, Error> { - fs::OpenOptions::new().create(true).write(true).open(&self.path)?; - Ok(DiskBufferWriter { - data: [b'\0'; BUFFER_SIZE], - capacity: 0, - path: self.path, - }) + pub fn new<P: AsRef<Path>>(path: P) -> DiskBuffer { + DiskBuffer { path: path.as_ref().to_owned() } } /// Transform the `DiskBuffer` into a `DiskBufferReader` @@ -81,65 +73,6 @@ impl DiskBufferReader { } } -/// A `DiskBufferWriter` only contains methods for buffering writes to a file. -pub struct DiskBufferWriter { - pub data: [u8; BUFFER_SIZE], - pub capacity: usize, - pub path: PathBuf, -} - -impl DiskBufferTrait for DiskBufferWriter { - fn clear(&mut self) { self.capacity = 0; } - fn get_ref(&self) -> &[u8] { &self.data[0..self.capacity] } - fn is_empty(&self) -> bool { self.capacity == 0 } -} - -impl DiskBufferWriter { - /// Write a byte slice to the buffer and mark the new size. - pub fn write(&mut self, data: &[u8]) -> Result<(), Error> { - let cap = data.len(); - - // If the input `data`'s capacity would overrun the internal buffer, append the internal buffer's data - // to the file and clear it before adding the input's data into the buffer. - if cap + self.capacity > BUFFER_SIZE { - fs::OpenOptions::new().write(true).append(true).open(&self.path) - .and_then(|mut file| file.write(self.get_ref()))?; - self.clear(); - } - - // Copy the input's data into the internal buffer and mark the new size of the internal buffer. - self.data[self.capacity..self.capacity + cap].clone_from_slice(data); - self.capacity += cap; - Ok(()) - } - - /// Append an individual byte to the buffer, typically a space or newline. - pub fn write_byte(&mut self, data: u8) -> Result<(), Error> { - // If the input `data`'s capacity would overrun the internal buffer, append the internal buffer's data - // to the file and clear it before adding the input's data into the buffer. - if self.capacity + 1 > BUFFER_SIZE { - fs::OpenOptions::new().write(true).append(true).open(&self.path) - .and_then(|mut file| file.write(self.get_ref()))?; - self.clear(); - } - - // Copy the input's data into the internal buffer and mark the new size of the internal buffer. - self.data[self.capacity] = data; - self.capacity += 1; - Ok(()) - } - - /// If data has not yet been written to the disk, flush it's contents. - pub fn flush(&mut self) -> Result<usize, Error> { - if !self.is_empty() { - return fs::OpenOptions::new().write(true).append(true).open(&self.path) - .and_then(|mut file| file.write(self.get_ref())); - } - self.capacity = 0; - Ok(0) - } -} - #[test] fn test_disk_buffer_reader_simple() { let file = include_bytes!("../../tests/buffer.dat"); diff --git a/src/execute/dry.rs b/src/execute/dry.rs index ddd72cd..d7baddb 100644 --- a/src/execute/dry.rs +++ b/src/execute/dry.rs @@ -1,8 +1,8 @@ use input_iterator::InputIterator; -use itoa_array::itoa; use tokenizer::Token; use arguments::{self, InputIteratorErr}; use execute::command; +use misc::NumToA; use std::io::{self, StdoutLock, Write}; @@ -18,7 +18,7 @@ pub fn dry_run(flags: u16, inputs: InputIterator, arguments: &[Token]) { let pipe = flags & arguments::PIPE_IS_ENABLED != 0; let mut id_buffer = [0u8; 64]; let mut total_buffer = [0u8; 64]; - let truncate = itoa(&mut total_buffer, inputs.total_arguments, 10); + let truncate = inputs.total_arguments.numtoa(10, &mut total_buffer); let job_total = &total_buffer[0..truncate]; // If `SHELL_QUOTE` is enabled then the quoted command will be printed, otherwise the command will be @@ -40,7 +40,7 @@ pub fn dry_run(flags: u16, inputs: InputIterator, arguments: &[Token]) { for (job_id, input) in inputs.enumerate() { match input { Ok(input) => { - let truncate = itoa(&mut id_buffer, job_id, 10); + let truncate = job_id.numtoa(10, &mut id_buffer); let command = command::ParallelCommand { slot_no: slot, job_no: &id_buffer[0..truncate], diff --git a/src/execute/exec_commands.rs b/src/execute/exec_commands.rs index e395433..59cec66 100644 --- a/src/execute/exec_commands.rs +++ b/src/execute/exec_commands.rs @@ -1,12 +1,15 @@ -use arguments::{QUIET_MODE, VERBOSE_MODE}; +use arguments::{QUIET_MODE, VERBOSE_MODE, JOBLOG}; use execute::command::{self, CommandErr}; use input_iterator::InputsLock; -use itoa_array::itoa; +use misc::NumToA; +use time::{self, Timespec}; use tokenizer::Token; use wait_timeout::ChildExt; use verbose; use super::pipe::disk::output as pipe_output; use super::pipe::disk::State; +use super::job_log::JobLog; +use super::signals; use std::io::{self, Write}; use std::sync::mpsc::Sender; @@ -37,15 +40,15 @@ impl<'a> ExecCommands<'a> { let mut id_buffer = [0u8; 64]; let mut total_buffer = [0u8; 64]; - let truncate = itoa(&mut total_buffer, self.num_inputs, 10); + let truncate = self.num_inputs.numtoa(10, &mut total_buffer); let job_total = &total_buffer[0..truncate]; - while let Some((job_id, _)) = self.inputs.try_next(&mut input) { + while let Some(job_id) = self.inputs.try_next(&mut input) { if self.flags & VERBOSE_MODE != 0 { verbose::processing_task(&stdout, job_id+1, self.num_inputs, &input); } - let truncate = itoa(&mut id_buffer, job_id+1, 10); + let truncate = (job_id+1).numtoa(10, &mut id_buffer); let command = command::ParallelCommand { slot_no: slot, job_no: &id_buffer[0..truncate], @@ -56,14 +59,22 @@ impl<'a> ExecCommands<'a> { }; command_buffer.clear(); - match command.exec(command_buffer) { + let (start_time, end_time, exit_value, signal) = match command.exec(command_buffer) { Ok(mut child) => { + let start_time = time::get_time(); if has_timeout && child.wait_timeout(self.timeout).unwrap().is_none() { let _ = child.kill(); pipe_output(&mut child, job_id, input.clone(), &self.output_tx, self.flags & QUIET_MODE != 0); + (start_time, time::get_time(), -1, 15) } else { pipe_output(&mut child, job_id, input.clone(), &self.output_tx, self.flags & QUIET_MODE != 0); - let _ = child.wait(); + match child.wait() { + Ok(status) => match status.code() { + Some(exit) => (start_time, time::get_time(), exit, 0), + None => (start_time, time::get_time(), -1, signals::get(status)) + }, + Err(_) => (start_time, time::get_time(), -1, 0), + } } }, Err(cmd_err) => { @@ -76,7 +87,20 @@ impl<'a> ExecCommands<'a> { let _ = stderr.write(message.as_bytes()); let message = format!("{}: {}: {}", job_id+1, command.input, message); let _ = self.output_tx.send(State::Error(job_id, message)); + (Timespec::new(0, 0), Timespec::new(0, 0), -1, 0) } + }; + + if self.flags & JOBLOG != 0 { + let runtime = end_time - start_time; + let _ = self.output_tx.send(State::JobLog(JobLog { + job_id: job_id, + start_time: start_time, + runtime: runtime.num_nanoseconds().unwrap_or(0) as u64, + exit_value: exit_value, + signal: signal, + command: command_buffer.clone(), + })); } if self.flags & VERBOSE_MODE != 0 { diff --git a/src/execute/exec_inputs.rs b/src/execute/exec_inputs.rs index 204967b..31ee5a6 100644 --- a/src/execute/exec_inputs.rs +++ b/src/execute/exec_inputs.rs @@ -1,11 +1,14 @@ -use arguments; +use arguments::{self, JOBLOG, QUIET_MODE}; use execute::command; use input_iterator::InputsLock; use shell; +use time::{self, Timespec}; use verbose; use wait_timeout::ChildExt; +use super::job_log::JobLog; use super::pipe::disk::output as pipe_output; use super::pipe::disk::State; +use super::signals; use std::u16; use std::time::Duration; @@ -29,7 +32,7 @@ impl ExecInputs { let has_timeout = self.timeout != Duration::from_millis(0); let mut input = String::with_capacity(64); - while let Some((job_id, _)) = self.inputs.try_next(&mut input) { + while let Some(job_id) = self.inputs.try_next(&mut input) { if flags & arguments::VERBOSE_MODE != 0 { verbose::processing_task(&stdout, job_id+1, self.num_inputs, &input); } @@ -41,14 +44,22 @@ impl ExecInputs { flags &= u16::MAX ^ arguments::SHELL_ENABLED; } - match command::get_command_output(&input, flags) { + let (start_time, end_time, exit_value, signal) = match command::get_command_output(&input, flags) { Ok(mut child) => { + let start_time = time::get_time(); if has_timeout && child.wait_timeout(self.timeout).unwrap().is_none() { let _ = child.kill(); - pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & arguments::QUIET_MODE != 0); + pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & QUIET_MODE != 0); + (start_time, time::get_time(), -1, 15) } else { - pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & arguments::QUIET_MODE != 0); - let _ = child.wait(); + pipe_output(&mut child, job_id, input.clone(), &self.output_tx, flags & QUIET_MODE != 0); + match child.wait() { + Ok(status) => match status.code() { + Some(exit) => (start_time, time::get_time(), exit, 0), + None => (start_time, time::get_time(), -1, signals::get(status)) + }, + Err(_) => (start_time, time::get_time(), -1, 0), + } } }, Err(why) => { @@ -56,7 +67,20 @@ impl ExecInputs { let _ = write!(&mut stderr, "parallel: command error: {}: {}\n", input, why); let message = format!("{}: {}: {}\n", job_id, input, why); let _ = self.output_tx.send(State::Error(job_id, message)); + (Timespec::new(0, 0), Timespec::new(0, 0), -1, 0) } + }; + + if flags & JOBLOG != 0 { + let runtime = end_time - start_time; + let _ = self.output_tx.send(State::JobLog(JobLog { + job_id: job_id, + start_time: start_time, + runtime: runtime.num_nanoseconds().unwrap_or(0) as u64, + exit_value: exit_value, + signal: signal, + command: input.clone(), + })); } if flags & arguments::VERBOSE_MODE != 0 { diff --git a/src/execute/job_log.rs b/src/execute/job_log.rs new file mode 100644 index 0000000..a07617a --- /dev/null +++ b/src/execute/job_log.rs @@ -0,0 +1,99 @@ +use misc::NumToA; +use std::fs::File; +use std::io::{Write, BufWriter}; +use time::Timespec; + +pub struct JobLog { + pub job_id: usize, + pub start_time: Timespec, + pub runtime: u64, + pub exit_value: i32, + pub signal: i32, + pub command: String +} + +impl JobLog { + pub fn write_entry(&self, joblog: &mut File, id_buffer: &mut [u8], pad: usize) { + // 1: JobID + let mut joblog = BufWriter::new(joblog); + let bytes_written = (self.job_id + 1).numtoa(10, id_buffer); + let _ = joblog.write(&id_buffer[0..bytes_written]); + for _ in 0..pad-bytes_written { + let _ = joblog.write(b" "); + } + + // 2: StartTime in seconds + let bytes_written = (self.start_time.sec as u64).numtoa(10, id_buffer); + let _ = joblog.write(&id_buffer[0..bytes_written]); + let _ = joblog.write(b"."); + let decimal = (self.start_time.nsec as u64 % 1_000_000_000) / 1_000_000; + if decimal == 0 { + let _ = joblog.write(b"000"); + } else { + let bytes_written = decimal.numtoa(10, id_buffer); + match bytes_written { + 1 => { let _ = joblog.write(b"00"); }, + 2 => { let _ = joblog.write(b"0"); }, + _ => (), + }; + let _ = joblog.write(&id_buffer[0..bytes_written]); + } + for _ in 0..16-(bytes_written+4) { + let _ = joblog.write(b" "); + } + + // 3: Runtime in seconds + let bytes_written = (self.runtime / 1_000_000_000).numtoa(10, id_buffer); + for _ in 0..10-(bytes_written + 4) { + let _ = joblog.write(b" "); + } + let _ = joblog.write(&id_buffer[0..bytes_written]); + let _ = joblog.write(b"."); + let decimal = (self.runtime % 1_000_000_000) / 1_000_000; + if decimal == 0 { + let _ = joblog.write(b"000"); + } else { + let bytes_written = decimal.numtoa(10, id_buffer); + match bytes_written { + 1 => { let _ = joblog.write(b"00"); }, + 2 => { let _ = joblog.write(b"0"); }, + _ => (), + }; |