diff options
author | Lukasz Woznicki <75632179+makr11st@users.noreply.github.com> | 2021-11-24 20:54:56 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-24 20:54:56 +0000 |
commit | a4ffeccf60090e4456755bc53a6e3b8c8038e855 (patch) | |
tree | 9583f187114913a92866571920dd3bb205bd50a3 /crates/common | |
parent | 8217e80670e76dbf9168780f5e0545355a39f8f3 (diff) |
Restructure directories of the workspace (#559)
* Restructure directories of the workspace
* Rename c8y_translator_lib to c8y_translator
* Update comment on how to get dummy plugin path
Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates/common')
57 files changed, 7345 insertions, 0 deletions
diff --git a/crates/common/batcher/Cargo.toml b/crates/common/batcher/Cargo.toml new file mode 100644 index 00000000..9cc6c8fb --- /dev/null +++ b/crates/common/batcher/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "batcher" +version = "0.4.3" +authors = ["thin-edge.io team <info@thin-edge.io>"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +chrono = "0.4" +tokio = { version = "1.12", features = ["sync", "time"] } + +[dev-dependencies] +tokio = { version = "1.12", features = ["rt", "macros"] } diff --git a/crates/common/batcher/src/batch.rs b/crates/common/batcher/src/batch.rs new file mode 100644 index 00000000..8c06d5c6 --- /dev/null +++ b/crates/common/batcher/src/batch.rs @@ -0,0 +1,182 @@ +use crate::batchable::Batchable; +use chrono::{DateTime, Utc}; +use std::collections::HashMap; +use std::iter::once; + +#[must_use] +#[derive(Debug)] +pub enum BatchAdd<B: Batchable> { + Added, + Duplicate, + Split(Batch<B>), +} + +#[derive(Debug)] +pub struct Batch<B: Batchable> { + batch_start: DateTime<Utc>, + batch_end: DateTime<Utc>, + events: HashMap<B::Key, B>, +} + +impl<B: Batchable> Batch<B> { + pub fn new(batch_start: DateTime<Utc>, batch_end: DateTime<Utc>, event: B) -> Batch<B> { + let mut events = HashMap::new(); + events.insert(event.key(), event); + + Batch { + batch_start, + batch_end, + events, + } + } + + pub fn batch_start(&self) -> DateTime<Utc> { + self.batch_start + } + + pub fn batch_end(&self) -> DateTime<Utc> { + self.batch_end + } + + pub fn add(&mut self, event: B) -> BatchAdd<B> { + let key = event.key(); + if let Some(existing_event) = self.events.get(&key) { + let existing_event_time = existing_event.event_time(); + + if event.event_time() == existing_event_time { + return BatchAdd::Duplicate; + } + + return BatchAdd::Split(self.split(existing_event_time, event)); + } + + self.events.insert(key, event); + + BatchAdd::Added + } + + fn split(&mut self, existing_event_time: DateTime<Utc>, event: B) -> Batch<B> { + let split_point = midpoint(existing_event_time, event.event_time()); + + let mut new_batch_events = HashMap::new(); + let new_batch_end = self.batch_end; + + let all_events = std::mem::take(&mut self.events); + self.batch_end = split_point; + + // Go over all the events in this batch plus the new event and allocate them, + // either the existing batch or the new batch. + for event in all_events + .into_iter() + .map(|(_key, event)| event) + .chain(once(event)) + { + let event_time = event.event_time(); + + if event_time < split_point { + self.events.insert(event.key(), event); + } else { + new_batch_events.insert(event.key(), event); + } + } + + Batch { + batch_start: split_point, + batch_end: new_batch_end, + events: new_batch_events, + } + } + + pub fn into_vec(self) -> Vec<B> { + self.events.into_iter().map(|(_k, v)| v).collect() + } +} + +fn midpoint(event_time1: DateTime<Utc>, event_time2: DateTime<Utc>) -> DateTime<Utc> { + let gap = event_time1.signed_duration_since(event_time2); + event_time2 + gap / 2 +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + #[test] + fn add() { + let batch_start = Utc.timestamp_millis(0); + let batch_end = Utc.timestamp_millis(100); + let event1 = TestBatchEvent::new(1, 40); + let event2 = TestBatchEvent::new(2, 60); + + let mut batch = Batch::new(batch_start, batch_end, event1.clone()); + assert!(matches!(batch.add(event2.clone()), BatchAdd::Added)); + + let result = batch.into_vec(); + assert_eq!(result.len(), 2); + assert!(result.contains(&event1)); + assert!(result.contains(&event2)); + } + + #[test] + fn split() { + let batch_start = Utc.timestamp_millis(0); + let batch_end = Utc.timestamp_millis(100); + let event1 = TestBatchEvent::new(1, 40); + let event2 = TestBatchEvent::new(1, 60); + + let mut batch1 = Batch::new(batch_start, batch_end, event1.clone()); + match batch1.add(event2.clone()) { + BatchAdd::Split(batch2) => { + let result1 = batch1.into_vec(); + assert_eq!(result1.len(), 1); + assert!(result1.contains(&event1)); + + let result2 = batch2.into_vec(); + assert_eq!(result2.len(), 1); + assert!(result2.contains(&event2)); + } + _ => panic!("Expected split"), + } + } + + #[test] + fn duplicate() { + let batch_start = Utc.timestamp_millis(0); + let batch_end = Utc.timestamp_millis(100); + let event1 = TestBatchEvent::new(1, 40); + let event2 = TestBatchEvent::new(1, 40); + + let mut batch = Batch::new(batch_start, batch_end, event1.clone()); + assert!(matches!(batch.add(event2), BatchAdd::Duplicate)); + + let result = batch.into_vec(); + assert_eq!(result.len(), 1); + assert!(result.contains(&event1)); + } + + #[derive(Debug, Clone, Eq, PartialEq)] + struct TestBatchEvent { + key: u64, + event_time: DateTime<Utc>, + } + + impl TestBatchEvent { + fn new(key: u64, event_time: i64) -> TestBatchEvent { + let event_time = Utc.timestamp_millis(event_time); + TestBatchEvent { key, event_time } + } + } + + impl Batchable for TestBatchEvent { + type Key = u64; + + fn key(&self) -> Self::Key { + self.key + } + + fn event_time(&self) -> DateTime<Utc> { + self.event_time + } + } +} diff --git a/crates/common/batcher/src/batchable.rs b/crates/common/batcher/src/batchable.rs new file mode 100644 index 00000000..3585fc2f --- /dev/null +++ b/crates/common/batcher/src/batchable.rs @@ -0,0 +1,17 @@ +use chrono::{DateTime, Utc}; +use std::fmt::Debug; +use std::hash::Hash; + +/// Implement this interface for the items that you want batched. +/// No items with the same key will go in the same batch. +/// The event_time of the item will determine how items are grouped, +/// dependent on how the batcher is configured. +pub trait Batchable { + type Key: Eq + Hash + Debug; + + /// Define the uniqueness within a batch. + fn key(&self) -> Self::Key; + + /// The time at which this item was created. This time is used to group items into a batch. + fn event_time(&self) -> DateTime<Utc>; +} diff --git a/crates/common/batcher/src/batcher.rs b/crates/common/batcher/src/batcher.rs new file mode 100644 index 00000000..237e1b57 --- /dev/null +++ b/crates/common/batcher/src/batcher.rs @@ -0,0 +1,635 @@ +use crate::batch::{Batch, BatchAdd}; +use crate::batchable::Batchable; +use crate::config::BatchConfig; +use chrono::{DateTime, Utc}; + +#[derive(Debug, Eq, PartialEq)] +pub(crate) enum BatcherOutput<B> { + Batch(Vec<B>), + Timer(DateTime<Utc>), +} + +/// Provides the core implementation of the batching algorithm. +#[derive(Debug)] +pub struct Batcher<B: Batchable> { + config: BatchConfig, + batches: Vec<Batch<B>>, +} + +impl<B: Batchable> Batcher<B> { + /// Create a Batcher with the specified config. + pub fn new(config: BatchConfig) -> Batcher<B> { + Batcher { + config, + batches: vec![], + } + } + + pub(crate) fn event( + &mut self, + processing_time: DateTime<Utc>, + event: B, + ) -> Vec<BatcherOutput<B>> { + let event_time = event.event_time(); + + if event_time < processing_time - self.config.delivery_jitter() { + // Discard event because it is too old + return vec![]; + } + + if event_time > processing_time + self.config.message_leap_limit() { + // Discard event because it is too futuristic + return vec![]; + } + + match self.find_target_batch(event_time) { + None => { + let new_batch = self.make_new_batch(event); + let new_batch_end = new_batch.batch_end(); + self.batches.push(new_batch); + self.output_for_batch_end(processing_time, new_batch_end) + } + Some(target_batch) => match target_batch.add(event) { + BatchAdd::Added => vec![], + BatchAdd::Duplicate => vec![], + BatchAdd::Split(new_batch) => { + let split_batch_end = target_batch.batch_end(); + self.batches.push(new_batch); + self.output_for_batch_end(processing_time, split_batch_end) + } + }, + } + } + + fn output_for_batch_end( + &mut self, + processing_time: DateTime<Utc>, + batch_end: DateTime<Utc>, + ) -> Vec<BatcherOutput<B>> { + let batch_timeout = batch_end + self.config.delivery_jitter(); + if processing_time < batch_timeout { + vec![BatcherOutput::Timer(batch_timeout)] + } else { + self.time(processing_time) + .into_iter() + .map(BatcherOutput::Batch) + .collect() + } + } + + pub(crate) fn time(&mut self, time: DateTime<Utc>) -> Vec<Vec<B>> { + let batches = std::mem::take(&mut self.batches); + + let (open_batches, closed_batches) = batches + .into_iter() + .partition(|batch| self.is_open(batch, time)); + + self.batches = open_batches; + + closed_batches + .into_iter() + .map(|batch| batch.into_vec()) + .collect() + } + + fn is_open(&self, batch: &Batch<B>, time: DateTime<Utc>) -> bool { + batch.batch_end() + self.config.delivery_jitter() > time + } + + pub(crate) fn flush(self) -> Vec<Vec<B>> { + let mut batches = Vec::with_capacity(self.batches.len()); + + for batch in self.batches { + batches.push(batch.into_vec()) + } + + batches + } + + fn find_target_batch(&mut self, event_time: DateTime<Utc>) -> Option<&mut Batch<B>> { + for batch in &mut self.batches { + if batch.batch_start() <= event_time && event_time <= batch.batch_end() { + return Some(batch); + } + } + + None + } + + fn make_new_batch(&self, event: B) -> Batch<B> { + let event_time = event.event_time(); + let mut batch_start = event_time; + let mut batch_end = batch_start + self.config.event_jitter(); + + if let Some(previous_batch) = self.previous_batch(event_time) { + batch_start = batch_start.max(previous_batch.batch_end()) + } + if let Some(next_batch) = self.next_batch(event_time) { + batch_end = batch_end.min(next_batch.batch_start()) + } + + Batch::new(batch_start, batch_end, event) + } + + fn previous_batch(&self, event_time: DateTime<Utc>) -> Option<&Batch<B>> { + self.batches + .iter() + .filter(|batch| batch.batch_end() < event_time) + .max_by(|batch1, batch2| batch1.batch_end().cmp(&batch2.batch_end())) + } + + fn next_batch(&self, event_time: DateTime<Utc>) -> Option<&Batch<B>> { + self.batches + .iter() + .filter(|batch| batch.batch_start() > event_time) + .min_by(|batch1, batch2| batch1.batch_start().cmp(&batch2.batch_start())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::batchable::Batchable; + use crate::config::BatchConfigBuilder; + use chrono::offset::TimeZone; + use chrono::{DateTime, Duration, Utc}; + use std::collections::BTreeMap; + + #[test] + fn single_event_batch() { + let mut test = BatcherTest::new(50, 20, 0); + + let event1 = test.create_event(0, "a", 1); + + test.event(1, &event1); + test.expect_batch(70, vec![event1]); + + test.run(); + } + + #[test] + fn multi_event_batch() { + let mut test = BatcherTest::new(50, 20, 0); + + let event1 = test.create_event(0, "a", 1); + let event2 = test.create_event(10, "b", 2); + + test.event(1, &event1); + test.event(11, &event2); + test.expect_batch(70, vec![event1, event2]); + + test.run(); + } + + #[test] + // The same behavior as for `multi_event_batch` is expected + // Since we just change how long we wait for an event + fn multi_event_batch_with_long_delivery_jitter() { + let mut test = BatcherTest::new(50, 50, 0); + + let event1 = test.create_event(0, "a", 1); + let event2 = test.create_event(10, "b", 2); + + test.event(1, &event1); + test.event(11, &event2); + test.expect_batch(100, vec![event1, event2]); + + test.run(); + } + + #[test] + fn multi_event_batch_with_long_delivery_jitter_and_delayed_message() { + let mut test = BatcherTest::new(50, 50, 0); + + let event1 = test.create_event(5, "a", 2); + let event2 = test.create_event(10, "b", 1); + + test.event(11, &event2); + test.event(25, &event1); // late, but not too late + + test.expect_batch(60, vec![event1]); + test.expect_batch(110, vec![event2]); + + test.run(); + } + + #[test] + fn split_batch() { + let mut test = BatcherTest::new(50, 20, 0); + + let event1 = test.create_event(0, "a", 1); + let event2 = test.create_event(10, "a", 2); + + test.event(1, &event1); + test.event(11, &event2); + test.expect_batch(25, vec![event1]); // why 25? + test.expect_batch(70, vec![event2 |