summaryrefslogtreecommitdiffstats
path: root/crates/common/batcher/src/batcher.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/common/batcher/src/batcher.rs')
-rw-r--r--crates/common/batcher/src/batcher.rs635
1 files changed, 635 insertions, 0 deletions
diff --git a/crates/common/batcher/src/batcher.rs b/crates/common/batcher/src/batcher.rs
new file mode 100644
index 00000000..237e1b57
--- /dev/null
+++ b/crates/common/batcher/src/batcher.rs
@@ -0,0 +1,635 @@
+use crate::batch::{Batch, BatchAdd};
+use crate::batchable::Batchable;
+use crate::config::BatchConfig;
+use chrono::{DateTime, Utc};
+
/// One step's output from the [`Batcher`]: either a batch that is now closed
/// and ready for delivery, or a request to be called back at a later time.
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum BatcherOutput<B> {
    /// A closed batch of events, ready to be delivered downstream.
    Batch(Vec<B>),
    /// Ask the caller to invoke `Batcher::time` at (or after) this instant,
    /// so that batches timing out then can be closed.
    Timer(DateTime<Utc>),
}
+
/// Provides the core implementation of the batching algorithm.
#[derive(Debug)]
pub struct Batcher<B: Batchable> {
    // Jitter/window settings that control batch boundaries and timeouts.
    config: BatchConfig,
    // All currently open (not yet delivered) batches.
    batches: Vec<Batch<B>>,
}
+
+impl<B: Batchable> Batcher<B> {
+ /// Create a Batcher with the specified config.
+ pub fn new(config: BatchConfig) -> Batcher<B> {
+ Batcher {
+ config,
+ batches: vec![],
+ }
+ }
+
+ pub(crate) fn event(
+ &mut self,
+ processing_time: DateTime<Utc>,
+ event: B,
+ ) -> Vec<BatcherOutput<B>> {
+ let event_time = event.event_time();
+
+ if event_time < processing_time - self.config.delivery_jitter() {
+ // Discard event because it is too old
+ return vec![];
+ }
+
+ if event_time > processing_time + self.config.message_leap_limit() {
+ // Discard event because it is too futuristic
+ return vec![];
+ }
+
+ match self.find_target_batch(event_time) {
+ None => {
+ let new_batch = self.make_new_batch(event);
+ let new_batch_end = new_batch.batch_end();
+ self.batches.push(new_batch);
+ self.output_for_batch_end(processing_time, new_batch_end)
+ }
+ Some(target_batch) => match target_batch.add(event) {
+ BatchAdd::Added => vec![],
+ BatchAdd::Duplicate => vec![],
+ BatchAdd::Split(new_batch) => {
+ let split_batch_end = target_batch.batch_end();
+ self.batches.push(new_batch);
+ self.output_for_batch_end(processing_time, split_batch_end)
+ }
+ },
+ }
+ }
+
+ fn output_for_batch_end(
+ &mut self,
+ processing_time: DateTime<Utc>,
+ batch_end: DateTime<Utc>,
+ ) -> Vec<BatcherOutput<B>> {
+ let batch_timeout = batch_end + self.config.delivery_jitter();
+ if processing_time < batch_timeout {
+ vec![BatcherOutput::Timer(batch_timeout)]
+ } else {
+ self.time(processing_time)
+ .into_iter()
+ .map(BatcherOutput::Batch)
+ .collect()
+ }
+ }
+
+ pub(crate) fn time(&mut self, time: DateTime<Utc>) -> Vec<Vec<B>> {
+ let batches = std::mem::take(&mut self.batches);
+
+ let (open_batches, closed_batches) = batches
+ .into_iter()
+ .partition(|batch| self.is_open(batch, time));
+
+ self.batches = open_batches;
+
+ closed_batches
+ .into_iter()
+ .map(|batch| batch.into_vec())
+ .collect()
+ }
+
+ fn is_open(&self, batch: &Batch<B>, time: DateTime<Utc>) -> bool {
+ batch.batch_end() + self.config.delivery_jitter() > time
+ }
+
+ pub(crate) fn flush(self) -> Vec<Vec<B>> {
+ let mut batches = Vec::with_capacity(self.batches.len());
+
+ for batch in self.batches {
+ batches.push(batch.into_vec())
+ }
+
+ batches
+ }
+
+ fn find_target_batch(&mut self, event_time: DateTime<Utc>) -> Option<&mut Batch<B>> {
+ for batch in &mut self.batches {
+ if batch.batch_start() <= event_time && event_time <= batch.batch_end() {
+ return Some(batch);
+ }
+ }
+
+ None
+ }
+
+ fn make_new_batch(&self, event: B) -> Batch<B> {
+ let event_time = event.event_time();
+ let mut batch_start = event_time;
+ let mut batch_end = batch_start + self.config.event_jitter();
+
+ if let Some(previous_batch) = self.previous_batch(event_time) {
+ batch_start = batch_start.max(previous_batch.batch_end())
+ }
+ if let Some(next_batch) = self.next_batch(event_time) {
+ batch_end = batch_end.min(next_batch.batch_start())
+ }
+
+ Batch::new(batch_start, batch_end, event)
+ }
+
+ fn previous_batch(&self, event_time: DateTime<Utc>) -> Option<&Batch<B>> {
+ self.batches
+ .iter()
+ .filter(|batch| batch.batch_end() < event_time)
+ .max_by(|batch1, batch2| batch1.batch_end().cmp(&batch2.batch_end()))
+ }
+
+ fn next_batch(&self, event_time: DateTime<Utc>) -> Option<&Batch<B>> {
+ self.batches
+ .iter()
+ .filter(|batch| batch.batch_start() > event_time)
+ .min_by(|batch1, batch2| batch1.batch_start().cmp(&batch2.batch_start()))
+ }
+}
+
#[cfg(test)]
mod tests {
    //! Scenario-style tests: each test declares a timeline of event arrivals
    //! (and optionally a flush), plus the batches — with their close times —
    //! that the batcher must produce. `BatcherTest::run` replays the timeline
    //! in processing-time order and verifies the outputs.
    use super::*;
    use crate::batchable::Batchable;
    use crate::config::BatchConfigBuilder;
    use chrono::offset::TimeZone;
    use chrono::{DateTime, Duration, Utc};
    use std::collections::BTreeMap;

