summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorMarkos Fountoulakis <44345837+mfundul@users.noreply.github.com>2019-10-24 19:43:09 +0300
committerGitHub <noreply@github.com>2019-10-24 19:43:09 +0300
commit88f966593abc5c7888e7c0be83780a97d4326ac2 (patch)
treeea40e93b143cb18c6da12357e1e2a64e0095ce78 /database
parenta6229b245cf42b332d3f11199488f175a8a80a7c (diff)
detect if the disk cannot keep up with data collection (#7139)
* Adjust dbengine flushing speed more dynamically * Added error tracking statistics for failure to flush events * Added alarm for dbengine flushing errors * Improved dbengine accounting for commited to be written pages
Diffstat (limited to 'database')
-rw-r--r--database/engine/pagecache.c22
-rw-r--r--database/engine/pagecache.h8
-rw-r--r--database/engine/rrdengine.c56
-rw-r--r--database/engine/rrdengine.h3
-rw-r--r--database/engine/rrdengineapi.c37
-rw-r--r--database/engine/rrdengineapi.h4
-rw-r--r--database/engine/rrdenginelib.c22
7 files changed, 104 insertions, 48 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index a419ba9818..449138b4de 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -214,7 +214,7 @@ static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned numb
* This function returns the maximum number of pages allowed in the page cache.
* The caller must hold the page cache lock.
*/
-static inline unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
+unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
{
/* it's twice the number of producers since we pin 2 pages per producer */
return ctx->max_cache_pages + 2 * (unsigned long)ctx->stats.metric_API_producers;
@@ -225,7 +225,7 @@ static inline unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
* number of pages below that number.
* The caller must hold the page cache lock.
*/
-static inline unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
+unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
{
/* it's twice the number of producers since we pin 2 pages per producer */
return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->stats.metric_API_producers;
@@ -1029,14 +1029,14 @@ static void init_replaceQ(struct rrdengine_instance *ctx)
assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
}
-static void init_commited_page_index(struct rrdengine_instance *ctx)
+static void init_committed_page_index(struct rrdengine_instance *ctx)
{
struct page_cache *pg_cache = &ctx->pg_cache;
- pg_cache->commited_page_index.JudyL_array = (Pvoid_t) NULL;
- assert(0 == uv_rwlock_init(&pg_cache->commited_page_index.lock));
- pg_cache->commited_page_index.latest_corr_id = 0;
- pg_cache->commited_page_index.nr_commited_pages = 0;
+ pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL;
+ assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
+ pg_cache->committed_page_index.latest_corr_id = 0;
+ pg_cache->committed_page_index.nr_committed_pages = 0;
}
void init_page_cache(struct rrdengine_instance *ctx)
@@ -1049,7 +1049,7 @@ void init_page_cache(struct rrdengine_instance *ctx)
init_metrics_index(ctx);
init_replaceQ(ctx);
- init_commited_page_index(ctx);
+ init_committed_page_index(ctx);
}
void free_page_cache(struct rrdengine_instance *ctx)
@@ -1062,9 +1062,9 @@ void free_page_cache(struct rrdengine_instance *ctx)
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
- /* Free commited page index */
- ret_Judy = JudyLFreeArray(&pg_cache->commited_page_index.JudyL_array, PJE0);
- assert(NULL == pg_cache->commited_page_index.JudyL_array);
+ /* Free committed page index */
+ ret_Judy = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
+ assert(NULL == pg_cache->committed_page_index.JudyL_array);
bytes_freed += ret_Judy;
for (page_index = pg_cache->metrics_index.last_page_index ;
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index ab1a5c1ad0..7d8fa2a1df 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -110,7 +110,7 @@ struct pg_cache_metrics_index {
};
/* gathers dirty pages to be written on disk */
-struct pg_cache_commited_page_index {
+struct pg_cache_committed_page_index {
uv_rwlock_t lock;
Pvoid_t JudyL_array;
@@ -122,7 +122,7 @@ struct pg_cache_commited_page_index {
*/
Word_t latest_corr_id;
- unsigned nr_commited_pages;
+ unsigned nr_committed_pages;
};
/*
@@ -140,7 +140,7 @@ struct page_cache { /* TODO: add statistics */
uv_rwlock_t pg_cache_rwlock; /* page cache lock */
struct pg_cache_metrics_index metrics_index;
- struct pg_cache_commited_page_index commited_page_index;
+ struct pg_cache_committed_page_index committed_page_index;
struct pg_cache_replaceQ replaceQ;
unsigned page_descriptors;
@@ -182,6 +182,8 @@ extern void init_page_cache(struct rrdengine_instance *ctx);
extern void free_page_cache(struct rrdengine_instance *ctx);
extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
+extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx);
+extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx);
static inline void
pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp)
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 896d71f169..b6b6548ecf 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -5,9 +5,8 @@
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
-rrdeng_stats_t global_pg_cache_warnings = 0;
-rrdeng_stats_t global_pg_cache_errors = 0;
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
+rrdeng_stats_t global_flushing_errors = 0;
void sanity_check(void)
{
@@ -253,6 +252,7 @@ void flush_pages_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
struct extent_io_descriptor *xt_io_descr;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
@@ -290,6 +290,10 @@ void flush_pages_cb(uv_fs_t* req)
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
freez(xt_io_descr);
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ pg_cache->committed_page_index.nr_committed_pages -= count;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
}
/*
@@ -323,14 +327,14 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
if (force) {
debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure.");
}
- uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
for (Index = 0, count = 0, uncompressed_payload_length = 0,
- PValue = JudyLFirst(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
+ PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue ;
descr != NULL && count != MAX_PAGES_PER_EXTENT ;
- PValue = JudyLNext(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
+ PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
uint8_t page_write_pending;
@@ -350,12 +354,11 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
rrdeng_page_descr_mutex_unlock(ctx, descr);
if (page_write_pending) {
- ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, Index, PJE0);
+ ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
assert(1 == ret);
- --pg_cache->commited_page_index.nr_commited_pages;
}
}
- uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
if (!count) {
debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__);
@@ -648,6 +651,9 @@ void async_cb(uv_async_t *handle)
debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
}
+/* Flushes dirty pages when timer expires */
+#define TIMER_PERIOD_MS (1000)
+
void timer_cb(uv_timer_t* handle)
{
struct rrdengine_worker_config* wc = handle->data;
@@ -657,12 +663,31 @@ void timer_cb(uv_timer_t* handle)
rrdeng_test_quota(wc);
debug(D_RRDENGINE, "%s: timeout reached.", __func__);
if (likely(!wc->now_deleting.data)) {
- unsigned total_bytes, bytes_written;
-
/* There is free space so we can write to disk */
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark,
+ high_watermark;
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ producers = ctx->stats.metric_API_producers;
+ /* are flushable pages more than 25% of the maximum page cache size */
+ high_watermark = (ctx->max_cache_pages * 25LLU) / 100;
+ low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */
+
+ if (nr_committed_pages > producers &&
+ /* committed to be written pages are more than the produced number */
+ nr_committed_pages - producers > high_watermark) {
+ /* Flushing speed must increase to stop page cache from filling with dirty pages */
+ bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
+ }
+ bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
+
debug(D_RRDENGINE, "Flushing pages to disk.");
for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ;
- bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ;
+ bytes_written && (total_bytes < bytes_to_write) ;
total_bytes += bytes_written) {
bytes_written = do_flush_pages(wc, 0, NULL);
}
@@ -675,9 +700,6 @@ void timer_cb(uv_timer_t* handle)
#endif
}
-/* Flushes dirty pages when timer expires */
-#define TIMER_PERIOD_MS (1000)
-
#define MAX_CMD_BATCH_SIZE (256)
void rrdeng_worker(void* arg)
@@ -771,8 +793,8 @@ void rrdeng_worker(void* arg)
/* First I/O should be enough to call completion */
bytes_written = do_flush_pages(wc, 1, cmd.completion);
if (bytes_written) {
- while (do_flush_pages(wc, 1, NULL)) {
- ; /* Force flushing of all commited pages. */
+ while (do_flush_pages(wc, 1, NULL) && likely(!wc->now_deleting.data)) {
+ ; /* Force flushing of all committed pages if there is free space. */
}
}
break;
@@ -789,7 +811,7 @@ void rrdeng_worker(void* arg)
}
info("Shutting down RRD engine event loop.");
while (do_flush_pages(wc, 1, NULL)) {
- ; /* Force flushing of all commited pages. */
+ ; /* Force flushing of all committed pages. */
}
wal_flush_transaction_buffer(wc);
uv_run(loop, UV_RUN_DEFAULT);
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 6f6a6f8ffd..78a35a0bfd 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -148,6 +148,7 @@ struct rrdengine_statistics {
rrdeng_stats_t page_cache_descriptors;
rrdeng_stats_t io_errors;
rrdeng_stats_t fs_errors;
+ rrdeng_stats_t flushing_errors;
};
/* I/O errors global counter */
@@ -156,6 +157,8 @@ extern rrdeng_stats_t global_io_errors;
extern rrdeng_stats_t global_fs_errors;
/* number of File-Descriptors that have been reserved by dbengine */
extern rrdeng_stats_t rrdeng_reserved_file_descriptors;
+/* inability to flush global counter */
+extern rrdeng_stats_t global_flushing_errors;
struct rrdengine_instance {
struct rrdengine_worker_config worker_config;
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 5fa23d8fd8..baf4a99733 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -173,9 +173,9 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
handle->descr = descr;
- uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
- handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++;
- uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ handle->page_correlation_id = pg_cache->committed_page_index.latest_corr_id++;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
if (0 == rd->rrdset->rrddim_page_alignment) {
/* this is the leading dimension that defines chart alignment */
@@ -614,18 +614,31 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr
{
struct page_cache *pg_cache = &ctx->pg_cache;
Pvoid_t *PValue;
+ unsigned nr_committed_pages;
if (unlikely(NULL == descr)) {
- debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-commited.", __func__);
+ debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__);
return;
}
assert(descr->page_length);
- uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
- PValue = JudyLIns(&pg_cache->commited_page_index.JudyL_array, page_correlation_id, PJE0);
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0);
*PValue = descr;
- ++pg_cache->commited_page_index.nr_commited_pages;
- uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
+ nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+
+ if (nr_committed_pages >= (pg_cache_hard_limit(ctx) - (unsigned long)ctx->stats.metric_API_producers) / 2) {
+ /* 50% of pages have not been committed yet */
+ if (0 == (unsigned long)ctx->stats.flushing_errors) {
+ /* only print the first time */
+ error("Failed to flush quickly enough in dbengine instance \"%s\""
+ ". Metric data will not be stored in the database"
+ ", please reduce disk load or use a faster disk.", ctx->dbfiles_path);
+ }
+ rrd_stat_atomic_add(&ctx->stats.flushing_errors, 1);
+ rrd_stat_atomic_add(&global_flushing_errors, 1);
+ }
pg_cache_put(ctx, descr);
}
@@ -674,7 +687,7 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
* You must not change the indices of the statistics or user code will break.
* You must not exceed RRDENG_NR_STATS or it will crash.
*/
-void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
+void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -682,7 +695,7 @@ void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long
array[1] = (uint64_t)ctx->stats.metric_API_consumers;
array[2] = (uint64_t)pg_cache->page_descriptors;
array[3] = (uint64_t)pg_cache->populated_pages;
- array[4] = (uint64_t)pg_cache->commited_page_index.nr_commited_pages;
+ array[4] = (uint64_t)pg_cache->committed_page_index.nr_committed_pages;
array[5] = (uint64_t)ctx->stats.pg_cache_insertions;
array[6] = (uint64_t)ctx->stats.pg_cache_deletions;
array[7] = (uint64_t)ctx->stats.pg_cache_hits;
@@ -711,7 +724,9 @@ void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long
array[30] = (uint64_t)global_io_errors;
array[31] = (uint64_t)global_fs_errors;
array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
- assert(RRDENG_NR_STATS == 33);
+ array[33] = (uint64_t)ctx->stats.flushing_errors;
+ array[34] = (uint64_t)global_flushing_errors;
+ assert(RRDENG_NR_STATS == 35);
}
/* Releases reference to page */
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index c876705e4f..1ef0569d5e 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -8,7 +8,7 @@
#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (8)
#define RRDENG_MIN_DISK_SPACE_MB (256)
-#define RRDENG_NR_STATS (33)
+#define RRDENG_NR_STATS (35)
#define RRDENG_FD_BUDGET_PER_INSTANCE (50)
@@ -41,7 +41,7 @@ extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_han
extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
extern time_t rrdeng_metric_oldest_time(RRDDIM *rd);
-extern void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
+extern void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
/* must call once before using anything */
extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
index 1a04dc2a47..0606dd9386 100644
--- a/database/engine/rrdenginelib.c
+++ b/database/engine/rrdenginelib.c
@@ -131,7 +131,7 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
"page_cache_total_pages: %ld\n"
"page_cache_descriptors: %ld\n"
"page_cache_populated_pages: %ld\n"
- "page_cache_commited_pages: %ld\n"
+ "page_cache_committed_pages: %ld\n"
"page_cache_insertions: %ld\n"
"page_cache_deletions: %ld\n"
"page_cache_hits: %ld\n"
@@ -153,13 +153,20 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
"datafile_creations: %ld\n"
"datafile_deletions: %ld\n"
"journalfile_creations: %ld\n"
- "journalfile_deletions: %ld\n",
+ "journalfile_deletions: %ld\n"
+ "io_errors: %ld\n"
+ "fs_errors: %ld\n"
+ "global_io_errors: %ld\n"
+ "global_fs_errors: %ld\n"
+ "rrdeng_reserved_file_descriptors: %ld\n"
+ "flushing_errors: %ld\n"
+ "global_flushing_errors: %ld\n",
(long)ctx->stats.metric_API_producers,
(long)ctx->stats.metric_API_consumers,
(long)pg_cache->page_descriptors,
(long)ctx->stats.page_cache_descriptors,
(long)pg_cache->populated_pages,
- (long)pg_cache->commited_page_index.nr_commited_pages,
+ (long)pg_cache->committed_page_index.nr_committed_pages,
(long)ctx->stats.pg_cache_insertions,
(long)ctx->stats.pg_cache_deletions,
(long)ctx->stats.pg_cache_hits,
@@ -181,7 +188,14 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
(long)ctx->stats.datafile_creations,
(long)ctx->stats.datafile_deletions,
(long)ctx->stats.journalfile_creations,
- (long)ctx->stats.journalfile_deletions
+ (long)ctx->stats.journalfile_deletions,
+ (long)ctx->stats.io_errors,
+ (long)ctx->stats.fs_errors,
+ (long)global_io_errors,
+ (long)global_fs_errors,
+ (long)rrdeng_reserved_file_descriptors,
+ (long)ctx->stats.flushing_errors,
+ (long)global_flushing_errors
);
return str;
}