1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
|
#![deny(dead_code)]
#![allow(unknown_lints)]
#![feature(loop_break_value)]
#![feature(alloc_system)]
extern crate alloc_system;
extern crate arrayvec;
extern crate itoa;
extern crate numtoa;
extern crate num_cpus;
extern crate permutate;
extern crate smallvec;
extern crate sys_info;
extern crate time;
extern crate wait_timeout;
mod arguments;
mod disk_buffer;
mod execute;
mod filepaths;
mod input_iterator;
mod misc;
mod tokenizer;
mod shell;
mod verbose;
use std::env;
use std::fs::{self, create_dir_all, File};
use std::io::{self, BufRead, BufReader, Write};
use std::mem;
use std::process::exit;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::channel;
use arguments::Args;
use execute::pipe::disk::State;
use input_iterator::{InputIterator, InputsLock};
use tokenizer::{Token, tokenize};
/// The command string needs to be available in memory for the entirety of the application, so this
/// is achievable by transmuting the lifetime of the reference into a static lifetime. To guarantee
/// that this is perfectly safe, and that the reference will live outside the scope, the value will
/// also be leaked so that it is forced to remain in memory for the remainder of the application.
unsafe fn leak_string(comm: String) -> &'static str {
let new_comm = mem::transmute(&comm as &str);
mem::forget(comm);
new_comm
}
/// The tokens will live throughout the entirety of the application, so it's OK to mark it with
/// a static lifetime. Prevents needing to copy the token vector to each thread.
unsafe fn static_arg(args: &[Token]) -> &'static [Token] { mem::transmute(args) }
fn main() {
// Obtain a handle to standard output/error's buffers so we can write directly to them.
let stdout = io::stdout();
let stderr = io::stderr();
// Parse arguments and collect flags and statistics.
let mut args = Args::new();
let mut comm = String::with_capacity(128);
let raw_arguments = env::args().collect::<Vec<String>>();
// Attempt to obtain the default tempdir base path.
let mut base = match filepaths::base() {
Some(base) => base,
None => {
let mut stderr = stderr.lock();
let _ = stderr.write(b"parallel: unable to open home directory");
exit(1);
}
};
// Create the base directory if it does not exist
if let Err(why) = create_dir_all(&base) {
let stderr = &mut stderr.lock();
let _ = writeln!(stderr, "parallel: unable to create tempdir {:?}: {}", base, why);
exit(1);
}
// Collect the command, arguments, and tempdir base path.
args.ninputs = match args.parse(&mut comm, &raw_arguments, &mut base) {
Ok(inputs) => inputs,
Err(why) => why.handle(&raw_arguments)
};
// Attempt to convert the base path into a string slice.
let base_path = match base.to_str() {
Some(base) => String::from(base),
None => {
let stderr = &mut stderr.lock();
let _ = writeln!(stderr, "parallel: tempdir path, {:?}, is invalid", base);
exit(1);
}
};
// Construct the paths of each of the required files using the base tempdir path.
// These paths will be shared all through the application to avoid needing to copy.
let mut unprocessed_path = base.clone();
let mut processed_path = base.clone();
let mut errors_path = base;
unprocessed_path.push("unprocessed");
processed_path.push("processed");
errors_path.push("errors");
// This file is required by the upcoming `InputIterator`. It will remain open for the
// remainder of the application.
let file = match fs::OpenOptions::new().read(true).open(&unprocessed_path) {
Ok(file) => file,
Err(why) => {
let stderr = &mut stderr.lock();
let _ = writeln!(stderr, "parallel: unable to open unprocessed file: {:?}: {}", &unprocessed_path, why);
exit(1);
}
};
// Initialize the `InputIterator` structure, which efficiently generates inputs from the
// above `unprocessed` file until all arguments have been processed, denoted by `args.ninputs`.
let inputs = InputIterator::new(&unprocessed_path, file, args.ninputs)
.expect("unable to initialize the InputIterator structure");
// Coerce the `comm` `String` into a `&'static str` so that it may be shared by all threads.
// This is safe because the original `comm` may no longer be modified due to shadowing rules.
// It is also safe because `comm` lives to the end of the program.
let static_comm = unsafe { leak_string(comm) };
// Attempt to tokenize the command argument into simple primitive placeholders.
if let Err(error) = tokenize(&mut args.arguments, static_comm, &unprocessed_path, args.ninputs) {
let stderr = &mut stderr.lock();
let _ = writeln!(stderr, "{}", error);
exit(1)
}
// Coerce the arguments into a `&'static [Token]` to eliminate the need to make a copy in
// each thread.
let arguments = unsafe { static_arg(&args.arguments) };
// If the `--dry-run` parameter was passsed, the program will simply print all commands to
// execute and will subsequently quit. Otherwise, real work will be performed.
if args.flags & arguments::DRY_RUN != 0 {
execute::dry_run(args.flags, inputs, arguments);
} else {
// The `InputIterator` will be wrapped within a `Mutex` so that it can safely be shared
// across all of the upcoming threads. A `Mutex` is required because each time a thread
// pulls the next input from the queue, it needs to update various data fields which
// rely on threads waiting for their turn to report back.
let shared_input = Arc::new(Mutex::new(inputs));
// This channel is used exclusively for signaling back to the main thread when a task
// has completed or has errored.
let (output_tx, input_rx) = channel::<State>();
// Will contain handles to the upcoming threads to know when the threads are finished.
let mut threads = Vec::with_capacity(args.ncores);
if args.flags & arguments::VERBOSE_MODE != 0 {
verbose::total_inputs(&stdout, args.ncores, args.ninputs);
}
// The `slot` variable is required by the {%} token.
if args.flags & arguments::INPUTS_ARE_COMMANDS != 0 {
// Dash kills all other shells in performance, so if it exists, we need to know.
if shell::dash_exists() { args.flags |= arguments::DASH_EXISTS; }
for _ in 0..args.ncores {
let flags = args.flags;
let mut exec = execute::ExecInputs {
num_inputs: args.ninputs,
timeout: args.timeout,
output_tx: output_tx.clone(),
tempdir: base_path.clone(),
inputs: InputsLock {
inputs: shared_input.clone(),
memory: args.memory,
delay: args.delay,
has_delay: args.delay != Duration::from_millis(0),
completed: false,
flags: flags,
}
};
let handle: JoinHandle<()> = thread::spawn(move || exec.run(flags));
// Add the thread handle to the `threads` vector to know when to quit the program.
threads.push(handle);
}
} else {
shell::set_flags(&mut args.flags, arguments);
for slot in 1..args.ncores+1 {
let timeout = args.timeout;
let num_inputs = args.ninputs;
let output_tx = output_tx.clone();
let flags = args.flags;
let base_path = base_path.clone();
let inputs = InputsLock {
inputs: shared_input.clone(),
memory: args.memory,
delay: args.delay,
has_delay: args.delay != Duration::from_millis(0),
completed: false,
flags: flags,
};
// The command will be built from the arguments, and inputs will be transferred to the command.
let handle: JoinHandle<()> = thread::spawn(move || {
let mut exec = execute::ExecCommands {
slot: slot,
num_inputs: num_inputs,
flags: flags,
timeout: timeout,
inputs: inputs,
output_tx: output_tx,
arguments: arguments,
tempdir: base_path,
};
exec.run();
});
// Add the thread handle to the `threads` vector to know when to quit the program.
threads.push(handle);
}
}
/// Prints messages from executed commands in the correct order.
execute::receive_messages(input_rx, args, &base_path, &processed_path, &errors_path);
/// Wait for all threads to exit before proceeding.
for thread in threads { thread.join().unwrap(); }
// If errors have occurred, re-print these errors at the end.
if let Ok(file) = File::open(errors_path) {
if file.metadata().ok().map_or(0, |metadata| metadata.len()) > 0 {
let stderr = &mut stderr.lock();
let _ = stderr.write(b"parallel: encountered errors during processing:\n");
for line in BufReader::new(file).lines() {
if let Ok(line) = line {
let _ = stderr.write(line.as_bytes());
let _ = stderr.write(b"\n
|