| field | value | date |
|---|---|---|
| author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-20 00:50:42 +0200 |
| committer | GitHub <noreply@github.com> | 2023-01-20 00:50:42 +0200 |
| commit | 9232bfb6a072155388578dc4e1338c6002afb515 (patch) | |
| tree | dc85f9bfe3ed97394e6f2a92a2f710796d6d8979 /streaming | |
| parent | 86538b005de50f23c9ff66542abab11683b85c06 (diff) | |
track memory footprint of Netdata (#14294)
* track memory footprint of Netdata (a minimal sketch of the counter pattern follows this message)
* track db modes alloc/ram/save/map
* track system info; track sender and receiver
* fixes
* more fixes
* track workers memory and onewayalloc memory; unify JudyHS size estimation (see the JudyL accounting sketch after the diffstat)
* track replication structures and buffers
* Properly clear host RRDHOST_FLAG_METADATA_UPDATE flag
* flush the replication buffer every 1000 times the circular buffer is found empty
* don't take the timestamp too frequently in the sender loop
* sender thread buffers are used by collector threads, not by the sender thread itself, so they were never recreated; fixed it
* free sender thread buffer on replication threads when replication is idle
* use a per-sender timestamp of the last buffer recreation instead of the thread-local flag (see the handshake sketch after the diff)
* free cbuffer before reconnecting
* recreate cbuffer on every flush
* timings for journal v2 loading
* inlining of metric and cache functions
* aral likely/unlikely
* free left-over thread buffers
* fix NULL pointer dereference in replication
* free sender thread buffer on sender thread too
* mark ctx as used before flushing
* better logging when closing ctx datafiles
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
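
Throughout the patch the accounting primitive is the same: a plain `size_t` counter per subsystem, incremented with relaxed atomics at every allocation and decremented at every free, then read through a small accessor such as `replication_allocated_buffers()`. Below is a minimal stand-alone sketch of that pattern, assuming GCC/Clang `__atomic` builtins; `tracked_bytes`, `tracked_mallocz` and `tracked_freez` are illustrative names, not Netdata APIs:

```c
#include <stdio.h>
#include <stdlib.h>

// illustrative counter; Netdata keeps one per subsystem,
// e.g. replication_buffers_allocated in streaming/replication.c
static size_t tracked_bytes = 0;

static void *tracked_mallocz(size_t size) {
    void *ptr = malloc(size);
    if(!ptr) abort();
    // relaxed ordering suffices: the counter is statistics, not synchronization
    __atomic_add_fetch(&tracked_bytes, size, __ATOMIC_RELAXED);
    return ptr;
}

static void tracked_freez(void *ptr, size_t size) {
    __atomic_sub_fetch(&tracked_bytes, size, __ATOMIC_RELAXED);
    free(ptr);
}

static size_t tracked_allocated(void) {
    return __atomic_load_n(&tracked_bytes, __ATOMIC_RELAXED);
}

int main(void) {
    void *p = tracked_mallocz(1024);
    printf("allocated: %zu bytes\n", tracked_allocated());   // 1024
    tracked_freez(p, 1024);
    printf("allocated: %zu bytes\n", tracked_allocated());   // 0
    return 0;
}
```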
Diffstat (limited to 'streaming')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | streaming/receiver.c | 2 |
| -rw-r--r-- | streaming/replication.c | 64 |
| -rw-r--r-- | streaming/replication.h | 3 |
| -rw-r--r-- | streaming/rrdpush.c | 15 |
| -rw-r--r-- | streaming/rrdpush.h | 13 |
| -rw-r--r-- | streaming/sender.c | 104 |
6 files changed, 146 insertions, 55 deletions
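
The replication.c hunks below size the Judy indexes indirectly: Judy allocates internally, so the patch reads `JudyLMemUsed()` immediately before and after each `JudyLIns()`/`JudyLDel()` and attributes the delta to `replication_globals.atomic.memory`, treating a NULL or error slot as fatal. A compilable sketch of that measurement technique, assuming libJudy is installed (build with `-lJudy`); the `judy_bytes` counter is illustrative:

```c
#include <stdio.h>
#include <Judy.h>

int main(void) {
    Pvoid_t judyl = NULL;
    size_t judy_bytes = 0;  // illustrative; the patch adds the delta to an atomic counter

    for(Word_t key = 1; key <= 1000; key++) {
        size_t before = JudyLMemUsed(judyl);
        Pvoid_t *slot = JudyLIns(&judyl, key, PJE0);
        if(!slot || slot == PPJERR)
            return 1;                                // the patch calls fatal() here
        *slot = (void *)key;
        judy_bytes += JudyLMemUsed(judyl) - before;  // bytes this insert cost
    }
    printf("JudyL uses %zu bytes for 1000 entries\n", judy_bytes);

    size_t before = JudyLMemUsed(judyl);
    int rc = JudyLDel(&judyl, 500, PJE0);
    judy_bytes -= before - JudyLMemUsed(judyl);      // bytes the delete released
    printf("deleted=%d, now %zu bytes\n", rc, judy_bytes);

    JudyLFreeArray(&judyl, PJE0);
    return 0;
}
```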
```diff
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 6d2810cd60..562107da8e 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -44,6 +44,8 @@ void receiver_state_free(struct receiver_state *rpt) {
     if(rpt->system_info)
         rrdhost_system_info_free(rpt->system_info);
 
+    __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
+
     freez(rpt);
 }
diff --git a/streaming/replication.c b/streaming/replication.c
index c639016349..8d136a04a4 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -46,6 +46,12 @@ struct replication_query_statistics replication_get_query_statistics(void) {
     return ret;
 }
 
+size_t replication_buffers_allocated = 0;
+
+size_t replication_allocated_buffers(void) {
+    return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED);
+}
+
 // ----------------------------------------------------------------------------
 // sending replication replies
 
@@ -109,6 +115,8 @@ static struct replication_query *replication_query_prepare(
 ) {
     size_t dimensions = rrdset_number_of_dimensions(st);
     struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
+    __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
+
     q->dimensions = dimensions;
     q->st = st;
 
@@ -170,7 +178,7 @@ static struct replication_query *replication_query_prepare(
         d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
         d->rd = rd;
 
-        q->ops->init(rd->tiers[0]->db_metric_handle, &d->handle, q->query.after, q->query.before,
+        q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
                      q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
 
         d->enabled = true;
         d->skip = false;
@@ -231,6 +239,7 @@ static void replication_query_finalize(struct replication_query *q, bool executed) {
         netdata_spinlock_unlock(&replication_queries.spinlock);
     }
 
+    __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
     freez(q);
 }
 
@@ -817,6 +826,7 @@ static struct replication_thread {
     struct {
         size_t executed;          // the number of replication requests executed
         size_t latest_first_time; // the 'after' timestamp of the last request we executed
+        size_t memory;            // the total memory allocated by replication
     } atomic;                     // access should be with atomic operations
 
     struct {
@@ -849,6 +859,7 @@ static struct replication_thread {
     .atomic = {
         .executed = 0,
         .latest_first_time = 0,
+        .memory = 0,
     },
     .main_thread = {
         .last_executed = 0,
@@ -857,6 +868,10 @@ static struct replication_thread {
     },
 };
 
+size_t replication_allocated_memory(void) {
+    return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
+}
+
 #define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
 #define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)
 
@@ -909,6 +924,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) {
     fatal_when_replication_is_not_locked_for_me();
 
     struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+    __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
 
     rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
 
@@ -926,6 +942,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) {
 
 static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
     freez(rse);
+    __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
 }
 
 static void replication_sort_entry_add(struct replication_request *rq) {
@@ -959,12 +976,19 @@ static void replication_sort_entry_add(struct replication_request *rq) {
     Pvoid_t *inner_judy_ptr;
 
     // find the outer judy entry, using after as key
-    inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
-    if(!inner_judy_ptr)
-        inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+    size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+    inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+    size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+    if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR))
+        fatal("REPLICATION: corrupted outer judyL");
 
     // add it to the inner judy, using unique_id as key
+    size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
     Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
+    size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
+    if(unlikely(!item || item == PJERR))
+        fatal("REPLICATION: corrupted inner judyL");
+
     *item = rse;
     rq->indexed_in_judy = true;
     rq->not_indexed_buffer_full = false;
@@ -974,6 +998,8 @@ static void replication_sort_entry_add(struct replication_request *rq) {
         replication_globals.unsafe.first_time_t = rq->after;
 
     replication_recursive_unlock();
+
+    __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED);
 }
 
 static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
@@ -989,18 +1015,28 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
     rse->rq->indexed_in_judy = false;
     rse->rq->not_indexed_preprocessing = preprocessing;
 
+    size_t memory_saved = 0;
+
     // delete it from the inner judy
+    size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
     JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
+    size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
+    memory_saved = mem_before_inner_judyl - mem_after_inner_judyl;
 
     // if no items left, delete it from the outer judy
     if(**inner_judy_ppptr == NULL) {
+        size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
         JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
+        size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+        memory_saved += mem_before_outer_judyl - mem_after_outer_judyl;
         inner_judy_deleted = true;
     }
 
     // free memory
     replication_sort_entry_destroy(rse);
 
+    __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED);
+
     return inner_judy_deleted;
 }
 
@@ -1426,6 +1462,7 @@ static int replication_execute_next_pending_request(void) {
             max_requests_ahead = 2;
 
         rqs = callocz(max_requests_ahead, sizeof(struct replication_request));
+        __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
     }
 
     // fill the queue
@@ -1516,6 +1553,7 @@ static void *replication_worker_thread(void *ptr) {
 
     while(service_running(SERVICE_REPLICATION)) {
         if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+            sender_thread_buffer_free();
             worker_is_busy(WORKER_JOB_WAIT);
             worker_is_idle();
             sleep_usec(1 * USEC_PER_SEC);
@@ -1534,9 +1572,11 @@ static void replication_main_cleanup(void *ptr) {
     for(int i = 0; i < threads ;i++) {
         netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
         freez(replication_globals.main_thread.threads_ptrs[i]);
+        __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
     }
     freez(replication_globals.main_thread.threads_ptrs);
     replication_globals.main_thread.threads_ptrs = NULL;
+    __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
 
     // custom code
     worker_unregister();
@@ -1553,14 +1593,16 @@ void *replication_thread_main(void *ptr __maybe_unused) {
         threads = 1;
     }
 
-    if(threads > 1) {
+    if(--threads) {
         replication_globals.main_thread.threads = threads;
         replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
+        __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
 
-        for(int i = 1; i < threads ;i++) {
+        for(int i = 0; i < threads ;i++) {
             char tag[NETDATA_THREAD_TAG_MAX + 1];
-            snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 1);
+            snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
             replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
+            __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
             netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag, NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
         }
 
@@ -1649,14 +1691,16 @@ void *replication_thread_main(void *ptr __maybe_unused) {
             // the timeout also defines now frequently we will traverse all the pending requests
             // when the outbound buffers of all senders is full
             usec_t timeout;
-            if(slow)
+            if(slow) {
                 // no work to be done, wait for a request to come in
                 timeout = 1000 * USEC_PER_MS;
+                sender_thread_buffer_free();
+            }
 
             else if(replication_globals.unsafe.pending > 0) {
-                if(replication_globals.unsafe.sender_resets == last_sender_resets) {
+                if(replication_globals.unsafe.sender_resets == last_sender_resets)
                     timeout = 1000 * USEC_PER_MS;
-                }
+
                 else {
                     // there are pending requests waiting to be executed,
                     // but none could be executed at this time.
diff --git a/streaming/replication.h b/streaming/replication.h
index 00462cc3a1..e6bedc83bb 100644
--- a/streaming/replication.h
+++ b/streaming/replication.h
@@ -30,4 +30,7 @@ void replication_sender_delete_pending_requests(struct sender_state *sender);
 void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming);
 void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s);
 
+size_t replication_allocated_memory(void);
+size_t replication_allocated_buffers(void);
+
 #endif /* REPLICATION_H */
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 7767c371c1..969f6eb6bf 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -373,6 +373,7 @@ bool rrdset_push_chart_definition_now(RRDSET *st) {
     BUFFER *wb = sender_start(host->sender);
     rrdpush_send_chart_definition(wb, st);
     sender_commit(host->sender, wb);
+    sender_thread_buffer_free();
 
     return true;
 }
@@ -437,6 +438,8 @@ void rrdpush_send_host_labels(RRDHOST *host) {
     buffer_sprintf(wb, "OVERWRITE %s\n", "labels");
 
     sender_commit(host->sender, wb);
+
+    sender_thread_buffer_free();
 }
 
 void rrdpush_claimed_id(RRDHOST *host)
@@ -454,6 +457,8 @@ void rrdpush_claimed_id(RRDHOST *host)
     rrdhost_aclk_state_unlock(host);
 
     sender_commit(host->sender, wb);
+
+    sender_thread_buffer_free();
 }
 
 int connect_to_one_of_destinations(
@@ -515,6 +520,8 @@ bool destinations_init_add_one(char *entry, void *data) {
     struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
     d->destination = string_strdupz(entry);
 
+    __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
+
     DOUBLE_LINKED_LIST_APPEND_UNSAFE(t->list, d, prev, next);
     t->count++;
 
@@ -545,6 +552,7 @@ void rrdpush_destinations_free(RRDHOST *host) {
         DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, tmp, prev, next);
         string_freez(tmp->destination);
         freez(tmp);
+        __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
     }
 
     host->destinations = NULL;
@@ -635,6 +643,9 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
     rpt->capabilities = STREAM_CAP_INVALID;
     rpt->hops = 1;
 
+    __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
+    __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
+
     rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info));
     rpt->system_info->hops = rpt->hops;
 
@@ -1069,7 +1080,7 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
 }
 
 void log_receiver_capabilities(struct receiver_state *rpt) {
-    BUFFER *wb = buffer_create(100);
+    BUFFER *wb = buffer_create(100, NULL);
     stream_capabilities_to_string(wb, rpt->capabilities);
 
     info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
@@ -1079,7 +1090,7 @@ void log_receiver_capabilities(struct receiver_state *rpt) {
 }
 
 void log_sender_capabilities(struct sender_state *s) {
-    BUFFER *wb = buffer_create(100);
+    BUFFER *wb = buffer_create(100, NULL);
     stream_capabilities_to_string(wb, s->capabilities);
 
     info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index c7f07ac7e5..94c1320e76 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -10,7 +10,7 @@
 #define CONNECTED_TO_SIZE 100
 
 #define CBUFFER_INITIAL_SIZE (16 * 1024)
-#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE * 4)
+#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)
 
 // ----------------------------------------------------------------------------
 // obsolete versions - do not use anymore
@@ -131,8 +131,8 @@ struct decompressor_state {
 // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
 
 typedef enum {
-    SENDER_FLAG_OVERFLOW    = (1 << 0), // The buffer has been overflown
-    SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
+    SENDER_FLAG_OVERFLOW     = (1 << 0), // The buffer has been overflown
+    SENDER_FLAG_COMPRESSION  = (1 << 1), // The stream needs to have and has compression
 } SENDER_FLAGS;
 
 struct sender_state {
@@ -189,9 +189,13 @@ struct sender_state {
     struct {
         size_t buffer_used_percentage;  // the current utilization of the sending buffer
        usec_t last_flush_time_ut;       // the last time the sender flushed the sending buffer in USEC
+        time_t last_buffer_recreate_s;  // true when the sender buffer should be re-created
     } atomic;
 };
 
+#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
+#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)
+
 #define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
 #define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)
 
@@ -296,7 +300,6 @@ void rrdpush_destinations_free(RRDHOST *host);
 
 BUFFER *sender_start(struct sender_state *s);
 void sender_commit(struct sender_state *s, BUFFER *wb);
-void sender_cancel(struct sender_state *s);
 int rrdpush_init();
 bool rrdpush_receiver_needs_dbengine();
 int configured_as_parent();
@@ -339,6 +342,8 @@ int32_t stream_capabilities_to_vn(uint32_t caps);
 void receiver_state_free(struct receiver_state *rpt);
 bool stop_streaming_receiver(RRDHOST *host, const char *reason);
 
+void sender_thread_buffer_free(void);
+
 #include "replication.h"
 
 #endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index 7de7142b50..854b57fc5a 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -36,29 +36,29 @@ extern char *netdata_ssl_ca_file;
 
 static __thread BUFFER *sender_thread_buffer = NULL;
 static __thread bool sender_thread_buffer_used = false;
-static __thread bool sender_thread_buffer_recreate = false;
+static __thread time_t sender_thread_buffer_last_reset_s = 0;
 
 void sender_thread_buffer_free(void) {
     buffer_free(sender_thread_buffer);
     sender_thread_buffer = NULL;
+    sender_thread_buffer_used = false;
 }
 
 // Collector thread starting a transmission
-BUFFER *sender_start(struct sender_state *s __maybe_unused) {
+BUFFER *sender_start(struct sender_state *s) {
     if(unlikely(sender_thread_buffer_used))
         fatal("STREAMING: thread buffer is used multiple times concurrently.");
 
-    if(unlikely(sender_thread_buffer_recreate)) {
-        sender_thread_buffer_recreate = false;
-        if(sender_thread_buffer && sender_thread_buffer->size > THREAD_BUFFER_INITIAL_SIZE) {
+    if(unlikely(rrdpush_sender_last_buffer_recreate_get(s) > sender_thread_buffer_last_reset_s)) {
+        if(unlikely(sender_thread_buffer && sender_thread_buffer->size > THREAD_BUFFER_INITIAL_SIZE)) {
             buffer_free(sender_thread_buffer);
             sender_thread_buffer = NULL;
         }
     }
 
-    if(!sender_thread_buffer) {
-        sender_thread_buffer = buffer_create(THREAD_BUFFER_INITIAL_SIZE);
-        sender_thread_buffer_recreate = false;
+    if(unlikely(!sender_thread_buffer)) {
+        sender_thread_buffer = buffer_create(THREAD_BUFFER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_streaming);
+        sender_thread_buffer_last_reset_s = rrdpush_sender_last_buffer_recreate_get(s);
     }
 
     sender_thread_buffer_used = true;
@@ -66,10 +66,6 @@ BUFFER *sender_start(struct sender_state *s __maybe_unused) {
     return sender_thread_buffer;
 }
 
-void sender_cancel(struct sender_state *s __maybe_unused) {
-    sender_thread_buffer_used = false;
-}
-
 static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
 
 #ifdef ENABLE_COMPRESSION
@@ -108,11 +104,11 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
 
     netdata_mutex_lock(&s->mutex);
 
-    if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
+    if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
         info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
-             rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
+             rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
 
-        s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
+        s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
     }
 
 #ifdef ENABLE_COMPRESSION
@@ -159,17 +155,17 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
             }
         }
 
-        if(cbuffer_add_unsafe(s->host->sender->buffer, dst, dst_len))
+        if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
             s->flags |= SENDER_FLAG_OVERFLOW;
 
         src = src + size_to_compress;
         src_len -= size_to_compress;
         }
     }
-    else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+    else if(cbuffer_add_unsafe(s->buffer, src, src_len))
         s->flags |= SENDER_FLAG_OVERFLOW;
 #else
-    if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+    if(cbuffer_add_unsafe(s->buffer, src, src_len))
         s->flags |= SENDER_FLAG_OVERFLOW;
 #endif
 
@@ -195,6 +191,7 @@ void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
         BUFFER *wb = sender_start(host->sender);
         rrdpush_sender_add_host_variable_to_buffer(wb, rva);
         sender_commit(host->sender, wb);
+        sender_thread_buffer_free();
     }
 }
 
@@ -223,6 +220,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
         int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
         (void)ret;
         sender_commit(host->sender, wb);
+        sender_thread_buffer_free();
 
         debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
     }
@@ -248,6 +246,30 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
 
     rrdhost_sender_replicating_charts_zero(host);
 }
 
+static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t now_s, bool have_mutex, bool force) {
+    static __thread time_t last_reset_time_s = 0;
+
+    if(!force && now_s - last_reset_time_s < 300)
+        return;
+
+    if(!have_mutex)
+        netdata_mutex_lock(&s->mutex);
+
+    rrdpush_sender_last_buffer_recreate_set(s, now_s);
+    last_reset_time_s = now_s;
+
+    if(s->buffer && s->buffer->size > CBUFFER_INITIAL_SIZE) {
+        size_t max = s->buffer->max_size;
+        cbuffer_free(s->buffer);
+        s->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, max, &netdata_buffers_statistics.cbuffers_streaming);
+    }
+
+    sender_thread_buffer_free();
+
+    if(!have_mutex)
+        netdata_mutex_unlock(&s->mutex);
+}
+
 static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
     rrdpush_sender_set_flush_time(host->sender);
 
@@ -255,6 +277,7 @@ static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
 
     // flush the output buffer from any data it may have
     cbuffer_flush(host->sender->buffer);
+    rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true);
     replication_recalculate_buffer_used_ratio_unsafe(host->sender);
 
     netdata_mutex_unlock(&host->sender->mutex);
@@ -781,8 +804,8 @@ static ssize_t attempt_to_send(struct sender_state *s) {
     debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
 
 #ifdef ENABLE_HTTPS
-    SSL *conn = s->host->sender->ssl.conn ;
-    if(conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+    SSL *conn = s->ssl.conn ;
+    if(conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
         ret = netdata_ssl_write(conn, chunk, outstanding);
     else
         ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
@@ -817,9 +840,9 @@ static ssize_t attempt_read(struct sender_state *s) {
     ssize_t ret = 0;
 
 #ifdef ENABLE_HTTPS
-    if (s->host->sender->ssl.conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
+    if (s->ssl.conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
         size_t desired = sizeof(s->read_buffer) - s->read_len - 1;
-        ret = netdata_ssl_read(s->host->sender->ssl.conn, s->read_buffer, desired);
+        ret = netdata_ssl_read(s->ssl.conn, s->read_buffer, desired);
         if (ret > 0 ) {
             s->read_len += (int)ret;
             return ret;
@@ -878,6 +901,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
         pluginsd_function_result_end_to_buffer(wb);
 
         sender_commit(s, wb);
+        sender_thread_buffer_free();
 
         internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).",
                        rrdhost_hostname(s->host), s->connected_to,
@@ -932,7 +956,7 @@ void execute_commands(struct sender_state *s) {
                     tmp->received_ut = now_realtime_usec();
                     tmp->sender = s;
                     tmp->transaction = string_strdupz(transaction);
-                    BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1);
+                    BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
 
                     int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp);
                     if(code != HTTP_RESP_OK) {
@@ -1223,11 +1247,18 @@ void *rrdpush_sender_thread(void *ptr) {
 
     netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
 
+    size_t iterations = 0;
+    time_t now_s = now_monotonic_sec();
     while(!rrdhost_sender_should_exit(s)) {
+        iterations++;
 
         // The connection attempt blocks (after which we use the socket in nonblocking)
         if(unlikely(s->rrdpush_sender_socket == -1)) {
             worker_is_busy(WORKER_SENDER_JOB_CONNECT);
+
+            now_s = now_monotonic_sec();
+            rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true);
+
             rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
             s->flags &= ~SENDER_FLAG_OVERFLOW;
             s->read_len = 0;
@@ -1240,7 +1271,7 @@ void *rrdpush_sender_thread(void *ptr) {
             if(rrdhost_sender_should_exit(s))
                 break;
 
-            s->last_traffic_seen_t = now_monotonic_sec();
+            now_s = s->last_traffic_seen_t = now_monotonic_sec();
 
             rrdpush_claimed_id(s->host);
             rrdpush_send_host_labels(s->host);
@@ -1250,8 +1281,11 @@ void *rrdpush_sender_thread(void *ptr) {
             continue;
         }
 
+        if(iterations % 1000 == 0)
+            now_s = now_monotonic_sec();
+
         // If the TCP window never opened then something is wrong, restart connection
-        if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout &&
+        if(unlikely(now_s - s->last_traffic_seen_t > s->timeout &&
             !rrdpush_sender_pending_replication_requests(s) &&
             !rrdpush_sender_replicating_charts(s)
         )) {
@@ -1262,22 +1296,13 @@ void *rrdpush_sender_thread(void *ptr) {
         }
 
         netdata_mutex_lock(&s->mutex);
-        size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, NULL);
-        size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
-        if(unlikely(!outstanding && s->host->sender->buffer->size > CBUFFER_INITIAL_SIZE)) {
-            static __thread time_t last_reset_time_t = 0;
-            time_t now_t = now_monotonic_sec();
-            if(now_t - last_reset_time_t > 600) {
-                last_reset_time_t = now_t;
-                size_t max = s->host->sender->buffer->max_size;
-                cbuffer_free(s->host->sender->buffer);
-                s->host->sender->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, max);
-                sender_thread_buffer_recreate = true;
-            }
-        }
+        size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
+        size_t available = cbuffer_available_size_unsafe(s->buffer);
+        if (unlikely(!outstanding))
+            rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
         netdata_mutex_unlock(&s->mutex);
 
-        worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->host->sender->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->host->sender->buffer->max_size);
+        worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
 
         if(outstanding)
             s->send_attempts++;
@@ -1329,6 +1354,7 @@ void *rrdpush_sender_thread(void *ptr) {
         if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) {
             netdata_thread_testcancel();
             debug(D_STREAM, "Spurious wakeup");
+            now_s = now_monotonic_sec();
            continue;
         }
```
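
The sender.c changes above replace the thread-local recreate flag with a timestamp handshake: the sender shrinks its circular buffer back to `CBUFFER_INITIAL_SIZE` at most once every 300 seconds (or forcibly on flush and reconnect) and publishes the shrink time through `rrdpush_sender_last_buffer_recreate_set()`; each collector thread compares that timestamp against the one recorded when its own thread-local buffer was built and drops the buffer if it is stale. A simplified sketch of the handshake under hypothetical names (`struct sender`, `thread_buffer_check()`); the real `sender_start()` additionally checks that the buffer actually grew beyond `THREAD_BUFFER_INITIAL_SIZE` before freeing it:

```c
#include <stdlib.h>
#include <time.h>

#define THREAD_BUFFER_INITIAL_SIZE (8 * 1024)   // stand-in value

struct sender {                                 // stand-in for struct sender_state
    time_t last_buffer_recreate_s;              // published by the sender thread
};

// Sender side: after shrinking its circular buffer, publish the time so
// collector threads know their thread-local buffers may be oversized.
static void sender_publish_recreate(struct sender *s, time_t now_s) {
    __atomic_store_n(&s->last_buffer_recreate_s, now_s, __ATOMIC_RELAXED);
}

// Collector side: a thread-local buffer plus the recreate timestamp it was built at.
static __thread char *thread_buffer = NULL;
static __thread time_t thread_buffer_last_reset_s = 0;

static char *thread_buffer_check(struct sender *s) {
    time_t last_recreate_s = __atomic_load_n(&s->last_buffer_recreate_s, __ATOMIC_RELAXED);

    // the sender recreated its buffer after ours was built: ours may have
    // grown to match a now-gone peak, so drop it and start small again
    if(thread_buffer && last_recreate_s > thread_buffer_last_reset_s) {
        free(thread_buffer);
        thread_buffer = NULL;
    }

    if(!thread_buffer) {
        thread_buffer = malloc(THREAD_BUFFER_INITIAL_SIZE);
        thread_buffer_last_reset_s = last_recreate_s;
    }

    return thread_buffer;
}
```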