summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine')
-rw-r--r--database/engine/cache.c56
-rw-r--r--database/engine/cache.h2
-rw-r--r--database/engine/datafile.c65
-rw-r--r--database/engine/datafile.h6
-rw-r--r--database/engine/journalfile.c194
-rw-r--r--database/engine/journalfile.h19
-rw-r--r--database/engine/metric.c225
-rw-r--r--database/engine/metric.h23
-rw-r--r--database/engine/pagecache.c82
-rw-r--r--database/engine/pdc.c30
-rw-r--r--database/engine/rrdengine.c68
-rw-r--r--database/engine/rrdengine.h58
-rwxr-xr-xdatabase/engine/rrdengineapi.c18
13 files changed, 534 insertions, 312 deletions
diff --git a/database/engine/cache.c b/database/engine/cache.c
index bc3ba6b6af..22f526cb8d 100644
--- a/database/engine/cache.c
+++ b/database/engine/cache.c
@@ -112,8 +112,9 @@ struct pgc {
PGC_CACHE_LINE_PADDING(0);
struct pgc_index {
- netdata_rwlock_t rwlock;
+ RW_SPINLOCK rw_spinlock;
Pvoid_t sections_judy;
+ PGC_CACHE_LINE_PADDING(0);
} *index;
PGC_CACHE_LINE_PADDING(1);
@@ -222,43 +223,40 @@ static inline size_t pgc_indexing_partition(PGC *cache, Word_t metric_id) {
}
static inline void pgc_index_read_lock(PGC *cache, size_t partition) {
- netdata_rwlock_rdlock(&cache->index[partition].rwlock);
+ rw_spinlock_read_lock(&cache->index[partition].rw_spinlock);
}
static inline void pgc_index_read_unlock(PGC *cache, size_t partition) {
- netdata_rwlock_unlock(&cache->index[partition].rwlock);
+ rw_spinlock_read_unlock(&cache->index[partition].rw_spinlock);
}
-//static inline bool pgc_index_write_trylock(PGC *cache, size_t partition) {
-// return !netdata_rwlock_trywrlock(&cache->index[partition].rwlock);
-//}
static inline void pgc_index_write_lock(PGC *cache, size_t partition) {
- netdata_rwlock_wrlock(&cache->index[partition].rwlock);
+ rw_spinlock_write_lock(&cache->index[partition].rw_spinlock);
}
static inline void pgc_index_write_unlock(PGC *cache, size_t partition) {
- netdata_rwlock_unlock(&cache->index[partition].rwlock);
+ rw_spinlock_write_unlock(&cache->index[partition].rw_spinlock);
}
static inline bool pgc_ll_trylock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) {
- return netdata_spinlock_trylock(&ll->spinlock);
+ return spinlock_trylock(&ll->spinlock);
}
static inline void pgc_ll_lock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) {
- netdata_spinlock_lock(&ll->spinlock);
+ spinlock_lock(&ll->spinlock);
}
static inline void pgc_ll_unlock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) {
- netdata_spinlock_unlock(&ll->spinlock);
+ spinlock_unlock(&ll->spinlock);
}
static inline bool page_transition_trylock(PGC *cache __maybe_unused, PGC_PAGE *page) {
- return netdata_spinlock_trylock(&page->transition_spinlock);
+ return spinlock_trylock(&page->transition_spinlock);
}
static inline void page_transition_lock(PGC *cache __maybe_unused, PGC_PAGE *page) {
- netdata_spinlock_lock(&page->transition_spinlock);
+ spinlock_lock(&page->transition_spinlock);
}
static inline void page_transition_unlock(PGC *cache __maybe_unused, PGC_PAGE *page) {
- netdata_spinlock_unlock(&page->transition_spinlock);
+ spinlock_unlock(&page->transition_spinlock);
}
// ----------------------------------------------------------------------------
@@ -267,9 +265,9 @@ static inline void page_transition_unlock(PGC *cache __maybe_unused, PGC_PAGE *p
static inline size_t cache_usage_per1000(PGC *cache, size_t *size_to_evict) {
if(size_to_evict)
- netdata_spinlock_lock(&cache->usage.spinlock);
+ spinlock_lock(&cache->usage.spinlock);
- else if(!netdata_spinlock_trylock(&cache->usage.spinlock))
+ else if(!spinlock_trylock(&cache->usage.spinlock))
return __atomic_load_n(&cache->usage.per1000, __ATOMIC_RELAXED);
size_t current_cache_size;
@@ -319,7 +317,7 @@ static inline size_t cache_usage_per1000(PGC *cache, size_t *size_to_evict) {
__atomic_store_n(&cache->stats.wanted_cache_size, wanted_cache_size, __ATOMIC_RELAXED);
__atomic_store_n(&cache->stats.current_cache_size, current_cache_size, __ATOMIC_RELAXED);
- netdata_spinlock_unlock(&cache->usage.spinlock);
+ spinlock_unlock(&cache->usage.spinlock);
if(size_to_evict) {
size_t target = (size_t)((unsigned long long)wanted_cache_size * (unsigned long long)cache->config.evict_low_threshold_per1000 / 1000ULL);
@@ -422,7 +420,7 @@ static void pgc_section_pages_static_aral_init(void) {
static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
if(unlikely(!pgc_section_pages_aral)) {
- netdata_spinlock_lock(&spinlock);
+ spinlock_lock(&spinlock);
// we have to check again
if(!pgc_section_pages_aral)
@@ -433,7 +431,7 @@ static void pgc_section_pages_static_aral_init(void) {
65536, NULL,
NULL, NULL, false, false);
- netdata_spinlock_unlock(&spinlock);
+ spinlock_unlock(&spinlock);
}
}
@@ -1255,7 +1253,7 @@ static PGC_PAGE *page_add(PGC *cache, PGC_ENTRY *entry, bool *added) {
page->update_every_s = entry->update_every_s,
page->data = entry->data;
page->assumed_size = page_assumed_size(cache, entry->size);
- netdata_spinlock_init(&page->transition_spinlock);
+ spinlock_init(&page->transition_spinlock);
page->link.prev = NULL;
page->link.next = NULL;
@@ -1378,7 +1376,7 @@ static PGC_PAGE *page_find_and_acquire(PGC *cache, Word_t section, Word_t metric
Word_t time = start_time_s;
// find the previous page
- page_ptr = JudyLLast(*pages_judy_pptr, &time, PJE0);
+ page_ptr = JudyLPrev(*pages_judy_pptr, &time, PJE0);
if(unlikely(page_ptr == PJERR))
fatal("DBENGINE CACHE: corrupted page in pages judy array #2");
@@ -1779,11 +1777,11 @@ PGC *pgc_create(const char *name,
cache->index = callocz(cache->config.partitions, sizeof(struct pgc_index));
for(size_t part = 0; part < cache->config.partitions ; part++)
- netdata_rwlock_init(&cache->index[part].rwlock);
+ rw_spinlock_init(&cache->index[part].rw_spinlock);
- netdata_spinlock_init(&cache->hot.spinlock);
- netdata_spinlock_init(&cache->dirty.spinlock);
- netdata_spinlock_init(&cache->clean.spinlock);
+ spinlock_init(&cache->hot.spinlock);
+ spinlock_init(&cache->dirty.spinlock);
+ spinlock_init(&cache->clean.spinlock);
cache->hot.flags = PGC_PAGE_HOT;
cache->hot.linked_list_in_sections_judy = true;
@@ -1853,8 +1851,8 @@ void pgc_destroy(PGC *cache) {
else {
pointer_destroy_index(cache);
- for(size_t part = 0; part < cache->config.partitions ; part++)
- netdata_rwlock_destroy(&cache->index[part].rwlock);
+// for(size_t part = 0; part < cache->config.partitions ; part++)
+// netdata_rwlock_destroy(&cache->index[part].rw_spinlock);
#ifdef PGC_WITH_ARAL
for(size_t part = 0; part < cache->config.partitions ; part++)
@@ -2091,7 +2089,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_
}
struct section_pages *sp = *section_pages_pptr;
- if(!netdata_spinlock_trylock(&sp->migration_to_v2_spinlock)) {
+ if(!spinlock_trylock(&sp->migration_to_v2_spinlock)) {
info("DBENGINE: migration to journal v2 for datafile %u is postponed, another jv2 indexer is already running for this section", datafile_fileno);
pgc_ll_unlock(cache, &cache->hot);
return;
@@ -2205,7 +2203,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_
pgc_ll_lock(cache, &cache->hot);
}
- netdata_spinlock_unlock(&sp->migration_to_v2_spinlock);
+ spinlock_unlock(&sp->migration_to_v2_spinlock);
pgc_ll_unlock(cache, &cache->hot);
// callback
diff --git a/database/engine/cache.h b/database/engine/cache.h
index 65e6a61379..1486fdc166 100644
--- a/database/engine/cache.h
+++ b/database/engine/cache.h
@@ -31,7 +31,7 @@ typedef struct pgc_entry {
uint8_t *custom_data;
} PGC_ENTRY;
-#define PGC_CACHE_LINE_PADDING(x) uint8_t padding##x[128]
+#define PGC_CACHE_LINE_PADDING(x) uint8_t padding##x[64]
struct pgc_queue_statistics {
size_t entries;
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 8c413d8dc9..c74575dfac 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -1,11 +1,15 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock)
{
- uv_rwlock_wrlock(&ctx->datafiles.rwlock);
+ if(!having_lock)
+ uv_rwlock_wrlock(&ctx->datafiles.rwlock);
+
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next);
- uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
+
+ if(!having_lock)
+ uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
}
void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
@@ -27,9 +31,9 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta
datafile->users.available = true;
- netdata_spinlock_init(&datafile->users.spinlock);
- netdata_spinlock_init(&datafile->writers.spinlock);
- netdata_spinlock_init(&datafile->extent_queries.spinlock);
+ spinlock_init(&datafile->users.spinlock);
+ spinlock_init(&datafile->writers.spinlock);
+ spinlock_init(&datafile->extent_queries.spinlock);
return datafile;
}
@@ -37,7 +41,7 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta
bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) {
bool ret;
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(df->users.available) {
ret = true;
@@ -47,25 +51,25 @@ bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS re
else
ret = false;
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
return ret;
}
void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) {
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(!df->users.lockers)
fatal("DBENGINE DATAFILE: cannot release a datafile that is not acquired");
df->users.lockers--;
df->users.lockers_by_reason[reason]--;
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
}
bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
bool can_be_deleted = false;
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
df->users.available = false;
if(!df->users.lockers)
@@ -75,9 +79,9 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
// there are lockers
// evict any pages referencing this in the open cache
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
pgc_open_evict_clean_pages_of_datafile(open_cache, df);
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(!df->users.lockers)
can_be_deleted = true;
@@ -86,12 +90,12 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
// there are lockers still
// count the number of pages referencing this in the open cache
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
usec_t time_to_scan_ut = now_monotonic_usec();
size_t clean_pages_in_open_cache = pgc_count_clean_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df);
size_t hot_pages_in_open_cache = pgc_count_hot_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df);
time_to_scan_ut = now_monotonic_usec() - time_to_scan_ut;
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(!df->users.lockers)
can_be_deleted = true;
@@ -149,7 +153,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
time_to_scan_ut);
}
}
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
return can_be_deleted;
}
@@ -410,11 +414,12 @@ static int scan_data_files(struct rrdengine_instance *ctx)
freez(datafiles);
return 0;
}
- if (matched_files == MAX_DATAFILES) {
+
+ if (matched_files == MAX_DATAFILES)
error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
- }
+
qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
- /* TODO: change this when tiering is implemented */
+
ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno;
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
@@ -422,9 +427,9 @@ static int scan_data_files(struct rrdengine_instance *ctx)
datafile = datafiles[i];
ret = load_data_file(datafile);
- if (0 != ret) {
+ if (0 != ret)
must_delete_pair = 1;
- }
+
journalfile = journalfile_alloc_and_init(datafile);
ret = journalfile_load(ctx, journalfile, datafile);
if (0 != ret) {
@@ -432,6 +437,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
close_data_file(datafile);
must_delete_pair = 1;
}
+
if (must_delete_pair) {
char path[RRDENG_PATH_MAX];
@@ -453,8 +459,9 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos);
- datafile_list_insert(ctx, datafile);
+ datafile_list_insert(ctx, datafile, false);
}
+
matched_files -= failed_to_load;
freez(datafiles);
@@ -462,7 +469,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
/* Creates a datafile and a journalfile pair */
-int create_new_datafile_pair(struct rrdengine_instance *ctx)
+int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock)
{
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_creation_started, 1, __ATOMIC_RELAXED);
@@ -490,7 +497,7 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx)
info("DBENGINE: created journal file \"%s\".", path);
ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos);
- datafile_list_insert(ctx, datafile);
+ datafile_list_insert(ctx, datafile, having_lock);
ctx_last_fileno_increment(ctx);
return 0;
@@ -519,7 +526,7 @@ int init_data_files(struct rrdengine_instance *ctx)
} else if (0 == ret) {
info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path);
ctx->atomic.last_fileno = 0;
- ret = create_new_datafile_pair(ctx);
+ ret = create_new_datafile_pair(ctx, false);
if (ret) {
error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path);
return ret;
@@ -527,7 +534,7 @@ int init_data_files(struct rrdengine_instance *ctx)
}
else {
if (ctx->loading.create_new_datafile_pair)
- create_new_datafile_pair(ctx);
+ create_new_datafile_pair(ctx, false);
while(rrdeng_ctx_exceeded_disk_quota(ctx))
datafile_delete(ctx, ctx->datafiles.first, false, false);
@@ -569,11 +576,11 @@ void finalize_data_files(struct rrdengine_instance *ctx)
bool available = false;
do {
uv_rwlock_wrlock(&ctx->datafiles.rwlock);
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true;
if(!available) {
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
if(!logged) {
info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier);
@@ -586,7 +593,7 @@ void finalize_data_files(struct rrdengine_instance *ctx)
journalfile_close(journalfile, datafile);
close_data_file(datafile);
datafile_list_delete_unsafe(ctx, datafile);
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
freez(journalfile);
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index a08f3ae048..569f1b0a28 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -21,7 +21,7 @@ struct rrdengine_instance;
#endif
#define MIN_DATAFILE_SIZE (4LU * 1024LU * 1024LU)
-#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
+#define MAX_DATAFILES (65536 * 4) /* Supports up to 64TiB for now */
#define TARGET_DATAFILES (50)
typedef enum __attribute__ ((__packed__)) {
@@ -74,14 +74,14 @@ bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS re
void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason);
bool datafile_acquire_for_deletion(struct rrdengine_datafile *df);
-void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock);
void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
int close_data_file(struct rrdengine_datafile *datafile);
int unlink_data_file(struct rrdengine_datafile *datafile);
int destroy_data_file_unsafe(struct rrdengine_datafile *datafile);
int create_data_file(struct rrdengine_datafile *datafile);
-int create_new_datafile_pair(struct rrdengine_instance *ctx);
+int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock);
int init_data_files(struct rrdengine_instance *ctx);
void finalize_data_files(struct rrdengine_instance *ctx);
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 9998ee5403..e95ce23838 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -92,10 +92,10 @@ void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengin
io_descr->buf = wal->buf;
io_descr->bytes = wal->buf_size;
- netdata_spinlock_lock(&journalfile->unsafe.spinlock);
+ spinlock_lock(&journalfile->unsafe.spinlock);
io_descr->pos = journalfile->unsafe.pos;
journalfile->unsafe.pos += wal->buf_size;
- netdata_spinlock_unlock(&journalfile->unsafe.spinlock);
+ spinlock_unlock(&journalfile->unsafe.spinlock);
io_descr->req.data = wal;
io_descr->data = journalfile;
@@ -122,10 +122,128 @@ void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str
datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
}
+// ----------------------------------------------------------------------------
+
+struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s) {
+ struct rrdengine_datafile *datafile = NULL;
+
+ rw_spinlock_read_lock(&s->ctx->njfv2idx.spinlock);
+
+ Pvoid_t *PValue = NULL;
+
+ if(unlikely(!s->init)) {
+ s->init = true;
+ s->last = s->wanted_start_time_s;
+
+ PValue = JudyLPrev(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue) {
+ s->last = 0;
+ PValue = JudyLFirst(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue)
+ s->last = s->wanted_start_time_s;
+ }
+ }
+
+ while(1) {
+ if (likely(!PValue)) {
+ PValue = JudyLNext(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue) {
+ // cannot find anything after that point
+ datafile = NULL;
+ break;
+ }
+ }
+
+ datafile = *PValue;
+ TIME_RANGE_COMPARE rc = is_page_in_time_range(datafile->journalfile->v2.first_time_s,
+ datafile->journalfile->v2.last_time_s,
+ s->wanted_start_time_s,
+ s->wanted_end_time_s);
+
+ if(rc == PAGE_IS_IN_RANGE) {
+ // this is good to return
+ break;
+ }
+ else if(rc == PAGE_IS_IN_THE_PAST) {
+ // continue to get the next
+ datafile = NULL;
+ PValue = NULL;
+ continue;
+ }
+ else /* PAGE_IS_IN_THE_FUTURE */ {
+ // we finished - no more datafiles
+ datafile = NULL;
+ PValue = NULL;
+ break;
+ }
+ }
+
+ if(datafile)
+ s->j2_header_acquired = journalfile_v2_data_acquire(datafile->journalfile, NULL,
+ s->wanted_start_time_s,
+ s->wanted_end_time_s);
+ else
+ s->j2_header_acquired = NULL;
+
+ rw_spinlock_read_unlock(&s->ctx->njfv2idx.spinlock);
+
+ return datafile;
+}
+
+static void njfv2idx_add(struct rrdengine_datafile *datafile) {
+ internal_fatal(datafile->journalfile->v2.last_time_s <= 0, "DBENGINE: NJFV2IDX trying to index a journal file with invalid first_time_s");
+
+ rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock);
+ datafile->journalfile->njfv2idx.indexed_as = datafile->journalfile->v2.last_time_s;
+
+ do {
+ internal_fatal(datafile->journalfile->njfv2idx.indexed_as <= 0, "DBENGINE: NJFV2IDX journalfile is already indexed");
+
+ Pvoid_t *PValue = JudyLIns(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0);
+ if (!PValue || PValue == PJERR)
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if (unlikely(*PValue)) {
+ // already there
+ datafile->journalfile->njfv2idx.indexed_as++;
+ }
+ else {
+ *PValue = datafile;
+ break;
+ }
+ } while(0);
+
+ rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock);
+}
+
+static void njfv2idx_remove(struct rrdengine_datafile *datafile) {
+ internal_fatal(!datafile->journalfile->njfv2idx.indexed_as, "DBENGINE: NJFV2IDX journalfile to remove is not indexed");
+
+ rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock);
+
+ int rc = JudyLDel(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0);
+ internal_fatal(!rc, "DBENGINE: NJFV2IDX cannot remove entry");
+
+ datafile->journalfile->njfv2idx.indexed_as = 0;
+
+ rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock);
+}
+
+// ----------------------------------------------------------------------------
+
static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) {
struct journal_v2_header *j2_header = NULL;
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
if(!journalfile->mmap.data) {
journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0);
@@ -136,9 +254,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
journalfile->mmap.data = NULL;
journalfile->mmap.size = 0;
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
ctx_fs_error(journalfile->datafile->ctx);
}
@@ -150,9 +268,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
madvise_random(journalfile->mmap.data, journalfile->mmap.size);
madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
}
}
@@ -163,7 +281,7 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
*data_size = journalfile->mmap.size;
}
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
return j2_header;
}
@@ -173,20 +291,20 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
if(!have_locks) {
if(!wait) {
- if (!netdata_spinlock_trylock(&journalfile->mmap.spinlock))
+ if (!spinlock_trylock(&journalfile->mmap.spinlock))
return false;
}
else
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
if(!wait) {
- if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) {
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ if(!spinlock_trylock(&journalfile->v2.spinlock)) {
+ spinlock_unlock(&journalfile->mmap.spinlock);
return false;
}
}
else
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
}
if(!journalfile->v2.refcount) {
@@ -209,8 +327,8 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
}
if(!have_locks) {
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
}
return unmounted;
@@ -230,7 +348,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) {
struct rrdengine_journalfile *journalfile = datafile->journalfile;
- if(!netdata_spinlock_trylock(&journalfile->v2.spinlock))
+ if(!spinlock_trylock(&journalfile->v2.spinlock))
continue;
bool unmount = false;
@@ -244,7 +362,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
// 2 minutes have passed since last use
unmount = true;
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if (unmount)
journalfile_v2_mounted_data_unmount(journalfile, false, false);
@@ -254,7 +372,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
}
struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED);
@@ -276,7 +394,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi
}
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if(do_we_need_it)
return journalfile_v2_mounted_data_get(journalfile, data_size);
@@ -285,7 +403,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi
}
void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data");
internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile");
@@ -300,7 +418,7 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION)
unmount = true;
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if(unmount)
journalfile_v2_mounted_data_unmount(journalfile, false, true);
@@ -308,25 +426,25 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
return has_data;
}
size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
size_t data_size = journalfile->mmap.size;
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
return data_size;
}
void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) {
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd");
internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data");
@@ -344,19 +462,23 @@ void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd,
journalfile_v2_mounted_data_unmount(journalfile, true, true);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
+
+ njfv2idx_add(journalfile->datafile);
}
static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) {
+ njfv2idx_remove(journalfile->datafile);
+
bool has_references = false;
do {
if (has_references)
sleep_usec(10 * USEC_PER_MS);
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) {
if(journalfile->mmap.fd != -1)
@@ -374,8 +496,8 @@ static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *
internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap...");
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
} while(has_references);
}
@@ -384,9 +506,9 @@ struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafi
{
struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile));
journalfile->datafile = datafile;
- netdata_spinlock_init(&journalfile->mmap.spinlock);
- netdata_spinlock_init(&journalfile->v2.spinlock);
- netdata_spinlock_init(&journalfile->unsafe.spinlock);
+ spinloc