diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-02-02 00:14:35 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-02 00:14:35 +0200 |
commit | 55d1f00bb7c2403b451947b2a225b5d1f6be9183 (patch) | |
tree | 043e57edb64b319b1eb6a883d6980fa2d9dd2c8e /streaming/replication.c | |
parent | 2e56e2b87622a102aef876d297a3cd80d35028e5 (diff) |
DBENGINE v2 - improvements part 12 (#14379)
* parallel initialization of tiers
* do not spawn multiple dbengine event loops
* user configurable dbengine parallel initialization
* size netdata based on the real cpu cores available on the system netdata runs on, not on the monitored system
* user configurable system cpus
* move cpuset parsing to os.c/.h
* fix replication of misaligned chart dimensions
* give a different path to each tier thread
* statically allocate the path into the initialization structure
* use aral for reusing dbengine pages
* dictionaries uses ARAL for fixed sized values
* fix compilation without internal checks
* journal v2 index uses aral
* test to see judy allocations
* judy allocations using aral
* Add config option to select if dbengine will use direct I/O (default is yes)
* V1 journal files will use uv_fs_read instead of mmap (respect the direct I/O setting)
* Remove sqlite3IsMemdb as it is unused
* Fix compilation error when --disable-dbengine is used
* use aral for dbengine work_cmds
* changed aral API to support new features
* pgc and mrg aral overheads
* rrdeng opcodes using aral
* better structuring and naming
* dbengine query handles using aral
* page descriptors using aral
* remove obsolete linking
* extent io descriptors using aral
* aral keeps one last page alive
* add missing return value
* added judy aral overhead
* pdc now uses aral
* page_details now use aral
* epdl and deol using aral - make sure ARALs are initialized before spawning the event loop
* remove unused linking
* pgc now uses one aral per partition
* aral measure maximum allocation queue
* aral to allocate pages in parallel
* aral parallel pages allocation when needed
* aral cleanup
* track page allocation and page population separately
---------
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 52 |
1 files changed, 49 insertions, 3 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index 34f05b4f3b..a585166e15 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -311,7 +311,7 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s time_t now = after + 1; time_t last_end_time_in_buffer = 0; while(now <= before) { - time_t min_start_time = 0, min_end_time = 0; + time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0; for (size_t i = 0; i < dimensions ;i++) { struct replication_dimension *d = &q->data[i]; if(unlikely(!d->enabled || d->skip)) continue; @@ -339,14 +339,58 @@ static void replication_query_execute(BUFFER *wb, struct replication_query *q, s // this dimension does not provide any data continue; + time_t update_every = d->sp.end_time_s - d->sp.start_time_s; + if(unlikely(!update_every)) + update_every = q->st->update_every; + + if(unlikely(!min_update_every)) + min_update_every = update_every; + if(unlikely(!min_start_time)) min_start_time = d->sp.start_time_s; if(unlikely(!min_end_time)) min_end_time = d->sp.end_time_s; + min_update_every = MIN(min_update_every, update_every); + max_update_every = MAX(max_update_every, update_every); + min_start_time = MIN(min_start_time, d->sp.start_time_s); + max_start_time = MAX(max_start_time, d->sp.start_time_s); + min_end_time = MIN(min_end_time, d->sp.end_time_s); + max_end_time = MAX(max_end_time, d->sp.end_time_s); + } + + if (unlikely(min_update_every != max_update_every || + min_start_time != max_start_time)) { + + time_t fix_min_start_time; + if(last_end_time_in_buffer && + last_end_time_in_buffer >= min_start_time && + last_end_time_in_buffer <= max_start_time) { + fix_min_start_time = last_end_time_in_buffer; + } + else + fix_min_start_time = min_end_time - min_update_every; + + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' " + "misaligned dimensions " + 
"update every (min: %ld, max: %ld), " + "start time (min: %ld, max: %ld), " + "end time (min %ld, max %ld), " + "now %ld, last end time sent %ld, " + "min start time is fixed to %ld", + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), + min_update_every, max_update_every, + min_start_time, max_start_time, + min_end_time, max_end_time, + now, last_end_time_in_buffer, + fix_min_start_time + ); + + min_start_time = fix_min_start_time; } if(likely(min_start_time <= now && min_end_time >= now)) { @@ -1375,7 +1419,9 @@ void replication_sender_delete_pending_requests(struct sender_state *sender) { } void replication_init_sender(struct sender_state *sender) { - sender->replication.requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(struct replication_request)); + dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender); dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender); dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender); @@ -1560,7 +1606,7 @@ static int replication_execute_next_pending_request(bool cancel) { } if(unlikely(!rqs)) { - max_requests_ahead = get_system_cpus() / 2; + max_requests_ahead = get_netdata_cpus() / 2; if(max_requests_ahead > libuv_worker_threads * 2) max_requests_ahead = libuv_worker_threads * 2; |