summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-09-05 19:31:06 +0300
committerGitHub <noreply@github.com>2022-09-05 19:31:06 +0300
commit5e1b95cf92168c4df74586fb4430dc284806da82 (patch)
treef42077d8b02eaf316683453a7474bd1f599a833d /streaming/receiver.c
parent544aef1fde6e79ac57d2dea85d3f063076d7f885 (diff)
Deduplicate all netdata strings (#13570)
* rrdfamily * rrddim * rrdset plugin and module names * rrdset units * rrdset type * rrdset family * rrdset title * rrdset title more * rrdset context * rrdcalctemplate context and removal of context hash from rrdset * strings statistics * rrdset name * rearranged members of rrdset * eliminate rrdset name hash; rrdcalc chart converted to STRING * rrdset id, eliminated rrdset hash * rrdcalc, alarm_entry, alert_config and some of rrdcalctemplate * rrdcalctemplate * rrdvar * eval_variable * rrddimvar and rrdsetvar * rrdhost hostname, os and tags * fix master commits * added thread cache; implemented string_dup without locks * faster thread cache * rrdset and rrddim now use dictionaries for indexing * rrdhost now uses dictionary * rrdfamily now uses DICTIONARY * rrdvar using dictionary instead of AVL * allocate the right size to rrdvar flag members * rrdhost remaining char * members to STRING * * better error handling on indexing * strings now use a read/write lock to allow parallel searches to the index * removed AVL support from dictionaries; implemented STRING with native Judy calls * string releases should be negative * only 31 bits are allowed for enum flags * proper locking on strings * string threading unittest and fixes * fix lgtm finding * fixed naming * stream chart/dimension definitions at the beginning of a streaming session * thread stack variable is undefined on thread cancel * rrdcontext garbage collect per host on startup * worker control in garbage collection * relaxed deletion of rrdmetrics * type checking on dictfe * netdata chart to monitor rrdcontext triggers * Group chart label updates * rrdcontext better handling of collected rrdsets * rrdpush incremental transmission of definitions should use as much buffer as possible * require 1MB per chart * empty the sender buffer before enabling metrics streaming * fill up to 50% of buffer * reset signaling metrics sending * use the shared variable for status * use separate host flag for enabling streaming 
of metrics * make sure the flag is clear * add logging for streaming * add logging for streaming on buffer overflow * circular_buffer proper sizing * removed obsolete logs * do not execute worker jobs if not necessary * better messages about compression disabling * proper use of flags and updating rrdset last access time every time the obsoletion flag is flipped * monitor stream sender used buffer ratio * Update exporting unit tests * no need to compare label value with strcmp * streaming send workers now monitor bandwidth * workers now use strings * streaming receiver monitors incoming bandwidth * parser shift of worker ids * minor fixes * Group chart label updates * Populate context with dimensions that have data * Fix chart id * better shift of parser worker ids * fix for streaming compression * properly count received bytes * ensure LZ4 compression ring buffer does not wrap prematurely * do not stream empty charts; do not process empty instances in rrdcontext * need_to_send_chart_definition() does not need an rrdset lock any more * rrdcontext objects are collected, after data have been written to the db * better logging of RRDCONTEXT transitions * always set all variables needed by the worker utilization charts * implemented double linked list for most objects; eliminated alarm indexes from rrdhost; and many more fixes * lockless strings design - string_dup() and string_freez() are totally lockless when they don't need to touch Judy - only Judy is protected with a read/write lock * STRING code re-organization for clarity * thread_cache improvements; double numbers precision on worker threads * STRING_ENTRY now shadows STRING, so no duplicate definition is required; string_length() renamed to string_strlen() to follow the paradigm of all other functions, STRING internal statistics are now only compiled with NETDATA_INTERNAL_CHECKS * rrdhost index by hostname now cleans up; aclk queries of archived hosts do not index hosts * Add index to speed up database context 
searches * Removed last_updated optimization (was also buggy after latest merge with master) Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c72
1 files changed, 44 insertions, 28 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 0890ebbcd0..a2852981a2 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -1,6 +1,13 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
+#include "parser/parser.h"
+
+#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
+
+#if WORKER_PARSER_FIRST_JOB < 1
+#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1
+#endif
extern struct config stream_config;
@@ -66,7 +73,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd;
if (cd->version < VERSION_GAP_FILLING ) {
- error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", host->hostname, cd->cmd,
+ error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd,
cd->version);
return PARSER_RC_OK; // Ignore error and continue stream
}
@@ -78,7 +85,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
info(
"STREAM %s from %s: Initial connection (no gap to check), "
"remote=%"PRId64" local=%"PRId64" slew=%"PRId64"",
- host->hostname,
+ rrdhost_hostname(host),
cd->cmd,
(int64_t)remote_time,
(int64_t)now,
@@ -88,7 +95,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
info(
"STREAM %s from %s: Checking for gaps... "
"remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap",
- host->hostname,
+ rrdhost_hostname(host),
cd->cmd,
(int64_t)remote_time,
(int64_t)prev,
@@ -177,6 +184,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
int ret = SSL_read(r->ssl.conn, r->read_buffer + r->read_len, desired);
if (ret > 0 ) {
r->read_len += ret;
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);
return 0;
}
// Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket
@@ -192,6 +200,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp))
return 1;
r->read_len = strlen(r->read_buffer);
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, r->read_len);
return 0;
}
#else
@@ -279,22 +288,24 @@ static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t
*/
static int receiver_read(struct receiver_state *r, FILE *fp) {
// check any decompressed data present
- if (r->decompressor &&
- r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) {
+ if (r->decompressor && r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) {
size_t available = sizeof(r->read_buffer) - r->read_len;
if (available) {
- size_t len = r->decompressor->get(r->decompressor,
- r->read_buffer + r->read_len, available);
+ size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available);
if (!len)
return 1;
+
r->read_len += len;
}
return 0;
}
+
int ret = 0;
if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret))
return 1;
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);
+
if (!is_compressed_data(r->read_buffer, ret)) {
r->read_len += ret;
return 0;
@@ -303,8 +314,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
if (unlikely(!r->decompressor))
r->decompressor = create_decompressor();
- size_t bytes_to_read = r->decompressor->start(r->decompressor,
- r->read_buffer, ret);
+ size_t bytes_to_read = r->decompressor->start(r->decompressor, r->read_buffer, ret);
// Read the entire block of compressed data because
// we're unable to decompress incomplete block
@@ -312,18 +322,23 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
do {
if (read_stream(r, fp, compressed, bytes_to_read, &ret))
return 1;
+
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);
+
// Send input data to decompressor
if (ret)
r->decompressor->put(r->decompressor, compressed, ret);
+
bytes_to_read -= ret;
} while (bytes_to_read > 0);
+
// Decompress
size_t bytes_to_parse = r->decompressor->decompress(r->decompressor);
if (!bytes_to_parse)
return 1;
+
// Fill read buffer with decompressed data
- r->read_len = r->decompressor->get(r->decompressor,
- r->read_buffer, sizeof(r->read_buffer));
+ r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer));
return 0;
}
@@ -486,8 +501,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
#else
if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
#endif
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY");
- error("STREAM %s [receive from [%s]:%s]: cannot send command.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
close(rpt->fd);
return 0;
}
@@ -575,14 +590,14 @@ static int rrdpush_receive(struct receiver_state *rpt)
, rpt->hostname
, rpt->client_ip
, rpt->client_port
- , rpt->host->hostname
+ , rrdhost_hostname(rpt->host)
, rpt->host->machine_guid
, rpt->host->rrd_update_every
, rpt->host->rrd_history_entries
, rrd_memory_mode_name(rpt->host->rrd_memory_mode)
, (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
, ssl ? " SSL," : ""
- , rpt->host->tags?rpt->host->tags:""
+ , rrdhost_tags(rpt->host)
);
#endif // NETDATA_INTERNAL_CHECKS
@@ -605,7 +620,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
- info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
char initial_response[HTTP_HEADER_SIZE];
if (rpt->stream_version > 1) {
if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){
@@ -618,13 +633,13 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
#endif
}
- info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version);
sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version);
} else if (rpt->stream_version == 1) {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version);
sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
} else {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
sprintf(initial_response, "%s", START_STREAMING_PROMPT);
}
debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
@@ -635,27 +650,27 @@ static int rrdpush_receive(struct receiver_state *rpt)
#else
if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
#endif
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY");
- error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
close(rpt->fd);
return 0;
}
// remove the non-blocking flag from the socket
if(sock_delnonblock(rpt->fd) < 0)
- error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+ error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
struct timeval timeout;
timeout.tv_sec = 120;
timeout.tv_usec = 0;
if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0))
- error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+ error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
// convert the socket to a FILE *
FILE *fp = fdopen(rpt->fd, "r");
if(!fp) {
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - SOCKET ERROR");
- error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR");
+ error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
close(rpt->fd);
return 0;
}
@@ -686,7 +701,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
info(
"Postponing health checks for %" PRId64 " seconds, on host '%s', because it was just connected.",
(int64_t)alarms_delay,
- rpt->host->hostname);
+ rrdhost_hostname(rpt->host));
}
}
rpt->host->senders_connect_time = now_realtime_sec();
@@ -695,8 +710,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdhost_unlock(rpt->host);
// call the plugins.d processor to receive the metrics
- info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED");
cd.version = rpt->stream_version;
@@ -758,6 +773,7 @@ void *rrdpush_receiver_thread(void *ptr) {
info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
worker_register("STREAMRCV");
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENTAL);
rrdpush_receive(rpt);
worker_unregister();