diff options
Diffstat (limited to 'database')
-rw-r--r-- | database/engine/datafile.c | 204 | ||||
-rw-r--r-- | database/engine/datafile.h | 5 | ||||
-rw-r--r-- | database/engine/journalfile.c | 95 | ||||
-rw-r--r-- | database/engine/journalfile.h | 2 | ||||
-rw-r--r-- | database/engine/pagecache.c | 69 | ||||
-rw-r--r-- | database/engine/pagecache.h | 4 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 156 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 14 | ||||
-rw-r--r-- | database/engine/rrdengineapi.c | 72 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 6 | ||||
-rw-r--r-- | database/engine/rrdenginelib.c | 2 | ||||
-rw-r--r-- | database/engine/rrdenginelib.h | 2 | ||||
-rw-r--r-- | database/engine/rrdenglocking.c | 4 |
13 files changed, 479 insertions, 156 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 9262805c81..8ef4ed5990 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -49,44 +49,69 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_ datafile->ctx = ctx; } -static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); } +int close_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + + int destroy_data_file(struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; - int ret, fd; - char path[1024]; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); if (ret < 0) { - fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ret = uv_fs_close(NULL, &req, datafile->file, NULL); if (ret < 0) { - fatal("uv_fs_close: %s", uv_strerror(ret)); + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); - generate_datafilepath(datafile, path, sizeof(path)); - fd = uv_fs_unlink(NULL, &req, path, NULL); - if (fd < 0) { - fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ++ctx->stats.datafile_deletions; - return 0; + return ret; } int create_data_file(struct rrdengine_datafile *datafile) @@ -97,13 +122,17 @@ int create_data_file(struct rrdengine_datafile *datafile) int ret, fd; struct rrdeng_df_sb *superblock; uv_buf_t iov; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_datafilepath(datafile, path, sizeof(path)); fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); if (fd < 0) { - fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; } + datafile->file = file; + ++ctx->stats.datafile_creations; ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); if (unlikely(ret)) { @@ -117,19 +146,21 @@ int create_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - fatal("uv_fs_write: %s", uv_strerror(ret)); - } - if (req.result < 0) { - fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); } uv_fs_req_cleanup(&req); free(superblock); + if (ret < 0) { + destroy_data_file(datafile); + return ret; + } - datafile->file = file; datafile->pos = sizeof(*superblock); ctx->stats.io_write_bytes += sizeof(*superblock); ++ctx->stats.io_write_requests; - ++ctx->stats.datafile_creations; return 0; } @@ -174,15 +205,15 @@ static int load_data_file(struct rrdengine_datafile *datafile) struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; uv_file file; - int ret, fd; + int ret, fd, error; uint64_t file_size; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_datafilepath(datafile, path, sizeof(path)); fd = open_file_direct_io(path, O_RDWR, &file); if (fd < 0) { - /* if (UV_ENOENT != fd) */ - error("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); return fd; } info("Initializing data file \"%s\".", path); @@ -205,15 +236,21 @@ static int load_data_file(struct rrdengine_datafile *datafile) return 0; error: - (void) uv_fs_close(NULL, &req, file, NULL); + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } uv_fs_req_cleanup(&req); - return ret; + return error; } static int scan_data_files_cmp(const void *a, const void *b) { struct rrdengine_datafile *file1, *file2; - char path1[1024], path2[1024]; + char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; file1 = *(struct rrdengine_datafile **)a; file2 = *(struct rrdengine_datafile **)b; @@ -222,7 +259,7 @@ static int scan_data_files_cmp(const void *a, const void *b) return strcmp(path1, path2); } -/* Returns number of datafiles that were loaded */ +/* Returns number of datafiles that were loaded or < 0 on error */ static int scan_data_files(struct rrdengine_instance *ctx) { int ret; @@ -233,16 +270,22 @@ static int scan_data_files(struct rrdengine_instance *ctx) struct rrdengine_journalfile *journalfile; ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL); - assert(ret >= 0); - assert(req.result >= 0); + if (ret < 0) { + assert(req.result < 0); + uv_fs_req_cleanup(&req); + error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return ret; + } info("Found %d files in path %s", ret, ctx->dbfiles_path); datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { - info("Scanning file \"%s\"", dent.name); + info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name); ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); if (2 == ret) { - info("Matched file \"%s\"", dent.name); + info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name); datafile = mallocz(sizeof(*datafile)); datafile_init(datafile, ctx, tier, no); datafiles[matched_files++] = datafile; @@ -250,70 +293,133 @@ static int scan_data_files(struct rrdengine_instance *ctx) } uv_fs_req_cleanup(&req); + if (0 == matched_files) { + freez(datafiles); + return 0; + } if (matched_files == MAX_DATAFILES) { error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); } qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); + /* TODO: change this when tiering is implemented */ + ctx->last_fileno = datafiles[matched_files - 1]->fileno; + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { datafile = datafiles[i]; ret = load_data_file(datafile); if (0 != ret) { - free(datafile); + freez(datafile); ++failed_to_load; - continue; + break; } journalfile = mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); ret = load_journal_file(ctx, journalfile, datafile); if (0 != ret) { - free(datafile); - free(journalfile); + close_data_file(datafile); + freez(datafile); + freez(journalfile); ++failed_to_load; - continue; + break; } datafile_list_insert(ctx, datafile); ctx->disk_space += datafile->pos + journalfile->pos; } + freez(datafiles); if (failed_to_load) { - error("%u files failed to load.", failed_to_load); + error("%u datafiles failed to load.", failed_to_load); + finalize_data_files(ctx); + return UV_EIO; } - free(datafiles); - return matched_files - failed_to_load; + return matched_files; } /* Creates a datafile and a journalfile pair */ -void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) +int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) { struct rrdengine_datafile *datafile; struct rrdengine_journalfile *journalfile; int ret; + char path[RRDENG_PATH_MAX]; - info("Creating new data and journal files."); + info("Creating new data and journal files in path %s", ctx->dbfiles_path); datafile = mallocz(sizeof(*datafile)); datafile_init(datafile, ctx, tier, fileno); ret = create_data_file(datafile); - assert(!ret); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Created data file \"%s\".", path); + } else { + goto error_after_datafile; + } journalfile = mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); ret = create_journal_file(journalfile, datafile); - assert(!ret); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Created journal file \"%s\".", path); + } else { + goto error_after_journalfile; + } datafile_list_insert(ctx, datafile); ctx->disk_space += datafile->pos + journalfile->pos; + + return 0; + +error_after_journalfile: + destroy_data_file(datafile); + freez(journalfile); +error_after_datafile: + freez(datafile); + return ret; } -/* Page cache must already be initialized. */ +/* Page cache must already be initialized. + * Return 0 on success. + */ int init_data_files(struct rrdengine_instance *ctx) { int ret; ret = scan_data_files(ctx); - if (0 == ret) { - info("Data files not found, creating."); - create_new_datafile_pair(ctx, 1, 1); + if (ret < 0) { + error("Failed to scan path \"%s\".", ctx->dbfiles_path); + return ret; + } else if (0 == ret) { + info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path); + ret = create_new_datafile_pair(ctx, 1, 1); + if (ret) { + error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path); + return ret; + } + ctx->last_fileno = 1; } + return 0; +} + +void finalize_data_files(struct rrdengine_instance *ctx) +{ + struct rrdengine_datafile *datafile, *next_datafile; + struct rrdengine_journalfile *journalfile; + struct extent_info *extent, *next_extent; + + for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) { + journalfile = datafile->journalfile; + next_datafile = datafile->next; + + for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) { + next_extent = extent->next; + freez(extent); + } + close_journal_file(journalfile, datafile); + close_data_file(datafile); + freez(journalfile); + freez(datafile); + + } }
\ No newline at end of file diff --git a/database/engine/datafile.h b/database/engine/datafile.h index 961da49971..eeb11310b5 100644 --- a/database/engine/datafile.h +++ b/database/engine/datafile.h @@ -55,9 +55,12 @@ struct rrdengine_datafile_list { extern void df_extent_insert(struct extent_info *extent); extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); +extern int close_data_file(struct rrdengine_datafile *datafile); extern int destroy_data_file(struct rrdengine_datafile *datafile); extern int create_data_file(struct rrdengine_datafile *datafile); -extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); +extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); extern int init_data_files(struct rrdengine_instance *ctx); +extern void finalize_data_files(struct rrdengine_instance *ctx); #endif /* NETDATA_DATAFILE_H */
\ No newline at end of file diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 9b14c914df..30eaa0ec6a 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -13,7 +13,7 @@ static void flush_transaction_buffer_cb(uv_fs_t* req) uv_fs_req_cleanup(req); free(io_descr->buf); - free(io_descr); + freez(io_descr); } /* Careful to always call this before creating a new journal file */ @@ -87,7 +87,7 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s return ctx->commit_log.buf + buf_pos; } -static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); @@ -100,39 +100,62 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin journalfile->datafile = datafile; } +int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; - int ret, fd; - char path[1024]; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); if (ret < 0) { - fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ret = uv_fs_close(NULL, &req, journalfile->file, NULL); if (ret < 0) { - fatal("uv_fs_close: %s", uv_strerror(ret)); - exit(ret); + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); - generate_journalfilepath(datafile, path, sizeof(path)); - fd = uv_fs_unlink(NULL, &req, path, NULL); - if (fd < 0) { - fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); } - assert(0 == req.result); uv_fs_req_cleanup(&req); ++ctx->stats.journalfile_deletions; - return 0; + return ret; } int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) @@ -143,13 +166,17 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng int ret, fd; struct rrdeng_jf_sb *superblock; uv_buf_t iov; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_journalfilepath(datafile, path, sizeof(path)); fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); if (fd < 0) { - fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + return fd; } + journalfile->file = file; + ++ctx->stats.journalfile_creations; ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); if (unlikely(ret)) { @@ -162,19 +189,21 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - fatal("uv_fs_write: %s", uv_strerror(ret)); - } - if (req.result < 0) { - fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + assert(req.result < 0); + error("uv_fs_write: %s", uv_strerror(ret)); + ++ctx->stats.io_errors; + rrd_stat_atomic_add(&global_io_errors, 1); } uv_fs_req_cleanup(&req); free(superblock); + if (ret < 0) { + destroy_journal_file(journalfile, datafile); + return ret; + } - journalfile->file = file; journalfile->pos = sizeof(*superblock); ctx->stats.io_write_bytes += sizeof(*superblock); ++ctx->stats.io_write_requests; - ++ctx->stats.journalfile_creations; return 0; } @@ -263,6 +292,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); assert(NULL == *PValue); /* TODO: figure out concurrency model */ *PValue = page_index = create_page_index(temp_id); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); } @@ -398,15 +429,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi { uv_fs_t req; uv_file file; - int ret, fd; + int ret, fd, error; uint64_t file_size, max_id; - char path[1024]; + char path[RRDENG_PATH_MAX]; generate_journalfilepath(datafile, path, sizeof(path)); fd = open_file_direct_io(path, O_RDWR, &file); if (fd < 0) { - /* if (UV_ENOENT != fd) */ - error("uv_fs_fsopen: %s", uv_strerror(fd)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); return fd; } info("Loading journal file \"%s\".", path); @@ -433,9 +464,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi return 0; error: - (void) uv_fs_close(NULL, &req, file, NULL); + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } uv_fs_req_cleanup(&req); - return ret; + return error; } void init_commit_log(struct rrdengine_instance *ctx) diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h index 50489aeeeb..0df66304d7 100644 --- a/database/engine/journalfile.h +++ b/database/engine/journalfile.h @@ -33,9 +33,11 @@ struct transaction_commit_log { unsigned buf_size; }; +extern void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size); extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc); +extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index f41d3959da..124f2448b1 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -287,7 +287,7 @@ static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_ { struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - free(pg_cache_descr->page); + freez(pg_cache_descr->page); pg_cache_descr->page = NULL; pg_cache_descr->flags &= ~RRD_PAGE_POPULATED; pg_cache_release_pages_unsafe(ctx, 1); @@ -330,7 +330,7 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) return 1; } rrdeng_page_descr_mutex_unlock(ctx, descr); - }; + } uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); /* failed to evict */ @@ -594,7 +594,7 @@ struct pg_cache_page_index * } rrdeng_page_descr_mutex_unlock(ctx, descr); - }; + } uv_rwlock_rdunlock(&page_index->lock); failed_to_reserve = 0; @@ -767,6 +767,7 @@ struct pg_cache_page_index *create_page_index(uuid_t *id) assert(0 == uv_rwlock_init(&page_index->lock)); page_index->oldest_time = INVALID_TIME; page_index->latest_time = INVALID_TIME; + page_index->prev = NULL; return page_index; } @@ -776,6 +777,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx) struct page_cache *pg_cache = &ctx->pg_cache; pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL; + pg_cache->metrics_index.last_page_index = NULL; assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock)); } @@ -809,4 +811,65 @@ void init_page_cache(struct rrdengine_instance *ctx) init_metrics_index(ctx); init_replaceQ(ctx); init_commited_page_index(ctx); +} + +void free_page_cache(struct rrdengine_instance *ctx) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + Word_t ret_Judy, bytes_freed = 0; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index, *prev_page_index; + Word_t Index; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + + /* Free commited page index */ + ret_Judy = JudyLFreeArray(&pg_cache->commited_page_index.JudyL_array, PJE0); + assert(NULL == pg_cache->commited_page_index.JudyL_array); + bytes_freed += ret_Judy; + + for (page_index = pg_cache->metrics_index.last_page_index ; + page_index != NULL ; + page_index = prev_page_index) { + prev_page_index = page_index->prev; + + /* Find first page in range */ + Index = (Word_t) 0; + PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); + if (likely(NULL != PValue)) { + descr = *PValue; + } + while (descr != NULL) { + /* Iterate all page descriptors of this metric */ + + if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { + /* Check rrdenglocking.c */ + pg_cache_descr = descr->pg_cache_descr; + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { + freez(pg_cache_descr->page); + bytes_freed += RRDENG_BLOCK_SIZE; + } + rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); + bytes_freed += sizeof(*pg_cache_descr); + } + freez(descr); + bytes_freed += sizeof(*descr); + + PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0); + descr = unlikely(NULL == PValue) ? NULL : *PValue; + } + + /* Free page index */ + ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0); + assert(NULL == page_index->JudyL_array); + bytes_freed += ret_Judy; + freez(page_index); + bytes_freed += sizeof(*page_index); + } + /* Free metrics index */ + ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0); + assert(NULL == pg_cache->metrics_index.JudyHS_array); + bytes_freed += ret_Judy; + + info("Freed %lu bytes of memory from page cache.", bytes_freed); }
\ No newline at end of file diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index 0447d14e69..b5670f82a1 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -84,12 +84,15 @@ struct pg_cache_page_index { * It's also written by the data deletion workqueue when data collection is disabled for this metric. */ usec_t latest_time; + + struct pg_cache_page_index *prev; }; /* maps UUIDs to page indices */ struct pg_cache_metrics_index { uv_rwlock_t lock; Pvoid_t JudyHS_array; + struct pg_cache_page_index *last_page_index; }; /* gathers dirty pages to be written on disk */ @@ -153,6 +156,7 @@ extern struct rrdeng_page_descr * usec_t point_in_time); extern struct pg_cache_page_index *create_page_index(uuid_t *id); extern void init_page_cache(struct rrdengine_instance *ctx); +extern void free_page_cache(struct rrdengine_instance *ctx); extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr); extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index); diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 43922caaf5..0f2dceaa46 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -3,6 +3,10 @@ #include "rrdengine.h" +rrdeng_stats_t global_io_errors = 0; +rrdeng_stats_t global_fs_errors = 0; +rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; + void sanity_check(void) { /* Magic numbers must fit in the super-blocks */ @@ -33,7 +37,6 @@ void read_extent_cb(uv_fs_t* req) unsigned i, j, count; void *page, *uncompressed_buf = NULL; uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length; - struct rrdengine_datafile *datafile; /* persistent structures */ struct rrdeng_df_extent_header *header; struct rrdeng_df_extent_trailer *trailer; @@ -55,9 +58,13 @@ void read_extent_cb(uv_fs_t* req) crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer)); ret = crc32cmp(trailer->checksum, crc); - datafile = xt_io_descr->descr_array[0]->extent->datafile; - debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, - xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); +#ifdef NETDATA_INTERNAL_CHECKS + { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, + xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); + } +#endif if (unlikely(ret)) { /* TODO: handle errors */ exit(UV_EIO); @@ -112,14 +119,14 @@ void read_extent_cb(uv_fs_t* req) rrdeng_page_descr_mutex_unlock(ctx, descr); } if (RRD_NO_COMPRESSION != header->compression_algorithm) { - free(uncompressed_buf); + freez(uncompressed_buf); } if (xt_io_descr->completion) complete(xt_io_descr->completion); cleanup: uv_fs_req_cleanup(req); free(xt_io_descr->buf); - free(xt_io_descr); + freez(xt_io_descr); } @@ -144,7 +151,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc, ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("posix_memalign:%s", strerror(ret)); - /* free(xt_io_descr); + /* freez(xt_io_descr); return;*/ } for (i = 0 ; i < count; ++i) { @@ -233,7 +240,6 @@ void flush_pages_cb(uv_fs_t* req) struct extent_io_descriptor *xt_io_descr; struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; - struct rrdengine_datafile *datafile; int ret; unsigned i, count; Word_t commit_id; @@ -243,10 +249,13 @@ void flush_pages_cb(uv_fs_t* req) error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); goto cleanup; } - datafile = xt_io_descr->descr_array[0]->extent->datafile; - debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", - __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); - +#ifdef NETDATA_INTERNAL_CHECKS + { + struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; + debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", + __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + } +#endif count = xt_io_descr->descr_count; for (i = 0 ; i < count ; ++i) { /* care, we don't hold the descriptor mutex */ @@ -273,7 +282,7 @@ void flush_pages_cb(uv_fs_t* req) cleanup: uv_fs_req_cleanup(req); free(xt_io_descr->buf); - free(xt_io_descr); + freez(xt_io_descr); } /* @@ -353,7 +362,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("posix_memalign:%s", strerror(ret)); - /* free(xt_io_descr);*/ + /* freez(xt_io_descr);*/ } (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count); xt_io_descr->descr_count = count; @@ -405,7 +414,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct ctx->stats.after_compress_bytes += compressed_size; debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size); (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); - free(compressed_buf); |