summaryrefslogtreecommitdiffstats
path: root/database/engine/journalfile.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-15 23:49:42 +0200
committerGitHub <noreply@github.com>2022-11-15 23:49:42 +0200
commitbecd97a3660af34104c557ba6c2877f624143c2e (patch)
tree14fa78305bd8c27bf1fa06c9c346b8f7a8a1d721 /database/engine/journalfile.c
parent1789d07c43182152437459a7a4f81267bbdd752c (diff)
Revert "New journal disk based indexing for agent memory reduction" (#14000)
Revert "New journal disk based indexing for agent memory reduction (#13885)" This reverts commit 224b051a2b2bab39a4b536e531ab9ca590bf31bb.
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r--database/engine/journalfile.c1126
1 files changed, 34 insertions, 1092 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 037603b9b6..500dd78800 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -1,25 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-void queue_journalfile_v2_migration(struct rrdengine_worker_config *wc)
-{
- struct rrdeng_work *work_request;
-
- if (unlikely(wc->running_journal_migration))
- return;
-
- work_request = mallocz(sizeof(*work_request));
- work_request->req.data = work_request;
- work_request->wc = wc;
- work_request->count = 0;
- work_request->rerun = false;
- wc->running_journal_migration = 1;
- if (unlikely(uv_queue_work(wc->loop, &work_request->req, start_journal_indexing, after_journal_indexing))) {
- freez(work_request);
- wc->running_journal_migration = 0;
- }
-}
-
static void flush_transaction_buffer_cb(uv_fs_t* req)
{
struct generic_io_descriptor *io_descr = req->data;
@@ -66,7 +47,6 @@ void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
io_descr->bytes = size;
io_descr->pos = journalfile->pos;
io_descr->req.data = io_descr;
- io_descr->data = journalfile;
io_descr->completion = NULL;
io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
@@ -113,12 +93,6 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
return ctx->commit_log.buf + buf_pos;
}
-void generate_journalfilepath_v2(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
-{
- (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2,
- datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
-}
-
void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
@@ -130,49 +104,26 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin
journalfile->file = (uv_file)0;
journalfile->pos = 0;
journalfile->datafile = datafile;
- journalfile->journal_data = NULL;
- journalfile->journal_data_size = 0;
- journalfile->JudyL_array = (Pvoid_t) NULL;
- journalfile->data = NULL;
- journalfile->file_index = 0;
- journalfile->last_access = 0;
}
-static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
- uv_fs_t req;
- ret = uv_fs_close(NULL, &req, file, NULL);
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
- generate_journalfilepath(datafile, path, sizeof(path));
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++datafile->ctx->stats.fs_errors;
+ ++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
- return ret;
-}
-
-int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
-{
- struct rrdengine_instance *ctx = datafile->ctx;
- char path[RRDENG_PATH_MAX];
-
- if (likely(journalfile->journal_data)) {
- if (munmap(journalfile->journal_data, journalfile->journal_data_size)) {
- generate_journalfilepath_v2(datafile, path, sizeof(path));
- error("Failed to unmap journal index file for %s", path);
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
- }
- journalfile->journal_data = NULL;
- journalfile->journal_data_size = 0;
- return 0;
- }
- return close_uv_file(datafile, journalfile->file);
+ return ret;
}
int unlink_journal_file(struct rrdengine_journalfile *journalfile)
@@ -198,16 +149,14 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile)
return ret;
}
-int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
- char path_v2[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
- generate_journalfilepath_v2(datafile, path_v2, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
@@ -217,12 +166,9 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc
}
uv_fs_req_cleanup(&req);
- (void) close_uv_file(datafile, journalfile->file);
-
- // This is the new journal v2 index file
- ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
- error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
@@ -237,13 +183,6 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc
uv_fs_req_cleanup(&req);
++ctx->stats.journalfile_deletions;
- ++ctx->stats.journalfile_deletions;
-
- if (journalfile->journal_data) {
- if (munmap(journalfile->journal_data, journalfile->journal_data_size)) {
- error("Failed to unmap index file %s", path_v2);
- }
- }
return ret;
}
@@ -288,7 +227,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
if (ret < 0) {
- destroy_journal_file_unsafe(journalfile, datafile);
+ destroy_journal_file(journalfile, datafile);
return ret;
}
@@ -427,67 +366,36 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
- *PValue = page_index = create_page_index(temp_id, ctx);
- page_index->prev = pg_cache->metrics_index.last_page_index;
- pg_cache->metrics_index.last_page_index = page_index;
+ *PValue = page_index = create_page_index(temp_id, ctx);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
- // Lookup this descriptor
- Word_t p_time = start_time_ut / USEC_PER_SEC;
- uv_rwlock_rdlock(&page_index->lock);
- PValue = JudyLFirst(page_index->JudyL_array, &p_time, PJE0);
- descr = (NULL == PValue) ? NULL: *PValue;
- uv_rwlock_rdunlock(&page_index->lock);
-
- bool descr_found = false;
- if (unlikely(descr && descr->start_time_ut == start_time_ut)) {
- // We have this descriptor already
- descr_found = true;
-
-#ifdef NETDATA_INTERNAL_CHECKS
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower(page_index->id, uuid_str);
- internal_error(true, "REMOVING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u",
- uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, descr->extent->datafile->fileno,
- descr->extent->offset, descr->extent->size);
- internal_error(true, "APPLYING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u",
- uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, extent->datafile->fileno,
- extent->offset, extent->size);
-#endif
- // Remove entry from previous extent
- unlink_descriptor_extent_unsafe(descr);
- internal_error(true, "REMOVING UUID %s with %lu OK", uuid_str, start_time_ut);
- }
- else {
- descr = pg_cache_create_descr();
- descr->id = &page_index->id;
- }
-
+ descr = pg_cache_create_descr();
descr->page_length = jf_metric_data->descr[i].page_length;
descr->start_time_ut = start_time_ut;
descr->end_time_ut = end_time_ut;
descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s);
-
+ descr->id = &page_index->id;
descr->extent = extent;
descr->type = page_type;
extent->pages[valid_pages++] = descr;
- if (likely(!descr_found))
- (void)pg_cache_insert(ctx, page_index, descr, true);
+ pg_cache_insert(ctx, page_index, descr);
- if (page_index->latest_time_ut == descr->end_time_ut)
+ if(page_index->latest_time_ut == descr->end_time_ut)
page_index->latest_update_every_s = descr->update_every_s;
if(descr->update_every_s == 0)
- fatal("DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu",
+ fatal(
+ "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu",
(unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries);
}
extent->number_of_pages = valid_pages;
- if (likely(valid_pages)) {
+ if (likely(valid_pages))
df_extent_insert(extent);
- }
else {
freez(extent);
ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++;
@@ -536,13 +444,13 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
return size_bytes;
}
switch (jf_header->type) {
- case STORE_DATA:
- debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
- restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
- break;
- default:
- error("Unknown transaction type. Skipping record.");
- break;
+ case STORE_DATA:
+ debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
+ restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
+ break;
+ default:
+ error("Unknown transaction type. Skipping record.");
+ break;
}
return size_bytes;
@@ -610,905 +518,12 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
if (likely(journal_is_mmapped))
buf += size_bytes;
}
- skip_file:
+skip_file:
if (unlikely(!journal_is_mmapped))
posix_memfree(buf);
return max_id;
}
-bool unlink_descriptor_extent_unsafe(struct rrdeng_page_descr *descr)
-{
- if (unlikely(!descr || !descr->extent))
- return true;
-
- struct extent_info *extent = descr->extent;
- for (uint8_t index = 0; index < extent->number_of_pages; index++) {
- if (extent->pages[index] == descr) {
- extent->pages[index] = NULL;
- return true;
- }
- }
- return false;
-}
-
-// Checks that the extent list checksum is valid
-static int check_journal_v2_extent_list (void *data_start, size_t file_size)
-{
- UNUSED(file_size);
- uLong crc;
-
- struct journal_v2_header *j2_header = (void *) data_start;
- struct journal_v2_block_trailer *journal_v2_trailer;
-
- journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset);
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list));
- if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
- error("Extent list CRC32 check: FAILED");
- return 1;
- }
-
- return 0;
-}
-
-// Checks that the metric list (UUIDs) checksum is valid
-static int check_journal_v2_metric_list(void *data_start, size_t file_size)
-{
- UNUSED(file_size);
- uLong crc;
-
- struct journal_v2_header *j2_header = (void *) data_start;
- struct journal_v2_block_trailer *journal_v2_trailer;
-
- journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->metric_trailer_offset);
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list));
- if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
- error("Metric list CRC32 check: FAILED");
- return 1;
- }
- return 0;
-}
-
-static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t original_size)
-{
- int rc;
- uLong crc;
-
- struct journal_v2_header *j2_header = (void *) data_start;
- struct journal_v2_block_trailer *journal_v2_trailer;
-
- if (j2_header->magic == JOURVAL_V2_REBUILD_MAGIC)
- return 2;
-
- // Magic failure
- if (j2_header->magic != JOURVAL_V2_MAGIC)
- return 1;
-
- if (j2_header->total_file_size != file_size)
- return 1;
-
- if (original_size && j2_header->original_file_size != original_size)
- return 1;
-
- journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + file_size - sizeof(*journal_v2_trailer));
-
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (void *) j2_header, sizeof(*j2_header));
-
- rc = crc32cmp(journal_v2_trailer->checksum, crc);
- if (unlikely(rc)) {
- error("File CRC32 check: FAILED");
- return 1;
- }
-
- rc = check_journal_v2_extent_list(data_start, file_size);
- if (rc) return 1;
-
- rc = check_journal_v2_metric_list(data_start, file_size);
- if (rc) return 1;
-
- if (!db_engine_journal_check)
- return 0;
-
- // Verify complete UUID chain
-
- struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset);
-
- unsigned verified = 0;
- unsigned entries;
- unsigned total_pages = 0;
-
- info("Checking %u metrics that exist in the journal", j2_header->metric_count);
- for (entries = 0; entries < j2_header->metric_count; entries++) {
-
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower(metric->uuid, uuid_str);
- struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
- struct journal_page_header local_metric_list_header = *metric_list_header;
-
- local_metric_list_header.crc = JOURVAL_V2_MAGIC;
-
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (void *) &local_metric_list_header, sizeof(local_metric_list_header));
- rc = crc32cmp(metric_list_header->checksum, crc);
-
- internal_error(true, "Index %u : %s entries %u at offset %u (%llu -- %llu) verified, HEADER CRC computed %lu, stored %u", entries, uuid_str, metric->entries, metric->page_offset,
- j2_header->start_time_ut + (usec_t) metric->delta_start * USEC_PER_SEC, j2_header->start_time_ut + (usec_t) metric->delta_end * USEC_PER_SEC,
- crc, metric_list_header->crc);
- if (!rc) {
- struct journal_v2_block_trailer *journal_trailer =
- (void *) data_start + metric->page_offset + sizeof(struct journal_page_header) + (metric_list_header->entries * sizeof(struct journal_page_list));
-
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (uint8_t *) metric_list_header + sizeof(struct journal_page_header), metric_list_header->entries * sizeof(struct journal_page_list));
- rc = crc32cmp(journal_trailer->checksum, crc);
- internal_error(rc, "Index %u : %s entries %u at offset %u verified, DATA CRC computed %lu, stored %u", entries, uuid_str, metric->entries, metric->page_offset,
- crc, metric_list_header->crc);
- if (!rc) {
- total_pages += metric_list_header->entries;
- verified++;
- }
- }
-
- metric++;
- if (((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) file_size) {
- info("Verification failed EOF reached -- total entries %u, verified %u", entries, verified);
- return 1;
- }
- }
-
- if (entries != verified) {
- info("Verification failed -- total entries %u, verified %u", entries, verified);
- return 1;
- }
- info("Verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages);
-
- return 0;
-}
-
-int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct pg_cache_page_index *page_index;
- int ret, fd;
- uint64_t file_size;
- char path[RRDENG_PATH_MAX];
- struct stat statbuf;
- uint32_t original_file_size = 0;
-
- generate_journalfilepath(datafile, path, sizeof(path));
- ret = stat(path, &statbuf);
- if (!ret)
- original_file_size = (uint32_t)statbuf.st_size;
-
- generate_journalfilepath_v2(datafile, path, sizeof(path));
-
- fd = open(path, O_RDONLY);
- if (fd < 0) {
- if (errno == ENOENT)
- return 1;
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
- error("Failed to open %s", path);
- return 1;
- }
-
- ret = fstat(fd, &statbuf);
- if (ret) {
- error("Failed to get file information for %s", path);
- close(fd);
- return 1;
- }
-
- file_size = (size_t)statbuf.st_size;
-
- if (file_size < sizeof(struct journal_v2_header)) {
- error_report("Invalid file %s. Not the expected size", path);
- close(fd);
- return 1;
- }
-
- usec_t start_loading = now_realtime_usec();
- uint8_t *data_start = mmap(NULL, file_size, PROT_READ, MAP_SHARED, fd, 0);
- if (data_start == MAP_FAILED) {
- close(fd);
- return 1;
- }
- close(fd);
-
- info("Checking integrity of %s", path);
- int rc = check_journal_v2_file(data_start, file_size, original_file_size);
- if (rc) {
- if (rc == 2)
- error_report("File %s needs to be rebuilt", path);
- else
- error_report("File %s is invalid", path);
- if (unlikely(munmap(data_start, file_size)))
- error("Failed to unmap %s", path);
- return 1;
- }
-
- struct journal_v2_header *j2_header = (void *) data_start;
-
- size_t entries = j2_header->metric_count;
-
- if (!entries) {
- if (unlikely(munmap(data_start, file_size)))
- error("Failed to unmap %s", path);
- return 1;
- }
-
- rc = madvise(data_start, file_size, MADV_DONTFORK);
- if (rc)
- error("MADV_DONTFORK: setting failed");
-
- rc = madvise(data_start, file_size, MADV_DONTDUMP);
- if (rc)
- error("MADV_DONTDUMP: setting failed");
-
- struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
-
- uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
-
- // Initialize the journal file to be able to access the data
- journalfile->journal_data = data_start;
- journalfile->journal_data_size = file_size;
-
- usec_t header_start_time = j2_header->start_time_ut;
- for (size_t i=0; i < entries; i++) {
- Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, metric->uuid, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- else {
- PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, metric->uuid, sizeof(uuid_t), PJE0);
- fatal_assert(NULL == *PValue);
- *PValue = page_index = create_page_index(&metric->uuid, ctx);
- page_index->oldest_time_ut = LLONG_MAX;
- page_index->latest_time_ut = 0;
- page_index->prev = pg_cache->metrics_index.last_page_index;
- pg_cache->metrics_index.last_page_index = page_index;
- }
-
- usec_t metric_start_ut = header_start_time + (usec_t ) metric->delta_start * USEC_PER_SEC;
- usec_t metric_end_ut = header_start_time + (usec_t ) metric->delta_end * USEC_PER_SEC;
-
- if (page_index->oldest_time_ut > metric_start_ut)
- page_index->oldest_time_ut = metric_start_ut;
-
- if (page_index->latest_time_ut < metric_end_ut)
- page_index->latest_time_ut = metric_end_ut;
-
- ++page_index->page_count;
- ++pg_cache->page_descriptors;
- metric++;
- }
-
- uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
-
- info("Journal file \"%s\" loaded (size:%"PRIu64") with %lu metrics in %d ms",
- path, file_size, entries,
- (int) ((now_realtime_usec() - start_loading) / USEC_PER_MS));
- return 0;
-}
-
-
-struct metric_info_s {
- uuid_t *id;
- struct pg_cache_page_index *page_index;
- uint32_t entries;
- time_t min_index_time_s;
- time_t max_index_time_s;
- usec_t min_time_ut;
- usec_t max_time_ut;
- uint32_t page_list_header;
- Pvoid_t JudyL_array;
-};
-
-struct journal_metric_list_to_sort {
- struct metric_info_s *metric_info;
-};
-
-static int journal_metric_compare (const void *item1, const void *item2)
-{
- const struct metric_info_s *metric1 = ((struct journal_metric_list_to_sort *) item1)->metric_info;
- const struct metric_info_s *metric2 = ((struct journal_metric_list_to_sort *) item2)->metric_info;
-
- return uuid_compare(*(metric1->id), *(metric2->id));
-}
-
-
-// Write list of extents for the journalfile
-void *journal_v2_write_extent_list(struct rrdengine_journalfile *journalfile, void *data)
-{
- struct extent_info *extent = journalfile->datafile->extents.first;
- struct journal_extent_list *j2_extent = (void *) data;
- while (extent) {
- j2_extent->datafile_offset = extent->offset;
- j2_extent->datafile_size = extent->size;
- j2_extent->pages = extent->number_of_pages;
- j2_extent->file_index = journalfile->file_index;
- j2_extent++;
- extent = extent->next;
- };
- return j2_extent;
-}
-
-static int verify_journal_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes)
-{
- if ((unsigned long)(((uint8_t *) data - (uint8_t *) j2_header->data) + bytes) > (j2_header->total_file_size - sizeof(struct journal_v2_block_trailer)))
- return 1;
-
- return 0;
-}
-
-void *journal_v2_write_metric_page(struct journal_v2_header *j2_header, void *data, struct metric_info_s *metric_info, uint32_t pages_offset)
-{
- struct journal_metric_list *metric = (void *) data;
-
- if (verify_journal_space(j2_header, data, sizeof(*metric)))
- return NULL;
-
- uuid_copy(metric->uuid, *metric_info->id);
- metric->entries = metric_info->entries;
- metric->page_offset = pages_offset;
- metric->delta_start = (metric_info->min_time_ut - j2_header->start_time_ut) / USEC_PER_SEC;
- metric->delta_end = (metric_info->max_time_ut - j2_header->start_time_ut) / USEC_PER_SEC;
-
- return ++metric;
-}
-
-void *journal_v2_write_data_page_header(struct journal_v2_header *j2_header __maybe_unused, void *data, struct metric_info_s *metric_info, uint32_t uuid_offset)
-{
- struct journal_page_header *data_page_header = (void *) data;
- uLong crc;
-
- uuid_copy(data_page_header->uuid, *metric_info->id);
- data_page_header->entries = metric_info->entries;
- data_page_header->uuid_offset = uuid_offset; // data header OFFSET poings to METRIC in the directory
- data_page_header->crc = JOURVAL_V2_MAGIC;
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (void *) data_page_header, sizeof(*data_page_header));
- crc32set(data_page_header->checksum, crc);
- return ++data_page_header;
-}
-
-void *journal_v2_write_data_page_trailer(struct journal_v2_header *j2_header __maybe_unused, void *data, void *page_header)
-{
- struct journal_page_header *data_page_header = (void *) page_header;
- struct journal_v2_block_trailer *journal_trailer = (void *) data;
- uLong crc;
-
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (uint8_t *) page_header + sizeof(struct journal_page_header), data_page_header->entries * sizeof(struct journal_page_list));
- crc32set(journal_trailer->checksum, crc);
- return ++journal_trailer;
-}
-
-void *journal_v2_write_data_page(struct journal_v2_header *j2_header, void *data, struct rrdeng_page_descr *descr)
-{
- if (unlikely(!descr))
- return data;
-
- struct journal_page_list *data_page = data;
-
- // verify that we can write number of bytes
- if (verify_journal_space(j2_header, data, sizeof(*data_page)))
- return NULL;
-
- fatal_assert(descr->extent != NULL);
-
- uint32_t extent_index = unlikely(NULL == descr->extent) ? UINT32_MAX : descr->extent->index;
-
- data_page->delta_start_s = (descr->start_time_ut - j2_header->start_time_ut) / USEC_PER_SEC;
- data_page->delta_end_s = (descr->end_time_ut - j2_header->start_time_ut) / USEC_PER_SEC;
- data_page->extent_index = extent_index;
- data_page->update_every_s = (uint16_t) descr->update_every_s;
- data_page->page_length = descr->page_length;
- data_page->type = descr->type;
-
- // Rebuild on start to resolve unknown entry
- if (unlikely(UINT32_MAX == extent_index))
- j2_header->magic = JOURVAL_V2_REBUILD_MAGIC;
-
- return ++data_page;
-}
-
-// For a page_index write all descr @ time entries
-// Must be recorded in metric_info->entries
-void *journal_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct metric_info_s *metric_info, struct rrdengine_journalfile *journalfile)
-{
- struct rrdeng_page_descr *descr;
- Pvoid_t *PValue;
-
- struct journal_page_list *data_page = (void *)data;
- struct page_cache *pg_cache = &journalfile->datafile->ctx->pg_cache;
- struct pg_cache_page_index *page_index;
-
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, metric_info->id, sizeof(uuid_t));
- page_index = (NULL == PValue) ? NULL : *PValue;
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
-
- if (page_index == NULL)
- return data_page;
-
- // We need to write all descriptors with index metric_info->min_index_time_s, metric_info->max_index_time_s
- // that belong to this journal file
-
- Word_t index_time = metric_info->min_index_time_s;
- unsigned entries = 0;
-
- uv_rwlock_rdlock(&page_index->lock);
-
- Pvoid_t JudyL_array = metric_info->JudyL_array ? metric_info->JudyL_array : page_index->JudyL_array;
-
- // Need page_index lock if running live
- for (PValue = JudyLFirst(JudyL_array, &index_time, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue;
- descr != NULL;
- PValue = JudyLNext(JudyL_array, &index_time, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue) {
-
- if (unlikely((time_t) index_time > metric_info->max_index_time_s) || entries == metric_info->entries)
- break;
-
- // Write one descriptor and return the next data page location
- data_page = journal_v2_write_data_page(j2_header, (void *)data_page, descr);
-
- entries++;
- if (unlikely(!data_page))
- break;
- }
-
- uv_rwlock_rdunlock(&page_index->lock);
- return data_page;
-}
-
-static void journal_v2_remove_active_descriptors(struct rrdengine_journalfile *journalfile, struct metric_info_s *metric_info, bool startup)
-{
- if (true == startup) {
- // This is during startup, so we are the only ones accessing the structures
- // thats why we can safely remote the entire page_index->JudyL_array
- struct rrdeng_page_descr *descr;
- Word_t index_time;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
-
- page_index = metric_info->page_index;
-
- for (index_time = 0, PValue = JudyLFirst(page_index->JudyL_array, &index_time, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue; descr != NULL;
- PValue = JudyLNext(page_index->JudyL_array, &index_time, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue) {
-
- rrdeng_page_descr_freez(descr);
- }
- (void)JudyLFreeArray(&page_index->JudyL_array, PJE0);
- }
- else {
- // This is during runtime
- struct rrdeng_page_descr *descr;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = metric_info->page_index;
- struct page_cache *pg_cache = &page_index->ctx->pg_cache;
- struct rrdengine_instance *ctx = page_index->ctx;
-
- Word_t index_time = metric_info->min_index_time_s;
- uint32_t metric_info_offset = metric_info->page_list_header;
-
- struct journal_page_header *page_list_header = (struct journal_page_header *) ((uint8_t *) journalfile->journal_data + metric_info_offset);
- struct journal_v2_header *journal_header = (struct journal_v2_header *) journalfile->journal_data;
- // Sanity check that we refer to the same UUID
- fatal_assert(uuid_compare(page_list_header->uuid, *metric_info->id) == 0);
-
- struct journal_page_list *page_list = (struct journal_page_list *)((uint8_t *) page_list_header + sizeof(*page_list_header));
- struct journal_extent_list *extent_list = (void *)((uint8_t *)journal_header + journal_header->extent_offset);
-
- uint32_t index = 0;
- uint32_t entries = page_list_header->entries;
-
- uv_rwlock_rdlock(&page_index->lock);
-
- bool mark_journalfile_for_expiration_check = false;
- for (PValue = JudyLFirst(metric_info->JudyL_array, &index_time, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue;
- descr != NULL;
- PValue = JudyLNext(metric_info->JudyL_array, &index_time, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue) {
-
- if (unlikely((time_t) index_time > metric_info->max_index_time_s) || index == entries)
- break;
-
- if (descr->extent_entry || (!descr->extent_entry && descr->extent && descr->extent->datafile->journalfile != journalfile))
- continue;
-
- struct journal_page_list *page_entry = &page_list[index++];
-
- if (likely(page_entry->extent_index != UINT32_MAX)) {
-
- fatal_assert(descr->extent->offset == extent_list[page_entry->extent_index].datafile_offset);
- fatal_assert(descr->extent->size == extent_list[page_entry->extent_index].datafile_size);
-
- rrdeng_page_descr_mutex_lock(ctx, descr);
- while (!pg_cache_try_get_unsafe(descr, 1)) {
- pg_cache_wait_event_unsafe(descr);
- }
-
- descr->extent_entry = &extent_list[page_entry->extent_index];
- descr->extent = NULL;
- descr->file = journalfile->datafile->file;
- ++pg_cache->active_descriptors;
- pg_cache_put_unsafe(descr);
- rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- mark_journalfile_for_expiration_check = true;
- }
- }
-
- if (mark_journalfile_for_expiration_check) {
- uint32_t page_offset = (uint8_t *)page_list_header - (uint8_t *)journalfile->journal_data;
- mark_journalfile_descriptor(pg_cache, journalfile, page_offset, 1);
- }
-
- uv_rwlock_rdunlock(&page_index->lock);
- }
-}
-
-bool descriptor_is_corrupted(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct pg_cache_page_index *page_index = get_page_index(pg_cache, descr->id);
-
- if (unlikely(!page_index))
- return true;
-
- time_t index_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC);
- struct rrdeng_page_descr *idx_descr = get_descriptor(page_index, index_time_s);
-
- bool is_corrupted = (idx_descr != descr);
-
-#ifdef NETDATA_INTERNAL_CHECKS
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower(page_index->id, uuid_str);
- internal_error(is_corrupted, "Descriptor corrupted (Extent %p Judy %p) @ %ld", descr, idx_descr, index_time_s);
-#endif
-
- return is_corrupted;
-}
-
-static bool journalfile_ready_to_index(struct rrdengine_datafile *datafile)
-{
- struct extent_info *extent = datafile->extents.first;
- while (extent) {
- uint8_t extent_pages = extent->number_of_pages;
- for (uint8_t index = 0; index < extent_pages; index++) {
- struct rrdeng_page_descr *descr = extent->pages[index];
- if (unlikely(!descr))
- continue;
- if (unlikely(!descr->extent))
- return false;
- }
- extent = extent->next;
- }
- return true;
-}
-
-// Migrate the journalfile pointed by datafile
-// activate : make the new file active immediately
-// journafile data will be set and descriptors (if deleted) will be repopulated as needed
-// startup : if the migration is done during agent startup
-// this will allow us to optimize certain things
-void migrate_journal_file_v2(struct rrdengine_datafile *datafile, bool activate, bool startup)
-{
- char path[RRDENG_PATH_MAX];
- size_t number_of_extents = 0; // Number of extents
- size_t number_of_metrics = 0; // Number of unique metrics (UUIDS)
- size_t number_of_pages = 0; // Total number of descriptors @ time
- Pvoid_t