diff options
author | Markos Fountoulakis <44345837+mfundul@users.noreply.github.com> | 2019-05-15 08:28:06 +0300 |
---|---|---|
committer | Paul Emm. Katsoulakis <34388743+paulkatsoulakis@users.noreply.github.com> | 2019-05-15 08:28:06 +0300 |
commit | 6ca6d840dd19d5d7e9bacf93e011803ea5861496 (patch) | |
tree | f20393187806d642f94eab87f87180440089fb0a /database/engine | |
parent | fed63b6e99dd70beb2cf9ccadd7c396aa05b2ae0 (diff) |
Database engine (#5282)
* Database engine prototype version 0
* Database engine initial integration with netdata POC
* Scalable database engine with file and memory management.
* Database engine integration with netdata
* Added MIN MAX definitions to fix alpine build of travis CI
* Bugfix for backends and new DB engine, remove useless rrdset_time2slot() calls and erroneous checks
* DB engine disk protocol correction
* Moved DB engine storage file location to /var/cache/netdata/{host}/dbengine
* Fix configure to require openSSL for DB engine
* Fix netdata daemon health not holding read lock when iterating chart dimensions
* Optimized query API for new DB engine and old netdata DB fallback code-path
* netdata database internal query API improvements and cleanup
* Bugfix for DB engine queries returning empty values
* Added netdata internal check for data queries for old and new DB
* Added statistics to DB engine and fixed memory corruption bug
* Added preliminary charts for DB engine statistics
* Changed DB engine ratio statistics to incremental
* Added netdata statistics charts for DB engine internal statistics
* Fix for netdata not compiling successfully when missing dbengine dependencies
* Added DB engine functional test to netdata unittest command parameter
* Implemented DB engine dataset generator based on example.random chart
* Fix build error in CI
* Support older versions of libuv1
* Fixes segmentation fault when using multiple DB engine instances concurrently
* Fix memory corruption bug
* Fixed createdataset advanced option not exiting
* Fix for DB engine not working on FreeBSD
* Support FreeBSD library paths of new dependencies
* Workaround for unsupported O_DIRECT in OS X
* Fix unittest crashing during cleanup
* Disable DB engine FS caching in Apple OS X since O_DIRECT is not available
* Fix segfault when unittest and DB engine dataset generator don't have permissions to create temporary host
* Modified DB engine dataset generator to create multiple files
* Toned down overzealous page cache prefetcher
* Reduce internal memory fragmentation for page-cache data pages
* Added documentation describing the DB engine
* Documentation bugfixes
* Fixed unit tests compilation errors since last rebase
* Added note to back-up the DB engine files in documentation
* Added codacy fix.
* Support old gcc versions for atomic counters in DB engine
Diffstat (limited to 'database/engine')
-rw-r--r-- | database/engine/Makefile.am | 8 | ||||
-rw-r--r-- | database/engine/README.md | 109 | ||||
-rw-r--r-- | database/engine/datafile.c | 335 | ||||
-rw-r--r-- | database/engine/datafile.h | 63 | ||||
-rw-r--r-- | database/engine/journalfile.c | 462 | ||||
-rw-r--r-- | database/engine/journalfile.h | 46 | ||||
-rw-r--r-- | database/engine/pagecache.c | 785 | ||||
-rw-r--r-- | database/engine/pagecache.h | 132 | ||||
-rw-r--r-- | database/engine/rrddiskprotocol.h | 119 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 780 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 171 | ||||
-rw-r--r-- | database/engine/rrdengineapi.c | 484 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 37 | ||||
-rw-r--r-- | database/engine/rrdenginelib.c | 116 | ||||
-rw-r--r-- | database/engine/rrdenginelib.h | 84 |
15 files changed, 3731 insertions, 0 deletions
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am new file mode 100644 index 0000000000..19554bed8e --- /dev/null +++ b/database/engine/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/database/engine/README.md b/database/engine/README.md new file mode 100644 index 0000000000..28a2528cb3 --- /dev/null +++ b/database/engine/README.md @@ -0,0 +1,109 @@ +# Database engine + +The Database Engine works like a traditional +database. There is some amount of RAM dedicated to data caching and indexing and the rest of +the data reside compressed on disk. The number of history entries is not fixed in this case, +but depends on the configured disk space and the effective compression ratio of the data stored. + +## Files + +With the DB engine memory mode the metric data are stored in database files. These files are +organized in pairs, the datafiles and their corresponding journalfiles, e.g.: + +``` +datafile-1-0000000001.ndf +journalfile-1-0000000001.njf +datafile-1-0000000002.ndf +journalfile-1-0000000002.njf +datafile-1-0000000003.ndf +journalfile-1-0000000003.njf +... +``` + +They are located under their host's cache directory in the directory `./dbengine` +(e.g. for localhost the default location is `/var/cache/netdata/dbengine/*`). The higher +numbered filenames contain more recent metric data. The user can safely delete some pairs +of files when netdata is stopped to manually free up some space. + +*Users should* **back up** *their `./dbengine` folders if they consider this data to be important.* + +## Configuration + +There is one DB engine instance per netdata host/node. That is, there is one `./dbengine` folder +per node, and all charts of `dbengine` memory mode in such a host share the same storage space +and DB engine instance memory state. You can select the memory mode for localhost by editing +netdata.conf and setting: + +``` +[global] + memory mode = dbengine +``` + +For setting the memory mode for the rest of the nodes you should look at +[streaming](../../streaming/). + +The `history` configuration option is meaningless for `memory mode = dbengine` and is ignored +for any metrics being stored in the DB engine. + +All DB engine instances, for localhost and all other streaming recipient nodes inherit their +configuration from `netdata.conf`: + +``` +[global] + page cache size = 32 + dbengine disk space = 256 +``` + +The above values are the default and minimum values for Page Cache size and DB engine disk space +quota. Both numbers are in **MiB**. All DB engine instances will allocate the configured resources +separately. + +The `page cache size` option determines the amount of RAM in **MiB** that is dedicated to caching +netdata metric values themselves. + +The `dbengine disk space` option determines the amount of disk space in **MiB** that is dedicated +to storing netdata metric values and all related metadata describing them. + +## Operation + +The DB engine stores chart metric values in 4096-byte pages in memory. Each chart dimension gets +its own page to store consecutive values generated from the data collectors. Those pages comprise +the **Page Cache**. + +When those pages fill up they are slowly compressed and flushed to disk. +It can take `4096 / 4 = 1024 seconds = 17 minutes`, for a chart dimension that is being collected +every 1 second, to fill a page. Pages can be cut short when we stop netdata or the DB engine +instance so as to not lose the data. When we query the DB engine for data we trigger disk read +I/O requests that fill the Page Cache with the requested pages and potentially evict cold +(not recently used) pages. + +When the disk quota is exceeded the oldest values are removed from the DB engine at real time, by +automatically deleting the oldest datafile and journalfile pair. Any corresponding pages residing +in the Page Cache will also be invalidated and removed. The DB engine logic will try to maintain +between 10 and 20 file pairs at any point in time. + +The Database Engine uses direct I/O to avoid polluting the OS filesystem caches and does not +generate excessive I/O traffic so as to create the minimum possible interference with other +applications. + +## Memory requirements + +Using memory mode `dbengine` we can overcome most memory restrictions and store a dataset that +is much larger than the available memory. + +There are explicit memory requirements **per** DB engine **instance**, meaning **per** netdata +**node** (e.g. localhost and streaming recipient nodes): + +- `page cache size` must be at least `#dimensions-being-collected x 4096 x 2` bytes. + +- an additional `#pages-on-disk x 4096 x 0.06` bytes of RAM are allocated for metadata. + + - roughly speaking this is 6% of the uncompressed disk space taken by the DB files. + + - for very highly compressible data (compression ratio > 90%) this RAM overhead + is comparable to the disk space footprint. + +An important observation is that RAM usage depends on both the `page cache size` and the +`dbengine disk space` options. + +[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fdatabase%2Fengine%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]() diff --git a/database/engine/datafile.c b/database/engine/datafile.c new file mode 100644 index 0000000000..2d17d05e42 --- /dev/null +++ b/database/engine/datafile.c @@ -0,0 +1,335 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +void df_extent_insert(struct extent_info *extent) +{ + struct rrdengine_datafile *datafile = extent->datafile; + + if (likely(NULL != datafile->extents.last)) { + datafile->extents.last->next = extent; + } + if (unlikely(NULL == datafile->extents.first)) { + datafile->extents.first = extent; + } + datafile->extents.last = extent; +} + +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +{ + if (likely(NULL != ctx->datafiles.last)) { + ctx->datafiles.last->next = datafile; + } + if (unlikely(NULL == ctx->datafiles.first)) { + ctx->datafiles.first = datafile; + } + ctx->datafiles.last = datafile; +} + +void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +{ + struct rrdengine_datafile *next; + + next = datafile->next; + assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile)); + ctx->datafiles.first = next; +} + + +static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx, + unsigned tier, unsigned fileno) +{ + assert(tier == 1); + datafile->tier = tier; + datafile->fileno = fileno; + datafile->file = (uv_file)0; + datafile->pos = 0; + datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */ + datafile->journalfile = NULL; + datafile->next = NULL; + datafile->ctx = ctx; +} + +static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, + datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); +} + +int destroy_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret, fd; + char path[1024]; + + ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); + if (ret < 0) { + fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + fatal("uv_fs_close: %s", uv_strerror(ret)); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + generate_datafilepath(datafile, path, sizeof(path)); + fd = uv_fs_unlink(NULL, &req, path, NULL); + if (fd < 0) { + fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + ++ctx->stats.datafile_deletions; + + return 0; +} + +int create_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + struct rrdeng_df_sb *superblock; + uv_buf_t iov; + char path[1024]; + + generate_datafilepath(datafile, path, sizeof(path)); + fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC, + S_IRUSR | S_IWUSR, NULL); + if (fd < 0) { + fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + } + assert(req.result >= 0); + file = req.result; + uv_fs_req_cleanup(&req); +#ifdef __APPLE__ + info("Disabling OS X caching for file \"%s\".", path); + fcntl(fd, F_NOCACHE, 1); +#endif + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ); + (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ); + superblock->tier = 1; + + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + fatal("uv_fs_write: %s", uv_strerror(ret)); + } + if (req.result < 0) { + fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + } + uv_fs_req_cleanup(&req); + free(superblock); + + datafile->file = file; + datafile->pos = sizeof(*superblock); + ctx->stats.io_write_bytes += sizeof(*superblock); + ++ctx->stats.io_write_requests; + ++ctx->stats.datafile_creations; + + return 0; +} + +static int check_data_file_superblock(uv_file file) +{ + int ret; + struct rrdeng_df_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + error("uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) || + strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) || + superblock->tier != 1) { + error("File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + error: + free(superblock); + return ret; +} + +static int load_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + uint64_t file_size; + char path[1024]; + + generate_datafilepath(datafile, path, sizeof(path)); + fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR, NULL); + if (fd < 0) { + /* if (UV_ENOENT != fd) */ + error("uv_fs_fsopen: %s", uv_strerror(fd)); + uv_fs_req_cleanup(&req); + return fd; + } + assert(req.result >= 0); + file = req.result; + uv_fs_req_cleanup(&req); +#ifdef __APPLE__ + info("Disabling OS X caching for file \"%s\".", path); + fcntl(fd, F_NOCACHE, 1); +#endif + info("Initializing data file \"%s\".", path); + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); + if (ret) + goto error; + file_size = ALIGN_BYTES_CEILING(file_size); + + ret = check_data_file_superblock(file); + if (ret) + goto error; + ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb); + ++ctx->stats.io_read_requests; + + datafile->file = file; + datafile->pos = file_size; + + info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size); + return 0; + + error: + (void) uv_fs_close(NULL, &req, file, NULL); + uv_fs_req_cleanup(&req); + return ret; +} + +static int scan_data_files_cmp(const void *a, const void *b) +{ + struct rrdengine_datafile *file1, *file2; + char path1[1024], path2[1024]; + + file1 = *(struct rrdengine_datafile **)a; + file2 = *(struct rrdengine_datafile **)b; + generate_datafilepath(file1, path1, sizeof(path1)); + generate_datafilepath(file2, path2, sizeof(path2)); + return strcmp(path1, path2); +} + +/* Returns number of datafiles that were loaded */ +static int scan_data_files(struct rrdengine_instance *ctx) +{ + int ret; + unsigned tier, no, matched_files, i,failed_to_load; + static uv_fs_t req; + uv_dirent_t dent; + struct rrdengine_datafile **datafiles, *datafile; + struct rrdengine_journalfile *journalfile; + + ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL); + assert(ret >= 0); + assert(req.result >= 0); + info("Found %d files in path %s", ret, ctx->dbfiles_path); + + datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); + for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { + info("Scanning file \"%s\"", dent.name); + ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); + if (2 == ret) { + info("Matched file \"%s\"", dent.name); + datafile = mallocz(sizeof(*datafile)); + datafile_init(datafile, ctx, tier, no); + datafiles[matched_files++] = datafile; + } + } + uv_fs_req_cleanup(&req); + + if (matched_files == MAX_DATAFILES) { + error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); + } + qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { + datafile = datafiles[i]; + ret = load_data_file(datafile); + if (0 != ret) { + free(datafile); + ++failed_to_load; + continue; + } + journalfile = mallocz(sizeof(*journalfile)); + datafile->journalfile = journalfile; + journalfile_init(journalfile, datafile); + ret = load_journal_file(ctx, journalfile, datafile); + if (0 != ret) { + free(datafile); + free(journalfile); + ++failed_to_load; + continue; + } + datafile_list_insert(ctx, datafile); + ctx->disk_space += datafile->pos + journalfile->pos; + } + if (failed_to_load) { + error("%u files failed to load.", failed_to_load); + } + free(datafiles); + + return matched_files - failed_to_load; +} + +/* Creates a datafile and a journalfile pair */ +void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) +{ + struct rrdengine_datafile *datafile; + struct rrdengine_journalfile *journalfile; + int ret; + + info("Creating new data and journal files."); + datafile = mallocz(sizeof(*datafile)); + datafile_init(datafile, ctx, tier, fileno); + ret = create_data_file(datafile); + assert(!ret); + + journalfile = mallocz(sizeof(*journalfile)); + datafile->journalfile = journalfile; + journalfile_init(journalfile, datafile); + ret = create_journal_file(journalfile, datafile); + assert(!ret); + datafile_list_insert(ctx, datafile); + ctx->disk_space += datafile->pos + journalfile->pos; +} + +/* Page cache must already be initialized. */ +int init_data_files(struct rrdengine_instance *ctx) +{ + int ret; + + ret = scan_data_files(ctx); + if (0 == ret) { + info("Data files not found, creating."); + create_new_datafile_pair(ctx, 1, 1); + } + return 0; +}
\ No newline at end of file diff --git a/database/engine/datafile.h b/database/engine/datafile.h new file mode 100644 index 0000000000..c5c8f31f3f --- /dev/null +++ b/database/engine/datafile.h @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_DATAFILE_H +#define NETDATA_DATAFILE_H + +#include "rrdengine.h" + +/* Forward declarations */ +struct rrdengine_datafile; +struct rrdengine_journalfile; +struct rrdengine_instance; + +#define DATAFILE_PREFIX "datafile-" +#define DATAFILE_EXTENSION ".ndf" + +#define MAX_DATAFILE_SIZE (1073741824LU) +#define MIN_DATAFILE_SIZE (16777216LU) +#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */ +#define TARGET_DATAFILES (20) + +#define DATAFILE_IDEAL_IO_SIZE (1048576U) + +struct extent_info { + uint64_t offset; + uint32_t size; + uint8_t number_of_pages; + struct rrdengine_datafile *datafile; + struct extent_info *next; + struct rrdeng_page_cache_descr *pages[]; +}; + +struct rrdengine_df_extents { + /* the extent list is sorted based on disk offset */ + struct extent_info *first; + struct extent_info *last; +}; + +/* only one event loop is supported for now */ +struct rrdengine_datafile { + unsigned tier; + unsigned fileno; + uv_file file; + uint64_t pos; + struct rrdengine_instance *ctx; + struct rrdengine_df_extents extents; + struct rrdengine_journalfile *journalfile; + struct rrdengine_datafile *next; +}; + +struct rrdengine_datafile_list { + struct rrdengine_datafile *first; /* oldest */ + struct rrdengine_datafile *last; /* newest */ +}; + +extern void df_extent_insert(struct extent_info *extent); +extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +extern int destroy_data_file(struct rrdengine_datafile *datafile); +extern int create_data_file(struct rrdengine_datafile *datafile); +extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno); +extern int init_data_files(struct rrdengine_instance *ctx); + +#endif /* NETDATA_DATAFILE_H */
\ No newline at end of file diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c new file mode 100644 index 0000000000..44d8461dbb --- /dev/null +++ b/database/engine/journalfile.c @@ -0,0 +1,462 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +static void flush_transaction_buffer_cb(uv_fs_t* req) +{ + struct generic_io_descriptor *io_descr; + + debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + if (req->result < 0) { + fatal("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + } + io_descr = req->data; + + uv_fs_req_cleanup(req); + free(io_descr->buf); + free(io_descr); +} + +/* Careful to always call this before creating a new journal file */ +void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc) +{ + struct rrdengine_instance *ctx = wc->ctx; + int ret; + struct generic_io_descriptor *io_descr; + unsigned pos, size; + struct rrdengine_journalfile *journalfile; + + if (unlikely(NULL == ctx->commit_log.buf || 0 == ctx->commit_log.buf_pos)) { + return; + } + /* care with outstanding transactions when switching journal files */ + journalfile = ctx->datafiles.last->journalfile; + + io_descr = mallocz(sizeof(*io_descr)); + pos = ctx->commit_log.buf_pos; + size = ctx->commit_log.buf_size; + if (pos < size) { + /* simulate an empty transaction to skip the rest of the block */ + *(uint8_t *) (ctx->commit_log.buf + pos) = STORE_PADDING; + } + io_descr->buf = ctx->commit_log.buf; + io_descr->bytes = size; + io_descr->pos = journalfile->pos; + io_descr->req.data = io_descr; + io_descr->completion = NULL; + + io_descr->iov = uv_buf_init((void *)io_descr->buf, size); + ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1, + journalfile->pos, flush_transaction_buffer_cb); + assert (-1 != ret); + journalfile->pos += RRDENG_BLOCK_SIZE; + ctx->disk_space += RRDENG_BLOCK_SIZE; + ctx->commit_log.buf = NULL; + ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE; + ++ctx->stats.io_write_requests; +} + +void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size) +{ + struct rrdengine_instance *ctx = wc->ctx; + int ret; + unsigned buf_pos, buf_size; + + assert(size); + if (ctx->commit_log.buf) { + unsigned remaining; + + buf_pos = ctx->commit_log.buf_pos; + buf_size = ctx->commit_log.buf_size; + remaining = buf_size - buf_pos; + if (size > remaining) { + /* we need a new buffer */ + wal_flush_transaction_buffer(wc); + } + } + if (NULL == ctx->commit_log.buf) { + buf_size = ALIGN_BYTES_CEILING(size); + ret = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, buf_size); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + buf_pos = ctx->commit_log.buf_pos = 0; + ctx->commit_log.buf_size = buf_size; + } + ctx->commit_log.buf_pos += size; + + return ctx->commit_log.buf + buf_pos; +} + +static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, + datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); +} + +void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + journalfile->file = (uv_file)0; + journalfile->pos = 0; + journalfile->datafile = datafile; +} + +int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret, fd; + char path[1024]; + + ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); + if (ret < 0) { + fatal("uv_fs_ftruncate: %s", uv_strerror(ret)); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); + if (ret < 0) { + fatal("uv_fs_close: %s", uv_strerror(ret)); + exit(ret); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + generate_journalfilepath(datafile, path, sizeof(path)); + fd = uv_fs_unlink(NULL, &req, path, NULL); + if (fd < 0) { + fatal("uv_fs_fsunlink: %s", uv_strerror(fd)); + } + assert(0 == req.result); + uv_fs_req_cleanup(&req); + + ++ctx->stats.journalfile_deletions; + + return 0; +} + +int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + struct rrdeng_jf_sb *superblock; + uv_buf_t iov; + char path[1024]; + + generate_journalfilepath(datafile, path, sizeof(path)); + fd = uv_fs_open(NULL, &req, path, O_DIRECT | O_CREAT | O_RDWR | O_TRUNC, + S_IRUSR | S_IWUSR, NULL); + if (fd < 0) { + fatal("uv_fs_fsopen: %s", uv_strerror(fd)); + } + assert(req.result >= 0); + file = req.result; + uv_fs_req_cleanup(&req); +#ifdef __APPLE__ + info("Disabling OS X caching for file \"%s\".", path); + fcntl(fd, F_NOCACHE, 1); +#endif + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ); + (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ); + + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + fatal("uv_fs_write: %s", uv_strerror(ret)); + } + if (req.result < 0) { + fatal("uv_fs_write: %s", uv_strerror((int)req.result)); + } + uv_fs_req_cleanup(&req); + free(superblock); + + journalfile->file = file; + journalfile->pos = sizeof(*superblock); + ctx->stats.io_write_bytes += sizeof(*superblock); + ++ctx->stats.io_write_requests; + ++ctx->stats.journalfile_creations; + + return 0; +} + +static int check_journal_file_superblock(uv_file file) +{ + int ret; + struct rrdeng_jf_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + error("uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) || + strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) { + error("File has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + error: + free(superblock); + return ret; +} + +static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + void *buf, unsigned max_size) +{ + struct page_cache *pg_cache = &ctx->pg_cache; + unsigned i, count, payload_length, descr_size, valid_pages; + struct rrdeng_page_cache_descr *descr; + struct extent_info *extent; + /* persistent structures */ + struct rrdeng_jf_store_data *jf_metric_data; + + jf_metric_data = buf; + count = jf_metric_data->number_of_pages; + descr_size = sizeof(*jf_metric_data->descr) * count; + payload_length = sizeof(*jf_metric_data) + descr_size; + if (payload_length > max_size) { + error("Corrupted transaction payload."); + return; + } + + extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0])); + extent->offset = jf_metric_data->extent_offset; + extent->size = jf_metric_data->extent_size; + extent->number_of_pages = count; + extent->datafile = journalfile->datafile; + extent->next = NULL; + + for (i = 0, valid_pages = 0 ; i < count ; ++i) { + uuid_t *temp_id; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index; + + if (PAGE_METRICS != jf_metric_data->descr[i].type) { + error("Unknown page type encountered."); + continue; + } + ++valid_pages; + temp_id = (uuid_t *)jf_metric_data->descr[i].uuid; + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + /* First time we see the UUID */ + uv_rwlock_wrlock(&pg_cache->metrics_index.lock); + PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); + assert(NULL == *PValue); /* TODO: figure out concurrency model */ + *PValue = page_index = create_page_index(temp_id); + uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + } + + descr = pg_cache_create_descr(); + descr->page_length = jf_metric_data->descr[i].page_length; + descr->start_time = jf_metric_data->descr[i].start_time |