path: root/streaming
authorCosta Tsaousis <costa@netdata.cloud>2023-04-07 21:25:01 +0300
committerGitHub <noreply@github.com>2023-04-07 21:25:01 +0300
commit204dd9ae272445d13f308badb07e99675fa34892 (patch)
treef42e873c60219b5031dcfc3e076adb2398cdb3fe /streaming
parent61925baaf6e2448c641e8e71248a47f7a01c4efd (diff)
Boost dbengine (#14832)
* configure extent cache size
* workers can now execute up to 10 jobs in a run, boosting query prep and extent reads
* fix dispatched and executing counters
* boost to the max
* increase libuv worker threads
* query prep always gets more priority than extent reads; stop batch processing when the dbengine queue is critical
* fix accounting of query prep
* inline time-grouping functions, to speed up queries with billions of points
* switch based on a local const variable
* print one pending-contexts-loading message per iteration
* inlined storage engine query API
* inlined storage engine data collection API
* inlined all storage engine query ops
* eliminate and inline data collection ops
* simplified query group-by
* more error handling
* optimized partial trimming of group-by queries
* preparatory work to support multiple passes of group-by
* more preparatory work to support multiple passes of group-by (accepts multiple group-by params)
* unified query timings
* unified query timings - weights endpoint
* query target is no longer a static thread variable - there is a list of cached query targets, each of which is freed every 1000 queries
* fix query memory accounting
* added summary.dimension[].pri and sorted summary.dimensions based on priority and then name
* limit max ACLK WEB response size to 30MB
* the response type should be text/plain
* more preparatory work for multiple group-by passes
* create functions for generating group-by keys, ids and names
* multiple group-by passes are now supported
* parse the group-by options array also with an index
* implemented percentage-of-instance group-by function
* family is now merged in multi-node contexts
* prevent uninitialized use
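The recurring pattern behind the "inlined all storage engine query ops" items and the hunks below is to drop the storage_engine_query_ops function-pointer table in favour of a STORAGE_ENGINE_BACKEND enum plus thin static inline dispatchers. A minimal sketch of that pattern, using illustrative names and stub backends (assumptions for illustration, not the exact Netdata definitions):

#include <time.h>

/* illustrative enum values; the real STORAGE_ENGINE_BACKEND names differ */
typedef enum { SE_BACKEND_RRDDIM = 1, SE_BACKEND_DBENGINE = 2 } STORAGE_ENGINE_BACKEND;

typedef struct { time_t end_time_s; double value; } STORAGE_POINT;

struct storage_engine_query_handle {
    STORAGE_ENGINE_BACKEND backend;   /* chosen once when the query is prepared */
    void *cursor;                     /* per-backend iterator state */
};

/* stand-ins for the real per-backend readers */
static STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *h) {
    (void)h; return (STORAGE_POINT){ 0 };
}
static STORAGE_POINT rrddim_query_next_metric(struct storage_engine_query_handle *h) {
    (void)h; return (STORAGE_POINT){ 0 };
}

/* Before: the hot loop called q->ops->next_metric(&d->handle), an indirect
 * call the compiler cannot inline. After: a static inline wrapper switches
 * on the backend enum carried in the handle, so the per-point call can be
 * inlined into the loop that walks the points. */
static inline STORAGE_POINT
storage_engine_query_next_metric(struct storage_engine_query_handle *h) {
    switch(h->backend) {
        case SE_BACKEND_DBENGINE: return rrdeng_load_metric_next(h);
        default:                  return rrddim_query_next_metric(h);
    }
}

Because the backend is a plain enum known per query, the dispatch branch is cheap and predictable, which matters when the same call sits on a path executed once per point.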
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/replication.c  15
1 file changed, 7 insertions, 8 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index edb64d8223..63a0056abd 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -96,7 +96,7 @@ struct replication_query {
size_t points_read;
size_t points_generated;
- struct storage_engine_query_ops *ops;
+ STORAGE_ENGINE_BACKEND backend;
struct replication_request *rq;
size_t dimensions;
@@ -162,7 +162,7 @@ static struct replication_query *replication_query_prepare(
}
}
- q->ops = &st->rrdhost->db[0].eng->api.query_ops;
+ q->backend = st->rrdhost->db[0].eng->backend;
// prepare our array of dimensions
size_t count = 0;
@@ -184,7 +184,7 @@ static struct replication_query *replication_query_prepare(
d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
d->rd = rd;
- q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
+ storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
d->enabled = true;
d->skip = false;
@@ -260,7 +260,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
struct replication_dimension *d = &q->data[i];
if (unlikely(!d->enabled)) continue;
- q->ops->finalize(&d->handle);
+ storage_engine_query_finalize(&d->handle);
dictionary_acquired_item_release(d->dict, d->rda);
@@ -292,7 +292,7 @@ static void replication_query_align_to_optimal_before(struct replication_query *
struct replication_dimension *d = &q->data[i];
if(unlikely(!d->enabled)) continue;
- time_t new_before = q->ops->align_to_optimal_before(&d->handle);
+ time_t new_before = rrdeng_load_align_to_optimal_before(&d->handle);
if (!expanded_before || new_before < expanded_before)
expanded_before = new_before;
}
@@ -311,7 +311,6 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
time_t after = q->query.after;
time_t before = q->query.before;
size_t dimensions = q->dimensions;
- struct storage_engine_query_ops *ops = q->ops;
time_t wall_clock_time = q->wall_clock_time;
bool finished_with_gap = false;
@@ -331,8 +330,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
// fetch the first valid point for the dimension
int max_skip = 1000;
- while(d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) {
- d->sp = ops->next_metric(&d->handle);
+ while(d->sp.end_time_s < now && !storage_engine_query_is_finished(&d->handle) && max_skip-- >= 0) {
+ d->sp = storage_engine_query_next_metric(&d->handle);
points_read++;
}
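For completeness, a toy, self-contained sketch of the shape the replication hot loop takes after this change, mirroring the last hunk above; the handle fields and wrapper bodies are simplified assumptions, not the actual replication.c code:

#include <stdbool.h>
#include <stddef.h>
#include <time.h>

typedef enum { SE_BACKEND_RRDDIM = 1, SE_BACKEND_DBENGINE = 2 } STORAGE_ENGINE_BACKEND;
typedef struct { time_t end_time_s; double value; } STORAGE_POINT;

struct storage_engine_query_handle {
    STORAGE_ENGINE_BACKEND backend;   /* would select the per-backend reader */
    time_t now, before;               /* toy cursor standing in for real state */
};

/* toy inlined wrappers; the real ones would dispatch on h->backend */
static inline bool storage_engine_query_is_finished(struct storage_engine_query_handle *h) {
    return h->now > h->before;
}
static inline STORAGE_POINT storage_engine_query_next_metric(struct storage_engine_query_handle *h) {
    STORAGE_POINT sp = { .end_time_s = h->now, .value = 0.0 };
    h->now++;
    return sp;
}

/* Simplified shape of the replication hot loop after the change: no function
 * pointers remain on the per-point path, so the wrappers can be inlined. */
static size_t replicate_dimension(struct storage_engine_query_handle *h, time_t now) {
    size_t points_read = 0;
    STORAGE_POINT sp = { 0 };
    int max_skip = 1000;

    while(sp.end_time_s < now && !storage_engine_query_is_finished(h) && max_skip-- >= 0) {
        sp = storage_engine_query_next_metric(h);
        points_read++;
    }
    return points_read;
}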