summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-02-22 22:30:40 +0200
committerGitHub <noreply@github.com>2023-02-22 22:30:40 +0200
commit1cfad181a8cdcafc04d6f2d52aa0ffb5e57e182a (patch)
treebb753be9e16256ef2fe420d9ad24cd27787ee8fc /streaming/sender.c
parent7cd5570b49c5f156c944766dda7238611a969ff0 (diff)
/api/v2/data - multi-host/context/instance/dimension/label queries (#14564)
* fundamentals for having /api/v2/ working * use an atomic to prevent writing to internal pipe too much * first attempt of multi-node, multi-context, multi-chart, multi-dimension queries * v2 jsonwrap * first attempt for group by * cleaned up RRDR and fixed group by * improvements to /api/v2/api * query instance may be realloced, so pointers to it get invalid; solved memory leaks * count of quried metrics in summary information * provide detailed information about selected, excluded, queried and failed metrics for each entity * select instances by fqdn too * add timing information to json output * link charts to rrdcontexts, if a query comes in and it is found unlinked * calculate min, max, sum, average, volume, count per metric * api v2 parameters naming * renders alerts and units * render machine_guid and node_id in all sections it is relevant * unified keys * group by now takes into account units and when there are multiple units involved, it creates a dimension per unit * request and detailed are hidden behind an option * summary includes only a flattened list of alerts * alert counts per host and instance * count of grouped metrics per dimension * added contexts to summary * added chart title * added dimension priorities and chart type * support for multiple group by at the same time * minor fixes * labels are now a tree * keys uniformity * filtering by alerts, both having a specific alert and having a specific alert in a specific status * added scope of hosts and contexts * count of instances on contexts and hosts * make the api return valid responses even when the response contains no data * calculate average and contribution % for every item in the summary * fix compilation warnings * fix compilation warnings - again
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c16
1 files changed, 12 insertions, 4 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 51c373f636..179c2dc603 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -178,8 +178,16 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
replication_recalculate_buffer_used_ratio_unsafe(s);
+ bool signal_sender = false;
+ if(!rrdpush_sender_pipe_has_pending_data(s)) {
+ rrdpush_sender_pipe_set_pending_data(s);
+ signal_sender = true;
+ }
+
netdata_mutex_unlock(&s->mutex);
- rrdpush_signal_sender_to_wake_up(s);
+
+ if(signal_sender)
+ rrdpush_signal_sender_to_wake_up(s);
}
static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
@@ -1016,7 +1024,6 @@ void execute_commands(struct sender_state *s) {
}
struct rrdpush_sender_thread_data {
- struct sender_state *sender_state;
RRDHOST *host;
char *pipe_buffer;
};
@@ -1249,7 +1256,6 @@ void *rrdpush_sender_thread(void *ptr) {
struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
thread_data->pipe_buffer = mallocz(pipe_buffer_size);
- thread_data->sender_state = s;
thread_data->host = s->host;
netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
@@ -1305,8 +1311,10 @@ void *rrdpush_sender_thread(void *ptr) {
netdata_mutex_lock(&s->mutex);
size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
size_t available = cbuffer_available_size_unsafe(s->buffer);
- if (unlikely(!outstanding))
+ if (unlikely(!outstanding)) {
+ rrdpush_sender_pipe_clear_pending_data(s);
rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
+ }
netdata_mutex_unlock(&s->mutex);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);