diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2020-08-26 14:50:37 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-26 14:50:37 +0200 |
commit | ab7ff3131f3698710e0bd9fa3c66d31a3a194725 (patch) | |
tree | a78efc8cc829c2519b69e73e24d74e713c5105bb /streaming | |
parent | a11d33bba77043b28cf2b0dac4989cd189c635b2 (diff) |
Adds claimed_id streaming (#9804)
* streams claimed_id of child nodes to parents
* adds this information into /api/v1/info
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/receiver.c | 43 | ||||
-rw-r--r-- | streaming/rrdpush.c | 19 | ||||
-rw-r--r-- | streaming/rrdpush.h | 8 | ||||
-rw-r--r-- | streaming/sender.c | 2 |
4 files changed, 69 insertions, 3 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index a7f66bede6..57962a3928 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -95,6 +95,48 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins return PARSER_RC_ERROR; } +#define CLAIMED_ID_MIN_WORDS 3 +PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugins_action) +{ + UNUSED(plugins_action); + + int i; + uuid_t uuid; + RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; + + for (i = 0; words[i]; i++) ; + if (i != CLAIMED_ID_MIN_WORDS) { + error("Command CLAIMED_ID came malformed %d parameters are expected but %d received", CLAIMED_ID_MIN_WORDS - 1, i - 1); + return PARSER_RC_ERROR; + } + + // We don't need the parsed UUID + // just do it to check the format + if(uuid_parse(words[1], uuid)) { + error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[1]); + return PARSER_RC_ERROR; + } + if(uuid_parse(words[2], uuid) && strcmp(words[2], "NULL")) { + error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[2]); + return PARSER_RC_ERROR; + } + + if(strcmp(words[1], host->machine_guid)) { + error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", words[1], host->machine_guid); + return PARSER_RC_OK; //the message is OK problem must be somewehere else + } + + netdata_mutex_lock(&host->claimed_id_lock); + if (host->claimed_id) + freez(host->claimed_id); + host->claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL; + netdata_mutex_unlock(&host->claimed_id_lock); + + rrdpush_claimed_id(host); + + return PARSER_RC_OK; +} + /* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing. */ static int receiver_read(struct receiver_state *r, FILE *fp) { @@ -156,6 +198,7 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp PARSER *parser = parser_init(rpt->host, user, fp, PARSER_INPUT_SPLIT); parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp); + parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); if (unlikely(!parser)) { error("Failed to initialize parser"); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 1731857f56..a9e75d4ccf 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -364,6 +364,25 @@ void rrdpush_send_labels(RRDHOST *host) { host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; } + +void rrdpush_claimed_id(RRDHOST *host) +{ + if(unlikely(!host->rrdpush_send_enabled || !host->rrdpush_sender_connected)) + return; + + sender_start(host->sender); + netdata_mutex_lock(&host->claimed_id_lock); + + buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->claimed_id ? host->claimed_id : "NULL") ); + + netdata_mutex_unlock(&host->claimed_id_lock); + sender_commit(host->sender); + + // signal the sender there are more data + if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) + error("STREAM %s [send]: cannot write to internal pipe", host->hostname); +} + // ---------------------------------------------------------------------------- // rrdpush sender thread diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 3985198d97..39ffed1c3b 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -10,9 +10,10 @@ #define CONNECTED_TO_SIZE 100 -// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3 Gap-filling -#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)2 -#define VERSION_GAP_FILLING 3 +// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 Gap-filling +#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3 +#define VERSION_GAP_FILLING 4 +#define STREAM_VERSION_CLAIM 3 #define STREAMING_PROTOCOL_VERSION "1.1" #define START_STREAMING_PROMPT "Hit me baby, push them over..." @@ -106,6 +107,7 @@ extern void rrdset_done_push(RRDSET *st); extern void rrdset_push_chart_definition_now(RRDSET *st); extern void *rrdpush_sender_thread(void *ptr); extern void rrdpush_send_labels(RRDHOST *host); +extern void rrdpush_claimed_id(RRDHOST *host); extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url); extern void rrdpush_sender_thread_stop(RRDHOST *host); diff --git a/streaming/sender.c b/streaming/sender.c index c6ea18bab9..c48f9b3e06 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -622,6 +622,8 @@ void *rrdpush_sender_thread(void *ptr) { buffer_sprintf(s->build, "TIMESTAMP %ld", now); sender_commit(s); } + if (s->version >= STREAM_VERSION_CLAIM) + rrdpush_claimed_id(s->host); continue; } |