diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-15 23:49:42 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-15 23:49:42 +0200 |
commit | becd97a3660af34104c557ba6c2877f624143c2e (patch) | |
tree | 14fa78305bd8c27bf1fa06c9c346b8f7a8a1d721 | |
parent | 1789d07c43182152437459a7a4f81267bbdd752c (diff) |
Revert "New journal disk based indexing for agent memory reduction" (#14000)
Revert "New journal disk based indexing for agent memory reduction (#13885)"
This reverts commit 224b051a2b2bab39a4b536e531ab9ca590bf31bb.
30 files changed, 651 insertions, 2320 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index e92830fe20..3e0f83422c 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -293,25 +293,9 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us // rrdhost_hostname(host), rrdset_id(st), // (unsigned long long)first_entry_child, (unsigned long long)last_entry_child); - bool ok = true; - if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { - -#ifdef NETDATA_INTERNAL_CHECKS - st->replay.start_streaming = false; - st->replay.after = 0; - st->replay.before = 0; -#endif - - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); - - ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, - last_entry_child, 0, 0); - } - else { - internal_error(true, "RRDSET: not sending duplicate replication request for chart '%s'", rrdset_id(st)); - } + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0); return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } @@ -891,11 +875,6 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use time_t start_time = strtol(start_time_str, NULL, 0); time_t end_time = strtol(end_time_str, NULL, 0); - internal_error( - (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)), - "REPLAY: received a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, which does not match our request (%ld to %ld).", - rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, st->replay.after, st->replay.before); - if(start_time && end_time) { if (start_time > end_time) { error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.", @@ -1156,18 +1135,11 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) st->counter++; st->counter_done++; -#ifdef NETDATA_INTERNAL_CHECKS - st->replay.start_streaming = false; - st->replay.after = 0; - st->replay.before = 0; -#endif - if (start_streaming) { if (st->update_every != update_every_child) rrdset_set_update_every(st, update_every_child); rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK); return PARSER_RC_OK; } diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c index 6d6c6dd5b8..e81fc49d11 100644 --- a/daemon/global_statistics.c +++ b/daemon/global_statistics.c @@ -865,7 +865,6 @@ static void dbengine_statistics_charts(void) { { static RRDSET *st_long_term_pages = NULL; - static RRDDIM *rd_memory = NULL; static RRDDIM *rd_total = NULL; static RRDDIM *rd_insertions = NULL; static RRDDIM *rd_deletions = NULL; @@ -886,7 +885,6 @@ static void dbengine_statistics_charts(void) { localhost->rrd_update_every, RRDSET_TYPE_LINE); - rd_memory = rrddim_add(st_long_term_pages, "journal v2 descriptors", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); rd_total = rrddim_add(st_long_term_pages, "total", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); rd_insertions = rrddim_add(st_long_term_pages, "insertions", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); rd_deletions = rrddim_add(st_long_term_pages, "deletions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL); @@ -895,7 +893,6 @@ static void dbengine_statistics_charts(void) { } else rrdset_next(st_long_term_pages); - rrddim_set_by_pointer(st_long_term_pages, rd_memory, (collected_number)stats_array[37]); rrddim_set_by_pointer(st_long_term_pages, rd_total, (collected_number)stats_array[2]); rrddim_set_by_pointer(st_long_term_pages, rd_insertions, (collected_number)stats_array[5]); rrddim_set_by_pointer(st_long_term_pages, rd_deletions, (collected_number)stats_array[6]); diff --git a/daemon/main.c b/daemon/main.c index 6c189fead7..5c437d208d 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -680,9 +680,6 @@ static void get_netdata_configured_variables() { db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_NO); default_rrdeng_page_cache_mb = (int) config_get_number(CONFIG_SECTION_DB, "dbengine page cache size MB", default_rrdeng_page_cache_mb); - db_engine_journal_indexing = config_get_boolean(CONFIG_SECTION_DB, "dbengine enable journal indexing", CONFIG_BOOLEAN_YES); - db_engine_journal_check = config_get_boolean(CONFIG_SECTION_DB, "dbengine enable journal integrity check", CONFIG_BOOLEAN_NO); - if(default_rrdeng_page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) { error("Invalid page cache size %d given. Defaulting to %d.", default_rrdeng_page_cache_mb, RRDENG_MIN_PAGE_CACHE_SIZE_MB); default_rrdeng_page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB; diff --git a/database/engine/datafile.c b/database/engine/datafile.c index e534d9c673..9c70068d9f 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -1,29 +1,9 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" -void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile) -{ - struct extent_info *extent = datafile->extents.first, *next_extent; - - char path[RRDENG_PATH_MAX]; - - generate_journalfilepath_v2(datafile, path, sizeof(path)); - internal_error(true, "Deleting extents of file %s", path); - unsigned count = 0; - while (extent) { - next_extent = extent->next; - freez(extent); - count++; - extent = next_extent; - } - datafile->extents.first = NULL; - internal_error(true, "Deleted %u extents of file %s", count, path); -} - void df_extent_insert(struct extent_info *extent) { struct rrdengine_datafile *datafile = extent->datafile; - uv_rwlock_wrlock(&datafile->extent_rwlock); if (likely(NULL != datafile->extents.last)) { datafile->extents.last->next = extent; @@ -32,14 +12,10 @@ void df_extent_insert(struct extent_info *extent) datafile->extents.first = extent; } datafile->extents.last = extent; - - uv_rwlock_wrunlock(&datafile->extent_rwlock); } void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { - uv_rwlock_wrlock(&ctx->datafiles.rwlock); - if (likely(NULL != ctx->datafiles.last)) { ctx->datafiles.last->next = datafile; } @@ -47,13 +23,12 @@ void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_dataf ctx->datafiles.first = datafile; } ctx->datafiles.last = datafile; - - uv_rwlock_wrunlock(&ctx->datafiles.rwlock); } -void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { struct rrdengine_datafile *next; + next = datafile->next; fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile)); ctx->datafiles.first = next; @@ -69,7 +44,6 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_ datafile->file = (uv_file)0; datafile->pos = 0; datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */ - fatal_assert(0 == uv_rwlock_init(&datafile->extent_rwlock)); datafile->journalfile = NULL; datafile->next = NULL; datafile->ctx = ctx; @@ -123,7 +97,7 @@ int unlink_data_file(struct rrdengine_datafile *datafile) return ret; } -int destroy_data_file_unsafe(struct rrdengine_datafile *datafile) +int destroy_data_file(struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; @@ -202,7 +176,7 @@ int create_data_file(struct rrdengine_datafile *datafile) uv_fs_req_cleanup(&req); posix_memfree(superblock); if (ret < 0) { - destroy_data_file_unsafe(datafile); + destroy_data_file(datafile); return ret; } @@ -330,8 +304,10 @@ static int scan_data_files(struct rrdengine_instance *ctx) datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { + info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name); ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); if (2 == ret) { + info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name); datafile = mallocz(sizeof(*datafile)); datafile_init(datafile, ctx, tier, no); datafiles[matched_files++] = datafile; @@ -361,7 +337,6 @@ static int scan_data_files(struct rrdengine_instance *ctx) journalfile = mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); - journalfile->file_index = i; ret = load_journal_file(ctx, journalfile, datafile); if (0 != ret) { if (!must_delete_pair) /* If datafile is still open close it */ @@ -371,7 +346,6 @@ static int scan_data_files(struct rrdengine_instance *ctx) if (must_delete_pair) { char path[RRDENG_PATH_MAX]; - // TODO: Also delete the version 2 error("Deleting invalid data and journal file pair."); ret = unlink_journal_file(journalfile); if (!ret) { @@ -433,7 +407,7 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsi return 0; error_after_journalfile: - destroy_data_file_unsafe(datafile); + destroy_data_file(datafile); freez(journalfile); error_after_datafile: freez(datafile); @@ -447,7 +421,6 @@ int init_data_files(struct rrdengine_instance *ctx) { int ret; - fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock)); ret = scan_data_files(ctx); if (ret < 0) { error("Failed to scan path \"%s\".", ctx->dbfiles_path); diff --git a/database/engine/datafile.h b/database/engine/datafile.h index 48d72623c3..1cf256aff4 100644 --- a/database/engine/datafile.h +++ b/database/engine/datafile.h @@ -13,13 +13,7 @@ struct rrdengine_instance; #define DATAFILE_PREFIX "datafile-" #define DATAFILE_EXTENSION ".ndf" -#ifndef MAX_DATAFILE_SIZE #define MAX_DATAFILE_SIZE (1073741824LU) -#endif -#if MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE -#error MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE -#endif - #define MIN_DATAFILE_SIZE (4194304LU) #define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */ #define TARGET_DATAFILES (20) @@ -32,7 +26,6 @@ struct extent_info { uint8_t number_of_pages; struct rrdengine_datafile *datafile; struct extent_info *next; - uint32_t index; // This is the entent index for version 2 struct rrdeng_page_descr *pages[]; }; @@ -48,7 +41,6 @@ struct rrdengine_datafile { unsigned fileno; uv_file file; uint64_t pos; - uv_rwlock_t extent_rwlock; struct rrdengine_instance *ctx; struct rrdengine_df_extents extents; struct rrdengine_journalfile *journalfile; @@ -56,22 +48,20 @@ struct rrdengine_datafile { }; struct rrdengine_datafile_list { - uv_rwlock_t rwlock; struct rrdengine_datafile *first; /* oldest */ struct rrdengine_datafile *last; /* newest */ }; void df_extent_insert(struct extent_info *extent); void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); -void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); int close_data_file(struct rrdengine_datafile *datafile); int unlink_data_file(struct rrdengine_datafile *datafile); -int destroy_data_file_unsafe(struct rrdengine_datafile *datafile); +int destroy_data_file(struct rrdengine_datafile *datafile); int create_data_file(struct rrdengine_datafile *datafile); int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); int init_data_files(struct rrdengine_instance *ctx); void finalize_data_files(struct rrdengine_instance *ctx); -void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile); #endif /* NETDATA_DATAFILE_H */
\ No newline at end of file diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 037603b9b6..500dd78800 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -1,25 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" -void queue_journalfile_v2_migration(struct rrdengine_worker_config *wc) -{ - struct rrdeng_work *work_request; - - if (unlikely(wc->running_journal_migration)) - return; - - work_request = mallocz(sizeof(*work_request)); - work_request->req.data = work_request; - work_request->wc = wc; - work_request->count = 0; - work_request->rerun = false; - wc->running_journal_migration = 1; - if (unlikely(uv_queue_work(wc->loop, &work_request->req, start_journal_indexing, after_journal_indexing))) { - freez(work_request); - wc->running_journal_migration = 0; - } -} - static void flush_transaction_buffer_cb(uv_fs_t* req) { struct generic_io_descriptor *io_descr = req->data; @@ -66,7 +47,6 @@ void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc) io_descr->bytes = size; io_descr->pos = journalfile->pos; io_descr->req.data = io_descr; - io_descr->data = journalfile; io_descr->completion = NULL; io_descr->iov = uv_buf_init((void *)io_descr->buf, size); @@ -113,12 +93,6 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s return ctx->commit_log.buf + buf_pos; } -void generate_journalfilepath_v2(struct rrdengine_datafile *datafile, char *str, size_t maxlen) -{ - (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2, - datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); -} - void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, @@ -130,49 +104,26 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin journalfile->file = (uv_file)0; journalfile->pos = 0; journalfile->datafile = datafile; - journalfile->journal_data = NULL; - journalfile->journal_data_size = 0; - journalfile->JudyL_array = (Pvoid_t) NULL; - journalfile->data = NULL; - journalfile->file_index = 0; - journalfile->last_access = 0; } -static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file) +int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; int ret; char path[RRDENG_PATH_MAX]; - uv_fs_t req; - ret = uv_fs_close(NULL, &req, file, NULL); + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); if (ret < 0) { - generate_journalfilepath(datafile, path, sizeof(path)); error("uv_fs_close(%s): %s", path, uv_strerror(ret)); - ++datafile->ctx->stats.fs_errors; + ++ctx->stats.fs_errors; rrd_stat_atomic_add(&global_fs_errors, 1); } uv_fs_req_cleanup(&req); - return ret; -} - -int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) -{ - struct rrdengine_instance *ctx = datafile->ctx; - char path[RRDENG_PATH_MAX]; - - if (likely(journalfile->journal_data)) { - if (munmap(journalfile->journal_data, journalfile->journal_data_size)) { - generate_journalfilepath_v2(datafile, path, sizeof(path)); - error("Failed to unmap journal index file for %s", path); - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); - } - journalfile->journal_data = NULL; - journalfile->journal_data_size = 0; - return 0; - } - return close_uv_file(datafile, journalfile->file); + return ret; } int unlink_journal_file(struct rrdengine_journalfile *journalfile) @@ -198,16 +149,14 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile) return ret; } -int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; int ret; char path[RRDENG_PATH_MAX]; - char path_v2[RRDENG_PATH_MAX]; generate_journalfilepath(datafile, path, sizeof(path)); - generate_journalfilepath_v2(datafile, path_v2, sizeof(path)); ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); if (ret < 0) { @@ -217,12 +166,9 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc } uv_fs_req_cleanup(&req); - (void) close_uv_file(datafile, journalfile->file); - - // This is the new journal v2 index file - ret = uv_fs_unlink(NULL, &req, path_v2, NULL); + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); if (ret < 0) { - error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); ++ctx->stats.fs_errors; rrd_stat_atomic_add(&global_fs_errors, 1); } @@ -237,13 +183,6 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc uv_fs_req_cleanup(&req); ++ctx->stats.journalfile_deletions; - ++ctx->stats.journalfile_deletions; - - if (journalfile->journal_data) { - if (munmap(journalfile->journal_data, journalfile->journal_data_size)) { - error("Failed to unmap index file %s", path_v2); - } - } return ret; } @@ -288,7 +227,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng uv_fs_req_cleanup(&req); posix_memfree(superblock); if (ret < 0) { - destroy_journal_file_unsafe(journalfile, datafile); + destroy_journal_file(journalfile, datafile); return ret; } @@ -427,67 +366,36 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden uv_rwlock_wrlock(&pg_cache->metrics_index.lock); PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ - *PValue = page_index = create_page_index(temp_id, ctx); - page_index->prev = pg_cache->metrics_index.last_page_index; - pg_cache->metrics_index.last_page_index = page_index; + *PValue = page_index = create_page_index(temp_id, ctx); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); } - // Lookup this descriptor - Word_t p_time = start_time_ut / USEC_PER_SEC; - uv_rwlock_rdlock(&page_index->lock); - PValue = JudyLFirst(page_index->JudyL_array, &p_time, PJE0); - descr = (NULL == PValue) ? NULL: *PValue; - uv_rwlock_rdunlock(&page_index->lock); - - bool descr_found = false; - if (unlikely(descr && descr->start_time_ut == start_time_ut)) { - // We have this descriptor already - descr_found = true; - -#ifdef NETDATA_INTERNAL_CHECKS - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower(page_index->id, uuid_str); - internal_error(true, "REMOVING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u", - uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, descr->extent->datafile->fileno, - descr->extent->offset, descr->extent->size); - internal_error(true, "APPLYING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u", - uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, extent->datafile->fileno, - extent->offset, extent->size); -#endif - // Remove entry from previous extent - unlink_descriptor_extent_unsafe(descr); - internal_error(true, "REMOVING UUID %s with %lu OK", uuid_str, start_time_ut); - } - else { - descr = pg_cache_create_descr(); - descr->id = &page_index->id; - } - + descr = pg_cache_create_descr(); descr->page_length = jf_metric_data->descr[i].page_length; descr->start_time_ut = start_time_ut; descr->end_time_ut = end_time_ut; descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s); - + descr->id = &page_index->id; descr->extent = extent; descr->type = page_type; extent->pages[valid_pages++] = descr; - if (likely(!descr_found)) - (void)pg_cache_insert(ctx, page_index, descr, true); + pg_cache_insert(ctx, page_index, descr); - if (page_index->latest_time_ut == descr->end_time_ut) + if(page_index->latest_time_ut == descr->end_time_ut) page_index->latest_update_every_s = descr->update_every_s; if(descr->update_every_s == 0) - fatal("DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu", + fatal( + "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu", (unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries); } extent->number_of_pages = valid_pages; - if (likely(valid_pages)) { + if (likely(valid_pages)) df_extent_insert(extent); - } else { freez(extent); ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++; @@ -536,13 +444,13 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng return size_bytes; } switch (jf_header->type) { - case STORE_DATA: - debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); - restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); - break; - default: - error("Unknown transaction type. Skipping record."); - break; + case STORE_DATA: + debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); + restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); + break; + default: + error("Unknown transaction type. Skipping record."); + break; } return size_bytes; @@ -610,905 +518,12 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde if (likely(journal_is_mmapped)) buf += size_bytes; } - skip_file: +skip_file: if (unlikely(!journal_is_mmapped)) posix_memfree(buf); return max_id; } -bool unlink_descriptor_extent_unsafe(struct rrdeng_page_descr *descr) -{ - if (unlikely(!descr || !descr->extent)) - return true; - - struct extent_info *extent = descr->extent; - for (uint8_t index = 0; index < extent->number_of_pages; index++) { - if (extent->pages[index] == descr) { - extent->pages[index] = NULL; - return true; - } - } - return false; -} - -// Checks that the extent list checksum is valid -static int check_journal_v2_extent_list (void *data_start, size_t file_size) -{ - UNUSED(file_size); - uLong crc; - - struct journal_v2_header *j2_header = (void *) data_start; - struct journal_v2_block_trailer *journal_v2_trailer; - - journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset); - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list)); - if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { - error("Extent list CRC32 check: FAILED"); - return 1; - } - - return 0; -} - -// Checks that the metric list (UUIDs) checksum is valid -static int check_journal_v2_metric_list(void *data_start, size_t file_size) -{ - UNUSED(file_size); - uLong crc; - - struct journal_v2_header *j2_header = (void *) data_start; - struct journal_v2_block_trailer *journal_v2_trailer; - - journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->metric_trailer_offset); - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list)); - if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { - error("Metric list CRC32 check: FAILED"); - return 1; - } - return 0; -} - -static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t original_size) -{ - int rc; - uLong crc; - - struct journal_v2_header *j2_header = (void *) data_start; - struct journal_v2_block_trailer *journal_v2_trailer; - - if (j2_header->magic == JOURVAL_V2_REBUILD_MAGIC) - return 2; - - // Magic failure - if (j2_header->magic != JOURVAL_V2_MAGIC) - return 1; - - if (j2_header->total_file_size != file_size) - return 1; - - if (original_size && j2_header->original_file_size != original_size) - return 1; - - journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + file_size - sizeof(*journal_v2_trailer)); - - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, (void *) j2_header, sizeof(*j2_header)); - - rc = crc32cmp(journal_v2_trailer->checksum, crc); - if (unlikely(rc)) { - error("File CRC32 check: FAILED"); |