summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-20 00:50:42 +0200
committerGitHub <noreply@github.com>2023-01-20 00:50:42 +0200
commit9232bfb6a072155388578dc4e1338c6002afb515 (patch)
treedc85f9bfe3ed97394e6f2a92a2f710796d6d8979 /streaming/replication.c
parent86538b005de50f23c9ff66542abab11683b85c06 (diff)
track memory footprint of Netdata (#14294)
* track memory footprint of Netdata * track db modes alloc/ram/save/map * track system info; track sender and receiver * fixes * more fixes * track workers memory, onewayalloc memory; unify judyhs size estimation * track replication structures and buffers * Properly clear host RRDHOST_FLAG_METADATA_UPDATE flag * flush the replication buffer every 1000 times the circular buffer is found empty * dont take timestamp too frequently in sender loop * sender buffers are not used by the same thread as the sender, so they were never recreated - fixed it * free sender thread buffer on replication threads when replication is idle * use the last sender flag as a timestamp of the last buffer recreation * free cbuffer before reconnecting * recreate cbuffer on every flush * timings for journal v2 loading * inlining of metric and cache functions * aral likely/unlikely * free left-over thread buffers * fix NULL pointer dereference in replication * free sender thread buffer on sender thread too * mark ctx as used before flushing * better logging on ctx datafiles closing Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--streaming/replication.c64
1 files changed, 54 insertions, 10 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index c639016349..8d136a04a4 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -46,6 +46,12 @@ struct replication_query_statistics replication_get_query_statistics(void) {
return ret;
}
+size_t replication_buffers_allocated = 0;
+
+size_t replication_allocated_buffers(void) {
+ return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED);
+}
+
// ----------------------------------------------------------------------------
// sending replication replies
@@ -109,6 +115,8 @@ static struct replication_query *replication_query_prepare(
) {
size_t dimensions = rrdset_number_of_dimensions(st);
struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
+ __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
+
q->dimensions = dimensions;
q->st = st;
@@ -170,7 +178,7 @@ static struct replication_query *replication_query_prepare(
d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
d->rd = rd;
- q->ops->init(rd->tiers[0]->db_metric_handle, &d->handle, q->query.after, q->query.before,
+ q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
d->enabled = true;
d->skip = false;
@@ -231,6 +239,7 @@ static void replication_query_finalize(struct replication_query *q, bool execute
netdata_spinlock_unlock(&replication_queries.spinlock);
}
+ __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
freez(q);
}
@@ -817,6 +826,7 @@ static struct replication_thread {
struct {
size_t executed; // the number of replication requests executed
size_t latest_first_time; // the 'after' timestamp of the last request we executed
+ size_t memory; // the total memory allocated by replication
} atomic; // access should be with atomic operations
struct {
@@ -849,6 +859,7 @@ static struct replication_thread {
.atomic = {
.executed = 0,
.latest_first_time = 0,
+ .memory = 0,
},
.main_thread = {
.last_executed = 0,
@@ -857,6 +868,10 @@ static struct replication_thread {
},
};
+size_t replication_allocated_memory(void) {
+ return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
+}
+
#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)
@@ -909,6 +924,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
fatal_when_replication_is_not_locked_for_me();
struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+ __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
@@ -926,6 +942,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
freez(rse);
+ __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
}
static void replication_sort_entry_add(struct replication_request *rq) {
@@ -959,12 +976,19 @@ static void replication_sort_entry_add(struct replication_request *rq) {
Pvoid_t *inner_judy_ptr;
// find the outer judy entry, using after as key
- inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
- if(!inner_judy_ptr)
- inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+ size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+ inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+ size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+ if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR))
+ fatal("REPLICATION: corrupted outer judyL");
// add it to the inner judy, using unique_id as key
+ size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
+ size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
+ if(unlikely(!item || item == PJERR))
+ fatal("REPLICATION: corrupted inner judyL");
+
*item = rse;
rq->indexed_in_judy = true;
rq->not_indexed_buffer_full = false;
@@ -974,6 +998,8 @@ static void replication_sort_entry_add(struct replication_request *rq) {
replication_globals.unsafe.first_time_t = rq->after;
replication_recursive_unlock();
+
+ __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED);
}
static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
@@ -989,18 +1015,28 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
rse->rq->indexed_in_judy = false;
rse->rq->not_indexed_preprocessing = preprocessing;
+ size_t memory_saved = 0;
+
// delete it from the inner judy
+ size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
+ size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
+ memory_saved = mem_before_inner_judyl - mem_after_inner_judyl;
// if no items left, delete it from the outer judy
if(**inner_judy_ppptr == NULL) {
+ size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
+ size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+ memory_saved += mem_before_outer_judyl - mem_after_outer_judyl;
inner_judy_deleted = true;
}
// free memory
replication_sort_entry_destroy(rse);
+ __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED);
+
return inner_judy_deleted;
}
@@ -1426,6 +1462,7 @@ static int replication_execute_next_pending_request(void) {
max_requests_ahead = 2;
rqs = callocz(max_requests_ahead, sizeof(struct replication_request));
+ __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
}
// fill the queue
@@ -1516,6 +1553,7 @@ static void *replication_worker_thread(void *ptr) {
while(service_running(SERVICE_REPLICATION)) {
if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+ sender_thread_buffer_free();
worker_is_busy(WORKER_JOB_WAIT);
worker_is_idle();
sleep_usec(1 * USEC_PER_SEC);
@@ -1534,9 +1572,11 @@ static void replication_main_cleanup(void *ptr) {
for(int i = 0; i < threads ;i++) {
netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
freez(replication_globals.main_thread.threads_ptrs[i]);
+ __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
}
freez(replication_globals.main_thread.threads_ptrs);
replication_globals.main_thread.threads_ptrs = NULL;
+ __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
// custom code
worker_unregister();
@@ -1553,14 +1593,16 @@ void *replication_thread_main(void *ptr __maybe_unused) {
threads = 1;
}
- if(threads > 1) {
+ if(--threads) {
replication_globals.main_thread.threads = threads;
replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
+ __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
- for(int i = 1; i < threads ;i++) {
+ for(int i = 0; i < threads ;i++) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 1);
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
+ __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag,
NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
}
@@ -1649,14 +1691,16 @@ void *replication_thread_main(void *ptr __maybe_unused) {
// the timeout also defines now frequently we will traverse all the pending requests
// when the outbound buffers of all senders is full
usec_t timeout;
- if(slow)
+ if(slow) {
// no work to be done, wait for a request to come in
timeout = 1000 * USEC_PER_MS;
+ sender_thread_buffer_free();
+ }
else if(replication_globals.unsafe.pending > 0) {
- if(replication_globals.unsafe.sender_resets == last_sender_resets) {
+ if(replication_globals.unsafe.sender_resets == last_sender_resets)
timeout = 1000 * USEC_PER_MS;
- }
+
else {
// there are pending requests waiting to be executed,
// but none could be executed at this time.