diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2022-10-24 10:20:39 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-24 10:20:39 +0300 |
commit | 29271f575a65ed5cd13265413bff3b32398bd538 (patch) | |
tree | 7da78a9ff849172b833659b7e59332bd6251b18b /database | |
parent | a9135f47bbb36e9cb437b18a7109607569580db7 (diff) |
Use mmap to read an extent from a datafile (#13834)
* Use MMAP to read an extent from the datafile. Fallback to uv_fs_read if that fails
* Schedule the fallback using uv_fs_read in the MMAP callback
* Check the return value of mmap correctly
* Add unlikely
Diffstat (limited to 'database')
-rw-r--r-- | database/engine/rrdengine.c | 90 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 4 |
2 files changed, 70 insertions, 24 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 67f4e13ac0..e4cd37e989 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -272,11 +272,9 @@ static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) } } -void read_extent_cb(uv_fs_t* req) +static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed) { - struct rrdengine_worker_config* wc = req->loop->data; struct rrdengine_instance *ctx = wc->ctx; - struct extent_io_descriptor *xt_io_descr; struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; int ret; @@ -289,21 +287,20 @@ void read_extent_cb(uv_fs_t* req) struct rrdeng_df_extent_trailer *trailer; uLong crc; - xt_io_descr = req->data; header = xt_io_descr->buf; payload_length = header->payload_length; count = header->number_of_pages; payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count; trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer); - if (req->result < 0) { + if (unlikely(read_failed)) { struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; ++ctx->stats.io_errors; rrd_stat_atomic_add(&global_io_errors, 1); have_read_error = 1; - error("%s: uv_fs_read - %s - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, - uv_strerror((int)req->result), xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + error("%s: uv_fs_read - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, xt_io_descr->pos, + xt_io_descr->bytes, datafile->tier, datafile->fileno); goto after_crc_check; } crc = crc32(0L, Z_NULL, 0); @@ -421,11 +418,67 @@ after_crc_check: } if (xt_io_descr->completion) completion_mark_complete(xt_io_descr->completion); +} + +static void read_extent_cb(uv_fs_t *req) +{ + struct rrdengine_worker_config *wc = req->loop->data; + struct extent_io_descriptor *xt_io_descr; + + xt_io_descr = req->data; + do_extent_processing(wc, xt_io_descr, req->result < 0); uv_fs_req_cleanup(req); posix_memfree(xt_io_descr->buf); freez(xt_io_descr); } +static void read_mmap_extent_cb(uv_work_t *req, int status __maybe_unused) +{ + struct rrdengine_worker_config *wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + struct extent_io_descriptor *xt_io_descr; + xt_io_descr = req->data; + + if (likely(xt_io_descr->map_base)) { + do_extent_processing(wc, xt_io_descr, false); + munmap(xt_io_descr->map_base, xt_io_descr->map_length); + freez(xt_io_descr); + return; + } + + // MMAP failed, so do uv_fs_read + int ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(xt_io_descr->bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + } + unsigned real_io_size = ALIGN_BYTES_CEILING( xt_io_descr->bytes); + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + xt_io_descr->req.data = xt_io_descr; + ret = uv_fs_read(req->loop, &xt_io_descr->req, xt_io_descr->file, &xt_io_descr->iov, 1, (unsigned) xt_io_descr->pos, read_extent_cb); + fatal_assert(-1 != ret); + ctx->stats.io_read_bytes += real_io_size; + ctx->stats.io_read_extent_bytes += real_io_size; +} + +static void do_mmap_read_extent(uv_work_t *req) +{ + struct extent_io_descriptor *xt_io_descr = (struct extent_io_descriptor * )req->data; + struct rrdengine_worker_config *wc = req->loop->data; + struct rrdengine_instance *ctx = wc->ctx; + + off_t map_start = ALIGN_BYTES_FLOOR(xt_io_descr->pos); + size_t length = ALIGN_BYTES_CEILING(xt_io_descr->pos + xt_io_descr->bytes) - map_start; + unsigned real_io_size = xt_io_descr->bytes; + + void *data = mmap(NULL, length, PROT_READ, MAP_SHARED, xt_io_descr->file, map_start); + if (likely(data != MAP_FAILED)) { + xt_io_descr->map_base = data; + xt_io_descr->map_length = length; + xt_io_descr->buf = data + (xt_io_descr->pos - map_start); + ctx->stats.io_read_bytes += real_io_size; + ctx->stats.io_read_extent_bytes += real_io_size; + } +} static void do_read_extent(struct rrdengine_worker_config* wc, struct rrdeng_page_descr **descr, @@ -435,8 +488,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc, struct rrdengine_instance *ctx = wc->ctx; struct page_cache_descr *pg_cache_descr; int ret; - unsigned i, size_bytes, pos, real_io_size; -// uint32_t payload_length; + unsigned i, size_bytes, pos; struct extent_io_descriptor *xt_io_descr; struct rrdengine_datafile *datafile; struct extent_info *extent = descr[0]->extent; @@ -452,18 +504,17 @@ static void do_read_extent(struct rrdengine_worker_config* wc, rrdeng_page_descr_mutex_lock(ctx, descr[i]); pg_cache_descr = descr[i]->pg_cache_descr; pg_cache_descr->flags |= RRD_PAGE_READ_PENDING; -// payload_length = descr[i]->page_length; rrdeng_page_descr_mutex_unlock(ctx, descr[i]); - xt_io_descr->descr_array[i] = descr[i]; } xt_io_descr->descr_count = count; + xt_io_descr->file = datafile->file; xt_io_descr->bytes = size_bytes; xt_io_descr->pos = pos; - xt_io_descr->req.data = xt_io_descr; + xt_io_descr->req_worker.data = xt_io_descr; xt_io_descr->completion = NULL; - /* xt_io_descr->descr_commit_idx_array[0] */ xt_io_descr->release_descr = release_descr; + xt_io_descr->buf = NULL; xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); if (xt_is_cached) { @@ -483,19 +534,10 @@ static void do_read_extent(struct rrdengine_worker_config* wc, } } - ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); - if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); - /* freez(xt_io_descr); - return;*/ - } - real_io_size = ALIGN_BYTES_CEILING(size_bytes); - xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); - ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb); + ret = uv_queue_work(wc->loop, &xt_io_descr->req_worker, do_mmap_read_extent, read_mmap_extent_cb); fatal_assert(-1 != ret); - ctx->stats.io_read_bytes += real_io_size; + ++ctx->stats.io_read_requests; - ctx->stats.io_read_extent_bytes += real_io_size; ++ctx->stats.io_read_extents; ctx->stats.pg_cache_backfills += count; } diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index 2bb70ddf53..fedadbe861 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -105,8 +105,12 @@ struct rrdeng_cmdqueue { struct extent_io_descriptor { uv_fs_t req; + uv_work_t req_worker; uv_buf_t iov; + uv_file file; void *buf; + void *map_base; + size_t map_length; uint64_t pos; unsigned bytes; struct completion *completion; |