    #[test]
    fn single_event_batch() {
        let mut test = BatcherTest::new(50, 20, 0);

        let event1 = test.create_event(0, "a", 1);

        test.event(1, &event1);
        // 70 = event time 0 + event jitter 50 + delivery jitter 20.
        test.expect_batch(70, vec![event1]);

        test.run();
    }

    #[test]
    fn multi_event_batch() {
        let mut test = BatcherTest::new(50, 20, 0);

        let event1 = test.create_event(0, "a", 1);
        let event2 = test.create_event(10, "b", 2);

        test.event(1, &event1);
        test.event(11, &event2);
        test.expect_batch(70, vec![event1, event2]);

        test.run();
    }

    #[test]
    // The same behavior as for `multi_event_batch` is expected
    // Since we just change how long we wait for an event
    fn multi_event_batch_with_long_delivery_jitter() {
        let mut test = BatcherTest::new(50, 50, 0);

        let event1 = test.create_event(0, "a", 1);
        let event2 = test.create_event(10, "b", 2);

        test.event(1, &event1);
        test.event(11, &event2);
        test.expect_batch(100, vec![event1, event2]);

        test.run();
    }

    #[test]
    fn multi_event_batch_with_long_delivery_jitter_and_delayed_message() {
        let mut test = BatcherTest::new(50, 50, 0);

        let event1 = test.create_event(5, "a", 2);
        let event2 = test.create_event(10, "b", 1);

        test.event(11, &event2);
        test.event(25, &event1); // late, but not too late

        test.expect_batch(60, vec![event1]);
        test.expect_batch(110, vec![event2]);

        test.run();
    }

    #[test]
    fn split_batch() {
        let mut test = BatcherTest::new(50, 20, 0);

        let event1 = test.create_event(0, "a", 1);
        let event2 = test.create_event(10, "a", 2);

        test.event(1, &event1);
        test.event(11, &event2);
        // 25 = the split batch's new end + delivery jitter 20; the new end is
        // presumably the midpoint (5ms) between the two conflicting "a"
        // events — TODO confirm against Batch::add's split rule.
        test.expect_batch(25, vec![event1]);
        test.expect_batch(70, vec![event2]);

        test.run();
    }

    #[test]
    fn allocate_to_earlier_split_batch() {
        let mut test = BatcherTest::new(50, 20, 0);

        let event1 = test.create_event(0, "a", 1);
        let event2 = test.create_event(10, "a", 2);
        let event3 = test.create_event(2, "b", 3);

        test.event(1, &event1);
        test.event(11, &event2);
        test.event(12, &event3);
        test.expect_batch(25, vec![event1, event3]);
        test.expect_batch(70, vec![event2]);

        test.run();
    }

