summary | refs | log | tree | commit | diff | stats
path: root/database
diff options
context:
space:
mode:
author Costa Tsaousis <costa@netdata.cloud> 2022-11-15 23:49:42 +0200
committer GitHub <noreply@github.com> 2022-11-15 23:49:42 +0200
commit becd97a3660af34104c557ba6c2877f624143c2e (patch)
tree 14fa78305bd8c27bf1fa06c9c346b8f7a8a1d721 /database
parent 1789d07c43182152437459a7a4f81267bbdd752c (diff)
Revert "New journal disk based indexing for agent memory reduction" (#14000)
Revert "New journal disk based indexing for agent memory reduction (#13885)"

This reverts commit 224b051a2b2bab39a4b536e531ab9ca590bf31bb.
Diffstat (limited to 'database')
-rw-r--r-- database/engine/datafile.c 41
-rw-r--r-- database/engine/datafile.h 14
-rw-r--r-- database/engine/journalfile.c 1126
-rw-r--r-- database/engine/journalfile.h 88
-rw-r--r-- database/engine/pagecache.c 569
-rw-r--r-- database/engine/pagecache.h 63
-rw-r--r-- database/engine/rrdengine.c 570
-rw-r--r-- database/engine/rrdengine.h 45
-rwxr-xr-x database/engine/rrdengineapi.c 87
-rw-r--r-- database/engine/rrdengineapi.h 4
-rw-r--r-- database/engine/rrdenglocking.c 24
-rw-r--r-- database/engine/rrdenglocking.h 7
-rw-r--r-- database/rrd.h 14
-rw-r--r-- database/rrddim.c 6
-rw-r--r-- database/rrdhost.c 13
-rw-r--r-- database/rrdset.c 4
16 files changed, 576 insertions, 2099 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index e534d9c673..9c70068d9f 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -1,29 +1,9 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile)
-{
- struct extent_info *extent = datafile->extents.first, *next_extent;
-
- char path[RRDENG_PATH_MAX];
-
- generate_journalfilepath_v2(datafile, path, sizeof(path));
- internal_error(true, "Deleting extents of file %s", path);
- unsigned count = 0;
- while (extent) {
- next_extent = extent->next;
- freez(extent);
- count++;
- extent = next_extent;
- }
- datafile->extents.first = NULL;
- internal_error(true, "Deleted %u extents of file %s", count, path);
-}
-
void df_extent_insert(struct extent_info *extent)
{
struct rrdengine_datafile *datafile = extent->datafile;
- uv_rwlock_wrlock(&datafile->extent_rwlock);
if (likely(NULL != datafile->extents.last)) {
datafile->extents.last->next = extent;
@@ -32,14 +12,10 @@ void df_extent_insert(struct extent_info *extent)
datafile->extents.first = extent;
}
datafile->extents.last = extent;
-
- uv_rwlock_wrunlock(&datafile->extent_rwlock);
}
void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
{
- uv_rwlock_wrlock(&ctx->datafiles.rwlock);
-
if (likely(NULL != ctx->datafiles.last)) {
ctx->datafiles.last->next = datafile;
}
@@ -47,13 +23,12 @@ void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_dataf
ctx->datafiles.first = datafile;
}
ctx->datafiles.last = datafile;
-
- uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
}
-void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
{
struct rrdengine_datafile *next;
+
next = datafile->next;
fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
ctx->datafiles.first = next;
@@ -69,7 +44,6 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_
datafile->file = (uv_file)0;
datafile->pos = 0;
datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */
- fatal_assert(0 == uv_rwlock_init(&datafile->extent_rwlock));
datafile->journalfile = NULL;
datafile->next = NULL;
datafile->ctx = ctx;
@@ -123,7 +97,7 @@ int unlink_data_file(struct rrdengine_datafile *datafile)
return ret;
}
-int destroy_data_file_unsafe(struct rrdengine_datafile *datafile)
+int destroy_data_file(struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
@@ -202,7 +176,7 @@ int create_data_file(struct rrdengine_datafile *datafile)
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
if (ret < 0) {
- destroy_data_file_unsafe(datafile);
+ destroy_data_file(datafile);
return ret;
}
@@ -330,8 +304,10 @@ static int scan_data_files(struct rrdengine_instance *ctx)
datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
+ info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
if (2 == ret) {
+ info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, no);
datafiles[matched_files++] = datafile;
@@ -361,7 +337,6 @@ static int scan_data_files(struct rrdengine_instance *ctx)
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
- journalfile->file_index = i;
ret = load_journal_file(ctx, journalfile, datafile);
if (0 != ret) {
if (!must_delete_pair) /* If datafile is still open close it */
@@ -371,7 +346,6 @@ static int scan_data_files(struct rrdengine_instance *ctx)
if (must_delete_pair) {
char path[RRDENG_PATH_MAX];
- // TODO: Also delete the version 2
error("Deleting invalid data and journal file pair.");
ret = unlink_journal_file(journalfile);
if (!ret) {
@@ -433,7 +407,7 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsi
return 0;
error_after_journalfile:
- destroy_data_file_unsafe(datafile);
+ destroy_data_file(datafile);
freez(journalfile);
error_after_datafile:
freez(datafile);
@@ -447,7 +421,6 @@ int init_data_files(struct rrdengine_instance *ctx)
{
int ret;
- fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock));
ret = scan_data_files(ctx);
if (ret < 0) {
error("Failed to scan path \"%s\".", ctx->dbfiles_path);
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index 48d72623c3..1cf256aff4 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -13,13 +13,7 @@ struct rrdengine_instance;
#define DATAFILE_PREFIX "datafile-"
#define DATAFILE_EXTENSION ".ndf"
-#ifndef MAX_DATAFILE_SIZE
#define MAX_DATAFILE_SIZE (1073741824LU)
-#endif
-#if MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE
-#error MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE
-#endif
-
#define MIN_DATAFILE_SIZE (4194304LU)
#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
#define TARGET_DATAFILES (20)
@@ -32,7 +26,6 @@ struct extent_info {
uint8_t number_of_pages;
struct rrdengine_datafile *datafile;
struct extent_info *next;
- uint32_t index; // This is the entent index for version 2
struct rrdeng_page_descr *pages[];
};
@@ -48,7 +41,6 @@ struct rrdengine_datafile {
unsigned fileno;
uv_file file;
uint64_t pos;
- uv_rwlock_t extent_rwlock;
struct rrdengine_instance *ctx;
struct rrdengine_df_extents extents;
struct rrdengine_journalfile *journalfile;
@@ -56,22 +48,20 @@ struct rrdengine_datafile {
};
struct rrdengine_datafile_list {
- uv_rwlock_t rwlock;
struct rrdengine_datafile *first; /* oldest */
struct rrdengine_datafile *last; /* newest */
};
void df_extent_insert(struct extent_info *extent);
void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
-void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
int close_data_file(struct rrdengine_datafile *datafile);
int unlink_data_file(struct rrdengine_datafile *datafile);
-int destroy_data_file_unsafe(struct rrdengine_datafile *datafile);
+int destroy_data_file(struct rrdengine_datafile *datafile);
int create_data_file(struct rrdengine_datafile *datafile);
int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
int init_data_files(struct rrdengine_instance *ctx);
void finalize_data_files(struct rrdengine_instance *ctx);
-void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile);
#endif /* NETDATA_DATAFILE_H */
\ No newline at end of file
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 037603b9b6..500dd78800 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -1,25 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-void queue_journalfile_v2_migration(struct rrdengine_worker_config *wc)
-{
- struct rrdeng_work *work_request;
-
- if (unlikely(wc->running_journal_migration))
- return;
-
- work_request = mallocz(sizeof(*work_request));
- work_request->req.data = work_request;
- work_request->wc = wc;
- work_request->count = 0;
- work_request->rerun = false;
- wc->running_journal_migration = 1;
- if (unlikely(uv_queue_work(wc->loop, &work_request->req, start_journal_indexing, after_journal_indexing))) {
- freez(work_request);
- wc->running_journal_migration = 0;
- }
-}
-
static void flush_transaction_buffer_cb(uv_fs_t* req)
{
struct generic_io_descriptor *io_descr = req->data;
@@ -66,7 +47,6 @@ void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
io_descr->bytes = size;
io_descr->pos = journalfile->pos;
io_descr->req.data = io_descr;
- io_descr->data = journalfile;
io_descr->completion = NULL;
io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
@@ -113,12 +93,6 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
return ctx->commit_log.buf + buf_pos;
}
-void generate_journalfilepath_v2(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
-{
- (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2,
- datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
-}
-
void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
@@ -130,49 +104,26 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin
journalfile->file = (uv_file)0;
journalfile->pos = 0;
journalfile->datafile = datafile;
- journalfile->journal_data = NULL;
- journalfile->journal_data_size = 0;
- journalfile->JudyL_array = (Pvoid_t) NULL;
- journalfile->data = NULL;
- journalfile->file_index = 0;
- journalfile->last_access = 0;
}
-static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
- uv_fs_t req;
- ret = uv_fs_close(NULL, &req, file, NULL);
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
- generate_journalfilepath(datafile, path, sizeof(path));
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++datafile->ctx->stats.fs_errors;
+ ++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
- return ret;
-}
-
-int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
-{
- struct rrdengine_instance *ctx = datafile->ctx;
- char path[RRDENG_PATH_MAX];
-
- if (likely(journalfile->journal_data)) {
- if (munmap(journalfile->journal_data, journalfile->journal_data_size)) {
- generate_journalfilepath_v2(datafile, path, sizeof(path));
- error("Failed to unmap journal index file for %s", path);
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
- }
- journalfile->journal_data = NULL;
- journalfile->journal_data_size = 0;
- return 0;
- }
- return close_uv_file(datafile, journalfile->file);
+ return ret;
}
int unlink_journal_file(struct rrdengine_journalfile *journalfile)
@@ -198,16 +149,14 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile)
return ret;
}
-int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
- char path_v2[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
- generate_journalfilepath_v2(datafile, path_v2, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
@@ -217,12 +166,9 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc
}
uv_fs_req_cleanup(&req);
- (void) close_uv_file(datafile, journalfile->file);
-
- // This is the new journal v2 index file
- ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
- error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
@@ -237,13 +183,6 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc
uv_fs_req_cleanup(&req);
++ctx->stats.journalfile_deletions;
- ++ctx->stats.journalfile_deletions;
-
- if (journalfile->journal_data) {
- if (munmap(journalfile->journal_data, journalfile->journal_data_size)) {
- error("Failed to unmap index file %s", path_v2);
- }
- }
return ret;
}
@@ -288,7 +227,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
if (ret < 0) {
- destroy_journal_file_unsafe(journalfile, datafile);
+ destroy_journal_file(journalfile, datafile);
return ret;
}
@@ -427,67 +366,36 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
- *PValue = page_index = create_page_index(temp_id, ctx);
- page_index->prev = pg_cache->metrics_index.last_page_index;
- pg_cache->metrics_index.last_page_index = page_index;
+ *PValue = page_index = create_page_index(temp_id, ctx);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
- // Lookup this descriptor
- Word_t p_time = start_time_ut / USEC_PER_SEC;
- uv_rwlock_rdlock(&page_index->lock);
- PValue = JudyLFirst(page_index->JudyL_array, &p_time, PJE0);
- descr = (NULL == PValue) ? NULL: *PValue;
- uv_rwlock_rdunlock(&page_index->lock);
-
- bool descr_found = false;
- if (unlikely(descr && descr->start_time_ut == start_time_ut)) {
- // We have this descriptor already
- descr_found = true;
-
-#ifdef NETDATA_INTERNAL_CHECKS
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower(page_index->id, uuid_str);
- internal_error(true, "REMOVING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u",
- uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, descr->extent->datafile->fileno,
- descr->extent->offset, descr->extent->size);
- internal_error(true, "APPLYING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u",
- uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, extent->datafile->fileno,
- extent->offset, extent->size);
-#endif
- // Remove entry from previous extent
- unlink_descriptor_extent_unsafe(descr);
- internal_error(true, "REMOVING UUID %s with %lu OK", uuid_str, start_time_ut);
- }
- else {
- descr = pg_cache_create_descr();
- descr->id = &page_index->id;
- }
-
+ descr = pg_cache_create_descr();
descr->page_length = jf_metric_data->descr[i].page_length;
descr->start_time_ut = start_time_ut;
descr->end_time_ut = end_time_ut;
descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s);
-
+ descr->id = &page_index->id;
descr->extent = extent;
descr->type = page_type;
extent->pages[valid_pages++] = descr;
- if (likely(!descr_found))
- (void)pg_cache_insert(ctx, page_index, descr, true);
+ pg_cache_insert(ctx, page_index, descr);
- if (page_index->latest_time_ut == descr->end_time_ut)
+ if(page_index->latest_time_ut == descr->end_time_ut)
page_index->latest_update_every_s = descr->update_every_s;
if(descr->update_every_s == 0)
- fatal("DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu",
+ fatal(
+ "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu",
(unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries);
}
extent->number_of_pages = valid_pages;
- if (likely(valid_pages)) {
+ if (likely(valid_pages))
df_extent_insert(extent);
- }
else {
freez(extent);
ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++;
@@ -536,13 +444,13 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
return size_bytes;
}
switch (jf_header->type) {
- case STORE_DATA:
- debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
- restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
- break;
- default:
- error("Unknown transaction type. Skipping record.");
- break;
+ case STORE_DATA:
+ debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
+ restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
+ break;
+ default:
+ error("Unknown transaction type. Skipping record.");
+ break;
}
return size_bytes;
@@ -610,905 +518,12 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
if (likely(journal_is_mmapped))
buf += size_bytes;
}
- skip_file:
+skip_file:
if (unlikely(!journal_is_mmapped))
posix_memfree(buf);
return max_id;
}
-bool unlink_descriptor_extent_unsafe(struct rrdeng_page_descr *descr)
-{
- if (unlikely(!descr || !descr->extent))
- return true;
-
- struct extent_info *extent = descr->extent;
- for (uint8_t index = 0; index < extent->number_of_pages; index++) {
- if (extent->pages[index] == descr) {
- extent->pages[index] = NULL;
- return true;
- }
- }
- return false;
-}
-
-// Checks that the extent list checksum is valid
-static int check_journal_v2_extent_list (void *data_start, size_t file_size)
-{
- UNUSED(file_size);
- uLong crc;
-
- struct journal_v2_header *j2_header = (void *) data_start;
- struct journal_v2_block_trailer *journal_v2_trailer;
-
- journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset);
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list));
- if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
- error("Extent list CRC32 check: FAILED");
- return 1;
- }
-
- return 0;
-}
-
-// Checks that the metric list (UUIDs) checksum is valid
-static int check_journal_v2_metric_list(void *data_start, size_t file_size)
-{
- UNUSED(file_size);
- uLong crc;
-
- struct journal_v2_header *j2_header = (void *) data_start;
- struct journal_v2_block_trailer *journal_v2_trailer;
-
- journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->metric_trailer_offset);
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list));
- if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
- error("Metric list CRC32 check: FAILED");
- return 1;
- }
- return 0;
-}
-
-static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t original_size)
-{
- int rc;
- uLong crc;
-
- struct journal_v2_header *j2_header = (void *) data_start;
- struct journal_v2_block_trailer *journal_v2_trailer;
-
- if (j2_header->magic == JOURVAL_V2_REBUILD_MAGIC)
- return 2;
-
- // Magic failure
- if (j2_header->magic != JOURVAL_V2_MAGIC)
- return 1;
-
- if (j2_header->total_file_size != file_size)
- return 1;
-
- if (original_size && j2_header->original_file_size != original_size)
- return 1;
-
- journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + file_size - sizeof(*journal_v2_trailer));
-
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (void *) j2_header, sizeof(*j2_header));
-
- rc = crc32cmp(journal_v2_trailer->checksum, crc);
- if (unlikely(rc)) {
- error("File CRC32 check: FAILED");
- return 1;
- }
-
- rc = check_journal_v2_extent_list(data_start, file_size);
- if (rc) return 1;
-
- rc = check_journal_v2_metric_list(data_start, file_size);
- if (rc) return 1;
-
- if (!db_engine_journal_check)
- return 0;
-
- // Verify complete UUID chain
-
- struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset);
-
- unsigned verified = 0;
- unsigned entries;
- unsigned total_pages = 0;
-
- info("Checking %u metrics that exist in the journal", j2_header->metric_count);
- for (entries = 0; entries < j2_header->metric_count; entries++) {
-
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower(metric->uuid, uuid_str);
- struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
- struct journal_page_header local_metric_list_header = *metric_list_header;
-
- local_metric_list_header.crc = JOURVAL_V2_MAGIC;
-
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (void *) &local_metric_list_header, sizeof(local_metric_list_header));
- rc = crc32cmp(metric_list_header->checksum, crc);
-
- internal_error(true, "Index %u : %s entries %u at offset %u (%llu -- %llu) verified, HEADER CRC computed %lu, stored %u", entries, uuid_str, metric->entries, metric->page_offset,
- j2_header->start_time_ut + (usec_t) metric->delta_start * USEC_PER_SEC, j2_header->start_time_ut + (usec_t) metric->delta_end * USEC_PER_SEC,
- crc, metric_list_header->crc);
- if (!rc) {
- struct journal_v2_block_trailer *journal_trailer =
- (void *) data_start + metric->page_offset + sizeof(struct journal_page_header) + (metric_list_header->entries * sizeof(struct journal_page_list));
-
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (uint8_t *) metric_list_header + sizeof(struct journal_page_header), metric_list_header->entries * sizeof(struct journal_page_list));
- rc = crc32cmp(journal_trailer->checksum, crc);
- internal_error(rc, "Index %u : %s entries %u at offset %u verified, DATA CRC computed %lu, stored %u", entries, uuid_str, metric->entries, metric->page_offset,
- crc, metric_list_header->crc);
- if (!rc) {
- total_pages += metric_list_header->entries;
- verified++;
- }
- }
-
- metric++;
- if (((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) file_size) {
- info("Verification failed EOF reached -- total entries %u, verified %u", entries, verified);
- return 1;
- }
- }
-
- if (entries != verified) {
- info("Verification failed -- total entries %u, verified %u", entries, verified);
- return 1;
- }
- info("Verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages);
-
- return 0;
-}
-
-int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct pg_cache_page_index *page_index;
- int ret, fd;
- uint64_t file_size;
- char path[RRDENG_PATH_MAX];
- struct stat statbuf;
- uint32_t original_file_size = 0;
-
- generate_journalfilepath(datafile, path, sizeof(path));
- ret = stat(path, &statbuf);
- if (!ret)
- original_file_size = (uint32_t)statbuf.st_size;
-
- generate_journalfilepath_v2(datafile, path, sizeof(path));
-
- fd = open(path, O_RDONLY);
- if (fd < 0) {
- if (errno == ENOENT)
- return 1;
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
- error("Failed to open %s", path);
- return 1;
- }
-
- ret = fstat(fd, &statbuf);
- if (ret) {
- error("Failed to get file information for %s", path);
- close(fd);
- return 1;
- }
-
- file_size = (size_t)statbuf.st_size;
-
- if (file_size < sizeof(struct journal_v2_header)) {
- error_report("Invalid file %s. Not the expected size", path);
- close(fd);
- return 1;
- }
-
- usec_t start_loading = now_realtime_usec();
- uint8_t *data_start = mmap(NULL, file_size, PROT_READ, MAP_SHARED, fd, 0);
- if (data_start == MAP_FAILED) {
- close(fd);
- return 1;
- }
- close(fd);
-
- info("Checking integrity of %s", path);
- int rc = check_journal_v2_file(data_start, file_size, original_file_size);
- if (rc) {
- if (rc == 2)
- error_report("File %s needs to be rebuilt", path);
- else
- error_report("File %s is invalid", path);
- if (unlikely(munmap(data_start, file_size)))
- error("Failed to unmap %s", path);
- return 1;
- }
-
- struct journal_v2_header *j2_header = (void *) data_start;
-
- size_t entries = j2_header->metric_count;
-
- if (!entries) {
- if (unlikely(munmap(data_start, file_size)))
- error("Failed to unmap %s", path);
- return 1;
- }
-
- rc = madvise(data_start, file_size, MADV_DONTFORK);
- if (rc)
- error("MADV_DONTFORK: setting failed");
-
- rc = madvise(data_start, file_size, MADV_DONTDUMP);
- if (rc)
- error("MADV_DONTDUMP: setting failed");
-
- struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
-
- uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
-
- // Initialize the journal file to be able to access the data
- journalfile->journal_data = data_start;
- journalfile->journal_data_size = file_size;
-
- usec_t header_start_time = j2_header->start_time_ut;
- for (size_t i=0; i < entries; i++) {
- Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, metric->uuid, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- else {
- PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, metric->uuid, sizeof(uuid_t), PJE0);
- fatal_assert(NULL == *PValue);
- *PValue = page_index = create_page_index(&metric->uuid, ctx);
- page_index->oldest_time_ut = LLONG_MAX;
- page_index->latest_time_ut = 0;
- page_index->prev = pg_cache->metrics_index.last_page_index;
- pg_cache->metrics_index.last_page_index = page_index;
- }
-
- usec_t metric_start_ut = header_start_time + (usec_t ) metric->delta_start * USEC_PER_SEC;
- usec_t metric_end_ut = header_start_time + (usec_t ) metric->delta_end * USEC_PER_SEC;
-
- if (page_index->oldest_time_ut > metric_start_ut)
- page_index->oldest_time_ut = metric_start_ut;
-
- if (page_index->latest_time_ut < metric_end_ut)
- page_index->latest_time_ut = metric_end_ut;
-
- ++page_index->page_count;
- ++pg_cache->page_descriptors;
- metric++;
- }
-
- uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
-
- info("Journal file \"%s\" loaded (size:%"PRIu64") with %lu metrics in %d ms",
- path, file_size, entries,
- (int) ((now_realtime_usec() - start_loading) / USEC_PER_MS));
- return 0;
-}
-
-
-struct metric_info_s {
- uuid_t *id;
- struct pg_cache_page_index *page_index;
- uint32_t entries;
- time_t min_index_time_s;
- time_t max_index_time_s;
- usec_t min_time_ut;
- usec_t max_time_ut;
- uint32_t page_list_header;
- Pvoid_t JudyL_array;
-};
-
-struct journal_metric_list_to_sort {
- struct metric_info_s *metric_info;
-};
-
-static int journal_metric_compare (const void *item1, const void *item2)
-{
- const struct metric_info_s *metric1 = ((struct journal_metric_list_to_sort *) item1)->metric_info;
- const struct metric_info_s *metric2 = ((struct journal_metric_list_to_sort *) item2)->metric_info;
-
- return uuid_compare(*(metric1->id), *(metric2->id));
-}
-
-
-// Write list of extents for the journalfile
-void *journal_v2_write_extent_list(struct rrdengine_journalfile *journalfile, void *data)
-{
- struct extent_info *extent = journalfile->datafile->extents.first;
- struct journal_extent_list *j2_extent = (void *) data;
- while (extent) {
- j2_extent->datafile_offset = extent->offset;
- j2_extent->datafile_size = extent->size;
- j2_extent->pages = extent->number_of_pages;
- j2_extent->file_index = journalfile->file_index;
- j2_extent++;
- extent = extent->next;
- };
- return j2_extent;
-}
-
-static int verify_journal_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes)
-{
- if ((u