summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--configs.signatures1
-rw-r--r--daemon/global_statistics.c71
-rw-r--r--database/engine/datafile.c204
-rw-r--r--database/engine/datafile.h5
-rw-r--r--database/engine/journalfile.c95
-rw-r--r--database/engine/journalfile.h2
-rw-r--r--database/engine/pagecache.c69
-rw-r--r--database/engine/pagecache.h4
-rw-r--r--database/engine/rrdengine.c156
-rw-r--r--database/engine/rrdengine.h14
-rw-r--r--database/engine/rrdengineapi.c72
-rw-r--r--database/engine/rrdengineapi.h6
-rw-r--r--database/engine/rrdenginelib.c2
-rw-r--r--database/engine/rrdenginelib.h2
-rw-r--r--database/engine/rrdenglocking.c4
-rw-r--r--health/Makefile.am1
-rw-r--r--health/health.d/dbengine.conf26
-rw-r--r--web/server/static/static-threaded.c2
18 files changed, 578 insertions, 158 deletions
diff --git a/configs.signatures b/configs.signatures
index afc8dbe56d..b0ded05e03 100644
--- a/configs.signatures
+++ b/configs.signatures
@@ -381,6 +381,7 @@ declare -A configs_signatures=(
['7deb236ec68a512b9bdd18e6a51d76f7']='python.d/mysql.conf'
['7e5fc1644aa7a54f9dbb1bd102521b09']='health.d/memcached.conf'
['7f13631183fbdf79c21c8e5a171e9b34']='health.d/zfs.conf'
+ ['ce285c90747428ee5da4efb547418dda']='health.d/dbengine.conf'
['7fb8184d56a27040e73261ed9c6fc76f']='health_alarm_notify.conf'
['80266bddd3df374923c750a6de91d120']='health.d/apache.conf'
['803a7f9dcb942eeac0fd764b9e3e38ca']='fping.conf'
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c
index fef8eaae6e..53b7546f26 100644
--- a/daemon/global_statistics.c
+++ b/daemon/global_statistics.c
@@ -538,7 +538,7 @@ void global_statistics_charts(void) {
unsigned long long stats_array[RRDENG_NR_STATS];
/* get localhost's DB engine's statistics */
- rrdeng_get_28_statistics(localhost->rrdeng_ctx, stats_array);
+ rrdeng_get_33_statistics(localhost->rrdeng_ctx, stats_array);
// ----------------------------------------------------------------
@@ -749,6 +749,75 @@ void global_statistics_charts(void) {
rrddim_set_by_pointer(st_io_stats, rd_writes, (collected_number)stats_array[16]);
rrdset_done(st_io_stats);
}
+
+ // ----------------------------------------------------------------
+
+ {
+ static RRDSET *st_errors = NULL;
+ static RRDDIM *rd_fs_errors = NULL;
+ static RRDDIM *rd_io_errors = NULL;
+
+ if (unlikely(!st_errors)) {
+ st_errors = rrdset_create_localhost(
+ "netdata"
+ , "dbengine_global_errors"
+ , NULL
+ , "dbengine"
+ , NULL
+ , "NetData DB engine errors"
+ , "errors/s"
+ , "netdata"
+ , "stats"
+ , 130507
+ , localhost->rrd_update_every
+ , RRDSET_TYPE_LINE
+ );
+
+ rd_io_errors = rrddim_add(st_errors, "I/O errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ rd_fs_errors = rrddim_add(st_errors, "FS errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ }
+ else
+ rrdset_next(st_errors);
+
+ rrddim_set_by_pointer(st_errors, rd_io_errors, (collected_number)stats_array[30]);
+ rrddim_set_by_pointer(st_errors, rd_fs_errors, (collected_number)stats_array[31]);
+ rrdset_done(st_errors);
+ }
+
+ // ----------------------------------------------------------------
+
+ {
+ static RRDSET *st_fd = NULL;
+ static RRDDIM *rd_fd_current = NULL;
+ static RRDDIM *rd_fd_max = NULL;
+
+ if (unlikely(!st_fd)) {
+ st_fd = rrdset_create_localhost(
+ "netdata"
+ , "dbengine_global_file_descriptors"
+ , NULL
+ , "dbengine"
+ , NULL
+ , "NetData DB engine File Descriptors"
+ , "descriptors"
+ , "netdata"
+ , "stats"
+ , 130508
+ , localhost->rrd_update_every
+ , RRDSET_TYPE_LINE
+ );
+
+ rd_fd_current = rrddim_add(st_fd, "current", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_fd_max = rrddim_add(st_fd, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+ else
+ rrdset_next(st_fd);
+
+ rrddim_set_by_pointer(st_fd, rd_fd_current, (collected_number)stats_array[32]);
+ /* Careful here, modify this accordingly if the File-Descriptor budget ever changes */
+ rrddim_set_by_pointer(st_fd, rd_fd_max, (collected_number)rlimit_nofile.rlim_cur / 4);
+ rrdset_done(st_fd);
+ }
}
#endif
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 9262805c81..8ef4ed5990 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -49,44 +49,69 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_
datafile->ctx = ctx;
}
-static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
}
+int close_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, datafile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
+
int destroy_data_file(struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
- int ret, fd;
- char path[1024];
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_datafilepath(datafile, path, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
- fatal("uv_fs_close: %s", uv_strerror(ret));
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
- generate_datafilepath(datafile, path, sizeof(path));
- fd = uv_fs_unlink(NULL, &req, path, NULL);
- if (fd < 0) {
- fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
++ctx->stats.datafile_deletions;
- return 0;
+ return ret;
}
int create_data_file(struct rrdengine_datafile *datafile)
@@ -97,13 +122,17 @@ int create_data_file(struct rrdengine_datafile *datafile)
int ret, fd;
struct rrdeng_df_sb *superblock;
uv_buf_t iov;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
- fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
}
+ datafile->file = file;
+ ++ctx->stats.datafile_creations;
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@@ -117,19 +146,21 @@ int create_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_write: %s", uv_strerror(ret));
- }
- if (req.result < 0) {
- fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ assert(req.result < 0);
+ error("uv_fs_write: %s", uv_strerror(ret));
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
free(superblock);
+ if (ret < 0) {
+ destroy_data_file(datafile);
+ return ret;
+ }
- datafile->file = file;
datafile->pos = sizeof(*superblock);
ctx->stats.io_write_bytes += sizeof(*superblock);
++ctx->stats.io_write_requests;
- ++ctx->stats.datafile_creations;
return 0;
}
@@ -174,15 +205,15 @@ static int load_data_file(struct rrdengine_datafile *datafile)
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
uv_file file;
- int ret, fd;
+ int ret, fd, error;
uint64_t file_size;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
- /* if (UV_ENOENT != fd) */
- error("uv_fs_fsopen: %s", uv_strerror(fd));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
info("Initializing data file \"%s\".", path);
@@ -205,15 +236,21 @@ static int load_data_file(struct rrdengine_datafile *datafile)
return 0;
error:
- (void) uv_fs_close(NULL, &req, file, NULL);
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
uv_fs_req_cleanup(&req);
- return ret;
+ return error;
}
static int scan_data_files_cmp(const void *a, const void *b)
{
struct rrdengine_datafile *file1, *file2;
- char path1[1024], path2[1024];
+ char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
file1 = *(struct rrdengine_datafile **)a;
file2 = *(struct rrdengine_datafile **)b;
@@ -222,7 +259,7 @@ static int scan_data_files_cmp(const void *a, const void *b)
return strcmp(path1, path2);
}
-/* Returns number of datafiles that were loaded */
+/* Returns number of datafiles that were loaded or < 0 on error */
static int scan_data_files(struct rrdengine_instance *ctx)
{
int ret;
@@ -233,16 +270,22 @@ static int scan_data_files(struct rrdengine_instance *ctx)
struct rrdengine_journalfile *journalfile;
ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
- assert(ret >= 0);
- assert(req.result >= 0);
+ if (ret < 0) {
+ assert(req.result < 0);
+ uv_fs_req_cleanup(&req);
+ error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return ret;
+ }
info("Found %d files in path %s", ret, ctx->dbfiles_path);
datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
- info("Scanning file \"%s\"", dent.name);
+ info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
if (2 == ret) {
- info("Matched file \"%s\"", dent.name);
+ info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, no);
datafiles[matched_files++] = datafile;
@@ -250,70 +293,133 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
uv_fs_req_cleanup(&req);
+ if (0 == matched_files) {
+ freez(datafiles);
+ return 0;
+ }
if (matched_files == MAX_DATAFILES) {
error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
}
qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
+ /* TODO: change this when tiering is implemented */
+ ctx->last_fileno = datafiles[matched_files - 1]->fileno;
+
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
datafile = datafiles[i];
ret = load_data_file(datafile);
if (0 != ret) {
- free(datafile);
+ freez(datafile);
++failed_to_load;
- continue;
+ break;
}
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
ret = load_journal_file(ctx, journalfile, datafile);
if (0 != ret) {
- free(datafile);
- free(journalfile);
+ close_data_file(datafile);
+ freez(datafile);
+ freez(journalfile);
++failed_to_load;
- continue;
+ break;
}
datafile_list_insert(ctx, datafile);
ctx->disk_space += datafile->pos + journalfile->pos;
}
+ freez(datafiles);
if (failed_to_load) {
- error("%u files failed to load.", failed_to_load);
+ error("%u datafiles failed to load.", failed_to_load);
+ finalize_data_files(ctx);
+ return UV_EIO;
}
- free(datafiles);
- return matched_files - failed_to_load;
+ return matched_files;
}
/* Creates a datafile and a journalfile pair */
-void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
+int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
{
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
int ret;
+ char path[RRDENG_PATH_MAX];
- info("Creating new data and journal files.");
+ info("Creating new data and journal files in path %s", ctx->dbfiles_path);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, fileno);
ret = create_data_file(datafile);
- assert(!ret);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("Created data file \"%s\".", path);
+ } else {
+ goto error_after_datafile;
+ }
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
ret = create_journal_file(journalfile, datafile);
- assert(!ret);
+ if (!ret) {
+ generate_journalfilepath(datafile, path, sizeof(path));
+ info("Created journal file \"%s\".", path);
+ } else {
+ goto error_after_journalfile;
+ }
datafile_list_insert(ctx, datafile);
ctx->disk_space += datafile->pos + journalfile->pos;
+
+ return 0;
+
+error_after_journalfile:
+ destroy_data_file(datafile);
+ freez(journalfile);
+error_after_datafile:
+ freez(datafile);
+ return ret;
}
-/* Page cache must already be initialized. */
+/* Page cache must already be initialized.
+ * Return 0 on success.
+ */
int init_data_files(struct rrdengine_instance *ctx)
{
int ret;
ret = scan_data_files(ctx);
- if (0 == ret) {
- info("Data files not found, creating.");
- create_new_datafile_pair(ctx, 1, 1);
+ if (ret < 0) {
+ error("Failed to scan path \"%s\".", ctx->dbfiles_path);
+ return ret;
+ } else if (0 == ret) {
+ info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path);
+ ret = create_new_datafile_pair(ctx, 1, 1);
+ if (ret) {
+ error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path);
+ return ret;
+ }
+ ctx->last_fileno = 1;
}
+
return 0;
+}
+
+void finalize_data_files(struct rrdengine_instance *ctx)
+{
+ struct rrdengine_datafile *datafile, *next_datafile;
+ struct rrdengine_journalfile *journalfile;
+ struct extent_info *extent, *next_extent;
+
+ for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) {
+ journalfile = datafile->journalfile;
+ next_datafile = datafile->next;
+
+ for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) {
+ next_extent = extent->next;
+ freez(extent);
+ }
+ close_journal_file(journalfile, datafile);
+ close_data_file(datafile);
+ freez(journalfile);
+ freez(datafile);
+
+ }
} \ No newline at end of file
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index 961da49971..eeb11310b5 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -55,9 +55,12 @@ struct rrdengine_datafile_list {
extern void df_extent_insert(struct extent_info *extent);
extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
+extern int close_data_file(struct rrdengine_datafile *datafile);
extern int destroy_data_file(struct rrdengine_datafile *datafile);
extern int create_data_file(struct rrdengine_datafile *datafile);
-extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
+extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
extern int init_data_files(struct rrdengine_instance *ctx);
+extern void finalize_data_files(struct rrdengine_instance *ctx);
#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 9b14c914df..30eaa0ec6a 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -13,7 +13,7 @@ static void flush_transaction_buffer_cb(uv_fs_t* req)
uv_fs_req_cleanup(req);
free(io_descr->buf);
- free(io_descr);
+ freez(io_descr);
}
/* Careful to always call this before creating a new journal file */
@@ -87,7 +87,7 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
return ctx->commit_log.buf + buf_pos;
}
-static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
@@ -100,39 +100,62 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin
journalfile->datafile = datafile;
}
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ uv_fs_req_cleanup(&req);
+
+ return ret;
+}
+
int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
- int ret, fd;
- char path[1024];
+ int ret;
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
- fatal("uv_fs_close: %s", uv_strerror(ret));
- exit(ret);
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
- generate_journalfilepath(datafile, path, sizeof(path));
- fd = uv_fs_unlink(NULL, &req, path, NULL);
- if (fd < 0) {
- fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ ret = uv_fs_unlink(NULL, &req, path, NULL);
+ if (ret < 0) {
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
}
- assert(0 == req.result);
uv_fs_req_cleanup(&req);
++ctx->stats.journalfile_deletions;
- return 0;
+ return ret;
}
int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
@@ -143,13 +166,17 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
int ret, fd;
struct rrdeng_jf_sb *superblock;
uv_buf_t iov;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
- fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ return fd;
}
+ journalfile->file = file;
+ ++ctx->stats.journalfile_creations;
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@@ -162,19 +189,21 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- fatal("uv_fs_write: %s", uv_strerror(ret));
- }
- if (req.result < 0) {
- fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ assert(req.result < 0);
+ error("uv_fs_write: %s", uv_strerror(ret));
+ ++ctx->stats.io_errors;
+ rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
free(superblock);
+ if (ret < 0) {
+ destroy_journal_file(journalfile, datafile);
+ return ret;
+ }
- journalfile->file = file;
journalfile->pos = sizeof(*superblock);
ctx->stats.io_write_bytes += sizeof(*superblock);
++ctx->stats.io_write_requests;
- ++ctx->stats.journalfile_creations;
return 0;
}
@@ -263,6 +292,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(temp_id);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
@@ -398,15 +429,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
{
uv_fs_t req;
uv_file file;
- int ret, fd;
+ int ret, fd, error;
uint64_t file_size, max_id;
- char path[1024];
+ char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
- /* if (UV_ENOENT != fd) */
- error("uv_fs_fsopen: %s", uv_strerror(fd));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
info("Loading journal file \"%s\".", path);
@@ -433,9 +464,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
return 0;
error:
- (void) uv_fs_close(NULL, &req, file, NULL);
+ error = ret;
+ ret = uv_fs_close(NULL, &req, file, NULL);
+ if (ret < 0) {
+ error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
uv_fs_req_cleanup(&req);
- return ret;
+ return error;
}
void init_commit_log(struct rrdengine_instance *ctx)
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
index 50489aeeeb..0df66304d7 100644
--- a/database/engine/journalfile.h
+++ b/database/engine/journalfile.h
@@ -33,9 +33,11 @@ struct transaction_commit_log {
unsigned buf_size;
};
+extern void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size);
extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc);
+extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index f41d3959da..124f2448b1 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -287,7 +287,7 @@ static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- free(pg_cache_descr->page);
+ freez(pg_cache_descr->page);
pg_cache_descr->page = NULL;
pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
pg_cache_release_pages_unsafe(ctx, 1);
@@ -330,7 +330,7 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
return 1;
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
- };
+ }
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
/* failed to evict */
@@ -594,7 +594,7 @@ struct pg_cache_page_index *
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
- };
+ }
uv_rwlock_rdunlock(&page_index->lock);
failed_to_reserve = 0;
@@ -767,6 +767,7 @@ struct pg_cache_page_index *create_page_index(uuid_t *id)
assert(0 == uv_rwlock_init(&page_index->lock));
page_index->oldest_time = INVALID_TIME;
page_index->latest_time = INVALID_TIME;
+ page_index->prev = NULL;
return page_index;
}
@@ -776,6 +777,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx)
struct page_cache *pg_cache = &ctx->pg_cache;
pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
+ pg_cache->metrics_index.last_page_index = NULL;
assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}
@@ -809,4 +811,65 @@ void init_page_cache(struct rrdengine_instance *ctx)
init_metrics_index(ctx);
init_replaceQ(ctx);
init_commited_page_index(ctx);
+}
+
+void free_page_cache(struct rrdengine_instance *ctx)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ Word_t ret_Judy, bytes_freed = 0;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index, *prev_page_index;
+ Word_t Index;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+
+ /* Free commited page index */
+ ret_Judy = JudyLFreeArray(&pg_cache->commited_page_index.JudyL_array, PJE0);
+ assert(NULL == pg_cache->commited_page_index.JudyL_array);
+ bytes_freed += ret_Judy;
+
+ for (page_index = pg_cache->metrics_index.last_page_index ;
+ page_index != NULL ;
+ page_index = prev_page_index) {
+ prev_page_index = page_index->prev;
+
+ /* Find first page in range */
+ Index = (Word_t) 0;
+ PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ }
+ while (descr != NULL) {
+ /* Iterate all page descriptors of this metric */
+
+ if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
+ /* Check rrdenglocking.c */
+ pg_cache_descr = descr->pg_cache_descr;
+ if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
+ freez(pg_cache_descr->page);
+ bytes_freed += RRDENG_BLOCK_SIZE;
+ }
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ bytes_freed += sizeof(*pg_cache_descr);
+ }
+ freez(descr);
+ bytes_freed += sizeof(*descr);
+
+ PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+ }
+
+ /* Free page index */
+ ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0);
+ assert(NULL == page_index->JudyL_array);
+ bytes_freed += ret_Judy;
+ freez(page_index);
+ bytes_freed += sizeof(*page_index);
+ }
+ /* Free metrics index */
+ ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
+ assert(NULL == pg_cache->metrics_index.JudyHS_array);
+ bytes_freed += ret_Judy;
+
+ info("Freed %lu bytes of memory from page cache.", bytes_freed);
} \ No newline at end of file
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index 0447d14e69..b5670f82a1 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -84,12 +84,15 @@ struct pg_cache_page_index {
* It's also written by the data deletion workqueue when data collection is disabled for this metric.
*/
usec_t latest_time;
+
+ struct pg_cache_page_index *prev;
};
/* maps UUIDs to page indices */
struct pg_cache_metrics_index {
uv_rwlock_t lock;
Pvoid_t JudyHS_array;
+ struct pg_cache_page_index *last