diff options
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 53 |
1 files changed, 35 insertions, 18 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 3ff022e973..a78b547caf 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -391,7 +391,7 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); host->rrdpush_receiver_connection_counter++; - __atomic_add_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdb.localhost->connected_children_count, 1, __ATOMIC_RELAXED); host->receiver = rpt; rpt->host = host; @@ -445,7 +445,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { // Make sure that we detach this thread and don't kill a freshly arriving receiver if(host->receiver == rpt) { - __atomic_sub_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&rrdb.localhost->connected_children_count, 1, __ATOMIC_RELAXED); rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); host->trigger_chart_obsoletion_check = 0; @@ -549,8 +549,8 @@ void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, con static void rrdpush_receive(struct receiver_state *rpt) { - rpt->config.mode = default_rrd_memory_mode; - rpt->config.history = default_rrd_history_entries; + rpt->config.storage_engine_id = default_storage_engine_id; + rpt->config.history = rrdb.default_rrd_history_entries; rpt->config.health_enabled = (int)default_health_enabled; rpt->config.alarms_delay = 60; @@ -572,17 +572,34 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", rpt->config.history); if(rpt->config.history < 5) rpt->config.history = 5; - rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(rpt->config.mode))); - rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode))); + // figure out storage engine ids for key and/or machine guid + { + const char *se_name = appconfig_get(&stream_config, rpt->key, + "default memory mode", + storage_engine_name(rpt->config.storage_engine_id)); + if (!storage_engine_id(se_name, &rpt->config.storage_engine_id)) { + netdata_log_error("STREAM '%s' [receive from %s:%s]: invalid default memory mode for key %s (given: '%s', will use: '%s').", + rpt->hostname, rpt->client_ip, rpt->client_port, + rpt->key, se_name, storage_engine_name(rpt->config.storage_engine_id)); + } - if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { - netdata_log_error("STREAM '%s' [receive from %s:%s]: " - "dbengine is not enabled, falling back to default." - , rpt->hostname - , rpt->client_ip, rpt->client_port - ); + se_name = appconfig_get(&stream_config, rpt->machine_guid, + "memory mode", + storage_engine_name(rpt->config.storage_engine_id)); + + if (!storage_engine_id(se_name, &rpt->config.storage_engine_id)) { + netdata_log_error("STREAM '%s' [receive from %s:%s]: invalid memory mode for machine guid %s (given: '%s', will use: '%s').", + rpt->hostname, rpt->client_ip, rpt->client_port, + rpt->machine_guid, se_name, storage_engine_name(rpt->config.storage_engine_id)); + } + } + + if (unlikely(rpt->config.storage_engine_id == STORAGE_ENGINE_DBENGINE && !rrdb.dbengine_enabled)) + { + netdata_log_error("STREAM '%s' [receive from %s:%s]: dbengine is not enabled, falling back to default.", + rpt->hostname, rpt->client_ip, rpt->client_port); - rpt->config.mode = default_rrd_memory_mode; + rpt->config.storage_engine_id = default_storage_engine_id; } rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", rpt->config.health_enabled); @@ -626,7 +643,7 @@ static void rrdpush_receive(struct receiver_state *rpt) // find the host for this receiver { // this will also update the host with our system_info - RRDHOST *host = rrdhost_find_or_create( + RRDHOST *host = rrdhost_get_or_create( rpt->hostname , rpt->registry_hostname , rpt->machine_guid @@ -639,7 +656,7 @@ static void rrdpush_receive(struct receiver_state *rpt) , rpt->program_version , rpt->config.update_every , rpt->config.history - , rpt->config.mode + , rpt->config.storage_engine_id , (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO) , (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && *rpt->config.rrdpush_api_key) , rpt->config.rrdpush_destination @@ -683,9 +700,9 @@ static void rrdpush_receive(struct receiver_state *rpt) , rpt->client_port , rrdhost_hostname(rpt->host) , rpt->host->machine_guid - , rpt->host->rrd_update_every + , rpt->host->update_every , rpt->host->rrd_history_entries - , rrd_memory_mode_name(rpt->host->rrd_memory_mode) + , storage_engine_name(rpt->host->storage_engine_id) , (rpt->config.health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((rpt->config.health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") #ifdef ENABLE_HTTPS , (rpt->ssl.conn != NULL) ? " SSL," : "" @@ -698,7 +715,7 @@ static void rrdpush_receive(struct receiver_state *rpt) struct plugind cd = { - .update_every = default_rrd_update_every, + .update_every = rrdb.default_update_every, .unsafe = { .spinlock = NETDATA_SPINLOCK_INITIALIZER, .running = true, |