summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-19 23:19:36 +0300
committerGitHub <noreply@github.com>2023-06-19 23:19:36 +0300
commit43c749b07d07e79dae8111dcdb7bc1a46c3dda1b (patch)
tree4c3a270652787c91ef15c7ef8e29915769fc1fd4 /streaming
parent0b4f820e9d42d10f64c3305d9c084261bc9880cf (diff)
Obvious memory reductions (#15204)
* remove rd->update_every * reduce amount of memory for RRDDIM * reorgnize rrddim->db entries * optimize rrdset and statsd * optimize dictionaries * RW_SPINLOCK for dictionaries * fix codeql warning * rw_spinlock improvements * remove obsolete assertion * fix crash on health_alarm_log_process() * use RW_SPINLOCK for AVL trees * add RW_SPINLOCK read/write trylock * pgc and mrg now use rw_spinlocks; cache line optimizations for mrg * thread tag of dbegnine init * append created datafile, lockless * make DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE friendly for lockless use * thread cancelability in spinlocks; optimize thread cancelability management * introduce a JudyL to index datafiles and use it during queries to quickly find the relevant files * use the last timestamp of each journal file for indexing * when the previous cannot be found, start from the beginning * add more stats to PDC to trace routing easier * rename spinlock functions * fix for spinlock renames * revert statsd socket statistics to size_t * turn fatal into internal_fatal() * show candidates always * show connected status and connection attempts
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c2
-rw-r--r--streaming/replication.c24
-rw-r--r--streaming/rrdpush.c32
-rw-r--r--streaming/rrdpush.h10
-rw-r--r--streaming/sender.c8
5 files changed, 40 insertions, 36 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 006cbb67d0..237345cc9c 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -720,7 +720,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
#ifdef NETDATA_INTERNAL_CHECKS
info("STREAM '%s' [receive from [%s]:%s]: "
"client willing to stream metrics for host '%s' with machine_guid '%s': "
- "update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'"
+ "update every = %d, history = %d, memory mode = %s, health %s,%s tags '%s'"
, rpt->hostname
, rpt->client_ip
, rpt->client_port
diff --git a/streaming/replication.c b/streaming/replication.c
index 753c72d8bd..d7909e7eb0 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -40,9 +40,9 @@ static struct replication_query_statistics replication_queries = {
};
struct replication_query_statistics replication_get_query_statistics(void) {
- netdata_spinlock_lock(&replication_queries.spinlock);
+ spinlock_lock(&replication_queries.spinlock);
struct replication_query_statistics ret = replication_queries;
- netdata_spinlock_unlock(&replication_queries.spinlock);
+ spinlock_unlock(&replication_queries.spinlock);
return ret;
}
@@ -144,7 +144,7 @@ static struct replication_query *replication_query_prepare(
}
if(q->query.enable_streaming) {
- netdata_spinlock_lock(&st->data_collection_lock);
+ spinlock_lock(&st->data_collection_lock);
q->query.locked_data_collection = true;
if (st->last_updated.tv_sec > q->query.before) {
@@ -168,7 +168,7 @@ static struct replication_query *replication_query_prepare(
size_t count = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if (unlikely(!rd || !rd_dfe.item || !rd->exposed))
+ if (unlikely(!rd || !rd_dfe.item || !rrddim_check_exposed(rd)))
continue;
if (unlikely(rd_dfe.counter >= q->dimensions)) {
@@ -198,7 +198,7 @@ static struct replication_query *replication_query_prepare(
q->query.execute = false;
if(q->query.locked_data_collection) {
- netdata_spinlock_unlock(&st->data_collection_lock);
+ spinlock_unlock(&st->data_collection_lock);
q->query.locked_data_collection = false;
}
@@ -216,7 +216,7 @@ static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STRE
NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
RRDDIM *rd;
rrddim_foreach_read(rd, st){
- if (!rd->exposed) continue;
+ if (!rrddim_check_exposed(rd)) continue;
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '",
sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
@@ -248,7 +248,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
replication_send_chart_collection_state(wb, q->st, q->query.capabilities);
if(q->query.locked_data_collection) {
- netdata_spinlock_unlock(&q->st->data_collection_lock);
+ spinlock_unlock(&q->st->data_collection_lock);
q->query.locked_data_collection = false;
}
@@ -269,7 +269,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
}
if(executed) {
- netdata_spinlock_lock(&replication_queries.spinlock);
+ spinlock_lock(&replication_queries.spinlock);
replication_queries.queries_started += queries;
replication_queries.queries_finished += queries;
replication_queries.points_read += q->points_read;
@@ -280,7 +280,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
s->replication.latest_completed_before_t = q->query.before;
}
- netdata_spinlock_unlock(&replication_queries.spinlock);
+ spinlock_unlock(&replication_queries.spinlock);
}
__atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
@@ -678,7 +678,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
}
if(locked_data_collection)
- netdata_spinlock_unlock(&st->data_collection_lock);
+ spinlock_unlock(&st->data_collection_lock);
return enable_streaming;
}
@@ -1056,11 +1056,11 @@ static inline bool replication_recursive_lock_mode(char mode) {
if(mode == 'L') { // (L)ock
if(++recursions == 1)
- netdata_spinlock_lock(&replication_globals.spinlock);
+ spinlock_lock(&replication_globals.spinlock);
}
else if(mode == 'U') { // (U)nlock
if(--recursions == 0)
- netdata_spinlock_unlock(&replication_globals.spinlock);
+ spinlock_unlock(&replication_globals.spinlock);
}
else if(mode == 'C') { // (C)heck
if(recursions > 0)
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index d12d1761d3..f231fcab3a 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -264,7 +264,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
// send the chart
buffer_sprintf(
wb
- , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
+ , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
, rrdset_id(st)
, name
, rrdset_title(st)
@@ -291,7 +291,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
rrddim_foreach_read(rd, st) {
buffer_sprintf(
wb
- , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
+ , "DIMENSION \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n"
, rrddim_id(rd)
, rrddim_name(rd)
, rrd_algorithm_name(rd->algorithm)
@@ -301,7 +301,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
, rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
, rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
- rd->exposed = 1;
+ rrddim_set_exposed(rd);
}
rrddim_foreach_done(rd);
@@ -355,10 +355,10 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if(unlikely(!rd->updated))
+ if(unlikely(!rrddim_check_updated(rd)))
continue;
- if(likely(rd->exposed)) {
+ if(likely(rrddim_check_exposed(rd))) {
buffer_fast_strcat(wb, "SET \"", 5);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "\" = ", 4);
@@ -598,6 +598,7 @@ int connect_to_one_of_destinations(
*reconnects_counter += 1;
d->last_attempt = now;
+ d->attempts++;
sock = connect_to_this(string2str(d->destination), default_port, timeout);
if (sock != -1) {
@@ -1091,13 +1092,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
static time_t last_stream_accepted_t = 0;
time_t now = now_realtime_sec();
- netdata_spinlock_lock(&spinlock);
+ spinlock_lock(&spinlock);
if(unlikely(last_stream_accepted_t == 0))
last_stream_accepted_t = now;
if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
- netdata_spinlock_unlock(&spinlock);
+ spinlock_unlock(&spinlock);
char msg[100 + 1];
snprintfz(msg, 100,
@@ -1114,7 +1115,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
last_stream_accepted_t = now;
- netdata_spinlock_unlock(&spinlock);
+ spinlock_unlock(&spinlock);
}
/*
@@ -1220,11 +1221,10 @@ static struct {
STREAM_HANDSHAKE err;
const char *str;
} handshake_errors[] = {
- { STREAM_HANDSHAKE_OK_V5, "OK_V5" },
- { STREAM_HANDSHAKE_OK_V4, "OK_V4" },
- { STREAM_HANDSHAKE_OK_V3, "OK_V3" },
- { STREAM_HANDSHAKE_OK_V2, "OK_V2" },
- { STREAM_HANDSHAKE_OK_V1, "OK_V1" },
+ { STREAM_HANDSHAKE_OK_V3, "CONNECTED" },
+ { STREAM_HANDSHAKE_OK_V2, "CONNECTED" },
+ { STREAM_HANDSHAKE_OK_V1, "CONNECTED" },
+ { STREAM_HANDSHAKE_NEVER, "NOT_TRIED_YET" },
{ STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, "BAD HANDSHAKE" },
{ STREAM_HANDSHAKE_ERROR_LOCALHOST, "LOCALHOST" },
{ STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, "ALREADY CONNECTED" },
@@ -1241,12 +1241,16 @@ static struct {
};
const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error) {
+ if(handshake_error >= STREAM_HANDSHAKE_OK_V1)
+ // handshake_error is the whole version / capabilities number
+ return "CONNECTED";
+
for(size_t i = 0; handshake_errors[i].str ; i++) {
if(handshake_error == handshake_errors[i].err)
return handshake_errors[i].str;
}
- return "";
+ return "UNKNOWN";
}
static struct {
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index d9b2f062b8..8b535a4521 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -78,11 +78,10 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
#define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."
typedef enum {
- STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
- STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS
- STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM
- STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS
- STREAM_HANDSHAKE_OK_V1 = 1,
+ STREAM_HANDSHAKE_OK_V3 = 3, // v3+
+ STREAM_HANDSHAKE_OK_V2 = 2, // v2
+ STREAM_HANDSHAKE_OK_V1 = 1, // v1
+ STREAM_HANDSHAKE_NEVER = 0, // never tried to connect
STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
@@ -334,6 +333,7 @@ struct receiver_state {
struct rrdpush_destinations {
STRING *destination;
bool ssl;
+ uint32_t attempts;
const char *last_error;
time_t last_attempt;
diff --git a/streaming/sender.c b/streaming/sender.c
index 3743889edb..e75ee11896 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -258,7 +258,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- rd->exposed = 0;
+ rrddim_clear_exposed(rd);
rrddim_foreach_done(rd);
}
rrdset_foreach_done(st);
@@ -1188,10 +1188,10 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
#ifdef ENABLE_HTTPS
static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
- netdata_spinlock_lock(&sp);
+ spinlock_lock(&sp);
if(netdata_ssl_streaming_sender_ctx || !host) {
- netdata_spinlock_unlock(&sp);
+ spinlock_unlock(&sp);
return;
}
@@ -1207,7 +1207,7 @@ void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
}
}
- netdata_spinlock_unlock(&sp);
+ spinlock_unlock(&sp);
#endif
}