summaryrefslogtreecommitdiffstats
path: root/database/engine/journalfile.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r--database/engine/journalfile.c194
1 files changed, 158 insertions, 36 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 9998ee5403..e95ce23838 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -92,10 +92,10 @@ void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengin
io_descr->buf = wal->buf;
io_descr->bytes = wal->buf_size;
- netdata_spinlock_lock(&journalfile->unsafe.spinlock);
+ spinlock_lock(&journalfile->unsafe.spinlock);
io_descr->pos = journalfile->unsafe.pos;
journalfile->unsafe.pos += wal->buf_size;
- netdata_spinlock_unlock(&journalfile->unsafe.spinlock);
+ spinlock_unlock(&journalfile->unsafe.spinlock);
io_descr->req.data = wal;
io_descr->data = journalfile;
@@ -122,10 +122,128 @@ void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str
datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
}
+// ----------------------------------------------------------------------------
+
+struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s) {
+ struct rrdengine_datafile *datafile = NULL;
+
+ rw_spinlock_read_lock(&s->ctx->njfv2idx.spinlock);
+
+ Pvoid_t *PValue = NULL;
+
+ if(unlikely(!s->init)) {
+ s->init = true;
+ s->last = s->wanted_start_time_s;
+
+ PValue = JudyLPrev(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue) {
+ s->last = 0;
+ PValue = JudyLFirst(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue)
+ s->last = s->wanted_start_time_s;
+ }
+ }
+
+ while(1) {
+ if (likely(!PValue)) {
+ PValue = JudyLNext(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue) {
+ // cannot find anything after that point
+ datafile = NULL;
+ break;
+ }
+ }
+
+ datafile = *PValue;
+ TIME_RANGE_COMPARE rc = is_page_in_time_range(datafile->journalfile->v2.first_time_s,
+ datafile->journalfile->v2.last_time_s,
+ s->wanted_start_time_s,
+ s->wanted_end_time_s);
+
+ if(rc == PAGE_IS_IN_RANGE) {
+ // this is good to return
+ break;
+ }
+ else if(rc == PAGE_IS_IN_THE_PAST) {
+ // continue to get the next
+ datafile = NULL;
+ PValue = NULL;
+ continue;
+ }
+ else /* PAGE_IS_IN_THE_FUTURE */ {
+ // we finished - no more datafiles
+ datafile = NULL;
+ PValue = NULL;
+ break;
+ }
+ }
+
+ if(datafile)
+ s->j2_header_acquired = journalfile_v2_data_acquire(datafile->journalfile, NULL,
+ s->wanted_start_time_s,
+ s->wanted_end_time_s);
+ else
+ s->j2_header_acquired = NULL;
+
+ rw_spinlock_read_unlock(&s->ctx->njfv2idx.spinlock);
+
+ return datafile;
+}
+
+static void njfv2idx_add(struct rrdengine_datafile *datafile) {
+ internal_fatal(datafile->journalfile->v2.last_time_s <= 0, "DBENGINE: NJFV2IDX trying to index a journal file with invalid first_time_s");
+
+ rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock);
+ datafile->journalfile->njfv2idx.indexed_as = datafile->journalfile->v2.last_time_s;
+
+ do {
+ internal_fatal(datafile->journalfile->njfv2idx.indexed_as <= 0, "DBENGINE: NJFV2IDX journalfile is already indexed");
+
+ Pvoid_t *PValue = JudyLIns(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0);
+ if (!PValue || PValue == PJERR)
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if (unlikely(*PValue)) {
+ // already there
+ datafile->journalfile->njfv2idx.indexed_as++;
+ }
+ else {
+ *PValue = datafile;
+ break;
+ }
+ } while(0);
+
+ rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock);
+}
+
+static void njfv2idx_remove(struct rrdengine_datafile *datafile) {
+ internal_fatal(!datafile->journalfile->njfv2idx.indexed_as, "DBENGINE: NJFV2IDX journalfile to remove is not indexed");
+
+ rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock);
+
+ int rc = JudyLDel(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0);
+ internal_fatal(!rc, "DBENGINE: NJFV2IDX cannot remove entry");
+
+ datafile->journalfile->njfv2idx.indexed_as = 0;
+
+ rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock);
+}
+
+// ----------------------------------------------------------------------------
+
static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) {
struct journal_v2_header *j2_header = NULL;
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
if(!journalfile->mmap.data) {
journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0);
@@ -136,9 +254,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
journalfile->mmap.data = NULL;
journalfile->mmap.size = 0;
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
ctx_fs_error(journalfile->datafile->ctx);
}
@@ -150,9 +268,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
madvise_random(journalfile->mmap.data, journalfile->mmap.size);
madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
}
}
@@ -163,7 +281,7 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
*data_size = journalfile->mmap.size;
}
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
return j2_header;
}
@@ -173,20 +291,20 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
if(!have_locks) {
if(!wait) {
- if (!netdata_spinlock_trylock(&journalfile->mmap.spinlock))
+ if (!spinlock_trylock(&journalfile->mmap.spinlock))
return false;
}
else
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
if(!wait) {
- if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) {
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ if(!spinlock_trylock(&journalfile->v2.spinlock)) {
+ spinlock_unlock(&journalfile->mmap.spinlock);
return false;
}
}
else
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
}
if(!journalfile->v2.refcount) {
@@ -209,8 +327,8 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
}
if(!have_locks) {
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
}
return unmounted;
@@ -230,7 +348,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) {
struct rrdengine_journalfile *journalfile = datafile->journalfile;
- if(!netdata_spinlock_trylock(&journalfile->v2.spinlock))
+ if(!spinlock_trylock(&journalfile->v2.spinlock))
continue;
bool unmount = false;
@@ -244,7 +362,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
// 2 minutes have passed since last use
unmount = true;
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if (unmount)
journalfile_v2_mounted_data_unmount(journalfile, false, false);
@@ -254,7 +372,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
}
struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED);
@@ -276,7 +394,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi
}
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if(do_we_need_it)
return journalfile_v2_mounted_data_get(journalfile, data_size);
@@ -285,7 +403,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi
}
void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data");
internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile");
@@ -300,7 +418,7 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION)
unmount = true;
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if(unmount)
journalfile_v2_mounted_data_unmount(journalfile, false, true);
@@ -308,25 +426,25 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
return has_data;
}
size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
size_t data_size = journalfile->mmap.size;
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
return data_size;
}
void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) {
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd");
internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data");
@@ -344,19 +462,23 @@ void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd,
journalfile_v2_mounted_data_unmount(journalfile, true, true);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
+
+ njfv2idx_add(journalfile->datafile);
}
static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) {
+ njfv2idx_remove(journalfile->datafile);
+
bool has_references = false;
do {
if (has_references)
sleep_usec(10 * USEC_PER_MS);
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) {
if(journalfile->mmap.fd != -1)
@@ -374,8 +496,8 @@ static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *
internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap...");
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
} while(has_references);
}
@@ -384,9 +506,9 @@ struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafi
{
struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile));
journalfile->datafile = datafile;
- netdata_spinlock_init(&journalfile->mmap.spinlock);
- netdata_spinlock_init(&journalfile->v2.spinlock);
- netdata_spinlock_init(&journalfile->unsafe.spinlock);
+ spinlock_init(&journalfile->mmap.spinlock);
+ spinlock_init(&journalfile->v2.spinlock);
+ spinlock_init(&journalfile->unsafe.spinlock);
journalfile->mmap.fd = -1;
datafile->journalfile = journalfile;
return journalfile;