diff options
-rw-r--r-- | configs.signatures | 1 | ||||
-rw-r--r-- | daemon/global_statistics.c | 71 | ||||
-rw-r--r-- | database/engine/datafile.c | 204 | ||||
-rw-r--r-- | database/engine/datafile.h | 5 | ||||
-rw-r--r-- | database/engine/journalfile.c | 95 | ||||
-rw-r--r-- | database/engine/journalfile.h | 2 | ||||
-rw-r--r-- | database/engine/pagecache.c | 69 | ||||
-rw-r--r-- | database/engine/pagecache.h | 4 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 156 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 14 | ||||
-rw-r--r-- | database/engine/rrdengineapi.c | 72 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 6 | ||||
-rw-r--r-- | database/engine/rrdenginelib.c | 2 | ||||
-rw-r--r-- | database/engine/rrdenginelib.h | 2 | ||||
-rw-r--r-- | database/engine/rrdenglocking.c | 4 | ||||
-rw-r--r-- | health/Makefile.am | 1 | ||||
-rw-r--r-- | health/health.d/dbengine.conf | 26 | ||||
-rw-r--r-- | web/server/static/static-threaded.c | 2 |
18 files changed, 578 insertions, 158 deletions
diff --git a/configs.signatures b/configs.signatures index afc8dbe56d..b0ded05e03 100644 --- a/configs.signatures +++ b/configs.signatures @@ -381,6 +381,7 @@ declare -A configs_signatures=( ['7deb236ec68a512b9bdd18e6a51d76f7']='python.d/mysql.conf' ['7e5fc1644aa7a54f9dbb1bd102521b09']='health.d/memcached.conf' ['7f13631183fbdf79c21c8e5a171e9b34']='health.d/zfs.conf' + ['ce285c90747428ee5da4efb547418dda']='health.d/dbengine.conf' ['7fb8184d56a27040e73261ed9c6fc76f']='health_alarm_notify.conf' ['80266bddd3df374923c750a6de91d120']='health.d/apache.conf' ['803a7f9dcb942eeac0fd764b9e3e38ca']='fping.conf' diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c index fef8eaae6e..53b7546f26 100644 --- a/daemon/global_statistics.c +++ b/daemon/global_statistics.c @@ -538,7 +538,7 @@ void global_statistics_charts(void) { unsigned long long stats_array[RRDENG_NR_STATS]; /* get localhost's DB engine's statistics */ - rrdeng_get_28_statistics(localhost->rrdeng_ctx, stats_array); + rrdeng_get_33_statistics(localhost->rrdeng_ctx, stats_array); // ---------------------------------------------------------------- @@ -749,6 +749,75 @@ void global_statistics_charts(void) { rrddim_set_by_pointer(st_io_stats, rd_writes, (collected_number)stats_array[16]); rrdset_done(st_io_stats); } + + // ---------------------------------------------------------------- + + { + static RRDSET *st_errors = NULL; + static RRDDIM *rd_fs_errors = NULL; + static RRDDIM *rd_io_errors = NULL; + + if (unlikely(!st_errors)) { + st_errors = rrdset_create_localhost( + "netdata" + , "dbengine_global_errors" + , NULL + , "dbengine" + , NULL + , "NetData DB engine errors" + , "errors/s" + , "netdata" + , "stats" + , 130507 + , localhost->rrd_update_every + , RRDSET_TYPE_LINE + ); + + rd_io_errors = rrddim_add(st_errors, "I/O errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); + rd_fs_errors = rrddim_add(st_errors, "FS errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); + } + else + rrdset_next(st_errors); + + rrddim_set_by_pointer(st_errors, rd_io_errors, (collected_number)stats_array[30]); + rrddim_set_by_pointer(st_errors, rd_fs_errors, (collected_number)stats_array[31]); + rrdset_done(st_errors); + } + + // ---------------------------------------------------------------- + + { + static RRDSET *st_fd = NULL; + static RRDDIM *rd_fd_current = NULL; + static RRDDIM *rd_fd_max = NULL; + + if (unlikely(!st_fd)) { + st_fd = rrdset_create_localhost( + "netdata" + , "dbengine_global_file_descriptors" + , NULL + , "dbengine" + , NULL + , "NetData DB engine File Descriptors" + , "descriptors" + , "netdata" + , "stats" + , 130508 + , localhost->rrd_update_every + , RRDSET_TYPE_LINE + ); + + rd_fd_current = rrddim_add(st_fd, "current", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_fd_max = rrddim_add(st_fd, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + else + rrdset_next(st_fd); + + rrddim_set_by_pointer(st_fd, rd_fd_current, (collected_number)stats_array[32]); + /* Careful here, modify this accordingly if the File-Descriptor budget ever changes */ + rrddim_set_by_pointer(st_fd, rd_fd_max, (collected_number)rlimit_nofile.rlim_cur / 4); + rrdset_done(st_fd); + } } #endif diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 9262805c81..8ef4ed5990 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -49,44 +49,69 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_ datafile->ctx = ctx; } -static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); } +int close_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + + int destroy_data_file(struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; - int ret, fd; - char path[1024]; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); if (ret < 0) { - fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ret = uv_fs_close(NULL, &req, datafile->file, NULL); if (ret < 0) { - fatal("uv_fs_close: %s", uv_strerror(ret)); + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); - generate_datafilepath(datafile, path, sizeof(path)); - fd = uv_fs_unlink(NULL, &req, path, NULL); - if (fd < 0) { - fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ++ctx->stats.datafile_deletions; - return 0; + return ret; } int create_data_file(struct rrdengine_datafile *datafile) @@ -97,13 +122,17 @@ int create_data_file(struct rrdengine_datafile *datafile) int ret, fd; struct rrdeng_df_sb *superblock; uv_buf_t iov; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_datafilepath(datafile, path, sizeof(path)); fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); if (fd < 0) { - fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; } + datafile->file = file; + ++ctx->stats.datafile_creations; ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); if (unlikely(ret)) { @@ -117,19 +146,21 @@ int create_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - fatal("uv_fs_write: %s", uv_strerror(ret)); - } - if (req.result < 0) { - fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); } uv_fs_req_cleanup(&req); free(superblock); + if (ret < 0) { + destroy_data_file(datafile); + return ret; + } - datafile->file = file; datafile->pos = sizeof(*superblock); ctx->stats.io_write_bytes += sizeof(*superblock); ++ctx->stats.io_write_requests; - ++ctx->stats.datafile_creations; return 0; } @@ -174,15 +205,15 @@ static int load_data_file(struct rrdengine_datafile *datafile) struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; uv_file file; - int ret, fd; + int ret, fd, error; uint64_t file_size; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_datafilepath(datafile, path, sizeof(path)); fd = open_file_direct_io(path, O_RDWR, &file); if (fd < 0) { - /* if (UV_ENOENT != fd) */ - error("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); return fd; } info("Initializing data file \"%s\".", path); @@ -205,15 +236,21 @@ static int load_data_file(struct rrdengine_datafile *datafile) return 0; error: - (void) uv_fs_close(NULL, &req, file, NULL); + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } uv_fs_req_cleanup(&req); - return ret; + return error; } static int scan_data_files_cmp(const void *a, const void *b) { struct rrdengine_datafile *file1, *file2; - char path1[1024], path2[1024]; + char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; file1 = *(struct rrdengine_datafile **)a; file2 = *(struct rrdengine_datafile **)b; @@ -222,7 +259,7 @@ static int scan_data_files_cmp(const void *a, const void *b) return strcmp(path1, path2); } -/* Returns number of datafiles that were loaded */ +/* Returns number of datafiles that were loaded or < 0 on error */ static int scan_data_files(struct rrdengine_instance *ctx) { int ret; @@ -233,16 +270,22 @@ static int scan_data_files(struct rrdengine_instance *ctx) struct rrdengine_journalfile *journalfile; ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL); - assert(ret >= 0); - assert(req.result >= 0); + if (ret < 0) { + assert(req.result < 0); + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return ret; + } info("Found %d files in path %s", ret, ctx->dbfiles_path); datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { - info("Scanning file \"%s\"", dent.name); + info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name); ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); if (2 == ret) { - info("Matched file \"%s\"", dent.name); + info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name); datafile = mallocz(sizeof(*datafile)); datafile_init(datafile, ctx, tier, no); datafiles[matched_files++] = datafile; @@ -250,70 +293,133 @@ static int scan_data_files(struct rrdengine_instance *ctx) } uv_fs_req_cleanup(&req); + if (0 == matched_files) { + freez(datafiles); + return 0; + } if (matched_files == MAX_DATAFILES) { error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); } qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); + /* TODO: change this when tiering is implemented */ + ctx->last_fileno = datafiles[matched_files - 1]->fileno; + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { datafile = datafiles[i]; ret = load_data_file(datafile); if (0 != ret) { - free(datafile); + freez(datafile); ++failed_to_load; - continue; + break; } journalfile = mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); ret = load_journal_file(ctx, journalfile, datafile); if (0 != ret) { - free(datafile); - free(journalfile); + close_data_file(datafile); + freez(datafile); + freez(journalfile); ++failed_to_load; - continue; + break; } datafile_list_insert(ctx, datafile); ctx->disk_space += datafile->pos + journalfile->pos; } + freez(datafiles); if (failed_to_load) { - error("%u files failed to load.", failed_to_load); + error("%u datafiles failed to load.", failed_to_load); + finalize_data_files(ctx); + return UV_EIO; } - free(datafiles); - return matched_files - failed_to_load; + return matched_files; } /* Creates a datafile and a journalfile pair */ -void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) +int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) { struct rrdengine_datafile *datafile; struct rrdengine_journalfile *journalfile; int ret; + char path[RRDENG_PATH_MAX]; - info("Creating new data and journal files."); + info("Creating new data and journal files in path %s", ctx->dbfiles_path); datafile = mallocz(sizeof(*datafile)); datafile_init(datafile, ctx, tier, fileno); ret = create_data_file(datafile); - assert(!ret); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Created data file \"%s\".", path); + } else { + goto error_after_datafile; + } journalfile = mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); ret = create_journal_file(journalfile, datafile); - assert(!ret); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Created journal file \"%s\".", path); + } else { + goto error_after_journalfile; + } datafile_list_insert(ctx, datafile); ctx->disk_space += datafile->pos + journalfile->pos; + + return 0; + +error_after_journalfile: + destroy_data_file(datafile); + freez(journalfile); +error_after_datafile: + freez(datafile); + return ret; } -/* Page cache must already be initialized. */ +/* Page cache must already be initialized. + * Return 0 on success. + */ int init_data_files(struct rrdengine_instance *ctx) { int ret; ret = scan_data_files(ctx); - if (0 == ret) { - info("Data files not found, creating."); - create_new_datafile_pair(ctx, 1, 1); + if (ret < 0) { + error("Failed to scan path \"%s\".", ctx->dbfiles_path); + return ret; + } else if (0 == ret) { + info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path); + ret = create_new_datafile_pair(ctx, 1, 1); + if (ret) { + error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path); + return ret; + } + ctx->last_fileno = 1; } + return 0; +} + +void finalize_data_files(struct rrdengine_instance *ctx) +{ + struct rrdengine_datafile *datafile, *next_datafile; + struct rrdengine_journalfile *journalfile; + struct extent_info *extent, *next_extent; + + for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) { + journalfile = datafile->journalfile; + next_datafile = datafile->next; + + for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) { + next_extent = extent->next; + freez(extent); + } + close_journal_file(journalfile, datafile); + close_data_file(datafile); + freez(journalfile); + freez(datafile); + + } }
\ No newline at end of file diff --git a/database/engine/datafile.h b/database/engine/datafile.h index 961da49971..eeb11310b5 100644 --- a/database/engine/datafile.h +++ b/database/engine/datafile.h @@ -55,9 +55,12 @@ struct rrdengine_datafile_list { extern void df_extent_insert(struct extent_info *extent); extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); +extern int close_data_file(struct rrdengine_datafile *datafile); extern int destroy_data_file(struct rrdengine_datafile *datafile); extern int create_data_file(struct rrdengine_datafile *datafile); -extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); +extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); extern int init_data_files(struct rrdengine_instance *ctx); +extern void finalize_data_files(struct rrdengine_instance *ctx); #endif /* NETDATA_DATAFILE_H */
\ No newline at end of file diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 9b14c914df..30eaa0ec6a 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -13,7 +13,7 @@ static void flush_transaction_buffer_cb(uv_fs_t* req) uv_fs_req_cleanup(req); free(io_descr->buf); - free(io_descr); + freez(io_descr); } /* Careful to always call this before creating a new journal file */ @@ -87,7 +87,7 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s return ctx->commit_log.buf + buf_pos; } -static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); @@ -100,39 +100,62 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin journalfile->datafile = datafile; } +int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; - int ret, fd; - char path[1024]; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); if (ret < 0) { - fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ret = uv_fs_close(NULL, &req, journalfile->file, NULL); if (ret < 0) { - fatal("uv_fs_close: %s", uv_strerror(ret)); - exit(ret); + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); - generate_journalfilepath(datafile, path, sizeof(path)); - fd = uv_fs_unlink(NULL, &req, path, NULL); - if (fd < 0) { - fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ++ctx->stats.journalfile_deletions; - return 0; + return ret; } int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) @@ -143,13 +166,17 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng int ret, fd; struct rrdeng_jf_sb *superblock; uv_buf_t iov; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_journalfilepath(datafile, path, sizeof(path)); fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); if (fd < 0) { - fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; } + journalfile->file = file; + ++ctx->stats.journalfile_creations; ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); if (unlikely(ret)) { @@ -162,19 +189,21 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - fatal("uv_fs_write: %s", uv_strerror(ret)); - } - if (req.result < 0) { - fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); } uv_fs_req_cleanup(&req); free(superblock); + if (ret < 0) { + destroy_journal_file(journalfile, datafile); + return ret; + } - journalfile->file = file; journalfile->pos = sizeof(*superblock); ctx->stats.io_write_bytes += sizeof(*superblock); ++ctx->stats.io_write_requests; - ++ctx->stats.journalfile_creations; return 0; } @@ -263,6 +292,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); assert(NULL == *PValue); /* TODO: figure out concurrency model */ *PValue = page_index = create_page_index(temp_id); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); } @@ -398,15 +429,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi { uv_fs_t req; uv_file file; - int ret, fd; + int ret, fd, error; uint64_t file_size, max_id; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_journalfilepath(datafile, path, sizeof(path)); fd = open_file_direct_io(path, O_RDWR, &file); if (fd < 0) { - /* if (UV_ENOENT != fd) */ - error("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); return fd; } info("Loading journal file \"%s\".", path); @@ -433,9 +464,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi return 0; error: - (void) uv_fs_close(NULL, &req, file, NULL); + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } uv_fs_req_cleanup(&req); - return ret; + return error; } void init_commit_log(struct rrdengine_instance *ctx) diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h index 50489aeeeb..0df66304d7 100644 --- a/database/engine/journalfile.h +++ b/database/engine/journalfile.h @@ -33,9 +33,11 @@ struct transaction_commit_log { unsigned buf_size; }; +extern void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size); extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc); +extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index f41d3959da..124f2448b1 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -287,7 +287,7 @@ static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_ { struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - free(pg_cache_descr->page); + freez(pg_cache_descr->page); pg_cache_descr->page = NULL; pg_cache_descr->flags &= ~RRD_PAGE_POPULATED; pg_cache_release_pages_unsafe(ctx, 1); @@ -330,7 +330,7 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) return 1; } rrdeng_page_descr_mutex_unlock(ctx, descr); - }; + } uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); /* failed to evict */ @@ -594,7 +594,7 @@ struct pg_cache_page_index * } rrdeng_page_descr_mutex_unlock(ctx, descr); - }; + } uv_rwlock_rdunlock(&page_index->lock); failed_to_reserve = 0; @@ -767,6 +767,7 @@ struct pg_cache_page_index *create_page_index(uuid_t *id) assert(0 == uv_rwlock_init(&page_index->lock)); page_index->oldest_time = INVALID_TIME; page_index->latest_time = INVALID_TIME; + page_index->prev = NULL; return page_index; } @@ -776,6 +777,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx) struct page_cache *pg_cache = &ctx->pg_cache; pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL; + pg_cache->metrics_index.last_page_index = NULL; assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock)); } @@ -809,4 +811,65 @@ void init_page_cache(struct rrdengine_instance *ctx) init_metrics_index(ctx); init_replaceQ(ctx); init_commited_page_index(ctx); +} + +void free_page_cache(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Word_t ret_Judy, bytes_freed = 0; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index, *prev_page_index; + Word_t Index; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + + /* Free commited page index */ + ret_Judy = JudyLFreeArray(&pg_cache->commited_page_index.JudyL_array, PJE0); + assert(NULL == pg_cache->commited_page_index.JudyL_array); + bytes_freed += ret_Judy; + + for (page_index = pg_cache->metrics_index.last_page_index ; + page_index != NULL ; + page_index = prev_page_index) { + prev_page_index = page_index->prev; + + /* Find first page in range */ + Index = (Word_t) 0; + PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + } + while (descr != NULL) { + /* Iterate all page descriptors of this metric */ + + if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { + /* Check rrdenglocking.c */ + pg_cache_descr = descr->pg_cache_descr; + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { + freez(pg_cache_descr->page); + bytes_freed += RRDENG_BLOCK_SIZE; + } + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + bytes_freed += sizeof(*pg_cache_descr); + } + freez(descr); + bytes_freed += sizeof(*descr); + + PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0); + descr = unlikely(NULL == PValue) ? NULL : *PValue; + } + + /* Free page index */ + ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0); + assert(NULL == page_index->JudyL_array); + bytes_freed += ret_Judy; + freez(page_index); + bytes_freed += sizeof(*page_index); + } + /* Free metrics index */ + ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0); + assert(NULL == pg_cache->metrics_index.JudyHS_array); + bytes_freed += ret_Judy; + + info("Freed %lu bytes of memory from page cache.", bytes_freed); }
\ No newline at end of file diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index 0447d14e69..b5670f82a1 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -84,12 +84,15 @@ struct pg_cache_page_index { * It's also written by the data deletion workqueue when data collection is disabled for this metric. */ usec_t latest_time; + + struct pg_cache_page_index *prev; }; /* maps UUIDs to page indices */ struct pg_cache_metrics_index { uv_rwlock_t lock; Pvoid_t JudyHS_array; + struct pg_cache_page_index *last |