summaryrefslogtreecommitdiffstats
path: root/crates/common
diff options
context:
space:
mode:
authorLukasz Woznicki <75632179+makr11st@users.noreply.github.com>2021-11-24 20:54:56 +0000
committerGitHub <noreply@github.com>2021-11-24 20:54:56 +0000
commita4ffeccf60090e4456755bc53a6e3b8c8038e855 (patch)
tree9583f187114913a92866571920dd3bb205bd50a3 /crates/common
parent8217e80670e76dbf9168780f5e0545355a39f8f3 (diff)
Restructure directories of the workspace (#559)
* Restructure directories of the workspace * Rename c8y_translator_lib to c8y_translator * Update comment on how to get dummy plugin path Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates/common')
-rw-r--r--crates/common/batcher/Cargo.toml14
-rw-r--r--crates/common/batcher/src/batch.rs182
-rw-r--r--crates/common/batcher/src/batchable.rs17
-rw-r--r--crates/common/batcher/src/batcher.rs635
-rw-r--r--crates/common/batcher/src/config.rs117
-rw-r--r--crates/common/batcher/src/driver.rs260
-rw-r--r--crates/common/batcher/src/lib.rs18
-rw-r--r--crates/common/certificate/Cargo.toml21
-rw-r--r--crates/common/certificate/src/device_id.rs126
-rw-r--r--crates/common/certificate/src/lib.rs345
-rw-r--r--crates/common/clock/Cargo.toml9
-rw-r--r--crates/common/clock/src/lib.rs19
-rw-r--r--crates/common/download/Cargo.toml27
-rw-r--r--crates/common/download/examples/simple_download.rs23
-rw-r--r--crates/common/download/src/download.rs422
-rw-r--r--crates/common/download/src/error.rs39
-rw-r--r--crates/common/download/src/lib.rs5
-rw-r--r--crates/common/flockfile/Cargo.toml16
-rw-r--r--crates/common/flockfile/src/lib.rs11
-rw-r--r--crates/common/flockfile/src/unix.rs171
-rw-r--r--crates/common/flockfile/src/windows.rs32
-rw-r--r--crates/common/json_writer/Cargo.toml12
-rw-r--r--crates/common/json_writer/src/lib.rs176
-rw-r--r--crates/common/mqtt_client/Cargo.toml28
-rw-r--r--crates/common/mqtt_client/examples/publish_test.rs29
-rw-r--r--crates/common/mqtt_client/examples/sawtooth_publisher.rs222
-rw-r--r--crates/common/mqtt_client/examples/simple_mapper.rs69
-rw-r--r--crates/common/mqtt_client/examples/temperature_publisher.rs96
-rw-r--r--crates/common/mqtt_client/src/lib.rs862
-rw-r--r--crates/common/mqtt_client/tests/mqtt_pub_sub_test.rs116
-rw-r--r--crates/common/mqtt_client/tests/packet_size_tests.rs177
-rw-r--r--crates/common/tedge_config/Cargo.toml18
-rw-r--r--crates/common/tedge_config/src/config_setting.rs75
-rw-r--r--crates/common/tedge_config/src/error.rs20
-rw-r--r--crates/common/tedge_config/src/lib.rs15
-rw-r--r--crates/common/tedge_config/src/models/connect_url.rs86
-rw-r--r--crates/common/tedge_config/src/models/file_path.rs46
-rw-r--r--crates/common/tedge_config/src/models/flag.rs89
-rw-r--r--crates/common/tedge_config/src/models/mod.rs5
-rw-r--r--crates/common/tedge_config/src/models/port.rs56
-rw-r--r--crates/common/tedge_config/src/settings.rs264
-rw-r--r--crates/common/tedge_config/src/tedge_config.rs455
-rw-r--r--crates/common/tedge_config/src/tedge_config_defaults.rs84
-rw-r--r--crates/common/tedge_config/src/tedge_config_dto.rs90
-rw-r--r--crates/common/tedge_config/src/tedge_config_location.rs108
-rw-r--r--crates/common/tedge_config/src/tedge_config_repository.rs103
-rw-r--r--crates/common/tedge_config/tests/test_tedge_config.rs867
-rw-r--r--crates/common/tedge_users/Cargo.toml11
-rw-r--r--crates/common/tedge_users/src/lib.rs29
-rw-r--r--crates/common/tedge_users/src/unix.rs225
-rw-r--r--crates/common/tedge_users/src/windows.rs37
-rw-r--r--crates/common/tedge_utils/Cargo.toml26
-rw-r--r--crates/common/tedge_utils/src/fs.rs98
-rw-r--r--crates/common/tedge_utils/src/lib.rs6
-rw-r--r--crates/common/tedge_utils/src/logging.rs20
-rw-r--r--crates/common/tedge_utils/src/paths.rs200
-rw-r--r--crates/common/tedge_utils/src/signals.rs16
57 files changed, 7345 insertions, 0 deletions
diff --git a/crates/common/batcher/Cargo.toml b/crates/common/batcher/Cargo.toml
new file mode 100644
index 00000000..9cc6c8fb
--- /dev/null
+++ b/crates/common/batcher/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "batcher"
+version = "0.4.3"
+authors = ["thin-edge.io team <info@thin-edge.io>"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+chrono = "0.4"
+tokio = { version = "1.12", features = ["sync", "time"] }
+
+[dev-dependencies]
+tokio = { version = "1.12", features = ["rt", "macros"] }
diff --git a/crates/common/batcher/src/batch.rs b/crates/common/batcher/src/batch.rs
new file mode 100644
index 00000000..8c06d5c6
--- /dev/null
+++ b/crates/common/batcher/src/batch.rs
@@ -0,0 +1,182 @@
+use crate::batchable::Batchable;
+use chrono::{DateTime, Utc};
+use std::collections::HashMap;
+use std::iter::once;
+
+#[must_use]
+#[derive(Debug)]
+pub enum BatchAdd<B: Batchable> {
+ Added,
+ Duplicate,
+ Split(Batch<B>),
+}
+
+#[derive(Debug)]
+pub struct Batch<B: Batchable> {
+ batch_start: DateTime<Utc>,
+ batch_end: DateTime<Utc>,
+ events: HashMap<B::Key, B>,
+}
+
+impl<B: Batchable> Batch<B> {
+ pub fn new(batch_start: DateTime<Utc>, batch_end: DateTime<Utc>, event: B) -> Batch<B> {
+ let mut events = HashMap::new();
+ events.insert(event.key(), event);
+
+ Batch {
+ batch_start,
+ batch_end,
+ events,
+ }
+ }
+
+ pub fn batch_start(&self) -> DateTime<Utc> {
+ self.batch_start
+ }
+
+ pub fn batch_end(&self) -> DateTime<Utc> {
+ self.batch_end
+ }
+
+ pub fn add(&mut self, event: B) -> BatchAdd<B> {
+ let key = event.key();
+ if let Some(existing_event) = self.events.get(&key) {
+ let existing_event_time = existing_event.event_time();
+
+ if event.event_time() == existing_event_time {
+ return BatchAdd::Duplicate;
+ }
+
+ return BatchAdd::Split(self.split(existing_event_time, event));
+ }
+
+ self.events.insert(key, event);
+
+ BatchAdd::Added
+ }
+
+ fn split(&mut self, existing_event_time: DateTime<Utc>, event: B) -> Batch<B> {
+ let split_point = midpoint(existing_event_time, event.event_time());
+
+ let mut new_batch_events = HashMap::new();
+ let new_batch_end = self.batch_end;
+
+ let all_events = std::mem::take(&mut self.events);
+ self.batch_end = split_point;
+
+ // Go over all the events in this batch plus the new event and allocate them,
+ // either the existing batch or the new batch.
+ for event in all_events
+ .into_iter()
+ .map(|(_key, event)| event)
+ .chain(once(event))
+ {
+ let event_time = event.event_time();
+
+ if event_time < split_point {
+ self.events.insert(event.key(), event);
+ } else {
+ new_batch_events.insert(event.key(), event);
+ }
+ }
+
+ Batch {
+ batch_start: split_point,
+ batch_end: new_batch_end,
+ events: new_batch_events,
+ }
+ }
+
+ pub fn into_vec(self) -> Vec<B> {
+ self.events.into_iter().map(|(_k, v)| v).collect()
+ }
+}
+
+fn midpoint(event_time1: DateTime<Utc>, event_time2: DateTime<Utc>) -> DateTime<Utc> {
+ let gap = event_time1.signed_duration_since(event_time2);
+ event_time2 + gap / 2
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use chrono::TimeZone;
+
+ #[test]
+ fn add() {
+ let batch_start = Utc.timestamp_millis(0);
+ let batch_end = Utc.timestamp_millis(100);
+ let event1 = TestBatchEvent::new(1, 40);
+ let event2 = TestBatchEvent::new(2, 60);
+
+ let mut batch = Batch::new(batch_start, batch_end, event1.clone());
+ assert!(matches!(batch.add(event2.clone()), BatchAdd::Added));
+
+ let result = batch.into_vec();
+ assert_eq!(result.len(), 2);
+ assert!(result.contains(&event1));
+ assert!(result.contains(&event2));
+ }
+
+ #[test]
+ fn split() {
+ let batch_start = Utc.timestamp_millis(0);
+ let batch_end = Utc.timestamp_millis(100);
+ let event1 = TestBatchEvent::new(1, 40);
+ let event2 = TestBatchEvent::new(1, 60);
+
+ let mut batch1 = Batch::new(batch_start, batch_end, event1.clone());
+ match batch1.add(event2.clone()) {
+ BatchAdd::Split(batch2) => {
+ let result1 = batch1.into_vec();
+ assert_eq!(result1.len(), 1);
+ assert!(result1.contains(&event1));
+
+ let result2 = batch2.into_vec();
+ assert_eq!(result2.len(), 1);
+ assert!(result2.contains(&event2));
+ }
+ _ => panic!("Expected split"),
+ }
+ }
+
+ #[test]
+ fn duplicate() {
+ let batch_start = Utc.timestamp_millis(0);
+ let batch_end = Utc.timestamp_millis(100);
+ let event1 = TestBatchEvent::new(1, 40);
+ let event2 = TestBatchEvent::new(1, 40);
+
+ let mut batch = Batch::new(batch_start, batch_end, event1.clone());
+ assert!(matches!(batch.add(event2), BatchAdd::Duplicate));
+
+ let result = batch.into_vec();
+ assert_eq!(result.len(), 1);
+ assert!(result.contains(&event1));
+ }
+
+ #[derive(Debug, Clone, Eq, PartialEq)]
+ struct TestBatchEvent {
+ key: u64,
+ event_time: DateTime<Utc>,
+ }
+
+ impl TestBatchEvent {
+ fn new(key: u64, event_time: i64) -> TestBatchEvent {
+ let event_time = Utc.timestamp_millis(event_time);
+ TestBatchEvent { key, event_time }
+ }
+ }
+
+ impl Batchable for TestBatchEvent {
+ type Key = u64;
+
+ fn key(&self) -> Self::Key {
+ self.key
+ }
+
+ fn event_time(&self) -> DateTime<Utc> {
+ self.event_time
+ }
+ }
+}
diff --git a/crates/common/batcher/src/batchable.rs b/crates/common/batcher/src/batchable.rs
new file mode 100644
index 00000000..3585fc2f
--- /dev/null
+++ b/crates/common/batcher/src/batchable.rs
@@ -0,0 +1,17 @@
+use chrono::{DateTime, Utc};
+use std::fmt::Debug;
+use std::hash::Hash;
+
+/// Implement this interface for the items that you want batched.
+/// No items with the same key will go in the same batch.
+/// The event_time of the item will determine how items are grouped,
+/// dependent on how the batcher is configured.
+pub trait Batchable {
+ type Key: Eq + Hash + Debug;
+
+ /// Define the uniqueness within a batch.
+ fn key(&self) -> Self::Key;
+
+ /// The time at which this item was created. This time is used to group items into a batch.
+ fn event_time(&self) -> DateTime<Utc>;
+}
diff --git a/crates/common/batcher/src/batcher.rs b/crates/common/batcher/src/batcher.rs
new file mode 100644
index 00000000..237e1b57
--- /dev/null
+++ b/crates/common/batcher/src/batcher.rs
@@ -0,0 +1,635 @@
+use crate::batch::{Batch, BatchAdd};
+use crate::batchable::Batchable;
+use crate::config::BatchConfig;
+use chrono::{DateTime, Utc};
+
+#[derive(Debug, Eq, PartialEq)]
+pub(crate) enum BatcherOutput<B> {
+ Batch(Vec<B>),
+ Timer(DateTime<Utc>),
+}
+
+/// Provides the core implementation of the batching algorithm.
+#[derive(Debug)]
+pub struct Batcher<B: Batchable> {
+ config: BatchConfig,
+ batches: Vec<Batch<B>>,
+}
+
+impl<B: Batchable> Batcher<B> {
+ /// Create a Batcher with the specified config.
+ pub fn new(config: BatchConfig) -> Batcher<B> {
+ Batcher {
+ config,
+ batches: vec![],
+ }
+ }
+
+ pub(crate) fn event(
+ &mut self,
+ processing_time: DateTime<Utc>,
+ event: B,
+ ) -> Vec<BatcherOutput<B>> {
+ let event_time = event.event_time();
+
+ if event_time < processing_time - self.config.delivery_jitter() {
+ // Discard event because it is too old
+ return vec![];
+ }
+
+ if event_time > processing_time + self.config.message_leap_limit() {
+ // Discard event because it is too futuristic
+ return vec![];
+ }
+
+ match self.find_target_batch(event_time) {
+ None => {
+ let new_batch = self.make_new_batch(event);
+ let new_batch_end = new_batch.batch_end();
+ self.batches.push(new_batch);
+ self.output_for_batch_end(processing_time, new_batch_end)
+ }
+ Some(target_batch) => match target_batch.add(event) {
+ BatchAdd::Added => vec![],
+ BatchAdd::Duplicate => vec![],
+ BatchAdd::Split(new_batch) => {
+ let split_batch_end = target_batch.batch_end();
+ self.batches.push(new_batch);
+ self.output_for_batch_end(processing_time, split_batch_end)
+ }
+ },
+ }
+ }
+
+ fn output_for_batch_end(
+ &mut self,
+ processing_time: DateTime<Utc>,
+ batch_end: DateTime<Utc>,
+ ) -> Vec<BatcherOutput<B>> {
+ let batch_timeout = batch_end + self.config.delivery_jitter();
+ if processing_time < batch_timeout {
+ vec![BatcherOutput::Timer(batch_timeout)]
+ } else {
+ self.time(processing_time)
+ .into_iter()
+ .map(BatcherOutput::Batch)
+ .collect()
+ }
+ }
+
+ pub(crate) fn time(&mut self, time: DateTime<Utc>) -> Vec<Vec<B>> {
+ let batches = std::mem::take(&mut self.batches);
+
+ let (open_batches, closed_batches) = batches
+ .into_iter()
+ .partition(|batch| self.is_open(batch, time));
+
+ self.batches = open_batches;
+
+ closed_batches
+ .into_iter()
+ .map(|batch| batch.into_vec())
+ .collect()
+ }
+
+ fn is_open(&self, batch: &Batch<B>, time: DateTime<Utc>) -> bool {
+ batch.batch_end() + self.config.delivery_jitter() > time
+ }
+
+ pub(crate) fn flush(self) -> Vec<Vec<B>> {
+ let mut batches = Vec::with_capacity(self.batches.len());
+
+ for batch in self.batches {
+ batches.push(batch.into_vec())
+ }
+
+ batches
+ }
+
+ fn find_target_batch(&mut self, event_time: DateTime<Utc>) -> Option<&mut Batch<B>> {
+ for batch in &mut self.batches {
+ if batch.batch_start() <= event_time && event_time <= batch.batch_end() {
+ return Some(batch);
+ }
+ }
+
+ None
+ }
+
+ fn make_new_batch(&self, event: B) -> Batch<B> {
+ let event_time = event.event_time();
+ let mut batch_start = event_time;
+ let mut batch_end = batch_start + self.config.event_jitter();
+
+ if let Some(previous_batch) = self.previous_batch(event_time) {
+ batch_start = batch_start.max(previous_batch.batch_end())
+ }
+ if let Some(next_batch) = self.next_batch(event_time) {
+ batch_end = batch_end.min(next_batch.batch_start())
+ }
+
+ Batch::new(batch_start, batch_end, event)
+ }
+
+ fn previous_batch(&self, event_time: DateTime<Utc>) -> Option<&Batch<B>> {
+ self.batches
+ .iter()
+ .filter(|batch| batch.batch_end() < event_time)
+ .max_by(|batch1, batch2| batch1.batch_end().cmp(&batch2.batch_end()))
+ }
+
+ fn next_batch(&self, event_time: DateTime<Utc>) -> Option<&Batch<B>> {
+ self.batches
+ .iter()
+ .filter(|batch| batch.batch_start() > event_time)
+ .min_by(|batch1, batch2| batch1.batch_start().cmp(&batch2.batch_start()))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::batchable::Batchable;
+ use crate::config::BatchConfigBuilder;
+ use chrono::offset::TimeZone;
+ use chrono::{DateTime, Duration, Utc};
+ use std::collections::BTreeMap;
+
+ #[test]
+ fn single_event_batch() {
+ let mut test = BatcherTest::new(50, 20, 0);
+
+ let event1 = test.create_event(0, "a", 1);
+
+ test.event(1, &event1);
+ test.expect_batch(70, vec![event1]);
+
+ test.run();
+ }
+
+ #[test]
+ fn multi_event_batch() {
+ let mut test = BatcherTest::new(50, 20, 0);
+
+ let event1 = test.create_event(0, "a", 1);
+ let event2 = test.create_event(10, "b", 2);
+
+ test.event(1, &event1);
+ test.event(11, &event2);
+ test.expect_batch(70, vec![event1, event2]);
+
+ test.run();
+ }
+
+ #[test]
+ // The same behavior as for `multi_event_batch` is expected
+ // Since we just change how long we wait for an event
+ fn multi_event_batch_with_long_delivery_jitter() {
+ let mut test = BatcherTest::new(50, 50, 0);
+
+ let event1 = test.create_event(0, "a", 1);
+ let event2 = test.create_event(10, "b", 2);
+
+ test.event(1, &event1);
+ test.event(11, &event2);
+ test.expect_batch(100, vec![event1, event2]);
+
+ test.run();
+ }
+
+ #[test]
+ fn multi_event_batch_with_long_delivery_jitter_and_delayed_message() {
+ let mut test = BatcherTest::new(50, 50, 0);
+
+ let event1 = test.create_event(5, "a", 2);
+ let event2 = test.create_event(10, "b", 1);
+
+ test.event(11, &event2);
+ test.event(25, &event1); // late, but not too late
+
+ test.expect_batch(60, vec![event1]);
+ test.expect_batch(110, vec![event2]);
+
+ test.run();
+ }
+
+ #[test]
+ fn split_batch() {
+ let mut test = BatcherTest::new(50, 20, 0);
+
+ let event1 = test.create_event(0, "a", 1);
+ let event2 = test.create_event(10, "a", 2);
+
+ test.event(1, &event1);
+ test.event(11, &event2);
+ test.expect_batch(25, vec![event1]); // why 25?
+ test.expect_batch(70, vec![event2