diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-09-05 19:31:06 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-05 19:31:06 +0300 |
commit | 5e1b95cf92168c4df74586fb4430dc284806da82 (patch) | |
tree | f42077d8b02eaf316683453a7474bd1f599a833d /streaming/receiver.c | |
parent | 544aef1fde6e79ac57d2dea85d3f063076d7f885 (diff) |
Deduplicate all netdata strings (#13570)
* rrdfamily
* rrddim
* rrdset plugin and module names
* rrdset units
* rrdset type
* rrdset family
* rrdset title
* rrdset title more
* rrdset context
* rrdcalctemplate context and removal of context hash from rrdset
* strings statistics
* rrdset name
* rearranged members of rrdset
* eliminate rrdset name hash; rrdcalc chart converted to STRING
* rrdset id, eliminated rrdset hash
* rrdcalc, alarm_entry, alert_config and some of rrdcalctemplate
* rrdcalctemplate
* rrdvar
* eval_variable
* rrddimvar and rrdsetvar
* rrdhost hostname, os and tags
* fix master commits
* added thread cache; implemented string_dup without locks
* faster thread cache
* rrdset and rrddim now use dictionaries for indexing
* rrdhost now uses dictionary
* rrdfamily now uses DICTIONARY
* rrdvar using dictionary instead of AVL
* allocate the right size to rrdvar flag members
* rrdhost remaining char * members to STRING *
* better error handling on indexing
* strings now use a read/write lock to allow parallel searches to the index
* removed AVL support from dictionaries; implemented STRING with native Judy calls
* string releases should be negative
* only 31 bits are allowed for enum flags
* proper locking on strings
* string threading unittest and fixes
* fix lgtm finding
* fixed naming
* stream chart/dimension definitions at the beginning of a streaming session
* thread stack variable is undefined on thread cancel
* rrdcontext garbage collect per host on startup
* worker control in garbage collection
* relaxed deletion of rrdmetrics
* type checking on dictfe
* netdata chart to monitor rrdcontext triggers
* Group chart label updates
* rrdcontext better handling of collected rrdsets
* rrdpush incremental transmition of definitions should use as much buffer as possible
* require 1MB per chart
* empty the sender buffer before enabling metrics streaming
* fill up to 50% of buffer
* reset signaling metrics sending
* use the shared variable for status
* use separate host flag for enabling streaming of metrics
* make sure the flag is clear
* add logging for streaming
* add logging for streaming on buffer overflow
* circular_buffer proper sizing
* removed obsolete logs
* do not execute worker jobs if not necessary
* better messages about compression disabling
* proper use of flags and updating rrdset last access time every time the obsoletion flag is flipped
* monitor stream sender used buffer ratio
* Update exporting unit tests
* no need to compare label value with strcmp
* streaming send workers now monitor bandwidth
* workers now use strings
* streaming receiver monitors incoming bandwidth
* parser shift of worker ids
* minor fixes
* Group chart label updates
* Populate context with dimensions that have data
* Fix chart id
* better shift of parser worker ids
* fix for streaming compression
* properly count received bytes
* ensure LZ4 compression ring buffer does not wrap prematurely
* do not stream empty charts; do not process empty instances in rrdcontext
* need_to_send_chart_definition() does not need an rrdset lock any more
* rrdcontext objects are collected, after data have been written to the db
* better logging of RRDCONTEXT transitions
* always set all variables needed by the worker utilization charts
* implemented double linked list for most objects; eliminated alarm indexes from rrdhost; and many more fixes
* lockless strings design - string_dup() and string_freez() are totally lockless when they dont need to touch Judy - only Judy is protected with a read/write lock
* STRING code re-organization for clarity
* thread_cache improvements; double numbers precision on worker threads
* STRING_ENTRY now shadown STRING, so no duplicate definition is required; string_length() renamed to string_strlen() to follow the paradigm of all other functions, STRING internal statistics are now only compiled with NETDATA_INTERNAL_CHECKS
* rrdhost index by hostname now cleans up; aclk queries of archieved hosts do not index hosts
* Add index to speed up database context searches
* Removed last_updated optimization (was also buggy after latest merge with master)
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 72 |
1 files changed, 44 insertions, 28 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 0890ebbcd0..a2852981a2 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -1,6 +1,13 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" +#include "parser/parser.h" + +#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) + +#if WORKER_PARSER_FIRST_JOB < 1 +#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1 +#endif extern struct config stream_config; @@ -66,7 +73,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd; if (cd->version < VERSION_GAP_FILLING ) { - error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", host->hostname, cd->cmd, + error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd, cd->version); return PARSER_RC_OK; // Ignore error and continue stream } @@ -78,7 +85,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins info( "STREAM %s from %s: Initial connection (no gap to check), " "remote=%"PRId64" local=%"PRId64" slew=%"PRId64"", - host->hostname, + rrdhost_hostname(host), cd->cmd, (int64_t)remote_time, (int64_t)now, @@ -88,7 +95,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins info( "STREAM %s from %s: Checking for gaps... " "remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap", - host->hostname, + rrdhost_hostname(host), cd->cmd, (int64_t)remote_time, (int64_t)prev, @@ -177,6 +184,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { int ret = SSL_read(r->ssl.conn, r->read_buffer + r->read_len, desired); if (ret > 0 ) { r->read_len += ret; + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret); return 0; } // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket @@ -192,6 +200,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp)) return 1; r->read_len = strlen(r->read_buffer); + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, r->read_len); return 0; } #else @@ -279,22 +288,24 @@ static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t */ static int receiver_read(struct receiver_state *r, FILE *fp) { // check any decompressed data present - if (r->decompressor && - r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { + if (r->decompressor && r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { size_t available = sizeof(r->read_buffer) - r->read_len; if (available) { - size_t len = r->decompressor->get(r->decompressor, - r->read_buffer + r->read_len, available); + size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available); if (!len) return 1; + r->read_len += len; } return 0; } + int ret = 0; if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) return 1; + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret); + if (!is_compressed_data(r->read_buffer, ret)) { r->read_len += ret; return 0; @@ -303,8 +314,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { if (unlikely(!r->decompressor)) r->decompressor = create_decompressor(); - size_t bytes_to_read = r->decompressor->start(r->decompressor, - r->read_buffer, ret); + size_t bytes_to_read = r->decompressor->start(r->decompressor, r->read_buffer, ret); // Read the entire block of compressed data because // we're unable to decompress incomplete block @@ -312,18 +322,23 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { do { if (read_stream(r, fp, compressed, bytes_to_read, &ret)) return 1; + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret); + // Send input data to decompressor if (ret) r->decompressor->put(r->decompressor, compressed, ret); + bytes_to_read -= ret; } while (bytes_to_read > 0); + // Decompress size_t bytes_to_parse = r->decompressor->decompress(r->decompressor); if (!bytes_to_parse) return 1; + // Fill read buffer with decompressed data - r->read_len = r->decompressor->get(r->decompressor, - r->read_buffer, sizeof(r->read_buffer)); + r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer)); return 0; } @@ -486,8 +501,8 @@ static int rrdpush_receive(struct receiver_state *rpt) #else if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { #endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send command.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); close(rpt->fd); return 0; } @@ -575,14 +590,14 @@ static int rrdpush_receive(struct receiver_state *rpt) , rpt->hostname , rpt->client_ip , rpt->client_port - , rpt->host->hostname + , rrdhost_hostname(rpt->host) , rpt->host->machine_guid , rpt->host->rrd_update_every , rpt->host->rrd_history_entries , rrd_memory_mode_name(rpt->host->rrd_memory_mode) , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") , ssl ? " SSL," : "" - , rpt->host->tags?rpt->host->tags:"" + , rrdhost_tags(rpt->host) ); #endif // NETDATA_INTERNAL_CHECKS @@ -605,7 +620,7 @@ static int rrdpush_receive(struct receiver_state *rpt) snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port); - info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port); + info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); char initial_response[HTTP_HEADER_SIZE]; if (rpt->stream_version > 1) { if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){ @@ -618,13 +633,13 @@ static int rrdpush_receive(struct receiver_state *rpt) } #endif } - info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version); + info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version); sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version); } else if (rpt->stream_version == 1) { - info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version); + info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version); sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2); } else { - info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); sprintf(initial_response, "%s", START_STREAMING_PROMPT); } debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); @@ -635,27 +650,27 @@ static int rrdpush_receive(struct receiver_state *rpt) #else if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { #endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); close(rpt->fd); return 0; } // remove the non-blocking flag from the socket if(sock_delnonblock(rpt->fd) < 0) - error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); + error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); struct timeval timeout; timeout.tv_sec = 120; timeout.tv_usec = 0; if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) - error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); + error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); // convert the socket to a FILE * FILE *fp = fdopen(rpt->fd, "r"); if(!fp) { - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - SOCKET ERROR"); - error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR"); + error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); close(rpt->fd); return 0; } @@ -686,7 +701,7 @@ static int rrdpush_receive(struct receiver_state *rpt) info( "Postponing health checks for %" PRId64 " seconds, on host '%s', because it was just connected.", (int64_t)alarms_delay, - rpt->host->hostname); + rrdhost_hostname(rpt->host)); } } rpt->host->senders_connect_time = now_realtime_sec(); @@ -695,8 +710,8 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_unlock(rpt->host); // call the plugins.d processor to receive the metrics - info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rpt->host->hostname, rpt->client_ip, rpt->client_port); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "CONNECTED"); + info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED"); cd.version = rpt->stream_version; @@ -758,6 +773,7 @@ void *rrdpush_receiver_thread(void *ptr) { info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); worker_register("STREAMRCV"); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENTAL); rrdpush_receive(rpt); worker_unregister(); |