diff options
Diffstat (limited to 'database/engine')
-rw-r--r-- | database/engine/cache.c | 56 | ||||
-rw-r--r-- | database/engine/cache.h | 2 | ||||
-rw-r--r-- | database/engine/datafile.c | 65 | ||||
-rw-r--r-- | database/engine/datafile.h | 6 | ||||
-rw-r--r-- | database/engine/journalfile.c | 194 | ||||
-rw-r--r-- | database/engine/journalfile.h | 19 | ||||
-rw-r--r-- | database/engine/metric.c | 225 | ||||
-rw-r--r-- | database/engine/metric.h | 23 | ||||
-rw-r--r-- | database/engine/pagecache.c | 82 | ||||
-rw-r--r-- | database/engine/pdc.c | 30 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 68 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 58 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 18 |
13 files changed, 534 insertions, 312 deletions
diff --git a/database/engine/cache.c b/database/engine/cache.c index bc3ba6b6af..22f526cb8d 100644 --- a/database/engine/cache.c +++ b/database/engine/cache.c @@ -112,8 +112,9 @@ struct pgc { PGC_CACHE_LINE_PADDING(0); struct pgc_index { - netdata_rwlock_t rwlock; + RW_SPINLOCK rw_spinlock; Pvoid_t sections_judy; + PGC_CACHE_LINE_PADDING(0); } *index; PGC_CACHE_LINE_PADDING(1); @@ -222,43 +223,40 @@ static inline size_t pgc_indexing_partition(PGC *cache, Word_t metric_id) { } static inline void pgc_index_read_lock(PGC *cache, size_t partition) { - netdata_rwlock_rdlock(&cache->index[partition].rwlock); + rw_spinlock_read_lock(&cache->index[partition].rw_spinlock); } static inline void pgc_index_read_unlock(PGC *cache, size_t partition) { - netdata_rwlock_unlock(&cache->index[partition].rwlock); + rw_spinlock_read_unlock(&cache->index[partition].rw_spinlock); } -//static inline bool pgc_index_write_trylock(PGC *cache, size_t partition) { -// return !netdata_rwlock_trywrlock(&cache->index[partition].rwlock); -//} static inline void pgc_index_write_lock(PGC *cache, size_t partition) { - netdata_rwlock_wrlock(&cache->index[partition].rwlock); + rw_spinlock_write_lock(&cache->index[partition].rw_spinlock); } static inline void pgc_index_write_unlock(PGC *cache, size_t partition) { - netdata_rwlock_unlock(&cache->index[partition].rwlock); + rw_spinlock_write_unlock(&cache->index[partition].rw_spinlock); } static inline bool pgc_ll_trylock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) { - return netdata_spinlock_trylock(&ll->spinlock); + return spinlock_trylock(&ll->spinlock); } static inline void pgc_ll_lock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) { - netdata_spinlock_lock(&ll->spinlock); + spinlock_lock(&ll->spinlock); } static inline void pgc_ll_unlock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) { - netdata_spinlock_unlock(&ll->spinlock); + spinlock_unlock(&ll->spinlock); } static inline bool page_transition_trylock(PGC *cache __maybe_unused, PGC_PAGE *page) { - return netdata_spinlock_trylock(&page->transition_spinlock); + return spinlock_trylock(&page->transition_spinlock); } static inline void page_transition_lock(PGC *cache __maybe_unused, PGC_PAGE *page) { - netdata_spinlock_lock(&page->transition_spinlock); + spinlock_lock(&page->transition_spinlock); } static inline void page_transition_unlock(PGC *cache __maybe_unused, PGC_PAGE *page) { - netdata_spinlock_unlock(&page->transition_spinlock); + spinlock_unlock(&page->transition_spinlock); } // ---------------------------------------------------------------------------- @@ -267,9 +265,9 @@ static inline void page_transition_unlock(PGC *cache __maybe_unused, PGC_PAGE *p static inline size_t cache_usage_per1000(PGC *cache, size_t *size_to_evict) { if(size_to_evict) - netdata_spinlock_lock(&cache->usage.spinlock); + spinlock_lock(&cache->usage.spinlock); - else if(!netdata_spinlock_trylock(&cache->usage.spinlock)) + else if(!spinlock_trylock(&cache->usage.spinlock)) return __atomic_load_n(&cache->usage.per1000, __ATOMIC_RELAXED); size_t current_cache_size; @@ -319,7 +317,7 @@ static inline size_t cache_usage_per1000(PGC *cache, size_t *size_to_evict) { __atomic_store_n(&cache->stats.wanted_cache_size, wanted_cache_size, __ATOMIC_RELAXED); __atomic_store_n(&cache->stats.current_cache_size, current_cache_size, __ATOMIC_RELAXED); - netdata_spinlock_unlock(&cache->usage.spinlock); + spinlock_unlock(&cache->usage.spinlock); if(size_to_evict) { size_t target = (size_t)((unsigned long long)wanted_cache_size * (unsigned long long)cache->config.evict_low_threshold_per1000 / 1000ULL); @@ -422,7 +420,7 @@ static void pgc_section_pages_static_aral_init(void) { static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER; if(unlikely(!pgc_section_pages_aral)) { - netdata_spinlock_lock(&spinlock); + spinlock_lock(&spinlock); // we have to check again if(!pgc_section_pages_aral) @@ -433,7 +431,7 @@ static void pgc_section_pages_static_aral_init(void) { 65536, NULL, NULL, NULL, false, false); - netdata_spinlock_unlock(&spinlock); + spinlock_unlock(&spinlock); } } @@ -1255,7 +1253,7 @@ static PGC_PAGE *page_add(PGC *cache, PGC_ENTRY *entry, bool *added) { page->update_every_s = entry->update_every_s, page->data = entry->data; page->assumed_size = page_assumed_size(cache, entry->size); - netdata_spinlock_init(&page->transition_spinlock); + spinlock_init(&page->transition_spinlock); page->link.prev = NULL; page->link.next = NULL; @@ -1378,7 +1376,7 @@ static PGC_PAGE *page_find_and_acquire(PGC *cache, Word_t section, Word_t metric Word_t time = start_time_s; // find the previous page - page_ptr = JudyLLast(*pages_judy_pptr, &time, PJE0); + page_ptr = JudyLPrev(*pages_judy_pptr, &time, PJE0); if(unlikely(page_ptr == PJERR)) fatal("DBENGINE CACHE: corrupted page in pages judy array #2"); @@ -1779,11 +1777,11 @@ PGC *pgc_create(const char *name, cache->index = callocz(cache->config.partitions, sizeof(struct pgc_index)); for(size_t part = 0; part < cache->config.partitions ; part++) - netdata_rwlock_init(&cache->index[part].rwlock); + rw_spinlock_init(&cache->index[part].rw_spinlock); - netdata_spinlock_init(&cache->hot.spinlock); - netdata_spinlock_init(&cache->dirty.spinlock); - netdata_spinlock_init(&cache->clean.spinlock); + spinlock_init(&cache->hot.spinlock); + spinlock_init(&cache->dirty.spinlock); + spinlock_init(&cache->clean.spinlock); cache->hot.flags = PGC_PAGE_HOT; cache->hot.linked_list_in_sections_judy = true; @@ -1853,8 +1851,8 @@ void pgc_destroy(PGC *cache) { else { pointer_destroy_index(cache); - for(size_t part = 0; part < cache->config.partitions ; part++) - netdata_rwlock_destroy(&cache->index[part].rwlock); +// for(size_t part = 0; part < cache->config.partitions ; part++) +// netdata_rwlock_destroy(&cache->index[part].rw_spinlock); #ifdef PGC_WITH_ARAL for(size_t part = 0; part < cache->config.partitions ; part++) @@ -2091,7 +2089,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_ } struct section_pages *sp = *section_pages_pptr; - if(!netdata_spinlock_trylock(&sp->migration_to_v2_spinlock)) { + if(!spinlock_trylock(&sp->migration_to_v2_spinlock)) { info("DBENGINE: migration to journal v2 for datafile %u is postponed, another jv2 indexer is already running for this section", datafile_fileno); pgc_ll_unlock(cache, &cache->hot); return; @@ -2205,7 +2203,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_ pgc_ll_lock(cache, &cache->hot); } - netdata_spinlock_unlock(&sp->migration_to_v2_spinlock); + spinlock_unlock(&sp->migration_to_v2_spinlock); pgc_ll_unlock(cache, &cache->hot); // callback diff --git a/database/engine/cache.h b/database/engine/cache.h index 65e6a61379..1486fdc166 100644 --- a/database/engine/cache.h +++ b/database/engine/cache.h @@ -31,7 +31,7 @@ typedef struct pgc_entry { uint8_t *custom_data; } PGC_ENTRY; -#define PGC_CACHE_LINE_PADDING(x) uint8_t padding##x[128] +#define PGC_CACHE_LINE_PADDING(x) uint8_t padding##x[64] struct pgc_queue_statistics { size_t entries; diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 8c413d8dc9..c74575dfac 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -1,11 +1,15 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" -void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock) { - uv_rwlock_wrlock(&ctx->datafiles.rwlock); + if(!having_lock) + uv_rwlock_wrlock(&ctx->datafiles.rwlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next); - uv_rwlock_wrunlock(&ctx->datafiles.rwlock); + + if(!having_lock) + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); } void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) @@ -27,9 +31,9 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta datafile->users.available = true; - netdata_spinlock_init(&datafile->users.spinlock); - netdata_spinlock_init(&datafile->writers.spinlock); - netdata_spinlock_init(&datafile->extent_queries.spinlock); + spinlock_init(&datafile->users.spinlock); + spinlock_init(&datafile->writers.spinlock); + spinlock_init(&datafile->extent_queries.spinlock); return datafile; } @@ -37,7 +41,7 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { bool ret; - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(df->users.available) { ret = true; @@ -47,25 +51,25 @@ bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS re else ret = false; - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); return ret; } void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(!df->users.lockers) fatal("DBENGINE DATAFILE: cannot release a datafile that is not acquired"); df->users.lockers--; df->users.lockers_by_reason[reason]--; - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); } bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { bool can_be_deleted = false; - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); df->users.available = false; if(!df->users.lockers) @@ -75,9 +79,9 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { // there are lockers // evict any pages referencing this in the open cache - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); pgc_open_evict_clean_pages_of_datafile(open_cache, df); - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(!df->users.lockers) can_be_deleted = true; @@ -86,12 +90,12 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { // there are lockers still // count the number of pages referencing this in the open cache - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); usec_t time_to_scan_ut = now_monotonic_usec(); size_t clean_pages_in_open_cache = pgc_count_clean_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); size_t hot_pages_in_open_cache = pgc_count_hot_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); time_to_scan_ut = now_monotonic_usec() - time_to_scan_ut; - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(!df->users.lockers) can_be_deleted = true; @@ -149,7 +153,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { time_to_scan_ut); } } - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); return can_be_deleted; } @@ -410,11 +414,12 @@ static int scan_data_files(struct rrdengine_instance *ctx) freez(datafiles); return 0; } - if (matched_files == MAX_DATAFILES) { + + if (matched_files == MAX_DATAFILES) error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); - } + qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); - /* TODO: change this when tiering is implemented */ + ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno; for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { @@ -422,9 +427,9 @@ static int scan_data_files(struct rrdengine_instance *ctx) datafile = datafiles[i]; ret = load_data_file(datafile); - if (0 != ret) { + if (0 != ret) must_delete_pair = 1; - } + journalfile = journalfile_alloc_and_init(datafile); ret = journalfile_load(ctx, journalfile, datafile); if (0 != ret) { @@ -432,6 +437,7 @@ static int scan_data_files(struct rrdengine_instance *ctx) close_data_file(datafile); must_delete_pair = 1; } + if (must_delete_pair) { char path[RRDENG_PATH_MAX]; @@ -453,8 +459,9 @@ static int scan_data_files(struct rrdengine_instance *ctx) } ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); - datafile_list_insert(ctx, datafile); + datafile_list_insert(ctx, datafile, false); } + matched_files -= failed_to_load; freez(datafiles); @@ -462,7 +469,7 @@ static int scan_data_files(struct rrdengine_instance *ctx) } /* Creates a datafile and a journalfile pair */ -int create_new_datafile_pair(struct rrdengine_instance *ctx) +int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock) { __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_creation_started, 1, __ATOMIC_RELAXED); @@ -490,7 +497,7 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx) info("DBENGINE: created journal file \"%s\".", path); ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); - datafile_list_insert(ctx, datafile); + datafile_list_insert(ctx, datafile, having_lock); ctx_last_fileno_increment(ctx); return 0; @@ -519,7 +526,7 @@ int init_data_files(struct rrdengine_instance *ctx) } else if (0 == ret) { info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path); ctx->atomic.last_fileno = 0; - ret = create_new_datafile_pair(ctx); + ret = create_new_datafile_pair(ctx, false); if (ret) { error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path); return ret; @@ -527,7 +534,7 @@ int init_data_files(struct rrdengine_instance *ctx) } else { if (ctx->loading.create_new_datafile_pair) - create_new_datafile_pair(ctx); + create_new_datafile_pair(ctx, false); while(rrdeng_ctx_exceeded_disk_quota(ctx)) datafile_delete(ctx, ctx->datafiles.first, false, false); @@ -569,11 +576,11 @@ void finalize_data_files(struct rrdengine_instance *ctx) bool available = false; do { uv_rwlock_wrlock(&ctx->datafiles.rwlock); - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true; if(!available) { - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); uv_rwlock_wrunlock(&ctx->datafiles.rwlock); if(!logged) { info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier); @@ -586,7 +593,7 @@ void finalize_data_files(struct rrdengine_instance *ctx) journalfile_close(journalfile, datafile); close_data_file(datafile); datafile_list_delete_unsafe(ctx, datafile); - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); uv_rwlock_wrunlock(&ctx->datafiles.rwlock); freez(journalfile); diff --git a/database/engine/datafile.h b/database/engine/datafile.h index a08f3ae048..569f1b0a28 100644 --- a/database/engine/datafile.h +++ b/database/engine/datafile.h @@ -21,7 +21,7 @@ struct rrdengine_instance; #endif #define MIN_DATAFILE_SIZE (4LU * 1024LU * 1024LU) -#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */ +#define MAX_DATAFILES (65536 * 4) /* Supports up to 64TiB for now */ #define TARGET_DATAFILES (50) typedef enum __attribute__ ((__packed__)) { @@ -74,14 +74,14 @@ bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS re void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason); bool datafile_acquire_for_deletion(struct rrdengine_datafile *df); -void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock); void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); int close_data_file(struct rrdengine_datafile *datafile); int unlink_data_file(struct rrdengine_datafile *datafile); int destroy_data_file_unsafe(struct rrdengine_datafile *datafile); int create_data_file(struct rrdengine_datafile *datafile); -int create_new_datafile_pair(struct rrdengine_instance *ctx); +int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock); int init_data_files(struct rrdengine_instance *ctx); void finalize_data_files(struct rrdengine_instance *ctx); diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 9998ee5403..e95ce23838 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -92,10 +92,10 @@ void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengin io_descr->buf = wal->buf; io_descr->bytes = wal->buf_size; - netdata_spinlock_lock(&journalfile->unsafe.spinlock); + spinlock_lock(&journalfile->unsafe.spinlock); io_descr->pos = journalfile->unsafe.pos; journalfile->unsafe.pos += wal->buf_size; - netdata_spinlock_unlock(&journalfile->unsafe.spinlock); + spinlock_unlock(&journalfile->unsafe.spinlock); io_descr->req.data = wal; io_descr->data = journalfile; @@ -122,10 +122,128 @@ void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno); } +// ---------------------------------------------------------------------------- + +struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s) { + struct rrdengine_datafile *datafile = NULL; + + rw_spinlock_read_lock(&s->ctx->njfv2idx.spinlock); + + Pvoid_t *PValue = NULL; + + if(unlikely(!s->init)) { + s->init = true; + s->last = s->wanted_start_time_s; + + PValue = JudyLPrev(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) { + s->last = 0; + PValue = JudyLFirst(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) + s->last = s->wanted_start_time_s; + } + } + + while(1) { + if (likely(!PValue)) { + PValue = JudyLNext(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) { + // cannot find anything after that point + datafile = NULL; + break; + } + } + + datafile = *PValue; + TIME_RANGE_COMPARE rc = is_page_in_time_range(datafile->journalfile->v2.first_time_s, + datafile->journalfile->v2.last_time_s, + s->wanted_start_time_s, + s->wanted_end_time_s); + + if(rc == PAGE_IS_IN_RANGE) { + // this is good to return + break; + } + else if(rc == PAGE_IS_IN_THE_PAST) { + // continue to get the next + datafile = NULL; + PValue = NULL; + continue; + } + else /* PAGE_IS_IN_THE_FUTURE */ { + // we finished - no more datafiles + datafile = NULL; + PValue = NULL; + break; + } + } + + if(datafile) + s->j2_header_acquired = journalfile_v2_data_acquire(datafile->journalfile, NULL, + s->wanted_start_time_s, + s->wanted_end_time_s); + else + s->j2_header_acquired = NULL; + + rw_spinlock_read_unlock(&s->ctx->njfv2idx.spinlock); + + return datafile; +} + +static void njfv2idx_add(struct rrdengine_datafile *datafile) { + internal_fatal(datafile->journalfile->v2.last_time_s <= 0, "DBENGINE: NJFV2IDX trying to index a journal file with invalid first_time_s"); + + rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock); + datafile->journalfile->njfv2idx.indexed_as = datafile->journalfile->v2.last_time_s; + + do { + internal_fatal(datafile->journalfile->njfv2idx.indexed_as <= 0, "DBENGINE: NJFV2IDX journalfile is already indexed"); + + Pvoid_t *PValue = JudyLIns(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0); + if (!PValue || PValue == PJERR) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if (unlikely(*PValue)) { + // already there + datafile->journalfile->njfv2idx.indexed_as++; + } + else { + *PValue = datafile; + break; + } + } while(0); + + rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock); +} + +static void njfv2idx_remove(struct rrdengine_datafile *datafile) { + internal_fatal(!datafile->journalfile->njfv2idx.indexed_as, "DBENGINE: NJFV2IDX journalfile to remove is not indexed"); + + rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock); + + int rc = JudyLDel(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0); + internal_fatal(!rc, "DBENGINE: NJFV2IDX cannot remove entry"); + + datafile->journalfile->njfv2idx.indexed_as = 0; + + rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock); +} + +// ---------------------------------------------------------------------------- + static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) { struct journal_v2_header *j2_header = NULL; - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); if(!journalfile->mmap.data) { journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0); @@ -136,9 +254,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin journalfile->mmap.data = NULL; journalfile->mmap.size = 0; - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED); - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); ctx_fs_error(journalfile->datafile->ctx); } @@ -150,9 +268,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin madvise_random(journalfile->mmap.data, journalfile->mmap.size); madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED; - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); } } @@ -163,7 +281,7 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin *data_size = journalfile->mmap.size; } - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); return j2_header; } @@ -173,20 +291,20 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo if(!have_locks) { if(!wait) { - if (!netdata_spinlock_trylock(&journalfile->mmap.spinlock)) + if (!spinlock_trylock(&journalfile->mmap.spinlock)) return false; } else - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); if(!wait) { - if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) { - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + if(!spinlock_trylock(&journalfile->v2.spinlock)) { + spinlock_unlock(&journalfile->mmap.spinlock); return false; } } else - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); } if(!journalfile->v2.refcount) { @@ -209,8 +327,8 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo } if(!have_locks) { - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); } return unmounted; @@ -230,7 +348,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) { struct rrdengine_journalfile *journalfile = datafile->journalfile; - if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) + if(!spinlock_trylock(&journalfile->v2.spinlock)) continue; bool unmount = false; @@ -244,7 +362,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { // 2 minutes have passed since last use unmount = true; } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if (unmount) journalfile_v2_mounted_data_unmount(journalfile, false, false); @@ -254,7 +372,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { } struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED); @@ -276,7 +394,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi } } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if(do_we_need_it) return journalfile_v2_mounted_data_get(journalfile, data_size); @@ -285,7 +403,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi } void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data"); internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile"); @@ -300,7 +418,7 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) unmount = true; } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if(unmount) journalfile_v2_mounted_data_unmount(journalfile, false, true); @@ -308,25 +426,25 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); return has_data; } size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); size_t data_size = journalfile->mmap.size; - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); return data_size; } void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) { - netdata_spinlock_lock(&journalfile->mmap.spinlock); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->v2.spinlock); internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd"); internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data"); @@ -344,19 +462,23 @@ void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, journalfile_v2_mounted_data_unmount(journalfile, true, true); - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); + + njfv2idx_add(journalfile->datafile); } static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) { + njfv2idx_remove(journalfile->datafile); + bool has_references = false; do { if (has_references) sleep_usec(10 * USEC_PER_MS); - netdata_spinlock_lock(&journalfile->mmap.spinlock); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->v2.spinlock); if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) { if(journalfile->mmap.fd != -1) @@ -374,8 +496,8 @@ static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile * internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap..."); } - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); } while(has_references); } @@ -384,9 +506,9 @@ struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafi { struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile)); journalfile->datafile = datafile; - netdata_spinlock_init(&journalfile->mmap.spinlock); - netdata_spinlock_init(&journalfile->v2.spinlock); - netdata_spinlock_init(&journalfile->unsafe.spinlock); + spinloc |