diff options
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- | database/engine/rrdengine.c | 246 |
1 files changed, 236 insertions, 10 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index f095cadfe9..af4ea20e31 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -27,10 +27,188 @@ static void sanity_check(void) /* page count must fit in 8 bits */ BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); + /* extent cache count must fit in 32 bits */ + BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32); + /* page info scratch space must be able to hold 2 32-bit integers */ BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t)); } +/* always inserts into tail */ +static inline void xt_cache_replaceQ_insert(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + + xt_cache_elem->prev = NULL; + xt_cache_elem->next = NULL; + + if (likely(NULL != xt_cache->replaceQ_tail)) { + xt_cache_elem->prev = xt_cache->replaceQ_tail; + xt_cache->replaceQ_tail->next = xt_cache_elem; + } + if (unlikely(NULL == xt_cache->replaceQ_head)) { + xt_cache->replaceQ_head = xt_cache_elem; + } + xt_cache->replaceQ_tail = xt_cache_elem; +} + +static inline void xt_cache_replaceQ_delete(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *prev, *next; + + prev = xt_cache_elem->prev; + next = xt_cache_elem->next; + + if (likely(NULL != prev)) { + prev->next = next; + } + if (likely(NULL != next)) { + next->prev = prev; + } + if (unlikely(xt_cache_elem == xt_cache->replaceQ_head)) { + xt_cache->replaceQ_head = next; + } + if (unlikely(xt_cache_elem == xt_cache->replaceQ_tail)) { + xt_cache->replaceQ_tail = prev; + } + xt_cache_elem->prev = xt_cache_elem->next = NULL; +} + +static inline void xt_cache_replaceQ_set_hot(struct rrdengine_worker_config* wc, + struct extent_cache_element *xt_cache_elem) +{ + xt_cache_replaceQ_delete(wc, xt_cache_elem); + xt_cache_replaceQ_insert(wc, xt_cache_elem); +} + +/* Returns the index of the cached extent if it was successfully inserted in the extent cache, otherwise -1 */ +static int try_insert_into_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + unsigned idx; + int ret; + + ret = find_first_zero(xt_cache->allocation_bitmap); + if (-1 == ret || ret >= MAX_CACHED_EXTENTS) { + for (xt_cache_elem = xt_cache->replaceQ_head ; NULL != xt_cache_elem ; xt_cache_elem = xt_cache_elem->next) { + idx = xt_cache_elem - xt_cache->extent_array; + if (!check_bit(xt_cache->inflight_bitmap, idx)) { + xt_cache_replaceQ_delete(wc, xt_cache_elem); + break; + } + } + if (NULL == xt_cache_elem) + return -1; + } else { + idx = (unsigned)ret; + xt_cache_elem = &xt_cache->extent_array[idx]; + } + xt_cache_elem->extent = extent; + xt_cache_elem->fileno = extent->datafile->fileno; + xt_cache_elem->inflight_io_descr = NULL; + xt_cache_replaceQ_insert(wc, xt_cache_elem); + modify_bit(&xt_cache->allocation_bitmap, idx, 1); + + return (int)idx; +} + +/** + * Returns 0 if the cached extent was found in the extent cache, 1 otherwise. + * Sets *idx to point to the position of the extent inside the cache. + **/ +static uint8_t lookup_in_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent, unsigned *idx) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + unsigned i; + + for (i = 0 ; i < MAX_CACHED_EXTENTS ; ++i) { + xt_cache_elem = &xt_cache->extent_array[i]; + if (check_bit(xt_cache->allocation_bitmap, i) && xt_cache_elem->extent == extent && + xt_cache_elem->fileno == extent->datafile->fileno) { + *idx = i; + return 0; + } + } + return 1; +} + +#if 0 /* disabled code */ +static void delete_from_xt_cache(struct rrdengine_worker_config* wc, unsigned idx) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + + xt_cache_elem = &xt_cache->extent_array[idx]; + xt_cache_replaceQ_delete(wc, xt_cache_elem); + xt_cache_elem->extent = NULL; + modify_bit(&wc->xt_cache.allocation_bitmap, idx, 0); /* invalidate it */ + modify_bit(&wc->xt_cache.inflight_bitmap, idx, 0); /* not in-flight anymore */ +} +#endif + +void enqueue_inflight_read_to_xt_cache(struct rrdengine_worker_config* wc, unsigned idx, + struct extent_io_descriptor *xt_io_descr) +{ + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem; + struct extent_io_descriptor *old_next; + + xt_cache_elem = &xt_cache->extent_array[idx]; + old_next = xt_cache_elem->inflight_io_descr->next; + xt_cache_elem->inflight_io_descr->next = xt_io_descr; + xt_io_descr->next = old_next; +} + +void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, struct extent_io_descriptor *xt_io_descr) +{ + unsigned i, j, page_offset; + struct rrdengine_instance *ctx = wc->ctx; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; + void *page; + struct extent_info *extent = xt_io_descr->descr_array[0]->extent; + + for (i = 0 ; i < xt_io_descr->descr_count; ++i) { + page = mallocz(RRDENG_BLOCK_SIZE); + descr = xt_io_descr->descr_array[i]; + for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) { + /* care, we don't hold the descriptor mutex */ + if (!uuid_compare(*extent->pages[j]->id, *descr->id) && + extent->pages[j]->page_length == descr->page_length && + extent->pages[j]->start_time == descr->start_time && + extent->pages[j]->end_time == descr->end_time) { + break; + } + page_offset += extent->pages[j]->page_length; + + } + /* care, we don't hold the descriptor mutex */ + (void) memcpy(page, wc->xt_cache.extent_array[idx].pages + page_offset, descr->page_length); + + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags |= RRD_PAGE_POPULATED; + pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; + rrdeng_page_descr_mutex_unlock(ctx, descr); + pg_cache_replaceQ_insert(ctx, descr); + if (xt_io_descr->release_descr) { + pg_cache_put(ctx, descr); + } else { + debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); + pg_cache_wake_up_waiters(ctx, descr); + } + } + if (xt_io_descr->completion) + complete(xt_io_descr->completion); + freez(xt_io_descr); +} + void read_extent_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; @@ -99,6 +277,33 @@ after_crc_check: debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret); /* care, we don't hold the descriptor mutex */ } + { + uint8_t xt_is_cached = 0; + unsigned xt_idx; + struct extent_info *extent = xt_io_descr->descr_array[0]->extent; + + xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); + if (xt_is_cached && check_bit(wc->xt_cache.inflight_bitmap, xt_idx)) { + struct extent_cache *xt_cache = &wc->xt_cache; + struct extent_cache_element *xt_cache_elem = &xt_cache->extent_array[xt_idx]; + struct extent_io_descriptor *curr, *next; + + if (have_read_error) { + memset(xt_cache_elem->pages, 0, sizeof(xt_cache_elem->pages)); + } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { + (void)memcpy(xt_cache_elem->pages, xt_io_descr->buf + payload_offset, payload_length); + } else { + (void)memcpy(xt_cache_elem->pages, uncompressed_buf, uncompressed_payload_length); + } + /* complete all connected in-flight read requests */ + for (curr = xt_cache_elem->inflight_io_descr->next ; curr ; curr = next) { + next = curr->next; + read_cached_extent_cb(wc, xt_idx, curr); + } + xt_cache_elem->inflight_io_descr = NULL; + modify_bit(&xt_cache->inflight_bitmap, xt_idx, 0); /* not in-flight anymore */ + } + } for (i = 0 ; i < xt_io_descr->descr_count; ++i) { page = mallocz(RRDENG_BLOCK_SIZE); @@ -159,18 +364,15 @@ static void do_read_extent(struct rrdengine_worker_config* wc, // uint32_t payload_length; struct extent_io_descriptor *xt_io_descr; struct rrdengine_datafile *datafile; + struct extent_info *extent = descr[0]->extent; + uint8_t xt_is_cached = 0, xt_is_inflight = 0; + unsigned xt_idx; - datafile = descr[0]->extent->datafile; - pos = descr[0]->extent->offset; - size_bytes = descr[0]->extent->size; + datafile = extent->datafile; + pos = extent->offset; + size_bytes = extent->size; - xt_io_descr = mallocz(sizeof(*xt_io_descr)); - ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); - if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); - /* freez(xt_io_descr); - return;*/ - } + xt_io_descr = callocz(1, sizeof(*xt_io_descr)); for (i = 0 ; i < count; ++i) { rrdeng_page_descr_mutex_lock(ctx, descr[i]); pg_cache_descr = descr[i]->pg_cache_descr; @@ -188,6 +390,30 @@ static void do_read_extent(struct rrdengine_worker_config* wc, /* xt_io_descr->descr_commit_idx_array[0] */ xt_io_descr->release_descr = release_descr; + xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); + if (xt_is_cached) { + xt_cache_replaceQ_set_hot(wc, &wc->xt_cache.extent_array[xt_idx]); + xt_is_inflight = check_bit(wc->xt_cache.inflight_bitmap, xt_idx); + if (xt_is_inflight) { + enqueue_inflight_read_to_xt_cache(wc, xt_idx, xt_io_descr); + return; + } + return read_cached_extent_cb(wc, xt_idx, xt_io_descr); + } else { + ret = try_insert_into_xt_cache(wc, extent); + if (-1 != ret) { + xt_idx = (unsigned)ret; + modify_bit(&wc->xt_cache.inflight_bitmap, xt_idx, 1); + wc->xt_cache.extent_array[xt_idx].inflight_io_descr = xt_io_descr; + } + } + + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); + if (unlikely(ret)) { + fatal("posix_memalign:%s", strerror(ret)); + /* freez(xt_io_descr); + return;*/ + } real_io_size = ALIGN_BYTES_CEILING(size_bytes); xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); ret = uv_fs_read(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, pos, read_extent_cb); |