// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
#include "parser/parser.h"
/*
* rrdpush
*
* 3 threads are involved for all stream operations
*
* 1. a random data collection thread, calling rrdset_done_push()
* this is called for each chart.
*
* the output of this work is kept in a thread BUFFER
* the sender thread is signalled via a pipe (in RRDHOST)
*
* 2. a sender thread running at the sending netdata
* this is spawned automatically on the first chart to be pushed
*
* It tries to push the metrics to the remote netdata, as fast
* as possible (i.e. immediately after they are collected).
*
* 3. a receiver thread, running at the receiving netdata
* this is spawned automatically when the sender connects to
* the receiver.
*
*/
// In-memory representation of stream.conf.
// It is statically initialized empty (no first/last section, empty AVL index)
// and is filled by load_stream_conf() through appconfig_load().
struct config stream_config = {
.first_section = NULL,
.last_section = NULL,
.mutex = NETDATA_MUTEX_INITIALIZER,
.index = {
.avl_tree = {
.root = NULL,
// comparator used by the index tree
.compar = appconfig_section_compare
},
.rwlock = AVL_LOCK_INITIALIZER
}
};
// Streaming defaults. All of them are (re)assigned by rrdpush_init()
// from the CONFIG_SECTION_STREAM section of stream_config.

// "enabled" option; rrdpush_init() forces it back to 0 when the
// destination or the api key is missing or empty.
unsigned int default_rrdpush_enabled = 0;
#ifdef ENABLE_COMPRESSION
// "enable compression" option; on unless configured otherwise.
unsigned int default_compression_enabled = 1;
#endif
// "destination" option (configured default: empty string).
char *default_rrdpush_destination = NULL;
// "api key" option (configured default: empty string).
char *default_rrdpush_api_key = NULL;
// "send charts matching" option (configured default: "*").
char *default_rrdpush_send_charts_matching = NULL;
#ifdef ENABLE_HTTPS
// Starts as NETDATA_SSL_OPTIONAL; rrdpush_init() switches it to
// NETDATA_SSL_FORCE when the destination contains ":SSL".
int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL;
// "CApath" option (configured default: "/etc/ssl/certs/").
char *netdata_ssl_ca_path = NULL;
// "CAfile" option (configured default: "/etc/ssl/certs/certs.pem").
char *netdata_ssl_ca_file = NULL;
#endif
// Populate stream_config from stream.conf.
//
// The user configuration directory is tried first. When that file cannot
// be loaded, the stock configuration directory is tried next. If neither
// loads, a message is logged and stream_config is left as it was, so the
// callers end up using the defaults they pass to the appconfig getters.
static void load_stream_conf() {
    errno = 0;

    char *path = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");

    // user config loaded: nothing more to do
    if(appconfig_load(&stream_config, path, 0, NULL)) {
        freez(path);
        return;
    }

    info("CONFIG: cannot load user config '%s'. Will try stock config.", path);
    freez(path);

    // fall back to the stock config shipped with netdata
    path = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");

    if(!appconfig_load(&stream_config, path, 0, NULL))
        info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", path);

    freez(path);
}
// Initialize the streaming defaults from stream.conf.
//
// Loads stream.conf into stream_config, then reads the sender settings
// (enabled, destination, api key, charts filter, optional compression and
// TLS options) into the default_* / netdata_ssl_* globals.
//
// Returns the final value of default_rrdpush_enabled: non-zero when
// sending is enabled and both a destination and an api key are configured,
// 0 otherwise.
int rrdpush_init() {
    // --------------------------------------------------------------------
    // load stream.conf

    load_stream_conf();

    // --------------------------------------------------------------------
    // sender settings

    default_rrdpush_enabled = (unsigned int)appconfig_get_boolean(
            &stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);

    default_rrdpush_destination = appconfig_get(
            &stream_config, CONFIG_SECTION_STREAM, "destination", "");

    default_rrdpush_api_key = appconfig_get(
            &stream_config, CONFIG_SECTION_STREAM, "api key", "");

    default_rrdpush_send_charts_matching = appconfig_get(
            &stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");

    // this one is read through config_get_number(), not from stream_config
    rrdhost_free_orphan_time = config_get_number(
            CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time);

#ifdef ENABLE_COMPRESSION
    default_compression_enabled = (unsigned int)appconfig_get_boolean(
            &stream_config, CONFIG_SECTION_STREAM, "enable compression", default_compression_enabled);
#endif

    // sending needs both a non-empty destination and a non-empty api key
    if(default_rrdpush_enabled) {
        int have_destination = default_rrdpush_destination && *default_rrdpush_destination;
        int have_api_key = default_rrdpush_api_key && *default_rrdpush_api_key;

        if(!have_destination || !have_api_key) {
            error("STREAM [send]: cannot enable sending thread - information is missing.");
            default_rrdpush_enabled = 0;
        }
    }

#ifdef ENABLE_HTTPS
    // A ":SSL" marker in the destination forces TLS. The destination string
    // is cut in place at the marker, so everything from ":SSL" to the end of
    // the string is dropped.
    if(netdata_use_ssl_on_stream == NETDATA_SSL_OPTIONAL && default_rrdpush_destination) {
        char *ssl_marker = strstr(default_rrdpush_destination, ":SSL");
        if(ssl_marker) {
            *ssl_marker = '\0';
            netdata_use_ssl_on_stream = NETDATA_SSL_FORCE;
        }
    }

    // optionally accept certificates that fail verification
    bool skip_verification = appconfig_get_boolean(
            &stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO);

    if(skip_verification == CONFIG_BOOLEAN_YES &&
       netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
        info("Netdata is configured to accept invalid SSL certificate.");
        netdata_ssl_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
    }

    // certificate authority locations
    netdata_ssl_ca_path = appconfig_get(
            &stream_config, CONFIG_SECTION_STREAM, "CApath", "/etc/ssl/certs/");

    netdata_ssl_ca_file = appconfig_get(
            &stream_config, CONFIG_SECTION_STREAM, "CAfile", "/etc/ssl/certs/certs.pem");
#endif

    return default_rrdpush_enabled;
}
// Data collection happens from multiple threads.
// Each of these threads calls rrdset_done(),
// which in turn calls rrdset_done_push(),
// which uses this pipe to notify the streaming thread
// that more data is ready to be sent.
//
// Names for the two ends of that pipe.
// NOTE(review): presumably indexes into the fd pair filled by pipe() -
// confirm against the code that creates the pipe.
#define PIPE_READ 0
#define PIPE_WRITE 1
// To have the remote netdata re-sync the charts
// to its current clock, we send a BEGIN line without
// microseconds for this many iterations.
// This applies to the first iterations of each chart.
unsigned int remote_clock_resync_iterations = 60;
static inline bool should_send_chart_matching(RRDSET *st) {
// get all the flags we need to check, with one atomic operation
RRDSET_FLAGS flags =