summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c53
1 files changed, 35 insertions, 18 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 3ff022e973..a78b547caf 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -391,7 +391,7 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
host->rrdpush_receiver_connection_counter++;
- __atomic_add_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdb.localhost->connected_children_count, 1, __ATOMIC_RELAXED);
host->receiver = rpt;
rpt->host = host;
@@ -445,7 +445,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) {
// Make sure that we detach this thread and don't kill a freshly arriving receiver
if(host->receiver == rpt) {
- __atomic_sub_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&rrdb.localhost->connected_children_count, 1, __ATOMIC_RELAXED);
rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
host->trigger_chart_obsoletion_check = 0;
@@ -549,8 +549,8 @@ void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, con
static void rrdpush_receive(struct receiver_state *rpt)
{
- rpt->config.mode = default_rrd_memory_mode;
- rpt->config.history = default_rrd_history_entries;
+ rpt->config.storage_engine_id = default_storage_engine_id;
+ rpt->config.history = rrdb.default_rrd_history_entries;
rpt->config.health_enabled = (int)default_health_enabled;
rpt->config.alarms_delay = 60;
@@ -572,17 +572,34 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", rpt->config.history);
if(rpt->config.history < 5) rpt->config.history = 5;
- rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(rpt->config.mode)));
- rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode)));
+ // figure out storage engine ids for key and/or machine guid
+ {
+ const char *se_name = appconfig_get(&stream_config, rpt->key,
+ "default memory mode",
+ storage_engine_name(rpt->config.storage_engine_id));
+ if (!storage_engine_id(se_name, &rpt->config.storage_engine_id)) {
+ netdata_log_error("STREAM '%s' [receive from %s:%s]: invalid default memory mode for key %s (given: '%s', will use: '%s').",
+ rpt->hostname, rpt->client_ip, rpt->client_port,
+ rpt->key, se_name, storage_engine_name(rpt->config.storage_engine_id));
+ }
- if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) {
- netdata_log_error("STREAM '%s' [receive from %s:%s]: "
- "dbengine is not enabled, falling back to default."
- , rpt->hostname
- , rpt->client_ip, rpt->client_port
- );
+ se_name = appconfig_get(&stream_config, rpt->machine_guid,
+ "memory mode",
+ storage_engine_name(rpt->config.storage_engine_id));
+
+ if (!storage_engine_id(se_name, &rpt->config.storage_engine_id)) {
+ netdata_log_error("STREAM '%s' [receive from %s:%s]: invalid memory mode for machine guid %s (given: '%s', will use: '%s').",
+ rpt->hostname, rpt->client_ip, rpt->client_port,
+ rpt->machine_guid, se_name, storage_engine_name(rpt->config.storage_engine_id));
+ }
+ }
+
+ if (unlikely(rpt->config.storage_engine_id == STORAGE_ENGINE_DBENGINE && !rrdb.dbengine_enabled))
+ {
+ netdata_log_error("STREAM '%s' [receive from %s:%s]: dbengine is not enabled, falling back to default.",
+ rpt->hostname, rpt->client_ip, rpt->client_port);
- rpt->config.mode = default_rrd_memory_mode;
+ rpt->config.storage_engine_id = default_storage_engine_id;
}
rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", rpt->config.health_enabled);
@@ -626,7 +643,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
// find the host for this receiver
{
// this will also update the host with our system_info
- RRDHOST *host = rrdhost_find_or_create(
+ RRDHOST *host = rrdhost_get_or_create(
rpt->hostname
, rpt->registry_hostname
, rpt->machine_guid
@@ -639,7 +656,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
, rpt->program_version
, rpt->config.update_every
, rpt->config.history
- , rpt->config.mode
+ , rpt->config.storage_engine_id
, (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO)
, (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && *rpt->config.rrdpush_api_key)
, rpt->config.rrdpush_destination
@@ -683,9 +700,9 @@ static void rrdpush_receive(struct receiver_state *rpt)
, rpt->client_port
, rrdhost_hostname(rpt->host)
, rpt->host->machine_guid
- , rpt->host->rrd_update_every
+ , rpt->host->update_every
, rpt->host->rrd_history_entries
- , rrd_memory_mode_name(rpt->host->rrd_memory_mode)
+ , storage_engine_name(rpt->host->storage_engine_id)
, (rpt->config.health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((rpt->config.health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
#ifdef ENABLE_HTTPS
, (rpt->ssl.conn != NULL) ? " SSL," : ""
@@ -698,7 +715,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
struct plugind cd = {
- .update_every = default_rrd_update_every,
+ .update_every = rrdb.default_update_every,
.unsafe = {
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.running = true,