    #[test]
    fn allocate_to_later_split_batch() {
        let mut test = BatcherTest::new(50, 20, 0);

        let event1 = test.create_event(0, "a", 1);
        let event2 = test.create_event(10, "a", 2);
        let event3 = test.create_event(9, "b", 3);

        test.event(1, &event1);
        test.event(11, &event2);
        test.event(12, &event3);
        test.expect_batch(25, vec![event1]);
        test.expect_batch(70, vec![event2, event3]);

        test.run();
    }

    #[test]
    fn flush_no_batches() {
        let mut test = BatcherTest::new(50, 20, 0);
        test.flush(100);
        test.run();
    }

    #[test]
    fn flush_one_batch() {
        let mut test = BatcherTest::new(50, 20, 0);

        let event1 = test.create_event(0, "a", 1);
        let event2 = test.create_event(10, "b", 2);

        test.event(1, &event1);
        test.event(11, &event2);
        // Flushing before the batch's timeout closes it immediately.
        test.flush(20);
        test.expect_batch(20, vec![event1, event2]);

        test.run();
    }

    #[test]
    fn flush_two_batches() {
        let mut test = BatcherTest::new(50, 20, 0);

        let event1 = test.create_event(0, "a", 1);
        let event2 = test.create_event(3, "b", 2);
        let event3 = test.create_event(10, "a", 3);

        test.event(1, &event1);
        test.event(4, &event2);
        test.event(11, &event3);
        test.flush(20);
        test.expect_batch(20, vec![event1, event2]);
        test.expect_batch(20, vec![event3]);

        test.run();
    }

    // The following tests are taken from the diagrams on the specification:
    // https://github.com/albinsuresh/thin-edge.io-specs/blob/main/src/telemetry-data/message-batching/message-batching.md

    #[test]
    fn simple_batching_with_batching_window() {
        let mut test = BatcherTest::new(50, 20, 0);

        let a = test.create_event(115, "a", 1);
        let b = test.create_event(120, "b", 2);
        let c = test.create_event(145, "c", 3);
        let d = test.create_event(160, "d", 4);
        let e = test.create_event(175, "e", 5);
        let f = test.create_event(215, "f", 6);
        let g = test.create_event(240, "g", 7);

        test.event(125, &b);
        test.event(135, &a); // order inversion
        test.event(150, &c);
        test.event(165, &d);
        test.event(189, &e);
        test.event(250, &g);
        test.event(260, &f); // too late
        test.expect_batch(140, vec![a]);
        test.expect_batch(190, vec![b, c, d]);
        test.expect_batch(245, vec![e]);
        test.expect_batch(310, vec![g]);

        test.run();
    }

    #[test]
    fn simple_batching_with_batching_timeout() {
        let mut test = BatcherTest::new(50, 20, 0);

        let a = test.create_event(120, "a", 1);
        let b = test.create_event(130, "b", 2);
        let c = test.create_event(145, "c", 3);
        let d = test.create_event(180, "d", 4);
        let e = test.create_event(190, "e", 5);

        test.event(130, &a);
        test.event(140, &b);
        test.event(150, &c);
        test.event(189, &d);
        test.event(210, &e);
        test.expect_batch(190, vec![a, b, c]);
        test.expect_batch(250, vec![d, e]);

        test.run();
    }

    #[test]
    fn batch_split_due_to_conflicting_measurements() {
        let mut test = BatcherTest::new(50, 20, 0);

        let a1 = test.create_event(120, "a", 1);
        let b1 = test.create_event(125, "b", 2);
        let a2 = test.create_event(140, "a", 3);
        let c1 = test.create_event(150, "c", 4);
        let a3 = test.create_event(170, "a", 5);

        test.event(125, &a1);
        test.event(140, &b1);
        test.event(150, &a2);
        test.event(170, &c1);
        test.event(180, &a3);
        test.expect_batch(150, vec![a1, b1]);
        test.expect_batch(180, vec![a2, c1]);
        test.expect_batch(190, vec![a3]);

        test.run();
    }

    #[test]
    fn receiving_older_already_batched_messages_after_starting_a_new_batch() {
        let mut test = BatcherTest::new(50, 20, 0);

        let a = test.create_event(120, "a", 1);
        let b = test.create_event(130, "b", 2);
        let c = test.create_event(140, "c", 3);
        let d = test.create_event(190, "d", 4);
        let e = test.create_event(210, "e", 5);

        test.event(130, &a);
        test.event(140, &b);
        test.event(150, &c);
        // Re-deliveries of `c` must be deduplicated, not re-batched.
        test.event(160, &c);
        test.event(175, &c);
        test.event(210, &d);
        test.event(220, &c);
        test.event(230, &e);
        test.expect_batch(190, vec![a, b, c]);
        test.expect_batch(260, vec![d, e]);

        test.run();
    }

