From 05ef02a817c24ff4c5eae2d813387301b093e220 Mon Sep 17 00:00:00 2001 From: Emmanuel Vasilakis Date: Fri, 20 May 2022 09:02:25 +0300 Subject: Apply some logic to possible streaming destinations (#12866) * replace connect_to_one_of with connect_to_one_of_destinations * move functions from socket.c * use sizeof * move current destination pointer to host * formatting * use snprintfz * get entries in same order * handle single destination as before (or when it is the last of the list), instead of skiping it every other loop * try other destinations on ssl problem --- streaming/receiver.c | 18 ++++++++++-- streaming/rrdpush.c | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++ streaming/rrdpush.h | 21 ++++++++++++++ streaming/sender.c | 45 ++++++++++++++++++++++++++++-- 4 files changed, 159 insertions(+), 4 deletions(-) (limited to 'streaming') diff --git a/streaming/receiver.c b/streaming/receiver.c index c7879ea5b9..87f96039dc 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -466,9 +466,23 @@ static int rrdpush_receive(struct receiver_state *rpt) if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT"); - error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid); + error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child, or is this an inter-agent loop?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid); + char initial_response[HTTP_HEADER_SIZE + 1]; + snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); +#ifdef ENABLE_HTTPS + rpt->host->stream_ssl.conn = rpt->ssl.conn; + rpt->host->stream_ssl.flags = rpt->ssl.flags; + if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { +#else + if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { +#endif + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send command.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + close(rpt->fd); + return 0; + } close(rpt->fd); - return 1; + return 0; } if (rpt->host==NULL) { diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 186719e51d..00fe02093d 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -417,6 +417,85 @@ void rrdpush_claimed_id(RRDHOST *host) error("STREAM %s [send]: cannot write to internal pipe", host->hostname); } +int connect_to_one_of_destinations( + struct rrdpush_destinations *destinations, + int default_port, + struct timeval *timeout, + size_t *reconnects_counter, + char *connected_to, + size_t connected_to_size, + struct rrdpush_destinations **destination) +{ + int sock = -1; + + for (struct rrdpush_destinations *d = destinations; d; d = d->next) { + if (d->disabled_no_proper_reply) { + d->disabled_no_proper_reply = 0; + continue; + } else if (d->disabled_because_of_localhost) { + continue; + } else if (d->disabled_already_streaming && (d->disabled_already_streaming + 30 > now_realtime_sec())) { + continue; + } else if (d->disabled_because_of_denied_access) { + continue; + } + + if (reconnects_counter) + *reconnects_counter += 1; + sock = connect_to_this(d->destination, default_port, timeout); + if (sock != -1) { + if (connected_to && connected_to_size) { + strncpy(connected_to, d->destination, connected_to_size); + connected_to[connected_to_size - 1] = '\0'; + } + *destination = d; + break; + } + } + + return sock; +} + +struct rrdpush_destinations *destinations_init(const char *dests) { + const char *s = dests; + struct rrdpush_destinations *destinations = NULL, *prev = NULL; + while(*s) { + const char *e = s; + + // skip path, moving both s(tart) and e(nd) + if(*e == '/') + while(!isspace(*e) && *e != ',') s = ++e; + + // skip separators, moving both s(tart) and e(nd) + while(isspace(*e) || *e == ',') s = ++e; + + // move e(nd) to the first separator + while(*e && !isspace(*e) && *e != ',' && *e != '/') e++; + + // is there anything? + if(!*s || s == e) break; + + char buf[e - s + 1]; + strncpyz(buf, s, e - s); + struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations)); + strncpyz(d->destination, buf, sizeof(d->destination)-1); + d->disabled_no_proper_reply = 0; + d->disabled_because_of_localhost = 0; + d->disabled_already_streaming = 0; + d->disabled_because_of_denied_access = 0; + d->next = NULL; + if (!destinations) { + destinations = d; + } else { + prev->next = d; + } + prev = d; + + s = e; + } + return destinations; +} + // ---------------------------------------------------------------------------- // rrdpush sender thread diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 7eb2c6e580..6efe8cd6f3 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -26,6 +26,10 @@ #define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..." #define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version=" +#define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back" +#define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server" +#define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info." + #define HTTP_HEADER_SIZE 8192 typedef enum { @@ -138,6 +142,14 @@ struct receiver_state { #endif }; +struct rrdpush_destinations { + char destination[CONNECTED_TO_SIZE + 1]; + int disabled_no_proper_reply; + int disabled_because_of_localhost; + time_t disabled_already_streaming; + int disabled_because_of_denied_access; + struct rrdpush_destinations *next; +}; extern unsigned int default_rrdpush_enabled; #ifdef ENABLE_COMPRESSION @@ -149,6 +161,7 @@ extern char *default_rrdpush_send_charts_matching; extern unsigned int remote_clock_resync_iterations; extern void sender_init(struct sender_state *s, RRDHOST *parent); +extern struct rrdpush_destinations *destinations_init(const char *destinations); void sender_start(struct sender_state *s); void sender_commit(struct sender_state *s); extern int rrdpush_init(); @@ -164,6 +177,14 @@ extern void rrdpush_sender_thread_stop(RRDHOST *host); extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv); extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); +extern int connect_to_one_of_destinations( + struct rrdpush_destinations *destinations, + int default_port, + struct timeval *timeout, + size_t *reconnects_counter, + char *connected_to, + size_t connected_to_size, + struct rrdpush_destinations **destination); #ifdef ENABLE_COMPRESSION struct compressor_state *create_compressor(); diff --git a/streaming/sender.c b/streaming/sender.c index 52c1e87083..cdcd18d977 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -203,6 +203,18 @@ void rrdpush_clean_encoded(stream_encoded_t *se) freez(se->kernel_version); } +static inline long int parse_stream_version_for_errors(char *http) +{ + if (!memcmp(http, START_STREAMING_ERROR_SAME_LOCALHOST, sizeof(START_STREAMING_ERROR_SAME_LOCALHOST))) + return -2; + else if (!memcmp(http, START_STREAMING_ERROR_ALREADY_STREAMING, sizeof(START_STREAMING_ERROR_ALREADY_STREAMING))) + return -3; + else if (!memcmp(http, START_STREAMING_ERROR_NOT_PERMITTED, sizeof(START_STREAMING_ERROR_NOT_PERMITTED))) + return -4; + else + return -1; +} + static inline long int parse_stream_version(RRDHOST *host, char *http) { long int stream_version = -1; @@ -227,6 +239,9 @@ static inline long int parse_stream_version(RRDHOST *host, char *http) host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM; host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; } + else { + stream_version = parse_stream_version_for_errors(http); + } } } return stream_version; @@ -246,13 +261,14 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po debug(D_STREAM, "STREAM: Attempting to connect..."); info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_send_destination); - host->rrdpush_sender_socket = connect_to_one_of( - host->rrdpush_send_destination + host->rrdpush_sender_socket = connect_to_one_of_destinations( + host->destinations , default_port , &tv , &s->reconnects_counter , s->connected_to , sizeof(s->connected_to)-1 + , &host->destination ); if(unlikely(host->rrdpush_sender_socket == -1)) { @@ -411,6 +427,8 @@ if(!s->rrdpush_compression) if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); rrdpush_sender_thread_close_socket(host); + if (host->destination->next) + host->destination->disabled_no_proper_reply = 1; return 0; }else { host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; @@ -423,6 +441,8 @@ if(!s->rrdpush_compression) worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); error("Closing the stream connection, because the server SSL certificate is not valid."); rrdpush_sender_thread_close_socket(host); + if (host->destination->next) + host->destination->disabled_no_proper_reply = 1; return 0; } } @@ -462,6 +482,27 @@ if(!s->rrdpush_compression) worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE); error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, s->connected_to); rrdpush_sender_thread_close_socket(host); + //catch other reject reasons and force to check other destinations + if (host->destination->next) + host->destination->disabled_no_proper_reply = 1; + return 0; + } + else if(version == -2) { + error("STREAM %s [send to %s]: remote server is the localhost for [%s].", host->hostname, s->connected_to, host->hostname); + rrdpush_sender_thread_close_socket(host); + host->destination->disabled_because_of_localhost = 1; + return 0; + } + else if(version == -3) { + error("STREAM %s [send to %s]: remote server already receives metrics for [%s].", host->hostname, s->connected_to, host->hostname); + rrdpush_sender_thread_close_socket(host); + host->destination->disabled_already_streaming = now_realtime_sec(); + return 0; + } + else if(version == -4) { + error("STREAM %s [send to %s]: remote server denied access for [%s].", host->hostname, s->connected_to, host->hostname); + rrdpush_sender_thread_close_socket(host); + host->destination->disabled_because_of_denied_access = 1; return 0; } s->version = version; -- cgit v1.2.3