From 0d61c11b5f4772a4762ede1d8204290b94bb08e7 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Mon, 26 Jun 2023 14:00:59 +0300 Subject: use gperf for the pluginsd/streaming parser hashtable (#15251) * use gperf for the pluginsd parser * simplify pluginsd_parser by removing void pointers to user * pluginsd_split_words() with inlined pluginsd_space() * quoted_string_splitter() now uses a map instead of a function for determining spaces * add stress test for pluginsd parser * optimized BITMAP256 * optimized rrdpush receiver reception * optimized rrdpush sender compression * renames and cleanup * remove wrong negation * unify handshake and disconnection reasons * use parser_find_keyword * register job names only for the current repertoire --- streaming/receiver.c | 231 ++++++++++++++++++++------------------------------- 1 file changed, 92 insertions(+), 139 deletions(-) (limited to 'streaming/receiver.c') diff --git a/streaming/receiver.c b/streaming/receiver.c index 237345cc9c..ce48019683 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -2,17 +2,6 @@ #include "rrdpush.h" -// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly -#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) -#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2) - -// this has to be the same at parser.h -#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) - -#if WORKER_PARSER_FIRST_JOB < 1 -#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1 -#endif - extern struct config stream_config; void receiver_state_free(struct receiver_state *rpt) { @@ -40,12 +29,11 @@ void receiver_state_free(struct receiver_state *rpt) { } #ifdef ENABLE_COMPRESSION - if (rpt->decompressor) - rpt->decompressor->destroy(&rpt->decompressor); + rrdpush_decompressor_destroy(&rpt->decompressor); #endif if(rpt->system_info) - rrdhost_system_info_free(rpt->system_info); + rrdhost_system_info_free(rpt->system_info); __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED); @@ -54,51 +42,18 @@ void receiver_state_free(struct receiver_state *rpt) { #include "collectors/plugins.d/pluginsd_parser.h" -PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user) -{ - const char *host_uuid_str = get_word(words, num_words, 1); - const char *claim_id_str = get_word(words, num_words, 2); - - if (!host_uuid_str || !claim_id_str) { - error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", - host_uuid_str ? host_uuid_str : "[unset]", - claim_id_str ? claim_id_str : "[unset]"); - return PARSER_RC_ERROR; - } - - uuid_t uuid; - RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; - - // We don't need the parsed UUID - // just do it to check the format - if(uuid_parse(host_uuid_str, uuid)) { - error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str); - return PARSER_RC_ERROR; - } - if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) { - error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str); - return PARSER_RC_ERROR; - } - - if(strcmp(host_uuid_str, host->machine_guid)) { - error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid); - return PARSER_RC_OK; //the message is OK problem must be somewhere else - } - - rrdhost_aclk_state_lock(host); - if (host->aclk_state.claimed_id) - freez(host->aclk_state.claimed_id); - host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; - rrdhost_aclk_state_unlock(host); - - rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE); +// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly +#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) +#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2) - rrdpush_send_claimed_id(host); +// this has to be the same at parser.h +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) - return PARSER_RC_OK; -} +#if WORKER_PARSER_FIRST_JOB < 1 +#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1 +#endif -static int read_stream(struct receiver_state *r, char* buffer, size_t size) { +static inline int read_stream(struct receiver_state *r, char* buffer, size_t size) { if(unlikely(!size)) { internal_error(true, "%s() asked to read zero bytes", __FUNCTION__); return 0; @@ -137,7 +92,7 @@ static int read_stream(struct receiver_state *r, char* buffer, size_t size) { return (int)bytes_read; } -static bool receiver_read_uncompressed(struct receiver_state *r) { +static inline bool receiver_read_uncompressed(struct receiver_state *r) { #ifdef NETDATA_INTERNAL_CHECKS if(r->read_buffer[r->read_len] != '\0') fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); @@ -157,19 +112,17 @@ static bool receiver_read_uncompressed(struct receiver_state *r) { } #ifdef ENABLE_COMPRESSION -static bool receiver_read_compressed(struct receiver_state *r) { +static inline bool receiver_read_compressed(struct receiver_state *r) { -#ifdef NETDATA_INTERNAL_CHECKS - if(r->read_buffer[r->read_len] != '\0') - fatal("%s: read_buffer does not start with zero #2", __FUNCTION__ ); -#endif + internal_fatal(r->read_buffer[r->read_len] != '\0', + "%s: read_buffer does not start with zero #2", __FUNCTION__ ); // first use any available uncompressed data - if (r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { + if (likely(rrdpush_decompressed_bytes_in_buffer(&r->decompressor))) { size_t available = sizeof(r->read_buffer) - r->read_len - 1; - if (available) { - size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available); - if (!len) { + if (likely(available)) { + size_t len = rrdpush_decompressor_get(&r->decompressor, r->read_buffer + r->read_len, available); + if (unlikely(!len)) { internal_error(true, "decompressor returned zero length #1"); return false; } @@ -178,7 +131,7 @@ static bool receiver_read_compressed(struct receiver_state *r) { r->read_buffer[r->read_len] = '\0'; } else - internal_error(true, "The line to read is too big! Already have %d bytes in read_buffer.", r->read_len); + internal_fatal(true, "The line to read is too big! Already have %d bytes in read_buffer.", r->read_len); return true; } @@ -186,8 +139,9 @@ static bool receiver_read_compressed(struct receiver_state *r) { // no decompressed data available // read the compression signature of the next block - if(unlikely(r->read_len + r->decompressor->signature_size > sizeof(r->read_buffer) - 1)) { - internal_error(true, "The last incomplete line does not leave enough room for the next compression header! Already have %d bytes in read_buffer.", r->read_len); + if(unlikely(r->read_len + r->decompressor.signature_size > sizeof(r->read_buffer) - 1)) { + internal_error(true, "The last incomplete line does not leave enough room for the next compression header! " + "Already have %d bytes in read_buffer.", r->read_len); return false; } @@ -195,19 +149,19 @@ static bool receiver_read_compressed(struct receiver_state *r) { // we have to do a loop here, because read_stream() may return less than the data we need int bytes_read = 0; do { - int ret = read_stream(r, r->read_buffer + r->read_len + bytes_read, r->decompressor->signature_size - bytes_read); + int ret = read_stream(r, r->read_buffer + r->read_len + bytes_read, r->decompressor.signature_size - bytes_read); if (unlikely(ret <= 0)) return false; bytes_read += ret; - } while(unlikely(bytes_read < (int)r->decompressor->signature_size)); + } while(unlikely(bytes_read < (int)r->decompressor.signature_size)); worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); - if(unlikely(bytes_read != (int)r->decompressor->signature_size)) - fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor->signature_size); + if(unlikely(bytes_read != (int)r->decompressor.signature_size)) + fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor.signature_size); - size_t compressed_message_size = r->decompressor->start(r->decompressor, r->read_buffer + r->read_len, bytes_read); + size_t compressed_message_size = rrdpush_decompressor_start(&r->decompressor, r->read_buffer + r->read_len, bytes_read); if (unlikely(!compressed_message_size)) { internal_error(true, "multiplexed uncompressed data in compressed stream!"); r->read_len += bytes_read; @@ -244,8 +198,8 @@ static bool receiver_read_compressed(struct receiver_state *r) { worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)compressed_bytes_read); // decompress the compressed block - size_t bytes_to_parse = r->decompressor->decompress(r->decompressor, compressed, compressed_bytes_read); - if (!bytes_to_parse) { + size_t bytes_to_parse = rrdpush_decompress(&r->decompressor, compressed, compressed_bytes_read); + if (unlikely(!bytes_to_parse)) { internal_error(true, "no bytes to parse."); return false; } @@ -253,8 +207,8 @@ static bool receiver_read_compressed(struct receiver_state *r) { worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_to_parse); // fill read buffer with decompressed data - size_t len = (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); - if (!len) { + size_t len = (int) rrdpush_decompressor_get(&r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); + if (unlikely(!len)) { internal_error(true, "decompressor returned zero length #2"); return false; } @@ -264,7 +218,7 @@ static bool receiver_read_compressed(struct receiver_state *r) { return true; } #else // !ENABLE_COMPRESSION -static bool receiver_read_compressed(struct receiver_state *r) { +static inline bool receiver_read_compressed(struct receiver_state *r) { return receiver_read_uncompressed(r); } #endif // ENABLE_COMPRESSION @@ -272,7 +226,7 @@ static bool receiver_read_compressed(struct receiver_state *r) { /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. */ -static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) { +static inline char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) { size_t start = *pos; char *ss = &r->read_buffer[start]; @@ -323,20 +277,50 @@ static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t b bool plugin_is_enabled(struct plugind *cd); +static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAKE reason, bool force) { + if(force || !rpt->exit.reason) + rpt->exit.reason = reason; +} + +static inline bool receiver_should_continue(struct receiver_state *rpt) { + static __thread size_t counter = 0; + + if(unlikely(rpt->exit.shutdown)) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false); + return false; + } + + // check every 1000 lines read + if((counter++ % 1000) != 0) return true; + + if(unlikely(!service_running(SERVICE_STREAMING))) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, false); + return false; + } + + netdata_thread_testcancel(); + + rpt->last_msg_t = now_monotonic_sec(); + + return true; +} + static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) { size_t result; - PARSER_USER_OBJECT user = { - .enabled = plugin_is_enabled(cd), - .host = rpt->host, - .opaque = rpt, - .cd = cd, - .trust_durations = 1, - .capabilities = rpt->capabilities, - }; - - PARSER *parser = parser_init(&user, NULL, NULL, fd, - PARSER_INPUT_SPLIT, ssl); + PARSER *parser = NULL; + { + PARSER_USER_OBJECT user = { + .enabled = plugin_is_enabled(cd), + .host = rpt->host, + .opaque = rpt, + .cd = cd, + .trust_durations = 1, + .capabilities = rpt->capabilities, + }; + + parser = parser_init(&user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl); + } pluginsd_keywords_init(parser, PARSER_INIT_STREAMING); @@ -346,20 +330,15 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); - - user.parser = parser; - bool compressed_connection = false; + #ifdef ENABLE_COMPRESSION if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { compressed_connection = true; - - if (!rpt->decompressor) - rpt->decompressor = create_decompressor(); - else - rpt->decompressor->reset(rpt->decompressor); + rrdpush_decompressor_reset(&rpt->decompressor); } + else + rrdpush_decompressor_destroy(&rpt->decompressor); #endif rpt->read_buffer[0] = '\0'; @@ -367,51 +346,27 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i size_t read_buffer_start = 0; char buffer[PLUGINSD_LINE_MAX + 2] = ""; - while(service_running(SERVICE_STREAMING)) { - netdata_thread_testcancel(); + while(receiver_should_continue(rpt)) { if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) { - bool have_new_data; - if(likely(compressed_connection)) - have_new_data = receiver_read_compressed(rpt); - else - have_new_data = receiver_read_uncompressed(rpt); + bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt); if(unlikely(!have_new_data)) { - if(!rpt->exit.reason) - rpt->exit.reason = "SOCKET READ ERROR"; - + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, false); break; } - rpt->last_msg_t = now_realtime_sec(); continue; } - if(unlikely(!service_running(SERVICE_STREAMING))) { - if(!rpt->exit.reason) - rpt->exit.reason = "NETDATA EXIT"; - goto done; - } - if(unlikely(rpt->exit.shutdown)) { - if(!rpt->exit.reason) - rpt->exit.reason = "SHUTDOWN REQUESTED"; - - goto done; - } - if (unlikely(parser_action(parser, buffer))) { internal_error(true, "parser_action() failed on keyword '%s'.", buffer); - - if(!rpt->exit.reason) - rpt->exit.reason = "PARSER FAILED"; - + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); break; } } -done: - result = user.data_collections_count; + result = parser ? parser->user.data_collections_count : 0; // free parser with the pop function netdata_thread_cleanup_pop(1); @@ -501,7 +456,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO) host->health.health_enabled = 0; - rrdpush_sender_thread_stop(host, "RECEIVER LEFT", false); + rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false); signal_rrdcontext = true; rrdpush_receiver_replication_reset(host); @@ -520,7 +475,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { } } -bool stop_streaming_receiver(RRDHOST *host, const char *reason) { +bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) { bool ret = false; netdata_mutex_lock(&host->receiver_lock); @@ -528,7 +483,7 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) { if(host->receiver) { if(!host->receiver->exit.shutdown) { host->receiver->exit.shutdown = true; - host->receiver->exit.reason = reason; + receiver_set_exit_reason(host->receiver, reason, true); shutdown(host->receiver->fd, SHUT_RDWR); } @@ -586,9 +541,9 @@ void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, con , rpt->client_ip, rpt->client_port , msg , status - , rpt->exit.reason?" (":"" - , rpt->exit.reason?rpt->exit.reason:"" - , rpt->exit.reason?")":"" + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":"" + , stream_handshake_error_to_string(rpt->exit.reason) + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":"" ); } @@ -661,7 +616,6 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.rrdpush_compression = default_compression_enabled; rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression); rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression); - rpt->rrdpush_compression = (rpt->config.rrdpush_compression && default_compression_enabled); #endif //ENABLE_COMPRESSION (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); @@ -758,7 +712,7 @@ static void rrdpush_receive(struct receiver_state *rpt) #ifdef ENABLE_COMPRESSION if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { - if (!rpt->rrdpush_compression) + if (!rpt->config.rrdpush_compression) rpt->capabilities &= ~STREAM_CAP_COMPRESSION; } #endif @@ -839,8 +793,7 @@ static void rrdpush_receive(struct receiver_state *rpt) #endif ); - if(!rpt->exit.reason) - rpt->exit.reason = "PARSER EXIT"; + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, false); { char msg[100 + 1]; -- cgit v1.2.3