diff options
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 11 | ||||
-rw-r--r-- | daemon/main.c | 5 | ||||
-rw-r--r-- | database/engine/pagecache.h | 1 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 176 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 68 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 1 | ||||
-rw-r--r-- | database/rrd.h | 7 | ||||
-rw-r--r-- | database/rrddim.c | 33 | ||||
-rw-r--r-- | database/rrdhost.c | 24 | ||||
-rw-r--r-- | database/rrdset.c | 50 | ||||
-rw-r--r-- | database/sqlite/sqlite_metadata.c | 42 | ||||
-rw-r--r-- | ml/Host.cc | 2 | ||||
-rw-r--r-- | streaming/replication.c | 133 | ||||
-rw-r--r-- | streaming/replication.h | 2 |
14 files changed, 388 insertions, 167 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index f6dacf5614..48acfd7dc1 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -326,7 +326,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us { const char *first_entry_txt = get_word(words, num_words, 1); const char *last_entry_txt = get_word(words, num_words, 2); - const char *world_time_txt = get_word(words, num_words, 3); + const char *wall_clock_time_txt = get_word(words, num_words, 3); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END); if(!host) return PLUGINSD_DISABLE_PLUGIN(user); @@ -336,12 +336,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0; time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0; - time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec(); - - if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0)) - error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).", - rrdhost_hostname(host), rrdset_id(st), - first_entry_child, last_entry_child, child_world_time); + time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec(); bool ok = true; if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { @@ -358,7 +353,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser; ok = replicate_chart_request(send_to_plugin, parser, host, st, - first_entry_child, last_entry_child, child_world_time, + first_entry_child, last_entry_child, child_wall_clock_time, 0, 0); } #ifdef NETDATA_LOG_REPLICATION_REQUESTS diff --git a/daemon/main.c b/daemon/main.c index 608985bddf..8f51e36c16 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -418,9 +418,10 @@ void netdata_cleanup_and_exit(int ret) { #endif // free the database - delta_shutdown_time("free rrdhost structures"); + delta_shutdown_time("stop collection for all hosts"); - rrdhost_free_all(); + // rrdhost_free_all(); + rrd_finalize_collection_for_all_hosts(); delta_shutdown_time("stop metasync threads"); diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index d242aa81ca..f7454e5aeb 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -52,6 +52,7 @@ struct rrdeng_page_info { struct pg_alignment { uint32_t page_position; uint32_t refcount; + uint16_t initial_slots; }; struct rrdeng_query_handle; diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index acbf8e25b9..ed1d851b1f 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -1254,11 +1254,13 @@ static void after_database_rotate(struct rrdengine_instance *ctx __maybe_unused, struct uuid_first_time_s { uuid_t *uuid; time_t first_time_s; - time_t last_time_s; METRIC *metric; + size_t pages_found; + size_t df_matched; + size_t df_index_oldest; }; -static int journal_metric_uuid_compare(const void *key, const void *metric) +static int journal_metric_compare(const void *key, const void *metric) { return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid); } @@ -1279,7 +1281,12 @@ struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struc return next_datafile; } -void find_uuid_first_time(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, Pvoid_t metric_first_time_JudyL) { +void find_uuid_first_time( + struct rrdengine_instance *ctx, + struct rrdengine_datafile *datafile, + struct uuid_first_time_s *uuid_first_entry_list, + size_t count) +{ // acquire the datafile to work with it uv_rwlock_rdlock(&ctx->datafiles.rwlock); while(datafile && !datafile_acquire(datafile, DATAFILE_ACQUIRE_RETENTION)) @@ -1289,8 +1296,10 @@ void find_uuid_first_time(struct rrdengine_instance *ctx, struct rrdengine_dataf if (unlikely(!datafile)) return; - unsigned v2_count = 0; unsigned journalfile_count = 0; + size_t binary_match = 0; + size_t not_matching_bsearches = 0; + while (datafile) { struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL, 0, 0); if (!j2_header) { @@ -1299,55 +1308,114 @@ void find_uuid_first_time(struct rrdengine_instance *ctx, struct rrdengine_dataf } time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC); - size_t journal_metric_count = (size_t)j2_header->metric_count; struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset); + struct uuid_first_time_s *uuid_original_entry; - Word_t index = 0; - bool first_then_next = true; - Pvoid_t *PValue; - while ((PValue = JudyLFirstThenNext(metric_first_time_JudyL, &index, &first_then_next))) { - struct uuid_first_time_s *uuid_first_t_entry = *PValue; + size_t journal_metric_count = j2_header->metric_count; - struct journal_metric_list *uuid_entry = bsearch(uuid_first_t_entry->uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_uuid_compare); + for (size_t index = 0; index < count; ++index) { + uuid_original_entry = &uuid_first_entry_list[index]; - if (unlikely(!uuid_entry)) + // Check here if we should skip this + if (uuid_original_entry->df_matched > 3 || uuid_original_entry->pages_found > 5) continue; - time_t first_time_s = uuid_entry->delta_start_s + journal_start_time_s; - time_t last_time_s = uuid_entry->delta_end_s + journal_start_time_s; - uuid_first_t_entry->first_time_s = MIN(uuid_first_t_entry->first_time_s , first_time_s); - uuid_first_t_entry->last_time_s = MAX(uuid_first_t_entry->last_time_s , last_time_s); - v2_count++; + struct journal_metric_list *live_entry = bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_compare); + if (!live_entry) { + // Not found in this journal + not_matching_bsearches++; + continue; + } + + uuid_original_entry->pages_found += live_entry->entries; + uuid_original_entry->df_matched++; + + time_t old_first_time_s = uuid_original_entry->first_time_s; + + // Calculate first / last for this match + time_t first_time_s = live_entry->delta_start_s + journal_start_time_s; + uuid_original_entry->first_time_s = MIN(uuid_original_entry->first_time_s, first_time_s); + + if (uuid_original_entry->first_time_s != old_first_time_s) + uuid_original_entry->df_index_oldest = uuid_original_entry->df_matched; + + binary_match++; } + journalfile_count++; journalfile_v2_data_release(datafile->journalfile); datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile); } // Let's scan the open cache for almost exact match - bool first_then_next = true; - Pvoid_t *PValue; - Word_t index = 0; - unsigned open_cache_count = 0; - while ((PValue = JudyLFirstThenNext(metric_first_time_JudyL, &index, &first_then_next))) { - struct uuid_first_time_s *uuid_first_t_entry = *PValue; + size_t open_cache_count = 0; + + size_t df_index[10] = { 0 }; + size_t without_metric = 0; + size_t open_cache_gave_first_time_s = 0; + size_t metric_count = 0; + size_t without_retention = 0; + size_t not_needed_bsearches = 0; + + for (size_t index = 0; index < count; ++index) { + struct uuid_first_time_s *uuid_first_t_entry = &uuid_first_entry_list[index]; + + metric_count++; + + size_t idx = uuid_first_t_entry->df_index_oldest; + if(idx >= 10) + idx = 9; + + df_index[idx]++; + + not_needed_bsearches += uuid_first_t_entry->df_matched - uuid_first_t_entry->df_index_oldest; + + if (unlikely(!uuid_first_t_entry->metric)) { + without_metric++; + continue; + } PGC_PAGE *page = pgc_page_get_and_acquire( open_cache, (Word_t)ctx, - (Word_t)uuid_first_t_entry->metric, uuid_first_t_entry->last_time_s, - PGC_SEARCH_CLOSEST); + (Word_t)uuid_first_t_entry->metric, 0, + PGC_SEARCH_FIRST); if (page) { + time_t old_first_time_s = uuid_first_t_entry->first_time_s; + time_t first_time_s = pgc_page_start_time_s(page); - time_t last_time_s = pgc_page_end_time_s(page); uuid_first_t_entry->first_time_s = MIN(uuid_first_t_entry->first_time_s, first_time_s); - uuid_first_t_entry->last_time_s = MAX(uuid_first_t_entry->last_time_s, last_time_s); pgc_page_release(open_cache, page); open_cache_count++; + + if(uuid_first_t_entry->first_time_s != old_first_time_s) { + open_cache_gave_first_time_s++; + } + } + else { + if(!uuid_first_t_entry->df_index_oldest) + without_retention++; } } - info("DBENGINE: processed %u journalfiles and matched %u metric pages in v2 files and %u in open cache", journalfile_count, - v2_count, open_cache_count); + internal_error(true, + "DBENGINE: analyzed the retention of %zu rotated metrics, " + "did %zu jv2 matching binary searches (%zu not matching, %zu overflown) in %u journal files, " + "%zu metrics with entries in open cache, " + "metrics first time found per datafile index ([not in jv2]:%zu, [1]:%zu, [2]:%zu, [3]:%zu, [4]:%zu, [5]:%zu, [6]:%zu, [7]:%zu, [8]:%zu, [bigger]: %zu), " + "open cache found first time %zu, " + "metrics without any remaining retention %zu, " + "metrics not in MRG %zu", + metric_count, + binary_match, + not_matching_bsearches, + not_needed_bsearches, + journalfile_count, + open_cache_count, + df_index[0], df_index[1], df_index[2], df_index[3], df_index[4], df_index[5], df_index[6], df_index[7], df_index[8], df_index[9], + open_cache_gave_first_time_s, + without_retention, + without_metric + ); } static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) { @@ -1360,59 +1428,49 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, NULL, 0, 0); struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset); - Pvoid_t metric_first_time_JudyL = (Pvoid_t) NULL; - Pvoid_t *PValue; - - unsigned count = 0; + size_t count = j2_header->metric_count; struct uuid_first_time_s *uuid_first_t_entry; - for (uint32_t index = 0; index < j2_header->metric_count; ++index) { + struct uuid_first_time_s *uuid_first_entry_list = callocz(count, sizeof(struct uuid_first_time_s)); + + size_t added = 0; + for (size_t index = 0; index < count; ++index) { METRIC *metric = mrg_metric_get_and_acquire(main_mrg, &uuid_list[index].uuid, (Word_t) ctx); if (!metric) continue; - PValue = JudyLIns(&metric_first_time_JudyL, (Word_t) index, PJE0); - fatal_assert(NULL != PValue); - if (!*PValue) { - uuid_first_t_entry = mallocz(sizeof(*uuid_first_t_entry)); - uuid_first_t_entry->metric = metric; - uuid_first_t_entry->first_time_s = LONG_MAX; - uuid_first_t_entry->last_time_s = 0; - uuid_first_t_entry->uuid = mrg_metric_uuid(main_mrg, metric); - *PValue = uuid_first_t_entry; - count++; - } + uuid_first_entry_list[added].metric = metric; + uuid_first_entry_list[added].first_time_s = LONG_MAX; + uuid_first_entry_list[added].df_matched = 0; + uuid_first_entry_list[added].df_index_oldest = 0; + uuid_first_entry_list[added].uuid = mrg_metric_uuid(main_mrg, metric); + added++; } - journalfile_v2_data_release(journalfile); - info("DBENGINE: recalculating retention for %u metrics starting with datafile %u", count, first_datafile_remaining->fileno); + info("DBENGINE: recalculating retention for %zu metrics starting with datafile %u", count, first_datafile_remaining->fileno); + + journalfile_v2_data_release(journalfile); // Update the first time / last time for all metrics we plan to delete if(worker) worker_is_busy(UV_EVENT_DBENGINE_FIND_REMAINING_RETENTION); - find_uuid_first_time(ctx, first_datafile_remaining, metric_first_time_JudyL); + find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added); if(worker) worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG); - info("DBENGINE: updating metric registry retention for %u metrics", count); - - Word_t index = 0; - bool first_then_next = true; - while ((PValue = JudyLFirstThenNext(metric_first_time_JudyL, &index, &first_then_next))) { - uuid_first_t_entry = *PValue; + info("DBENGINE: updating metric registry retention for %zu metrics", added); - if (likely(uuid_first_t_entry->first_time_s != LONG_MAX && uuid_first_t_entry->last_time_s)) + for (size_t index = 0; index < added; ++index) { + uuid_first_t_entry = &uuid_first_entry_list[index]; + if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s); else mrg_metric_set_first_time_s(main_mrg, uuid_first_t_entry->metric, 0); - mrg_metric_release(main_mrg, uuid_first_t_entry->metric); - freez(uuid_first_t_entry); } - - JudyLFreeArray(&metric_first_time_JudyL, PJE0); + freez(uuid_first_entry_list); if(worker) worker_is_idle(); diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index aca5f50bff..22fe30e4fd 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -412,7 +412,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha check_and_fix_mrg_update_every(handle); } -static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size) { +static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) { struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); size_t size; @@ -421,28 +421,45 @@ static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size = tier_page_size[ctx->config.tier]; } else { + size_t final_slots = 0; + // the first page handle->options |= RRDENG_FIRST_PAGE_ALLOCATED; size_t max_size = tier_page_size[ctx->config.tier]; size_t max_slots = max_size / CTX_POINT_SIZE_BYTES(ctx); - size_t min_slots = max_slots / 5; - size_t distribution = max_slots - min_slots; - size_t this_page_end_slot = indexing_partition((Word_t)handle->alignment, distribution); - size_t current_end_slot = (size_t)now_monotonic_sec() % distribution; + if(handle->alignment->initial_slots) { + final_slots = handle->alignment->initial_slots; + } + else { + max_slots -= 3; + + size_t smaller_slot = indexing_partition((Word_t)handle->alignment, max_slots); + final_slots = smaller_slot; - if(current_end_slot < this_page_end_slot) - this_page_end_slot -= current_end_slot; - else if(current_end_slot > this_page_end_slot) - this_page_end_slot = (max_slots - current_end_slot) + this_page_end_slot; + time_t now_s = (time_t)(point_in_time_ut / USEC_PER_SEC); + size_t current_pos = (now_s % max_slots); - size_t final_slots = min_slots + this_page_end_slot; + if(current_pos > final_slots) + final_slots += max_slots - current_pos; - if(final_slots > max_slots) - final_slots = max_slots; + else if(current_pos < final_slots) + final_slots -= current_pos; - if(final_slots < min_slots) - final_slots = min_slots; + if(final_slots < 3) { + final_slots += 3; + smaller_slot += 3; + + if(smaller_slot >= max_slots) + smaller_slot -= max_slots; + } + + max_slots += 3; + handle->alignment->initial_slots = smaller_slot + 3; + + internal_fatal(handle->alignment->initial_slots < 3 || handle->alignment->initial_slots >= max_slots, "ooops! wrong distribution of metrics across time"); + internal_fatal(final_slots < 3 || final_slots >= max_slots, "ooops! wrong distribution of metrics across time"); + } size = final_slots * CTX_POINT_SIZE_BYTES(ctx); } @@ -485,7 +502,7 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_ handle->page_flags |= RRDENG_PAGE_UNALIGNED; rrdeng_store_metric_flush_current_page(collection_handle); - data = rrdeng_alloc_new_metric_data(handle, &data_size); + data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut); } else { data = pgc_page_data(handle->page); @@ -493,7 +510,7 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_ } } else - data = rrdeng_alloc_new_metric_data(handle, &data_size); + data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut); switch (ctx->config.page_type) { case PAGE_METRICS: { @@ -682,9 +699,15 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { if((handle->options & RRDENG_1ST_METRIC_WRITER) && !mrg_metric_writer_release(main_mrg, handle->metric)) internal_fatal(true, "DBENGINE: metric is already released"); + time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, handle->metric); + time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, handle->metric); + mrg_metric_release(main_mrg, handle->metric); freez(handle); + if(!first_time_s && !last_time_s) + return 1; + return 0; } @@ -1088,6 +1111,11 @@ void rrdeng_readiness_wait(struct rrdengine_instance *ctx) { info("DBENGINE: tier %d is ready for data collection and queries", ctx->config.tier); } +bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + return ctx->config.legacy; +} + void rrdeng_exit_mode(struct rrdengine_instance *ctx) { __atomic_store_n(&ctx->quiesce.exit_mode, true, __ATOMIC_RELAXED); } @@ -1149,7 +1177,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p finalize_rrd_files(ctx); } - if (!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) { + if (ctx->config.legacy) { freez(ctx); if (ctxp) *ctxp = NULL; @@ -1179,14 +1207,16 @@ int rrdeng_exit(struct rrdengine_instance *ctx) { bool logged = false; while(__atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED) && !unittest_running) { if(!logged) { - info("Waiting for collectors to finish on tier %d...", ctx->config.tier); + info("DBENGINE: waiting for collectors to finish on tier %d...", (ctx->config.legacy) ? -1 : ctx->config.tier); logged = true; } sleep_usec(100 * USEC_PER_MS); } + info("DBENGINE: flushing main cache for tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier); pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx); + info("DBENGINE: shutting down tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier); struct completion completion = {}; completion_init(&completion); rrdeng_enq_cmd(ctx, RRDENG_OPCODE_CTX_SHUTDOWN, NULL, &completion, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL); @@ -1195,7 +1225,7 @@ int rrdeng_exit(struct rrdengine_instance *ctx) { finalize_rrd_files(ctx); - if(!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) + if(ctx->config.legacy) freez(ctx); rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index bf63e6fa56..af3d5d8344 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -225,5 +225,6 @@ struct rrdeng_cache_efficiency_stats rrdeng_get_cache_efficiency_stats(void); RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx); size_t rrdeng_collectors_running(struct rrdengine_instance *ctx); +bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance); #endif /* NETDATA_RRDENGINEAPI_H */ diff --git a/database/rrd.h b/database/rrd.h index 6fd7d6ac4e..1ae53b47e5 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -571,6 +571,8 @@ typedef enum __attribute__ ((__packed__)) rrdset_flags { RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 25), // the receiving side has completed replication RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 26), // a custom variable has been updated and needs to be exposed to parent + + RRDSET_FLAG_COLLECTION_FINISHED = (1 << 27), // when set, data collection is not available for this chart } RRDSET_FLAGS; #define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag)) @@ -1310,6 +1312,11 @@ collected_number rrddim_timed_set_by_pointer(RRDSET *st, RRDDIM *rd, struct time collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value); collected_number rrddim_set(RRDSET *st, const char *id, collected_number value); +bool rrddim_finalize_collection_and_check_retention(RRDDIM *rd); +void rrdset_finalize_collection(RRDSET *st, bool dimensions_too); +void rrdhost_finalize_collection(RRDHOST *host); +void rrd_finalize_collection_for_all_hosts(void); + long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries); #ifdef NETDATA_LOG_COLLECTION_ERRORS diff --git a/database/rrddim.c b/database/rrddim.c index 5a3f962572..b520f21d3d 100644 --- a/database/rrddim.c +++ b/database/rrddim.c @@ -166,6 +166,25 @@ static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v } +bool rrddim_finalize_collection_and_check_retention(RRDDIM *rd) { + size_t tiers_available = 0, tiers_said_no_retention = 0; + + for(size_t tier = 0; tier < storage_tiers ;tier++) { + if(!rd->tiers[tier].db_collection_handle) + continue; + + tiers_available++; + + if(rd->tiers[tier].collect_ops->finalize(rd->tiers[tier].db_collection_handle)) + tiers_said_no_retention++; + + rd->tiers[tier].db_collection_handle = NULL; + } + + // return true if the dimension has retention in the db + return (!tiers_said_no_retention || tiers_available > tiers_said_no_retention); +} + static void rrddim_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrddim, void *rrdset) { RRDDIM *rd = rrddim; RRDSET *st = rrdset; @@ -180,19 +199,7 @@ static void rrddim_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v debug(D_RRD_CALLS, "rrddim_free() %s.%s", rrdset_name(st), rrddim_name(rd)); - size_t tiers_available = 0, tiers_said_no_retention = 0; - for(size_t tier = 0; tier < storage_tiers ;tier++) { - if(rd->tiers[tier].db_collection_handle) { - tiers_available++; - - if(rd->tiers[tier].collect_ops->finalize(rd->tiers[tier].db_collection_handle)) - tiers_said_no_retention++; - - rd->tiers[tier].db_collection_handle = NULL; - } - } - - if (tiers_available == tiers_said_no_retention && tiers_said_no_retention && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) { + if (!rrddim_finalize_collection_and_check_retention(rd) && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) { /* This metric has no data and no references */ metaqueue_delete_dimension_uuid(&rd->metric_uuid); } diff --git a/database/rrdhost.c b/database/rrdhost.c index 516d733daa..b25fc72d21 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -33,10 +33,8 @@ time_t rrdhost_free_orphan_time_s = 3600; bool is_storage_engine_shared(STORAGE_INSTANCE *engine __maybe_unused) { #ifdef ENABLE_DBENGINE - for(size_t tier = 0; tier < storage_tiers ;tier++) { - if (engine == (STORAGE_INSTANCE *)multidb_ctx[tier]) - return true; - } + if(!rrdeng_is_legacy(engine)) + return true; #endif return false; @@ -1223,6 +1221,15 @@ void rrdhost_free_all(void) { rrd_unlock(); } +void rrd_finalize_collection_for_all_hosts(void) { + RRDHOST *host; + rrd_wrlock(); + rrdhost_foreach_read(host) { + rrdhost_finalize_collection(host); + } + rrd_unlock(); +} + // ---------------------------------------------------------------------------- // RRDHOST - save host files @@ -1391,6 +1398,15 @@ void reload_host_labels(void) { rrdpush_send_host_labels(localhost); } +void rrdhost_finalize_collection(RRDHOST *host) { + info("Stopping data collection for host '%s'...", rrdhost_hostname(host)); + + RRDSET *st; + rrdset_foreach_write(st, host) + rrdset_finalize_collection(st, true); + rrdset_foreach_done(st); +} + // ---------------------------------------------------------------------------- // RRDHOST - delete host files diff --git a/database/rrdset.c b/database/rrdset.c index 886aa800ab..c97ebbb145 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -185,6 +185,29 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v ml_chart_new(st); } +void rrdset_finalize_collection(RRDSET *st, bool dimensions_too) { + RRDHOST *host = st->rrdhost; + + rrdset_flag_set(st, RRDSET_FLAG_COLLECTION_FINISHED); + + if(dimensions_too) { + RRDDIM *rd; + rrddim_foreach_read(rd, st) + rrddim_finalize_collection_and_check_retention(rd); + rrddim_foreach_done(rd); + } + + for(size_t tier = 0; tier < storage_tiers ; tier++) { + STORAGE_ENGINE *eng = st->rrdhost->db[tier].eng; + if(!eng) continue; + + if(st->storage_metrics_groups[tier]) { + eng->api.collect_ops.metrics_group_release(host->db[tier].instance, st->storage_metrics_groups[tier]); + st->storage_metrics_groups[tier] = NULL; + } + } +} + // the destructor - the dictionary is write locked while this runs static void rrdset_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdset, void *rrdhost) { RRDHOST *host = rrdhost; @@ -192,15 +215,7 @@ static void rrdset_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v rrdset_flag_clear(st, RRDSET_FLAG_INDEXED_ID); - // cleanup storage engines - { - for(size_t tier = 0; tier < storage_tiers ; tier++) { - STORAGE_ENGINE *eng = st->rrdhost->db[tier].eng; - if(!eng) continue; - - eng->api.collect_ops.metrics_group_release(host->db[tier].instance, st->storage_metrics_groups[tier]); - } - } + rrdset_finalize_collection(st, false); // remove it from the name index rrdset_index_del_name(host, st); @@ -600,16 +615,17 @@ void rrdset_get_retention_of_tier_for_collected_chart(RRDSET *st, time_t *first_ } if(unlikely(db_first_entry_s && db_last_entry_s && db_first_entry_s >= db_last_entry_s)) { - internal_error(true, - "RRDSET: 'host:%s/chart:%s' oldest db time %ld is equal or bigger than latest db time %ld, adjusting it last updated time - update every", + internal_error(db_first_entry_s > db_last_entry_s, + "RRDSET: 'host:%s/chart:%s' oldest db time %ld is bigger than latest db time %ld, adjusting it to (latest time %ld - update every %ld)", rrdhost_hostname(st->rrdhost), rrdset_id(st), - db_first_entry_s, db_last_entry_s); + db_first_entry_s, db_last_entry_s, + db_last_entry_s, (time_t)st->update_every); db_first_entry_s = db_last_entry_s - st->update_every; } if(unlikely(!db_first_entry_s && db_last_entry_s)) // this can be the case on the first data collection of a chart - db_first_entry_s = db_last_entry_s; + db_first_entry_s = db_last_entry_s - st->update_every; *first_time_s = db_first_entry_s; *last_time_s = db_last_entry_s; @@ -1467,9 +1483,13 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) next_store_ut = 0, // the timestamp in microseconds, of the next entry to store in the db update_every_ut = st->update_every * USEC_PER_SEC; // st->update_every in microseconds + RRDSET_FLAGS rrdset_flags = rrdset_flag_check(st, ~0); + if(unlikely(rrdset_flags & RRDSET_FLAG_COLLECTION_FINISHED)) + return; + netdata_thread_disable_cancelability(); - if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) { + if (unlikely(rrdset_flags & RRDSET_FLAG_OBSOLETE)) { error("Chart '%s' has the OBSOLETE flag set, but it is collected.", rrdset_id(st)); rrdset_isnot_obsolete(st); } @@ -1554,7 +1574,7 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next) last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec; next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC; - if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST))) { + if(unlikely(rrdset_flags & RRDSET_FLAG_STORE_FIRST)) { store_this_entry = 1; last_collect_ut = next_store_ut - update_every_ut; diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c index ecfab42c12..326e5e5a1f 100644 --- a/database/sqlite/sqlite_metadata.c +++ b/database/sqlite/sqlite_metadata.c @@ -881,7 +881,7 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused) freez(data); } -static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) { +static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_counter) { RRDSET *st; int rc; @@ -895,6 +895,8 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) { break; } if(rrdset_flag_check(st, RRDSET_FLAG_METADATA_UPDATE)) { + (*query_counter)++; + rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE); scan_count++; @@ -924,6 +926,8 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) { RRDDIM *rd; rrddim_foreach_read(rd, st) { if(rrddim_flag_check(rd, RRDDIM_FLAG_METADATA_UPDATE)) { + (*query_counter)++; + rrddim_flag_clear(rd, RRDDIM_FLAG_METADATA_UPDATE); if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)) @@ -963,12 +967,18 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) struct scan_metadata_payload *data = req->data; struct metadata_wc *wc = data->wc; + usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut; + internal_error(true, "METADATA: checking all hosts..."); + |