summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2022-10-24 10:20:39 +0300
committerGitHub <noreply@github.com>2022-10-24 10:20:39 +0300
commit29271f575a65ed5cd13265413bff3b32398bd538 (patch)
tree7da78a9ff849172b833659b7e59332bd6251b18b /database
parenta9135f47bbb36e9cb437b18a7109607569580db7 (diff)
Use mmap to read an extent from a datafile (#13834)
* Use MMAP to read an extent from the datafile. Fallback to uv_fs_read if that fails * Schedule the fallback using uv_fs_read in the MMAP callback * Check the return value of mmap correctly * Add unlikely
Diffstat (limited to 'database')
-rw-r--r--database/engine/rrdengine.c90
-rw-r--r--database/engine/rrdengine.h4
2 files changed, 70 insertions, 24 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 67f4e13ac0..e4cd37e989 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -272,11 +272,9 @@ static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type)
}
}
-void read_extent_cb(uv_fs_t* req)
+static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed)
{
- struct rrdengine_worker_config* wc = req->loop->data;
struct rrdengine_instance *ctx = wc->ctx;
- struct extent_io_descriptor *xt_io_descr;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
int ret;
@@ -289,21 +287,20 @@ void read_extent_cb(uv_fs_t* req)
struct rrdeng_df_extent_trailer *trailer;
uLong crc;
- xt_io_descr = req->data;
header = xt_io_descr->buf;
payload_length = header->payload_length;
count = header->number_of_pages;
payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer);
- if (req->result < 0) {
+ if (unlikely(read_failed)) {
struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
have_read_error = 1;
- error("%s: uv_fs_read - %s - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__,
- uv_strerror((int)req->result), xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
+ error("%s: uv_fs_read - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, xt_io_descr->pos,
+ xt_io_descr->bytes, datafile->tier, datafile->fileno);
goto after_crc_check;
}
crc = crc32(0L, Z_NULL, 0);
@@ -421,11 +418,67 @@ after_crc_check:
}
if (xt_io_descr->completion)
completion_mark_complete(xt_io_descr->completion);
+}
+
+static void read_extent_cb(uv_fs_t *req)
+{
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct extent_io_descriptor *xt_io_descr;
+
+ xt_io_descr = req->data;
+ do_extent_processing(wc, xt_io_descr, req->result < 0);
uv_fs_req_cleanup(req);
posix_memfree(xt_io_descr->buf);
freez(xt_io_descr);
}
+static void read_mmap_extent_cb(uv_work_t *req, int status __maybe_unused)
+{
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct extent_io_descriptor *xt_io_descr;
+ xt_io_descr = req->data;
+
+ if (likely(xt_io_descr->map_base)) {
+ do_extent_processing(wc, xt_io_descr, false);
+ munmap(xt_io_descr->map_base, xt_io_descr->map_length);
+ freez(xt_io_descr);
+ return;
+ }
+
+ // MMAP failed, so do uv_fs_read
+ int ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(xt_io_descr->bytes));
+ if (unlikely(ret)) {
+ fatal("posix_memalign:%s", strerror(ret));
+ }
+ unsigned real_io_size = ALIGN_BYTES_CEILING( xt_io_descr->bytes);
+ xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
+ xt_io_descr->req.data = xt_io_descr;
+ ret = uv_fs_read(req->loop, &xt_io_descr->req, xt_io_descr->file, &xt_io_descr->iov, 1, (unsigned) xt_io_descr->pos, read_extent_cb);
+ fatal_assert(-1 != ret);
+ ctx->stats.io_read_bytes += real_io_size;
+ ctx->stats.io_read_extent_bytes += real_io_size;
+}
+
+static void do_mmap_read_extent(uv_work_t *req)
+{
+ struct extent_io_descriptor *xt_io_descr = (struct extent_io_descriptor * )req->data;
+ struct rrdengine_worker_config *wc = req->loop->data;
+ struct rrdengine_instance *ctx = wc->ctx;
+
+ off_t map_start = ALIGN_BYTES_FLOOR(xt_io_descr->pos);
+ size_t length = ALIGN_BYTES_CEILING(xt_io_descr->pos + xt_io_descr->bytes) - map_start;
+ unsigned real_io_size = xt_io_descr->bytes;
+
+ void *data = mmap(NULL, length, PROT_READ, MAP_SHARED, xt_io_descr->file, map_start);
+ if (likely(data != MAP_FAILED)) {
+ xt_io_descr->map_base = data;
+ xt_io_descr->map_length = length;
+ xt_io_descr->buf = data + (xt_io_descr->pos - map_start);
+ ctx->stats.io_read_bytes += real_io_size;
+ ctx->stats.io_read_extent_bytes += real_io_size;
+ }
+}
static void do_read_extent(struct rrdengine_worker_config* wc,
struct rrdeng_page_descr **descr,
@@ -435,8 +488,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
struct rrdengine_instance *ctx = wc->ctx;
struct page_cache_descr *pg_cache_descr;
int ret;
- unsigned i, size_bytes, pos, real_io_size;
-// uint32_t payload_length;
+ unsigned i, size_bytes, pos;
struct extent_io_descriptor *xt_io_descr;
struct rrdengine_datafile *datafile;
struct extent_info *extent = descr[0]->extent;
@@ -452,18 +504,17 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
rrdeng_page_descr_mutex_lock(ctx, descr[i]);
pg_cache_descr = descr[i]->pg_cache_descr;
pg_cache_descr->flags |= RRD_PAGE_READ_PENDING;
-// payload_length = descr[i]->page_length;
rrdeng_page_descr_mutex_unlock(ctx, descr[i]);
-
xt_io_descr->descr_array[i] = descr[i];
}
xt_io_descr->descr_count = count;
+ xt_io_descr->file = datafile->file;
xt_io_descr->bytes = size_bytes;
xt_io_descr->pos = pos;
- xt_io_descr->req.data = xt_io_descr;
+ xt_io_descr->req_worker.data = xt_io_descr;
xt_io_descr->completion = NULL;
- /* xt_io_descr->descr_commit_idx_array[0] */
xt_io_descr->release_descr = release_descr;
+ xt_io_descr->buf = NULL;
xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx);
if (xt_is_cached) {
@@ -483,19 +534,10 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
}
}
- ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
- if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
- /* freez(xt_io_descr);
- return;*/
- }
- real_io_size = ALIGN_BYTES_CEILING(size_bytes);
- xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
- ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb);
+ ret = uv_queue_work(wc->loop, &xt_io_descr->req_worker, do_mmap_read_extent, read_mmap_extent_cb);
fatal_assert(-1 != ret);
- ctx->stats.io_read_bytes += real_io_size;
+
++ctx->stats.io_read_requests;
- ctx->stats.io_read_extent_bytes += real_io_size;
++ctx->stats.io_read_extents;
ctx->stats.pg_cache_backfills += count;
}
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 2bb70ddf53..fedadbe861 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -105,8 +105,12 @@ struct rrdeng_cmdqueue {
struct extent_io_descriptor {
uv_fs_t req;
+ uv_work_t req_worker;
uv_buf_t iov;
+ uv_file file;
void *buf;
+ void *map_base;
+ size_t map_length;
uint64_t pos;
unsigned bytes;
struct completion *completion;