summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2020-08-26 14:50:37 +0200
committerGitHub <noreply@github.com>2020-08-26 14:50:37 +0200
commitab7ff3131f3698710e0bd9fa3c66d31a3a194725 (patch)
treea78efc8cc829c2519b69e73e24d74e713c5105bb /streaming/receiver.c
parenta11d33bba77043b28cf2b0dac4989cd189c635b2 (diff)
Adds claimed_id streaming (#9804)
* streams claimed_id of child nodes to parents * adds this information into /api/v1/info
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c43
1 files changed, 43 insertions, 0 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index a7f66bede6..57962a3928 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -95,6 +95,48 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
return PARSER_RC_ERROR;
}
+#define CLAIMED_ID_MIN_WORDS 3
+PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+{
+ UNUSED(plugins_action);
+
+ int i;
+ uuid_t uuid;
+ RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+
+ for (i = 0; words[i]; i++) ;
+ if (i != CLAIMED_ID_MIN_WORDS) {
+ error("Command CLAIMED_ID came malformed %d parameters are expected but %d received", CLAIMED_ID_MIN_WORDS - 1, i - 1);
+ return PARSER_RC_ERROR;
+ }
+
+ // We don't need the parsed UUID
+ // just do it to check the format
+ if(uuid_parse(words[1], uuid)) {
+ error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[1]);
+ return PARSER_RC_ERROR;
+ }
+ if(uuid_parse(words[2], uuid) && strcmp(words[2], "NULL")) {
+ error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[2]);
+ return PARSER_RC_ERROR;
+ }
+
+ if(strcmp(words[1], host->machine_guid)) {
+ error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", words[1], host->machine_guid);
+ return PARSER_RC_OK; //the message is OK problem must be somewehere else
+ }
+
+ netdata_mutex_lock(&host->claimed_id_lock);
+ if (host->claimed_id)
+ freez(host->claimed_id);
+ host->claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL;
+ netdata_mutex_unlock(&host->claimed_id_lock);
+
+ rrdpush_claimed_id(host);
+
+ return PARSER_RC_OK;
+}
+
/* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
*/
static int receiver_read(struct receiver_state *r, FILE *fp) {
@@ -156,6 +198,7 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
PARSER *parser = parser_init(rpt->host, user, fp, PARSER_INPUT_SPLIT);
parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp);
+ parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
if (unlikely(!parser)) {
error("Failed to initialize parser");