path: root/database/engine
author	Markos Fountoulakis <44345837+mfundul@users.noreply.github.com>	2019-05-15 08:28:06 +0300
committer	Paul Emm. Katsoulakis <34388743+paulkatsoulakis@users.noreply.github.com>	2019-05-15 08:28:06 +0300
commit	6ca6d840dd19d5d7e9bacf93e011803ea5861496 (patch)
tree	f20393187806d642f94eab87f87180440089fb0a /database/engine
parent	fed63b6e99dd70beb2cf9ccadd7c396aa05b2ae0 (diff)
Database engine (#5282)
* Database engine prototype version 0
* Database engine initial integration with netdata POC
* Scalable database engine with file and memory management.
* Database engine integration with netdata
* Added MIN MAX definitions to fix alpine build of travis CI
* Bugfix for backends and new DB engine, remove useless rrdset_time2slot() calls and erroneous checks
* DB engine disk protocol correction
* Moved DB engine storage file location to /var/cache/netdata/{host}/dbengine
* Fix configure to require openSSL for DB engine
* Fix netdata daemon health not holding read lock when iterating chart dimensions
* Optimized query API for new DB engine and old netdata DB fallback code-path
* netdata database internal query API improvements and cleanup
* Bugfix for DB engine queries returning empty values
* Added netdata internal check for data queries for old and new DB
* Added statistics to DB engine and fixed memory corruption bug
* Added preliminary charts for DB engine statistics
* Changed DB engine ratio statistics to incremental
* Added netdata statistics charts for DB engine internal statistics
* Fix for netdata not compiling successfully when missing dbengine dependencies
* Added DB engine functional test to netdata unittest command parameter
* Implemented DB engine dataset generator based on example.random chart
* Fix build error in CI
* Support older versions of libuv1
* Fixes segmentation fault when using multiple DB engine instances concurrently
* Fix memory corruption bug
* Fixed createdataset advanced option not exiting
* Fix for DB engine not working on FreeBSD
* Support FreeBSD library paths of new dependencies
* Workaround for unsupported O_DIRECT in OS X
* Fix unittest crashing during cleanup
* Disable DB engine FS caching in Apple OS X since O_DIRECT is not available
* Fix segfault when unittest and DB engine dataset generator don't have permissions to create temporary host
* Modified DB engine dataset generator to create multiple files
* Toned down overzealous page cache prefetcher
* Reduce internal memory fragmentation for page-cache data pages
* Added documentation describing the DB engine
* Documentation bugfixes
* Fixed unit tests compilation errors since last rebase
* Added note to back-up the DB engine files in documentation
* Added codacy fix.
* Support old gcc versions for atomic counters in DB engine
Diffstat (limited to 'database/engine')
-rw-r--r--  database/engine/Makefile.am        |   8
-rw-r--r--  database/engine/README.md          | 109
-rw-r--r--  database/engine/datafile.c         | 335
-rw-r--r--  database/engine/datafile.h         |  63
-rw-r--r--  database/engine/journalfile.c      | 462
-rw-r--r--  database/engine/journalfile.h      |  46
-rw-r--r--  database/engine/pagecache.c        | 785
-rw-r--r--  database/engine/pagecache.h        | 132
-rw-r--r--  database/engine/rrddiskprotocol.h  | 119
-rw-r--r--  database/engine/rrdengine.c        | 780
-rw-r--r--  database/engine/rrdengine.h        | 171
-rw-r--r--  database/engine/rrdengineapi.c     | 484
-rw-r--r--  database/engine/rrdengineapi.h     |  37
-rw-r--r--  database/engine/rrdenginelib.c     | 116
-rw-r--r--  database/engine/rrdenginelib.h     |  84
15 files changed, 3731 insertions, 0 deletions
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am
new file mode 100644
index 0000000000..19554bed8e
--- /dev/null
+++ b/database/engine/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/database/engine/README.md b/database/engine/README.md
new file mode 100644
index 0000000000..28a2528cb3
--- /dev/null
+++ b/database/engine/README.md
@@ -0,0 +1,109 @@
+# Database engine
+
+The Database Engine works like a traditional database: a configurable amount of RAM is
+dedicated to data caching and indexing, while the rest of the data resides compressed on disk.
+The number of history entries is therefore not fixed; it depends on the configured disk space
+and the effective compression ratio of the data stored.
+
+## Files
+
+With the DB engine memory mode the metric data are stored in database files. These files are
+organized in pairs, the datafiles and their corresponding journalfiles, e.g.:
+
+```
+datafile-1-0000000001.ndf
+journalfile-1-0000000001.njf
+datafile-1-0000000002.ndf
+journalfile-1-0000000002.njf
+datafile-1-0000000003.ndf
+journalfile-1-0000000003.njf
+...
+```
+
+They are located in the `./dbengine` subdirectory of the host's cache directory
+(e.g. for localhost the default location is `/var/cache/netdata/dbengine/*`). Higher
+numbered filenames contain more recent metric data. The user can safely delete some pairs
+of files when netdata is stopped to manually free up some space.
+
+*Users should* **back up** *their `./dbengine` folders if they consider this data to be important.*
+
+## Configuration
+
+There is one DB engine instance per netdata host/node. That is, there is one `./dbengine` folder
+per node, and all charts with `dbengine` memory mode on that host share the same storage space
+and DB engine instance memory state. You can select the memory mode for localhost by editing
+netdata.conf and setting:
+
+```
+[global]
+ memory mode = dbengine
+```
+
+To set the memory mode for the rest of the nodes, see
+[streaming](../../streaming/).
+
+The `history` configuration option is meaningless for `memory mode = dbengine` and is ignored
+for any metrics being stored in the DB engine.
+
+All DB engine instances, for localhost and all other streaming recipient nodes, inherit their
+configuration from `netdata.conf`:
+
+```
+[global]
+ page cache size = 32
+ dbengine disk space = 256
+```
+
+The above values are the default and minimum values for Page Cache size and DB engine disk space
+quota. Both numbers are in **MiB**. All DB engine instances will allocate the configured resources
+separately.
+
+The `page cache size` option determines the amount of RAM in **MiB** that is dedicated to caching
+netdata metric values themselves.
+
+The `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated
+to storing netdata metric values and all related metadata describing them.
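+
+For example, to dedicate 64 MiB of RAM to the Page Cache and 512 MiB of disk space to each DB
+engine instance, you could set (the values below are only illustrative):
+
+```
+[global]
+ memory mode = dbengine
+ page cache size = 64
+ dbengine disk space = 512
+```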
+
+## Operation
+
+The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets
+its own page to store consecutive values generated from the data collectors. Those pages comprise
+the **Page Cache**.
+
+When those pages fill up they are slowly compressed and flushed to disk. Since each value
+occupies 4 bytes, a page holds `4096 / 4 = 1024` values; for a chart dimension collected every
+1 second it therefore takes `1024 seconds ≈ 17 minutes` to fill a page. Pages can be cut short
+when netdata or the DB engine instance is stopped, so that no data are lost. Querying the DB
+engine for data triggers disk read I/O requests that fill the Page Cache with the requested
+pages and potentially evict cold (not recently used) pages.
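+
+As a further illustration, for a hypothetical dimension collected every 10 seconds a page would
+need `1024 x 10 = 10240 seconds`, i.e. roughly 2.8 hours, to fill.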
+
+When the disk quota is exceeded the oldest values are removed from the DB engine in real time, by
+automatically deleting the oldest datafile and journalfile pair. Any corresponding pages residing
+in the Page Cache will also be invalidated and removed. The DB engine logic will try to maintain
+between 10 and 20 file pairs at any point in time.
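+
+Given that target of 10 to 20 pairs, with the default 256 MiB disk quota each datafile and
+journalfile pair ends up occupying roughly `256 / 20 ≈ 13` to `256 / 10 ≈ 26` MiB.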
+
+The Database Engine uses direct I/O to avoid polluting the OS filesystem caches, and it avoids
+generating excessive I/O traffic so as to interfere as little as possible with other
+applications.
+
+## Memory requirements
+
+Using memory mode `dbengine` we can overcome most memory restrictions and store a dataset that
+is much larger than the available memory.
+
+There are explicit memory requirements **per** DB engine **instance**, meaning **per** netdata
+**node** (e.g. localhost and streaming recipient nodes):
+
+- `page cache size` must be at least `#dimensions-being-collected x 4096 x 2` bytes.
+
+- an additional `#pages-on-disk x 4096 x 0.06` bytes of RAM are allocated for metadata.
+
+ - roughly speaking this is 6% of the uncompressed disk space taken by the DB files.
+
+ - for very highly compressible data (compression ratio > 90%) this RAM overhead
+ is comparable to the disk space footprint.
+
+An important observation is that RAM usage depends on both the `page cache size` and the
+`dbengine disk space` options.
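+
+As a hypothetical sizing example: a node collecting 2,000 dimensions needs a Page Cache of at
+least `2000 x 4096 x 2 bytes ≈ 16 MiB`, so the default `page cache size = 32` is sufficient. If
+that node fills its default 256 MiB disk quota with data that compresses at a 2:1 ratio, the DB
+files hold about 512 MiB of uncompressed pages (`512 MiB / 4096 = 131072` pages), adding roughly
+`131072 x 4096 x 0.06 ≈ 31 MiB` of RAM for metadata, i.e. about 63 MiB of RAM in total for this
+DB engine instance.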
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2Fengine%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
new file mode 100644
index 0000000000..2d17d05e42
--- /dev/null
+++ b/database/engine/datafile.c
@@ -0,0 +1,335 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
+void df_extent_insert(struct extent_info *extent)
+{
+ struct rrdengine_datafile *datafile = extent->datafile;
+
+ if (likely(NULL != datafile->extents.last)) {
+ datafile->extents.last->next = extent;
+ }
+ if (unlikely(NULL == datafile->extents.first)) {
+ datafile->extents.first = extent;
+ }
+ datafile->extents.last = extent;
+}
+
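+/* Appends a datafile to the tail of the instance's datafile list (newest entries last). */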
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+ if (likely(NULL != ctx->datafiles.last)) {
+ ctx->datafiles.last->next = datafile;
+ }
+ if (unlikely(NULL == ctx->datafiles.first)) {
+ ctx->datafiles.first = datafile;
+ }
+ ctx->datafiles.last = datafile;
+}
+
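+/* Removes the oldest datafile (the list head) from the instance's datafile list. */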
+void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_datafile *next;
+
+ next = datafile->next;
+ assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
+ ctx->datafiles.first = next;
+}
+
+
+static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx,
+ unsigned tier, unsigned fileno)
+{
+ assert(tier == 1);
+ datafile->tier = tier;
+ datafile->fileno = fileno;
+ datafile->file = (uv_file)0;
+ datafile->pos = 0;
+ datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */
+ datafile->journalfile = NULL;
+ datafile->next = NULL;
+ datafile->ctx = ctx;
+}
+
+static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+ (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
+ datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+int destroy_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret, fd;
+ char path[1024];
+
+ ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ret = uv_fs_close(NULL, &req, datafile->file, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_close: %s", uv_strerror(ret));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_unlink(NULL, &req, path, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.datafile_deletions;
+
+ return 0;
+}
+
+int create_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ struct rrdeng_df_sb *superblock;
+ uv_buf_t iov;
+ char path[1024];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
+ S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ);
+ (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ);
+ superblock->tier = 1;
+
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_write: %s", uv_strerror(ret));
+ }
+ if (req.result < 0) {
+ fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ }
+ uv_fs_req_cleanup(&req);
+ free(superblock);
+
+ datafile->file = file;
+ datafile->pos = sizeof(*superblock);
+ ctx->stats.io_write_bytes += sizeof(*superblock);
+ ++ctx->stats.io_write_requests;
+ ++ctx->stats.datafile_creations;
+
+ return 0;
+}
+
+static int check_data_file_superblock(uv_file file)
+{
+ int ret;
+ struct rrdeng_df_sb *superblock;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: %s", uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto error;
+ }
+ assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+
+ if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) ||
+ strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) ||
+ superblock->tier != 1) {
+ error("File has invalid superblock.");
+ ret = UV_EINVAL;
+ } else {
+ ret = 0;
+ }
+ error:
+ free(superblock);
+ return ret;
+}
+
+static int load_data_file(struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ uint64_t file_size;
+ char path[1024];
+
+ generate_datafilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ /* if (UV_ENOENT != fd) */
+ error("uv_fs_fsopen: %s", uv_strerror(fd));
+ uv_fs_req_cleanup(&req);
+ return fd;
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+ info("Initializing data file \"%s\".", path);
+
+ ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
+ if (ret)
+ goto error;
+ file_size = ALIGN_BYTES_CEILING(file_size);
+
+ ret = check_data_file_superblock(file);
+ if (ret)
+ goto error;
+ ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb);
+ ++ctx->stats.io_read_requests;
+
+ datafile->file = file;
+ datafile->pos = file_size;
+
+ info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
+ return 0;
+
+ error:
+ (void) uv_fs_close(NULL, &req, file, NULL);
+ uv_fs_req_cleanup(&req);
+ return ret;
+}
+
+static int scan_data_files_cmp(const void *a, const void *b)
+{
+ struct rrdengine_datafile *file1, *file2;
+ char path1[1024], path2[1024];
+
+ file1 = *(struct rrdengine_datafile **)a;
+ file2 = *(struct rrdengine_datafile **)b;
+ generate_datafilepath(file1, path1, sizeof(path1));
+ generate_datafilepath(file2, path2, sizeof(path2));
+ return strcmp(path1, path2);
+}
+
+/* Returns number of datafiles that were loaded */
+static int scan_data_files(struct rrdengine_instance *ctx)
+{
+ int ret;
+ unsigned tier, no, matched_files, i, failed_to_load;
+ static uv_fs_t req;
+ uv_dirent_t dent;
+ struct rrdengine_datafile **datafiles, *datafile;
+ struct rrdengine_journalfile *journalfile;
+
+ ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
+ assert(ret >= 0);
+ assert(req.result >= 0);
+ info("Found %d files in path %s", ret, ctx->dbfiles_path);
+
+ datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
+ for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
+ info("Scanning file \"%s\"", dent.name);
+ ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
+ if (2 == ret) {
+ info("Matched file \"%s\"", dent.name);
+ datafile = mallocz(sizeof(*datafile));
+ datafile_init(datafile, ctx, tier, no);
+ datafiles[matched_files++] = datafile;
+ }
+ }
+ uv_fs_req_cleanup(&req);
+
+ if (matched_files == MAX_DATAFILES) {
+ error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
+ }
+ qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
+ for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
+ datafile = datafiles[i];
+ ret = load_data_file(datafile);
+ if (0 != ret) {
+ free(datafile);
+ ++failed_to_load;
+ continue;
+ }
+ journalfile = mallocz(sizeof(*journalfile));
+ datafile->journalfile = journalfile;
+ journalfile_init(journalfile, datafile);
+ ret = load_journal_file(ctx, journalfile, datafile);
+ if (0 != ret) {
+ free(datafile);
+ free(journalfile);
+ ++failed_to_load;
+ continue;
+ }
+ datafile_list_insert(ctx, datafile);
+ ctx->disk_space += datafile->pos + journalfile->pos;
+ }
+ if (failed_to_load) {
+ error("%u files failed to load.", failed_to_load);
+ }
+ free(datafiles);
+
+ return matched_files - failed_to_load;
+}
+
+/* Creates a datafile and a journalfile pair */
+void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
+{
+ struct rrdengine_datafile *datafile;
+ struct rrdengine_journalfile *journalfile;
+ int ret;
+
+ info("Creating new data and journal files.");
+ datafile = mallocz(sizeof(*datafile));
+ datafile_init(datafile, ctx, tier, fileno);
+ ret = create_data_file(datafile);
+ assert(!ret);
+
+ journalfile = mallocz(sizeof(*journalfile));
+ datafile->journalfile = journalfile;
+ journalfile_init(journalfile, datafile);
+ ret = create_journal_file(journalfile, datafile);
+ assert(!ret);
+ datafile_list_insert(ctx, datafile);
+ ctx->disk_space += datafile->pos + journalfile->pos;
+}
+
+/* Page cache must already be initialized. */
+int init_data_files(struct rrdengine_instance *ctx)
+{
+ int ret;
+
+ ret = scan_data_files(ctx);
+ if (0 == ret) {
+ info("Data files not found, creating.");
+ create_new_datafile_pair(ctx, 1, 1);
+ }
+ return 0;
+} \ No newline at end of file
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
new file mode 100644
index 0000000000..c5c8f31f3f
--- /dev/null
+++ b/database/engine/datafile.h
@@ -0,0 +1,63 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_DATAFILE_H
+#define NETDATA_DATAFILE_H
+
+#include "rrdengine.h"
+
+/* Forward declarations */
+struct rrdengine_datafile;
+struct rrdengine_journalfile;
+struct rrdengine_instance;
+
+#define DATAFILE_PREFIX "datafile-"
+#define DATAFILE_EXTENSION ".ndf"
+
+#define MAX_DATAFILE_SIZE (1073741824LU)
+#define MIN_DATAFILE_SIZE (16777216LU)
+#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
+#define TARGET_DATAFILES (20)
+
+#define DATAFILE_IDEAL_IO_SIZE (1048576U)
+
+struct extent_info {
+ uint64_t offset;
+ uint32_t size;
+ uint8_t number_of_pages;
+ struct rrdengine_datafile *datafile;
+ struct extent_info *next;
+ struct rrdeng_page_cache_descr *pages[];
+};
+
+struct rrdengine_df_extents {
+ /* the extent list is sorted based on disk offset */
+ struct extent_info *first;
+ struct extent_info *last;
+};
+
+/* only one event loop is supported for now */
+struct rrdengine_datafile {
+ unsigned tier;
+ unsigned fileno;
+ uv_file file;
+ uint64_t pos;
+ struct rrdengine_instance *ctx;
+ struct rrdengine_df_extents extents;
+ struct rrdengine_journalfile *journalfile;
+ struct rrdengine_datafile *next;
+};
+
+struct rrdengine_datafile_list {
+ struct rrdengine_datafile *first; /* oldest */
+ struct rrdengine_datafile *last; /* newest */
+};
+
+extern void df_extent_insert(struct extent_info *extent);
+extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+extern int destroy_data_file(struct rrdengine_datafile *datafile);
+extern int create_data_file(struct rrdengine_datafile *datafile);
+extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
+extern int init_data_files(struct rrdengine_instance *ctx);
+
+#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
new file mode 100644
index 0000000000..44d8461dbb
--- /dev/null
+++ b/database/engine/journalfile.c
@@ -0,0 +1,462 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "rrdengine.h"
+
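+/* Completion callback for commit log writes: aborts on I/O error, then frees the write buffer and its descriptor. */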
+static void flush_transaction_buffer_cb(uv_fs_t* req)
+{
+ struct generic_io_descriptor *io_descr;
+
+ debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
+ if (req->result < 0) {
+ fatal("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ }
+ io_descr = req->data;
+
+ uv_fs_req_cleanup(req);
+ free(io_descr->buf);
+ free(io_descr);
+}
+
+/* Careful to always call this before creating a new journal file */
+void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ int ret;
+ struct generic_io_descriptor *io_descr;
+ unsigned pos, size;
+ struct rrdengine_journalfile *journalfile;
+
+ if (unlikely(NULL == ctx->commit_log.buf || 0 == ctx->commit_log.buf_pos)) {
+ return;
+ }
+ /* care with outstanding transactions when switching journal files */
+ journalfile = ctx->datafiles.last->journalfile;
+
+ io_descr = mallocz(sizeof(*io_descr));
+ pos = ctx->commit_log.buf_pos;
+ size = ctx->commit_log.buf_size;
+ if (pos < size) {
+ /* simulate an empty transaction to skip the rest of the block */
+ *(uint8_t *) (ctx->commit_log.buf + pos) = STORE_PADDING;
+ }
+ io_descr->buf = ctx->commit_log.buf;
+ io_descr->bytes = size;
+ io_descr->pos = journalfile->pos;
+ io_descr->req.data = io_descr;
+ io_descr->completion = NULL;
+
+ io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
+ ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
+ journalfile->pos, flush_transaction_buffer_cb);
+ assert (-1 != ret);
+ journalfile->pos += RRDENG_BLOCK_SIZE;
+ ctx->disk_space += RRDENG_BLOCK_SIZE;
+ ctx->commit_log.buf = NULL;
+ ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE;
+ ++ctx->stats.io_write_requests;
+}
+
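+/* Reserves size bytes in the commit log buffer and returns a pointer to them, flushing the current buffer first if the request does not fit. */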
+void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ int ret;
+ unsigned buf_pos, buf_size;
+
+ assert(size);
+ if (ctx->commit_log.buf) {
+ unsigned remaining;
+
+ buf_pos = ctx->commit_log.buf_pos;
+ buf_size = ctx->commit_log.buf_size;
+ remaining = buf_size - buf_pos;
+ if (size > remaining) {
+ /* we need a new buffer */
+ wal_flush_transaction_buffer(wc);
+ }
+ }
+ if (NULL == ctx->commit_log.buf) {
+ buf_size = ALIGN_BYTES_CEILING(size);
+ ret = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, buf_size);
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ buf_pos = ctx->commit_log.buf_pos = 0;
+ ctx->commit_log.buf_size = buf_size;
+ }
+ ctx->commit_log.buf_pos += size;
+
+ return ctx->commit_log.buf + buf_pos;
+}
+
+static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+ (void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
+ datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+}
+
+void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ journalfile->file = (uv_file)0;
+ journalfile->pos = 0;
+ journalfile->datafile = datafile;
+}
+
+int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ int ret, fd;
+ char path[1024];
+
+ ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_close: %s", uv_strerror(ret));
+ exit(ret);
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+ fd = uv_fs_unlink(NULL, &req, path, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
+ }
+ assert(0 == req.result);
+ uv_fs_req_cleanup(&req);
+
+ ++ctx->stats.journalfile_deletions;
+
+ return 0;
+}
+
+int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ uv_fs_t req;
+ uv_file file;
+ int ret, fd;
+ struct rrdeng_jf_sb *superblock;
+ uv_buf_t iov;
+ char path[1024];
+
+ generate_journalfilepath(datafile, path, sizeof(path));
+ fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC,
+ S_IRUSR | S_IWUSR, NULL);
+ if (fd < 0) {
+ fatal("uv_fs_fsopen: %s", uv_strerror(fd));
+ }
+ assert(req.result >= 0);
+ file = req.result;
+ uv_fs_req_cleanup(&req);
+#ifdef __APPLE__
+ info("Disabling OS X caching for file \"%s\".", path);
+ fcntl(fd, F_NOCACHE, 1);
+#endif
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
+ (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ);
+
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ fatal("uv_fs_write: %s", uv_strerror(ret));
+ }
+ if (req.result < 0) {
+ fatal("uv_fs_write: %s", uv_strerror((int)req.result));
+ }
+ uv_fs_req_cleanup(&req);
+ free(superblock);
+
+ journalfile->file = file;
+ journalfile->pos = sizeof(*superblock);
+ ctx->stats.io_write_bytes += sizeof(*superblock);
+ ++ctx->stats.io_write_requests;
+ ++ctx->stats.journalfile_creations;
+
+ return 0;
+}
+
+static int check_journal_file_superblock(uv_file file)
+{
+ int ret;
+ struct rrdeng_jf_sb *superblock;
+ uv_buf_t iov;
+ uv_fs_t req;
+
+ ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ iov = uv_buf_init((void *)superblock, sizeof(*superblock));
+
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
+ if (ret < 0) {
+ error("uv_fs_read: %s", uv_strerror(ret));
+ uv_fs_req_cleanup(&req);
+ goto error;
+ }
+ assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+
+ if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
+ strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
+ error("File has invalid superblock.");
+ ret = UV_EINVAL;
+ } else {
+ ret = 0;
+ }
+ error:
+ free(superblock);
+ return ret;
+}
+
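+/* Rebuilds page cache metadata for one extent from a journal file transaction payload. */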
+static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ void *buf, unsigned max_size)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned i, count, payload_length, descr_size, valid_pages;
+ struct rrdeng_page_cache_descr *descr;
+ struct extent_info *extent;
+ /* persistent structures */
+ struct rrdeng_jf_store_data *jf_metric_data;
+
+ jf_metric_data = buf;
+ count = jf_metric_data->number_of_pages;
+ descr_size = sizeof(*jf_metric_data->descr) * count;
+ payload_length = sizeof(*jf_metric_data) + descr_size;
+ if (payload_length > max_size) {
+ error("Corrupted transaction payload.");
+ return;
+ }
+
+ extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
+ extent->offset = jf_metric_data->extent_offset;
+ extent->size = jf_metric_data->extent_size;
+ extent->number_of_pages = count;
+ extent->datafile = journalfile->datafile;
+ extent->next = NULL;
+
+ for (i = 0, valid_pages = 0 ; i < count ; ++i) {
+ uuid_t *temp_id;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+
+ if (PAGE_METRICS != jf_metric_data->descr[i].type) {
+ error("Unknown page type encountered.");
+ continue;
+ }
+ ++valid_pages;
+ temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ /* First time we see the UUID */
+ uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
+ assert(NULL == *PValue); /* TODO: figure out concurrency model */
+ *PValue = page_index = create_page_index(temp_id);
+ uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
+ }
+
+ descr = pg_cache_create_descr();
+ descr->page_length = jf_metric_data->descr[i].page_length;
+ descr->start_time = jf_metric_data->descr[i].start_time