author     Costa Tsaousis <costa@netdata.cloud>      2023-03-02 22:50:48 +0200
committer  GitHub <noreply@github.com>               2023-03-02 22:50:48 +0200
commit     021e252fc5d18a7225c0f4c975b3281016861d3c (patch)
tree       63f92adc27419ca9df464635cd85424f52c94179 /streaming
parent     c4d8d35b9f065f2a847f2780acb4342dabdfd34c (diff)
/api/v2/contexts (#14592)
* preparation for /api/v2/contexts
* working /api/v2/contexts
* add anomaly rate information in all statistics; when sum-count is requested, return sums and counts instead of averages
* minor fix
* query target now accurately counts hosts, contexts, instances, dimensions, metrics
* cleanup /api/v2/contexts
* full text search with /api/v2/contexts
* simple patterns now support the option to search ignoring case
* full text search API with /api/v2/q
* simple pattern execution optimization
* do not show q when not given
* full text search accounting
* separated /api/v2/nodes from /api/v2/contexts
* fix ssv queries for group_by
* count query instances queried and failed per context and host
* split rrdcontext.c to multiple files
* add query totals
* fix anomaly rate calculation; provide "ni" for indexing hosts
* do not generate zero valued members
* faster calculation of anomaly rate, by summing integers for each db point and doing the math once for every generated point (see the sketch after this list)
* fix typo when printing dimensions totals
* added option minify to remove spaces and newlines from JSON output
* send instance ids and names when they differ
* do not add in query target dimensions, instances, contexts and hosts for which there is no retention in the current timeframe
* fix for the previous + renames and code cleanup
* when a dimension is filtered, include in the response all the other dimensions that are selectable
* do not add nodes that do not have retention in the current window
* move selection of dimensions to query_dimension_add(), instead of query_metric_add()
* increase the pre-processing capacity of queries
* generate instance fqdn ids and names only when they are needed
* provide detailed statistics about tiers retention, queries, points, update_every
* late allocation of query dimensions
* cleanup
* more cleanup
* support for annotations per displayed point, RESET and PARTIAL
* new type annotations
* if a chart is being collected but is not linked to contexts, link it at collection time
* make ML run reentrant
* make ML rrdr query synchronous
* optimize replication memory allocation of replication_sort_entry
* change units to percentage, when requesting a coefficient of variation, or a percentage query
* initialize replication before starting main threads
* properly decrement no room requests counter
* propagate the non-zero flag to group-by
* achieve the same while avoiding the extra loop
* respect non-zero in all dimension arrays
* remove dictionary garbage collection from dictionary_entries() and dictionary_version()
* be more verbose when jv2 indexing is postponed
* prevent infinite loop
* use hidden dimensions even when dimensions pattern is unset
* traverse hosts using dictionaries
* fix dictionary unittests
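
The anomaly-rate item above describes a classic accumulation optimization: keep the hot loop integer-only and convert to a percentage once per output point. A minimal sketch of the idea (illustrative only, not Netdata's actual query code; the function and variable names are hypothetical):

    #include <stddef.h>

    // One anomaly bit per database point; a generated (displayed) point
    // aggregates many db points. Sum the bits as integers and convert to a
    // percentage once, instead of doing floating point math per db point.
    static double generated_point_anomaly_rate(const unsigned char *anomaly_bits, size_t db_points) {
        size_t anomalous = 0;
        for(size_t i = 0; i < db_points; i++)
            anomalous += anomaly_bits[i];           // integer-only hot loop

        // the only floating point operation, once per generated point
        return db_points ? (double)anomalous * 100.0 / (double)db_points : 0.0;
    }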
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/replication.c | 40
-rw-r--r-- | streaming/rrdpush.c     |  8
2 files changed, 31 insertions, 17 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index 1cd349dc4d..edb64d8223 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -969,11 +969,12 @@ struct replication_sort_entry {
 
 // the global variables for the replication thread
 static struct replication_thread {
+    ARAL *aral_rse;
+
     SPINLOCK spinlock;
 
     struct {
         size_t pending;                 // number of requests pending in the queue
-        Word_t unique_id;               // the last unique id we gave to a request (auto-increment, starting from 1)
 
         // statistics
         size_t added;                   // number of requests added to the queue
@@ -992,6 +993,7 @@ static struct replication_thread {
     } unsafe;                           // protected from replication_recursive_lock()
 
     struct {
+        Word_t unique_id;               // the last unique id we gave to a request (auto-increment, starting from 1)
         size_t executed;                // the number of replication requests executed
         size_t latest_first_time;       // the 'after' timestamp of the last request we executed
         size_t memory;                  // the total memory allocated by replication
@@ -1005,10 +1007,10 @@ static struct replication_thread {
     } main_thread;                      // access is allowed only by the main thread
 
 } replication_globals = {
+        .aral_rse = NULL,
         .spinlock = NETDATA_SPINLOCK_INITIALIZER,
         .unsafe = {
                 .pending = 0,
-                .unique_id = 0,
 
                 .added = 0,
                 .removed = 0,
@@ -1025,6 +1027,7 @@ static struct replication_thread {
             },
         },
         .atomic = {
+                .unique_id = 0,
                 .executed = 0,
                 .latest_first_time = 0,
                 .memory = 0,
@@ -1088,17 +1091,15 @@ void replication_set_next_point_in_time(time_t after, size_t unique_id) {
 // ----------------------------------------------------------------------------
 // replication sort entry management
 
-static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) {
-    fatal_when_replication_is_not_locked_for_me();
-
-    struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
+    struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse);
     __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
 
     rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
 
     // copy the request
     rse->rq = rq;
-    rse->unique_id = ++replication_globals.unsafe.unique_id;
+    rse->unique_id = __atomic_add_fetch(&replication_globals.atomic.unique_id, 1, __ATOMIC_SEQ_CST);
 
     // save the unique id into the request, to be able to delete it later
     rq->unique_id = rse->unique_id;
@@ -1109,26 +1110,30 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
 }
 
 static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
-    freez(rse);
+    aral_freez(replication_globals.aral_rse, rse);
     __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
 }
 
 static void replication_sort_entry_add(struct replication_request *rq) {
-    replication_recursive_lock();
-
     if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
         rq->indexed_in_judy = false;
         rq->not_indexed_buffer_full = true;
         rq->not_indexed_preprocessing = false;
+        replication_recursive_lock();
         replication_globals.unsafe.pending_no_room++;
         replication_recursive_unlock();
         return;
     }
 
-    if(rq->not_indexed_buffer_full)
-        replication_globals.unsafe.pending_no_room--;
+    // cache this, because it will be changed
+    bool decrement_no_room = rq->not_indexed_buffer_full;
 
-    struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);
+    struct replication_sort_entry *rse = replication_sort_entry_create(rq);
+
+    replication_recursive_lock();
+
+    if(decrement_no_room)
+        replication_globals.unsafe.pending_no_room--;
 
 //    if(rq->after < (time_t)replication_globals.protected.queue.after &&
 //       rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
@@ -1827,12 +1832,21 @@ static void replication_main_cleanup(void *ptr) {
     replication_globals.main_thread.threads_ptrs = NULL;
     __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
 
+    aral_destroy(replication_globals.aral_rse);
+    replication_globals.aral_rse = NULL;
+
     // custom code
     worker_unregister();
 
     static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
 }
 
+void replication_initialize(void) {
+    replication_globals.aral_rse = aral_create("rse", sizeof(struct replication_sort_entry),
+                                               0, 65536, aral_by_size_statistics(),
+                                               NULL, NULL, false, false);
+}
+
 void *replication_thread_main(void *ptr __maybe_unused) {
     replication_initialize_workers(true);
 
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index d8ad802578..3303a61e49 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -190,8 +190,8 @@ static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) {
         else
             rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
     }
-    else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
-            simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st)))
+    else if(simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->id) ||
+            simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->name))
         rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
 
     else
@@ -941,7 +941,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
     {
         SIMPLE_PATTERN *key_allow_from = simple_pattern_create(
                 appconfig_get(&stream_config, rpt->key, "allow from", "*"),
-                NULL, SIMPLE_PATTERN_EXACT);
+                NULL, SIMPLE_PATTERN_EXACT, true);
 
         if(key_allow_from) {
             if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
@@ -988,7 +988,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
     {
         SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(
                 appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"),
-                NULL, SIMPLE_PATTERN_EXACT);
+                NULL, SIMPLE_PATTERN_EXACT, true);
 
         if(machine_allow_from) {
             if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
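
The replication.c hunks swap mallocz()/freez() for an ARAL ("array allocator") dedicated to struct replication_sort_entry: it is created once in the new replication_initialize() (called before the main threads start, per the commit message), used via aral_mallocz()/aral_freez() on the hot path, and torn down in replication_main_cleanup(). The diff does not document aral_create()'s parameters, so the following is only a conceptual stand-in showing why a fixed-size recycler beats general-purpose malloc for this workload:

    #include <stdlib.h>

    // Conceptual stand-in for ARAL (the real one manages pages of entries,
    // keeps statistics, and is thread-safe; no locking is shown here): a
    // free list of same-sized objects that recycles freed entries instead
    // of returning them to the heap.
    struct pool_entry { struct pool_entry *next; };

    struct pool {
        size_t element_size;            // must be >= sizeof(struct pool_entry)
        struct pool_entry *free_list;   // entries handed back by pool_free()
    };

    static void *pool_malloc(struct pool *p) {
        if(p->free_list) {              // hot path: reuse, no heap call
            void *ptr = p->free_list;
            p->free_list = p->free_list->next;
            return ptr;
        }
        return malloc(p->element_size); // cold path: grow from the heap
    }

    static void pool_free(struct pool *p, void *ptr) {
        struct pool_entry *e = ptr;     // recycle instead of free()
        e->next = p->free_list;
        p->free_list = e;
    }

Sort entries are created and destroyed at a high rate during replication, so recycling avoids per-request heap traffic; it also explains why .aral_rse starts as NULL and is only populated by replication_initialize().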
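The other half of making replication_sort_entry_create() callable without the lock is the unique id: it moves from the lock-protected unsafe block to the atomic block, and ++unique_id becomes __atomic_add_fetch(). A self-contained sketch of the pattern (GCC/Clang builtins, as used in the diff):

    #include <stdio.h>

    typedef unsigned long Word_t;       // stand-in for Judy's Word_t

    static Word_t unique_id = 0;

    // __atomic_add_fetch() returns the post-increment value atomically, so
    // two concurrent callers can never receive the same id (ids start at 1)
    static Word_t next_unique_id(void) {
        return __atomic_add_fetch(&unique_id, 1, __ATOMIC_SEQ_CST);
    }

    int main(void) {
        // safe from any thread, no spinlock needed
        Word_t a = next_unique_id();    // 1
        Word_t b = next_unique_id();    // 2
        printf("%lu %lu\n", a, b);
        return 0;
    }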
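replication_sort_entry_add() is reorganized around the same goal: the lock is no longer held for the whole function, only around the shared counters and queue. Note the snapshot of rq->not_indexed_buffer_full into a local before the unlocked work runs, since that flag can change in the meantime. A generic sketch of the lock-narrowing pattern (pthread-based, with hypothetical names):

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdlib.h>

    static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
    static size_t pending_no_room = 0;

    struct request { bool buffer_was_full; };
    struct entry { struct request *rq; };

    void queue_add(struct request *rq) {
        // snapshot the flag before the work below can change it
        // ("cache this, because it will be changed")
        bool decrement_no_room = rq->buffer_was_full;

        // expensive work (allocation, id assignment) outside the lock
        struct entry *e = malloc(sizeof(*e));
        e->rq = rq;

        pthread_mutex_lock(&queue_lock);    // critical section: shared state only
        if(decrement_no_room)
            pending_no_room--;
        // ... index e into the shared queue ...
        pthread_mutex_unlock(&queue_lock);
    }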
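In the rrdpush.c hunks, simple_pattern_create() gains a trailing boolean, and STRING-typed fields are matched with simple_pattern_matches_string() instead of going through rrdset_id()/rrdset_name(). Combined with the commit-message item "simple patterns now support the option to search ignoring case", the new argument is presumably the case-sensitivity toggle, though the diff itself does not name the parameter. Usage as it appears above:

    // from the rrdpush.c hunk; the final 'true' is assumed to preserve the
    // previous case-sensitive behavior (the parameter name is not shown here)
    SIMPLE_PATTERN *key_allow_from = simple_pattern_create(
            appconfig_get(&stream_config, rpt->key, "allow from", "*"),
            NULL, SIMPLE_PATTERN_EXACT, true);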