From bb6f9f5e157e36c2f42d1ffd10b5801dee2d42f1 Mon Sep 17 00:00:00 2001 From: Vladimir Feinberg Date: Tue, 20 Apr 2021 07:32:31 -0700 Subject: separate reading and parsing threads --- slb/src/main.rs | 62 ++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/slb/src/main.rs b/slb/src/main.rs index 7955b1a..25b6148 100644 --- a/slb/src/main.rs +++ b/slb/src/main.rs @@ -6,6 +6,7 @@ use std::hash::{Hash, Hasher}; use std::io::{self, BufRead, BufReader, Write}; use std::iter; use std::process::{Command, Stdio}; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::mpsc::sync_channel; use std::sync::{Arc, Mutex}; use std::thread; @@ -73,10 +74,14 @@ struct Opt { // buffer input size (buffer full stdin reads, do line parsing // ourselves) // queue buffer size for mpsc queues + /// Print debug information to stderr. + #[structopt(short, long)] + verbose: Option, } fn main() { let opt = Opt::from_args(); + let verbose = opt.verbose.unwrap_or(false); let children: Vec<_> = (0..rayon::current_num_threads()) .map(|i| { @@ -94,17 +99,16 @@ fn main() { .map(|_| sync_channel(opt.queuesize.unwrap_or(16 * 1024))) .unzip(); - rayon::spawn(move || { - // txs captured by value here + let reader_queue_max_size = 16; + let (read_tx, read_rx) = sync_channel(reader_queue_max_size); + + let reader_queue_size = Arc::new(AtomicU32::new(0)); + let reader_rqs = Arc::clone(&reader_queue_size); + let reader = thread::spawn(move || { let mut done = false; - let txs_ref = &txs; - // use bstr here --> collect multiple slices into hashes - // line by line - // but also try chunking (for_byte_line, collect buffers - iter::from_fn(move || { - if done { - return None; - } + let mut total_queue_len = 0; + let mut num_enqueues = 0; + while !done { let bufsize = opt.bufsize.unwrap_or(16 * 1024); let mut buf = Vec::with_capacity(bufsize); let mut lines = Vec::with_capacity(bufsize / 8); @@ -126,21 +130,34 @@ fn main() { buf.pop(); lines.push(buf.len()); } - Some((buf, lines)) - }) - .flat_map(|(buf, lines)| { + read_tx.send((buf, lines)).expect("send"); + total_queue_len += reader_rqs.fetch_add(1, Ordering::Relaxed); + num_enqueues += 1; + } + // read tx dropped here, hanging up send + drop(read_tx); + if verbose { + eprintln!( + "avg queue len, rounding up {} (max {})", + (total_queue_len + num_enqueues - 1) / num_enqueues, + reader_queue_max_size + ); + } + }); + + let writer_rqs = Arc::clone(&reader_queue_size); + let writer = thread::spawn(move || { + while let Ok((buf, lines)) = read_rx.recv() { + writer_rqs.fetch_sub(1, Ordering::Relaxed); let mut start = 0; - lines.into_iter().map(move |end| { + for end in lines { let line = &buf[start..end]; let key = hash_key(line); - let send_ix = key % txs_ref.len(); + let send_ix = key % txs.len(); start = end; - (send_ix, line.to_vec()) - }) - }) - .for_each(|(send_ix, line)| { - txs_ref[send_ix].send(line).expect("send"); - }); + txs[send_ix].send(line.to_vec()).expect("send"); + } + } // txs dropped here, hanging up the send channel drop(txs) }); @@ -173,6 +190,9 @@ fn main() { }) }) .collect(); + + reader.join().expect("reader join"); + writer.join().expect("writer join"); handles .into_iter() .for_each(|handle| handle.join().expect("join")); -- cgit v1.2.3