    #[test]
    fn receiving_older_unbatched_messages_after_starting_a_new_batch() {
        let mut test = BatcherTest::new(50, 20, 0);

        let a1 = test.create_event(120, "a", 1);
        let b1 = test.create_event(130, "b", 2);
        let c1 = test.create_event(140, "c", 3);
        let d1 = test.create_event(145, "d", 4);
        let a2 = test.create_event(180, "a", 5);
        let b2 = test.create_event(200, "b", 6);

        test.event(130, &a1);
        test.event(140, &b1);
        test.event(150, &c1);
        test.event(189, &a2);
        test.event(205, &b2);
        test.event(215, &d1); // d1 arrives too late to join the first batch
        test.expect_batch(190, vec![a1, b1, c1]);
        test.expect_batch(250, vec![a2, b2]);

        test.run();
    }

    /// Minimal event type used to drive the batcher in tests.
    #[derive(Debug, Clone, Eq, PartialEq)]
    struct TestBatchEvent {
        // Timestamp the event claims to have occurred at.
        event_time: DateTime<Utc>,
        // Batching key: events with the same key conflict within one batch.
        key: String,
        // Payload used only to make events distinguishable in assertions.
        value: u64,
    }

    impl Batchable for TestBatchEvent {
        type Key = String;

        fn key(&self) -> Self::Key {
            self.key.clone()
        }

        fn event_time(&self) -> DateTime<Utc> {
            self.event_time
        }
    }

    /// A single entry on the test timeline: either an incoming event or a
    /// timer previously requested by the batcher.
    #[derive(Debug)]
    enum EventOrTimer {
        Event(TestBatchEvent),
        Timer(),
    }

    /// Test harness: collects a timeline of inputs and the expected batches,
    /// then replays everything through a real `Batcher` in `run`.
    struct BatcherTest {
        // Epoch all relative millisecond offsets are measured from.
        start_time: DateTime<Utc>,
        batcher: Batcher<TestBatchEvent>,
        // Timeline of inputs keyed by processing time (BTreeMap keeps order).
        inputs: BTreeMap<DateTime<Utc>, EventOrTimer>,
        // If set, the batcher is flushed at this time after all inputs.
        flush_time: Option<DateTime<Utc>>,
        // Expected batches grouped by the time they should be emitted.
        expected_batches: BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>,
    }

    impl BatcherTest {
        /// Create a harness around a batcher configured with the given
        /// jitters/limit (all in milliseconds).
        fn new(event_jitter: u32, delivery_jitter: u32, message_leap_limit: u32) -> BatcherTest {
            let batcher_config = BatchConfigBuilder::new()
                .event_jitter(event_jitter)
                .delivery_jitter(delivery_jitter)
                .message_leap_limit(message_leap_limit)
                .build();

            let start_time = Utc.timestamp_millis(0);
            let batcher = Batcher::new(batcher_config);

            BatcherTest {
                start_time,
                batcher,
                inputs: BTreeMap::new(),
                flush_time: None,
                expected_batches: BTreeMap::new(),
            }
        }

        /// Build an event with an event time `event_time` ms after the epoch.
        fn create_event(&mut self, event_time: i64, key: &str, value: u64) -> TestBatchEvent {
            let event_time = self.create_instant(event_time);
            let key = key.into();
            TestBatchEvent {
                event_time,
                key,
                value,
            }
        }

        /// Schedule `event` to arrive at `processed_time` (ms after epoch).
        fn event(&mut self, processed_time: i64, event: &TestBatchEvent) {
            let processed_time = self.create_instant(processed_time);
            if let Some(_existing) = self
                .inputs
                .insert(processed_time, EventOrTimer::Event(event.clone()))
            {
                panic!("Two events with same processing time")
            }
        }

        /// Schedule a flush at `flush_time` (must be the last action).
        fn flush(&mut self, flush_time: i64) {
            self.flush_time = Some(self.create_instant(flush_time));
        }

        /// Record that a batch with exactly these events must be emitted at
        /// `batch_close_time` (ms after epoch); order within a batch is free.
        fn expect_batch(&mut self, batch_close_time: i64, batch: Vec<TestBatchEvent>) {
            let batch_close_time = self.create_instant(batch_close_time);
            let batches_at_time = self.expected_batches.entry(batch_close_time).or_default();
            batches_at_time.push(batch);
        }

