// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
/*
* rrdpush
*
* 3 threads are involved for all stream operations
*
* 1. a random data collection thread, calling rrdset_done_push()
* this is called for each chart.
*
* the output of this work is kept in a BUFFER in RRDHOST
* the sender thread is signalled via a pipe (also in RRDHOST)
*
* 2. a sender thread running at the sending netdata
* this is spawned automatically on the first chart to be pushed
*
* It tries to push the metrics to the remote netdata, as fast
* as possible (i.e. immediately after they are collected).
*
* 3. a receiver thread, running at the receiving netdata
* this is spawned automatically when the sender connects to
* the receiver.
*
*/
#define START_STREAMING_PROMPT "Hit me baby, push them over..."
typedef enum {
RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
unsigned int default_rrdpush_enabled = 0;
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
char *default_rrdpush_send_charts_matching = NULL;
int rrdpush_init() {
default_rrdpush_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
error("STREAM [send]: cannot enable sending thread - information is missing.");
default_rrdpush_enabled = 0;
}
return default_rrdpush_enabled;
}
#define CONNECTED_TO_SIZE 100
// data collection happens from multiple threads
// each of these threads calls rrdset_done()
// which in turn calls rrdset_done_push()
// which uses this pipe to notify the streaming thread
// that there are more data ready to be sent
#define PIPE_READ 0
#define PIPE_WRITE 1
// to have the remote netdata re-sync the charts
// to its current clock, we send for this many
// iterations a BEGIN line without microseconds
// this is for the first iterations of each chart
unsigned int remote_clock_resync_iterations = 60;
#define rrdpush_buffer_lock(host) netdata_mutex_lock(&((host)->rrdpush_sender_buffer_mutex))
#define rrdpush_buffer_unlock(host) netdata_mutex_unlock(&((host)->rrdpush_sender_buffer_mutex))
static inline int should_send_chart_matching(RRDSET *st) {
if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) {
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
}
else if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) {
RRDHOST *host = st->rrdhost;
if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) ||