summaryrefslogtreecommitdiffstats
path: root/crates/common/batcher/src/batcher.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/common/batcher/src/batcher.rs')
-rw-r--r--crates/common/batcher/src/batcher.rs49
1 files changed, 24 insertions, 25 deletions
diff --git a/crates/common/batcher/src/batcher.rs b/crates/common/batcher/src/batcher.rs
index 237e1b57..92b70996 100644
--- a/crates/common/batcher/src/batcher.rs
+++ b/crates/common/batcher/src/batcher.rs
@@ -1,12 +1,12 @@
use crate::batch::{Batch, BatchAdd};
use crate::batchable::Batchable;
use crate::config::BatchConfig;
-use chrono::{DateTime, Utc};
+use time::OffsetDateTime;
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum BatcherOutput<B> {
Batch(Vec<B>),
- Timer(DateTime<Utc>),
+ Timer(OffsetDateTime),
}
/// Provides the core implementation of the batching algorithm.
@@ -27,7 +27,7 @@ impl<B: Batchable> Batcher<B> {
pub(crate) fn event(
&mut self,
- processing_time: DateTime<Utc>,
+ processing_time: OffsetDateTime,
event: B,
) -> Vec<BatcherOutput<B>> {
let event_time = event.event_time();
@@ -63,8 +63,8 @@ impl<B: Batchable> Batcher<B> {
fn output_for_batch_end(
&mut self,
- processing_time: DateTime<Utc>,
- batch_end: DateTime<Utc>,
+ processing_time: OffsetDateTime,
+ batch_end: OffsetDateTime,
) -> Vec<BatcherOutput<B>> {
let batch_timeout = batch_end + self.config.delivery_jitter();
if processing_time < batch_timeout {
@@ -77,7 +77,7 @@ impl<B: Batchable> Batcher<B> {
}
}
- pub(crate) fn time(&mut self, time: DateTime<Utc>) -> Vec<Vec<B>> {
+ pub(crate) fn time(&mut self, time: OffsetDateTime) -> Vec<Vec<B>> {
let batches = std::mem::take(&mut self.batches);
let (open_batches, closed_batches) = batches
@@ -92,7 +92,7 @@ impl<B: Batchable> Batcher<B> {
.collect()
}
- fn is_open(&self, batch: &Batch<B>, time: DateTime<Utc>) -> bool {
+ fn is_open(&self, batch: &Batch<B>, time: OffsetDateTime) -> bool {
batch.batch_end() + self.config.delivery_jitter() > time
}
@@ -106,7 +106,7 @@ impl<B: Batchable> Batcher<B> {
batches
}
- fn find_target_batch(&mut self, event_time: DateTime<Utc>) -> Option<&mut Batch<B>> {
+ fn find_target_batch(&mut self, event_time: OffsetDateTime) -> Option<&mut Batch<B>> {
for batch in &mut self.batches {
if batch.batch_start() <= event_time && event_time <= batch.batch_end() {
return Some(batch);
@@ -131,14 +131,14 @@ impl<B: Batchable> Batcher<B> {
Batch::new(batch_start, batch_end, event)
}
- fn previous_batch(&self, event_time: DateTime<Utc>) -> Option<&Batch<B>> {
+ fn previous_batch(&self, event_time: OffsetDateTime) -> Option<&Batch<B>> {
self.batches
.iter()
.filter(|batch| batch.batch_end() < event_time)
.max_by(|batch1, batch2| batch1.batch_end().cmp(&batch2.batch_end()))
}
- fn next_batch(&self, event_time: DateTime<Utc>) -> Option<&Batch<B>> {
+ fn next_batch(&self, event_time: OffsetDateTime) -> Option<&Batch<B>> {
self.batches
.iter()
.filter(|batch| batch.batch_start() > event_time)
@@ -151,9 +151,8 @@ mod tests {
use super::*;
use crate::batchable::Batchable;
use crate::config::BatchConfigBuilder;
- use chrono::offset::TimeZone;
- use chrono::{DateTime, Duration, Utc};
use std::collections::BTreeMap;
+ use time::Duration;
#[test]
fn single_event_batch() {
@@ -424,7 +423,7 @@ mod tests {
#[derive(Debug, Clone, Eq, PartialEq)]
struct TestBatchEvent {
- event_time: DateTime<Utc>,
+ event_time: OffsetDateTime,
key: String,
value: u64,
}
@@ -436,7 +435,7 @@ mod tests {
self.key.clone()
}
- fn event_time(&self) -> DateTime<Utc> {
+ fn event_time(&self) -> OffsetDateTime {
self.event_time
}
}
@@ -448,11 +447,11 @@ mod tests {
}
struct BatcherTest {
- start_time: DateTime<Utc>,
+ start_time: OffsetDateTime,
batcher: Batcher<TestBatchEvent>,
- inputs: BTreeMap<DateTime<Utc>, EventOrTimer>,
- flush_time: Option<DateTime<Utc>>,
- expected_batches: BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>,
+ inputs: BTreeMap<OffsetDateTime, EventOrTimer>,
+ flush_time: Option<OffsetDateTime>,
+ expected_batches: BTreeMap<OffsetDateTime, Vec<Vec<TestBatchEvent>>>,
}
impl BatcherTest {
@@ -463,7 +462,7 @@ mod tests {
.message_leap_limit(message_leap_limit)
.build();
- let start_time = Utc.timestamp_millis(0);
+ let start_time = OffsetDateTime::from_unix_timestamp(0).unwrap();
let batcher = Batcher::new(batcher_config);
BatcherTest {
@@ -538,10 +537,10 @@ mod tests {
fn handle_outputs(
&mut self,
- t: DateTime<Utc>,
+ t: OffsetDateTime,
outputs: Vec<BatcherOutput<TestBatchEvent>>,
- all_batches: &mut BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>,
- flush_time: Option<DateTime<Utc>>,
+ all_batches: &mut BTreeMap<OffsetDateTime, Vec<Vec<TestBatchEvent>>>,
+ flush_time: Option<OffsetDateTime>,
) {
let mut batches = vec![];
@@ -577,14 +576,14 @@ mod tests {
}
}
- fn create_instant(&self, time: i64) -> DateTime<Utc> {
+ fn create_instant(&self, time: i64) -> OffsetDateTime {
self.start_time + Duration::milliseconds(time)
}
}
fn verify(
- expected_batches: BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>,
- mut actual_batches: BTreeMap<DateTime<Utc>, Vec<Vec<TestBatchEvent>>>,
+ expected_batches: BTreeMap<OffsetDateTime, Vec<Vec<TestBatchEvent>>>,
+ mut actual_batches: BTreeMap<OffsetDateTime, Vec<Vec<TestBatchEvent>>>,
) {
assert_eq!(
actual_batches.keys().collect::<Vec<_>>(),