        /// Replay the timeline in processing-time order, feeding timers the
        /// batcher requests back into the timeline, then compare the batches
        /// actually produced against the expectations.
        fn run(mut self) {
            let mut actual_batches = BTreeMap::new();

            if let Some(flush_time) = self.flush_time {
                // No input may be scheduled at or after the flush time.
                if !self.inputs.split_off(&flush_time).is_empty() {
                    panic!("Flush must be the last test action");
                }
            }

            while let Some((t, action)) = pop_first(&mut self.inputs) {
                match action {
                    EventOrTimer::Event(event) => {
                        let outputs = self.batcher.event(t, event);
                        self.handle_outputs(t, outputs, &mut actual_batches, self.flush_time);
                    }
                    EventOrTimer::Timer() => {
                        actual_batches.insert(t, self.batcher.time(t));
                    }
                };
            }

            if let Some(t) = self.flush_time {
                let batches = self.batcher.flush();
                if !batches.is_empty() {
                    actual_batches.insert(t, batches);
                }
            }

            verify(self.expected_batches, actual_batches);
        }

        /// Process one call's outputs: collect emitted batches and turn timer
        /// requests into future timeline entries (unless past the flush time).
        fn handle_outputs(
            &mut self,
            t: DateTime<Utc>,
            outputs: Vec<BatcherOutput<TestBatchEvent>>,
            all_batches: &mut BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>,
            flush_time: Option<DateTime<Utc>>,
        ) {
            let mut batches = vec![];

            for output in outputs {
                match output {
                    BatcherOutput::Batch(batch) => batches.push(batch),
                    BatcherOutput::Timer(timer) => {
                        // A timer must always be strictly in the future.
                        if timer <= t {
                            panic!(
                                "Batcher requested non-future timer. Input: {}, timer: {}",
                                t, timer
                            );
                        }
                        let add_timer = match flush_time {
                            None => true,
                            Some(flush_time) => timer < flush_time,
                        };
                        if add_timer {
                            if let Some(existing) = self.inputs.insert(timer, EventOrTimer::Timer())
                            {
                                panic!(
                                    "Timer at the same time as existing event/timer: {}: {:?}",
                                    timer, existing
                                );
                            }
                        }
                    }
                }
            }

            if !batches.is_empty() {
                all_batches.insert(t, batches);
            }
        }

        /// Convert a relative millisecond offset into an absolute timestamp.
        fn create_instant(&self, time: i64) -> DateTime<Utc> {
            self.start_time + Duration::milliseconds(time)
        }
    }

    /// Assert that the actual batches match the expected ones: same emission
    /// times, and at each time the same multiset of batches (event order
    /// within a batch is ignored).
    fn verify(
        expected_batches: BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>,
        mut actual_batches: BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>,
    ) {
        assert_eq!(
            actual_batches.keys().collect::<Vec<_>>(),
            expected_batches.keys().collect::<Vec<_>>()
        );

        for (time, timed_expected_batches) in expected_batches {
            let mut timed_actual_batches = actual_batches.remove(&time).unwrap();

            for timed_expected_batch in &timed_expected_batches {
                let found =
                    timed_actual_batches
                        .iter()
                        .enumerate()
                        .find(|(_index, timed_actual_batch)| {
                            match_batches(timed_actual_batch, timed_expected_batch)
                        });

                match found {
                    None => panic!(
                        "Failed to match batch @ {}: {:?}",
                        time, timed_actual_batches
                    ),
                    // Remove the matched batch so duplicates must each match.
                    Some((index, _batch)) => timed_actual_batches.remove(index),
                };
            }
        }
    }

    /// Order-insensitive batch equality (assumes no duplicate events within
    /// a batch — TODO confirm; with duplicates this check is only one-way).
    fn match_batches(batch1: &[TestBatchEvent], batch2: &[TestBatchEvent]) -> bool {
        if batch1.len() != batch2.len() {
            return false;
        }

        for event in batch1 {
            if !batch2.contains(event) {
                return false;
            }
        }

        true
    }

    /// Remove and return the smallest entry of the map, if any
    /// (stand-in for `BTreeMap::pop_first`).
    fn pop_first<K: Ord + Copy, V>(map: &mut BTreeMap<K, V>) -> Option<(K, V)> {
        let (&key, _value) = map.iter().next()?;
        map.remove_entry(&key)
    }
}