summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r--database/engine/rrdengine.c156
1 files changed, 108 insertions, 48 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 43922caaf5..0f2dceaa46 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -3,6 +3,10 @@
#include "rrdengine.h"
+rrdeng_stats_t global_io_errors = 0;
+rrdeng_stats_t global_fs_errors = 0;
+rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
+
void sanity_check(void)
{
/* Magic numbers must fit in the super-blocks */
@@ -33,7 +37,6 @@ void read_extent_cb(uv_fs_t* req)
unsigned i, j, count;
void *page, *uncompressed_buf = NULL;
uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length;
- struct rrdengine_datafile *datafile;
/* persistent structures */
struct rrdeng_df_extent_header *header;
struct rrdeng_df_extent_trailer *trailer;
@@ -55,9 +58,13 @@ void read_extent_cb(uv_fs_t* req)
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer));
ret = crc32cmp(trailer->checksum, crc);
- datafile = xt_io_descr->descr_array[0]->extent->datafile;
- debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
- xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
+ xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
+ }
+#endif
if (unlikely(ret)) {
/* TODO: handle errors */
exit(UV_EIO);
@@ -112,14 +119,14 @@ void read_extent_cb(uv_fs_t* req)
rrdeng_page_descr_mutex_unlock(ctx, descr);
}
if (RRD_NO_COMPRESSION != header->compression_algorithm) {
- free(uncompressed_buf);
+ freez(uncompressed_buf);
}
if (xt_io_descr->completion)
complete(xt_io_descr->completion);
cleanup:
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
- free(xt_io_descr);
+ freez(xt_io_descr);
}
@@ -144,7 +151,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
- /* free(xt_io_descr);
+ /* freez(xt_io_descr);
return;*/
}
for (i = 0 ; i < count; ++i) {
@@ -233,7 +240,6 @@ void flush_pages_cb(uv_fs_t* req)
struct extent_io_descriptor *xt_io_descr;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
- struct rrdengine_datafile *datafile;
int ret;
unsigned i, count;
Word_t commit_id;
@@ -243,10 +249,13 @@ void flush_pages_cb(uv_fs_t* req)
error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
goto cleanup;
}
- datafile = xt_io_descr->descr_array[0]->extent->datafile;
- debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
- __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
-
+#ifdef NETDATA_INTERNAL_CHECKS
+ {
+ struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
+ debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
+ __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ }
+#endif
count = xt_io_descr->descr_count;
for (i = 0 ; i < count ; ++i) {
/* care, we don't hold the descriptor mutex */
@@ -273,7 +282,7 @@ void flush_pages_cb(uv_fs_t* req)
cleanup:
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
- free(xt_io_descr);
+ freez(xt_io_descr);
}
/*
@@ -353,7 +362,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
- /* free(xt_io_descr);*/
+ /* freez(xt_io_descr);*/
}
(void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count);
xt_io_descr->descr_count = count;
@@ -405,7 +414,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
ctx->stats.after_compress_bytes += compressed_size;
debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size);
(void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
- free(compressed_buf);
+ freez(compressed_buf);
size_bytes = payload_offset + compressed_size + sizeof(*trailer);
header->payload_length = compressed_size;
break;
@@ -443,23 +452,36 @@ static void after_delete_old_data(uv_work_t *req, int status)
struct rrdengine_worker_config* wc = &ctx->worker_config;
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
- unsigned bytes;
+ unsigned deleted_bytes, journalfile_bytes, datafile_bytes;
+ int ret;
+ char path[RRDENG_PATH_MAX];
(void)status;
datafile = ctx->datafiles.first;
journalfile = datafile->journalfile;
- bytes = datafile->pos + journalfile->pos;
+ datafile_bytes = datafile->pos;
+ journalfile_bytes = journalfile->pos;
+ deleted_bytes = 0;
+ info("Deleting data and journal file pair.");
datafile_list_delete(ctx, datafile);
- destroy_journal_file(journalfile, datafile);
- destroy_data_file(datafile);
- info("Deleted data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
- datafile->tier, datafile->fileno);
- free(journalfile);
- free(datafile);
+ ret = destroy_journal_file(journalfile, datafile);
+ if (!ret) {
+ generate_journalfilepath(datafile, path, sizeof(path));
+ info("Deleted journal file \"%s\".", path);
+ deleted_bytes += journalfile_bytes;
+ }
+ ret = destroy_data_file(datafile);
+ if (!ret) {
+ generate_datafilepath(datafile, path, sizeof(path));
+ info("Deleted data file \"%s\".", path);
+ deleted_bytes += datafile_bytes;
+ }
+ freez(journalfile);
+ freez(datafile);
- ctx->disk_space -= bytes;
- info("Reclaimed %u bytes of disk space.", bytes);
+ ctx->disk_space -= deleted_bytes;
+ info("Reclaimed %u bytes of disk space.", deleted_bytes);
/* unfreeze command processing */
wc->now_deleting.data = NULL;
@@ -485,7 +507,7 @@ static void delete_old_data(uv_work_t *req)
pg_cache_punch_hole(ctx, descr, 0);
}
next = extent->next;
- free(extent);
+ freez(extent);
}
}
@@ -495,6 +517,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
struct rrdengine_datafile *datafile;
unsigned current_size, target_size;
uint8_t out_of_space, only_one_datafile;
+ int ret;
out_of_space = 0;
if (unlikely(ctx->disk_space > ctx->max_disk_space)) {
@@ -509,7 +532,10 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) {
/* Finalize data and journal file and create a new pair */
wal_flush_transaction_buffer(wc);
- create_new_datafile_pair(ctx, 1, datafile->fileno + 1);
+ ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1);
+ if (likely(!ret)) {
+ ++ctx->last_fileno;
+ }
}
if (unlikely(out_of_space)) {
/* delete old data */
@@ -517,18 +543,30 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
/* already deleting data */
return;
}
- info("Deleting data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
- ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+ if (NULL == ctx->datafiles.first->next) {
+ error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\""
+ " to reclaim space, there are no other file pairs left.",
+ ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
+ return;
+ }
+ info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
+ ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
wc->now_deleting.data = ctx;
- uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data);
+ assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data));
}
}
+/* return 0 on success */
int init_rrd_files(struct rrdengine_instance *ctx)
{
return init_data_files(ctx);
}
+void finalize_rrd_files(struct rrdengine_instance *ctx)
+{
+ return finalize_data_files(ctx);
+}
+
void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc)
{
wc->cmd_queue.head = wc->cmd_queue.tail = 0;
@@ -596,7 +634,6 @@ void async_cb(uv_async_t *handle)
void timer_cb(uv_timer_t* handle)
{
struct rrdengine_worker_config* wc = handle->data;
- struct rrdengine_instance *ctx = wc->ctx;
uv_stop(handle->loop);
uv_update_time(handle->loop);
@@ -616,7 +653,7 @@ void timer_cb(uv_timer_t* handle)
#ifdef NETDATA_INTERNAL_CHECKS
{
char buf[4096];
- debug(D_RRDENGINE, "%s", get_rrdeng_statistics(ctx, buf, sizeof(buf)));
+ debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf)));
}
#endif
}
@@ -631,7 +668,7 @@ void rrdeng_worker(void* arg)
struct rrdengine_worker_config* wc = arg;
struct rrdengine_instance *ctx = wc->ctx;
uv_loop_t* loop;
- int shutdown;
+ int shutdown, ret;
enum rrdeng_opcode opcode;
uv_timer_t timer_req;
struct rrdeng_cmd cmd;
@@ -639,22 +676,35 @@ void rrdeng_worker(void* arg)
rrdeng_init_cmd_queue(wc);
loop = wc->loop = mallocz(sizeof(uv_loop_t));
- uv_loop_init(loop);
+ ret = uv_loop_init(loop);
+ if (ret) {
+ error("uv_loop_init(): %s", uv_strerror(ret));
+ goto error_after_loop_init;
+ }
loop->data = wc;
- uv_async_init(wc->loop, &wc->async, async_cb);
+ ret = uv_async_init(wc->loop, &wc->async, async_cb);
+ if (ret) {
+ error("uv_async_init(): %s", uv_strerror(ret));
+ goto error_after_async_init;
+ }
wc->async.data = wc;
wc->now_deleting.data = NULL;
/* dirty page flushing timer */
- uv_timer_init(loop, &timer_req);
+ ret = uv_timer_init(loop, &timer_req);
+ if (ret) {
+ error("uv_timer_init(): %s", uv_strerror(ret));
+ goto error_after_timer_init;
+ }
timer_req.data = wc;
+ wc->error = 0;
/* wake up initialization thread */
complete(&ctx->rrdengine_completion);
- uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS);
+ assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
shutdown = 0;
while (shutdown == 0 || uv_loop_alive(loop)) {
uv_run(loop, UV_RUN_DEFAULT);
@@ -669,12 +719,6 @@ void rrdeng_worker(void* arg)
break;
case RRDENG_SHUTDOWN:
shutdown = 1;
- if (unlikely(wc->now_deleting.data)) {
- /* postpone shutdown until after deletion */
- info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
- rrdeng_enq_cmd(wc, &cmd);
- break;
- }
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
* it is however undocumented behaviour and we need to be aware if this becomes
@@ -683,10 +727,6 @@ void rrdeng_worker(void* arg)
uv_close((uv_handle_t *)&wc->async, NULL);
assert(0 == uv_timer_stop(&timer_req));
uv_close((uv_handle_t *)&timer_req, NULL);
- info("Shutting down RRD engine event loop.");
- while (do_flush_pages(wc, 1, NULL)) {
- ; /* Force flushing of all commited pages. */
- }
break;
case RRDENG_READ_PAGE:
do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
@@ -716,6 +756,13 @@ void rrdeng_worker(void* arg)
} while (opcode != RRDENG_NOOP);
}
/* cleanup operations of the event loop */
+ if (unlikely(wc->now_deleting.data)) {
+ info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
+ }
+ info("Shutting down RRD engine event loop.");
+ while (do_flush_pages(wc, 1, NULL)) {
+ ; /* Force flushing of all commited pages. */
+ }
wal_flush_transaction_buffer(wc);
uv_run(loop, UV_RUN_DEFAULT);
@@ -724,7 +771,20 @@ void rrdeng_worker(void* arg)
uv_cond_destroy(&wc->cmd_cond);
/* uv_mutex_destroy(&wc->cmd_mutex); */
assert(0 == uv_loop_close(loop));
- free(loop);
+ freez(loop);
+
+ return;
+
+error_after_timer_init:
+ uv_close((uv_handle_t *)&wc->async, NULL);
+error_after_async_init:
+ assert(0 == uv_loop_close(loop));
+error_after_loop_init:
+ freez(loop);
+
+ wc->error = UV_EAGAIN;
+ /* wake up initialization thread */
+ complete(&ctx->rrdengine_completion);
}