diff options
author | Markos Fountoulakis <44345837+mfundul@users.noreply.github.com> | 2020-02-06 21:58:13 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-06 21:58:13 +0200 |
commit | 6b119d9170fce726e9a5720edc83f6d9ac88e7ce (patch) | |
tree | 90dc7094ba92af299f5e7c0532e519f706b47d92 | |
parent | b2b3c182548fe81e6d1c9a599b2571dabfdabcaa (diff) |
Drop dirty dbengine pages if disk cannot keep up (#7777)
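* Introduce dirty-page pressure handling in the dbengine page cache: when the disk cannot keep up with the rate at which dirty pages need to be flushed, the oldest dirty pages are invalidated (dropped from memory without being written to disk), so the corresponding metric data is lost.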
-rw-r--r-- | configs.signatures | 2 | ||||
-rw-r--r-- | daemon/global_statistics.c | 86 | ||||
-rw-r--r-- | daemon/unit_test.c | 3 | ||||
-rw-r--r-- | database/engine/pagecache.c | 31 | ||||
-rw-r--r-- | database/engine/pagecache.h | 4 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 257 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 21 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 44 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 5 | ||||
-rw-r--r-- | database/engine/rrdenginelib.c | 12 | ||||
-rw-r--r-- | health/health.d/dbengine.conf | 26 |
11 files changed, 375 insertions, 116 deletions
diff --git a/configs.signatures b/configs.signatures index ef4b99aa60..994b62a80e 100644 --- a/configs.signatures +++ b/configs.signatures @@ -381,7 +381,7 @@ declare -A configs_signatures=( ['7deb236ec68a512b9bdd18e6a51d76f7']='python.d/mysql.conf' ['7e5fc1644aa7a54f9dbb1bd102521b09']='health.d/memcached.conf' ['7f13631183fbdf79c21c8e5a171e9b34']='health.d/zfs.conf' - ['8edc8c73a8f3ca40b32e27fe452c70f3']='health.d/dbengine.conf' + ['82f1dc0a477a175ae31d7b815411e44e']='health.d/dbengine.conf' ['7fb8184d56a27040e73261ed9c6fc76f']='health_alarm_notify.conf' ['80266bddd3df374923c750a6de91d120']='health.d/apache.conf' ['803a7f9dcb942eeac0fd764b9e3e38ca']='fping.conf' diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c index 5197dcc100..e5aeb6e128 100644 --- a/daemon/global_statistics.c +++ b/daemon/global_statistics.c @@ -544,7 +544,7 @@ void global_statistics_charts(void) { if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) { ++hosts_with_dbengine; /* get localhost's DB engine's statistics */ - rrdeng_get_35_statistics(host->rrdeng_ctx, local_stats_array); + rrdeng_get_37_statistics(host->rrdeng_ctx, local_stats_array); for (i = 0 ; i < RRDENG_NR_STATS ; ++i) { /* aggregate statistics across hosts */ stats_array[i] += local_stats_array[i]; @@ -558,6 +558,8 @@ void global_statistics_charts(void) { stats_array[30] = local_stats_array[30]; stats_array[31] = local_stats_array[31]; stats_array[32] = local_stats_array[32]; + stats_array[34] = local_stats_array[34]; + stats_array[36] = local_stats_array[36]; // ---------------------------------------------------------------- @@ -642,7 +644,6 @@ void global_statistics_charts(void) { old_misses = misses; if (hits_delta + misses_delta) { - // allow negative savings ratio = (hits_delta * 100 * 1000) / (hits_delta + misses_delta); } else { ratio = 0; @@ -658,11 +659,10 @@ void global_statistics_charts(void) { static RRDSET *st_pg_cache_pages = NULL; static RRDDIM *rd_descriptors = NULL; static RRDDIM 
*rd_populated = NULL; - static RRDDIM *rd_committed = NULL; - static RRDDIM *rd_insertions = NULL; - static RRDDIM *rd_deletions = NULL; + static RRDDIM *rd_dirty = NULL; static RRDDIM *rd_backfills = NULL; static RRDDIM *rd_evictions = NULL; + static RRDDIM *rd_used_by_collectors = NULL; if (unlikely(!st_pg_cache_pages)) { st_pg_cache_pages = rrdset_create_localhost( @@ -671,7 +671,7 @@ void global_statistics_charts(void) { , NULL , "dbengine" , NULL - , "NetData DB engine page statistics" + , "NetData dbengine page cache statistics" , "pages" , "netdata" , "stats" @@ -682,28 +682,69 @@ void global_statistics_charts(void) { rd_descriptors = rrddim_add(st_pg_cache_pages, "descriptors", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); rd_populated = rrddim_add(st_pg_cache_pages, "populated", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - rd_committed = rrddim_add(st_pg_cache_pages, "committed", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - rd_insertions = rrddim_add(st_pg_cache_pages, "insertions", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); - rd_deletions = rrddim_add(st_pg_cache_pages, "deletions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL); + rd_dirty = rrddim_add(st_pg_cache_pages, "dirty", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); rd_backfills = rrddim_add(st_pg_cache_pages, "backfills", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); rd_evictions = rrddim_add(st_pg_cache_pages, "evictions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL); + rd_used_by_collectors = rrddim_add(st_pg_cache_pages, "used_by_collectors", NULL, 1, 1, + RRD_ALGORITHM_ABSOLUTE); } else rrdset_next(st_pg_cache_pages); rrddim_set_by_pointer(st_pg_cache_pages, rd_descriptors, (collected_number)stats_array[27]); rrddim_set_by_pointer(st_pg_cache_pages, rd_populated, (collected_number)stats_array[3]); - rrddim_set_by_pointer(st_pg_cache_pages, rd_committed, (collected_number)stats_array[4]); - rrddim_set_by_pointer(st_pg_cache_pages, rd_insertions, (collected_number)stats_array[5]); - rrddim_set_by_pointer(st_pg_cache_pages, rd_deletions, 
(collected_number)stats_array[6]); + rrddim_set_by_pointer(st_pg_cache_pages, rd_dirty, (collected_number)stats_array[0] + stats_array[4]); rrddim_set_by_pointer(st_pg_cache_pages, rd_backfills, (collected_number)stats_array[9]); rrddim_set_by_pointer(st_pg_cache_pages, rd_evictions, (collected_number)stats_array[10]); + rrddim_set_by_pointer(st_pg_cache_pages, rd_used_by_collectors, (collected_number)stats_array[0]); rrdset_done(st_pg_cache_pages); } // ---------------------------------------------------------------- { + static RRDSET *st_long_term_pages = NULL; + static RRDDIM *rd_total = NULL; + static RRDDIM *rd_insertions = NULL; + static RRDDIM *rd_deletions = NULL; + static RRDDIM *rd_flushing_pressure_deletions = NULL; + + if (unlikely(!st_long_term_pages)) { + st_long_term_pages = rrdset_create_localhost( + "netdata" + , "dbengine_long_term_page_stats" + , NULL + , "dbengine" + , NULL + , "NetData dbengine long-term page statistics" + , "pages" + , "netdata" + , "stats" + , 130505 + , localhost->rrd_update_every + , RRDSET_TYPE_LINE + ); + + rd_total = rrddim_add(st_long_term_pages, "total", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_insertions = rrddim_add(st_long_term_pages, "insertions", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); + rd_deletions = rrddim_add(st_long_term_pages, "deletions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL); + rd_flushing_pressure_deletions = rrddim_add(st_long_term_pages, "flushing_pressure_deletions", NULL, -1, + 1, RRD_ALGORITHM_INCREMENTAL); + } + else + rrdset_next(st_long_term_pages); + + rrddim_set_by_pointer(st_long_term_pages, rd_total, (collected_number)stats_array[2]); + rrddim_set_by_pointer(st_long_term_pages, rd_insertions, (collected_number)stats_array[5]); + rrddim_set_by_pointer(st_long_term_pages, rd_deletions, (collected_number)stats_array[6]); + rrddim_set_by_pointer(st_long_term_pages, rd_flushing_pressure_deletions, + (collected_number)stats_array[36]); + rrdset_done(st_long_term_pages); + } + + // 
---------------------------------------------------------------- + + { static RRDSET *st_io_stats = NULL; static RRDDIM *rd_reads = NULL; static RRDDIM *rd_writes = NULL; @@ -719,7 +760,7 @@ void global_statistics_charts(void) { , "MiB/s" , "netdata" , "stats" - , 130505 + , 130506 , localhost->rrd_update_every , RRDSET_TYPE_LINE ); @@ -753,7 +794,7 @@ void global_statistics_charts(void) { , "operations/s" , "netdata" , "stats" - , 130506 + , 130507 , localhost->rrd_update_every , RRDSET_TYPE_LINE ); @@ -775,7 +816,7 @@ void global_statistics_charts(void) { static RRDSET *st_errors = NULL; static RRDDIM *rd_fs_errors = NULL; static RRDDIM *rd_io_errors = NULL; - static RRDDIM *rd_flushing_errors = NULL; + static RRDDIM *pg_cache_over_half_dirty_events = NULL; if (unlikely(!st_errors)) { st_errors = rrdset_create_localhost( @@ -788,21 +829,22 @@ void global_statistics_charts(void) { , "errors/s" , "netdata" , "stats" - , 130507 + , 130508 , localhost->rrd_update_every , RRDSET_TYPE_LINE ); - rd_io_errors = rrddim_add(st_errors, "I/O errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); - rd_fs_errors = rrddim_add(st_errors, "FS errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); - rd_flushing_errors = rrddim_add(st_errors, "flushing errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); + rd_io_errors = rrddim_add(st_errors, "io_errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); + rd_fs_errors = rrddim_add(st_errors, "fs_errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); + pg_cache_over_half_dirty_events = rrddim_add(st_errors, "pg_cache_over_half_dirty_events", NULL, 1, 1, + RRD_ALGORITHM_INCREMENTAL); } else rrdset_next(st_errors); rrddim_set_by_pointer(st_errors, rd_io_errors, (collected_number)stats_array[30]); rrddim_set_by_pointer(st_errors, rd_fs_errors, (collected_number)stats_array[31]); - rrddim_set_by_pointer(st_errors, rd_flushing_errors, (collected_number)stats_array[34]); + rrddim_set_by_pointer(st_errors, pg_cache_over_half_dirty_events, 
(collected_number)stats_array[34]); rrdset_done(st_errors); } @@ -824,7 +866,7 @@ void global_statistics_charts(void) { , "descriptors" , "netdata" , "stats" - , 130508 + , 130509 , localhost->rrd_update_every , RRDSET_TYPE_LINE ); @@ -863,7 +905,7 @@ void global_statistics_charts(void) { , "MiB" , "netdata" , "stats" - , 130509 + , 130510 , localhost->rrd_update_every , RRDSET_TYPE_STACKED ); diff --git a/daemon/unit_test.c b/daemon/unit_test.c index 87d281d766..323ae285a3 100644 --- a/daemon/unit_test.c +++ b/daemon/unit_test.c @@ -1491,6 +1491,9 @@ static inline void rrddim_set_by_pointer_fake_time(RRDDIM *rd, collected_number static RRDHOST *dbengine_rrdhost_find_or_create(char *name) { + /* We don't want to drop metrics when generating load, we prefer to block data generation itself */ + rrdeng_drop_metrics_under_page_cache_pressure = 0; + return rrdhost_find_or_create( name , name diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 60fd7a5f89..e90ac13fe4 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -217,7 +217,6 @@ static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned numb /* * This function returns the maximum number of pages allowed in the page cache. - * The caller must hold the page cache lock. */ unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx) { @@ -228,7 +227,6 @@ unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx) /* * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the * number of pages below that number. - * The caller must hold the page cache lock. */ unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx) { @@ -237,6 +235,16 @@ unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx) } /* + * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page + * cache. 
+ */ +unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx) +{ + /* We remove the active pages of the producers from the calculation and only allow 50% of the extra pinned pages */ + return ctx->cache_pages_low_watermark + (unsigned long)ctx->stats.metric_API_producers / 2; +} + +/* * This function will block until it reserves #number populated pages. * It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit. */ @@ -375,7 +383,11 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) return 0; } -void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty) +/* + * Callers of this function need to make sure they're not deleting the same descriptor concurrently + */ +void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty, + uint8_t is_exclusive_holder) { struct page_cache *pg_cache = &ctx->pg_cache; struct page_cache_descr *pg_cache_descr = NULL; @@ -408,11 +420,14 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; - while (!pg_cache_try_get_unsafe(descr, 1)) { - debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); - if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); - pg_cache_wait_event_unsafe(descr); + if (!is_exclusive_holder) { + /* If we don't hold an exclusive page reference get one */ + while (!pg_cache_try_get_unsafe(descr, 1)) { + debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_wait_event_unsafe(descr); + } } if (remove_dirty) { pg_cache_descr->flags &= ~RRD_PAGE_DIRTY; diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index 9fd9991b5e..3722a7e1cc 100644 --- a/database/engine/pagecache.h +++ 
b/database/engine/pagecache.h @@ -163,7 +163,8 @@ extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr); extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr); extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, struct rrdeng_page_descr *descr); -extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty); +extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty, + uint8_t is_exclusive_holder); extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time); extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, @@ -185,6 +186,7 @@ extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index); extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx); extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx); +extern unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx); static inline void pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp) diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 3d49379048..586f2e0b48 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -6,7 +6,8 @@ rrdeng_stats_t global_io_errors = 0; rrdeng_stats_t global_fs_errors = 0; rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; -rrdeng_stats_t global_flushing_errors = 0; +rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0; +rrdeng_stats_t global_flushing_pressure_page_deletions = 0; static void sanity_check(void) { @@ -248,6 +249,109 @@ static void do_commit_transaction(struct rrdengine_worker_config* wc, 
uint8_t ty } } +static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc) +{ + int error; + + error = uv_thread_join(wc->now_invalidating_dirty_pages); + if (error) { + error("uv_thread_join(): %s", uv_strerror(error)); + } + freez(wc->now_invalidating_dirty_pages); + wc->now_invalidating_dirty_pages = NULL; + wc->cleanup_thread_invalidating_dirty_pages = 0; +} + +static void invalidate_oldest_committed(void *arg) +{ + struct rrdengine_instance *ctx = arg; + struct rrdengine_worker_config *wc = &ctx->worker_config; + struct page_cache *pg_cache = &ctx->pg_cache; + int ret; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + Pvoid_t *PValue; + Word_t Index; + unsigned nr_committed_pages; + + do { + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + for (Index = 0, + PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? NULL : *PValue; + + descr != NULL; + + PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), + descr = unlikely(NULL == PValue) ? 
NULL : *PValue) { + assert(0 != descr->page_length); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) { + rrdeng_page_descr_mutex_unlock(ctx, descr); + + ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); + assert(1 == ret); + break; + } + rrdeng_page_descr_mutex_unlock(ctx, descr); + } + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + + if (!descr) { + info("Failed to invalidate any dirty pages to relieve page cache pressure."); + + goto out; + } + pg_cache_punch_hole(ctx, descr, 1, 1); + + uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1); + rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1); + + } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)); +out: + wc->cleanup_thread_invalidating_dirty_pages = 1; + /* wake up event loop */ + assert(0 == uv_async_send(&wc->async)); +} + +void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned nr_committed_pages; + int error; + + uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); + nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; + uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); + + if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) { + /* delete the oldest page in memory */ + if (wc->now_invalidating_dirty_pages) { + /* already deleting a page */ + return; + } + errno = 0; + error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". 
" + "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path); + + wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages)); + wc->cleanup_thread_invalidating_dirty_pages = 0; + + error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + freez(wc->now_invalidating_dirty_pages); + wc->now_invalidating_dirty_pages = NULL; + } + } +} + void flush_pages_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; @@ -294,6 +398,7 @@ void flush_pages_cb(uv_fs_t* req) uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); pg_cache->committed_page_index.nr_committed_pages -= count; uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + wc->inflight_dirty_pages -= count; } /* @@ -366,6 +471,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct complete(completion); return 0; } + wc->inflight_dirty_pages += count; + xt_io_descr = mallocz(sizeof(*xt_io_descr)); payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); switch (compression_algorithm) { @@ -466,17 +573,15 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct return ALIGN_BYTES_CEILING(size_bytes); } -static void after_delete_old_data(uv_work_t *req, int status) +static void after_delete_old_data(struct rrdengine_worker_config* wc) { - struct rrdengine_instance *ctx = req->data; - struct rrdengine_worker_config* wc = &ctx->worker_config; + struct rrdengine_instance *ctx = wc->ctx; struct rrdengine_datafile *datafile; struct rrdengine_journalfile *journalfile; unsigned deleted_bytes, journalfile_bytes, datafile_bytes; - int ret; + int ret, error; char path[RRDENG_PATH_MAX]; - (void)status; datafile = ctx->datafiles.first; journalfile = datafile->journalfile; datafile_bytes = datafile->pos; @@ -503,15 +608,24 @@ static void 
after_delete_old_data(uv_work_t *req, int status) ctx->disk_space -= deleted_bytes; info("Reclaimed %u bytes of disk space.", deleted_bytes); + error = uv_thread_join(wc->now_deleting_files); + if (error) { + error("uv_thread_join(): %s", uv_strerror(error)); + } + freez(wc->now_deleting_files); /* unfreeze command processing */ - wc->now_deleting.data = NULL; - /* wake up event loop */ - assert(0 == uv_async_send(&wc->async)); + wc->now_deleting_files = NULL; + + wc->cleanup_thread_deleting_files = 0; + + /* interrupt event loop */ + uv_stop(wc->loop); } -static void delete_old_data(uv_work_t *req) +static void delete_old_data(void *arg) { - struct rrdengine_instance *ctx = req->data; + struct rrdengine_instance *ctx = arg; + struct rrdengine_worker_config* wc = &ctx->worker_config; struct rrdengine_datafile *datafile; struct extent_info *extent, *next; struct rrdeng_page_descr *descr; @@ -524,11 +638,14 @@ static void delete_old_data(uv_work_t *req) count = extent->number_of_pages; for (i = 0 ; i < count ; ++i) { descr = extent->pages[i]; - pg_cache_punch_hole(ctx, descr, 0); + pg_cache_punch_hole(ctx, descr, 0, 0); } next = extent->next; freez(extent); } + wc->cleanup_thread_deleting_files = 1; + /* wake up event loop */ + assert(0 == uv_async_send(&wc->async)); } void rrdeng_test_quota(struct rrdengine_worker_config* wc) @@ -537,7 +654,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) struct rrdengine_datafile *datafile; unsigned current_size, target_size; uint8_t out_of_space, only_one_datafile; - int ret; + int ret, error; out_of_space = 0; if (unlikely(ctx->disk_space > ctx->max_disk_space)) { @@ -559,7 +676,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) } if (unlikely(out_of_space)) { /* delete old data */ - if (wc->now_deleting.data) { + if (wc->now_deleting_files) { /* already deleting data */ return; } @@ -571,8 +688,33 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc) } info("Deleting data file 
\"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); - wc->now_deleting.data = ctx; - assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data)); + wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files)); + wc->cleanup_thread_deleting_files = 0; + + error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + freez(wc->now_deleting_files); + wc->now_deleting_files = NULL; + } + } +} + +static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc) +{ + if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) { + return 1; + } + return 0; +} + +static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc) +{ + if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) { + after_invalidate_oldest_committed(wc); + } + if (unlikely(wc->cleanup_thread_deleting_files)) { + after_delete_old_data(wc); } } @@ -662,34 +804,37 @@ void timer_cb(uv_timer_t* handle) uv_update_time(handle->loop); rrdeng_test_quota(wc); debug(D_RRDENGINE, "%s: timeout reached.", __func__); - if (likely(!wc->now_deleting.data)) { - /* There is free space so we can write to disk */ + if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) { + /* There is free space so we can write to disk and we are not actively deleting dirty buffers */ struct rrdengine_instance *ctx = wc->ctx; struct page_cache *pg_cache = &ctx->pg_cache; unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark, high_watermark; - uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); + uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; - uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + 
uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); producers = ctx->stats.metric_API_producers; /* are flushable pages more than 25% of the maximum page cache size */ high_watermark = (ctx->max_cache_pages * 25LLU) / 100; low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */ - if (nr_committed_pages > producers && - /* committed to be written pages are more than the produced number */ - nr_committed_pages - producers > high_watermark) { - /* Flushing speed must increase to stop page cache from filling with dirty pages */ - bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE; - } - bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write); + /* Flush more pages only if disk can keep up */ + if (wc->inflight_dirty_pages < high_watermark + producers) { + if (nr_committed_pages > producers && + /* committed to be written pages are more than the produced number */ + nr_committed_pages - producers > high_watermark) { + /* Flushing speed must increase to stop page cache from filling with dirty pages */ + bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE; + } + bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write); - debug(D_RRDENGINE, "Flushing pages to disk."); - for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ; - bytes_written && (total_bytes < bytes_to_write) ; - total_bytes += bytes_written) { - bytes_written = do_flush_pages(wc, 0, NULL); + debug(D_RRDENGINE, "Flushing pages to disk."); + for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL); + bytes_written && (total_bytes < bytes_to_write); + total_bytes += bytes_written) { + bytes_written = do_flush_pages(wc, 0, NULL); + } } } #ifdef NETDATA_INTERNAL_CHECKS @@ -730,7 +875,12 @@ void rrdeng_worker(void* arg) } wc->async.data = wc; - wc->now_deleting.data = NULL; + wc->now_deleting_files = NULL; + wc->cleanup_thread_deleting_files = 0; + + 
wc->now_invalidating_dirty_pages = NULL; + wc->cleanup_thread_invalidating_dirty_pages = 0; + wc->inflight_dirty_pages = 0; /* dirty page flushing timer */ ret = uv_timer_init(loop, &timer_req); @@ -746,8 +896,9 @@ void rrdeng_worker(void* arg) assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); shutdown = 0; - while (shutdown == 0 || uv_loop_alive(loop)) { + while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) { uv_run(loop, UV_RUN_DEFAULT); + rrdeng_cleanup_finished_threads(wc); /* wait for commands */ cmd_batch_size = 0; @@ -769,14 +920,6 @@ void rrdeng_worker(void* arg) break; case RRDENG_SHUTDOWN: shutdown = 1; - /* - * uv_async_send after uv_close does not seem to crash in linux at the moment, - * it is however undocumented behaviour and we need to be aware if this becomes - * an issue in the future. - */ - uv_close((uv_handle_t *)&wc->async, NULL); - assert(0 == uv_timer_stop(&timer_req)); - uv_close((uv_handle_t *)&timer_req, NULL); break; case RRDENG_READ_PAGE: do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0); @@ -788,16 +931,16 @@ void rrdeng_worker(void* arg) do_commit_transaction(wc, STORE_DATA, NULL); break; case RRDENG_FLUSH_PAGES: { - unsigned bytes_written; - - /* First I/O should be enough to call completion */ - bytes_written = do_flush_pages(wc, 1, cmd.completion); - if (bytes_written) { - while (do_flush_pages(wc, 1, NULL) && likely(!wc->now_deleting.data)) { - ; /* Force flushing of all committed pages if there is free space. 
*/ - } + if (wc->now_invalidating_dirty_pages) { + /* Do not flush if the disk cannot keep up */ + complete(cmd.completion); + } else { + (void)do_flush_pages(wc, 1, cmd.completion); } break; + case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE: + rrdeng_invalidate_oldest_committed(wc); + break; } default: debug(D_RRDENGINE, "%s: default.", __func__); @@ -805,11 +948,19 @@ void rrdeng_worker(void* arg) } } while (opcode != RRDENG_NOOP); } + /* cleanup operations of the event loop */ - if (unlikely(wc->now_deleting.data)) { - info("Postponing shutting RRD engine event loop down until after datafile deletion is finished."); - } info("Shutting down RRD engine event loop."); + + /* + * uv_async_send after uv_close does not seem to crash in linux at the moment, + * it is however undocumented behaviour and we need to be aware if this becomes + * an issue in the future. + */ + uv_close((uv_handle_t *)&wc->async, NULL); + assert(0 == uv_timer_stop(&timer_req)); + uv_close((uv_handle_t *)&timer_req, NULL); + while (do_flush_pages(wc, 1, NULL)) { ; /* Force flushing of all committed pages. 
*/ } diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index 67f1b41c3d..22c3d33bef 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -49,6 +49,7 @@ enum rrdeng_opcode { RRDENG_COMMIT_PAGE, RRDENG_FLUSH_PAGES, RRDENG_SHUTDOWN, + RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, RRDENG_MAX_OPCODE }; @@ -102,7 +103,16 @@ struct rrdengine_worker_config { uv_thread_t thread; uv_loop_t* loop; uv_async_t async; - uv_work_t now_deleting; + + /* file deletion thread */ + uv_thread_t *now_deleting_files; + unsigned long cleanup_thread_deleting_files; /* set to 0 when now_deleting_files is still running */ + + /* dirty page deletion thread */ + uv_thread_t *now_invalidating_dirty_pages; + /* set to 0 when now_invalidating_dirty_pages is still running */ + unsigned long cleanup_thread_invalidating_dirty_pages; + unsigned inflight_dirty_pages; /* FIFO command queue */ uv_mutex_t cmd_mutex; @@ -145,7 +155,8 @@ struct rrdengine_statistics { rrdeng_stats_t page_cache_descriptors; rrdeng_stats_t io_errors; rrdeng_stats_t fs_errors; - rrdeng_stats_t flushing_errors; + rrdeng_stats_t pg_cache_over_half_dirty_events; + rrdeng_stats_t flushing_pressure_page_deletions; }; /* I/O errors global counter |