author     Vladimir Feinberg <vladimir.feinberg@gmail.com>   2021-04-25 23:07:32 +0000
committer  Vladimir Feinberg <vladimir.feinberg@gmail.com>   2021-04-25 23:07:35 +0000
commit     e65f3980a8ecb6995ad5edb1329078044e2b19b2 (patch)
tree       3edb2d47003c97140f522119daace489c4f9af30
parent     b27709fdcadb6330231e1e9a2b91f4f5ed3ebbd1 (diff)
Add new ex: distinct feature vals
-rw-r--r--   README.md                        49
-rw-r--r--   examples/svm-countdistinct.awk   17
-rw-r--r--   slb/src/sharder.rs                2
3 files changed, 56 insertions, 12 deletions
diff --git a/README.md b/README.md
index afb6968..1862a46 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,6 @@
# slb: sharded load balancer
-[![travis build](https://travis-ci.org/vlad17/slb.svg?branch=master)](https://travis-ci.org/vlad17/slb)
-
-Like `parallel --pipe --roundrobin` but load balancing is performed based on input line hashing. When performing keyed aggregations in child processes this is crucial since then only one shard contains a given key. Here's a word count example:
+Like `parallel --pipe --roundrobin`, but load balancing is performed by hashing each input line's key. This is crucial when performing keyed aggregations in child processes, since it means any given key appears in only one shard. Here's a word count example on a 16-physical-cpu machine:
```
cargo build --release
@@ -24,9 +22,11 @@ diff <(sort wikawk.txt) <(cat wikslb.* | sort) ; echo $?
# 0
```
+This demonstrates a "flatmap-fold" paradigm, which is simpler and faster than the typical "map-reduce" approach where it applies.
+
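+The folder here is just a per-shard aggregation. As a minimal sketch (assuming a hypothetical folder script, `wordcount.awk`; the actual script used in the elided commands may differ), each worker only ever sees the words that hash to its shard, so a plain associative-array count is already the final count for those keys:
+
+```
+# wordcount.awk (hypothetical sketch): count occurrences of each input line.
+# slb routes every copy of a given word to the same worker, so these
+# per-shard counts need no further merging.
+{ counts[$0]++ }
+END { for (w in counts) print w, counts[w] }
+```
+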
## Feature Frequency Calculation
-Here's an example of counting the frequency of features in sparse [SVMlight](https://www.cs.cornell.edu/people/tj/svm_light/) format of a large dataset, benchmarked on the large KDD12 dataset on a 32-core machine (assumes [ripgrep](https://github.com/BurntSushi/ripgrep), [GNU Parallel](https://www.gnu.org/software/parallel/) are installed).
+Here's an example of counting feature frequencies in a large dataset in sparse [SVMlight](https://www.cs.cornell.edu/people/tj/svm_light/) format, benchmarked on the large KDD12 dataset on a 16-physical-cpu machine (assumes [ripgrep](https://github.com/BurntSushi/ripgrep) and [GNU Parallel](https://www.gnu.org/software/parallel/) are installed).
```
echo 'will cite' | parallel --citation 1>/dev/null 2>/dev/null
@@ -40,31 +40,58 @@ parallel --pipepart -a kdd12.tr wc -w | awk '{a+=$0}END{print a}'
# num nnz: 1436460384 - 119705032 = 1316755352
/usr/bin/time -f "%e sec %M KB" awk -f examples/svm-featurecount.awk kdd12.tr > results-awk.txt
-# 1011.70 sec 13720908 KB
+# 1032.18 sec 13721032 KB
/usr/bin/time -f "%e sec %M KB" target/release/slb \
--mapper 'sed -E "s/^[^ ]+ //" | sed -E "s/:[^ ]+//g" | tr " " "\n" | rg -v "^$"' \
--folder "awk '{a[\$0]++}END{for(k in a)print k,a[k]}'" \
--infile kdd12.tr \
--outprefix results-slb.
-# 136.29 sec 881360 KB
+# 122.50 sec 881436 KB
# note above doesn't count child memory
# eyeballing htop, max memory use is ~12.3GB
# check we're correct
-cat results-slb.* > results-slb-cat && rm results-slb.*
-sort --parallel=$(($(nproc) / 2)) -k2nr -k 1 -o results-slb-cat results-slb-cat & \
-sort --parallel=$(($(nproc) / 2)) -k2nr -k 1 -o results-awk.txt results-awk.txt & \
+cat results-slb.* > results-slb && rm results-slb.*
+sort --parallel=$(($(nproc) / 2)) -k2nr -k1n -o results-slb results-slb & \
+sort --parallel=$(($(nproc) / 2)) -k2nr -k1n -o results-awk.txt results-awk.txt & \
wait
-diff results-slb-cat results-awk.txt >/dev/null ; echo $?
+diff results-slb results-awk.txt >/dev/null ; echo $?
# 0
```
Note that the above demonstrates the convenience of the tool:
* For large datasets, parallelism is essential.
-* Compared to an equivalent map-reduce, we use less memory and less code.
+* Compared to an equivalent map-reduce, we use less memory, less time, and less code.
The last point holds because `slb` ensures each parallel invocation receives a _unique partition_ of the key space: we use less memory because each folder process only tracks counts for its own keys, and less code because we never need to write a combiner that merges partial feature count maps (a sketch of such a combiner follows).
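+
+For contrast, a map-reduce version whose workers do not receive disjoint key partitions would still need a combiner along these lines (a hypothetical sketch, not part of this repository; it assumes partial counts in `key count` form):
+
+```
+# combine.awk (hypothetical): merge partial count files whose key spaces overlap.
+# Usage sketch: awk -f combine.awk part-1.txt part-2.txt > merged.txt
+{ total[$1] += $2 }
+END { for (k in total) print k, total[k] }
+```
+
+With `slb`, the disjoint partitions make this merge step unnecessary: each folder's output is already final for its keys.
+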
+## Count Distinct Feature Values
+
+As another, similar example, we can count the number of distinct values for each feature. In particular, for each feature we report the minimum of its number of distinct values and 100, since we might treat any feature with more than 99 distinct values as continuous.
+
+```
+curl -o kdda.bz2 "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.bz2"
+bunzip2 kdda.bz2
+du -hs kdda
+# 2.5G kdda
+
+/usr/bin/time -f "%e sec %M KB" awk -f examples/svm-countdistinct.awk kdda > cdawk.txt
+# 388.72 sec 23895104 KB
+
+/usr/bin/time -f "%e sec %M KB" target/release/slb \
+ --mapper 'sed -E "s/^[^ ]+ //" | tr " " "\n" | tr ":" " " | rg -v "^$"' \
+ --folder "awk '{if(!(\$1 in a)||length(a[\$1])<100)a[\$1][\$2]=1}END{for(k in a)print k,length(a[k])}'" \
+ --infile kdda \
+ --outprefix cdslb.
+# 26.79 sec 1499992 KB
+
+diff \
+ <(sort --parallel=$(($(nproc) / 2)) -k2nr -k1n cdawk.txt) \
+ <(cat cdslb.* | sort --parallel=$(($(nproc) / 2)) -k2nr -k1n) \
+ > /dev/null ; echo $?
+# 0
+```
+
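+For readability, the `--folder` one-liner above corresponds roughly to the following awk (a sketch with `a` renamed to `seen`; its input is the mapper's `feature value` pairs rather than raw SVMlight lines):
+
+```
+# Per-shard fold over "feature value" lines emitted by the mapper.
+# Stop collecting new values for a feature once it already has 100.
+{
+    if (!($1 in seen) || length(seen[$1]) < 100)
+        seen[$1][$2] = 1
+}
+END {
+    for (k in seen) print k, length(seen[k])
+}
+```
+
+Like `examples/svm-countdistinct.awk` below, this relies on gawk's arrays of arrays.
+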
diff --git a/examples/svm-countdistinct.awk b/examples/svm-countdistinct.awk
new file mode 100644
index 0000000..7362f39
--- /dev/null
+++ b/examples/svm-countdistinct.awk
@@ -0,0 +1,17 @@
+BEGIN {
+ MAXUNIQUE = 100
+}
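+# The pattern "1" below is always true, so the block runs for every input line;
+# $1 is the label and the remaining fields are feature:value pairs.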
+1 {
+ for (i=2; i<=NF; i++) {
+ s = index($i, ":")
+ feature = substr($i, 1, s - 1)
+ if (feature in values && length(values[feature]) == MAXUNIQUE)
+ continue
+ value = substr($i, s + 1)
+ values[feature][value] = 1
+ }
+}
+END {
+ for (k in values)
+ print k, length(values[k])
+}
diff --git a/slb/src/sharder.rs b/slb/src/sharder.rs
index 18ff5a2..6630f80 100644
--- a/slb/src/sharder.rs
+++ b/slb/src/sharder.rs
@@ -39,7 +39,7 @@ where
fn hash_key(bytes: &[u8], npartitions: u64) -> usize {
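// Hash only the key, i.e. the bytes up to the first space (or the whole
// line if there is none), so every line sharing a key maps to the same partition.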
let end = memchr(b' ', bytes).unwrap_or(bytes.len());
- // consider faster hasher?
+ // TODO: consider faster hasher?
let mut hasher = DefaultHasher::default();
bytes[..end].hash(&mut hasher);
(hasher.finish() % npartitions) as usize