diff options
Diffstat (limited to 'database/engine')
-rw-r--r-- | database/engine/metric.c | 39 | ||||
-rw-r--r-- | database/engine/metric.h | 6 | ||||
-rw-r--r-- | database/engine/pagecache.c | 14 | ||||
-rw-r--r-- | database/engine/pdc.c | 3 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 10 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 98 |
6 files changed, 118 insertions, 52 deletions
diff --git a/database/engine/metric.c b/database/engine/metric.c index af769fda95..d16bc063d9 100644 --- a/database/engine/metric.c +++ b/database/engine/metric.c @@ -327,33 +327,42 @@ bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metr return ret; } -bool mrg_metric_set_first_time_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { - bool ret = false; +time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { + time_t first_time_s; netdata_spinlock_lock(&metric->spinlock); - if(!metric->first_time_s) { - metric->first_time_s = first_time_s; - ret = true; + + if(unlikely(!metric->first_time_s)) { + if(metric->latest_time_s_clean) + metric->first_time_s = metric->latest_time_s_clean; + + else if(metric->latest_time_s_hot) + metric->first_time_s = metric->latest_time_s_hot; } + + first_time_s = metric->first_time_s; + netdata_spinlock_unlock(&metric->spinlock); - return ret; + return first_time_s; } -time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { - time_t first_time_s; +void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) { netdata_spinlock_lock(&metric->spinlock); - first_time_s = metric->first_time_s; - if(!first_time_s) { + + if(unlikely(!metric->first_time_s)) { if(metric->latest_time_s_clean) - first_time_s = metric->latest_time_s_clean; + metric->first_time_s = metric->latest_time_s_clean; - if(!first_time_s || metric->latest_time_s_hot < metric->latest_time_s_clean) - first_time_s = metric->latest_time_s_hot; + else if(metric->latest_time_s_hot) + metric->first_time_s = metric->latest_time_s_hot; } - netdata_spinlock_unlock(&metric->spinlock); - return first_time_s; + *first_time_s = metric->first_time_s; + *last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot); + *update_every_s = metric->latest_update_every_s; + + netdata_spinlock_unlock(&metric->spinlock); } bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { diff --git a/database/engine/metric.h b/database/engine/metric.h index 3eb2c97469..fe0481a1b1 100644 --- a/database/engine/metric.h +++ b/database/engine/metric.h @@ -46,18 +46,18 @@ Word_t mrg_metric_section(MRG *mrg, METRIC *metric); bool mrg_metric_set_first_time_s(MRG *mrg, METRIC *metric, time_t first_time_s); bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg, METRIC *metric, time_t first_time_s); -bool mrg_metric_set_first_time_s_if_zero(MRG *mrg, METRIC *metric, time_t first_time_s); time_t mrg_metric_get_first_time_s(MRG *mrg, METRIC *metric); -void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s); bool mrg_metric_set_clean_latest_time_s(MRG *mrg, METRIC *metric, time_t latest_time_s); bool mrg_metric_set_hot_latest_time_s(MRG *mrg, METRIC *metric, time_t latest_time_s); time_t mrg_metric_get_latest_time_s(MRG *mrg, METRIC *metric); bool mrg_metric_set_update_every(MRG *mrg, METRIC *metric, time_t update_every_s); +bool mrg_metric_set_update_every_s_if_zero(MRG *mrg, METRIC *metric, time_t update_every_s); time_t mrg_metric_get_update_every_s(MRG *mrg, METRIC *metric); -bool mrg_metric_set_update_every_s_if_zero(MRG *mrg, METRIC *metric, time_t update_every_s); +void mrg_metric_expand_retention(MRG *mrg, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s); +void mrg_metric_get_retention(MRG *mrg, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s); bool mrg_metric_writer_acquire(MRG *mrg, METRIC *metric); bool mrg_metric_writer_release(MRG *mrg, METRIC *metric); diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 961c2b2886..11f63751b8 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -356,17 +356,27 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin } static void pgc_inject_gap(struct rrdengine_instance *ctx, METRIC *metric, time_t start_time_s, time_t end_time_s) { + + time_t db_first_time_s, db_last_time_s, db_update_every_s; + mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s); + + if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) != PAGE_IS_IN_RANGE) + return; + PGC_ENTRY page_entry = { .hot = false, .section = (Word_t)ctx, .metric_id = (Word_t)metric, - .start_time_s = start_time_s, - .end_time_s = end_time_s, + .start_time_s = MAX(start_time_s, db_first_time_s), + .end_time_s = MIN(end_time_s, db_last_time_s), .update_every_s = 0, .size = 0, .data = DBENGINE_EMPTY_PAGE, }; + if(page_entry.start_time_s >= page_entry.end_time_s) + return; + PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, NULL); pgc_page_release(main_cache, page); } diff --git a/database/engine/pdc.c b/database/engine/pdc.c index d0daaa5c12..0563133719 100644 --- a/database/engine/pdc.c +++ b/database/engine/pdc.c @@ -606,6 +606,9 @@ void pdc_acquire(PDC *pdc) { } bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router __maybe_unused) { + if(unlikely(!pdc)) + return true; + netdata_spinlock_lock(&pdc->refcount_spinlock); if(pdc->refcount <= 0) diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index ed1d851b1f..dbc017aafb 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -1620,9 +1620,17 @@ static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unuse completion_wait_for(&ctx->quiesce.completion); completion_destroy(&ctx->quiesce.completion); + bool logged = false; while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED) || - __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) + __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) { + if(!logged) { + logged = true; + info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...", + __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED), + (ctx->config.legacy) ? -1 : ctx->config.tier); + } sleep_usec(1 * USEC_PER_MS); + } completion_mark_complete(completion); diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 22fe30e4fd..b7d2eae3b3 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -266,16 +266,19 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri if(!is_1st_metric_writer) __atomic_add_fetch(&ctx->atomic.collectors_running_duplicate, 1, __ATOMIC_RELAXED); + mrg_metric_set_update_every(main_mrg, metric, update_every); + + handle->alignment = (struct pg_alignment *)smg; + rrdeng_page_alignment_acquire(handle->alignment); + // this is important! // if we don't set the page_end_time_ut during the first collection // data collection may be able to go back in time and during the addition of new pages // clean pages may be found matching ours! - handle->page_end_time_ut = (usec_t)mrg_metric_get_latest_time_s(main_mrg, metric) * USEC_PER_SEC; - mrg_metric_set_update_every(main_mrg, metric, update_every); - - handle->alignment = (struct pg_alignment *)smg; - rrdeng_page_alignment_acquire(handle->alignment); + time_t db_first_time_s, db_last_time_s, db_update_every_s; + mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s); + handle->page_end_time_ut = (usec_t)db_last_time_s * USEC_PER_SEC; return (STORAGE_COLLECT_HANDLE *)handle; } @@ -382,11 +385,12 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha error_limit(&erl, #endif "DBENGINE: metric '%s' new page from %ld to %ld, update every %ld, has a conflict in main cache " - "with existing %s page from %ld to %ld, update every %ld - " + "with existing %s%s page from %ld to %ld, update every %ld - " "is it collected more than once?", uuid, page_entry.start_time_s, page_entry.end_time_s, (time_t)page_entry.update_every_s, pgc_is_page_hot(page) ? "hot" : "not-hot", + pgc_page_data(page) == DBENGINE_EMPTY_PAGE ? " gap" : "", pgc_page_start_time_s(page), pgc_page_end_time_s(page), pgc_page_update_every_s(page) ); @@ -580,12 +584,8 @@ static void store_metric_next_error_log(struct rrdeng_collect_handle *handle, us collect_page_flags_to_buffer(wb, handle->page_flags); } -#ifdef NETDATA_INTERNAL_CHECKS - internal_error(true, -#else error_limit_static_global_var(erl, 1, 0); error_limit(&erl, -#endif "DBENGINE: metric '%s' collected point at %ld, %s last collection at %ld, " "update every %ld, %s page from %ld to %ld, position %u (of %u), flags: %s", uuid, @@ -699,8 +699,8 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { if((handle->options & RRDENG_1ST_METRIC_WRITER) && !mrg_metric_writer_release(main_mrg, handle->metric)) internal_fatal(true, "DBENGINE: metric is already released"); - time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, handle->metric); - time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, handle->metric); + time_t first_time_s, last_time_s, update_every_s; + mrg_metric_get_retention(main_mrg, handle->metric, &first_time_s, &last_time_s, &update_every_s); mrg_metric_release(main_mrg, handle->metric); freez(handle); @@ -759,7 +759,11 @@ static void unregister_query_handle(struct rrdeng_query_handle *handle __maybe_u * Gets a handle for loading metrics from the database. * The handle must be released with rrdeng_load_metric_final(). */ -void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrddim_handle, time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority) +void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, + struct storage_engine_query_handle *rrddim_handle, + time_t start_time_s, + time_t end_time_s, + STORAGE_PRIORITY priority) { usec_t started_ut = now_monotonic_usec(); @@ -769,8 +773,6 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct sto struct rrdengine_instance *ctx = mrg_metric_ctx(metric); struct rrdeng_query_handle *handle; - mrg_metric_set_update_every_s_if_zero(main_mrg, metric, default_rrd_update_every); - handle = rrdeng_query_handle_get(); register_query_handle(handle); @@ -781,23 +783,48 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct sto handle->ctx = ctx; handle->metric = metric; - handle->start_time_s = start_time_s; - handle->end_time_s = end_time_s; handle->priority = priority; - handle->now_s = start_time_s; - handle->dt_s = mrg_metric_get_update_every_s(main_mrg, metric); - if(!handle->dt_s) - handle->dt_s = default_rrd_update_every; + // IMPORTANT! + // It is crucial not to exceed the db boundaries, because dbengine + // now has gap caching, so when a gap is detected a negative page + // is inserted into the main cache, to avoid scanning the journals + // again for pages matching the gap. - rrddim_handle->handle = (STORAGE_QUERY_HANDLE *)handle; - rrddim_handle->start_time_s = start_time_s; - rrddim_handle->end_time_s = end_time_s; - rrddim_handle->priority = priority; + time_t db_first_time_s, db_last_time_s, db_update_every_s; + mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s); - pg_cache_preload(handle); + if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) == PAGE_IS_IN_RANGE) { + handle->start_time_s = MAX(start_time_s, db_first_time_s); + handle->end_time_s = MIN(end_time_s, db_last_time_s); + handle->now_s = handle->start_time_s; - __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_init, now_monotonic_usec() - started_ut, __ATOMIC_RELAXED); + handle->dt_s = db_update_every_s; + if (!handle->dt_s) { + handle->dt_s = default_rrd_update_every; + mrg_metric_set_update_every_s_if_zero(main_mrg, metric, default_rrd_update_every); + } + + rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle; + rrddim_handle->start_time_s = handle->start_time_s; + rrddim_handle->end_time_s = handle->end_time_s; + rrddim_handle->priority = priority; + + pg_cache_preload(handle); + + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_init, now_monotonic_usec() - started_ut, __ATOMIC_RELAXED); + } + else { + handle->start_time_s = start_time_s; + handle->end_time_s = end_time_s; + handle->now_s = start_time_s; + handle->dt_s = db_update_every_s; + + rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle; + rrddim_handle->start_time_s = handle->start_time_s; + rrddim_handle->end_time_s = 0; + rrddim_handle->priority = priority; + } } static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_handle, bool debug_this __maybe_unused) { @@ -827,10 +854,19 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han unsigned position; if(likely(handle->now_s >= page_start_time_s && handle->now_s <= page_end_time_s)) { - if(unlikely(entries == 1 || page_start_time_s == page_end_time_s)) + if(unlikely(entries == 1 || page_start_time_s == page_end_time_s || !page_update_every_s)) { position = 0; - else + handle->now_s = page_start_time_s; + } + else { position = (handle->now_s - page_start_time_s) * (entries - 1) / (page_end_time_s - page_start_time_s); + time_t point_end_time_s = page_start_time_s + position * page_update_every_s; + if(point_end_time_s < handle->now_s && position + 1 < entries) { + position++; + point_end_time_s = page_start_time_s + position * page_update_every_s; + } + handle->now_s = point_end_time_s; + } internal_fatal(position >= entries, "DBENGINE: wrong page position calculation"); } @@ -986,8 +1022,8 @@ bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_ if (unlikely(!metric)) return false; - *first_entry_s = mrg_metric_get_first_time_s(main_mrg, metric); - *last_entry_s = mrg_metric_get_latest_time_s(main_mrg, metric); + time_t update_every_s; + mrg_metric_get_retention(main_mrg, metric, first_entry_s, last_entry_s, &update_every_s); mrg_metric_release(main_mrg, metric); |