summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-26 14:00:59 +0300
committerGitHub <noreply@github.com>2023-06-26 14:00:59 +0300
commit0d61c11b5f4772a4762ede1d8204290b94bb08e7 (patch)
tree49c97d67e0d2a4846a4b379345f53ef8d93a6aec /streaming/sender.c
parentf90d56f18d29c2835bc278f6a22e840230b9ca86 (diff)
use gperf for the pluginsd/streaming parser hashtable (#15251)
* use gperf for the pluginsd parser * simplify pluginsd_parser by removing void pointers to user * pluginsd_split_words() with inlined pluginsd_space() * quoted_string_splitter() now uses a map instead of a function for determining spaces * add stress test for pluginsd parser * optimized BITMAP256 * optimized rrdpush receiver reception * optimized rrdpush sender compression * renames and cleanup * remove wrong negation * unify handshake and disconnection reasons * use parser_find_keyword * register job names only for the current repertoire
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c66
1 files changed, 31 insertions, 35 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index b565244b1e..cd1f75b12b 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -11,7 +11,7 @@
#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
-#define WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR 9
+#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
@@ -118,7 +118,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
}
#ifdef ENABLE_COMPRESSION
- if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) {
+ if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) {
while(src_len) {
size_t size_to_compress = src_len;
@@ -144,13 +144,13 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
}
char *dst;
- size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if (!dst_len) {
error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
rrdhost_hostname(s->host), s->connected_to);
- s->compressor->reset(s->compressor);
- dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ rrdpush_compressor_reset(&s->compressor);
+ dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if(!dst_len) {
error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
rrdhost_hostname(s->host), s->connected_to);
@@ -492,8 +492,7 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
}
if(version >= STREAM_HANDSHAKE_OK_V1) {
- host->destination->last_error = NULL;
- host->destination->last_handshake = version;
+ host->destination->reason = version;
host->destination->postpone_reconnection_until = now_realtime_sec() + s->reconnect_delay;
s->capabilities = convert_stream_version_to_capabilities(version, host, true);
return true;
@@ -505,8 +504,7 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
worker_is_busy(worker_job_id);
rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = error;
- host->destination->last_handshake = version;
+ host->destination->reason = version;
host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
char buf[LOG_DATE_LENGTH];
@@ -533,8 +531,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = "SSL error";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
return false;
}
@@ -546,8 +543,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = "invalid SSL certificate";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
return false;
}
@@ -709,7 +705,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
if(!rrdpush_sender_connect_ssl(s))
return false;
- ssize_t bytes, len = strlen(http);
+ ssize_t bytes, len = (ssize_t)strlen(http);
bytes = send_timeout(
#ifdef ENABLE_HTTPS
@@ -725,8 +721,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
- host->destination->last_error = "timeout while sending request";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
return false;
}
@@ -745,8 +740,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
- host->destination->last_error = "timeout while expecting first response";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
return false;
}
@@ -763,12 +757,10 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
return false;
#ifdef ENABLE_COMPRESSION
- if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) {
- if(!s->compressor)
- s->compressor = create_compressor();
- else
- s->compressor->reset(s->compressor);
- }
+ if(stream_has_capability(s, STREAM_CAP_COMPRESSION))
+ rrdpush_compressor_reset(&s->compressor);
+ else
+ rrdpush_compressor_destroy(&s->compressor);
#endif //ENABLE_COMPRESSION
log_sender_capabilities(s);
@@ -821,9 +813,9 @@ static bool attempt_to_connect(struct sender_state *state)
return false;
}
-// TCP window is open and we have data to transmit.
+// TCP window is open, and we have data to transmit.
static ssize_t attempt_to_send(struct sender_state *s) {
- ssize_t ret = 0;
+ ssize_t ret;
#ifdef NETDATA_INTERNAL_CHECKS
struct circular_buffer *cb = s->buffer;
@@ -958,7 +950,7 @@ void execute_commands(struct sender_state *s) {
// internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
char *words[PLUGINSD_MAX_WORDS] = { NULL };
- size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS);
+ size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS);
const char *keyword = get_word(words, num_words, 0);
@@ -1108,7 +1100,7 @@ static bool rrdhost_set_sender(RRDHOST *host) {
host->rrdpush_sender_connection_counter++;
host->sender->tid = gettid();
host->sender->last_state_since_t = now_realtime_sec();
- host->sender->exit.reason = NULL;
+ host->sender->exit.reason = STREAM_HANDSHAKE_NEVER;
ret = true;
}
netdata_mutex_unlock(&host->sender->mutex);
@@ -1126,6 +1118,10 @@ static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) {
host->sender->exit.shutdown = false;
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
host->sender->last_state_since_t = now_realtime_sec();
+ if(host->destination) {
+ host->destination->since = host->sender->last_state_since_t;
+ host->destination->reason = host->sender->exit.reason;
+ }
}
rrdpush_reset_destinations_postpone_time(host);
@@ -1137,25 +1133,25 @@ static bool rrdhost_sender_should_exit(struct sender_state *s) {
if(unlikely(!service_running(SERVICE_STREAMING))) {
if(!s->exit.reason)
- s->exit.reason = "NETDATA EXIT";
+ s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT;
return true;
}
if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) {
if(!s->exit.reason)
- s->exit.reason = "NON STREAMABLE HOST";
+ s->exit.reason = STREAM_HANDSHAKE_NON_STREAMABLE_HOST;
return true;
}
if(unlikely(s->exit.shutdown)) {
if(!s->exit.reason)
- s->exit.reason = "SENDER SHUTDOWN REQUESTED";
+ s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN;
return true;
}
if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) {
if(!s->exit.reason)
- s->exit.reason = "RECEIVER LEFT (ORPHAN HOST)";
+ s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST;
return true;
}
@@ -1171,7 +1167,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
netdata_mutex_lock(&host->sender->mutex);
info("STREAM %s [send]: sending thread exits %s",
rrdhost_hostname(host),
- host->sender->exit.reason ? host->sender->exit.reason : "");
+ host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(host->sender->exit.reason) : "");
rrdpush_sender_thread_close_socket(host);
rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
@@ -1220,7 +1216,7 @@ void *rrdpush_sender_thread(void *ptr) {
// disconnection reasons
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR, "disconnect socket error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR, "disconnect socket error");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed");
@@ -1481,7 +1477,7 @@ void *rrdpush_sender_thread(void *ptr) {
error = "connection is invalid (POLLNVAL)";
if(unlikely(error)) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR);
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR);
error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);