summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-03-02 22:50:48 +0200
committerGitHub <noreply@github.com>2023-03-02 22:50:48 +0200
commit021e252fc5d18a7225c0f4c975b3281016861d3c (patch)
tree63f92adc27419ca9df464635cd85424f52c94179 /streaming
parentc4d8d35b9f065f2a847f2780acb4342dabdfd34c (diff)
/api/v2/contexts (#14592)
* preparation for /api/v2/contexts * working /api/v2/contexts * add anomaly rate information in all statistics; when sum-count is requested, return sums and counts instead of averages * minor fix * query targegt now accurately counts hosts, contexts, instances, dimensions, metrics * cleanup /api/v2/contexts * full text search with /api/v2/contexts * simple patterns now support the option to search ignoring case * full text search API with /api/v2/q * simple pattern execution optimization * do not show q when not given * full text search accounting * separated /api/v2/nodes from /api/v2/contexts * fix ssv queries for group_by * count query instances queried and failed per context and host * split rrdcontext.c to multiple files * add query totals * fix anomaly rate calculation; provide "ni" for indexing hosts * do not generate zero valued members * faster calculation of anomaly rate; by just summing integers for each db points and doing math once for every generated point * fix typo when printing dimensions totals * added option minify to remove spaces and newlines fron JSON output * send instance ids and names when they differ * do not add in query target dimensions, instances, contexts and hosts for which there is no retention in the current timeframe * fix for the previous + renames and code cleanup * when a dimension is filtered, include in the response all the other dimensions that are selectable * do not add nodes that do not have retention in the current window * move selection of dimensions to query_dimension_add(), instead of query_metric_add() * increase the pre-processing capacity of queries * generate instance fqdn ids and names only when they are needed * provide detailed statistics about tiers retention, queries, points, update_every * late allocation of query dimensions * cleanup * more cleanup * support for annotations per displayed point, RESET and PARTIAL * new type annotations * if a chart is not linked to contexts and it is collected, link it when it is collected * make ML run reentrant * make ML rrdr query synchronous * optimize replication memory allocation of replication_sort_entry * change units to percentage, when requesting a coefficinet of variation, or a percentage query * initialize replication before starting main threads * properly decrement no room requests counter * propagate the non-zero flag to group-by * the same by avoiding the extra loop * respect non-zero in all dimension arrays * remove dictionary garbage collection from dictionary_entries() and dictionary_version() * be more verbose when jv2 indexing is postponed * prevent infinite loop * use hidden dimensions even when dimensions pattern is unset * traverse hosts using dictionaries * fix dictionary unittests
Diffstat (limited to 'streaming')
-rw-r--r--streaming/replication.c40
-rw-r--r--streaming/rrdpush.c8
2 files changed, 31 insertions, 17 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index 1cd349dc4d..edb64d8223 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -969,11 +969,12 @@ struct replication_sort_entry {
// the global variables for the replication thread
static struct replication_thread {
+ ARAL *aral_rse;
+
SPINLOCK spinlock;
struct {
size_t pending; // number of requests pending in the queue
- Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1)
// statistics
size_t added; // number of requests added to the queue
@@ -992,6 +993,7 @@ static struct replication_thread {
} unsafe; // protected from replication_recursive_lock()
struct {
+ Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1)
size_t executed; // the number of replication requests executed
size_t latest_first_time; // the 'after' timestamp of the last request we executed
size_t memory; // the total memory allocated by replication
@@ -1005,10 +1007,10 @@ static struct replication_thread {
} main_thread; // access is allowed only by the main thread
} replication_globals = {
+ .aral_rse = NULL,
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.unsafe = {
.pending = 0,
- .unique_id = 0,
.added = 0,
.removed = 0,
@@ -1025,6 +1027,7 @@ static struct replication_thread {
},
},
.atomic = {
+ .unique_id = 0,
.executed = 0,
.latest_first_time = 0,
.memory = 0,
@@ -1088,17 +1091,15 @@ void replication_set_next_point_in_time(time_t after, size_t unique_id) {
// ----------------------------------------------------------------------------
// replication sort entry management
-static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) {
- fatal_when_replication_is_not_locked_for_me();
-
- struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
+ struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse);
__atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
// copy the request
rse->rq = rq;
- rse->unique_id = ++replication_globals.unsafe.unique_id;
+ rse->unique_id = __atomic_add_fetch(&replication_globals.atomic.unique_id, 1, __ATOMIC_SEQ_CST);
// save the unique id into the request, to be able to delete it later
rq->unique_id = rse->unique_id;
@@ -1109,26 +1110,30 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
}
static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
- freez(rse);
+ aral_freez(replication_globals.aral_rse, rse);
__atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
}
static void replication_sort_entry_add(struct replication_request *rq) {
- replication_recursive_lock();
-
if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
rq->indexed_in_judy = false;
rq->not_indexed_buffer_full = true;
rq->not_indexed_preprocessing = false;
+ replication_recursive_lock();
replication_globals.unsafe.pending_no_room++;
replication_recursive_unlock();
return;
}
- if(rq->not_indexed_buffer_full)
- replication_globals.unsafe.pending_no_room--;
+ // cache this, because it will be changed
+ bool decrement_no_room = rq->not_indexed_buffer_full;
- struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);
+ struct replication_sort_entry *rse = replication_sort_entry_create(rq);
+
+ replication_recursive_lock();
+
+ if(decrement_no_room)
+ replication_globals.unsafe.pending_no_room--;
// if(rq->after < (time_t)replication_globals.protected.queue.after &&
// rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
@@ -1827,12 +1832,21 @@ static void replication_main_cleanup(void *ptr) {
replication_globals.main_thread.threads_ptrs = NULL;
__atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
+ aral_destroy(replication_globals.aral_rse);
+ replication_globals.aral_rse = NULL;
+
// custom code
worker_unregister();
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
+void replication_initialize(void) {
+ replication_globals.aral_rse = aral_create("rse", sizeof(struct replication_sort_entry),
+ 0, 65536, aral_by_size_statistics(),
+ NULL, NULL, false, false);
+}
+
void *replication_thread_main(void *ptr __maybe_unused) {
replication_initialize_workers(true);
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index d8ad802578..3303a61e49 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -190,8 +190,8 @@ static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) {
else
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
}
- else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
- simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st)))
+ else if(simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->id) ||
+ simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->name))
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
else
@@ -941,7 +941,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
{
SIMPLE_PATTERN *key_allow_from = simple_pattern_create(
appconfig_get(&stream_config, rpt->key, "allow from", "*"),
- NULL, SIMPLE_PATTERN_EXACT);
+ NULL, SIMPLE_PATTERN_EXACT, true);
if(key_allow_from) {
if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
@@ -988,7 +988,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
{
SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(
appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"),
- NULL, SIMPLE_PATTERN_EXACT);
+ NULL, SIMPLE_PATTERN_EXACT, true);
if(machine_allow_from) {
if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {