// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
#include "parser/parser.h"
/*
* rrdpush
*
* 3 threads are involved for all stream operations
*
* 1. a random data collection thread, calling rrdset_done_push()
* this is called for each chart.
*
* the output of this work is kept in a thread BUFFER
* the sender thread is signalled via a pipe (in RRDHOST)
*
* 2. a sender thread running at the sending netdata
* this is spawned automatically on the first chart to be pushed
*
* It tries to push the metrics to the remote netdata, as fast
* as possible (i.e. immediately after they are collected).
*
* 3. a receiver thread, running at the receiving netdata
* this is spawned automatically when the sender connects to
* the receiver.
*
*/
// Configuration object for stream.conf. load_stream_conf() fills it through
// appconfig_load(), and the appconfig_get*() calls in this file read from it.
// It starts out empty: no sections and an empty index.
struct config stream_config = {
// first_section is the head of the section list walked (via ->next) by
// rrdpush_receiver_needs_dbengine(); last_section is presumably the tail.
.first_section = NULL,
.last_section = NULL,
.mutex = NETDATA_MUTEX_INITIALIZER,
.index = {
.avl_tree = {
.root = NULL,
// comparator used by the AVL tree that indexes the sections
.compar = appconfig_section_compare
},
.rwlock = AVL_LOCK_INITIALIZER
}
};
// Built-in defaults for the streaming settings. rrdpush_init() overwrites
// most of them with the values it reads from the configuration.

// Overridden by the "enabled" option of CONFIG_SECTION_STREAM in stream.conf.
unsigned int default_rrdpush_enabled = 0;
#ifdef ENABLE_COMPRESSION
// Only exists in builds with ENABLE_COMPRESSION; on by default.
// NOTE(review): the place where this is overridden is not visible here.
unsigned int default_compression_enabled = 1;
#endif
// The next three are NULL until rrdpush_init() points them to the values of
// "destination", "api key" and "send charts matching" (CONFIG_SECTION_STREAM).
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
char *default_rrdpush_send_charts_matching = NULL;
// Replication defaults. rrdpush_init() reads these from CONFIG_SECTION_DB using
// config_get_boolean()/config_get_number(), not from stream_config.
// Overridden by "enable replication".
bool default_rrdpush_enable_replication = true;
// Overridden by "seconds to replicate"; 86400 seconds is one day.
time_t default_rrdpush_seconds_to_replicate = 86400;
// Overridden by a CONFIG_SECTION_DB number in rrdpush_init().
// NOTE(review): presumably expressed in seconds (600 = 10 minutes) - confirm.
time_t default_rrdpush_replication_step = 600;
#ifdef ENABLE_HTTPS
// SSL settings for streaming, only present in builds with ENABLE_HTTPS.
// NOTE(review): where these are configured is not visible here.
int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL;
char *netdata_ssl_ca_path = NULL;
char *netdata_ssl_ca_file = NULL;
#endif
/*
 * Load stream.conf into stream_config.
 *
 * The copy in the user config directory is tried first. If it cannot be
 * loaded, the copy in the stock config directory is tried next. A failure
 * of either attempt is only reported with info(); nothing is returned to
 * the caller.
 */
static void load_stream_conf() {
    // NOTE(review): presumably cleared so the info() messages below do not
    // report a stale errno - confirm against the logging macros.
    errno = 0;

    char *user_conf = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
    if(appconfig_load(&stream_config, user_conf, 0, NULL)) {
        // the user config was loaded, there is nothing else to try
        freez(user_conf);
        return;
    }

    info("CONFIG: cannot load user config '%s'. Will try stock config.", user_conf);
    freez(user_conf);

    char *stock_conf = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
    if(!appconfig_load(&stream_config, stock_conf, 0, NULL))
        info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", stock_conf);

    freez(stock_conf);
}
bool rrdpush_receiver_needs_dbengine() {
struct section *co;
for(co = stream_config.first_section; co; co = co->next) {
if(strcmp(co->name, "stream") == 0)
continue; // the first section is not relevant
char *s;
s = appconfig_get_by_section(co, "enabled", NULL);
if(!s || !appconfig_test_boolean_value(s))
continue;
s = appconfig_get_by_section(co, "default memory mode", NULL);
if(s && strcmp(s, "dbengine") == 0)
return true;
s = appconfig_get_by_section(co, "memory mode", NULL);
if(s && strcmp(s, "dbengine") == 0)
return true;
}
return false;
}
int rrdpush_init() {
// --------------------------------------------------------------------
// load stream.conf
load_stream_conf();
default_rrdpush_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
default_rrdpush_enable_replication = config_get_boolean(CONFIG_SECTION_DB, "enable replication", default_rrdpush_enable_replication);
default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate);
default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB