diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-06-19 23:19:36 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-19 23:19:36 +0300 |
commit | 43c749b07d07e79dae8111dcdb7bc1a46c3dda1b (patch) | |
tree | 4c3a270652787c91ef15c7ef8e29915769fc1fd4 /database/engine/journalfile.c | |
parent | 0b4f820e9d42d10f64c3305d9c084261bc9880cf (diff) |
Obvious memory reductions (#15204)
* remove rd->update_every
* reduce amount of memory for RRDDIM
* reorgnize rrddim->db entries
* optimize rrdset and statsd
* optimize dictionaries
* RW_SPINLOCK for dictionaries
* fix codeql warning
* rw_spinlock improvements
* remove obsolete assertion
* fix crash on health_alarm_log_process()
* use RW_SPINLOCK for AVL trees
* add RW_SPINLOCK read/write trylock
* pgc and mrg now use rw_spinlocks; cache line optimizations for mrg
* thread tag of dbegnine init
* append created datafile, lockless
* make DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE friendly for lockless use
* thread cancelability in spinlocks; optimize thread cancelability management
* introduce a JudyL to index datafiles and use it during queries to quickly find the relevant files
* use the last timestamp of each journal file for indexing
* when the previous cannot be found, start from the beginning
* add more stats to PDC to trace routing easier
* rename spinlock functions
* fix for spinlock renames
* revert statsd socket statistics to size_t
* turn fatal into internal_fatal()
* show candidates always
* show connected status and connection attempts
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r-- | database/engine/journalfile.c | 194 |
1 files changed, 158 insertions, 36 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 9998ee5403..e95ce23838 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -92,10 +92,10 @@ void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengin io_descr->buf = wal->buf; io_descr->bytes = wal->buf_size; - netdata_spinlock_lock(&journalfile->unsafe.spinlock); + spinlock_lock(&journalfile->unsafe.spinlock); io_descr->pos = journalfile->unsafe.pos; journalfile->unsafe.pos += wal->buf_size; - netdata_spinlock_unlock(&journalfile->unsafe.spinlock); + spinlock_unlock(&journalfile->unsafe.spinlock); io_descr->req.data = wal; io_descr->data = journalfile; @@ -122,10 +122,128 @@ void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno); } +// ---------------------------------------------------------------------------- + +struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s) { + struct rrdengine_datafile *datafile = NULL; + + rw_spinlock_read_lock(&s->ctx->njfv2idx.spinlock); + + Pvoid_t *PValue = NULL; + + if(unlikely(!s->init)) { + s->init = true; + s->last = s->wanted_start_time_s; + + PValue = JudyLPrev(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) { + s->last = 0; + PValue = JudyLFirst(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) + s->last = s->wanted_start_time_s; + } + } + + while(1) { + if (likely(!PValue)) { + PValue = JudyLNext(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) { + // cannot find anything after that point + datafile = NULL; + break; + } + } + + datafile = *PValue; + TIME_RANGE_COMPARE rc = is_page_in_time_range(datafile->journalfile->v2.first_time_s, + datafile->journalfile->v2.last_time_s, + s->wanted_start_time_s, + s->wanted_end_time_s); + + if(rc == PAGE_IS_IN_RANGE) { + // this is good to return + break; + } + else if(rc == PAGE_IS_IN_THE_PAST) { + // continue to get the next + datafile = NULL; + PValue = NULL; + continue; + } + else /* PAGE_IS_IN_THE_FUTURE */ { + // we finished - no more datafiles + datafile = NULL; + PValue = NULL; + break; + } + } + + if(datafile) + s->j2_header_acquired = journalfile_v2_data_acquire(datafile->journalfile, NULL, + s->wanted_start_time_s, + s->wanted_end_time_s); + else + s->j2_header_acquired = NULL; + + rw_spinlock_read_unlock(&s->ctx->njfv2idx.spinlock); + + return datafile; +} + +static void njfv2idx_add(struct rrdengine_datafile *datafile) { + internal_fatal(datafile->journalfile->v2.last_time_s <= 0, "DBENGINE: NJFV2IDX trying to index a journal file with invalid first_time_s"); + + rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock); + datafile->journalfile->njfv2idx.indexed_as = datafile->journalfile->v2.last_time_s; + + do { + internal_fatal(datafile->journalfile->njfv2idx.indexed_as <= 0, "DBENGINE: NJFV2IDX journalfile is already indexed"); + + Pvoid_t *PValue = JudyLIns(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0); + if (!PValue || PValue == PJERR) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if (unlikely(*PValue)) { + // already there + datafile->journalfile->njfv2idx.indexed_as++; + } + else { + *PValue = datafile; + break; + } + } while(0); + + rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock); +} + +static void njfv2idx_remove(struct rrdengine_datafile *datafile) { + internal_fatal(!datafile->journalfile->njfv2idx.indexed_as, "DBENGINE: NJFV2IDX journalfile to remove is not indexed"); + + rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock); + + int rc = JudyLDel(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0); + internal_fatal(!rc, "DBENGINE: NJFV2IDX cannot remove entry"); + + datafile->journalfile->njfv2idx.indexed_as = 0; + + rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock); +} + +// ---------------------------------------------------------------------------- + static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) { struct journal_v2_header *j2_header = NULL; - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); if(!journalfile->mmap.data) { journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0); @@ -136,9 +254,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin journalfile->mmap.data = NULL; journalfile->mmap.size = 0; - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED); - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); ctx_fs_error(journalfile->datafile->ctx); } @@ -150,9 +268,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin madvise_random(journalfile->mmap.data, journalfile->mmap.size); madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED; - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); } } @@ -163,7 +281,7 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin *data_size = journalfile->mmap.size; } - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); return j2_header; } @@ -173,20 +291,20 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo if(!have_locks) { if(!wait) { - if (!netdata_spinlock_trylock(&journalfile->mmap.spinlock)) + if (!spinlock_trylock(&journalfile->mmap.spinlock)) return false; } else - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); if(!wait) { - if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) { - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + if(!spinlock_trylock(&journalfile->v2.spinlock)) { + spinlock_unlock(&journalfile->mmap.spinlock); return false; } } else - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); } if(!journalfile->v2.refcount) { @@ -209,8 +327,8 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo } if(!have_locks) { - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); } return unmounted; @@ -230,7 +348,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) { struct rrdengine_journalfile *journalfile = datafile->journalfile; - if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) + if(!spinlock_trylock(&journalfile->v2.spinlock)) continue; bool unmount = false; @@ -244,7 +362,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { // 2 minutes have passed since last use unmount = true; } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if (unmount) journalfile_v2_mounted_data_unmount(journalfile, false, false); @@ -254,7 +372,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { } struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED); @@ -276,7 +394,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi } } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if(do_we_need_it) return journalfile_v2_mounted_data_get(journalfile, data_size); @@ -285,7 +403,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi } void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data"); internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile"); @@ -300,7 +418,7 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) unmount = true; } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if(unmount) journalfile_v2_mounted_data_unmount(journalfile, false, true); @@ -308,25 +426,25 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); return has_data; } size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); size_t data_size = journalfile->mmap.size; - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); return data_size; } void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) { - netdata_spinlock_lock(&journalfile->mmap.spinlock); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->v2.spinlock); internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd"); internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data"); @@ -344,19 +462,23 @@ void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, journalfile_v2_mounted_data_unmount(journalfile, true, true); - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); + + njfv2idx_add(journalfile->datafile); } static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) { + njfv2idx_remove(journalfile->datafile); + bool has_references = false; do { if (has_references) sleep_usec(10 * USEC_PER_MS); - netdata_spinlock_lock(&journalfile->mmap.spinlock); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->v2.spinlock); if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) { if(journalfile->mmap.fd != -1) @@ -374,8 +496,8 @@ static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile * internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap..."); } - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); } while(has_references); } @@ -384,9 +506,9 @@ struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafi { struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile)); journalfile->datafile = datafile; - netdata_spinlock_init(&journalfile->mmap.spinlock); - netdata_spinlock_init(&journalfile->v2.spinlock); - netdata_spinlock_init(&journalfile->unsafe.spinlock); + spinlock_init(&journalfile->mmap.spinlock); + spinlock_init(&journalfile->v2.spinlock); + spinlock_init(&journalfile->unsafe.spinlock); journalfile->mmap.fd = -1; datafile->journalfile = journalfile; return journalfile; |