diff options
Diffstat (limited to 'crates/common/batcher/src/batcher.rs')
-rw-r--r-- | crates/common/batcher/src/batcher.rs | 49 |
1 files changed, 24 insertions, 25 deletions
diff --git a/crates/common/batcher/src/batcher.rs b/crates/common/batcher/src/batcher.rs index 237e1b57..92b70996 100644 --- a/crates/common/batcher/src/batcher.rs +++ b/crates/common/batcher/src/batcher.rs @@ -1,12 +1,12 @@ use crate::batch::{Batch, BatchAdd}; use crate::batchable::Batchable; use crate::config::BatchConfig; -use chrono::{DateTime, Utc}; +use time::OffsetDateTime; #[derive(Debug, Eq, PartialEq)] pub(crate) enum BatcherOutput<B> { Batch(Vec<B>), - Timer(DateTime<Utc>), + Timer(OffsetDateTime), } /// Provides the core implementation of the batching algorithm. @@ -27,7 +27,7 @@ impl<B: Batchable> Batcher<B> { pub(crate) fn event( &mut self, - processing_time: DateTime<Utc>, + processing_time: OffsetDateTime, event: B, ) -> Vec<BatcherOutput<B>> { let event_time = event.event_time(); @@ -63,8 +63,8 @@ impl<B: Batchable> Batcher<B> { fn output_for_batch_end( &mut self, - processing_time: DateTime<Utc>, - batch_end: DateTime<Utc>, + processing_time: OffsetDateTime, + batch_end: OffsetDateTime, ) -> Vec<BatcherOutput<B>> { let batch_timeout = batch_end + self.config.delivery_jitter(); if processing_time < batch_timeout { @@ -77,7 +77,7 @@ impl<B: Batchable> Batcher<B> { } } - pub(crate) fn time(&mut self, time: DateTime<Utc>) -> Vec<Vec<B>> { + pub(crate) fn time(&mut self, time: OffsetDateTime) -> Vec<Vec<B>> { let batches = std::mem::take(&mut self.batches); let (open_batches, closed_batches) = batches @@ -92,7 +92,7 @@ impl<B: Batchable> Batcher<B> { .collect() } - fn is_open(&self, batch: &Batch<B>, time: DateTime<Utc>) -> bool { + fn is_open(&self, batch: &Batch<B>, time: OffsetDateTime) -> bool { batch.batch_end() + self.config.delivery_jitter() > time } @@ -106,7 +106,7 @@ impl<B: Batchable> Batcher<B> { batches } - fn find_target_batch(&mut self, event_time: DateTime<Utc>) -> Option<&mut Batch<B>> { + fn find_target_batch(&mut self, event_time: OffsetDateTime) -> Option<&mut Batch<B>> { for batch in &mut self.batches { if batch.batch_start() <= event_time && event_time <= batch.batch_end() { return Some(batch); @@ -131,14 +131,14 @@ impl<B: Batchable> Batcher<B> { Batch::new(batch_start, batch_end, event) } - fn previous_batch(&self, event_time: DateTime<Utc>) -> Option<&Batch<B>> { + fn previous_batch(&self, event_time: OffsetDateTime) -> Option<&Batch<B>> { self.batches .iter() .filter(|batch| batch.batch_end() < event_time) .max_by(|batch1, batch2| batch1.batch_end().cmp(&batch2.batch_end())) } - fn next_batch(&self, event_time: DateTime<Utc>) -> Option<&Batch<B>> { + fn next_batch(&self, event_time: OffsetDateTime) -> Option<&Batch<B>> { self.batches .iter() .filter(|batch| batch.batch_start() > event_time) @@ -151,9 +151,8 @@ mod tests { use super::*; use crate::batchable::Batchable; use crate::config::BatchConfigBuilder; - use chrono::offset::TimeZone; - use chrono::{DateTime, Duration, Utc}; use std::collections::BTreeMap; + use time::Duration; #[test] fn single_event_batch() { @@ -424,7 +423,7 @@ mod tests { #[derive(Debug, Clone, Eq, PartialEq)] struct TestBatchEvent { - event_time: DateTime<Utc>, + event_time: OffsetDateTime, key: String, value: u64, } @@ -436,7 +435,7 @@ mod tests { self.key.clone() } - fn event_time(&self) -> DateTime<Utc> { + fn event_time(&self) -> OffsetDateTime { self.event_time } } @@ -448,11 +447,11 @@ mod tests { } struct BatcherTest { - start_time: DateTime<Utc>, + start_time: OffsetDateTime, batcher: Batcher<TestBatchEvent>, - inputs: BTreeMap<DateTime<Utc>, EventOrTimer>, - flush_time: Option<DateTime<Utc>>, - expected_batches: BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>, + inputs: BTreeMap<OffsetDateTime, EventOrTimer>, + flush_time: Option<OffsetDateTime>, + expected_batches: BTreeMap<OffsetDateTime, Vec<Vec<TestBatchEvent>>>, } impl BatcherTest { @@ -463,7 +462,7 @@ mod tests { .message_leap_limit(message_leap_limit) .build(); - let start_time = Utc.timestamp_millis(0); + let start_time = OffsetDateTime::from_unix_timestamp(0).unwrap(); let batcher = Batcher::new(batcher_config); BatcherTest { @@ -538,10 +537,10 @@ mod tests { fn handle_outputs( &mut self, - t: DateTime<Utc>, + t: OffsetDateTime, outputs: Vec<BatcherOutput<TestBatchEvent>>, - all_batches: &mut BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>, - flush_time: Option<DateTime<Utc>>, + all_batches: &mut BTreeMap<OffsetDateTime, Vec<Vec<TestBatchEvent>>>, + flush_time: Option<OffsetDateTime>, ) { let mut batches = vec![]; @@ -577,14 +576,14 @@ mod tests { } } - fn create_instant(&self, time: i64) -> DateTime<Utc> { + fn create_instant(&self, time: i64) -> OffsetDateTime { self.start_time + Duration::milliseconds(time) } } fn verify( - expected_batches: BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>, - mut actual_batches: BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>, + expected_batches: BTreeMap<OffsetDateTime, Vec<Vec<TestBatchEvent>>>, + mut actual_batches: BTreeMap<OffsetDateTime, Vec<Vec<TestBatchEvent>>>, ) { assert_eq!( actual_batches.keys().collect::<Vec<_>>(), |