summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2020-08-26 14:50:37 +0200
committerGitHub <noreply@github.com>2020-08-26 14:50:37 +0200
commitab7ff3131f3698710e0bd9fa3c66d31a3a194725 (patch)
treea78efc8cc829c2519b69e73e24d74e713c5105bb
parenta11d33bba77043b28cf2b0dac4989cd189c635b2 (diff)
Adds claimed_id streaming (#9804)
* streams claimed_id of child nodes to parents * adds this information into /api/v1/info
-rw-r--r--claim/claim.c27
-rw-r--r--daemon/commands.c1
-rw-r--r--database/rrd.h3
-rw-r--r--database/rrdhost.c4
-rw-r--r--streaming/receiver.c43
-rw-r--r--streaming/rrdpush.c19
-rw-r--r--streaming/rrdpush.h8
-rw-r--r--streaming/sender.c2
-rw-r--r--web/api/web_api_v1.c44
9 files changed, 129 insertions, 22 deletions
diff --git a/claim/claim.c b/claim/claim.c
index 0e84d6295c..b9186bebcd 100644
--- a/claim/claim.c
+++ b/claim/claim.c
@@ -27,8 +27,6 @@ static char *claiming_errors[] = {
"Service Unavailable", // 17
"Agent Unique Id Not Readable" // 18
};
-static netdata_mutex_t claim_mutex = NETDATA_MUTEX_INITIALIZER;
-static char *claimed_id = NULL;
/* Retrieve the claim id for the agent.
* Caller owns the string.
@@ -36,9 +34,9 @@ static char *claimed_id = NULL;
char *is_agent_claimed()
{
char *result;
- netdata_mutex_lock(&claim_mutex);
- result = (claimed_id == NULL) ? NULL : strdup(claimed_id);
- netdata_mutex_unlock(&claim_mutex);
+ netdata_mutex_lock(&localhost->claimed_id_lock);
+ result = (localhost->claimed_id == NULL) ? NULL : strdupz(localhost->claimed_id);
+ netdata_mutex_unlock(&localhost->claimed_id_lock);
return result;
}
@@ -135,10 +133,11 @@ void load_claiming_state(void)
#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
netdata_cloud_setting = 0;
#else
- netdata_mutex_lock(&claim_mutex);
- if (claimed_id != NULL) {
- freez(claimed_id);
- claimed_id = NULL;
+ uuid_t uuid;
+ netdata_mutex_lock(&localhost->claimed_id_lock);
+ if (localhost->claimed_id) {
+ freez(localhost->claimed_id);
+ localhost->claimed_id = NULL;
}
if (aclk_connected)
{
@@ -153,8 +152,14 @@ void load_claiming_state(void)
snprintfz(filename, FILENAME_MAX, "%s/cloud.d/claimed_id", netdata_configured_varlib_dir);
long bytes_read;
- claimed_id = read_by_filename(filename, &bytes_read);
- netdata_mutex_unlock(&claim_mutex); // Only the main thread can call this function, safe to release and then read
+ char *claimed_id = read_by_filename(filename, &bytes_read);
+ if(claimed_id && uuid_parse(claimed_id, uuid)) {
+ error("claimed_id \"%s\" doesn't look like valid UUID", claimed_id);
+ freez(claimed_id);
+ claimed_id = NULL;
+ }
+ localhost->claimed_id = claimed_id;
+ netdata_mutex_unlock(&localhost->claimed_id_lock);
if (!claimed_id) {
info("Unable to load '%s', setting state to AGENT_UNCLAIMED", filename);
return;
diff --git a/daemon/commands.c b/daemon/commands.c
index 57d39007e5..d6364e89a8 100644
--- a/daemon/commands.c
+++ b/daemon/commands.c
@@ -202,6 +202,7 @@ static cmd_status_t cmd_reload_claiming_state_execute(char *args, char **message
info("COMMAND: Reloading Agent Claiming configuration.");
load_claiming_state();
registry_update_cloud_base_url();
+ rrdpush_claimed_id(localhost);
error_log_limit_reset();
return CMD_STATUS_SUCCESS;
}
diff --git a/database/rrd.h b/database/rrd.h
index 405dec3b62..039a489672 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -817,6 +817,9 @@ struct rrdhost {
struct netdata_ssl stream_ssl; //Structure used to encrypt the stream
#endif
+ netdata_mutex_t claimed_id_lock;
+ char *claimed_id; // Claimed ID if host has one otherwise NULL
+
struct rrdhost *next;
};
extern RRDHOST *localhost;
diff --git a/database/rrdhost.c b/database/rrdhost.c
index d82a9099b4..08bdb1b3ba 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -167,6 +167,8 @@ RRDHOST *rrdhost_create(const char *hostname,
netdata_rwlock_init(&host->rrdhost_rwlock);
netdata_rwlock_init(&host->labels_rwlock);
+ netdata_mutex_init(&host->claimed_id_lock);
+
rrdhost_init_hostname(host, hostname);
rrdhost_init_machine_guid(host, guid);
@@ -858,6 +860,8 @@ void rrdhost_free(RRDHOST *host) {
// ------------------------------------------------------------------------
// free it
+ pthread_mutex_destroy(&host->claimed_id_lock);
+ freez(host->claimed_id);
freez((void *)host->tags);
free_host_labels(host->labels);
freez((void *)host->os);
diff --git a/streaming/receiver.c b/streaming/receiver.c
index a7f66bede6..57962a3928 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -95,6 +95,48 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
return PARSER_RC_ERROR;
}
+#define CLAIMED_ID_MIN_WORDS 3
+/*
+ * Parser callback for the "CLAIMED_ID <host guid> <claim id | NULL>" command.
+ *
+ * Checks the word count and the format of both parameters, stores the claim
+ * id (or NULL) on the host attached to the parser user object, then calls
+ * rrdpush_claimed_id() for the same host.
+ *
+ * Returns PARSER_RC_ERROR for a malformed command. Returns PARSER_RC_OK
+ * otherwise, including when the GUID does not match this connection's host,
+ * in which case nothing is stored.
+ */
+PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+{
+    UNUSED(plugins_action);
+
+    RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+    uuid_t scratch; // only written by uuid_parse(); the parsed value is never read
+
+    // Count every word up to the terminating NULL entry, keyword included.
+    int word_count = 0;
+    while (words[word_count])
+        word_count++;
+
+    if (word_count != CLAIMED_ID_MIN_WORDS) {
+        error("Command CLAIMED_ID came malformed %d parameters are expected but %d received", CLAIMED_ID_MIN_WORDS - 1, word_count - 1);
+        return PARSER_RC_ERROR;
+    }
+
+    char *host_guid = words[1];
+    char *claim_id = words[2];
+
+    // uuid_parse() is used purely as a format check here.
+    if (uuid_parse(host_guid, scratch)) {
+        error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_guid);
+        return PARSER_RC_ERROR;
+    }
+
+    // The claim id is either a UUID or the literal word NULL.
+    if (uuid_parse(claim_id, scratch) && strcmp(claim_id, "NULL")) {
+        error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id);
+        return PARSER_RC_ERROR;
+    }
+
+    if (strcmp(host_guid, host->machine_guid)) {
+        error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_guid, host->machine_guid);
+        return PARSER_RC_OK; // the message itself is well formed; the problem must be somewhere else
+    }
+
+    // Swap the stored claim id under the lock; the literal "NULL" clears it.
+    netdata_mutex_lock(&host->claimed_id_lock);
+    freez(host->claimed_id);
+    if (strcmp(claim_id, "NULL"))
+        host->claimed_id = strdupz(claim_id);
+    else
+        host->claimed_id = NULL;
+    netdata_mutex_unlock(&host->claimed_id_lock);
+
+    rrdpush_claimed_id(host);
+
+    return PARSER_RC_OK;
+}
+
/* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
*/
static int receiver_read(struct receiver_state *r, FILE *fp) {
@@ -156,6 +198,7 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
PARSER *parser = parser_init(rpt->host, user, fp, PARSER_INPUT_SPLIT);
parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp);
+ parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
if (unlikely(!parser)) {
error("Failed to initialize parser");
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 1731857f56..a9e75d4ccf 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -364,6 +364,25 @@ void rrdpush_send_labels(RRDHOST *host) {
host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
}
+
+/*
+ * Queue a "CLAIMED_ID <machine guid> <claim id | NULL>" line on the host's
+ * sender buffer and signal the sender through its internal pipe.
+ *
+ * Nothing is queued when streaming is not enabled or not connected for this
+ * host, or when the version stored on the sender is lower than
+ * STREAM_VERSION_CLAIM. The version test is done here, not only by callers,
+ * because some callers invoke this function without checking the version.
+ */
+void rrdpush_claimed_id(RRDHOST *host)
+{
+    if(unlikely(!host->rrdpush_send_enabled || !host->rrdpush_sender_connected))
+        return;
+
+    // NOTE(review): presumably a remote end below this version does not
+    // register the CLAIMED_ID keyword - confirm against the receiver.
+    if(host->sender->version < STREAM_VERSION_CLAIM)
+        return;
+
+    sender_start(host->sender);
+
+    // Hold the lock only while the claim id string is read into the buffer.
+    netdata_mutex_lock(&host->claimed_id_lock);
+    buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->claimed_id ? host->claimed_id : "NULL") );
+    netdata_mutex_unlock(&host->claimed_id_lock);
+
+    sender_commit(host->sender);
+
+    // signal the sender there are more data
+    if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
+        error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+}
+
// ----------------------------------------------------------------------------
// rrdpush sender thread
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 3985198d97..39ffed1c3b 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -10,9 +10,10 @@
#define CONNECTED_TO_SIZE 100
-// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3 Gap-filling
-#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)2
-#define VERSION_GAP_FILLING 3
+// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 Gap-filling
+#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3
+#define VERSION_GAP_FILLING 4
+#define STREAM_VERSION_CLAIM 3
#define STREAMING_PROTOCOL_VERSION "1.1"
#define START_STREAMING_PROMPT "Hit me baby, push them over..."
@@ -106,6 +107,7 @@ extern void rrdset_done_push(RRDSET *st);
extern void rrdset_push_chart_definition_now(RRDSET *st);
extern void *rrdpush_sender_thread(void *ptr);
extern void rrdpush_send_labels(RRDHOST *host);
+extern void rrdpush_claimed_id(RRDHOST *host);
extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
extern void rrdpush_sender_thread_stop(RRDHOST *host);
diff --git a/streaming/sender.c b/streaming/sender.c
index c6ea18bab9..c48f9b3e06 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -622,6 +622,8 @@ void *rrdpush_sender_thread(void *ptr) {
buffer_sprintf(s->build, "TIMESTAMP %ld", now);
sender_commit(s);
}
+ if (s->version >= STREAM_VERSION_CLAIM)
+ rrdpush_claimed_id(s->host);
continue;
}
diff --git a/web/api/web_api_v1.c b/web/api/web_api_v1.c
index 09366de86a..b02814dc49 100644
--- a/web/api/web_api_v1.c
+++ b/web/api/web_api_v1.c
@@ -774,18 +774,48 @@ static inline void web_client_api_request_v1_info_summary_alarm_statuses(RRDHOST
}
static inline void web_client_api_request_v1_info_mirrored_hosts(BUFFER *wb) {
- RRDHOST *rc;
+ RRDHOST *host;
int count = 0;
+
+ buffer_strcat(wb, "\t\"mirrored_hosts\": [\n");
rrd_rdlock();
- rrdhost_foreach_read(rc) {
- if (rrdhost_flag_check(rc, RRDHOST_FLAG_ARCHIVED))
+ rrdhost_foreach_read(host) {
+ if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))
continue;
- if(count > 0) buffer_strcat(wb, ",\n");
- buffer_sprintf(wb, "\t\t\"%s\"", rc->hostname);
+ if (count > 0)
+ buffer_strcat(wb, ",\n");
+
+ buffer_sprintf(wb, "\t\t\"%s\"", host->hostname);
+ count++;
+ }
+
+ buffer_strcat(wb, "\n\t],\n\t\"mirrored_hosts_status\": [\n");
+ count = 0;
+ rrdhost_foreach_read(host)
+ {
+ if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))
+ continue;
+ if (count > 0)
+ buffer_strcat(wb, ",\n");
+
+ netdata_mutex_lock(&host->receiver_lock);
+ buffer_sprintf(
+ wb, "\t\t{ \"guid\": \"%s\", \"reachable\": %s, \"claim_id\": ", host->machine_guid,
+ (host->receiver || host == localhost) ? "true" : "false");
+ netdata_mutex_unlock(&host->receiver_lock);
+
+ netdata_mutex_lock(&host->claimed_id_lock);
+ if (host->claimed_id)
+ buffer_sprintf(wb, "\"%s\" }", host->claimed_id);
+ else
+ buffer_strcat(wb, "null }");
+ netdata_mutex_unlock(&host->claimed_id_lock);
+
count++;
}
- buffer_strcat(wb, "\n");
rrd_unlock();
+
+ buffer_strcat(wb, "\n\t],\n");
}
inline void host_labels2json(RRDHOST *host, BUFFER *wb, size_t indentation) {
@@ -825,9 +855,7 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb)
buffer_sprintf(wb, "\t\"version\": \"%s\",\n", host->program_version);
buffer_sprintf(wb, "\t\"uid\": \"%s\",\n", host->machine_guid);
- buffer_strcat(wb, "\t\"mirrored_hosts\": [\n");
web_client_api_request_v1_info_mirrored_hosts(wb);
- buffer_strcat(wb, "\t],\n");
buffer_strcat(wb, "\t\"alarms\": {\n");
web_client_api_request_v1_info_summary_alarm_statuses(host, wb);