author     Costa Tsaousis <costa@netdata.cloud>   2023-02-22 22:30:40 +0200
committer  GitHub <noreply@github.com>            2023-02-22 22:30:40 +0200
commit     1cfad181a8cdcafc04d6f2d52aa0ffb5e57e182a (patch)
tree       bb753be9e16256ef2fe420d9ad24cd27787ee8fc /streaming
parent     7cd5570b49c5f156c944766dda7238611a969ff0 (diff)
/api/v2/data - multi-host/context/instance/dimension/label queries (#14564)
* fundamentals for having /api/v2/ working
* use an atomic to prevent writing to the internal pipe too often
* first attempt at multi-node, multi-context, multi-chart, multi-dimension queries
* v2 jsonwrap
* first attempt at group by
* cleaned up RRDR and fixed group by
* improvements to /api/v2/api
* a query instance may be realloced, so pointers to it become invalid; fixed memory leaks
* count of queried metrics in summary information
* provide detailed information about selected, excluded, queried and failed metrics for each entity
* select instances by fqdn too
* add timing information to the json output
* link charts to rrdcontexts, if a query comes in and the chart is found unlinked
* calculate min, max, sum, average, volume, count per metric
* api v2 parameter naming
* render alerts and units
* render machine_guid and node_id in all sections where relevant
* unified keys
* group by now takes units into account; when multiple units are involved, it creates one dimension per unit
* request and detailed are hidden behind an option
* summary includes only a flattened list of alerts
* alert counts per host and instance
* count of grouped metrics per dimension
* added contexts to summary
* added chart title
* added dimension priorities and chart type
* support for multiple group by at the same time
* minor fixes
* labels are now a tree
* keys uniformity
* filtering by alerts: both having a specific alert and having a specific alert in a specific status
* added scope of hosts and contexts
* count of instances on contexts and hosts
* make the api return valid responses even when the response contains no data
* calculate average and contribution % for every item in the summary
* fix compilation warnings
* fix compilation warnings - again
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/replication.c  116
-rw-r--r--  streaming/rrdpush.h        5
-rw-r--r--  streaming/sender.c        16
3 files changed, 84 insertions(+), 53 deletions(-)
diff --git a/streaming/replication.c b/streaming/replication.c
index 5a2ffee53c..1cd349dc4d 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -1626,67 +1626,85 @@ static void replication_initialize_workers(bool master) {
#define REQUEST_QUEUE_EMPTY (-1)
#define REQUEST_CHART_NOT_FOUND (-2)
-static int replication_execute_next_pending_request(bool cancel) {
- static __thread int max_requests_ahead = 0;
- static __thread struct replication_request *rqs = NULL;
- static __thread int rqs_last_executed = 0, rqs_last_prepared = 0;
- static __thread size_t queue_rounds = 0; (void)queue_rounds;
+static __thread struct replication_thread_pipeline {
+ int max_requests_ahead;
+ struct replication_request *rqs;
+ int rqs_last_executed, rqs_last_prepared;
+ size_t queue_rounds;
+} rtp = {
+ .max_requests_ahead = 0,
+ .rqs = NULL,
+ .rqs_last_executed = 0,
+ .rqs_last_prepared = 0,
+ .queue_rounds = 0,
+};
+
+static void replication_pipeline_cancel_and_cleanup(void) {
+ if(!rtp.rqs)
+ return;
+
struct replication_request *rq;
+ size_t cancelled = 0;
- if(unlikely(cancel)) {
- if(rqs) {
- size_t cancelled = 0;
- do {
- if (++rqs_last_executed >= max_requests_ahead)
- rqs_last_executed = 0;
+ do {
+ if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
+ rtp.rqs_last_executed = 0;
- rq = &rqs[rqs_last_executed];
+ rq = &rtp.rqs[rtp.rqs_last_executed];
- if (rq->q) {
- internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
- internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");
+ if (rq->q) {
+ internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
+ internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");
- replication_response_cancel_and_finalize(rq->q);
- rq->q = NULL;
- cancelled++;
- }
+ replication_response_cancel_and_finalize(rq->q);
+ rq->q = NULL;
+ cancelled++;
+ }
- rq->executed = true;
- rq->found = false;
+ rq->executed = true;
+ rq->found = false;
- } while (rqs_last_executed != rqs_last_prepared);
+ } while (rtp.rqs_last_executed != rtp.rqs_last_prepared);
- internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
- }
- return REQUEST_QUEUE_EMPTY;
- }
+ internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
+
+ freez(rtp.rqs);
+ rtp.rqs = NULL;
+ rtp.max_requests_ahead = 0;
+ rtp.rqs_last_executed = 0;
+ rtp.rqs_last_prepared = 0;
+ rtp.queue_rounds = 0;
+}
+
+static int replication_pipeline_execute_next(void) {
+ struct replication_request *rq;
- if(unlikely(!rqs)) {
- max_requests_ahead = get_netdata_cpus() / 2;
+ if(unlikely(!rtp.rqs)) {
+ rtp.max_requests_ahead = (int)get_netdata_cpus() / 2;
- if(max_requests_ahead > libuv_worker_threads * 2)
- max_requests_ahead = libuv_worker_threads * 2;
+ if(rtp.max_requests_ahead > libuv_worker_threads * 2)
+ rtp.max_requests_ahead = libuv_worker_threads * 2;
- if(max_requests_ahead < 2)
- max_requests_ahead = 2;
+ if(rtp.max_requests_ahead < 2)
+ rtp.max_requests_ahead = 2;
- rqs = callocz(max_requests_ahead, sizeof(struct replication_request));
- __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
+ rtp.rqs = callocz(rtp.max_requests_ahead, sizeof(struct replication_request));
+ __atomic_add_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
}
// fill the queue
do {
- if(++rqs_last_prepared >= max_requests_ahead) {
- rqs_last_prepared = 0;
- queue_rounds++;
+ if(++rtp.rqs_last_prepared >= rtp.max_requests_ahead) {
+ rtp.rqs_last_prepared = 0;
+ rtp.queue_rounds++;
}
- internal_fatal(rqs[rqs_last_prepared].q,
+ internal_fatal(rtp.rqs[rtp.rqs_last_prepared].q,
"REPLAY FATAL: slot is used by query that has not been executed!");
worker_is_busy(WORKER_JOB_FIND_NEXT);
- rqs[rqs_last_prepared] = replication_request_get_first_available();
- rq = &rqs[rqs_last_prepared];
+ rtp.rqs[rtp.rqs_last_prepared] = replication_request_get_first_available();
+ rq = &rtp.rqs[rtp.rqs_last_prepared];
if(rq->found) {
if (!rq->st) {
@@ -1707,14 +1725,14 @@ static int replication_execute_next_pending_request(bool cancel) {
rq->executed = false;
}
- } while(rq->found && rqs_last_prepared != rqs_last_executed);
+ } while(rq->found && rtp.rqs_last_prepared != rtp.rqs_last_executed);
// pick the first usable
do {
- if (++rqs_last_executed >= max_requests_ahead)
- rqs_last_executed = 0;
+ if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
+ rtp.rqs_last_executed = 0;
- rq = &rqs[rqs_last_executed];
+ rq = &rtp.rqs[rtp.rqs_last_executed];
if(rq->found) {
internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
@@ -1747,7 +1765,7 @@ static int replication_execute_next_pending_request(bool cancel) {
else
internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");
- } while(!rq->found && rqs_last_executed != rqs_last_prepared);
+ } while(!rq->found && rtp.rqs_last_executed != rtp.rqs_last_prepared);
if(unlikely(!rq->found)) {
worker_is_idle();
@@ -1771,7 +1789,7 @@ static int replication_execute_next_pending_request(bool cancel) {
}
static void replication_worker_cleanup(void *ptr __maybe_unused) {
- replication_execute_next_pending_request(true);
+ replication_pipeline_cancel_and_cleanup();
worker_unregister();
}
@@ -1781,7 +1799,7 @@ static void *replication_worker_thread(void *ptr) {
netdata_thread_cleanup_push(replication_worker_cleanup, ptr);
while(service_running(SERVICE_REPLICATION)) {
- if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
+ if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
sender_thread_buffer_free();
worker_is_busy(WORKER_JOB_WAIT);
worker_is_idle();
@@ -1797,7 +1815,7 @@ static void replication_main_cleanup(void *ptr) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
- replication_execute_next_pending_request(true);
+ replication_pipeline_cancel_and_cleanup();
int threads = (int)replication_globals.main_thread.threads;
for(int i = 0; i < threads ;i++) {
@@ -1914,7 +1932,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_is_idle();
}
- if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
+ if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
worker_is_busy(WORKER_JOB_WAIT);
replication_recursive_lock();
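
The replication.c change above is a refactor more than a behavior change: the per-thread pipeline state that used to live in function-local statics moves into a single named __thread struct, so cancellation and cleanup can become their own entry point (replication_pipeline_cancel_and_cleanup()) instead of a cancel flag threaded through the execute path. A minimal sketch of the pattern, with simplified types, names and sizes that are assumptions rather than the Netdata code:

#include <stdbool.h>
#include <stdlib.h>

struct request {
    bool found;
    /* ... query handle, chart pointer, etc. ... */
};

/* all pipeline state for one thread, reachable by name from any function */
static __thread struct pipeline {
    int capacity;              /* ring size; fixed once per thread */
    struct request *slots;     /* lazily allocated ring of in-flight requests */
    int last_executed;         /* consumer cursor */
    int last_prepared;         /* producer cursor */
} pl = { 0, NULL, 0, 0 };

/* cleanup is now a separate entry point instead of execute(cancel = true) */
static void pipeline_cancel_and_cleanup(void) {
    if (!pl.slots)
        return;
    /* ... cancel every prepared-but-unexecuted request in the ring ... */
    free(pl.slots);
    pl = (struct pipeline){ 0 };   /* reset, so the ring can be rebuilt later */
}

static int pipeline_execute_next(void) {
    if (!pl.slots) {               /* first call on this thread */
        pl.capacity = 4;           /* the real code sizes this from the CPU count */
        pl.slots = calloc((size_t)pl.capacity, sizeof(struct request));
    }
    /* prepare ahead: advance the producer cursor around the ring */
    if (++pl.last_prepared >= pl.capacity)
        pl.last_prepared = 0;
    /* ... fill pl.slots[pl.last_prepared], then execute from last_executed ... */
    return -1;                     /* the REQUEST_QUEUE_EMPTY equivalent */
}

Keeping the struct __thread preserves the original lock-free property: each replication worker owns its own ring, so the cursors need no synchronization.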
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index a775b94992..2db5dd1608 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -186,12 +186,17 @@ struct sender_state {
} replication;
struct {
+ bool pending_data;
size_t buffer_used_percentage; // the current utilization of the sending buffer
usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
time_t last_buffer_recreate_s; // the last time (in seconds) the sender buffer was re-created
} atomic;
};
+#define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
+#define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
+#define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)
+
#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)
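
The three new macros in rrdpush.h implement wakeup coalescing: sender_commit() should write to the sender's wakeup pipe only on the first commit after the buffer was last drained, not on every commit. Below is a minimal sketch of the protocol, with pthreads standing in for netdata_mutex_* and with producer_commit()/consumer_drained() as illustrative names, not Netdata functions:

#include <pthread.h>
#include <stdbool.h>

struct state {
    bool pending_data;          /* accessed only via __atomic builtins */
    pthread_mutex_t mutex;      /* protects the shared send buffer */
};

/* producer side -- the shape sender_commit() now has: test-and-set the
 * flag under the lock, signal only on the false -> true transition */
static bool producer_commit(struct state *s) {
    bool signal = false;
    pthread_mutex_lock(&s->mutex);
    /* ... append the metrics payload to the shared buffer ... */
    if (!__atomic_load_n(&s->pending_data, __ATOMIC_RELAXED)) {
        __atomic_store_n(&s->pending_data, true, __ATOMIC_RELAXED);
        signal = true;          /* we won the transition; we do the wakeup */
    }
    pthread_mutex_unlock(&s->mutex);
    return signal;              /* caller writes to the pipe only when true */
}

/* consumer side -- the sender thread clears the flag once the buffer is
 * fully drained, re-arming the producer's signal */
static void consumer_drained(struct state *s) {
    __atomic_store_n(&s->pending_data, false, __ATOMIC_RELAXED);
}

Relaxed ordering is sufficient here because the mutex already orders access to the buffer contents; the flag only throttles pipe writes, which is exactly the commit message's "use an atomic to prevent writing to the internal pipe too often".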
diff --git a/streaming/sender.c b/streaming/sender.c
index 51c373f636..179c2dc603 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -178,8 +178,16 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
replication_recalculate_buffer_used_ratio_unsafe(s);
+ bool signal_sender = false;
+ if(!rrdpush_sender_pipe_has_pending_data(s)) {
+ rrdpush_sender_pipe_set_pending_data(s);
+ signal_sender = true;
+ }
+
netdata_mutex_unlock(&s->mutex);
- rrdpush_signal_sender_to_wake_up(s);
+
+ if(signal_sender)
+ rrdpush_signal_sender_to_wake_up(s);
}
static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
@@ -1016,7 +1024,6 @@ void execute_commands(struct sender_state *s) {
}
struct rrdpush_sender_thread_data {
- struct sender_state *sender_state;
RRDHOST *host;
char *pipe_buffer;
};
@@ -1249,7 +1256,6 @@ void *rrdpush_sender_thread(void *ptr) {
struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
thread_data->pipe_buffer = mallocz(pipe_buffer_size);
- thread_data->sender_state = s;
thread_data->host = s->host;
netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
@@ -1305,8 +1311,10 @@ void *rrdpush_sender_thread(void *ptr) {
netdata_mutex_lock(&s->mutex);
size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
size_t available = cbuffer_available_size_unsafe(s->buffer);
- if (unlikely(!outstanding))
+ if (unlikely(!outstanding)) {
+ rrdpush_sender_pipe_clear_pending_data(s);
rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
+ }
netdata_mutex_unlock(&s->mutex);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
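
A note on why this coalescing is race-free: both sides touch pending_data while holding s->mutex. If sender_commit() appends data before the drain check, outstanding is non-zero and the flag stays set; if the sender thread drains and clears the flag first, the next commit observes it cleared, sets it again and signals. Either way, every commit that adds data is eventually followed by a wakeup, so suppressing the redundant pipe writes cannot strand data in the buffer.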