summaryrefslogtreecommitdiffstats
path: root/slb/src/main.rs
blob: 954b479d8a641ceccbf5fc7b7aef9f536cf2293c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
//! `slb` main executable

use std::fs::File;
use std::io::{BufReader, Write};
use std::ops::Deref;
use std::path::PathBuf;
use std::process::{Command, Stdio};

use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;

use structopt::StructOpt;

use slb::{fileblocks, sharder};

/// Performs sharded load balancing on stdin, handing off input
/// to child processes based on a hash of the first word on each line.
///
/// E.g., suppose we have a file with contents like
///
/// ```
/// a b c d
/// e f g h
/// a b
/// ```
///
/// Every line is handed to stdin of the [`mapper`] processes,
/// which are really "flat maps": they can generate multiple output
/// lines per input line, but should be pure functions.
///
/// The above might generate input that looks like
///
/// ```
/// key1 a b c d
/// key2 e f g h
/// key1 a b
/// ```
///
/// The key is all bytes leading up to the first space, or all bytes
/// on a line if there are no spaces. Suppose `hash(key1) == 1` and
/// `hash(key2) == 2`. For a machine with 2 cores, `slb` will have
/// two processes, and the zeroth one will receive as stdin
///
/// ```
/// key2 e f g h
/// ```
///
/// since `2 % 2 == 0` and all `key1` lines will be fed into the
/// process at index 1.
///
/// These processes are expected to perform some kind of aggregation
/// and print at the end of their execution. For instance, suppose we invoke
/// `slb 'awk -f catter.awk'` where `catter.awk` is just defined to be
/// `{key = $1; $1 = ""; a[key] += $0}END{for (k in a) print k,a[k]}`,
/// which just concatenates the keyed values. Then the output might be
///
/// ```
/// <file outprefix0.txt>
/// key2  e f g h
///
/// <file outprefix1.txt>
/// key1  a b c d a b
/// ```
#[derive(Debug, StructOpt)]
#[structopt(name = "slb", about = "Performs streaming load balancing.")]
struct Opt {
    /// A flat-map pure function, which is a bash command line string
    /// that performs line-by-line operations on the input to emit
    /// output lines for reduction.
    ///
    /// By default, this is just the identity, equivalent to `cat`.
    #[structopt(long)]
    mapper: Option<String>,

    /// The folder function.
    ///
    /// Multiple instances of this same process are created with the same
    /// command-line string. Text lines from mapper output are fed
    /// into these processes, and stdout is shared between this parent
    /// process and its children, but collated.
    #[structopt(long)]
    folder: String,

    /// The input files to read lines from.
    #[structopt(long)]
    infile: Vec<PathBuf>,

    /// Output file prefixes.
    #[structopt(long)]
    outprefix: PathBuf,

    /// Buffer size in KB for reading chunks of input, parameter
    /// shared between both mapper and folder right now.
    ///
    /// Note memory usage is O(bufsize * nthreads)
    #[structopt(long)]
    bufsize: Option<usize>,

    // TODO consider sort-like KEYDEF -k --key which wouldn't hash if n (numeric) flag set
    /// Print debug information to stderr.
    #[structopt(long)]
    verbose: Option<bool>,

    /// Number of parallel threads per stage to launch: the total
    /// number of live threads could be a multiple of this because
    /// of concurrent mapper and folder stages.
    ///
    /// Defaults to num CPUs.
    #[structopt(long)]
    nthreads: Option<usize>,
}

