author     Costa Tsaousis <costa@netdata.cloud>   2023-01-26 00:55:38 +0200
committer  GitHub <noreply@github.com>            2023-01-26 00:55:38 +0200
commit     7a21b966381022b9dbb15d4377fb09b82d1f6067
tree       0aabb02c74b2611a5872dd05dba089bb7dc19f06
parent     3e3ff4bee83363dca7cfb838baf1cf316960ed1b
DBENGINE v2 - improvements part 9 (#14326)
* on shutdown stop data collection for all hosts instead of freeing their memory (a condensed sketch of this shutdown path follows this message)
* print number of sql statements per metadata host scan
* print timings with metadata checking
* use dbengine API to figure out if a database is legacy
* Recalculate retention after a datafile deletion
* validate child timestamps during replication
* main cache uses a lockless aral per partition, protected by the partition index lock
* prevent ML crash
* Revert "main cache uses a lockless aral per partition, protected by the partition index lock". This reverts commit 6afc01527dc5c66548b4bc8a1d63c026c3149358.
* Log direct index and binary searches
* distribute metrics more evenly across time
* statistics about retention recalculation
* fix crash
* Reverse the binary search to calculate retention
* more optimization on retention calculation
* removed commented old code

Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
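Condensed, the new shutdown path behind the first item above (implemented across the database/rrdhost.c, database/rrdset.c and database/rrddim.c hunks further down) roughly does the following; logging, the per-tier storage-group release and the retention result are left out, so treat this as a sketch rather than the exact code:

#include "database/rrd.h"   // netdata internal header: RRDHOST/RRDSET/RRDDIM and the foreach macros

// Sketch only: condensed from rrd_finalize_collection_for_all_hosts(),
// rrdhost_finalize_collection(), rrdset_finalize_collection() and
// rrddim_finalize_collection_and_check_retention() in the hunks below.
static void shutdown_collection_sketch(void) {
    RRDHOST *host;
    rrd_wrlock();
    rrdhost_foreach_read(host) {
        RRDSET *st;
        rrdset_foreach_write(st, host) {
            // mark the chart so rrdset_timed_done() returns early from now on
            rrdset_flag_set(st, RRDSET_FLAG_COLLECTION_FINISHED);

            RRDDIM *rd;
            rrddim_foreach_read(rd, st)
                // finalize the per-tier collection handles of each dimension
                rrddim_finalize_collection_and_check_retention(rd);
            rrddim_foreach_done(rd);
        }
        rrdset_foreach_done(st);
    }
    rrd_unlock();
}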
-rw-r--r--   collectors/plugins.d/pluginsd_parser.c    11
-rw-r--r--   daemon/main.c                               5
-rw-r--r--   database/engine/pagecache.h                 1
-rw-r--r--   database/engine/rrdengine.c               176
-rwxr-xr-x   database/engine/rrdengineapi.c             68
-rw-r--r--   database/engine/rrdengineapi.h              1
-rw-r--r--   database/rrd.h                              7
-rw-r--r--   database/rrddim.c                          33
-rw-r--r--   database/rrdhost.c                         24
-rw-r--r--   database/rrdset.c                          50
-rw-r--r--   database/sqlite/sqlite_metadata.c          42
-rw-r--r--   ml/Host.cc                                  2
-rw-r--r--   streaming/replication.c                   133
-rw-r--r--   streaming/replication.h                     2
14 files changed, 388 insertions, 167 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index f6dacf5614..48acfd7dc1 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -326,7 +326,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
{
const char *first_entry_txt = get_word(words, num_words, 1);
const char *last_entry_txt = get_word(words, num_words, 2);
- const char *world_time_txt = get_word(words, num_words, 3);
+ const char *wall_clock_time_txt = get_word(words, num_words, 3);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
@@ -336,12 +336,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
- time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec();
-
- if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0))
- error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).",
- rrdhost_hostname(host), rrdset_id(st),
- first_entry_child, last_entry_child, child_world_time);
+ time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec();
bool ok = true;
if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
@@ -358,7 +353,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser;
ok = replicate_chart_request(send_to_plugin, parser, host, st,
- first_entry_child, last_entry_child, child_world_time,
+ first_entry_child, last_entry_child, child_wall_clock_time,
0, 0);
}
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
diff --git a/daemon/main.c b/daemon/main.c
index 608985bddf..8f51e36c16 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -418,9 +418,10 @@ void netdata_cleanup_and_exit(int ret) {
#endif
// free the database
- delta_shutdown_time("free rrdhost structures");
+ delta_shutdown_time("stop collection for all hosts");
- rrdhost_free_all();
+ // rrdhost_free_all();
+ rrd_finalize_collection_for_all_hosts();
delta_shutdown_time("stop metasync threads");
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index d242aa81ca..f7454e5aeb 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -52,6 +52,7 @@ struct rrdeng_page_info {
struct pg_alignment {
uint32_t page_position;
uint32_t refcount;
+ uint16_t initial_slots;
};
struct rrdeng_query_handle;
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index acbf8e25b9..ed1d851b1f 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -1254,11 +1254,13 @@ static void after_database_rotate(struct rrdengine_instance *ctx __maybe_unused,
struct uuid_first_time_s {
uuid_t *uuid;
time_t first_time_s;
- time_t last_time_s;
METRIC *metric;
+ size_t pages_found;
+ size_t df_matched;
+ size_t df_index_oldest;
};
-static int journal_metric_uuid_compare(const void *key, const void *metric)
+static int journal_metric_compare(const void *key, const void *metric)
{
return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid);
}
@@ -1279,7 +1281,12 @@ struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struc
return next_datafile;
}
-void find_uuid_first_time(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, Pvoid_t metric_first_time_JudyL) {
+void find_uuid_first_time(
+ struct rrdengine_instance *ctx,
+ struct rrdengine_datafile *datafile,
+ struct uuid_first_time_s *uuid_first_entry_list,
+ size_t count)
+{
// acquire the datafile to work with it
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
while(datafile && !datafile_acquire(datafile, DATAFILE_ACQUIRE_RETENTION))
@@ -1289,8 +1296,10 @@ void find_uuid_first_time(struct rrdengine_instance *ctx, struct rrdengine_dataf
if (unlikely(!datafile))
return;
- unsigned v2_count = 0;
unsigned journalfile_count = 0;
+ size_t binary_match = 0;
+ size_t not_matching_bsearches = 0;
+
while (datafile) {
struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL, 0, 0);
if (!j2_header) {
@@ -1299,55 +1308,114 @@ void find_uuid_first_time(struct rrdengine_instance *ctx, struct rrdengine_dataf
}
time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
- size_t journal_metric_count = (size_t)j2_header->metric_count;
struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
+ struct uuid_first_time_s *uuid_original_entry;
- Word_t index = 0;
- bool first_then_next = true;
- Pvoid_t *PValue;
- while ((PValue = JudyLFirstThenNext(metric_first_time_JudyL, &index, &first_then_next))) {
- struct uuid_first_time_s *uuid_first_t_entry = *PValue;
+ size_t journal_metric_count = j2_header->metric_count;
- struct journal_metric_list *uuid_entry = bsearch(uuid_first_t_entry->uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_uuid_compare);
+ for (size_t index = 0; index < count; ++index) {
+ uuid_original_entry = &uuid_first_entry_list[index];
- if (unlikely(!uuid_entry))
+ // Check here if we should skip this
+ if (uuid_original_entry->df_matched > 3 || uuid_original_entry->pages_found > 5)
continue;
- time_t first_time_s = uuid_entry->delta_start_s + journal_start_time_s;
- time_t last_time_s = uuid_entry->delta_end_s + journal_start_time_s;
- uuid_first_t_entry->first_time_s = MIN(uuid_first_t_entry->first_time_s , first_time_s);
- uuid_first_t_entry->last_time_s = MAX(uuid_first_t_entry->last_time_s , last_time_s);
- v2_count++;
+ struct journal_metric_list *live_entry = bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_compare);
+ if (!live_entry) {
+ // Not found in this journal
+ not_matching_bsearches++;
+ continue;
+ }
+
+ uuid_original_entry->pages_found += live_entry->entries;
+ uuid_original_entry->df_matched++;
+
+ time_t old_first_time_s = uuid_original_entry->first_time_s;
+
+ // Calculate first / last for this match
+ time_t first_time_s = live_entry->delta_start_s + journal_start_time_s;
+ uuid_original_entry->first_time_s = MIN(uuid_original_entry->first_time_s, first_time_s);
+
+ if (uuid_original_entry->first_time_s != old_first_time_s)
+ uuid_original_entry->df_index_oldest = uuid_original_entry->df_matched;
+
+ binary_match++;
}
+
journalfile_count++;
journalfile_v2_data_release(datafile->journalfile);
datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile);
}
// Let's scan the open cache for almost exact match
- bool first_then_next = true;
- Pvoid_t *PValue;
- Word_t index = 0;
- unsigned open_cache_count = 0;
- while ((PValue = JudyLFirstThenNext(metric_first_time_JudyL, &index, &first_then_next))) {
- struct uuid_first_time_s *uuid_first_t_entry = *PValue;
+ size_t open_cache_count = 0;
+
+ size_t df_index[10] = { 0 };
+ size_t without_metric = 0;
+ size_t open_cache_gave_first_time_s = 0;
+ size_t metric_count = 0;
+ size_t without_retention = 0;
+ size_t not_needed_bsearches = 0;
+
+ for (size_t index = 0; index < count; ++index) {
+ struct uuid_first_time_s *uuid_first_t_entry = &uuid_first_entry_list[index];
+
+ metric_count++;
+
+ size_t idx = uuid_first_t_entry->df_index_oldest;
+ if(idx >= 10)
+ idx = 9;
+
+ df_index[idx]++;
+
+ not_needed_bsearches += uuid_first_t_entry->df_matched - uuid_first_t_entry->df_index_oldest;
+
+ if (unlikely(!uuid_first_t_entry->metric)) {
+ without_metric++;
+ continue;
+ }
PGC_PAGE *page = pgc_page_get_and_acquire(
open_cache, (Word_t)ctx,
- (Word_t)uuid_first_t_entry->metric, uuid_first_t_entry->last_time_s,
- PGC_SEARCH_CLOSEST);
+ (Word_t)uuid_first_t_entry->metric, 0,
+ PGC_SEARCH_FIRST);
if (page) {
+ time_t old_first_time_s = uuid_first_t_entry->first_time_s;
+
time_t first_time_s = pgc_page_start_time_s(page);
- time_t last_time_s = pgc_page_end_time_s(page);
uuid_first_t_entry->first_time_s = MIN(uuid_first_t_entry->first_time_s, first_time_s);
- uuid_first_t_entry->last_time_s = MAX(uuid_first_t_entry->last_time_s, last_time_s);
pgc_page_release(open_cache, page);
open_cache_count++;
+
+ if(uuid_first_t_entry->first_time_s != old_first_time_s) {
+ open_cache_gave_first_time_s++;
+ }
+ }
+ else {
+ if(!uuid_first_t_entry->df_index_oldest)
+ without_retention++;
}
}
- info("DBENGINE: processed %u journalfiles and matched %u metric pages in v2 files and %u in open cache", journalfile_count,
- v2_count, open_cache_count);
+ internal_error(true,
+ "DBENGINE: analyzed the retention of %zu rotated metrics, "
+ "did %zu jv2 matching binary searches (%zu not matching, %zu overflown) in %u journal files, "
+ "%zu metrics with entries in open cache, "
+ "metrics first time found per datafile index ([not in jv2]:%zu, [1]:%zu, [2]:%zu, [3]:%zu, [4]:%zu, [5]:%zu, [6]:%zu, [7]:%zu, [8]:%zu, [bigger]: %zu), "
+ "open cache found first time %zu, "
+ "metrics without any remaining retention %zu, "
+ "metrics not in MRG %zu",
+ metric_count,
+ binary_match,
+ not_matching_bsearches,
+ not_needed_bsearches,
+ journalfile_count,
+ open_cache_count,
+ df_index[0], df_index[1], df_index[2], df_index[3], df_index[4], df_index[5], df_index[6], df_index[7], df_index[8], df_index[9],
+ open_cache_gave_first_time_s,
+ without_retention,
+ without_metric
+ );
}
static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) {
@@ -1360,59 +1428,49 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, NULL, 0, 0);
struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
- Pvoid_t metric_first_time_JudyL = (Pvoid_t) NULL;
- Pvoid_t *PValue;
-
- unsigned count = 0;
+ size_t count = j2_header->metric_count;
struct uuid_first_time_s *uuid_first_t_entry;
- for (uint32_t index = 0; index < j2_header->metric_count; ++index) {
+ struct uuid_first_time_s *uuid_first_entry_list = callocz(count, sizeof(struct uuid_first_time_s));
+
+ size_t added = 0;
+ for (size_t index = 0; index < count; ++index) {
METRIC *metric = mrg_metric_get_and_acquire(main_mrg, &uuid_list[index].uuid, (Word_t) ctx);
if (!metric)
continue;
- PValue = JudyLIns(&metric_first_time_JudyL, (Word_t) index, PJE0);
- fatal_assert(NULL != PValue);
- if (!*PValue) {
- uuid_first_t_entry = mallocz(sizeof(*uuid_first_t_entry));
- uuid_first_t_entry->metric = metric;
- uuid_first_t_entry->first_time_s = LONG_MAX;
- uuid_first_t_entry->last_time_s = 0;
- uuid_first_t_entry->uuid = mrg_metric_uuid(main_mrg, metric);
- *PValue = uuid_first_t_entry;
- count++;
- }
+ uuid_first_entry_list[added].metric = metric;
+ uuid_first_entry_list[added].first_time_s = LONG_MAX;
+ uuid_first_entry_list[added].df_matched = 0;
+ uuid_first_entry_list[added].df_index_oldest = 0;
+ uuid_first_entry_list[added].uuid = mrg_metric_uuid(main_mrg, metric);
+ added++;
}
- journalfile_v2_data_release(journalfile);
- info("DBENGINE: recalculating retention for %u metrics starting with datafile %u", count, first_datafile_remaining->fileno);
+ info("DBENGINE: recalculating retention for %zu metrics starting with datafile %u", count, first_datafile_remaining->fileno);
+
+ journalfile_v2_data_release(journalfile);
// Update the first time / last time for all metrics we plan to delete
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_FIND_REMAINING_RETENTION);
- find_uuid_first_time(ctx, first_datafile_remaining, metric_first_time_JudyL);
+ find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added);
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG);
- info("DBENGINE: updating metric registry retention for %u metrics", count);
-
- Word_t index = 0;
- bool first_then_next = true;
- while ((PValue = JudyLFirstThenNext(metric_first_time_JudyL, &index, &first_then_next))) {
- uuid_first_t_entry = *PValue;
+ info("DBENGINE: updating metric registry retention for %zu metrics", added);
- if (likely(uuid_first_t_entry->first_time_s != LONG_MAX && uuid_first_t_entry->last_time_s))
+ for (size_t index = 0; index < added; ++index) {
+ uuid_first_t_entry = &uuid_first_entry_list[index];
+ if (likely(uuid_first_t_entry->first_time_s != LONG_MAX))
mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s);
else
mrg_metric_set_first_time_s(main_mrg, uuid_first_t_entry->metric, 0);
-
mrg_metric_release(main_mrg, uuid_first_t_entry->metric);
- freez(uuid_first_t_entry);
}
-
- JudyLFreeArray(&metric_first_time_JudyL, PJE0);
+ freez(uuid_first_entry_list);
if(worker)
worker_is_idle();
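The rewritten find_uuid_first_time() above replaces the JudyL walk with a plain array of candidate metrics: for every journal file it binary-searches the journal's UUID-sorted metric list and skips candidates that have already matched enough datafiles or pages. A minimal, standalone illustration of that lookup pattern follows; the struct fields are reduced to what the example needs and the names are illustrative, only the comparator and the skip thresholds mirror the hunk above:

#include <stdlib.h>
#include <uuid/uuid.h>

// Sketch only: the per-journal lookup pattern of find_uuid_first_time().
struct journal_metric { uuid_t uuid; /* plus delta_start_s, entries, ... */ };

struct candidate {
    uuid_t *uuid;
    size_t  df_matched;     // datafiles this metric was found in so far
    size_t  pages_found;    // pages accumulated so far
};

static int metric_uuid_cmp(const void *key, const void *m) {
    return uuid_compare(*(const uuid_t *)key, ((const struct journal_metric *)m)->uuid);
}

static void scan_one_journal(struct journal_metric *list, size_t list_count,
                             struct candidate *cand, size_t cand_count) {
    for (size_t i = 0; i < cand_count; i++) {
        if (cand[i].df_matched > 3 || cand[i].pages_found > 5)
            continue;   // enough evidence already; same cut-off as the hunk above

        struct journal_metric *hit =
            bsearch(cand[i].uuid, list, list_count, sizeof(*list), metric_uuid_cmp);
        if (!hit)
            continue;   // this metric has no pages in this journal

        cand[i].df_matched++;
        // ...accumulate hit->entries and the first-time timestamp as the patch does...
    }
}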
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index aca5f50bff..22fe30e4fd 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -412,7 +412,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
check_and_fix_mrg_update_every(handle);
}
-static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size) {
+static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) {
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
size_t size;
@@ -421,28 +421,45 @@ static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle,
size = tier_page_size[ctx->config.tier];
}
else {
+ size_t final_slots = 0;
+
// the first page
handle->options |= RRDENG_FIRST_PAGE_ALLOCATED;
size_t max_size = tier_page_size[ctx->config.tier];
size_t max_slots = max_size / CTX_POINT_SIZE_BYTES(ctx);
- size_t min_slots = max_slots / 5;
- size_t distribution = max_slots - min_slots;
- size_t this_page_end_slot = indexing_partition((Word_t)handle->alignment, distribution);
- size_t current_end_slot = (size_t)now_monotonic_sec() % distribution;
+ if(handle->alignment->initial_slots) {
+ final_slots = handle->alignment->initial_slots;
+ }
+ else {
+ max_slots -= 3;
+
+ size_t smaller_slot = indexing_partition((Word_t)handle->alignment, max_slots);
+ final_slots = smaller_slot;
- if(current_end_slot < this_page_end_slot)
- this_page_end_slot -= current_end_slot;
- else if(current_end_slot > this_page_end_slot)
- this_page_end_slot = (max_slots - current_end_slot) + this_page_end_slot;
+ time_t now_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
+ size_t current_pos = (now_s % max_slots);
- size_t final_slots = min_slots + this_page_end_slot;
+ if(current_pos > final_slots)
+ final_slots += max_slots - current_pos;
- if(final_slots > max_slots)
- final_slots = max_slots;
+ else if(current_pos < final_slots)
+ final_slots -= current_pos;
- if(final_slots < min_slots)
- final_slots = min_slots;
+ if(final_slots < 3) {
+ final_slots += 3;
+ smaller_slot += 3;
+
+ if(smaller_slot >= max_slots)
+ smaller_slot -= max_slots;
+ }
+
+ max_slots += 3;
+ handle->alignment->initial_slots = smaller_slot + 3;
+
+ internal_fatal(handle->alignment->initial_slots < 3 || handle->alignment->initial_slots >= max_slots, "ooops! wrong distribution of metrics across time");
+ internal_fatal(final_slots < 3 || final_slots >= max_slots, "ooops! wrong distribution of metrics across time");
+ }
size = final_slots * CTX_POINT_SIZE_BYTES(ctx);
}
@@ -485,7 +502,7 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_
handle->page_flags |= RRDENG_PAGE_UNALIGNED;
rrdeng_store_metric_flush_current_page(collection_handle);
- data = rrdeng_alloc_new_metric_data(handle, &data_size);
+ data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut);
}
else {
data = pgc_page_data(handle->page);
@@ -493,7 +510,7 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_
}
}
else
- data = rrdeng_alloc_new_metric_data(handle, &data_size);
+ data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut);
switch (ctx->config.page_type) {
case PAGE_METRICS: {
@@ -682,9 +699,15 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
if((handle->options & RRDENG_1ST_METRIC_WRITER) && !mrg_metric_writer_release(main_mrg, handle->metric))
internal_fatal(true, "DBENGINE: metric is already released");
+ time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, handle->metric);
+ time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, handle->metric);
+
mrg_metric_release(main_mrg, handle->metric);
freez(handle);
+ if(!first_time_s && !last_time_s)
+ return 1;
+
return 0;
}
@@ -1088,6 +1111,11 @@ void rrdeng_readiness_wait(struct rrdengine_instance *ctx) {
info("DBENGINE: tier %d is ready for data collection and queries", ctx->config.tier);
}
+bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ return ctx->config.legacy;
+}
+
void rrdeng_exit_mode(struct rrdengine_instance *ctx) {
__atomic_store_n(&ctx->quiesce.exit_mode, true, __ATOMIC_RELAXED);
}
@@ -1149,7 +1177,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
finalize_rrd_files(ctx);
}
- if (!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) {
+ if (ctx->config.legacy) {
freez(ctx);
if (ctxp)
*ctxp = NULL;
@@ -1179,14 +1207,16 @@ int rrdeng_exit(struct rrdengine_instance *ctx) {
bool logged = false;
while(__atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED) && !unittest_running) {
if(!logged) {
- info("Waiting for collectors to finish on tier %d...", ctx->config.tier);
+ info("DBENGINE: waiting for collectors to finish on tier %d...", (ctx->config.legacy) ? -1 : ctx->config.tier);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
}
+ info("DBENGINE: flushing main cache for tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier);
pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx);
+ info("DBENGINE: shutting down tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier);
struct completion completion = {};
completion_init(&completion);
rrdeng_enq_cmd(ctx, RRDENG_OPCODE_CTX_SHUTDOWN, NULL, &completion, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL);
@@ -1195,7 +1225,7 @@ int rrdeng_exit(struct rrdengine_instance *ctx) {
finalize_rrd_files(ctx);
- if(!is_storage_engine_shared((STORAGE_INSTANCE *)ctx))
+ if(ctx->config.legacy)
freez(ctx);
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
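The first-page sizing in rrdeng_alloc_new_metric_data() above is what "distribute metrics more evenly across time" refers to: each pg_alignment gets a stable target end-slot (via indexing_partition() on the alignment pointer), the metric's first page is sized so it fills up when the point timestamps reach that slot, and the chosen value is cached in alignment->initial_slots so metrics sharing the alignment reuse it. A simplified illustration of the arithmetic, with a generic pointer hash standing in for indexing_partition() and without the 3-slot guard band of the real code:

#include <stdint.h>
#include <stddef.h>
#include <time.h>

// Sketch only: simplified first-page slot calculation that staggers page
// completion times across metrics. The real code keeps a 3-slot minimum and
// stores the result in alignment->initial_slots.
static size_t first_page_slots(const void *alignment, time_t now_s, size_t max_slots) {
    // stable pseudo-random target end-slot for this alignment group
    size_t target_slot = (size_t)(((uint64_t)(uintptr_t)alignment *
                                   UINT64_C(0x9E3779B97F4A7C15)) % max_slots);

    // current position inside the repeating window of max_slots points
    size_t current_pos = (size_t)((uint64_t)now_s % max_slots);

    // size the page so it completes when the clock reaches target_slot
    size_t slots = (target_slot >= current_pos)
                       ? target_slot - current_pos
                       : max_slots - current_pos + target_slot;

    return slots ? slots : 1;   // never return a zero-point page
}

In the hunk above the same wrap-around is expressed by the two branches on current_pos, and the minimum page size is enforced by adding 3 slots instead of clamping to 1.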
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index bf63e6fa56..af3d5d8344 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -225,5 +225,6 @@ struct rrdeng_cache_efficiency_stats rrdeng_get_cache_efficiency_stats(void);
RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx);
size_t rrdeng_collectors_running(struct rrdengine_instance *ctx);
+bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance);
#endif /* NETDATA_RRDENGINEAPI_H */
diff --git a/database/rrd.h b/database/rrd.h
index 6fd7d6ac4e..1ae53b47e5 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -571,6 +571,8 @@ typedef enum __attribute__ ((__packed__)) rrdset_flags {
RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 25), // the receiving side has completed replication
RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 26), // a custom variable has been updated and needs to be exposed to parent
+
+ RRDSET_FLAG_COLLECTION_FINISHED = (1 << 27), // when set, data collection is not available for this chart
} RRDSET_FLAGS;
#define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag))
@@ -1310,6 +1312,11 @@ collected_number rrddim_timed_set_by_pointer(RRDSET *st, RRDDIM *rd, struct time
collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value);
collected_number rrddim_set(RRDSET *st, const char *id, collected_number value);
+bool rrddim_finalize_collection_and_check_retention(RRDDIM *rd);
+void rrdset_finalize_collection(RRDSET *st, bool dimensions_too);
+void rrdhost_finalize_collection(RRDHOST *host);
+void rrd_finalize_collection_for_all_hosts(void);
+
long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries);
#ifdef NETDATA_LOG_COLLECTION_ERRORS
diff --git a/database/rrddim.c b/database/rrddim.c
index 5a3f962572..b520f21d3d 100644
--- a/database/rrddim.c
+++ b/database/rrddim.c
@@ -166,6 +166,25 @@ static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
}
+bool rrddim_finalize_collection_and_check_retention(RRDDIM *rd) {
+ size_t tiers_available = 0, tiers_said_no_retention = 0;
+
+ for(size_t tier = 0; tier < storage_tiers ;tier++) {
+ if(!rd->tiers[tier].db_collection_handle)
+ continue;
+
+ tiers_available++;
+
+ if(rd->tiers[tier].collect_ops->finalize(rd->tiers[tier].db_collection_handle))
+ tiers_said_no_retention++;
+
+ rd->tiers[tier].db_collection_handle = NULL;
+ }
+
+ // return true if the dimension has retention in the db
+ return (!tiers_said_no_retention || tiers_available > tiers_said_no_retention);
+}
+
static void rrddim_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrddim, void *rrdset) {
RRDDIM *rd = rrddim;
RRDSET *st = rrdset;
@@ -180,19 +199,7 @@ static void rrddim_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v
debug(D_RRD_CALLS, "rrddim_free() %s.%s", rrdset_name(st), rrddim_name(rd));
- size_t tiers_available = 0, tiers_said_no_retention = 0;
- for(size_t tier = 0; tier < storage_tiers ;tier++) {
- if(rd->tiers[tier].db_collection_handle) {
- tiers_available++;
-
- if(rd->tiers[tier].collect_ops->finalize(rd->tiers[tier].db_collection_handle))
- tiers_said_no_retention++;
-
- rd->tiers[tier].db_collection_handle = NULL;
- }
- }
-
- if (tiers_available == tiers_said_no_retention && tiers_said_no_retention && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+ if (!rrddim_finalize_collection_and_check_retention(rd) && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
/* This metric has no data and no references */
metaqueue_delete_dimension_uuid(&rd->metric_uuid);
}
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 516d733daa..b25fc72d21 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -33,10 +33,8 @@ time_t rrdhost_free_orphan_time_s = 3600;
bool is_storage_engine_shared(STORAGE_INSTANCE *engine __maybe_unused) {
#ifdef ENABLE_DBENGINE
- for(size_t tier = 0; tier < storage_tiers ;tier++) {
- if (engine == (STORAGE_INSTANCE *)multidb_ctx[tier])
- return true;
- }
+ if(!rrdeng_is_legacy(engine))
+ return true;
#endif
return false;
@@ -1223,6 +1221,15 @@ void rrdhost_free_all(void) {
rrd_unlock();
}
+void rrd_finalize_collection_for_all_hosts(void) {
+ RRDHOST *host;
+ rrd_wrlock();
+ rrdhost_foreach_read(host) {
+ rrdhost_finalize_collection(host);
+ }
+ rrd_unlock();
+}
+
// ----------------------------------------------------------------------------
// RRDHOST - save host files
@@ -1391,6 +1398,15 @@ void reload_host_labels(void) {
rrdpush_send_host_labels(localhost);
}
+void rrdhost_finalize_collection(RRDHOST *host) {
+ info("Stopping data collection for host '%s'...", rrdhost_hostname(host));
+
+ RRDSET *st;
+ rrdset_foreach_write(st, host)
+ rrdset_finalize_collection(st, true);
+ rrdset_foreach_done(st);
+}
+
// ----------------------------------------------------------------------------
// RRDHOST - delete host files
diff --git a/database/rrdset.c b/database/rrdset.c
index 886aa800ab..c97ebbb145 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -185,6 +185,29 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
ml_chart_new(st);
}
+void rrdset_finalize_collection(RRDSET *st, bool dimensions_too) {
+ RRDHOST *host = st->rrdhost;
+
+ rrdset_flag_set(st, RRDSET_FLAG_COLLECTION_FINISHED);
+
+ if(dimensions_too) {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st)
+ rrddim_finalize_collection_and_check_retention(rd);
+ rrddim_foreach_done(rd);
+ }
+
+ for(size_t tier = 0; tier < storage_tiers ; tier++) {
+ STORAGE_ENGINE *eng = st->rrdhost->db[tier].eng;
+ if(!eng) continue;
+
+ if(st->storage_metrics_groups[tier]) {
+ eng->api.collect_ops.metrics_group_release(host->db[tier].instance, st->storage_metrics_groups[tier]);
+ st->storage_metrics_groups[tier] = NULL;
+ }
+ }
+}
+
// the destructor - the dictionary is write locked while this runs
static void rrdset_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdset, void *rrdhost) {
RRDHOST *host = rrdhost;
@@ -192,15 +215,7 @@ static void rrdset_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v
rrdset_flag_clear(st, RRDSET_FLAG_INDEXED_ID);
- // cleanup storage engines
- {
- for(size_t tier = 0; tier < storage_tiers ; tier++) {
- STORAGE_ENGINE *eng = st->rrdhost->db[tier].eng;
- if(!eng) continue;
-
- eng->api.collect_ops.metrics_group_release(host->db[tier].instance, st->storage_metrics_groups[tier]);
- }
- }
+ rrdset_finalize_collection(st, false);
// remove it from the name index
rrdset_index_del_name(host, st);
@@ -600,16 +615,17 @@ void rrdset_get_retention_of_tier_for_collected_chart(RRDSET *st, time_t *first_
}
if(unlikely(db_first_entry_s && db_last_entry_s && db_first_entry_s >= db_last_entry_s)) {
- internal_error(true,
- "RRDSET: 'host:%s/chart:%s' oldest db time %ld is equal or bigger than latest db time %ld, adjusting it last updated time - update every",
+ internal_error(db_first_entry_s > db_last_entry_s,
+ "RRDSET: 'host:%s/chart:%s' oldest db time %ld is bigger than latest db time %ld, adjusting it to (latest time %ld - update every %ld)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
- db_first_entry_s, db_last_entry_s);
+ db_first_entry_s, db_last_entry_s,
+ db_last_entry_s, (time_t)st->update_every);
db_first_entry_s = db_last_entry_s - st->update_every;
}
if(unlikely(!db_first_entry_s && db_last_entry_s))
// this can be the case on the first data collection of a chart
- db_first_entry_s = db_last_entry_s;
+ db_first_entry_s = db_last_entry_s - st->update_every;
*first_time_s = db_first_entry_s;
*last_time_s = db_last_entry_s;
@@ -1467,9 +1483,13 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
next_store_ut = 0, // the timestamp in microseconds, of the next entry to store in the db
update_every_ut = st->update_every * USEC_PER_SEC; // st->update_every in microseconds
+ RRDSET_FLAGS rrdset_flags = rrdset_flag_check(st, ~0);
+ if(unlikely(rrdset_flags & RRDSET_FLAG_COLLECTION_FINISHED))
+ return;
+
netdata_thread_disable_cancelability();
- if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) {
+ if (unlikely(rrdset_flags & RRDSET_FLAG_OBSOLETE)) {
error("Chart '%s' has the OBSOLETE flag set, but it is collected.", rrdset_id(st));
rrdset_isnot_obsolete(st);
}
@@ -1554,7 +1574,7 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec;
next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC;
- if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST))) {
+ if(unlikely(rrdset_flags & RRDSET_FLAG_STORE_FIRST)) {
store_this_entry = 1;
last_collect_ut = next_store_ut - update_every_ut;
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c
index ecfab42c12..326e5e5a1f 100644
--- a/database/sqlite/sqlite_metadata.c