From 57eab742c88093c89d5d46deb495558ad726e6f0 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Fri, 27 Jan 2023 01:32:20 +0200 Subject: DBENGINE v2 - improvements part 10 (#14332) * replication cancels pending queries on exit * log when waiting for inflight queries * when there are collected and not-collected metrics, use the context priority from the collected only * Write metadata with a faster pace * Remove journal file size limit and sync mode to 0 / Drop wal checkpoint for now * Wrap in a big transaction remaining metadata writes (test 1) * fix higher tiers when tiering iterations = 2 * dbengine always returns db-aligned points; query engine expands the queries by 2 points in every direction to have enough data for interpolation * Wrap in a big transaction metadata writes (test 2) * replication cancelling fix * do not first and last entry in replication when the db has no retention * fix internal check condition * Increase metadata write batch size * always apply error limit to dbengine logs * Remove code that processes the obsolete health.db files * cleanup in query.c * do not allow queries to go beyond db boundaries * prevent internal log for +1 delta in timestamp * detect gap pages in conflicts * double protection for gap injection in main cache * Add checkpoint to prevent large WAL while running Remove unused and duplicate functions * do not allocate chart cache dir if not needed * add more info to unittests * revert query expansion to satisfy unittests Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> --- daemon/unit_test.c | 45 ++-- database/engine/metric.c | 39 ++-- database/engine/metric.h | 6 +- database/engine/pagecache.c | 14 +- database/engine/pdc.c | 3 + database/engine/rrdengine.c | 10 +- database/engine/rrdengineapi.c | 98 ++++++--- database/rrd.c | 2 +- database/rrd.h | 7 +- database/rrdcontext.c | 21 +- database/rrddim.c | 2 +- database/rrdhost.c | 33 ++- database/rrdset.c | 35 +-- 
database/sqlite/sqlite_functions.c | 110 ---------- database/sqlite/sqlite_functions.h | 2 - database/sqlite/sqlite_health.c | 6 - database/sqlite/sqlite_metadata.c | 11 +- health/health.c | 23 +- health/health_log.c | 428 +------------------------------------ streaming/replication.c | 65 +++++- web/api/queries/query.c | 48 +++-- 21 files changed, 305 insertions(+), 703 deletions(-) diff --git a/daemon/unit_test.c b/daemon/unit_test.c index 9845ed430f..52b55c4e58 100644 --- a/daemon/unit_test.c +++ b/daemon/unit_test.c @@ -1858,7 +1858,7 @@ static void test_dbengine_create_charts(RRDHOST *host, RRDSET *st[CHARTS], RRDDI now_realtime_timeval(&now); rrdset_timed_done(st[i], now, false); } - // Fluh pages for subsequent real values + // Flush pages for subsequent real values for (i = 0 ; i < CHARTS ; ++i) { for (j = 0; j < DIMS; ++j) { rrdeng_store_metric_flush_current_page((rd[i][j])->tiers[0].db_collection_handle); @@ -1978,10 +1978,11 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS] int current_region, time_t time_start, time_t time_end) { int update_every = REGION_UPDATE_EVERY[current_region]; - fprintf(stderr, "%s() running on region %d, start time %lld, end time %lld, update every %d...\n", __FUNCTION__, current_region, (long long)time_start, (long long)time_end, update_every); + fprintf(stderr, "%s() running on region %d, start time %lld, end time %lld, update every %d, on %d dimensions...\n", + __FUNCTION__, current_region, (long long)time_start, (long long)time_end, update_every, CHARTS * DIMS); uint8_t same; time_t time_now, time_retrieved; - int i, j, errors, value_errors = 0, time_errors = 0; + int i, j, errors, value_errors = 0, time_errors = 0, value_right = 0, time_right = 0; long c; collected_number last; NETDATA_DOUBLE value, expected; @@ -2020,17 +2021,22 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS] same = (roundndd(value) == roundndd(expected)) ? 
1 : 0; if(!same) { if(value_errors < 20) - fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT + fprintf(stderr, " DB-engine unittest %s/%s: point #%ld, at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT ", RRDR found " NETDATA_DOUBLE_FORMAT ", ### E R R O R ###\n", - rrdset_name(st[i]), rrddim_name(rd[i][j]), (unsigned long)time_now, expected, value); + rrdset_name(st[i]), rrddim_name(rd[i][j]), (long) c+1, (unsigned long)time_now, expected, value); value_errors++; } + else + value_right++; + if(time_retrieved != time_now) { if(time_errors < 20) - fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n", - rrdset_name(st[i]), rrddim_name(rd[i][j]), (unsigned long)time_now, (unsigned long)time_retrieved); + fprintf(stderr, " DB-engine unittest %s/%s: point #%ld at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n", + rrdset_name(st[i]), rrddim_name(rd[i][j]), (long)c+1, (unsigned long)time_now, (unsigned long)time_retrieved); time_errors++; } + else + time_right++; } rrddim_foreach_done(d); } @@ -2040,10 +2046,10 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS] } if(value_errors) - fprintf(stderr, "%d value errors encountered\n", value_errors); + fprintf(stderr, "%d value errors encountered (%d were ok)\n", value_errors, value_right); if(time_errors) - fprintf(stderr, "%d time errors encountered\n", time_errors); + fprintf(stderr, "%d time errors encountered (%d were ok)\n", time_errors, time_right); return errors + value_errors + time_errors; } @@ -2051,7 +2057,7 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS] int test_dbengine(void) { fprintf(stderr, "%s() running...\n", __FUNCTION__ ); - int i, j, errors, value_errors = 0, time_errors = 0, update_every, current_region; + int i, j, errors = 0, value_errors = 0, time_errors = 0, update_every, current_region; RRDHOST *host = NULL; RRDSET 
*st[CHARTS]; RRDDIM *rd[CHARTS][DIMS]; @@ -2074,9 +2080,7 @@ int test_dbengine(void) time_start[current_region] = 2 * API_RELATIVE_TIME_MAX; time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]); - errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]); - if (errors) - goto error_out; + errors += test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]); current_region = 1; //this is the second region of data update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 3 seconds @@ -2093,9 +2097,7 @@ int test_dbengine(void) time_start[current_region] += update_every - time_start[current_region] % update_every; time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]); - errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]); - if (errors) - goto error_out; + errors += test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]); current_region = 2; //this is the third region of data update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 1 seconds @@ -2112,19 +2114,14 @@ int test_dbengine(void) time_start[current_region] += update_every - time_start[current_region] % update_every; time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]); - errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]); - if (errors) - goto error_out; + errors += test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]); for (current_region = 0 ; current_region < REGIONS ; ++current_region) { - errors = test_dbengine_check_rrdr(st, rd, current_region, time_start[current_region], time_end[current_region]); - if (errors) - goto error_out; + errors += test_dbengine_check_rrdr(st, rd, current_region, 
time_start[current_region], time_end[current_region]); } current_region = 1; update_every = REGION_UPDATE_EVERY[current_region]; // use the maximum update_every = 3 - errors = 0; long points = (time_end[REGIONS - 1] - time_start[0]) / update_every; // cover all time regions with RRDR long point_offset = (time_start[current_region] - time_start[0]) / update_every; for (i = 0 ; i < CHARTS ; ++i) { @@ -2181,7 +2178,7 @@ int test_dbengine(void) } onewayalloc_destroy(owa); } -error_out: + rrd_wrlock(); rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].instance); rrdhost_delete_charts(host); diff --git a/database/engine/metric.c b/database/engine/metric.c index af769fda95..d16bc063d9 100644 --- a/database/engine/metric.c +++ b/database/engine/metric.c @@ -327,33 +327,42 @@ bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metr return ret; } -bool mrg_metric_set_first_time_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { - bool ret = false; +time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { + time_t first_time_s; netdata_spinlock_lock(&metric->spinlock); - if(!metric->first_time_s) { - metric->first_time_s = first_time_s; - ret = true; + + if(unlikely(!metric->first_time_s)) { + if(metric->latest_time_s_clean) + metric->first_time_s = metric->latest_time_s_clean; + + else if(metric->latest_time_s_hot) + metric->first_time_s = metric->latest_time_s_hot; } + + first_time_s = metric->first_time_s; + netdata_spinlock_unlock(&metric->spinlock); - return ret; + return first_time_s; } -time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { - time_t first_time_s; +void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) { netdata_spinlock_lock(&metric->spinlock); - first_time_s = metric->first_time_s; - if(!first_time_s) { + + if(unlikely(!metric->first_time_s)) { 
if(metric->latest_time_s_clean) - first_time_s = metric->latest_time_s_clean; + metric->first_time_s = metric->latest_time_s_clean; - if(!first_time_s || metric->latest_time_s_hot < metric->latest_time_s_clean) - first_time_s = metric->latest_time_s_hot; + else if(metric->latest_time_s_hot) + metric->first_time_s = metric->latest_time_s_hot; } - netdata_spinlock_unlock(&metric->spinlock); - return first_time_s; + *first_time_s = metric->first_time_s; + *last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot); + *update_every_s = metric->latest_update_every_s; + + netdata_spinlock_unlock(&metric->spinlock); } bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { diff --git a/database/engine/metric.h b/database/engine/metric.h index 3eb2c97469..fe0481a1b1 100644 --- a/database/engine/metric.h +++ b/database/engine/metric.h @@ -46,18 +46,18 @@ Word_t mrg_metric_section(MRG *mrg, METRIC *metric); bool mrg_metric_set_first_time_s(MRG *mrg, METRIC *metric, time_t first_time_s); bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg, METRIC *metric, time_t first_time_s); -bool mrg_metric_set_first_time_s_if_zero(MRG *mrg, METRIC *metric, time_t first_time_s); time_t mrg_metric_get_first_time_s(MRG *mrg, METRIC *metric); -void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s); bool mrg_metric_set_clean_latest_time_s(MRG *mrg, METRIC *metric, time_t latest_time_s); bool mrg_metric_set_hot_latest_time_s(MRG *mrg, METRIC *metric, time_t latest_time_s); time_t mrg_metric_get_latest_time_s(MRG *mrg, METRIC *metric); bool mrg_metric_set_update_every(MRG *mrg, METRIC *metric, time_t update_every_s); +bool mrg_metric_set_update_every_s_if_zero(MRG *mrg, METRIC *metric, time_t update_every_s); time_t mrg_metric_get_update_every_s(MRG *mrg, METRIC *metric); -bool mrg_metric_set_update_every_s_if_zero(MRG *mrg, METRIC *metric, time_t 
update_every_s); +void mrg_metric_expand_retention(MRG *mrg, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s); +void mrg_metric_get_retention(MRG *mrg, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s); bool mrg_metric_writer_acquire(MRG *mrg, METRIC *metric); bool mrg_metric_writer_release(MRG *mrg, METRIC *metric); diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 961c2b2886..11f63751b8 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -356,17 +356,27 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin } static void pgc_inject_gap(struct rrdengine_instance *ctx, METRIC *metric, time_t start_time_s, time_t end_time_s) { + + time_t db_first_time_s, db_last_time_s, db_update_every_s; + mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s); + + if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) != PAGE_IS_IN_RANGE) + return; + PGC_ENTRY page_entry = { .hot = false, .section = (Word_t)ctx, .metric_id = (Word_t)metric, - .start_time_s = start_time_s, - .end_time_s = end_time_s, + .start_time_s = MAX(start_time_s, db_first_time_s), + .end_time_s = MIN(end_time_s, db_last_time_s), .update_every_s = 0, .size = 0, .data = DBENGINE_EMPTY_PAGE, }; + if(page_entry.start_time_s >= page_entry.end_time_s) + return; + PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, NULL); pgc_page_release(main_cache, page); } diff --git a/database/engine/pdc.c b/database/engine/pdc.c index d0daaa5c12..0563133719 100644 --- a/database/engine/pdc.c +++ b/database/engine/pdc.c @@ -606,6 +606,9 @@ void pdc_acquire(PDC *pdc) { } bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router __maybe_unused) { + if(unlikely(!pdc)) + return true; + netdata_spinlock_lock(&pdc->refcount_spinlock); if(pdc->refcount <= 0) diff --git 
a/database/engine/rrdengine.c b/database/engine/rrdengine.c index ed1d851b1f..dbc017aafb 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -1620,9 +1620,17 @@ static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unuse completion_wait_for(&ctx->quiesce.completion); completion_destroy(&ctx->quiesce.completion); + bool logged = false; while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED) || - __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) + __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) { + if(!logged) { + logged = true; + info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...", + __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED), + (ctx->config.legacy) ? -1 : ctx->config.tier); + } sleep_usec(1 * USEC_PER_MS); + } completion_mark_complete(completion); diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 22fe30e4fd..b7d2eae3b3 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -266,16 +266,19 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri if(!is_1st_metric_writer) __atomic_add_fetch(&ctx->atomic.collectors_running_duplicate, 1, __ATOMIC_RELAXED); + mrg_metric_set_update_every(main_mrg, metric, update_every); + + handle->alignment = (struct pg_alignment *)smg; + rrdeng_page_alignment_acquire(handle->alignment); + // this is important! // if we don't set the page_end_time_ut during the first collection // data collection may be able to go back in time and during the addition of new pages // clean pages may be found matching ours! 
- handle->page_end_time_ut = (usec_t)mrg_metric_get_latest_time_s(main_mrg, metric) * USEC_PER_SEC; - mrg_metric_set_update_every(main_mrg, metric, update_every); - - handle->alignment = (struct pg_alignment *)smg; - rrdeng_page_alignment_acquire(handle->alignment); + time_t db_first_time_s, db_last_time_s, db_update_every_s; + mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s); + handle->page_end_time_ut = (usec_t)db_last_time_s * USEC_PER_SEC; return (STORAGE_COLLECT_HANDLE *)handle; } @@ -382,11 +385,12 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha error_limit(&erl, #endif "DBENGINE: metric '%s' new page from %ld to %ld, update every %ld, has a conflict in main cache " - "with existing %s page from %ld to %ld, update every %ld - " + "with existing %s%s page from %ld to %ld, update every %ld - " "is it collected more than once?", uuid, page_entry.start_time_s, page_entry.end_time_s, (time_t)page_entry.update_every_s, pgc_is_page_hot(page) ? "hot" : "not-hot", + pgc_page_data(page) == DBENGINE_EMPTY_PAGE ? 
" gap" : "", pgc_page_start_time_s(page), pgc_page_end_time_s(page), pgc_page_update_every_s(page) ); @@ -580,12 +584,8 @@ static void store_metric_next_error_log(struct rrdeng_collect_handle *handle, us collect_page_flags_to_buffer(wb, handle->page_flags); } -#ifdef NETDATA_INTERNAL_CHECKS - internal_error(true, -#else error_limit_static_global_var(erl, 1, 0); error_limit(&erl, -#endif "DBENGINE: metric '%s' collected point at %ld, %s last collection at %ld, " "update every %ld, %s page from %ld to %ld, position %u (of %u), flags: %s", uuid, @@ -699,8 +699,8 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { if((handle->options & RRDENG_1ST_METRIC_WRITER) && !mrg_metric_writer_release(main_mrg, handle->metric)) internal_fatal(true, "DBENGINE: metric is already released"); - time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, handle->metric); - time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, handle->metric); + time_t first_time_s, last_time_s, update_every_s; + mrg_metric_get_retention(main_mrg, handle->metric, &first_time_s, &last_time_s, &update_every_s); mrg_metric_release(main_mrg, handle->metric); freez(handle); @@ -759,7 +759,11 @@ static void unregister_query_handle(struct rrdeng_query_handle *handle __maybe_u * Gets a handle for loading metrics from the database. * The handle must be released with rrdeng_load_metric_final(). 
*/ -void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrddim_handle, time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority) +void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, + struct storage_engine_query_handle *rrddim_handle, + time_t start_time_s, + time_t end_time_s, + STORAGE_PRIORITY priority) { usec_t started_ut = now_monotonic_usec(); @@ -769,8 +773,6 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct sto struct rrdengine_instance *ctx = mrg_metric_ctx(metric); struct rrdeng_query_handle *handle; - mrg_metric_set_update_every_s_if_zero(main_mrg, metric, default_rrd_update_every); - handle = rrdeng_query_handle_get(); register_query_handle(handle); @@ -781,23 +783,48 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct sto handle->ctx = ctx; handle->metric = metric; - handle->start_time_s = start_time_s; - handle->end_time_s = end_time_s; handle->priority = priority; - handle->now_s = start_time_s; - handle->dt_s = mrg_metric_get_update_every_s(main_mrg, metric); - if(!handle->dt_s) - handle->dt_s = default_rrd_update_every; + // IMPORTANT! + // It is crucial not to exceed the db boundaries, because dbengine + // now has gap caching, so when a gap is detected a negative page + // is inserted into the main cache, to avoid scanning the journals + // again for pages matching the gap. 
- rrddim_handle->handle = (STORAGE_QUERY_HANDLE *)handle; - rrddim_handle->start_time_s = start_time_s; - rrddim_handle->end_time_s = end_time_s; - rrddim_handle->priority = priority; + time_t db_first_time_s, db_last_time_s, db_update_every_s; + mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s); - pg_cache_preload(handle); + if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) == PAGE_IS_IN_RANGE) { + handle->start_time_s = MAX(start_time_s, db_first_time_s); + handle->end_time_s = MIN(end_time_s, db_last_time_s); + handle->now_s = handle->start_time_s; - __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_init, now_monotonic_usec() - started_ut, __ATOMIC_RELAXED); + handle->dt_s = db_update_every_s; + if (!handle->dt_s) { + handle->dt_s = default_rrd_update_every; + mrg_metric_set_update_every_s_if_zero(main_mrg, metric, default_rrd_update_every); + } + + rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle; + rrddim_handle->start_time_s = handle->start_time_s; + rrddim_handle->end_time_s = handle->end_time_s; + rrddim_handle->priority = priority; + + pg_cache_preload(handle); + + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_init, now_monotonic_usec() - started_ut, __ATOMIC_RELAXED); + } + else { + handle->start_time_s = start_time_s; + handle->end_time_s = end_time_s; + handle->now_s = start_time_s; + handle->dt_s = db_update_every_s; + + rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle; + rrddim_handle->start_time_s = handle->start_time_s; + rrddim_handle->end_time_s = 0; + rrddim_handle->priority = priority; + } } static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_handle, bool debug_this __maybe_unused) { @@ -827,10 +854,19 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han unsigned position; if(likely(handle->now_s >= page_start_time_s && handle->now_s <= page_end_time_s)) { - 
if(unlikely(entries == 1 || page_start_time_s == page_end_time_s)) + if(unlikely(entries == 1 || page_start_time_s == page_end_time_s || !page_update_every_s)) { position = 0; - else + handle->now_s = page_start_time_s; + } + else { position = (handle->now_s - page_start_time_s) * (entries - 1) / (page_end_time_s - page_start_time_s); + time_t point_end_time_s = page_start_time_s + position * page_update_every_s; + if(point_end_time_s < handle->now_s && position + 1 < entries) { + position++; + point_end_time_s = page_start_time_s + position * page_update_every_s; + } + handle->now_s = point_end_time_s; + } internal_fatal(position >= entries, "DBENGINE: wrong page position calculation"); } @@ -986,8 +1022,8 @@ bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_ if (unlikely(!metric)) return false; - *first_entry_s = mrg_metric_get_first_time_s(main_mrg, metric); - *last_entry_s = mrg_metric_get_latest_time_s(main_mrg, metric); + time_t update_every_s; + mrg_metric_get_retention(main_mrg, metric, first_entry_s, last_entry_s, &update_every_s); mrg_metric_release(main_mrg, metric); diff --git a/database/rrd.c b/database/rrd.c index df364419ea..d489ddb8b1 100644 --- a/database/rrd.c +++ b/database/rrd.c @@ -135,7 +135,7 @@ const char *rrdset_type_name(RRDSET_TYPE chart_type) { // ---------------------------------------------------------------------------- // RRD - cache directory -char *rrdset_cache_dir(RRDHOST *host, const char *id) { +char *rrdhost_cache_dir_for_rrdset_alloc(RRDHOST *host, const char *id) { char *ret = NULL; char b[FILENAME_MAX + 1]; diff --git a/database/rrd.h b/database/rrd.h index 1ae53b47e5..1128f7c6ab 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -314,7 +314,7 @@ typedef struct storage_collect_handle STORAGE_COLLECT_HANDLE; struct rrddim_tier { STORAGE_POINT virtual_point; size_t tier_grouping; - time_t next_point_time_s; + time_t next_point_end_time_s; STORAGE_METRIC_HANDLE *db_metric_handle; // the metric 
handle inside the database STORAGE_COLLECT_HANDLE *db_collection_handle; // the data collection handle struct storage_engine_collect_ops *collect_ops; @@ -905,9 +905,7 @@ typedef struct health { time_t health_delay_up_to; // a timestamp to delay alarms processing up to STRING *health_default_exec; // the full path of the alarms notifications program STRING *health_default_recipient; // the default recipient for all alarms - char *health_log_filename; // the alarms event log filename size_t health_log_entries_written; // the number of alarm events written to the alarms event log - FILE *health_log_fp; // the FILE pointer to the open alarms event log file uint32_t health_default_warn_repeat_every; // the default value for the interval between repeating warning notifications uint32_t health_default_crit_repeat_every; // the default value for the interval between repeating critical notifications } HEALTH; @@ -1340,7 +1338,8 @@ void rrdset_free(RRDSET *st); #ifdef NETDATA_RRD_INTERNALS -char *rrdset_cache_dir(RRDHOST *host, const char *id); +char *rrdhost_cache_dir_for_rrdset_alloc(RRDHOST *host, const char *id); +const char *rrdset_cache_dir(RRDSET *st); void rrddim_free(RRDSET *st, RRDDIM *rd); diff --git a/database/rrdcontext.c b/database/rrdcontext.c index c261c832f3..8d019dafba 100644 --- a/database/rrdcontext.c +++ b/database/rrdcontext.c @@ -3446,6 +3446,8 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG if(worker_jobs) worker_is_busy(WORKER_JOB_PP_CONTEXT); + size_t min_priority_collected = LONG_MAX; + size_t min_priority_not_collected = LONG_MAX; size_t min_priority = LONG_MAX; time_t min_first_time_t = LONG_MAX, max_last_time_t = 0; size_t instances_active = 0, instances_deleted = 0; @@ -3482,8 +3484,16 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG instances_active++; - if (ri->priority >= RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY && ri->priority < min_priority) - min_priority = ri->priority; + 
if (ri->priority >= RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY) { + if(rrd_flag_check(ri, RRD_FLAG_COLLECTED)) { + if(ri->priority < min_priority_collected) + min_priority_collected = ri->priority; + } + else { + if(ri->priority < min_priority_not_collected) + min_priority_not_collected = ri->priority; + } + } if (ri->first_time_s && ri->first_time_s < min_first_time_t) min_first_time_t = ri->first_time_s; @@ -3492,6 +3502,13 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG max_last_time_t = ri->last_time_s; } dfe_done(ri); + + if(min_priority_collected != LONG_MAX) + // use the collected priority + min_priority = min_priority_collected; + else + // use the non-collected priority + min_priority = min_priority_not_collected; } { diff --git a/database/rrddim.c b/database/rrddim.c index b520f21d3d..6846b0d42c 100644 --- a/database/rrddim.c +++ b/database/rrddim.c @@ -686,7 +686,7 @@ bool rrddim_memory_load_or_create_map_save(RRDSET *st, RRDDIM *rd, RRD_MEMORY_MO char filename[FILENAME_MAX + 1]; char fullfilename[FILENAME_MAX + 1]; rrdset_strncpyz_name(filename, rrddim_id(rd), FILENAME_MAX); - snprintfz(fullfilename, FILENAME_MAX, "%s/%s.db", st->cache_dir, filename); + snprintfz(fullfilename, FILENAME_MAX, "%s/%s.db", rrdset_cache_dir(st), filename); rd_on_file = (struct rrddim_map_save_v019 *)netdata_mmap( fullfilename, size, ((memory_mode == RRD_MEMORY_MODE_MAP) ? 
MAP_SHARED : MAP_PRIVATE), 1, false, NULL); diff --git a/database/rrdhost.c b/database/rrdhost.c index b25fc72d21..454fd6b809 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -499,7 +499,6 @@ int is_legacy = 1; ", health %s" ", cache_dir '%s'" ", varlib_dir '%s'" - ", health_log '%s'" ", alarms default handler '%s'" ", alarms default recipient '%s'" , rrdhost_hostname(host) @@ -519,7 +518,6 @@ int is_legacy = 1; , host->health.health_enabled?"enabled":"disabled" , host->cache_dir , host->varlib_dir - , host->health.health_log_filename , string2str(host->health.health_default_exec) , string2str(host->health.health_default_recipient) ); @@ -1085,7 +1083,7 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) { if(!host) return; if (netdata_exit || force) { - info("Freeing all memory for host '%s'...", rrdhost_hostname(host)); + info("RRD: 'host:%s' freeing memory...", rrdhost_hostname(host)); // ------------------------------------------------------------------------ // first remove it from the indexes, so that it will not be discoverable @@ -1146,7 +1144,7 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) { #endif if (!netdata_exit && !force) { - info("Setting archive mode for host '%s'...", rrdhost_hostname(host)); + info("RRD: 'host:%s' is now in archive mode...", rrdhost_hostname(host)); rrdhost_flag_set(host, RRDHOST_FLAG_ARCHIVED | RRDHOST_FLAG_ORPHAN); return; } @@ -1187,7 +1185,6 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) { rrdpush_destinations_free(host); string_freez(host->health.health_default_exec); string_freez(host->health.health_default_recipient); - freez(host->health.health_log_filename); string_freez(host->registry_hostname); simple_pattern_free(host->rrdpush_send_charts_matching); netdata_rwlock_destroy(&host->health_log.alarm_log_rwlock); @@ -1236,7 +1233,7 @@ void rrd_finalize_collection_for_all_hosts(void) { void rrdhost_save_charts(RRDHOST *host) { if(!host) 
return; - info("Saving/Closing database of host '%s'...", rrdhost_hostname(host)); + info("RRD: 'host:%s' saving / closing database...", rrdhost_hostname(host)); RRDSET *st; @@ -1393,13 +1390,11 @@ void reload_host_labels(void) { rrdhost_flag_set(localhost,RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); - health_label_log_save(localhost); - rrdpush_send_host_labels(localhost); } void rrdhost_finalize_collection(RRDHOST *host) { - info("Stopping data collection for host '%s'...", rrdhost_hostname(host)); + info("RRD: 'host:%s' stopping data collection...", rrdhost_hostname(host)); RRDSET *st; rrdset_foreach_write(st, host) @@ -1413,16 +1408,18 @@ void rrdhost_finalize_collection(RRDHOST *host) { void rrdhost_delete_charts(RRDHOST *host) { if(!host) return; - info("Deleting database of host '%s'...", rrdhost_hostname(host)); + info("RRD: 'host:%s' deleting disk files...", rrdhost_hostname(host)); RRDSET *st; - // we get a write lock - // to ensure only one thread is saving the database - rrdset_foreach_write(st, host) { - rrdset_delete_files(st); + if(host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || host->rrd_memory_mode == RRD_MEMORY_MODE_MAP) { + // we get a write lock + // to ensure only one thread is saving the database + rrdset_foreach_write(st, host){ + rrdset_delete_files(st); + } + rrdset_foreach_done(st); } - rrdset_foreach_done(st); recursively_delete_dir(host->cache_dir, "left over host"); } @@ -1433,7 +1430,7 @@ void rrdhost_delete_charts(RRDHOST *host) { void rrdhost_cleanup_charts(RRDHOST *host) { if(!host) return; - info("Cleaning up database of host '%s'...", rrdhost_hostname(host)); + info("RRD: 'host:%s' cleaning up disk files...", rrdhost_hostname(host)); RRDSET *st; uint32_t rrdhost_delete_obsolete_charts = rrdhost_option_check(host, RRDHOST_OPTION_DELETE_OBSOLETE_CHARTS); @@ -1460,7 +1457,7 @@ void rrdhost_cleanup_charts(RRDHOST *host) { // RRDHOST - save all hosts to disk void rrdhost_save_all(void) { - info("Saving database [%zu 
hosts(s)]...", rrdhost_hosts_available()); + info("RRD: saving databases [%zu hosts(s)]...", rrdhost_hosts_available()); rrd_rdlock(); @@ -1475,7 +1472,7 @@ void rrdhost_save_all(void) { // RRDHOST - save or delete all hosts from disk void rrdhost_cleanup_all(void) { - info("Cleaning up database [%zu hosts(s)]...", rrdhost_hosts_available()); + info("RRD: cleaning up database [%zu hosts(s)]...", rrdhost_hosts_available()); rrd_rdlock(); diff --git a/database/rrdset.c b/database/rrdset.c index c97ebbb145..a7378dd4d9 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -128,7 +128,6 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v st->module_name = rrd_string_strdupz(ctr->module); st->priority = ctr->priority; - st->cache_dir = rrdset_cache_dir(host, chart_full_id); st->entries = (ctr->memory_mode != RRD_MEMORY_MODE_DBENGINE) ? align_entries_to_pagesize(ctr->memory_mode, ctr->history_entries) : 5; st->update_every = ctr->update_every; st->rrd_memory_mode = ctr->memory_mode; @@ -601,13 +600,15 @@ void rrdset_get_retention_of_tier_for_collected_chart(RRDSET *st, time_t *first_ if(unlikely(!db_last_entry_s)) { db_last_entry_s = rrdset_last_entry_s_of_tier(st, tier); - if (unlikely(!db_last_entry_s)) + if (unlikely(!db_last_entry_s)) { // we assume this is a collected RRDSET - db_last_entry_s = now_s; + db_first_entry_s = 0; + db_last_entry_s = 0; + } } if(unlikely(db_last_entry_s > now_s)) { - internal_error(true, + internal_error(db_last_entry_s > now_s + 1, "RRDSET: 'host:%s/chart:%s' latest db time %ld is in the future, adjusting it to now %ld", rrdhost_hostname(st->rrdhost), rrdset_id(st), db_last_entry_s, now_s); @@ -831,7 +832,8 @@ void rrdset_delete_files(RRDSET *st) { } rrddim_foreach_done(rd); - recursively_delete_dir(st->cache_dir, "left-over chart"); + if(st->cache_dir) + recursively_delete_dir(st->cache_dir, "left-over chart"); } void rrdset_delete_obsolete_dimensions(RRDSET *st) { @@ -1105,15 +1107,17 @@ static inline 
time_t tier_next_point_time_s(RRDDIM *rd, struct rrddim_tier *t, t } void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut __maybe_unused) { - if (unlikely(!t->next_point_time_s)) - t->next_point_time_s = tier_next_point_time_s(rd, t, sp.end_time_s); + if (unlikely(!t->next_point_end_time_s)) + t->next_point_end_time_s = tier_next_point_time_s(rd, t, sp.end_time_s); + + if(unlikely(sp.start_time_s >= t->next_point_end_time_s)) { + // flush the virtual point, it is done - if(unlikely(sp.start_time_s > t->next_point_time_s)) { if (likely(!storage_point_is_unset(t->virtual_point))) { t->collect_ops->store_metric( t->db_collection_handle, - t->next_point_time_s * USEC_PER_SEC, + t->next_point_end_time_s * USEC_PER_SEC, t->virtual_point.sum, t->virtual_point.min, t->virtual_point.max, @@ -1124,7 +1128,7 @@ void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAG else { t->collect_ops->store_metric( t->db_collection_handle, - t->next_point_time_s * USEC_PER_SEC, + t->next_point_end_time_s * USEC_PER_SEC, NAN, NAN, NAN, @@ -1134,7 +1138,7 @@ void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAG rrdset_done_statistics_points_stored_per_tier[tier]++; t->virtual_point.count = 0; // make the point unset - t->next_point_time_s = tier_next_point_time_s(rd, t, sp.end_time_s); + t->next_point_end_time_s = tier_next_point_time_s(rd, t, sp.end_time_s); } // merge the dates into our virtual point @@ -2073,6 +2077,13 @@ const char *rrdset_cache_filename(RRDSET *st) { return st_on_file->cache_filename; } +const char *rrdset_cache_dir(RRDSET *st) { + if(!st->cache_dir) + st->cache_dir = rrdhost_cache_dir_for_rrdset_alloc(st->rrdhost, rrdset_id(st)); + + return st->cache_dir; +} + void rrdset_memory_file_free(RRDSET *st) { if(!st->st_on_file) return; @@ -2103,7 +2114,7 @@ bool rrdset_memory_load_or_create_map_save(RRDSET *st, RRD_MEMORY_MODE memory_mo return false; char 
fullfilename[FILENAME_MAX + 1]; - snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", st->cache_dir); + snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", rrdset_cache_dir(st)); unsigned long size = sizeof(struct rrdset_map_save_v019); struct rrdset_map_save_v019 *st_on_file = (struct rrdset_map_save_v019 *)netdata_mmap( diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c index fc32853182..1d03cfc2a5 100644 --- a/database/sqlite/sqlite_functions.c +++ b/database/sqlite/sqlite_functions.c @@ -529,116 +529,6 @@ void db_execute(const char *cmd) } } -#define SELECT_MIGRATED_FILE "select 1 from metadata_migration where filename = @path;" - -int file_is_migrated(char *path) -{ - sqlite3_stmt *res = NULL; - int rc; - - rc = sqlite3_prepare_v2(db_meta, SELECT_MIGRATED_FILE, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to fetch host"); - return 0; - } - - rc = sqlite3_bind_text(res, 1, path, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind filename parameter to check migration"); - return 0; - } - - rc = sqlite3_step_monitored(res); - - if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when checking if metadata file is migrated"); - - return (rc == SQLITE_ROW); -} - -#define STORE_MIGRATED_FILE "insert or replace into metadata_migration (filename, file_size, date_created) " \ - "values (@file, @size, unixepoch());" - -void add_migrated_file(char *path, uint64_t file_size) -{ - sqlite3_stmt *res = NULL; - int rc; - - rc = sqlite3_prepare_v2(db_meta, STORE_MIGRATED_FILE, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to fetch host"); - return; - } - - rc = sqlite3_bind_text(res, 1, path, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind filename parameter to store migration information"); - return; - } - - rc = 
sqlite3_bind_int64(res, 2, (sqlite_int64) file_size); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind size parameter to store migration information"); - return; - } - - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to store migrated file, rc = %d", rc); - - if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when checking if metadata file is migrated"); -} - - - -#define SQL_STORE_CLAIM_ID "insert into node_instance " \ - "(host_id, claim_id, date_created) values (@host_id, @claim_id, unixepoch()) " \ - "on conflict(host_id) do update set claim_id = excluded.claim_id;" - -void store_claim_id(uuid_t *host_id, uuid_t *claim_id) -{ - sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("Database has not been initialized"); - return; - } - - rc = sqlite3_prepare_v2(db_meta, SQL_STORE_CLAIM_ID, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement store chart labels"); - return; - } - - rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id parameter to store node instance information"); - goto failed; - } - - if (claim_id) - rc = sqlite3_bind_blob(res, 2, claim_id, sizeof(*claim_id), SQLITE_STATIC); - else - rc = sqlite3_bind_null(res, 2); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind claim_id parameter to store node instance information"); - goto failed; - } - - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to store node instance information, rc = %d", rc); - -failed: - if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when storing node instance information"); - - return; -} - static inline void set_host_node_id(RRDHOST *host, 
uuid_t *node_id) { if (unlikely(!host)) diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h index 26bdef1e2c..40abd010da 100644 --- a/database/sqlite/sqlite_functions.h +++ b/database/sqlite/sqlite_functions.h @@ -54,9 +54,7 @@ void sql_close_database(void); int bind_text_null(sqlite3_stmt *res, int position, const char *text, bool can_be_null); int prepare_statement(sqlite3 *database, const char *query, sqlite3_stmt **statement); int execute_insert(sqlite3_stmt *res); -int file_is_migrated(char *path); int exec_statement_with_uuid(const char *sql, uuid_t *uuid); -void add_migrated_file(char *path, uint64_t file_size); void db_execute(const char *cmd); // Look up functions diff --git a/database/sqlite/sqlite_health.c b/database/sqlite/sqlite_health.c index a0090d50f1..471fa3addb 100644 --- a/database/sqlite/sqlite_health.c +++ b/database/sqlite/sqlite_health.c @@ -107,8 +107,6 @@ void sql_health_alarm_log_update(RRDHOST *host, ALARM_ENTRY *ae) { failed: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) error_report("HEALTH [%s]: Failed to finalize the prepared statement for updating health log.", rrdhost_hostname(host)); - - return; } /* Health related SQL queries @@ -350,8 +348,6 @@ void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) { failed: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) error_report("HEALTH [%s]: Failed to finalize the prepared statement for inserting to health log.", rrdhost_hostname(host)); - - return; } void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae) @@ -545,8 +541,6 @@ void sql_inject_removed_status(char *uuid_str, uint32_t alarm_id, uint32_t alarm failed: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) error_report("HEALTH [N/A]: Failed to finalize the prepared statement for injecting removed event."); - return; - } #define SQL_SELECT_MAX_UNIQUE_ID(guid) "SELECT MAX(unique_id) from health_log_%s", guid diff --git a/database/sqlite/sqlite_metadata.c 
b/database/sqlite/sqlite_metadata.c index 326e5e5a1f..35f928ffa0 100644 --- a/database/sqlite/sqlite_metadata.c +++ b/database/sqlite/sqlite_metadata.c @@ -972,6 +972,9 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) bool run_again = false; worker_is_busy(UV_EVENT_METADATA_STORE); + + if (!data->max_count) + db_execute("BEGIN TRANSACTION;"); dfe_start_reentrant(rrdhost_root_index, host) { if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE)) continue; @@ -1024,11 +1027,15 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) query_counter++; } + if (data->max_count) + db_execute("BEGIN TRANSACTION;"); if (unlikely(metadata_scan_host(host, data->max_count, &query_counter))) { run_again = true; rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE); internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host)); } + if (data->max_count) + db_execute("COMMIT TRANSACTION;"); usec_t ended_ut = now_monotonic_usec(); (void)ended_ut; internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms", @@ -1036,6 +1043,8 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) (double)(ended_ut - started_ut) / USEC_PER_MS); } dfe_done(host); + if (!data->max_count) + db_execute("COMMIT TRANSACTION;"); usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut; internal_error(true, "METADATA: checking all hosts completed in %0.2f ms", @@ -1170,7 +1179,7 @@ static void metadata_event_loop(void *arg) cmd.completion = NULL; // Do not complete after launching worker (worker will do) } else - data->max_count = 1000; + data->max_count = 5000; metadata_flag_set(wc, METADATA_FLAG_SCANNING_HOSTS); if (unlikely( diff --git a/health/health.c b/health/health.c index d7368028f5..b34f54ab50 100644 --- a/health/health.c +++ b/health/health.c @@ -797,30 +797,15 @@ static void initialize_health(RRDHOST *host, int 
is_localhost) { if(r != 0 && errno != EEXIST) error("Host '%s': cannot create directory '%s'", rrdhost_hostname(host), filename); } - snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir); - host->health.health_log_filename = strdupz(filename); snprintfz(filename, FILENAME_MAX, "%s/alarm-notify.sh", netdata_configured_primary_plugins_dir); host->health.health_default_exec = string_strdupz(config_get(CONFIG_SECTION_HEALTH, "script to execute on alarm", filename)); host->health.health_default_recipient = string_strdupz("root"); - if (!file_is_migrated(host->health.health_log_filename)) { - int rc = sql_create_health_log_table(host); - if (unlikely(rc)) { - log_health("[%s]: Failed to create health log table in the database", rrdhost_hostname(host)); - health_alarm_log_load(host); - health_alarm_log_open(host); - } - else { - health_alarm_log_load(host); - add_migrated_file(host->health.health_log_filename, 0); - } - } else { - // TODO: This needs to go to the metadata thread - // Health should wait before accessing the table (needs to be created by the metadata thread) - sql_create_health_log_table(host); - sql_health_alarm_log_load(host); - } + // TODO: This needs to go to the metadata thread + // Health should wait before accessing the table (needs to be created by the metadata thread) + sql_create_health_log_table(host); + sql_health_alarm_log_load(host); // ------------------------------------------------------------------------ // load health configuration diff --git a/health/health_log.c b/health/health_log.c index b7e3e7ef49..d3417493b8 100644 --- a/health/health_log.c +++ b/health/health_log.c @@ -3,149 +3,10 @@ #include "health.h" // ---------------------------------------------------------------------------- -// health alarm log load/save -// no need for locking - only one thread is reading / writing the alarms log - -inline int health_alarm_log_open(RRDHOST *host) { - if(host->health.health_log_fp) - 
fclose(host->health.health_log_fp); - - host->health.health_log_fp = fopen(host->health.health_log_filename, "a"); - - if(host->health.health_log_fp) { - if (setvbuf(host->health.health_log_fp, NULL, _IOLBF, 0) != 0) - error("HEALTH [%s]: cannot set line buffering on health log file '%s'.", rrdhost_hostname(host), host->health.health_log_filename); - return 0; - } - - error("HEALTH [%s]: cannot open health log file '%s'. Health data will be lost in case of netdata or server crash.", rrdhost_hostname(host), host->health.health_log_filename); - return -1; -} - -static inline void health_alarm_log_close(RRDHOST *host) { - if(host->health.health_log_fp) { - fclose(host->health.health_log_fp); - host->health.health_log_fp = NULL; - } -} - -static inline void health_log_rotate(RRDHOST *host) { - static size_t rotate_every = 0; - - if(unlikely(rotate_every == 0)) { - rotate_every = (size_t)config_get_number(CONFIG_SECTION_HEALTH, "rotate log every lines", 2000); - if(rotate_every < 100) rotate_every = 100; - } - - if(unlikely(host->health.health_log_entries_written > rotate_every)) { - if(unlikely(host->health.health_log_fp)) { - health_alarm_log_close(host); - - char old_filename[FILENAME_MAX + 1]; - snprintfz(old_filename, FILENAME_MAX, "%s.old", host->health.health_log_filename); - - if(unlink(old_filename) == -1 && errno != ENOENT) - error("HEALTH [%s]: cannot remove old alarms log file '%s'", rrdhost_hostname(host), old_filename); - - if(link(host->health.health_log_filename, old_filename) == -1 && errno != ENOENT) - error("HEALTH [%s]: cannot move file '%s' to '%s'.", rrdhost_hostname(host), host->health.health_log_filename, old_filename); - - if(unlink(host->health.health_log_filename) == -1 && errno != ENOENT) - error("HEALTH [%s]: cannot remove old alarms log file '%s'", rrdhost_hostname(host), host->health.health_log_filename); - - // open it with truncate - host->health.health_log_fp = fopen(host->health.health_log_filename, "w"); - - 
if(host->health.health_log_fp) - fclose(host->health.health_log_fp); - else - error("HEALTH [%s]: cannot truncate health log '%s'", rrdhost_hostname(host), host->health.health_log_filename); - - host->health.health_log_fp = NULL; - - host->health.health_log_entries_written = 0; - health_alarm_log_open(host); - } - } -} - -inline void health_label_log_save(RRDHOST *host) { - health_log_rotate(host); - - if(unlikely(host->health.health_log_fp)) { - BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_health); - - rrdlabels_to_buffer(localhost->rrdlabels, wb, "", "=", "", "\t ", NULL, NULL, NULL, NULL); - char *write = (char *) buffer_tostring(wb); - - if (unlikely(fprintf(host->health.health_log_fp, "L\t%s", write) < 0)) - error("HEALTH [%s]: failed to save alarm log entry to '%s'. Health data may be lost in case of abnormal restart.", - rrdhost_hostname(host), host->health.health_log_filename); - else - host->health.health_log_entries_written++; - - buffer_free(wb); - } -} inline void health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae) { - health_log_rotate(host); - if(unlikely(host->health.health_log_fp)) { - if(unlikely(fprintf(host->health.health_log_fp - , "%c\t%s" - "\t%08x\t%08x\t%08x\t%08x\t%08x" - "\t%08x\t%08x\t%08x" - "\t%08x\t%08x\t%08x" - "\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s" - "\t%d\t%d\t%d\t%d" - "\t" NETDATA_DOUBLE_FORMAT_AUTO "\t" NETDATA_DOUBLE_FORMAT_AUTO - "\t%016"PRIx64"" - "\t%s\t%s\t%s" - "\n" - , (ae->flags & HEALTH_ENTRY_FLAG_SAVED)?'U':'A' - , rrdhost_hostname(host) - - , ae->unique_id - , ae->alarm_id - , ae->alarm_event_id - , ae->updated_by_id - , ae->updates_id - - , (uint32_t)ae->when - , (uint32_t)ae->duration - , (uint32_t)ae->non_clear_duration - , (uint32_t)ae->flags - , (uint32_t)ae->exec_run_timestamp - , (uint32_t)ae->delay_up_to_timestamp - - , ae_name(ae) - , ae_chart_name(ae) - , ae_family(ae) - , ae_exec(ae) - , ae_recipient(ae) - , ae_source(ae) - , ae_units(ae) - , ae_info(ae) - , ae->exec_code - , 
ae->new_status - , ae->old_status - , ae->delay - - , ae->new_value - , ae->old_value - , (uint64_t)ae->last_repeat - , (ae->classification)?ae_classification(ae):"Unknown" - , (ae->component)?ae_component(ae):"Unknown" - , (ae->type)?ae_type(ae):"Unknown" - ) < 0)) - error("HEALTH [%s]: failed to save alarm log entry to '%s'. Health data may be lost in case of abnormal restart.", rrdhost_hostname(host), host->health.health_log_filename); - else { - ae->flags |= HEALTH_ENTRY_FLAG_SAVED; - host->health.health_log_entries_written++; - } - }else - sql_health_alarm_log_save(host, ae); + sql_health_alarm_log_save(host, ae); #ifdef ENABLE_ACLK if (netdata_cloud_setting) { @@ -154,293 +15,6 @@ inline void health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae) { #endif } -static uint32_t is_valid_alarm_id(RRDHOST *host, const char *chart, const char *name, uint32_t alarm_id) -{ - STRING *chart_string = string_strdupz(chart); - STRING *name_string = string_strdupz(name); - - uint32_t ret = 1; - - ALARM_ENTRY *ae; - for(ae = host->health_log.alarms; ae ;ae = ae->next) { - if (unlikely(ae->alarm_id == alarm_id && (!(chart_string == ae->chart && name_string == ae->name)))) { - ret = 0; - break; - } - } - - string_freez(chart_string); - string_freez(name_string); - - return ret; -} - -static inline ssize_t health_alarm_log_read(RRDHOST *host, FILE *fp, const char *filename) { - errno = 0; - - char *s, *buf = mallocz(65536 + 1); - size_t line = 0, len = 0; - ssize_t loaded = 0, updated = 0, errored = 0, duplicate = 0; - - DICTIONARY *all_rrdcalcs = dictionary_create_advanced( - DICT_OPTION_NAME_LINK_DONT_CLONE | DICT_OPTION_VALUE_LINK_DONT_CLONE | DICT_OPTION_DONT_OVERWRITE_VALUE, - &dictionary_stats_category_rrdhealth); - - RRDCALC *rc; - foreach_rrdcalc_in_rrdhost_read(host, rc) { - dictionary_set(all_rrdcalcs, rrdcalc_name(rc), rc, sizeof(*rc)); - } - foreach_rrdcalc_in_rrdhost_done(rc); - - netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); - - while((s = 
fgets_trim_len(buf, 65536, fp, &len))) { - host->health.health_log_entries_written++; - line++; - - int max_entries = 33, entries = 0; - char *pointers[max_entries]; - - pointers[entries++] = s++; - while(*s) { - if(unlikely(*s == '\t')) { - *s = '\0'; - pointers[entries++] = ++s; - if(entries >= max_entries) { - error("HEALTH [%s]: line %zu of file '%s' has more than %d entries. Ignoring excessive entries.", rrdhost_hostname(host), line, filename, max_entries); - break; - } - } - else s++; - } - - if(likely(*pointers[0] == 'L')) - continue; - - if(likely(*pointers[0] == 'U' || *pointers[0] == 'A')) { - ALARM_ENTRY *ae = NULL; - - if(entries < 27) { - error("HEALTH [%s]: line %zu of file '%s' should have at least 27 entries, but it has %d. Ignoring it.", rrdhost_hostname(host), line, filename, entries); - errored++; - continue; - } - - // check that we have valid ids - uint32_t unique_id = (uint32_t)strtoul(pointers[2], NULL, 16); - if(!unique_id) { - error("HEALTH [%s]: line %zu of file '%s' states alarm entry with invalid unique id %u (%s). Ignoring it.", rrdhost_hostname(host), line, filename, unique_id, pointers[2]); - errored++; - continue; - } - - uint32_t alarm_id = (uint32_t)strtoul(pointers[3], NULL, 16); - if(!alarm_id) { - error("HEALTH [%s]: line %zu of file '%s' states alarm entry for invalid alarm id %u (%s). Ignoring it.", rrdhost_hostname(host), line, filename, alarm_id, pointers[3]); - errored++; - continue; - } - - // Check if we got last_repeat field - time_t last_repeat = 0; - if(entries > 27) { - char* alarm_name = pointers[13]; - last_repeat = (time_t)strtoul(pointers[27], NULL, 16); - - rc = dictionary_get(all_rrdcalcs, alarm_name); - if(unlikely(rc)) { - if (rrdcalc_isrepeating(rc)) { - rc->last_repeat = last_repeat; - // We iterate through repeating alarm entries only to - // find the latest last_repeat timestamp. Otherwise, - // there is no need to keep them in memory. 
- continue; - } - } - } - - if(unlikely(*pointers[0] == 'A')) { - // make sure it is properly numbered - if(unlikely(host->health_log.alarms && unique_id < host->health_log.alarms->unique_id)) { - error( "HEALTH [%s]: line %zu of file '%s' has alarm log entry %u in wrong order. Ignoring it." - , rrdhost_hostname(host), line, filename, unique_id); - errored++; - continue; - } - - ae = callocz(1, sizeof(ALARM_ENTRY)); - } - else if(unlikely(*pointers[0] == 'U')) { - // find the original - for(ae = host->health_log.alarms; ae ; ae = ae->next) { - if(unlikely(unique_id == ae->unique_id)) { - if(unlikely(*pointers[0] == 'A')) { - error("HEALTH [%s]: line %zu of file '%s' adds duplicate alarm log entry %u. Using the later." - , rrdhost_hostname(host), line, filename, unique_id); - *pointers[0] = 'U'; - duplicate++; - } - break; - } - else if(unlikely(unique_id > ae->unique_id)) { - // no need to continue - // the linked list is sorted - ae = NULL; - break; - } - } - } - - // if not found, skip this line - if(unlikely(!ae)) { - // error("HEALTH [%s]: line %zu of file '%s' updates alarm log entry with unique id %u, but it is not found.", host->hostname, line, filename, unique_id); - continue; - } - - // check for a possible host mismatch - //if(strcmp(pointers[1], host->hostname)) - // error("HEALTH [%s]: line %zu of file '%s' provides an alarm for host '%s' but this is named '%s'.", host->hostname, line, filename, pointers[1], host->hostname); - - ae->unique_id = unique_id; - if (!is_valid_alarm_id(host, pointers[14], pointers[13], alarm_id)) { - STRING *chart = string_strdupz(pointers[14]); - STRING *name = string_strdupz(pointers[13]); - alarm_id = rrdcalc_get_unique_id(host, chart, name, NULL); - string_freez(chart); - string_freez(name); - } - ae->alarm_id = alarm_id; - ae->alarm_event_id = (uint32_t)strtoul(pointers[4], NULL, 16); - ae->updated_by_id = (uint32_t)strtoul(pointers[5], NULL, 16); - ae->updates_id = (uint32_t)strtoul(pointers[6], NULL, 16); - - ae->when 
= (uint32_t)strtoul(pointers[7], NULL, 16); - ae->duration = (uint32_t)strtoul(pointers[8], NULL, 16); - ae->non_clear_duration = (uint32_t)strtoul(pointers[9], NULL, 16); - - ae->flags = (uint32_t)strtoul(pointers[10], NULL, 16); - ae->flags |= HEALTH_ENTRY_FLAG_SAVED; - - ae->exec_run_timestamp = (uint32_t)strtoul(pointers[11], NULL, 16); - ae->delay_up_to_timestamp = (uint32_t)strtoul(pointers[12], NULL, 16); - - string_freez(ae->name); - ae->name = string_strdupz(pointers[13]); - - string_freez(ae->chart); - ae->chart = string_strdupz(pointers[14]); - - string_freez(ae->family); - ae->family = string_strdupz(pointers[15]); - - string_freez(ae->exec); - ae->exec = string_strdupz(pointers[16]); - - string_freez(ae->recipient); - ae->recipient = string_strdupz(pointers[17]); - - string_freez(ae->source); - ae->source = string_strdupz(pointers[18]); - - string_freez(ae->units); - ae->units = string_strdupz(pointers[19]); - - string_freez(ae->info); - ae->info = string_strdupz(pointers[20]); - -