summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--streaming/replication.c15
1 files changed, 7 insertions, 8 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index edb64d8223..63a0056abd 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -96,7 +96,7 @@ struct replication_query {
size_t points_read;
size_t points_generated;
- struct storage_engine_query_ops *ops;
+ STORAGE_ENGINE_BACKEND backend;
struct replication_request *rq;
size_t dimensions;
@@ -162,7 +162,7 @@ static struct replication_query *replication_query_prepare(
}
}
- q->ops = &st->rrdhost->db[0].eng->api.query_ops;
+ q->backend = st->rrdhost->db[0].eng->backend;
// prepare our array of dimensions
size_t count = 0;
@@ -184,7 +184,7 @@ static struct replication_query *replication_query_prepare(
d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
d->rd = rd;
- q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
+ storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
d->enabled = true;
d->skip = false;
@@ -260,7 +260,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
struct replication_dimension *d = &q->data[i];
if (unlikely(!d->enabled)) continue;
- q->ops->finalize(&d->handle);
+ storage_engine_query_finalize(&d->handle);
dictionary_acquired_item_release(d->dict, d->rda);
@@ -292,7 +292,7 @@ static void replication_query_align_to_optimal_before(struct replication_query *
struct replication_dimension *d = &q->data[i];
if(unlikely(!d->enabled)) continue;
- time_t new_before = q->ops->align_to_optimal_before(&d->handle);
+ time_t new_before = rrdeng_load_align_to_optimal_before(&d->handle);
if (!expanded_before || new_before < expanded_before)
expanded_before = new_before;
}
@@ -311,7 +311,6 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
time_t after = q->query.after;
time_t before = q->query.before;
size_t dimensions = q->dimensions;
- struct storage_engine_query_ops *ops = q->ops;
time_t wall_clock_time = q->wall_clock_time;
bool finished_with_gap = false;
@@ -331,8 +330,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
// fetch the first valid point for the dimension
int max_skip = 1000;
- while(d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) {
- d->sp = ops->next_metric(&d->handle);
+ while(d->sp.end_time_s < now && !storage_engine_query_is_finished(&d->handle) && max_skip-- >= 0) {
+ d->sp = storage_engine_query_next_metric(&d->handle);
points_read++;
}