summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-15 23:49:42 +0200
committerGitHub <noreply@github.com>2022-11-15 23:49:42 +0200
commitbecd97a3660af34104c557ba6c2877f624143c2e (patch)
tree14fa78305bd8c27bf1fa06c9c346b8f7a8a1d721
parent1789d07c43182152437459a7a4f81267bbdd752c (diff)
Revert "New journal disk based indexing for agent memory reduction" (#14000)
Revert "New journal disk based indexing for agent memory reduction (#13885)" This reverts commit 224b051a2b2bab39a4b536e531ab9ca590bf31bb.
-rw-r--r--collectors/plugins.d/pluginsd_parser.c32
-rw-r--r--daemon/global_statistics.c3
-rw-r--r--daemon/main.c3
-rw-r--r--database/engine/datafile.c41
-rw-r--r--database/engine/datafile.h14
-rw-r--r--database/engine/journalfile.c1126
-rw-r--r--database/engine/journalfile.h88
-rw-r--r--database/engine/pagecache.c569
-rw-r--r--database/engine/pagecache.h63
-rw-r--r--database/engine/rrdengine.c570
-rw-r--r--database/engine/rrdengine.h45
-rwxr-xr-xdatabase/engine/rrdengineapi.c87
-rw-r--r--database/engine/rrdengineapi.h4
-rw-r--r--database/engine/rrdenglocking.c24
-rw-r--r--database/engine/rrdenglocking.h7
-rw-r--r--database/rrd.h14
-rw-r--r--database/rrddim.c6
-rw-r--r--database/rrdhost.c13
-rw-r--r--database/rrdset.c4
-rw-r--r--libnetdata/arrayalloc/arrayalloc.c4
-rw-r--r--libnetdata/dictionary/dictionary.c9
-rw-r--r--libnetdata/dictionary/dictionary.h8
-rw-r--r--libnetdata/libnetdata.c5
-rw-r--r--libnetdata/libnetdata.h2
-rw-r--r--streaming/compression.c1
-rw-r--r--streaming/receiver.c25
-rw-r--r--streaming/replication.c16
-rw-r--r--streaming/rrdpush.h2
-rw-r--r--streaming/sender.c184
-rw-r--r--web/api/web_api_v1.c2
30 files changed, 651 insertions, 2320 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index e92830fe20..3e0f83422c 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -293,25 +293,9 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
// rrdhost_hostname(host), rrdset_id(st),
// (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
- bool ok = true;
- if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
-
-#ifdef NETDATA_INTERNAL_CHECKS
- st->replay.start_streaming = false;
- st->replay.after = 0;
- st->replay.before = 0;
-#endif
-
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
- rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
-
- ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child,
- last_entry_child, 0, 0);
- }
- else {
- internal_error(true, "RRDSET: not sending duplicate replication request for chart '%s'", rrdset_id(st));
- }
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0);
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}
@@ -891,11 +875,6 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
time_t start_time = strtol(start_time_str, NULL, 0);
time_t end_time = strtol(end_time_str, NULL, 0);
- internal_error(
- (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
- "REPLAY: received a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, which does not match our request (%ld to %ld).",
- rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, st->replay.after, st->replay.before);
-
if(start_time && end_time) {
if (start_time > end_time) {
error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.",
@@ -1156,18 +1135,11 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
st->counter++;
st->counter_done++;
-#ifdef NETDATA_INTERNAL_CHECKS
- st->replay.start_streaming = false;
- st->replay.after = 0;
- st->replay.before = 0;
-#endif
-
if (start_streaming) {
if (st->update_every != update_every_child)
rrdset_set_update_every(st, update_every_child);
rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
return PARSER_RC_OK;
}
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c
index 6d6c6dd5b8..e81fc49d11 100644
--- a/daemon/global_statistics.c
+++ b/daemon/global_statistics.c
@@ -865,7 +865,6 @@ static void dbengine_statistics_charts(void) {
{
static RRDSET *st_long_term_pages = NULL;
- static RRDDIM *rd_memory = NULL;
static RRDDIM *rd_total = NULL;
static RRDDIM *rd_insertions = NULL;
static RRDDIM *rd_deletions = NULL;
@@ -886,7 +885,6 @@ static void dbengine_statistics_charts(void) {
localhost->rrd_update_every,
RRDSET_TYPE_LINE);
- rd_memory = rrddim_add(st_long_term_pages, "journal v2 descriptors", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_total = rrddim_add(st_long_term_pages, "total", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_insertions = rrddim_add(st_long_term_pages, "insertions", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_deletions = rrddim_add(st_long_term_pages, "deletions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
@@ -895,7 +893,6 @@ static void dbengine_statistics_charts(void) {
} else
rrdset_next(st_long_term_pages);
- rrddim_set_by_pointer(st_long_term_pages, rd_memory, (collected_number)stats_array[37]);
rrddim_set_by_pointer(st_long_term_pages, rd_total, (collected_number)stats_array[2]);
rrddim_set_by_pointer(st_long_term_pages, rd_insertions, (collected_number)stats_array[5]);
rrddim_set_by_pointer(st_long_term_pages, rd_deletions, (collected_number)stats_array[6]);
diff --git a/daemon/main.c b/daemon/main.c
index 6c189fead7..5c437d208d 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -680,9 +680,6 @@ static void get_netdata_configured_variables() {
db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_NO);
default_rrdeng_page_cache_mb = (int) config_get_number(CONFIG_SECTION_DB, "dbengine page cache size MB", default_rrdeng_page_cache_mb);
- db_engine_journal_indexing = config_get_boolean(CONFIG_SECTION_DB, "dbengine enable journal indexing", CONFIG_BOOLEAN_YES);
- db_engine_journal_check = config_get_boolean(CONFIG_SECTION_DB, "dbengine enable journal integrity check", CONFIG_BOOLEAN_NO);
-
if(default_rrdeng_page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) {
error("Invalid page cache size %d given. Defaulting to %d.", default_rrdeng_page_cache_mb, RRDENG_MIN_PAGE_CACHE_SIZE_MB);
default_rrdeng_page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index e534d9c673..9c70068d9f 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -1,29 +1,9 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile)
-{
- struct extent_info *extent = datafile->extents.first, *next_extent;
-
- char path[RRDENG_PATH_MAX];
-
- generate_journalfilepath_v2(datafile, path, sizeof(path));
- internal_error(true, "Deleting extents of file %s", path);
- unsigned count = 0;
- while (extent) {
- next_extent = extent->next;
- freez(extent);
- count++;
- extent = next_extent;
- }
- datafile->extents.first = NULL;
- internal_error(true, "Deleted %u extents of file %s", count, path);
-}
-
void df_extent_insert(struct extent_info *extent)
{
struct rrdengine_datafile *datafile = extent->datafile;
- uv_rwlock_wrlock(&datafile->extent_rwlock);
if (likely(NULL != datafile->extents.last)) {
datafile->extents.last->next = extent;
@@ -32,14 +12,10 @@ void df_extent_insert(struct extent_info *extent)
datafile->extents.first = extent;
}
datafile->extents.last = extent;
-
- uv_rwlock_wrunlock(&datafile->extent_rwlock);
}
void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
{
- uv_rwlock_wrlock(&ctx->datafiles.rwlock);
-
if (likely(NULL != ctx->datafiles.last)) {
ctx->datafiles.last->next = datafile;
}
@@ -47,13 +23,12 @@ void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_dataf
ctx->datafiles.first = datafile;
}
ctx->datafiles.last = datafile;
-
- uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
}
-void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
{
struct rrdengine_datafile *next;
+
next = datafile->next;
fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
ctx->datafiles.first = next;
@@ -69,7 +44,6 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_
datafile->file = (uv_file)0;
datafile->pos = 0;
datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */
- fatal_assert(0 == uv_rwlock_init(&datafile->extent_rwlock));
datafile->journalfile = NULL;
datafile->next = NULL;
datafile->ctx = ctx;
@@ -123,7 +97,7 @@ int unlink_data_file(struct rrdengine_datafile *datafile)
return ret;
}
-int destroy_data_file_unsafe(struct rrdengine_datafile *datafile)
+int destroy_data_file(struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
@@ -202,7 +176,7 @@ int create_data_file(struct rrdengine_datafile *datafile)
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
if (ret < 0) {
- destroy_data_file_unsafe(datafile);
+ destroy_data_file(datafile);
return ret;
}
@@ -330,8 +304,10 @@ static int scan_data_files(struct rrdengine_instance *ctx)
datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
+ info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
if (2 == ret) {
+ info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, no);
datafiles[matched_files++] = datafile;
@@ -361,7 +337,6 @@ static int scan_data_files(struct rrdengine_instance *ctx)
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
- journalfile->file_index = i;
ret = load_journal_file(ctx, journalfile, datafile);
if (0 != ret) {
if (!must_delete_pair) /* If datafile is still open close it */
@@ -371,7 +346,6 @@ static int scan_data_files(struct rrdengine_instance *ctx)
if (must_delete_pair) {
char path[RRDENG_PATH_MAX];
- // TODO: Also delete the version 2
error("Deleting invalid data and journal file pair.");
ret = unlink_journal_file(journalfile);
if (!ret) {
@@ -433,7 +407,7 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsi
return 0;
error_after_journalfile:
- destroy_data_file_unsafe(datafile);
+ destroy_data_file(datafile);
freez(journalfile);
error_after_datafile:
freez(datafile);
@@ -447,7 +421,6 @@ int init_data_files(struct rrdengine_instance *ctx)
{
int ret;
- fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock));
ret = scan_data_files(ctx);
if (ret < 0) {
error("Failed to scan path \"%s\".", ctx->dbfiles_path);
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index 48d72623c3..1cf256aff4 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -13,13 +13,7 @@ struct rrdengine_instance;
#define DATAFILE_PREFIX "datafile-"
#define DATAFILE_EXTENSION ".ndf"
-#ifndef MAX_DATAFILE_SIZE
#define MAX_DATAFILE_SIZE (1073741824LU)
-#endif
-#if MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE
-#error MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE
-#endif
-
#define MIN_DATAFILE_SIZE (4194304LU)
#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
#define TARGET_DATAFILES (20)
@@ -32,7 +26,6 @@ struct extent_info {
uint8_t number_of_pages;
struct rrdengine_datafile *datafile;
struct extent_info *next;
- uint32_t index; // This is the entent index for version 2
struct rrdeng_page_descr *pages[];
};
@@ -48,7 +41,6 @@ struct rrdengine_datafile {
unsigned fileno;
uv_file file;
uint64_t pos;
- uv_rwlock_t extent_rwlock;
struct rrdengine_instance *ctx;
struct rrdengine_df_extents extents;
struct rrdengine_journalfile *journalfile;
@@ -56,22 +48,20 @@ struct rrdengine_datafile {
};
struct rrdengine_datafile_list {
- uv_rwlock_t rwlock;
struct rrdengine_datafile *first; /* oldest */
struct rrdengine_datafile *last; /* newest */
};
void df_extent_insert(struct extent_info *extent);
void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
-void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
int close_data_file(struct rrdengine_datafile *datafile);
int unlink_data_file(struct rrdengine_datafile *datafile);
-int destroy_data_file_unsafe(struct rrdengine_datafile *datafile);
+int destroy_data_file(struct rrdengine_datafile *datafile);
int create_data_file(struct rrdengine_datafile *datafile);
int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
int init_data_files(struct rrdengine_instance *ctx);
void finalize_data_files(struct rrdengine_instance *ctx);
-void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile);
#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 037603b9b6..500dd78800 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -1,25 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-void queue_journalfile_v2_migration(struct rrdengine_worker_config *wc)
-{
- struct rrdeng_work *work_request;
-
- if (unlikely(wc->running_journal_migration))
- return;
-
- work_request = mallocz(sizeof(*work_request));
- work_request->req.data = work_request;
- work_request->wc = wc;
- work_request->count = 0;
- work_request->rerun = false;
- wc->running_journal_migration = 1;
- if (unlikely(uv_queue_work(wc->loop, &work_request->req, start_journal_indexing, after_journal_indexing))) {
- freez(work_request);
- wc->running_journal_migration = 0;
- }
-}
-
static void flush_transaction_buffer_cb(uv_fs_t* req)
{
struct generic_io_descriptor *io_descr = req->data;
@@ -66,7 +47,6 @@ void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
io_descr->bytes = size;
io_descr->pos = journalfile->pos;
io_descr->req.data = io_descr;
- io_descr->data = journalfile;
io_descr->completion = NULL;
io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
@@ -113,12 +93,6 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
return ctx->commit_log.buf + buf_pos;
}
-void generate_journalfilepath_v2(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
-{
- (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2,
- datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
-}
-
void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
@@ -130,49 +104,26 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin
journalfile->file = (uv_file)0;
journalfile->pos = 0;
journalfile->datafile = datafile;
- journalfile->journal_data = NULL;
- journalfile->journal_data_size = 0;
- journalfile->JudyL_array = (Pvoid_t) NULL;
- journalfile->data = NULL;
- journalfile->file_index = 0;
- journalfile->last_access = 0;
}
-static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
- uv_fs_t req;
- ret = uv_fs_close(NULL, &req, file, NULL);
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
- generate_journalfilepath(datafile, path, sizeof(path));
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++datafile->ctx->stats.fs_errors;
+ ++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
- return ret;
-}
-
-int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
-{
- struct rrdengine_instance *ctx = datafile->ctx;
- char path[RRDENG_PATH_MAX];
-
- if (likely(journalfile->journal_data)) {
- if (munmap(journalfile->journal_data, journalfile->journal_data_size)) {
- generate_journalfilepath_v2(datafile, path, sizeof(path));
- error("Failed to unmap journal index file for %s", path);
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
- }
- journalfile->journal_data = NULL;
- journalfile->journal_data_size = 0;
- return 0;
- }
- return close_uv_file(datafile, journalfile->file);
+ return ret;
}
int unlink_journal_file(struct rrdengine_journalfile *journalfile)
@@ -198,16 +149,14 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile)
return ret;
}
-int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
- char path_v2[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
- generate_journalfilepath_v2(datafile, path_v2, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
@@ -217,12 +166,9 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc
}
uv_fs_req_cleanup(&req);
- (void) close_uv_file(datafile, journalfile->file);
-
- // This is the new journal v2 index file
- ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
- error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
@@ -237,13 +183,6 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc
uv_fs_req_cleanup(&req);
++ctx->stats.journalfile_deletions;
- ++ctx->stats.journalfile_deletions;
-
- if (journalfile->journal_data) {
- if (munmap(journalfile->journal_data, journalfile->journal_data_size)) {
- error("Failed to unmap index file %s", path_v2);
- }
- }
return ret;
}
@@ -288,7 +227,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
if (ret < 0) {
- destroy_journal_file_unsafe(journalfile, datafile);
+ destroy_journal_file(journalfile, datafile);
return ret;
}
@@ -427,67 +366,36 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
- *PValue = page_index = create_page_index(temp_id, ctx);
- page_index->prev = pg_cache->metrics_index.last_page_index;
- pg_cache->metrics_index.last_page_index = page_index;
+ *PValue = page_index = create_page_index(temp_id, ctx);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
- // Lookup this descriptor
- Word_t p_time = start_time_ut / USEC_PER_SEC;
- uv_rwlock_rdlock(&page_index->lock);
- PValue = JudyLFirst(page_index->JudyL_array, &p_time, PJE0);
- descr = (NULL == PValue) ? NULL: *PValue;
- uv_rwlock_rdunlock(&page_index->lock);
-
- bool descr_found = false;
- if (unlikely(descr && descr->start_time_ut == start_time_ut)) {
- // We have this descriptor already
- descr_found = true;
-
-#ifdef NETDATA_INTERNAL_CHECKS
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower(page_index->id, uuid_str);
- internal_error(true, "REMOVING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u",
- uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, descr->extent->datafile->fileno,
- descr->extent->offset, descr->extent->size);
- internal_error(true, "APPLYING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u",
- uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, extent->datafile->fileno,
- extent->offset, extent->size);
-#endif
- // Remove entry from previous extent
- unlink_descriptor_extent_unsafe(descr);
- internal_error(true, "REMOVING UUID %s with %lu OK", uuid_str, start_time_ut);
- }
- else {
- descr = pg_cache_create_descr();
- descr->id = &page_index->id;
- }
-
+ descr = pg_cache_create_descr();
descr->page_length = jf_metric_data->descr[i].page_length;
descr->start_time_ut = start_time_ut;
descr->end_time_ut = end_time_ut;
descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s);
-
+ descr->id = &page_index->id;
descr->extent = extent;
descr->type = page_type;
extent->pages[valid_pages++] = descr;
- if (likely(!descr_found))
- (void)pg_cache_insert(ctx, page_index, descr, true);
+ pg_cache_insert(ctx, page_index, descr);
- if (page_index->latest_time_ut == descr->end_time_ut)
+ if(page_index->latest_time_ut == descr->end_time_ut)
page_index->latest_update_every_s = descr->update_every_s;
if(descr->update_every_s == 0)
- fatal("DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu",
+ fatal(
+ "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu",
(unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries);
}
extent->number_of_pages = valid_pages;
- if (likely(valid_pages)) {
+ if (likely(valid_pages))
df_extent_insert(extent);
- }
else {
freez(extent);
ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++;
@@ -536,13 +444,13 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
return size_bytes;
}
switch (jf_header->type) {
- case STORE_DATA:
- debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
- restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
- break;
- default:
- error("Unknown transaction type. Skipping record.");
- break;
+ case STORE_DATA:
+ debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
+ restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
+ break;
+ default:
+ error("Unknown transaction type. Skipping record.");
+ break;
}
return size_bytes;
@@ -610,905 +518,12 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
if (likely(journal_is_mmapped))
buf += size_bytes;
}
- skip_file:
+skip_file:
if (unlikely(!journal_is_mmapped))
posix_memfree(buf);
return max_id;
}
-bool unlink_descriptor_extent_unsafe(struct rrdeng_page_descr *descr)
-{
- if (unlikely(!descr || !descr->extent))
- return true;
-
- struct extent_info *extent = descr->extent;
- for (uint8_t index = 0; index < extent->number_of_pages; index++) {
- if (extent->pages[index] == descr) {
- extent->pages[index] = NULL;
- return true;
- }
- }
- return false;
-}
-
-// Checks that the extent list checksum is valid
-static int check_journal_v2_extent_list (void *data_start, size_t file_size)
-{
- UNUSED(file_size);
- uLong crc;
-
- struct journal_v2_header *j2_header = (void *) data_start;
- struct journal_v2_block_trailer *journal_v2_trailer;
-
- journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset);
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list));
- if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
- error("Extent list CRC32 check: FAILED");
- return 1;
- }
-
- return 0;
-}
-
-// Checks that the metric list (UUIDs) checksum is valid
-static int check_journal_v2_metric_list(void *data_start, size_t file_size)
-{
- UNUSED(file_size);
- uLong crc;
-
- struct journal_v2_header *j2_header = (void *) data_start;
- struct journal_v2_block_trailer *journal_v2_trailer;
-
- journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->metric_trailer_offset);
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list));
- if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
- error("Metric list CRC32 check: FAILED");
- return 1;
- }
- return 0;
-}
-
-static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t original_size)
-{
- int rc;
- uLong crc;
-
- struct journal_v2_header *j2_header = (void *) data_start;
- struct journal_v2_block_trailer *journal_v2_trailer;
-
- if (j2_header->magic == JOURVAL_V2_REBUILD_MAGIC)
- return 2;
-
- // Magic failure
- if (j2_header->magic != JOURVAL_V2_MAGIC)
- return 1;
-
- if (j2_header->total_file_size != file_size)
- return 1;
-
- if (original_size && j2_header->original_file_size != original_size)
- return 1;
-
- journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + file_size - sizeof(*journal_v2_trailer));
-
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, (void *) j2_header, sizeof(*j2_header));
-
- rc = crc32cmp(journal_v2_trailer->checksum, crc);
- if (unlikely(rc)) {
- error("File CRC32 check: FAILED");