diff options
Diffstat (limited to 'database')
-rw-r--r-- | database/engine/datafile.c | 57 | ||||
-rw-r--r-- | database/engine/datafile.h | 1 | ||||
-rw-r--r-- | database/engine/journalfile.c | 23 | ||||
-rw-r--r-- | database/engine/journalfile.h | 1 | ||||
-rw-r--r-- | database/engine/metadata_log/logfile.c | 40 | ||||
-rw-r--r-- | database/engine/metadata_log/logfile.h | 1 | ||||
-rw-r--r-- | database/engine/metadata_log/metadatalog.c | 2 |
7 files changed, 108 insertions, 17 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 01f6863d55..7a052f963d 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -75,6 +75,27 @@ int close_data_file(struct rrdengine_datafile *datafile) return ret; } +int unlink_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.datafile_deletions; + + return ret; +} int destroy_data_file(struct rrdengine_datafile *datafile) { @@ -305,33 +326,47 @@ static int scan_data_files(struct rrdengine_instance *ctx) ctx->last_fileno = datafiles[matched_files - 1]->fileno; for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { + uint8_t must_delete_pair = 0; + datafile = datafiles[i]; ret = load_data_file(datafile); if (0 != ret) { - freez(datafile); - ++failed_to_load; - break; + must_delete_pair = 1; } journalfile = mallocz(sizeof(*journalfile)); datafile->journalfile = journalfile; journalfile_init(journalfile, datafile); ret = load_journal_file(ctx, journalfile, datafile); if (0 != ret) { - close_data_file(datafile); - freez(datafile); + if (!must_delete_pair) /* If datafile is still open close it */ + close_data_file(datafile); + must_delete_pair = 1; + } + if (must_delete_pair) { + char path[RRDENG_PATH_MAX]; + + error("Deleting invalid data and journal file pair."); + ret = unlink_journal_file(journalfile); + if (!ret) { + generate_journalfilepath(datafile, path, sizeof(path)); + info("Deleted journal file \"%s\".", path); + } + ret = unlink_data_file(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("Deleted data file \"%s\".", path); + } freez(journalfile); + freez(datafile); ++failed_to_load; - break; + continue; } + datafile_list_insert(ctx, datafile); ctx->disk_space += datafile->pos + journalfile->pos; } + matched_files -= failed_to_load; freez(datafiles); - if (failed_to_load) { - error("%u datafiles failed to load.", failed_to_load); - finalize_data_files(ctx); - return UV_EIO; - } return matched_files; } diff --git a/database/engine/datafile.h b/database/engine/datafile.h index 77a7ad39a8..ae94bfdd02 100644 --- a/database/engine/datafile.h +++ b/database/engine/datafile.h @@ -57,6 +57,7 @@ extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengin extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); extern int close_data_file(struct rrdengine_datafile *datafile); +extern int unlink_data_file(struct rrdengine_datafile *datafile); extern int destroy_data_file(struct rrdengine_datafile *datafile); extern int create_data_file(struct rrdengine_datafile *datafile); extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 0704c830f7..9fecc48ff8 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -125,6 +125,29 @@ int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengi return ret; } +int unlink_journal_file(struct rrdengine_journalfile *journalfile) +{ + struct rrdengine_datafile *datafile = journalfile->datafile; + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + ++ctx->stats.journalfile_deletions; + + return ret; +} + int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h index 0df66304d7..f6c43cd16e 100644 --- a/database/engine/journalfile.h +++ b/database/engine/journalfile.h @@ -38,6 +38,7 @@ extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct r extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size); extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc); extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); +extern int unlink_journal_file(struct rrdengine_journalfile *journalfile); extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile); extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c index fdee93da10..08bb4eec86 100644 --- a/database/engine/metadata_log/logfile.c +++ b/database/engine/metadata_log/logfile.c @@ -248,6 +248,26 @@ int close_metadata_logfile(struct metadata_logfile *metalogfile) return ret; } +int fsync_metadata_logfile(struct metadata_logfile *metalogfile) +{ + struct metalog_instance *ctx = metalogfile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_metadata_logfile_path(metalogfile, path, sizeof(path)); + + ret = uv_fs_fsync(NULL, &req, metalogfile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + + return ret; +} + int unlink_metadata_logfile(struct metadata_logfile *metalogfile) { struct metalog_instance *ctx = metalogfile->ctx; @@ -344,6 +364,15 @@ int create_metadata_logfile(struct metadata_logfile *metalogfile) rrd_stat_atomic_add(&global_io_errors, 1); } uv_fs_req_cleanup(&req); + + ret = uv_fs_fsync(NULL, &req, metalogfile->file, NULL); + if (ret < 0) { + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); + ++ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + uv_fs_req_cleanup(&req); + free(superblock); if (ret < 0) { destroy_metadata_logfile(metalogfile); @@ -695,13 +724,17 @@ static int scan_metalog_files(struct metalog_instance *ctx) metalogfile = metalogfiles[i]; ret = load_metadata_logfile(ctx, metalogfile); if (0 != ret) { + error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL + METALOG_EXTENSION"\"", dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno); + unlink_metadata_logfile(metalogfile); freez(metalogfile); ++failed_to_load; - break; + continue; } metadata_logfile_list_insert(&ctx->metadata_logfiles, metalogfile); rrd_atomic_fetch_add(&ctx->disk_space, metalogfile->pos); } + matched_files -= failed_to_load; debug(D_METADATALOG, "PARSER ended"); parser_destroy(parser); @@ -712,11 +745,6 @@ static int scan_metalog_files(struct metalog_instance *ctx) after_failed_to_parse: freez(metalogfiles); - if (failed_to_load) { - error("%u metadata log files failed to load.", failed_to_load); - finalize_metalog_files(ctx); - return UV_EIO; - } return matched_files; } diff --git a/database/engine/metadata_log/logfile.h b/database/engine/metadata_log/logfile.h index d4e26da883..4829ff9287 100644 --- a/database/engine/metadata_log/logfile.h +++ b/database/engine/metadata_log/logfile.h @@ -83,6 +83,7 @@ extern void metadata_logfile_init(struct metadata_logfile *metadatalog, struct m extern int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, unsigned new_fileno); extern int close_metadata_logfile(struct metadata_logfile *metadatalog); +extern int fsync_metadata_logfile(struct metadata_logfile *metalogfile); extern int unlink_metadata_logfile(struct metadata_logfile *metalogfile); extern int destroy_metadata_logfile(struct metadata_logfile *metalogfile); extern int create_metadata_logfile(struct metadata_logfile *metalogfile); diff --git a/database/engine/metadata_log/metadatalog.c b/database/engine/metadata_log/metadatalog.c index 8c8c4b04f1..5f44f08193 100644 --- a/database/engine/metadata_log/metadatalog.c +++ b/database/engine/metadata_log/metadatalog.c @@ -134,6 +134,7 @@ void metalog_try_link_new_metadata_logfile(struct metalog_worker_config *wc) if (metalogfile->records.first) { /* it has records */ /* Finalize metadata log file and create a new one */ mlf_flush_records_buffer(wc, &ctx->records_log, &ctx->metadata_logfiles); + fsync_metadata_logfile(ctx->metadata_logfiles.last); ret = add_new_metadata_logfile(ctx, &ctx->metadata_logfiles, 0, ctx->last_fileno + 1); if (likely(!ret)) { ++ctx->last_fileno; @@ -364,6 +365,7 @@ void metalog_worker(void* arg) case METALOG_COMPACTION_FLUSH: mlf_flush_records_buffer(wc, &ctx->compaction_state.records_log, &ctx->compaction_state.new_metadata_logfiles); + fsync_metadata_logfile(ctx->compaction_state.new_metadata_logfiles.last); complete(cmd.record_io_descr.completion); break; default: |