summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-20 00:50:42 +0200
committerGitHub <noreply@github.com>2023-01-20 00:50:42 +0200
commit9232bfb6a072155388578dc4e1338c6002afb515 (patch)
treedc85f9bfe3ed97394e6f2a92a2f710796d6d8979 /streaming
parent86538b005de50f23c9ff66542abab11683b85c06 (diff)
track memory footprint of Netdata (#14294)
* track memory footprint of Netdata * track db modes alloc/ram/save/map * track system info; track sender and receiver * fixes * more fixes * track workers memory, onewayalloc memory; unify judyhs size estimation * track replication structures and buffers * Properly clear host RRDHOST_FLAG_METADATA_UPDATE flag * flush the replication buffer every 1000 times the circular buffer is found empty * dont take timestamp too frequently in sender loop * sender buffers are not used by the same thread as the sender, so they were never recreated - fixed it * free sender thread buffer on replication threads when replication is idle * use the last sender flag as a timestamp of the last buffer recreation * free cbuffer before reconnecting * recreate cbuffer on every flush * timings for journal v2 loading * inlining of metric and cache functions * aral likely/unlikely * free left-over thread buffers * fix NULL pointer dereference in replication * free sender thread buffer on sender thread too * mark ctx as used before flushing * better logging on ctx datafiles closing Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c2
-rw-r--r--streaming/replication.c64
-rw-r--r--streaming/replication.h3
-rw-r--r--streaming/rrdpush.c15
-rw-r--r--streaming/rrdpush.h13
-rw-r--r--streaming/sender.c104
6 files changed, 146 insertions, 55 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 6d2810cd60..562107da8e 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -44,6 +44,8 @@ void receiver_state_free(struct receiver_state *rpt) {
if(rpt->system_info)
rrdhost_system_info_free(rpt->system_info);
+ __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
+
freez(rpt);
}
diff --git a/streaming/replication.c b/streaming/replication.c
index c639016349..8d136a04a4 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -46,6 +46,12 @@ struct replication_query_statistics replication_get_query_statistics(void) {
return ret;
}
+size_t replication_buffers_allocated = 0;
+
+size_t replication_allocated_buffers(void) {
+ return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED);
+}
+
// ----------------------------------------------------------------------------
// sending replication replies
@@ -109,6 +115,8 @@ static struct replication_query *replication_query_prepare(
) {
size_t dimensions = rrdset_number_of_dimensions(st);
struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
+ __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
+
q->dimensions = dimensions;
q->st = st;
@@ -170,7 +178,7 @@ static struct replication_query *replication_query_prepare(
d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
d->rd = rd;
- q->ops->init(rd->tiers[0]->db_metric_handle, &d->handle, q->query.after, q->query.before,
+ q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
d->enabled = true;
d->skip = false;
@@ -231,6 +239,7 @@ static void replication_query_finalize(struct replication_query *q, bool execute
netdata_spinlock_unlock(&replication_queries.spinlock);
}
+ __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
freez(q);
}
@@ -817,6 +826,7 @@ static struct replication_thread {
struct {
size_t executed; // the number of replication requests executed
size_t latest_first_time; // the 'after' timestamp of the last request we executed
+ size_t memory; // the total memory allocated by replication
} atomic; // access should be with atomic operations
struct {
@@ -849,6 +859,7 @@ static struct replication_thread {
.atomic = {
.executed = 0,
.latest_first_time = 0,
+ .memory = 0,
},
.main_thread = {
.last_executed = 0,
@@ -857,6 +868,10 @@ static struct replication_thread {
},
};
+size_t replication_allocated_memory(void) {
+ return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
+}
+
#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)
@@ -909,6 +924,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
fatal_when_replication_is_not_locked_for_me();
struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+ __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
@@ -926,6 +942,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
freez(rse);
+ __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
}
static void replication_sort_entry_add(struct replication_request *rq) {
@@ -959,12 +976,19 @@ static void replication_sort_entry_add(struct replication_request *rq) {
Pvoid_t *inner_judy_ptr;
// find the outer judy entry, using after as key
- inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
- if(!inner_judy_ptr)
- inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+ size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+ inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+ size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+ if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR))
+ fatal("REPLICATION: corrupted outer judyL");
// add it to the inner judy, using unique_id as key
+ size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
+ size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
+ if(unlikely(!item || item == PJERR))
+ fatal("REPLICATION: corrupted inner judyL");
+
*item = rse;
rq->indexed_in_judy = true;
rq->not_indexed_buffer_full = false;
@@ -974,6 +998,8 @@ static void replication_sort_entry_add(struct replication_request *rq) {
replication_globals.unsafe.first_time_t = rq->after;
replication_recursive_unlock();
+
+ __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED);
}
static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
@@ -989,18 +1015,28 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
rse->rq->indexed_in_judy = false;
rse->rq->not_indexed_preprocessing = preprocessing;
+ size_t memory_saved = 0;
+
// delete it from the inner judy
+ size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
+ size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
+ memory_saved = mem_before_inner_judyl - mem_after_inner_judyl;
// if no items left, delete it from the outer judy
if(**inner_judy_ppptr == NULL) {
+ size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
+ size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+ memory_saved += mem_before_outer_judyl - mem_after_outer_judyl;
inner_judy_deleted = true;
}
// free memory
replication_sort_entry_destroy(rse);
+ __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED);
+
return inner_judy_deleted;
}
@@ -1426,6 +1462,7 @@ static int replication_execute_next_pending_request(void) {
max_requests_ahead = 2;
rqs = callocz(max_requests_ahead, sizeof(struct replication_request));
+ __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
}
// fill the queue
@@ -1516,6 +1553,7 @@ static void *replication_worker_thread(void *ptr) {
while(service_running(SERVICE_REPLICATION)) {
if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+ sender_thread_buffer_free();
worker_is_busy(WORKER_JOB_WAIT);
worker_is_idle();
sleep_usec(1 * USEC_PER_SEC);
@@ -1534,9 +1572,11 @@ static void replication_main_cleanup(void *ptr) {
for(int i = 0; i < threads ;i++) {
netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
freez(replication_globals.main_thread.threads_ptrs[i]);
+ __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
}
freez(replication_globals.main_thread.threads_ptrs);
replication_globals.main_thread.threads_ptrs = NULL;
+ __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
// custom code
worker_unregister();
@@ -1553,14 +1593,16 @@ void *replication_thread_main(void *ptr __maybe_unused) {
threads = 1;
}
- if(threads > 1) {
+ if(--threads) {
replication_globals.main_thread.threads = threads;
replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
+ __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
- for(int i = 1; i < threads ;i++) {
+ for(int i = 0; i < threads ;i++) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 1);
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
+ __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag,
NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
}
@@ -1649,14 +1691,16 @@ void *replication_thread_main(void *ptr __maybe_unused) {
// the timeout also defines now frequently we will traverse all the pending requests
// when the outbound buffers of all senders is full
usec_t timeout;
- if(slow)
+ if(slow) {
// no work to be done, wait for a request to come in
timeout = 1000 * USEC_PER_MS;
+ sender_thread_buffer_free();
+ }
else if(replication_globals.unsafe.pending > 0) {
- if(replication_globals.unsafe.sender_resets == last_sender_resets) {
+ if(replication_globals.unsafe.sender_resets == last_sender_resets)
timeout = 1000 * USEC_PER_MS;
- }
+
else {
// there are pending requests waiting to be executed,
// but none could be executed at this time.
diff --git a/streaming/replication.h b/streaming/replication.h
index 00462cc3a1..e6bedc83bb 100644
--- a/streaming/replication.h
+++ b/streaming/replication.h
@@ -30,4 +30,7 @@ void replication_sender_delete_pending_requests(struct sender_state *sender);
void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming);
void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s);
+size_t replication_allocated_memory(void);
+size_t replication_allocated_buffers(void);
+
#endif /* REPLICATION_H */
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 7767c371c1..969f6eb6bf 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -373,6 +373,7 @@ bool rrdset_push_chart_definition_now(RRDSET *st) {
BUFFER *wb = sender_start(host->sender);
rrdpush_send_chart_definition(wb, st);
sender_commit(host->sender, wb);
+ sender_thread_buffer_free();
return true;
}
@@ -437,6 +438,8 @@ void rrdpush_send_host_labels(RRDHOST *host) {
buffer_sprintf(wb, "OVERWRITE %s\n", "labels");
sender_commit(host->sender, wb);
+
+ sender_thread_buffer_free();
}
void rrdpush_claimed_id(RRDHOST *host)
@@ -454,6 +457,8 @@ void rrdpush_claimed_id(RRDHOST *host)
rrdhost_aclk_state_unlock(host);
sender_commit(host->sender, wb);
+
+ sender_thread_buffer_free();
}
int connect_to_one_of_destinations(
@@ -515,6 +520,8 @@ bool destinations_init_add_one(char *entry, void *data) {
struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
d->destination = string_strdupz(entry);
+ __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
+
DOUBLE_LINKED_LIST_APPEND_UNSAFE(t->list, d, prev, next);
t->count++;
@@ -545,6 +552,7 @@ void rrdpush_destinations_free(RRDHOST *host) {
DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, tmp, prev, next);
string_freez(tmp->destination);
freez(tmp);
+ __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
}
host->destinations = NULL;
@@ -635,6 +643,9 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
rpt->capabilities = STREAM_CAP_INVALID;
rpt->hops = 1;
+ __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
+ __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
+
rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info));
rpt->system_info->hops = rpt->hops;
@@ -1069,7 +1080,7 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps)
}
void log_receiver_capabilities(struct receiver_state *rpt) {
- BUFFER *wb = buffer_create(100);
+ BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, rpt->capabilities);
info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
@@ -1079,7 +1090,7 @@ void log_receiver_capabilities(struct receiver_state *rpt) {
}
void log_sender_capabilities(struct sender_state *s) {
- BUFFER *wb = buffer_create(100);
+ BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, s->capabilities);
info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index c7f07ac7e5..94c1320e76 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -10,7 +10,7 @@
#define CONNECTED_TO_SIZE 100
#define CBUFFER_INITIAL_SIZE (16 * 1024)
-#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE * 4)
+#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)
// ----------------------------------------------------------------------------
// obsolete versions - do not use anymore
@@ -131,8 +131,8 @@ struct decompressor_state {
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
typedef enum {
- SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
- SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
+ SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
+ SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
struct sender_state {
@@ -189,9 +189,13 @@ struct sender_state {
struct {
size_t buffer_used_percentage; // the current utilization of the sending buffer
usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
+ time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
} atomic;
};
+#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
+#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)
+
#define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
#define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)
@@ -296,7 +300,6 @@ void rrdpush_destinations_free(RRDHOST *host);
BUFFER *sender_start(struct sender_state *s);
void sender_commit(struct sender_state *s, BUFFER *wb);
-void sender_cancel(struct sender_state *s);
int rrdpush_init();
bool rrdpush_receiver_needs_dbengine();
int configured_as_parent();
@@ -339,6 +342,8 @@ int32_t stream_capabilities_to_vn(uint32_t caps);
void receiver_state_free(struct receiver_state *rpt);
bool stop_streaming_receiver(RRDHOST *host, const char *reason);
+void sender_thread_buffer_free(void);
+
#include "replication.h"
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index 7de7142b50..854b57fc5a 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -36,29 +36,29 @@ extern char *netdata_ssl_ca_file;
static __thread BUFFER *sender_thread_buffer = NULL;
static __thread bool sender_thread_buffer_used = false;
-static __thread bool sender_thread_buffer_recreate = false;
+static __thread time_t sender_thread_buffer_last_reset_s = 0;
void sender_thread_buffer_free(void) {
buffer_free(sender_thread_buffer);
sender_thread_buffer = NULL;
+ sender_thread_buffer_used = false;
}
// Collector thread starting a transmission
-BUFFER *sender_start(struct sender_state *s __maybe_unused) {
+BUFFER *sender_start(struct sender_state *s) {
if(unlikely(sender_thread_buffer_used))
fatal("STREAMING: thread buffer is used multiple times concurrently.");
- if(unlikely(sender_thread_buffer_recreate)) {
- sender_thread_buffer_recreate = false;
- if(sender_thread_buffer && sender_thread_buffer->size > THREAD_BUFFER_INITIAL_SIZE) {
+ if(unlikely(rrdpush_sender_last_buffer_recreate_get(s) > sender_thread_buffer_last_reset_s)) {
+ if(unlikely(sender_thread_buffer && sender_thread_buffer->size > THREAD_BUFFER_INITIAL_SIZE)) {
buffer_free(sender_thread_buffer);
sender_thread_buffer = NULL;
}
}
- if(!sender_thread_buffer) {
- sender_thread_buffer = buffer_create(THREAD_BUFFER_INITIAL_SIZE);
- sender_thread_buffer_recreate = false;
+ if(unlikely(!sender_thread_buffer)) {
+ sender_thread_buffer = buffer_create(THREAD_BUFFER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_streaming);
+ sender_thread_buffer_last_reset_s = rrdpush_sender_last_buffer_recreate_get(s);
}
sender_thread_buffer_used = true;
@@ -66,10 +66,6 @@ BUFFER *sender_start(struct sender_state *s __maybe_unused) {
return sender_thread_buffer;
}
-void sender_cancel(struct sender_state *s __maybe_unused) {
- sender_thread_buffer_used = false;
-}
-
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
#ifdef ENABLE_COMPRESSION
@@ -108,11 +104,11 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
netdata_mutex_lock(&s->mutex);
- if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
+ if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
- rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
+ rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
- s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
+ s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
}
#ifdef ENABLE_COMPRESSION
@@ -159,17 +155,17 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
}
}
- if(cbuffer_add_unsafe(s->host->sender->buffer, dst, dst_len))
+ if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
s->flags |= SENDER_FLAG_OVERFLOW;
src = src + size_to_compress;
src_len -= size_to_compress;
}
}
- else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ else if(cbuffer_add_unsafe(s->buffer, src, src_len))
s->flags |= SENDER_FLAG_OVERFLOW;
#else
- if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ if(cbuffer_add_unsafe(s->buffer, src, src_len))
s->flags |= SENDER_FLAG_OVERFLOW;
#endif
@@ -195,6 +191,7 @@ void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQU
BUFFER *wb = sender_start(host->sender);
rrdpush_sender_add_host_variable_to_buffer(wb, rva);
sender_commit(host->sender, wb);
+ sender_thread_buffer_free();
}
}
@@ -223,6 +220,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
(void)ret;
sender_commit(host->sender, wb);
+ sender_thread_buffer_free();
debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
}
@@ -248,6 +246,30 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
rrdhost_sender_replicating_charts_zero(host);
}
+static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t now_s, bool have_mutex, bool force) {
+ static __thread time_t last_reset_time_s = 0;
+
+ if(!force && now_s - last_reset_time_s < 300)
+ return;
+
+ if(!have_mutex)
+ netdata_mutex_lock(&s->mutex);
+
+ rrdpush_sender_last_buffer_recreate_set(s, now_s);
+ last_reset_time_s = now_s;
+
+ if(s->buffer && s->buffer->size > CBUFFER_INITIAL_SIZE) {
+ size_t max = s->buffer->max_size;
+ cbuffer_free(s->buffer);
+ s->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, max, &netdata_buffers_statistics.cbuffers_streaming);
+ }
+
+ sender_thread_buffer_free();
+
+ if(!have_mutex)
+ netdata_mutex_unlock(&s->mutex);
+}
+
static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
rrdpush_sender_set_flush_time(host->sender);
@@ -255,6 +277,7 @@ static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
// flush the output buffer from any data it may have
cbuffer_flush(host->sender->buffer);
+ rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true);
replication_recalculate_buffer_used_ratio_unsafe(host->sender);
netdata_mutex_unlock(&host->sender->mutex);
@@ -781,8 +804,8 @@ static ssize_t attempt_to_send(struct sender_state *s) {
debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
#ifdef ENABLE_HTTPS
- SSL *conn = s->host->sender->ssl.conn ;
- if(conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+ SSL *conn = s->ssl.conn ;
+ if(conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
ret = netdata_ssl_write(conn, chunk, outstanding);
else
ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
@@ -817,9 +840,9 @@ static ssize_t attempt_read(struct sender_state *s) {
ssize_t ret = 0;
#ifdef ENABLE_HTTPS
- if (s->host->sender->ssl.conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
+ if (s->ssl.conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
size_t desired = sizeof(s->read_buffer) - s->read_len - 1;
- ret = netdata_ssl_read(s->host->sender->ssl.conn, s->read_buffer, desired);
+ ret = netdata_ssl_read(s->ssl.conn, s->read_buffer, desired);
if (ret > 0 ) {
s->read_len += (int)ret;
return ret;
@@ -878,6 +901,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
pluginsd_function_result_end_to_buffer(wb);
sender_commit(s, wb);
+ sender_thread_buffer_free();
internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).",
rrdhost_hostname(s->host), s->connected_to,
@@ -932,7 +956,7 @@ void execute_commands(struct sender_state *s) {
tmp->received_ut = now_realtime_usec();
tmp->sender = s;
tmp->transaction = string_strdupz(transaction);
- BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1);
+ BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp);
if(code != HTTP_RESP_OK) {
@@ -1223,11 +1247,18 @@ void *rrdpush_sender_thread(void *ptr) {
netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
+ size_t iterations = 0;
+ time_t now_s = now_monotonic_sec();
while(!rrdhost_sender_should_exit(s)) {
+ iterations++;
// The connection attempt blocks (after which we use the socket in nonblocking)
if(unlikely(s->rrdpush_sender_socket == -1)) {
worker_is_busy(WORKER_SENDER_JOB_CONNECT);
+
+ now_s = now_monotonic_sec();
+ rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true);
+
rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
s->flags &= ~SENDER_FLAG_OVERFLOW;
s->read_len = 0;
@@ -1240,7 +1271,7 @@ void *rrdpush_sender_thread(void *ptr) {
if(rrdhost_sender_should_exit(s))
break;
- s->last_traffic_seen_t = now_monotonic_sec();
+ now_s = s->last_traffic_seen_t = now_monotonic_sec();
rrdpush_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
@@ -1250,8 +1281,11 @@ void *rrdpush_sender_thread(void *ptr) {
continue;
}
+ if(iterations % 1000 == 0)
+ now_s = now_monotonic_sec();
+
// If the TCP window never opened then something is wrong, restart connection
- if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout &&
+ if(unlikely(now_s - s->last_traffic_seen_t > s->timeout &&
!rrdpush_sender_pending_replication_requests(s) &&
!rrdpush_sender_replicating_charts(s)
)) {
@@ -1262,22 +1296,13 @@ void *rrdpush_sender_thread(void *ptr) {
}
netdata_mutex_lock(&s->mutex);
- size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, NULL);
- size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
- if(unlikely(!outstanding && s->host->sender->buffer->size > CBUFFER_INITIAL_SIZE)) {
- static __thread time_t last_reset_time_t = 0;
- time_t now_t = now_monotonic_sec();
- if(now_t - last_reset_time_t > 600) {
- last_reset_time_t = now_t;
- size_t max = s->host->sender->buffer->max_size;
- cbuffer_free(s->host->sender->buffer);
- s->host->sender->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, max);
- sender_thread_buffer_recreate = true;
- }
- }
+ size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
+ size_t available = cbuffer_available_size_unsafe(s->buffer);
+ if (unlikely(!outstanding))
+ rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
netdata_mutex_unlock(&s->mutex);
- worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->host->sender->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->host->sender->buffer->max_size);
+ worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
if(outstanding)
s->send_attempts++;
@@ -1329,6 +1354,7 @@ void *rrdpush_sender_thread(void *ptr) {
if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) {
netdata_thread_testcancel();
debug(D_STREAM, "Spurious wakeup");
+ now_s = now_monotonic_sec();
continue;
}