summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladimir Feinberg <vladimir.feinberg@gmail.com>2021-04-20 07:32:31 -0700
committerVladimir Feinberg <vladimir.feinberg@gmail.com>2021-04-20 07:32:31 -0700
commitbb6f9f5e157e36c2f42d1ffd10b5801dee2d42f1 (patch)
tree969c2eee1f877f8c715cc28ee97cee91488c0876
parent84fcf0d340fcbe229fddeeb8218cf5150a39657a (diff)
separate reading and parsing threads
-rw-r--r--slb/src/main.rs62
1 files changed, 41 insertions, 21 deletions
diff --git a/slb/src/main.rs b/slb/src/main.rs
index 7955b1a..25b6148 100644
--- a/slb/src/main.rs
+++ b/slb/src/main.rs
@@ -6,6 +6,7 @@ use std::hash::{Hash, Hasher};
use std::io::{self, BufRead, BufReader, Write};
use std::iter;
use std::process::{Command, Stdio};
+use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;
@@ -73,10 +74,14 @@ struct Opt {
// buffer input size (buffer full stdin reads, do line parsing
// ourselves)
// queue buffer size for mpsc queues
+ /// Print debug information to stderr.
+ #[structopt(short, long)]
+ verbose: Option<bool>,
}
fn main() {
let opt = Opt::from_args();
+ let verbose = opt.verbose.unwrap_or(false);
let children: Vec<_> = (0..rayon::current_num_threads())
.map(|i| {
@@ -94,17 +99,16 @@ fn main() {
.map(|_| sync_channel(opt.queuesize.unwrap_or(16 * 1024)))
.unzip();
- rayon::spawn(move || {
- // txs captured by value here
+ let reader_queue_max_size = 16;
+ let (read_tx, read_rx) = sync_channel(reader_queue_max_size);
+
+ let reader_queue_size = Arc::new(AtomicU32::new(0));
+ let reader_rqs = Arc::clone(&reader_queue_size);
+ let reader = thread::spawn(move || {
let mut done = false;
- let txs_ref = &txs;
- // use bstr here --> collect multiple slices into hashes
- // line by line
- // but also try chunking (for_byte_line, collect buffers
- iter::from_fn(move || {
- if done {
- return None;
- }
+ let mut total_queue_len = 0;
+ let mut num_enqueues = 0;
+ while !done {
let bufsize = opt.bufsize.unwrap_or(16 * 1024);
let mut buf = Vec::with_capacity(bufsize);
let mut lines = Vec::with_capacity(bufsize / 8);
@@ -126,21 +130,34 @@ fn main() {
buf.pop();
lines.push(buf.len());
}
- Some((buf, lines))
- })
- .flat_map(|(buf, lines)| {
+ read_tx.send((buf, lines)).expect("send");
+ total_queue_len += reader_rqs.fetch_add(1, Ordering::Relaxed);
+ num_enqueues += 1;
+ }
+ // read tx dropped here, hanging up send
+ drop(read_tx);
+ if verbose {
+ eprintln!(
+ "avg queue len, rounding up {} (max {})",
+ (total_queue_len + num_enqueues - 1) / num_enqueues,
+ reader_queue_max_size
+ );
+ }
+ });
+
+ let writer_rqs = Arc::clone(&reader_queue_size);
+ let writer = thread::spawn(move || {
+ while let Ok((buf, lines)) = read_rx.recv() {
+ writer_rqs.fetch_sub(1, Ordering::Relaxed);
let mut start = 0;
- lines.into_iter().map(move |end| {
+ for end in lines {
let line = &buf[start..end];
let key = hash_key(line);
- let send_ix = key % txs_ref.len();
+ let send_ix = key % txs.len();
start = end;
- (send_ix, line.to_vec())
- })
- })
- .for_each(|(send_ix, line)| {
- txs_ref[send_ix].send(line).expect("send");
- });
+ txs[send_ix].send(line.to_vec()).expect("send");
+ }
+ }
// txs dropped here, hanging up the send channel
drop(txs)
});
@@ -173,6 +190,9 @@ fn main() {
})
})
.collect();
+
+ reader.join().expect("reader join");
+ writer.join().expect("writer join");
handles
.into_iter()
.for_each(|handle| handle.join().expect("join"));