summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-02-02 00:14:35 +0200
committerGitHub <noreply@github.com>2023-02-02 00:14:35 +0200
commit55d1f00bb7c2403b451947b2a225b5d1f6be9183 (patch)
tree043e57edb64b319b1eb6a883d6980fa2d9dd2c8e /streaming
parent2e56e2b87622a102aef876d297a3cd80d35028e5 (diff)
DBENGINE v2 - improvements part 12 (#14379)
* parallel initialization of tiers * do not spawn multiple dbengine event loops * user configurable dbengine parallel initialization * size netdata based on the real cpu cores available on the system netdata runs, not on the system monitored * user configurable system cpus * move cpuset parsing to os.c/.h * fix replication of misaligned chart dimensions * give a different path to each tier thread * statically allocate the path into the initialization structure * use aral for reusing dbengine pages * dictionaries uses ARAL for fixed sized values * fix compilation without internal checks * journal v2 index uses aral * test to see judy allocations * judy allocations using aral * Add config option to select if dbengine will use direct I/O (default is yes) * V1 journafiles will use uv_fs_read instead of mmap (respect the direct I/O setting) * Remove sqlite3IsMemdb as it is unused * Fix compilation error when --disable-dbengine is used * use aral for dbengine work_cmds * changed aral API to support new features * pgc and mrg aral overheads * rrdeng opcodes using aral * better structuring and naming * dbegnine query handles using aral * page descriptors using aral * remove obsolete linking * extent io descriptors using aral * aral keeps one last page alive * add missing return value * added judy aral overhead * pdc now uses aral * page_details now use aral * epdl and deol using aral - make sure ARALs are initialized before spawning the event loop * remove unused linking * pgc now uses one aral per partition * aral measure maximum allocation queue * aral to allocate pages in parallel * aral parallel pages allocation when needed * aral cleanup * track page allocation and page population separately --------- Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/replication.c52
1 files changed, 49 insertions, 3 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index 34f05b4f3b..a585166e15 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -311,7 +311,7 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s
time_t now = after + 1;
time_t last_end_time_in_buffer = 0;
while(now <= before) {
- time_t min_start_time = 0, min_end_time = 0;
+ time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0;
for (size_t i = 0; i < dimensions ;i++) {
struct replication_dimension *d = &q->data[i];
if(unlikely(!d->enabled || d->skip)) continue;
@@ -339,14 +339,58 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s
// this dimension does not provide any data
continue;
+ time_t update_every = d->sp.end_time_s - d->sp.start_time_s;
+ if(unlikely(!update_every))
+ update_every = q->st->update_every;
+
+ if(unlikely(!min_update_every))
+ min_update_every = update_every;
+
if(unlikely(!min_start_time))
min_start_time = d->sp.start_time_s;
if(unlikely(!min_end_time))
min_end_time = d->sp.end_time_s;
+ min_update_every = MIN(min_update_every, update_every);
+ max_update_every = MAX(max_update_every, update_every);
+
min_start_time = MIN(min_start_time, d->sp.start_time_s);
+ max_start_time = MAX(max_start_time, d->sp.start_time_s);
+
min_end_time = MIN(min_end_time, d->sp.end_time_s);
+ max_end_time = MAX(max_end_time, d->sp.end_time_s);
+ }
+
+ if (unlikely(min_update_every != max_update_every ||
+ min_start_time != max_start_time)) {
+
+ time_t fix_min_start_time;
+ if(last_end_time_in_buffer &&
+ last_end_time_in_buffer >= min_start_time &&
+ last_end_time_in_buffer <= max_start_time) {
+ fix_min_start_time = last_end_time_in_buffer;
+ }
+ else
+ fix_min_start_time = min_end_time - min_update_every;
+
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
+ "misaligned dimensions "
+ "update every (min: %ld, max: %ld), "
+ "start time (min: %ld, max: %ld), "
+ "end time (min %ld, max %ld), "
+ "now %ld, last end time sent %ld, "
+ "min start time is fixed to %ld",
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
+ min_update_every, max_update_every,
+ min_start_time, max_start_time,
+ min_end_time, max_end_time,
+ now, last_end_time_in_buffer,
+ fix_min_start_time
+ );
+
+ min_start_time = fix_min_start_time;
}
if(likely(min_start_time <= now && min_end_time >= now)) {
@@ -1375,7 +1419,9 @@ void replication_sender_delete_pending_requests(struct sender_state *sender) {
}
void replication_init_sender(struct sender_state *sender) {
- sender->replication.requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ NULL, sizeof(struct replication_request));
+
dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender);
dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender);
dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender);
@@ -1560,7 +1606,7 @@ static int replication_execute_next_pending_request(bool cancel) {
}
if(unlikely(!rqs)) {
- max_requests_ahead = get_system_cpus() / 2;
+ max_requests_ahead = get_netdata_cpus() / 2;
if(max_requests_ahead > libuv_worker_threads * 2)
max_requests_ahead = libuv_worker_threads * 2;