fn main() {
    let opt = Opt::from_args();
    let _verbose = opt.verbose.unwrap_or(false);
    let nthreads = opt.nthreads.unwrap_or(num_cpus::get_physical());
    let mapper_cmd = opt.mapper.as_deref().unwrap_or("cat");
    let folder_cmd = &opt.folder;
    // User-facing bufsize is in KB; convert to bytes once here.
    let bufsize = opt.bufsize.unwrap_or(16) * 1024;
    let queuesize = 256;

    assert!(!opt.infile.is_empty());
    // TODO: could play with mapper/folder parallelism, bufsize,
    // and queuesize being tuned. Ideally could do tuning automatically.

    // TODO: very crude chunking below probably hurts in the presence of stragglers,
    // finer grained work-stealing would be ideal.

    // Allow enough chunks for parallelism but not so few the chunksize
    // is small.

    let chunks = fileblocks::chunkify_multiple(&opt.infile, nthreads, bufsize);
    // chunkify_multiple may return fewer chunks than requested (min chunk
    // size), so shrink the stage width to match.
    let nthreads = chunks.len(); // smaller b/c of min bufsize
    assert!(nthreads >= 1);

    // Stage 1: one mapper child process per chunk, fed via piped stdin,
    // read via piped stdout.
    let mut mapper_processes: Vec<_> = (0..nthreads)
        .map(|i| {
            Command::new("/bin/bash")
                .arg("-c")
                .arg(mapper_cmd)
                .stdin(Stdio::piped())
                .stdout(Stdio::piped())
                .spawn()
                .unwrap_or_else(|err| panic!("error spawn map child {}: {}", i, err))
        })
        .collect();

    // Take ownership of each child's pipe ends so I/O threads can own them.
    let (mapper_inputs, mapper_outputs): (Vec<_>, Vec<_>) = mapper_processes
        .iter_mut()
        .map(|child| {
            let stdin = child.stdin.take().unwrap();
            let stdout = child.stdout.take().unwrap();
            (stdin, stdout)
        })
        .unzip();

    // Feeder threads: dump each file chunk into the matching mapper's stdin.
    // Dropping the stdin handle at thread exit signals EOF to the mapper.
    let mapper_input_threads: Vec<_> = mapper_inputs
        .into_iter()
        .zip(chunks.into_iter())
        .map(|(input, chunk)| thread::spawn(move || chunk.dump(input)))
        .collect();

    // Bounded channels route sharded mapper output to folder feeders;
    // backpressure from queuesize keeps memory bounded.
    // TODO: wrap these with atomic ints to log capacity / P(capacity=max)
    let (txs, rxs): (Vec<_>, Vec<_>) = (0..nthreads).map(|_| sync_channel(queuesize)).unzip();

    // Every mapper-output thread needs all senders (any line may hash to any
    // shard), so share the Vec behind an Arc.
    let txs_ref = Arc::new(txs);
    let mapper_output_threads: Vec<_> = mapper_outputs
        .into_iter()
        .map(|output| {
            let txs_ref_clone = Arc::clone(&txs_ref);
            thread::spawn(move || {
                let output = BufReader::new(output);
                let txs_ref_local = txs_ref_clone.deref();
                // shard() hashes each line's key and hands the buffered
                // bytes to the channel for shard `ix`.
                sharder::shard(output, nthreads, bufsize, |ix, buf| {
                    txs_ref_local[ix].send(buf).expect("send");
                })
            })
        })
        .collect();

    // Zero-pad output-file suffixes to a common width so names sort in
    // shard order; this is loop-invariant, so compute it once up front.
    let width = format!("{}", nthreads - 1).len();

    // Stage 2: one folder child per shard, stdout redirected to
    // "<outprefix><i>" (e.g. outprefix0, outprefix1, ...).
    let folder_processes: Vec<_> = (0..nthreads)
        .map(|i| {
            let suffix = format!("{:0>width$}", i, width = width);
            // Borrow outprefix; file_name()/with_file_name() don't need
            // an owned copy per iteration.
            let mut fname = opt.outprefix.file_name().expect("file name").to_owned();
            fname.push(&suffix);
            let path = opt.outprefix.with_file_name(fname);
            let file = File::create(&path).expect("write file");

            Command::new("/bin/bash")
                .arg("-c")
                .arg(folder_cmd)
                .stdin(Stdio::piped())
                .stdout(file)
                .spawn()
                .unwrap_or_else(|err| panic!("error spawn fold child {}: {}", i, err))
        })
        .collect();

    // must be both I/O thread to manage livelock from stdin EOF
    // expectation of folder procs
    let folder_input_output_threads: Vec<_> = folder_processes
        .into_iter()
        .zip(rxs.into_iter())
        .map(|(mut child, rx)| {
            thread::spawn(move || {
                let mut child_stdin = child.stdin.take().expect("child stdin");
                // recv() errors only when every sender has been dropped,
                // i.e. all mapper output is fully drained.
                while let Ok(lines) = rx.recv() {
                    child_stdin.write_all(&lines).expect("write lines");
                }
                // Explicitly close stdin so the folder sees EOF and can
                // emit its aggregate output and exit.
                drop(child_stdin);

                assert!(child.wait().expect("wait").success());
            })
        })
        .collect();

    // Shutdown ordering matters: finish feeding mappers, reap mappers,
    // then join the shard threads so all sends have completed.
    mapper_input_threads
        .into_iter()
        .for_each(|handle| handle.join().expect("map input join"));
    mapper_processes
        .into_iter()
        .for_each(|mut child| assert!(child.wait().expect("wait").success()));
    mapper_output_threads
        .into_iter()
        .for_each(|handle| handle.join().expect("map output join"));

    // All sender clones are gone (output threads joined), so unwrap the Arc
    // and drop the senders: this hangs up the channels, ending each folder
    // feeder's recv() loop.
    let txs = Arc::try_unwrap(txs_ref).expect("final reference");
    drop(txs); // ensure hangup of transmission channel

    // Closures here own the fold processes
    folder_input_output_threads
        .into_iter()
        .for_each(|handle| handle.join().expect("fold join"));
}