author     Costa Tsaousis <costa@netdata.cloud>      2023-01-20 00:50:42 +0200
committer  GitHub <noreply@github.com>               2023-01-20 00:50:42 +0200
commit     9232bfb6a072155388578dc4e1338c6002afb515 (patch)
tree       dc85f9bfe3ed97394e6f2a92a2f710796d6d8979 /streaming/replication.c
parent     86538b005de50f23c9ff66542abab11683b85c06 (diff)
track memory footprint of Netdata (#14294)
* track memory footprint of Netdata
* track db modes alloc/ram/save/map
* track system info; track sender and receiver
* fixes
* more fixes
* track workers' memory and onewayalloc memory; unify JudyHS size estimation
* track replication structures and buffers (a minimal sketch of the tracking pattern follows this message)
* Properly clear host RRDHOST_FLAG_METADATA_UPDATE flag
* flush the replication buffer every 1000th time the circular buffer is found empty
* don't take the timestamp too frequently in the sender loop
* sender buffers are not used by the same thread as the sender, so they were never recreated; fixed it
* free sender thread buffer on replication threads when replication is idle
* use the last sender flag as a timestamp of the last buffer recreation
* free cbuffer before reconnecting
* recreate cbuffer on every flush
* timings for journal v2 loading
* inlining of metric and cache functions
* aral likely/unlikely
* free left-over thread buffers
* fix NULL pointer dereference in replication
* free sender thread buffer on sender thread too
* mark ctx as used before flushing
* better logging on ctx datafiles closing
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
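
Nearly every change in this commit follows a single pattern: a global size_t counter incremented with relaxed GCC atomics at each allocation, decremented at each free, and read through a small accessor. Below is a minimal, self-contained sketch of that pattern, assuming the GCC/Clang __atomic builtins; the tracked_* names are illustrative, not code from this commit:

#include <stddef.h>
#include <stdlib.h>

// one global counter per subsystem; relaxed ordering is enough because
// the counter is only a statistic, never used for synchronization
static size_t tracked_bytes = 0;

static void *tracked_malloc(size_t size) {
    void *ptr = malloc(size);
    if(ptr)
        __atomic_add_fetch(&tracked_bytes, size, __ATOMIC_RELAXED);
    return ptr;
}

static void tracked_free(void *ptr, size_t size) {
    // the caller passes the same size it allocated with, keeping the
    // counter balanced without storing a per-pointer size header
    __atomic_sub_fetch(&tracked_bytes, size, __ATOMIC_RELAXED);
    free(ptr);
}

// accessor, mirroring replication_allocated_buffers() in the diff below
size_t tracked_allocated_memory(void) {
    return __atomic_load_n(&tracked_bytes, __ATOMIC_RELAXED);
}

Because no size header is stored, the add and sub sites must agree on the size expression; that is why replication_query_finalize() in the diff subtracts exactly the same sizeof() expression that replication_query_prepare() added.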
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--   streaming/replication.c   64
1 file changed, 54 insertions, 10 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index c639016349..8d136a04a4 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -46,6 +46,12 @@ struct replication_query_statistics replication_get_query_statistics(void) {
     return ret;
 }
 
+size_t replication_buffers_allocated = 0;
+
+size_t replication_allocated_buffers(void) {
+    return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED);
+}
+
 // ----------------------------------------------------------------------------
 // sending replication replies
 
@@ -109,6 +115,8 @@ static struct replication_query *replication_query_prepare(
 ) {
     size_t dimensions = rrdset_number_of_dimensions(st);
     struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
+    __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
+
     q->dimensions = dimensions;
     q->st = st;
 
@@ -170,7 +178,7 @@ static struct replication_query *replication_query_prepare(
         d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
         d->rd = rd;
 
-        q->ops->init(rd->tiers[0]->db_metric_handle, &d->handle, q->query.after, q->query.before,
+        q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
                      q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
         d->enabled = true;
         d->skip = false;
@@ -231,6 +239,7 @@ static void replication_query_finalize(struct replication_query *q, bool execute
         netdata_spinlock_unlock(&replication_queries.spinlock);
     }
 
+    __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
     freez(q);
 }
 
@@ -817,6 +826,7 @@ static struct replication_thread {
     struct {
         size_t executed;           // the number of replication requests executed
         size_t latest_first_time;  // the 'after' timestamp of the last request we executed
+        size_t memory;             // the total memory allocated by replication
     } atomic;                      // access should be with atomic operations
 
     struct {
@@ -849,6 +859,7 @@ static struct replication_thread {
     .atomic = {
         .executed = 0,
         .latest_first_time = 0,
+        .memory = 0,
    },
     .main_thread = {
         .last_executed = 0,
@@ -857,6 +868,10 @@ static struct replication_thread {
     },
 };
 
+size_t replication_allocated_memory(void) {
+    return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
+}
+
 #define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
 #define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)
 
@@ -909,6 +924,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
     fatal_when_replication_is_not_locked_for_me();
 
     struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+    __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
 
     rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
 
@@ -926,6 +942,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
 
 static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
     freez(rse);
+    __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
 }
 
 static void replication_sort_entry_add(struct replication_request *rq) {
@@ -959,12 +976,19 @@ static void replication_sort_entry_add(struct replication_request *rq) {
     Pvoid_t *inner_judy_ptr;
 
     // find the outer judy entry, using after as key
-    inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
-    if(!inner_judy_ptr)
-        inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+    size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+    inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+    size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+    if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR))
+        fatal("REPLICATION: corrupted outer judyL");
 
     // add it to the inner judy, using unique_id as key
+    size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
     Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
+    size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
+    if(unlikely(!item || item == PJERR))
+        fatal("REPLICATION: corrupted inner judyL");
+
     *item = rse;
     rq->indexed_in_judy = true;
     rq->not_indexed_buffer_full = false;
@@ -974,6 +998,8 @@ static void replication_sort_entry_add(struct replication_request *rq) {
         replication_globals.unsafe.first_time_t = rq->after;
 
     replication_recursive_unlock();
+
+    __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED);
 }
 
 static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
@@ -989,18 +1015,28 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
     rse->rq->indexed_in_judy = false;
     rse->rq->not_indexed_preprocessing = preprocessing;
 
+    size_t memory_saved = 0;
+
     // delete it from the inner judy
+    size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
     JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
+    size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
+    memory_saved = mem_before_inner_judyl - mem_after_inner_judyl;
 
     // if no items left, delete it from the outer judy
     if(**inner_judy_ppptr == NULL) {
+        size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
         JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
+        size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+        memory_saved += mem_before_outer_judyl - mem_after_outer_judyl;
         inner_judy_deleted = true;
     }
 
     // free memory
     replication_sort_entry_destroy(rse);
 
+    __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED);
+
     return inner_judy_deleted;
 }
 
@@ -1426,6 +1462,7 @@ static int replication_execute_next_pending_request(void) {
             max_requests_ahead = 2;
 
         rqs = callocz(max_requests_ahead, sizeof(struct replication_request));
+        __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
     }
 
     // fill the queue
@@ -1516,6 +1553,7 @@ static void *replication_worker_thread(void *ptr) {
 
     while(service_running(SERVICE_REPLICATION)) {
         if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+            sender_thread_buffer_free();
             worker_is_busy(WORKER_JOB_WAIT);
             worker_is_idle();
             sleep_usec(1 * USEC_PER_SEC);
@@ -1534,9 +1572,11 @@ static void replication_main_cleanup(void *ptr) {
     for(int i = 0; i < threads ;i++) {
         netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
         freez(replication_globals.main_thread.threads_ptrs[i]);
+        __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
     }
     freez(replication_globals.main_thread.threads_ptrs);
     replication_globals.main_thread.threads_ptrs = NULL;
+    __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
 
     // custom code
     worker_unregister();
@@ -1553,14 +1593,16 @@ void *replication_thread_main(void *ptr __maybe_unused) {
         threads = 1;
     }
 
-    if(threads > 1) {
+    if(--threads) {
         replication_globals.main_thread.threads = threads;
         replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
+        __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
 
-        for(int i = 1; i < threads ;i++) {
+        for(int i = 0; i < threads ;i++) {
             char tag[NETDATA_THREAD_TAG_MAX + 1];
-            snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 1);
+            snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
 
             replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
+            __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
             netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag, NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
         }
@@ -1649,14 +1691,16 @@ void *replication_thread_main(void *ptr __maybe_unused) {
         // the timeout also defines now frequently we will traverse all the pending requests
         // when the outbound buffers of all senders is full
         usec_t timeout;
-        if(slow)
+        if(slow) {
             // no work to be done, wait for a request to come in
             timeout = 1000 * USEC_PER_MS;
+            sender_thread_buffer_free();
+        }
 
         else if(replication_globals.unsafe.pending > 0) {
-            if(replication_globals.unsafe.sender_resets == last_sender_resets) {
+            if(replication_globals.unsafe.sender_resets == last_sender_resets)
                 timeout = 1000 * USEC_PER_MS;
-            }
+
             else {
                 // there are pending requests waiting to be executed,
                 // but none could be executed at this time.
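
The Judy arrays in the hunks above have no fixed per-item size to add or subtract, so the patch brackets every JudyLIns()/JudyLDel() with JudyLMemUsed() and accounts only the delta. Below is a minimal, self-contained sketch of that measurement pattern, assuming libjudy is installed (compile and link with -lJudy); all variable names are illustrative, not from the patch:

#include <stdio.h>
#include <Judy.h>

int main(void) {
    Pvoid_t judy = NULL;      // empty JudyL array
    size_t accounted = 0;     // bytes we attribute to individual inserts

    for(Word_t key = 0; key < 1000; key++) {
        // measure the whole array before and after the insert and
        // account only for the delta, as the patch does
        size_t before = JudyLMemUsed(judy);
        Pvoid_t *item = JudyLIns(&judy, key, PJE0);
        if(!item || item == PJERR)
            return 1;         // allocation failure / corrupted array
        *item = (void *)(key + 1);
        accounted += JudyLMemUsed(judy) - before;
    }

    printf("accounted %zu bytes, JudyLMemUsed() reports %zu\n",
           accounted, (size_t)JudyLMemUsed(judy));

    // JudyLFreeArray() returns the number of bytes it released
    Word_t freed = JudyLFreeArray(&judy, PJE0);
    printf("freed %zu bytes\n", (size_t)freed);
    return 0;
}

Judy allocates and frees internal nodes as branches split and merge, so whole-array before/after snapshots are the only reliable way to attribute that hidden memory to individual operations; this is presumably why the patch measures deltas instead of estimating a per-entry size.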