summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorEmmanuel Vasilakis <mrzammler@mm.st>2022-05-20 09:02:25 +0300
committerGitHub <noreply@github.com>2022-05-20 09:02:25 +0300
commit05ef02a817c24ff4c5eae2d813387301b093e220 (patch)
tree39a348b6aa80048d01b9c984ca1e691a9e791ee7 /streaming
parent81c15a9bbb59f007b215363c1d24ee2cb66ecea1 (diff)
Apply some logic to possible streaming destinations (#12866)
* replace connect_to_one_of with connect_to_one_of_destinations * move functions from socket.c * use sizeof * move current destination pointer to host * formatting * use snprintfz * get entries in same order * handle single destination as before (or when it is the last of the list), instead of skiping it every other loop * try other destinations on ssl problem
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c18
-rw-r--r--streaming/rrdpush.c79
-rw-r--r--streaming/rrdpush.h21
-rw-r--r--streaming/sender.c45
4 files changed, 159 insertions, 4 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index c7879ea5b9..87f96039dc 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -466,9 +466,23 @@ static int rrdpush_receive(struct receiver_state *rpt)
if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT");
- error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid);
+ error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child, or is this an inter-agent loop?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid);
+ char initial_response[HTTP_HEADER_SIZE + 1];
+ snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
+#ifdef ENABLE_HTTPS
+ rpt->host->stream_ssl.conn = rpt->ssl.conn;
+ rpt->host->stream_ssl.flags = rpt->ssl.flags;
+ if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
+#else
+ if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
+#endif
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send command.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ close(rpt->fd);
+ return 0;
+ }
close(rpt->fd);
- return 1;
+ return 0;
}
if (rpt->host==NULL) {
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 186719e51d..00fe02093d 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -417,6 +417,85 @@ void rrdpush_claimed_id(RRDHOST *host)
error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
}
+int connect_to_one_of_destinations(
+ struct rrdpush_destinations *destinations,
+ int default_port,
+ struct timeval *timeout,
+ size_t *reconnects_counter,
+ char *connected_to,
+ size_t connected_to_size,
+ struct rrdpush_destinations **destination)
+{
+ int sock = -1;
+
+ for (struct rrdpush_destinations *d = destinations; d; d = d->next) {
+ if (d->disabled_no_proper_reply) {
+ d->disabled_no_proper_reply = 0;
+ continue;
+ } else if (d->disabled_because_of_localhost) {
+ continue;
+ } else if (d->disabled_already_streaming && (d->disabled_already_streaming + 30 > now_realtime_sec())) {
+ continue;
+ } else if (d->disabled_because_of_denied_access) {
+ continue;
+ }
+
+ if (reconnects_counter)
+ *reconnects_counter += 1;
+ sock = connect_to_this(d->destination, default_port, timeout);
+ if (sock != -1) {
+ if (connected_to && connected_to_size) {
+ strncpy(connected_to, d->destination, connected_to_size);
+ connected_to[connected_to_size - 1] = '\0';
+ }
+ *destination = d;
+ break;
+ }
+ }
+
+ return sock;
+}
+
+struct rrdpush_destinations *destinations_init(const char *dests) {
+ const char *s = dests;
+ struct rrdpush_destinations *destinations = NULL, *prev = NULL;
+ while(*s) {
+ const char *e = s;
+
+ // skip path, moving both s(tart) and e(nd)
+ if(*e == '/')
+ while(!isspace(*e) && *e != ',') s = ++e;
+
+ // skip separators, moving both s(tart) and e(nd)
+ while(isspace(*e) || *e == ',') s = ++e;
+
+ // move e(nd) to the first separator
+ while(*e && !isspace(*e) && *e != ',' && *e != '/') e++;
+
+ // is there anything?
+ if(!*s || s == e) break;
+
+ char buf[e - s + 1];
+ strncpyz(buf, s, e - s);
+ struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
+ strncpyz(d->destination, buf, sizeof(d->destination)-1);
+ d->disabled_no_proper_reply = 0;
+ d->disabled_because_of_localhost = 0;
+ d->disabled_already_streaming = 0;
+ d->disabled_because_of_denied_access = 0;
+ d->next = NULL;
+ if (!destinations) {
+ destinations = d;
+ } else {
+ prev->next = d;
+ }
+ prev = d;
+
+ s = e;
+ }
+ return destinations;
+}
+
// ----------------------------------------------------------------------------
// rrdpush sender thread
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 7eb2c6e580..6efe8cd6f3 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -26,6 +26,10 @@
#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="
+#define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
+#define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
+#define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
+
#define HTTP_HEADER_SIZE 8192
typedef enum {
@@ -138,6 +142,14 @@ struct receiver_state {
#endif
};
+struct rrdpush_destinations {
+ char destination[CONNECTED_TO_SIZE + 1];
+ int disabled_no_proper_reply;
+ int disabled_because_of_localhost;
+ time_t disabled_already_streaming;
+ int disabled_because_of_denied_access;
+ struct rrdpush_destinations *next;
+};
extern unsigned int default_rrdpush_enabled;
#ifdef ENABLE_COMPRESSION
@@ -149,6 +161,7 @@ extern char *default_rrdpush_send_charts_matching;
extern unsigned int remote_clock_resync_iterations;
extern void sender_init(struct sender_state *s, RRDHOST *parent);
+extern struct rrdpush_destinations *destinations_init(const char *destinations);
void sender_start(struct sender_state *s);
void sender_commit(struct sender_state *s);
extern int rrdpush_init();
@@ -164,6 +177,14 @@ extern void rrdpush_sender_thread_stop(RRDHOST *host);
extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv);
extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
+extern int connect_to_one_of_destinations(
+ struct rrdpush_destinations *destinations,
+ int default_port,
+ struct timeval *timeout,
+ size_t *reconnects_counter,
+ char *connected_to,
+ size_t connected_to_size,
+ struct rrdpush_destinations **destination);
#ifdef ENABLE_COMPRESSION
struct compressor_state *create_compressor();
diff --git a/streaming/sender.c b/streaming/sender.c
index 52c1e87083..cdcd18d977 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -203,6 +203,18 @@ void rrdpush_clean_encoded(stream_encoded_t *se)
freez(se->kernel_version);
}
+static inline long int parse_stream_version_for_errors(char *http)
+{
+ if (!memcmp(http, START_STREAMING_ERROR_SAME_LOCALHOST, sizeof(START_STREAMING_ERROR_SAME_LOCALHOST)))
+ return -2;
+ else if (!memcmp(http, START_STREAMING_ERROR_ALREADY_STREAMING, sizeof(START_STREAMING_ERROR_ALREADY_STREAMING)))
+ return -3;
+ else if (!memcmp(http, START_STREAMING_ERROR_NOT_PERMITTED, sizeof(START_STREAMING_ERROR_NOT_PERMITTED)))
+ return -4;
+ else
+ return -1;
+}
+
static inline long int parse_stream_version(RRDHOST *host, char *http)
{
long int stream_version = -1;
@@ -227,6 +239,9 @@ static inline long int parse_stream_version(RRDHOST *host, char *http)
host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM;
host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
}
+ else {
+ stream_version = parse_stream_version_for_errors(http);
+ }
}
}
return stream_version;
@@ -246,13 +261,14 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
debug(D_STREAM, "STREAM: Attempting to connect...");
info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_send_destination);
- host->rrdpush_sender_socket = connect_to_one_of(
- host->rrdpush_send_destination
+ host->rrdpush_sender_socket = connect_to_one_of_destinations(
+ host->destinations
, default_port
, &tv
, &s->reconnects_counter
, s->connected_to
, sizeof(s->connected_to)-1
+ , &host->destination
);
if(unlikely(host->rrdpush_sender_socket == -1)) {
@@ -411,6 +427,8 @@ if(!s->rrdpush_compression)
if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
rrdpush_sender_thread_close_socket(host);
+ if (host->destination->next)
+ host->destination->disabled_no_proper_reply = 1;
return 0;
}else {
host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
@@ -423,6 +441,8 @@ if(!s->rrdpush_compression)
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
error("Closing the stream connection, because the server SSL certificate is not valid.");
rrdpush_sender_thread_close_socket(host);
+ if (host->destination->next)
+ host->destination->disabled_no_proper_reply = 1;
return 0;
}
}
@@ -462,6 +482,27 @@ if(!s->rrdpush_compression)
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE);
error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, s->connected_to);
rrdpush_sender_thread_close_socket(host);
+ //catch other reject reasons and force to check other destinations
+ if (host->destination->next)
+ host->destination->disabled_no_proper_reply = 1;
+ return 0;
+ }
+ else if(version == -2) {
+ error("STREAM %s [send to %s]: remote server is the localhost for [%s].", host->hostname, s->connected_to, host->hostname);
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->disabled_because_of_localhost = 1;
+ return 0;
+ }
+ else if(version == -3) {
+ error("STREAM %s [send to %s]: remote server already receives metrics for [%s].", host->hostname, s->connected_to, host->hostname);
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->disabled_already_streaming = now_realtime_sec();
+ return 0;
+ }
+ else if(version == -4) {
+ error("STREAM %s [send to %s]: remote server denied access for [%s].", host->hostname, s->connected_to, host->hostname);
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->disabled_because_of_denied_access = 1;
return 0;
}
s->version = version;