diff options
30 files changed, 2320 insertions, 651 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 3e0f83422c..e92830fe20 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -293,9 +293,25 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us // rrdhost_hostname(host), rrdset_id(st), // (unsigned long long)first_entry_child, (unsigned long long)last_entry_child); - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + bool ok = true; + if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { + +#ifdef NETDATA_INTERNAL_CHECKS + st->replay.start_streaming = false; + st->replay.after = 0; + st->replay.before = 0; +#endif + + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + + ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, + last_entry_child, 0, 0); + } + else { + internal_error(true, "RRDSET: not sending duplicate replication request for chart '%s'", rrdset_id(st)); + } - bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0); return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } @@ -875,6 +891,11 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use time_t start_time = strtol(start_time_str, NULL, 0); time_t end_time = strtol(end_time_str, NULL, 0); + internal_error( + (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)), + "REPLAY: received a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, which does not match our request (%ld to %ld).", + rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, st->replay.after, st->replay.before); + if(start_time && end_time) { if (start_time > end_time) { error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.", @@ -1135,11 +1156,18 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) st->counter++; st->counter_done++; +#ifdef NETDATA_INTERNAL_CHECKS + st->replay.start_streaming = false; + st->replay.after = 0; + st->replay.before = 0; +#endif + if (start_streaming) { if (st->update_every != update_every_child) rrdset_set_update_every(st, update_every_child); rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK); return PARSER_RC_OK; } diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c index e81fc49d11..6d6c6dd5b8 100644 --- a/daemon/global_statistics.c +++ b/daemon/global_statistics.c @@ -865,6 +865,7 @@ static void dbengine_statistics_charts(void) { { static RRDSET *st_long_term_pages = NULL; + static RRDDIM *rd_memory = NULL; static RRDDIM *rd_total = NULL; static RRDDIM *rd_insertions = NULL; static RRDDIM *rd_deletions = NULL; @@ -885,6 +886,7 @@ static void dbengine_statistics_charts(void) { localhost->rrd_update_every, RRDSET_TYPE_LINE); + rd_memory = rrddim_add(st_long_term_pages, "journal v2 descriptors", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); rd_total = rrddim_add(st_long_term_pages, "total", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); rd_insertions = rrddim_add(st_long_term_pages, "insertions", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); rd_deletions = rrddim_add(st_long_term_pages, "deletions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL); @@ -893,6 +895,7 @@ static void dbengine_statistics_charts(void) { } else rrdset_next(st_long_term_pages); + rrddim_set_by_pointer(st_long_term_pages, rd_memory, (collected_number)stats_array[37]); rrddim_set_by_pointer(st_long_term_pages, rd_total, (collected_number)stats_array[2]); rrddim_set_by_pointer(st_long_term_pages, rd_insertions, (collected_number)stats_array[5]); rrddim_set_by_pointer(st_long_term_pages, rd_deletions, (collected_number)stats_array[6]); diff --git a/daemon/main.c b/daemon/main.c index 5c437d208d..6c189fead7 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -680,6 +680,9 @@ static void get_netdata_configured_variables() { db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_NO); default_rrdeng_page_cache_mb = (int) config_get_number(CONFIG_SECTION_DB, "dbengine page cache size MB", default_rrdeng_page_cache_mb); + db_engine_journal_indexing = config_get_boolean(CONFIG_SECTION_DB, "dbengine enable journal indexing", CONFIG_BOOLEAN_YES); + db_engine_journal_check = config_get_boolean(CONFIG_SECTION_DB, "dbengine enable journal integrity check", CONFIG_BOOLEAN_NO); + if(default_rrdeng_page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) { error("Invalid page cache size %d given. Defaulting to %d.", default_rrdeng_page_cache_mb, RRDENG_MIN_PAGE_CACHE_SIZE_MB); default_rrdeng_page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB; diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 9c70068d9f..e534d9c673 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -1,9 +1,29 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" +void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile) +{ + struct extent_info *extent = datafile->extents.first, *next_extent; + + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath_v2(datafile, path, sizeof(path)); + internal_error(true, "Deleting extents of file %s", path); + unsigned count = 0; + while (extent) { + next_extent = extent->next; + freez(extent); + count++; + extent = next_extent; + } + datafile->extents.first = NULL; + internal_error(true, "Deleted %u extents of file %s", count, path); +} + void df_extent_insert(struct extent_info *extent) { struct rrdengine_datafile *datafile = extent->datafile; + uv_rwlock_wrlock(&datafile->extent_rwlock); if (likely(NULL != datafile->extents.last)) { datafile->extents.last->next = extent; @@ -12,10 +32,14 @@ void df_extent_insert(struct extent_info *extent) datafile->extents.first = extent; } datafile->extents.last = extent; + + uv_rwlock_wrunlock(&datafile->extent_rwlock); } void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { + uv_rwlock_wrlock(&ctx->datafiles.rwlock); + if (likely(NULL != ctx->datafiles.last)) { ctx->datafiles.last->next = datafile; } @@ -23,12 +47,13 @@ void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_dataf ctx->datafiles.first = datafile; } ctx->datafiles.last = datafile; + + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); } -void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { struct rrdengine_datafile *next; - next = datafile->next; fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile)); ctx->datafiles.first = next; @@ -44,6 +69,7 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_ datafile->file = (uv_file)0; datafile->pos = 0; datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */ + fatal_assert(0 == uv_rwlock_init(&datafile->extent_rwlock)); datafile->journalfile = NULL; datafile->next = NULL; datafile->ctx = ctx; @@ -97,7 +123,7 @@ int unlink_data_file(struct rrdengine_datafile *datafile) return ret; } -int destroy_data_file(struct rrdengine_datafile *datafile) +int destroy_data_file_unsafe(struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; @@ -176,7 +202,7 @@ int create_data_file(struct rrdengine_datafile *datafile) uv_fs_req_cleanup(&req); posix_memfree(superblock); if (ret < 0) { - destroy_data_file(datafile); + destroy_data_file_unsafe(datafile); return ret; } @@ -304,10 +330,8 @@ static int scan_data_files(struct rrdengine_instance *ctx) datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { - info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name); ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); if (2 == ret) { - info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name); datafile = mallocz(sizeof(*datafile)); datafile_init(datafile, ctx, tier, no); datafiles[matched_files++] = datafile; @@ -337,6 +361,7 @@ static int scan_data_files(struct rrdengine_instance *ctx) journalfile = mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); + journalfile->file_index = i; ret = load_journal_file(ctx, journalfile, datafile); if (0 != ret) { if (!must_delete_pair) /* If datafile is still open close it */ @@ -346,6 +371,7 @@ static int scan_data_files(struct rrdengine_instance *ctx) if (must_delete_pair) { char path[RRDENG_PATH_MAX]; + // TODO: Also delete the version 2 error("Deleting invalid data and journal file pair."); ret = unlink_journal_file(journalfile); if (!ret) { @@ -407,7 +433,7 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsi return 0; error_after_journalfile: - destroy_data_file(datafile); + destroy_data_file_unsafe(datafile); freez(journalfile); error_after_datafile: freez(datafile); @@ -421,6 +447,7 @@ int init_data_files(struct rrdengine_instance *ctx) { int ret; + fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock)); ret = scan_data_files(ctx); if (ret < 0) { error("Failed to scan path \"%s\".", ctx->dbfiles_path); diff --git a/database/engine/datafile.h b/database/engine/datafile.h index 1cf256aff4..48d72623c3 100644 --- a/database/engine/datafile.h +++ b/database/engine/datafile.h @@ -13,7 +13,13 @@ struct rrdengine_instance; #define DATAFILE_PREFIX "datafile-" #define DATAFILE_EXTENSION ".ndf" +#ifndef MAX_DATAFILE_SIZE #define MAX_DATAFILE_SIZE (1073741824LU) +#endif +#if MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE +#error MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE +#endif + #define MIN_DATAFILE_SIZE (4194304LU) #define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */ #define TARGET_DATAFILES (20) @@ -26,6 +32,7 @@ struct extent_info { uint8_t number_of_pages; struct rrdengine_datafile *datafile; struct extent_info *next; + uint32_t index; // This is the entent index for version 2 struct rrdeng_page_descr *pages[]; }; @@ -41,6 +48,7 @@ struct rrdengine_datafile { unsigned fileno; uv_file file; uint64_t pos; + uv_rwlock_t extent_rwlock; struct rrdengine_instance *ctx; struct rrdengine_df_extents extents; struct rrdengine_journalfile *journalfile; @@ -48,20 +56,22 @@ struct rrdengine_datafile { }; struct rrdengine_datafile_list { + uv_rwlock_t rwlock; struct rrdengine_datafile *first; /* oldest */ struct rrdengine_datafile *last; /* newest */ }; void df_extent_insert(struct extent_info *extent); void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); -void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); int close_data_file(struct rrdengine_datafile *datafile); int unlink_data_file(struct rrdengine_datafile *datafile); -int destroy_data_file(struct rrdengine_datafile *datafile); +int destroy_data_file_unsafe(struct rrdengine_datafile *datafile); int create_data_file(struct rrdengine_datafile *datafile); int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); int init_data_files(struct rrdengine_instance *ctx); void finalize_data_files(struct rrdengine_instance *ctx); +void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile); #endif /* NETDATA_DATAFILE_H */
\ No newline at end of file diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 500dd78800..037603b9b6 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -1,6 +1,25 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" +void queue_journalfile_v2_migration(struct rrdengine_worker_config *wc) +{ + struct rrdeng_work *work_request; + + if (unlikely(wc->running_journal_migration)) + return; + + work_request = mallocz(sizeof(*work_request)); + work_request->req.data = work_request; + work_request->wc = wc; + work_request->count = 0; + work_request->rerun = false; + wc->running_journal_migration = 1; + if (unlikely(uv_queue_work(wc->loop, &work_request->req, start_journal_indexing, after_journal_indexing))) { + freez(work_request); + wc->running_journal_migration = 0; + } +} + static void flush_transaction_buffer_cb(uv_fs_t* req) { struct generic_io_descriptor *io_descr = req->data; @@ -47,6 +66,7 @@ void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc) io_descr->bytes = size; io_descr->pos = journalfile->pos; io_descr->req.data = io_descr; + io_descr->data = journalfile; io_descr->completion = NULL; io_descr->iov = uv_buf_init((void *)io_descr->buf, size); @@ -93,6 +113,12 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s return ctx->commit_log.buf + buf_pos; } +void generate_journalfilepath_v2(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2, + datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); +} + void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, @@ -104,28 +130,51 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin journalfile->file = (uv_file)0; journalfile->pos = 0; journalfile->datafile = datafile; + journalfile->journal_data = NULL; + journalfile->journal_data_size = 0; + journalfile->JudyL_array = (Pvoid_t) NULL; + journalfile->data = NULL; + journalfile->file_index = 0; + journalfile->last_access = 0; } -int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file) { - struct rrdengine_instance *ctx = datafile->ctx; - uv_fs_t req; int ret; char path[RRDENG_PATH_MAX]; - generate_journalfilepath(datafile, path, sizeof(path)); - - ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + uv_fs_t req; + ret = uv_fs_close(NULL, &req, file, NULL); if (ret < 0) { + generate_journalfilepath(datafile, path, sizeof(path)); error("uv_fs_close(%s): %s", path, uv_strerror(ret)); - ++ctx->stats.fs_errors; + ++datafile->ctx->stats.fs_errors; rrd_stat_atomic_add(&global_fs_errors, 1); } uv_fs_req_cleanup(&req); - return ret; } +int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + char path[RRDENG_PATH_MAX]; + + if (likely(journalfile->journal_data)) { + if (munmap(journalfile->journal_data, journalfile->journal_data_size)) { + generate_journalfilepath_v2(datafile, path, sizeof(path)); + error("Failed to unmap journal index file for %s", path); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + journalfile->journal_data = NULL; + journalfile->journal_data_size = 0; + return 0; + } + + return close_uv_file(datafile, journalfile->file); +} + int unlink_journal_file(struct rrdengine_journalfile *journalfile) { struct rrdengine_datafile *datafile = journalfile->datafile; @@ -149,14 +198,16 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile) return ret; } -int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; int ret; char path[RRDENG_PATH_MAX]; + char path_v2[RRDENG_PATH_MAX]; generate_journalfilepath(datafile, path, sizeof(path)); + generate_journalfilepath_v2(datafile, path_v2, sizeof(path)); ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); if (ret < 0) { @@ -166,9 +217,12 @@ int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrden } uv_fs_req_cleanup(&req); - ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + (void) close_uv_file(datafile, journalfile->file); + + // This is the new journal v2 index file + ret = uv_fs_unlink(NULL, &req, path_v2, NULL); if (ret < 0) { - error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); ++ctx->stats.fs_errors; rrd_stat_atomic_add(&global_fs_errors, 1); } @@ -183,6 +237,13 @@ int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrden uv_fs_req_cleanup(&req); ++ctx->stats.journalfile_deletions; + ++ctx->stats.journalfile_deletions; + + if (journalfile->journal_data) { + if (munmap(journalfile->journal_data, journalfile->journal_data_size)) { + error("Failed to unmap index file %s", path_v2); + } + } return ret; } @@ -227,7 +288,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng uv_fs_req_cleanup(&req); posix_memfree(superblock); if (ret < 0) { - destroy_journal_file(journalfile, datafile); + destroy_journal_file_unsafe(journalfile, datafile); return ret; } @@ -366,36 +427,67 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden uv_rwlock_wrlock(&pg_cache->metrics_index.lock); PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ - *PValue = page_index = create_page_index(temp_id, ctx); - page_index->prev = pg_cache->metrics_index.last_page_index; - pg_cache->metrics_index.last_page_index = page_index; + *PValue = page_index = create_page_index(temp_id, ctx); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); } - descr = pg_cache_create_descr(); + // Lookup this descriptor + Word_t p_time = start_time_ut / USEC_PER_SEC; + uv_rwlock_rdlock(&page_index->lock); + PValue = JudyLFirst(page_index->JudyL_array, &p_time, PJE0); + descr = (NULL == PValue) ? NULL: *PValue; + uv_rwlock_rdunlock(&page_index->lock); + + bool descr_found = false; + if (unlikely(descr && descr->start_time_ut == start_time_ut)) { + // We have this descriptor already + descr_found = true; + +#ifdef NETDATA_INTERNAL_CHECKS + char uuid_str[UUID_STR_LEN]; + uuid_unparse_lower(page_index->id, uuid_str); + internal_error(true, "REMOVING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u", + uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, descr->extent->datafile->fileno, + descr->extent->offset, descr->extent->size); + internal_error(true, "APPLYING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u", + uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, extent->datafile->fileno, + extent->offset, extent->size); +#endif + // Remove entry from previous extent + unlink_descriptor_extent_unsafe(descr); + internal_error(true, "REMOVING UUID %s with %lu OK", uuid_str, start_time_ut); + } + else { + descr = pg_cache_create_descr(); + descr->id = &page_index->id; + } + descr->page_length = jf_metric_data->descr[i].page_length; descr->start_time_ut = start_time_ut; descr->end_time_ut = end_time_ut; descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s); - descr->id = &page_index->id; + descr->extent = extent; descr->type = page_type; extent->pages[valid_pages++] = descr; - pg_cache_insert(ctx, page_index, descr); + if (likely(!descr_found)) + (void)pg_cache_insert(ctx, page_index, descr, true); - if(page_index->latest_time_ut == descr->end_time_ut) + if (page_index->latest_time_ut == descr->end_time_ut) page_index->latest_update_every_s = descr->update_every_s; if(descr->update_every_s == 0) - fatal( - "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu", + fatal("DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu", (unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries); } extent->number_of_pages = valid_pages; - if (likely(valid_pages)) + if (likely(valid_pages)) { df_extent_insert(extent); + } else { freez(extent); ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++; @@ -444,13 +536,13 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng return size_bytes; } switch (jf_header->type) { - case STORE_DATA: - debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); - restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); - break; - default: - error("Unknown transaction type. Skipping record."); - break; + case STORE_DATA: + debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); + restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); + break; + default: + error("Unknown transaction type. Skipping record."); + break; } return size_bytes; @@ -518,12 +610,905 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde if (likely(journal_is_mmapped)) buf += size_bytes; } -skip_file: + skip_file: if (unlikely(!journal_is_mmapped)) posix_memfree(buf); return max_id; } +bool unlink_descriptor_extent_unsafe(struct rrdeng_page_descr *descr) +{ + if (unlikely(!descr || !descr->extent)) + return true; + + struct extent_info *extent = descr->extent; + for (uint8_t index = 0; index < extent->number_of_pages; index++) { + if (extent->pages[index] == descr) { + extent->pages[index] = NULL; + return true; + } + } + return false; +} + +// Checks that the extent list checksum is valid +static int check_journal_v2_extent_list (void *data_start, size_t file_size) +{ + UNUSED(file_size); + uLong crc; + + struct journal_v2_header *j2_header = (void *) data_start; + struct journal_v2_block_trailer *journal_v2_trailer; + + journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list)); + if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { + error("Extent list CRC32 check: FAILED"); + return 1; + } + + return 0; +} + +// Checks that the metric list (UUIDs) checksum is valid +static int check_journal_v2_metric_list(void *data_start, size_t file_size) +{ + UNUSED(file_size); + uLong crc; + + struct journal_v2_header *j2_header = (void *) data_start; + struct journal_v2_block_trailer *journal_v2_trailer; + + journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->metric_trailer_offset); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list)); + if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { + error("Metric list CRC32 check: FAILED"); + return 1; + } + return 0; +} + +static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t original_size) +{ + int rc; + uLong crc; + + struct journal_v2_header *j2_header = (void *) data_start; + struct journal_v2_block_trailer *journal_v2_trailer; + + if (j2_header->magic == JOURVAL_V2_REBUILD_MAGIC) + return 2; + + // Magic failure + if (j2_header->magic != JOURVAL_V2_MAGIC) + return 1; + + if (j2_header->total_file_size != file_size) + return 1; + + if (original_size && j2_header->original_file_size != original_size) + return 1; + + journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + file_size - sizeof(*journal_v2_trailer)); + + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (void *) j2_header, sizeof(*j2_header)); + + rc = crc32cmp(journal_v2_trailer->checksum, crc); + if (unlikely(rc)) { + error("File CRC32 check: FAILED"); + return 1; + } + + rc = check_journal_v2_extent_list(data_start, file_size); + if (rc) return 1; + + rc = check_journal_v2_metric_list(data_start, file_size); + if (rc) return 1; + + if (!db_engine_journal_check) + return 0; + + // Verify complete UUID chain + + struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset); + + unsigned verified = 0; + unsigned entries; + unsigned total_pages = 0; + + info("Checking %u metrics that exist in the journal", j2_header->metric_count); + for (entries = 0; entries < j2_header->metric_count; entries++) { + + char uuid_str[UUID_STR_LEN]; + uuid_unparse_lower(metric->uuid, uuid_str); + struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset); + struct journal_page_header local_metric_list_header = *metric_list_header; + + local_metric_list_header.crc = JOURVAL_V2_MAGIC; + + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, (void *) &local_metric_list_header, sizeof(local_metric_list_header)); + rc = crc32cmp(metric_list_header->checksum, crc); + + internal_error(true, "Index %u : %s entries %u at offset %u (%llu -- %llu) verified, HEADER CRC comp |