summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2020-11-26 17:26:01 +0100
committerGitHub <noreply@github.com>2020-11-26 17:26:01 +0100
commitf1db235a36d85f81c5a19a7087567405771874f7 (patch)
treef99084d252e0af0496bf4776fa2c118f15dbee1a
parent4f867a58e5287e54aa5ecae023868615cafb9f93 (diff)
ACLK Child Availability Messages (#9918)
* new ACLK messages for Claiming MVP1
-rw-r--r--aclk/aclk_common.c3
-rw-r--r--aclk/aclk_common.h40
-rw-r--r--aclk/aclk_query.c124
-rw-r--r--aclk/aclk_rx_msgs.c8
-rw-r--r--aclk/agent_cloud_link.c264
-rw-r--r--aclk/agent_cloud_link.h14
-rw-r--r--claim/claim.c18
-rw-r--r--database/rrd.h8
-rw-r--r--database/rrdhost.c6
-rw-r--r--database/rrdset.c10
-rw-r--r--streaming/receiver.c23
-rw-r--r--streaming/rrdpush.c6
-rw-r--r--web/api/web_api_v1.c8
13 files changed, 402 insertions, 130 deletions
diff --git a/aclk/aclk_common.c b/aclk/aclk_common.c
index c949d4c8c5..82c5e68eeb 100644
--- a/aclk/aclk_common.c
+++ b/aclk/aclk_common.c
@@ -12,9 +12,6 @@ int aclk_disable_runtime = 0;
int aclk_kill_link = 0;
struct aclk_shared_state aclk_shared_state = {
- .metadata_submitted = ACLK_METADATA_REQUIRED,
- .agent_state = AGENT_INITIALIZING,
- .last_popcorn_interrupt = 0,
.version_neg = 0,
.version_neg_wait_till = 0
};
diff --git a/aclk/aclk_common.h b/aclk/aclk_common.h
index 6c749daff3..015e4b8bbd 100644
--- a/aclk/aclk_common.h
+++ b/aclk/aclk_common.h
@@ -10,7 +10,7 @@ extern netdata_mutex_t aclk_shared_state_mutex;
// minimum and maximum supported version of ACLK
// in this version of agent
#define ACLK_VERSION_MIN 2
-#define ACLK_VERSION_MAX 2
+#define ACLK_VERSION_MAX 3
// Version negotiation messages have they own versioning
// this is also used for LWT message as we set that up
@@ -26,7 +26,8 @@ extern netdata_mutex_t aclk_shared_state_mutex;
#endif
// Define ACLK Feature Version Boundaries Here
-#define ACLK_V_COMPRESSION 2
+#define ACLK_V_COMPRESSION 2
+#define ACLK_V_CHILDRENSTATE 3
typedef enum aclk_cmd {
ACLK_CMD_CLOUD,
@@ -35,7 +36,9 @@ typedef enum aclk_cmd {
ACLK_CMD_CHART,
ACLK_CMD_CHARTDEL,
ACLK_CMD_ALARM,
- ACLK_CMD_CLOUD_QUERY_2
+ ACLK_CMD_CLOUD_QUERY_2,
+ ACLK_CMD_CHILD_CONNECT,
+ ACLK_CMD_CHILD_DISCONNECT
} ACLK_CMD;
typedef enum aclk_metadata_state {
@@ -45,13 +48,32 @@ typedef enum aclk_metadata_state {
} ACLK_METADATA_STATE;
typedef enum aclk_agent_state {
- AGENT_INITIALIZING,
- AGENT_STABLE
-} ACLK_AGENT_STATE;
+ ACLK_HOST_INITIALIZING,
+ ACLK_HOST_STABLE
+} ACLK_POPCORNING_STATE;
+
+typedef struct aclk_rrdhost_state {
+ char *claimed_id; // Claimed ID if host has one otherwise NULL
+
+#ifdef ENABLE_ACLK
+ // per child popcorning
+ ACLK_POPCORNING_STATE state;
+ ACLK_METADATA_STATE metadata;
+
+ time_t timestamp_created;
+ time_t t_last_popcorn_update;
+#endif
+} aclk_rrdhost_state;
+
+#define ACLK_IS_HOST_INITIALIZING(host) (host->aclk_state.state == ACLK_HOST_INITIALIZING)
+#define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && host->aclk_state.t_last_popcorn_update)
+
+typedef struct rrdhost RRDHOST;
+
extern struct aclk_shared_state {
- ACLK_METADATA_STATE metadata_submitted;
- ACLK_AGENT_STATE agent_state;
- time_t last_popcorn_interrupt;
+ // optimization to avoid looping trough hosts
+ // every time Query Thread wakes up
+ RRDHOST *next_popcorn_host;
// read only while ACLK connected
// protect by lock otherwise
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 5e4b879512..be58c2209b 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -512,6 +512,13 @@ cleanup:
return retval;
}
+#define ACLK_HOST_PTR_COMPULSORY(x) \
+ if (unlikely(!host)) { \
+ errno = 0; \
+ error(x " needs host pointer"); \
+ break; \
+ }
+
/*
* This function will fetch the next pending command and process it
*
@@ -546,25 +553,40 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
switch (this_query->cmd) {
case ACLK_CMD_ONCONNECT:
- debug(D_ACLK, "EXECUTING on connect metadata command");
- ACLK_SHARED_STATE_LOCK;
- meta_state = aclk_shared_state.metadata_submitted;
- aclk_shared_state.metadata_submitted = ACLK_METADATA_SENT;
- ACLK_SHARED_STATE_UNLOCK;
- aclk_send_metadata(meta_state);
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT");
+#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
+ if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
+ error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE);
+ break;
+ }
+#else
+#warning "This check became unnecessary. Remove"
+#endif
+
+ debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"",
+ host->hostname,
+ host->machine_guid);
+
+ rrdhost_aclk_state_lock(host);
+ meta_state = host->aclk_state.metadata;
+ host->aclk_state.metadata = ACLK_METADATA_SENT;
+ rrdhost_aclk_state_unlock(host);
+ aclk_send_metadata(meta_state, host);
break;
case ACLK_CMD_CHART:
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART");
+
debug(D_ACLK, "EXECUTING a chart update command");
- if (!host)
- fatal("Pointer to host compulsory");
- aclk_send_single_chart(host->hostname, this_query->query);
+ aclk_send_single_chart(host, this_query->query);
break;
case ACLK_CMD_CHARTDEL:
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL");
+
debug(D_ACLK, "EXECUTING a chart delete command");
//TODO: This send the info metadata for now
- aclk_send_info_metadata(ACLK_METADATA_SENT);
+ aclk_send_info_metadata(ACLK_METADATA_SENT, host);
break;
case ACLK_CMD_ALARM:
@@ -581,7 +603,19 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
aclk_execute_query_v2(this_query);
break;
+ case ACLK_CMD_CHILD_CONNECT:
+ case ACLK_CMD_CHILD_DISCONNECT:
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT");
+
+ debug(
+ D_ACLK, "Execution Child %s command",
+ this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect");
+ aclk_send_info_child_connection(host, this_query->cmd);
+ break;
+
default:
+ errno = 0;
+ error("Unknown ACLK Query Command");
break;
}
debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
@@ -634,6 +668,39 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
}
/**
+ * Checks and updates popcorning state of rrdhost
+ * returns actual/updated popcorning state
+ */
+
+ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
+{
+ rrdhost_aclk_state_lock(host);
+ ACLK_POPCORNING_STATE ret = host->aclk_state.state;
+ if (host->aclk_state.state != ACLK_HOST_INITIALIZING){
+ rrdhost_aclk_state_unlock(host);
+ return ret;
+ }
+
+ if (!host->aclk_state.t_last_popcorn_update){
+ rrdhost_aclk_state_unlock(host);
+ return ret;
+ }
+
+ time_t t_diff = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update;
+
+ if (t_diff >= ACLK_STABLE_TIMEOUT) {
+ host->aclk_state.state = ACLK_HOST_STABLE;
+ host->aclk_state.t_last_popcorn_update = 0;
+ rrdhost_aclk_state_unlock(host);
+ info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, t_diff);
+ return ACLK_HOST_STABLE;
+ }
+
+ rrdhost_aclk_state_unlock(host);
+ return ret;
+}
+
+/**
* Main query processing thread
*
* On startup wait for the agent collectors to initialize
@@ -644,32 +711,14 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
void *aclk_query_main_thread(void *ptr)
{
struct aclk_query_thread *info = ptr;
- time_t previous_popcorn_interrupt = 0;
while (!netdata_exit) {
- ACLK_SHARED_STATE_LOCK;
- if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
- ACLK_SHARED_STATE_UNLOCK;
- break;
- }
-
- time_t checkpoint = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
-
- if (checkpoint > ACLK_STABLE_TIMEOUT) {
- aclk_shared_state.agent_state = AGENT_STABLE;
- ACLK_SHARED_STATE_UNLOCK;
- info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
+ if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) {
#ifdef ACLK_DEBUG
_dump_collector_list();
#endif
break;
}
-
- if (previous_popcorn_interrupt != aclk_shared_state.last_popcorn_interrupt) {
- info("Waiting %ds from this moment for agent collectors to initialize." , ACLK_STABLE_TIMEOUT);
- previous_popcorn_interrupt = aclk_shared_state.last_popcorn_interrupt;
- }
- ACLK_SHARED_STATE_UNLOCK;
sleep_usec(USEC_PER_SEC * 1);
}
@@ -692,15 +741,26 @@ void *aclk_query_main_thread(void *ptr)
aclk_shared_state.version_neg = ACLK_VERSION_MIN;
aclk_set_rx_handlers(aclk_shared_state.version_neg);
}
- if (unlikely(aclk_shared_state.metadata_submitted == ACLK_METADATA_REQUIRED)) {
- if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ ACLK_SHARED_STATE_UNLOCK;
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) {
+ if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
ACLK_SHARED_STATE_UNLOCK;
errno = 0;
error("ACLK failed to queue on_connect command");
sleep(1);
continue;
}
- aclk_shared_state.metadata_submitted = ACLK_METADATA_CMD_QUEUED;
+ localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED;
+ }
+ rrdhost_aclk_state_unlock(localhost);
+
+ ACLK_SHARED_STATE_LOCK;
+ if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
+ aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
+ aclk_shared_state.next_popcorn_host = NULL;
+ aclk_update_next_child_to_popcorn();
}
ACLK_SHARED_STATE_UNLOCK;
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 85b0d74ac9..b90f60d9a8 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -38,13 +38,13 @@ static inline int aclk_v2_payload_get_query(const char *payload, struct aclk_req
return 0;
}
-#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\
- if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\
+#define HTTP_CHECK_AGENT_INITIALIZED() rrdhost_aclk_state_lock(localhost);\
+ if (unlikely(localhost->aclk_state.state == ACLK_HOST_INITIALIZING)) {\
debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
- ACLK_SHARED_STATE_UNLOCK;\
+ rrdhost_aclk_state_unlock(localhost);\
return 1;\
}\
- ACLK_SHARED_STATE_UNLOCK;
+ rrdhost_aclk_state_unlock(localhost);
/*
* Parse the incoming payload and queue a command if valid
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index d19ee27fde..3c28e1e98a 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -438,23 +438,136 @@ static struct _collector *_add_collector(const char *hostname, const char *plugi
#pragma endregion
#endif
-inline static int aclk_popcorn_check_bump()
+/* Avoids the need to scan trough all RRDHOSTS
+ * every time any Query Thread Wakes Up
+ * (every time we need to check child popcorn expiry)
+ * call with ACLK_SHARED_STATE_LOCK held
+ */
+void aclk_update_next_child_to_popcorn(void)
{
+ RRDHOST *host;
+ int any = 0;
+
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ if (unlikely(host == localhost || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)))
+ continue;
+
+ rrdhost_aclk_state_lock(host);
+ if (!ACLK_IS_HOST_POPCORNING(host)) {
+ rrdhost_aclk_state_unlock(host);
+ continue;
+ }
+
+ any = 1;
+
+ if (unlikely(!aclk_shared_state.next_popcorn_host)) {
+ aclk_shared_state.next_popcorn_host = host;
+ rrdhost_aclk_state_unlock(host);
+ continue;
+ }
+
+ if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
+ aclk_shared_state.next_popcorn_host = host;
+
+ rrdhost_aclk_state_unlock(host);
+ }
+ if(!any)
+ aclk_shared_state.next_popcorn_host = NULL;
+
+ rrd_unlock();
+}
+
+/* If popcorning bump timer.
+ * If popcorning or initializing (host not stable) return 1
+ * Otherwise return 0
+ */
+static int aclk_popcorn_check_bump(RRDHOST *host)
+{
+ time_t now = now_monotonic_sec();
+ int updated = 0, ret;
ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
- aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
+ rrdhost_aclk_state_lock(host);
+
+ ret = ACLK_IS_HOST_INITIALIZING(host);
+ if (unlikely(ACLK_IS_HOST_POPCORNING(host))) {
+ if(now != host->aclk_state.t_last_popcorn_update) {
+ updated = 1;
+ info("Restarting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
+ }
+ host->aclk_state.t_last_popcorn_update = now;
+ rrdhost_aclk_state_unlock(host);
+
+ if (host != localhost && updated)
+ aclk_update_next_child_to_popcorn();
+
ACLK_SHARED_STATE_UNLOCK;
- return 1;
+ return ret;
+ }
+
+ rrdhost_aclk_state_unlock(host);
+ ACLK_SHARED_STATE_UNLOCK;
+ return ret;
+}
+
+inline static int aclk_host_initializing(RRDHOST *host)
+{
+ rrdhost_aclk_state_lock(host);
+ int ret = ACLK_IS_HOST_INITIALIZING(host);
+ rrdhost_aclk_state_unlock(host);
+ return ret;
+}
+
+static void aclk_start_host_popcorning(RRDHOST *host)
+{
+ usec_t now = now_monotonic_sec();
+ info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
+ ACLK_SHARED_STATE_LOCK;
+ rrdhost_aclk_state_lock(host);
+ if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) {
+ errno = 0;
+ error("Localhost is allowed to do popcorning only once after startup!");
+ rrdhost_aclk_state_unlock(host);
+ ACLK_SHARED_STATE_UNLOCK;
+ return;
+ }
+
+ host->aclk_state.state = ACLK_HOST_INITIALIZING;
+ host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
+ host->aclk_state.t_last_popcorn_update = now;
+ rrdhost_aclk_state_unlock(host);
+ if (host != localhost)
+ aclk_update_next_child_to_popcorn();
+ ACLK_SHARED_STATE_UNLOCK;
+}
+
+static void aclk_stop_host_popcorning(RRDHOST *host)
+{
+ ACLK_SHARED_STATE_LOCK;
+ rrdhost_aclk_state_lock(host);
+ if (!ACLK_IS_HOST_POPCORNING(host)) {
+ rrdhost_aclk_state_unlock(host);
+ ACLK_SHARED_STATE_UNLOCK;
+ return;
+ }
+
+ info("Host Disconnected before ACLK popcorning finished. Canceling. Host \"%s\" GUID:\"%s\"", host->hostname, host->machine_guid);
+ host->aclk_state.t_last_popcorn_update = 0;
+ host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
+ rrdhost_aclk_state_unlock(host);
+
+ if(host == aclk_shared_state.next_popcorn_host) {
+ aclk_shared_state.next_popcorn_host = NULL;
+ aclk_update_next_child_to_popcorn();
}
ACLK_SHARED_STATE_UNLOCK;
- return 0;
}
/*
* Add a new collector to the list
* If it exists, update the chart count
*/
-void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
+void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
@@ -463,7 +576,7 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha
COLLECTOR_LOCK;
- tmp_collector = _add_collector(hostname, plugin_name, module_name);
+ tmp_collector = _add_collector(host->hostname, plugin_name, module_name);
if (unlikely(tmp_collector->count != 1)) {
COLLECTOR_UNLOCK;
@@ -472,10 +585,10 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha
COLLECTOR_UNLOCK;
- if(aclk_popcorn_check_bump())
+ if(aclk_popcorn_check_bump(host))
return;
- if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
}
@@ -487,7 +600,7 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha
* This function will release the memory used and schedule
* a cloud update
*/
-void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
+void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
@@ -496,7 +609,7 @@ void aclk_del_collector(const char *hostname, const char *plugin_name, const cha
COLLECTOR_LOCK;
- tmp_collector = _del_collector(hostname, plugin_name, module_name);
+ tmp_collector = _del_collector(host->hostname, plugin_name, module_name);
if (unlikely(!tmp_collector || tmp_collector->count)) {
COLLECTOR_UNLOCK;
@@ -511,10 +624,10 @@ void aclk_del_collector(const char *hostname, const char *plugin_name, const cha
_free_collector(tmp_collector);
- if (aclk_popcorn_check_bump())
+ if (aclk_popcorn_check_bump(host))
return;
- if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
}
@@ -895,6 +1008,7 @@ void *aclk_main(void *ptr)
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct aclk_query_threads query_threads;
struct aclk_stats_thread *stats_thread = NULL;
+ time_t last_periodic_query_wakeup = 0;
query_threads.thread_list = NULL;
@@ -941,7 +1055,8 @@ void *aclk_main(void *ptr)
config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
}
- aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); // without mutex here because threads are not yet started
+ //start localhost popcorning
+ aclk_start_host_popcorning(localhost);
aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
if (aclk_stats_enabled) {
@@ -1051,6 +1166,14 @@ void *aclk_main(void *ptr)
if (unlikely(!query_threads.thread_list)) {
aclk_query_threads_start(&query_threads);
}
+
+ time_t now = now_monotonic_sec();
+ if(aclk_connected && last_periodic_query_wakeup < now) {
+ // to make `aclk_queue_query()` param `run_after` work
+ // also makes per child popcorning work
+ last_periodic_query_wakeup = now;
+ QUERY_THREAD_WAKEUP;
+ }
} // forever
exited:
// Wakeup query thread to cleanup
@@ -1200,9 +1323,9 @@ void aclk_disconnect()
aclk_stats_upd_online(0);
aclk_subscribed = 0;
- ACLK_SHARED_STATE_LOCK;
- aclk_shared_state.metadata_submitted = ACLK_METADATA_REQUIRED;
- ACLK_SHARED_STATE_UNLOCK;
+ rrdhost_aclk_state_lock(localhost);
+ localhost->aclk_state.metadata = ACLK_METADATA_REQUIRED;
+ rrdhost_aclk_state_unlock(localhost);
aclk_connected = 0;
aclk_connecting = 0;
aclk_force_reconnect = 1;
@@ -1294,7 +1417,7 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
* /api/v1/info
* charts
*/
-int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted)
+int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -1315,11 +1438,11 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted)
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
- web_client_api_request_v1_info_fill_buffer(localhost, local_buffer);
+ web_client_api_request_v1_info_fill_buffer(host, local_buffer);
debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
buffer_sprintf(local_buffer, ", \n\t \"charts\" : ");
- charts2json(localhost, local_buffer, 1, 0);
+ charts2json(host, local_buffer, 1, 0);
buffer_sprintf(local_buffer, "\n}\n}");
debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
@@ -1330,6 +1453,66 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted)
return 0;
}
+int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
+{
+ BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
+ fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg);
+
+ debug(D_ACLK, "Sending Child Disconnect");
+
+ char *msg_id = create_uuid();
+
+ aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg);
+
+ buffer_strcat(local_buffer, ",\"payload\":");
+
+ buffer_sprintf(local_buffer, "{\"guid\":\"%s\",\"claim_id\":", host->machine_guid);
+ rrdhost_aclk_state_lock(host);
+ if(host->aclk_state.claimed_id)
+ buffer_sprintf(local_buffer, "\"%s\"}}", host->aclk_state.claimed_id);
+ else
+ buffer_strcat(local_buffer, "null}}");
+
+ rrdhost_aclk_state_unlock(host);
+
+ aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
+
+ freez(msg_id);
+ buffer_free(local_buffer);
+ return 0;
+}
+
+void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd)
+{
+#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
+ if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
+ return;
+#else
+#warning "This check became unnecessary. Remove"
+#endif
+
+ if (unlikely(aclk_host_initializing(localhost)))
+ return;
+
+ switch (cmd) {
+ case ACLK_CMD_CHILD_CONNECT:
+ debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid);
+ aclk_start_host_popcorning(host);
+ aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
+ break;
+ case ACLK_CMD_CHILD_DISCONNECT:
+ debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid);
+ aclk_stop_host_popcorning(host);
+ aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
+ break;
+ default:
+ error("Unknown command for aclk_host_state_update %d.", (int)cmd);
+ }
+}
+
void aclk_send_stress_test(size_t size)
{
char *buffer = mallocz(size);
@@ -1351,11 +1534,12 @@ void aclk_send_stress_test(size_t size)
// Send info metadata message to the cloud if the link is established
// or on request
-int aclk_send_metadata(ACLK_METADATA_STATE state)
+int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host)
{
+ aclk_send_info_metadata(state, host);
- aclk_send_info_metadata(state);
- aclk_send_alarm_metadata(state);
+ if(host == localhost)
+ aclk_send_alarm_metadata(state);
return 0;
}
@@ -1373,15 +1557,10 @@ void aclk_single_update_enable()
// Trigged by a health reload, sends the alarm metadata
void aclk_alarm_reload()
{
-
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
- ACLK_SHARED_STATE_UNLOCK;
+ if (unlikely(aclk_host_initializing(localhost)))
return;
- }
- ACLK_SHARED_STATE_UNLOCK;
- if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue on_connect command on alarm reload");
@@ -1390,17 +1569,11 @@ void aclk_alarm_reload()
}
//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
-int aclk_send_single_chart(char *hostname, char *chart)
+int aclk_send_single_chart(RRDHOST *host, char *chart)
{
- RRDHOST *target_host;
-
- target_host = rrdhost_find_by_hostname(hostname, 0);
- if (!target_host)
- return 1;
-
- RRDSET *st = rrdset_find(target_host, chart);
+ RRDSET *st = rrdset_find(host, chart);
if (!st)
- st = rrdset_find_byname(target_host, chart);
+ st = rrdset_find_byname(host, chart);
if (!st) {
info("FAILED to find chart %s", chart);
return 1;
@@ -1437,13 +1610,16 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
if (!netdata_cloud_setting)
return 0;
- if (host != localhost)
+ if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
+ return 0;
+
+ if (aclk_host_initializing(localhost))
return 0;
if (unlikely(aclk_disable_single_updates))
return 0;
- if (aclk_popcorn_check_bump())
+ if (aclk_popcorn_check_bump(host))
return 0;
if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) {
@@ -1467,12 +1643,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
if (host != localhost)
return 0;
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
- ACLK_SHARED_STATE_UNLOCK;
+ if(unlikely(aclk_host_initializing(localhost)))
return 0;
- }
- ACLK_SHARED_STATE_UNLOCK;
/*
* Check if individual updates have been disabled
diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h
index b6bb74c2f4..b224a45433 100644
--- a/aclk/agent_cloud_link.h
+++ b/aclk/agent_cloud_link.h
@@ -66,24 +66,28 @@ int cloud_to_agent_parse(JSON_ENTRY *e);
void aclk_disconnect();
void aclk_connect();
-int aclk_send_metadata(ACLK_METADATA_STATE state);
-int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted);
+int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host);
+int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host);
void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted);
int aclk_wait_for_initialization();
char *create_publish_base_topic();
-int aclk_send_single_chart(char *host, char *chart);
+int aclk_send_single_chart(RRDHOST *host, char *chart);
int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version);
int aclk_handle_cloud_message(char *payload);
-void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name);
-void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name);
+void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_alarm_reload();
unsigned long int aclk_reconnect_delay(int mode);
extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
void aclk_single_update_enable();
void aclk_single_update_disable();
+void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd);
+int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd);
+void aclk_update_next_child_to_popcorn(void);
+
#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/claim/claim.c b/claim/claim.c
index 7846925e6d..d87f46a7f8 100644
--- a/claim/claim.c
+++ b/claim/claim.c
@@ -34,9 +34,9 @@ static char *claiming_errors[] = {
char *is_agent_claimed()
{
char *result;
- netdata_mutex_lock(&localhost->claimed_id_lock);
- result = (localhost->claimed_id == NULL) ? NULL : strdupz(localhost->claimed_id);
- netdata_mutex_unlock(&localhost->claimed_id_lock);
+ rrdhost_aclk_state_lock(localhost);
+ result = (localhost->aclk_state.claimed_id == NULL) ? NULL : strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
return result;
}
@@ -134,10 +134,10 @@ void load_claiming_state(void)
netdata_cloud_setting = 0;
#else
uuid_t uuid;
- netdata_mutex_lock(&localhost->claimed_id_lock);
- if (localhost->claimed_id) {
- freez(localhost->claimed_id);
- localhost->claimed_id = NULL;
+ rrdhost_aclk_state_lock(localhost);
+ if (localhost->aclk_state.claimed_id) {
+ freez(localhost->aclk_state.claimed_id);
+ localhost->aclk_state.claimed_id = NULL;
}
if (aclk_connected)
{
@@ -159,8 +159,8 @@ void load_claiming_state(void)
freez(claimed_id);
claimed_id = NULL;
}
- localhost->claimed_id = claimed_id;
- netdata_mutex_unlock(&localhost->claimed_id_lock);
+ localhost->aclk_state.claimed_id = claimed_id;
+ rrdhost_aclk_state_unlock(localhost);
if (!claimed_id) {
info("Unable to load '%s', setting state to AGENT_UNCLAIMED", filename);
return;
diff --git a/database/rrd.h b/database/rrd.h
index c7bdb98e6d..fd4cae81aa 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -33,6 +33,7 @@ struct pg_cache_page_index;
#include "rrdcalc.h"
#include "rrdcalctemplate.h"
#include "../streaming/rrdpush.h"
+#include "../aclk/aclk_common.h"
struct context_param {
RRDDIM *rd;
@@ -825,8 +826,8 @@ struct rrdhost {
struct netdata_ssl stream_ssl; //Structure used to encrypt the stream
#endif
- netdata_mutex_t claimed_id_lock;
- char *claimed_id; // Claimed ID if host has one otherwise NULL
+ netdata_mutex_t aclk_state_lock;
+ aclk_rrdhost_state aclk_state;
struct rrdhost *next;
};
@@ -836,6 +837,9 @@ extern RRDHOST *localhost;
#define rrdhost_wrlock(host) netdata_rwlock_wrlock(&((host)->rrdhost_rwlock))
#define rrdhost_unlock(host) netdata_rwlock_unlock(&((host)->rrdhost_rwlock))
+#define rrdhost_aclk_state_lock(host) netdata_mutex_lock(&((host)->aclk_state_lock))
+#define rrdhost_aclk_state_unlock(host) netdata_mutex_unlock(&((host)->aclk_state_lock))
+
// ----------------------------------------------------------------------------
// these loop macros make sure the linked list is accessed with the right lock
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 54af0390e0..c3940c29b2 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -189,7 +189,7 @@ RRDHOST *rrdhost_create(const char *hostname,
netdata_rwlock_init(&host->rrdhost_rwlock);
netdata_rwlock_init(&host->labels_rwlock);
- netdata_mutex_init(&host->claimed_id_lock);
+ netdata_mutex_init(&host->aclk_state_lock);
host->system_info = system_info;
@@ -870,8 +870,8 @@ void rrdhost_free(RRDHOST *host) {
// ------------------------------------------------------------------------
// free it
- pthread_mutex_destroy(&host->claimed_id_lock);
- freez(host->claimed_id);
+ pthread_mutex_destroy(&host->aclk_state_lock);
+ freez(host->aclk_state.claimed_id);
freez((void *)host