diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-15 23:49:42 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-15 23:49:42 +0200 |
commit | becd97a3660af34104c557ba6c2877f624143c2e (patch) | |
tree | 14fa78305bd8c27bf1fa06c9c346b8f7a8a1d721 /database/engine/journalfile.c | |
parent | 1789d07c43182152437459a7a4f81267bbdd752c (diff) |
Revert "New journal disk based indexing for agent memory reduction" (#14000)
Revert "New journal disk based indexing for agent memory reduction (#13885)"
This reverts commit 224b051a2b2bab39a4b536e531ab9ca590bf31bb.
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r-- | database/engine/journalfile.c | 1126 |
1 files changed, 34 insertions, 1092 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 037603b9b6..500dd78800 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -1,25 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" -void queue_journalfile_v2_migration(struct rrdengine_worker_config *wc) -{ - struct rrdeng_work *work_request; - - if (unlikely(wc->running_journal_migration)) - return; - - work_request = mallocz(sizeof(*work_request)); - work_request->req.data = work_request; - work_request->wc = wc; - work_request->count = 0; - work_request->rerun = false; - wc->running_journal_migration = 1; - if (unlikely(uv_queue_work(wc->loop, &work_request->req, start_journal_indexing, after_journal_indexing))) { - freez(work_request); - wc->running_journal_migration = 0; - } -} - static void flush_transaction_buffer_cb(uv_fs_t* req) { struct generic_io_descriptor *io_descr = req->data; @@ -66,7 +47,6 @@ void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc) io_descr->bytes = size; io_descr->pos = journalfile->pos; io_descr->req.data = io_descr; - io_descr->data = journalfile; io_descr->completion = NULL; io_descr->iov = uv_buf_init((void *)io_descr->buf, size); @@ -113,12 +93,6 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s return ctx->commit_log.buf + buf_pos; } -void generate_journalfilepath_v2(struct rrdengine_datafile *datafile, char *str, size_t maxlen) -{ - (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2, - datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); -} - void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, @@ -130,49 +104,26 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin journalfile->file = (uv_file)0; journalfile->pos = 0; journalfile->datafile = datafile; - journalfile->journal_data = NULL; - journalfile->journal_data_size = 0; - journalfile->JudyL_array = (Pvoid_t) NULL; - journalfile->data = NULL; - journalfile->file_index = 0; - journalfile->last_access = 0; } -static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file) +int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; int ret; char path[RRDENG_PATH_MAX]; - uv_fs_t req; - ret = uv_fs_close(NULL, &req, file, NULL); + generate_journalfilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); if (ret < 0) { - generate_journalfilepath(datafile, path, sizeof(path)); error("uv_fs_close(%s): %s", path, uv_strerror(ret)); - ++datafile->ctx->stats.fs_errors; + ++ctx->stats.fs_errors; rrd_stat_atomic_add(&global_fs_errors, 1); } uv_fs_req_cleanup(&req); - return ret; -} - -int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) -{ - struct rrdengine_instance *ctx = datafile->ctx; - char path[RRDENG_PATH_MAX]; - - if (likely(journalfile->journal_data)) { - if (munmap(journalfile->journal_data, journalfile->journal_data_size)) { - generate_journalfilepath_v2(datafile, path, sizeof(path)); - error("Failed to unmap journal index file for %s", path); - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); - } - journalfile->journal_data = NULL; - journalfile->journal_data_size = 0; - return 0; - } - return close_uv_file(datafile, journalfile->file); + return ret; } int unlink_journal_file(struct rrdengine_journalfile *journalfile) @@ -198,16 +149,14 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile) return ret; } -int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; int ret; char path[RRDENG_PATH_MAX]; - char path_v2[RRDENG_PATH_MAX]; generate_journalfilepath(datafile, path, sizeof(path)); - generate_journalfilepath_v2(datafile, path_v2, sizeof(path)); ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); if (ret < 0) { @@ -217,12 +166,9 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc } uv_fs_req_cleanup(&req); - (void) close_uv_file(datafile, journalfile->file); - - // This is the new journal v2 index file - ret = uv_fs_unlink(NULL, &req, path_v2, NULL); + ret = uv_fs_close(NULL, &req, journalfile->file, NULL); if (ret < 0) { - error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + error("uv_fs_close(%s): %s", path, uv_strerror(ret)); ++ctx->stats.fs_errors; rrd_stat_atomic_add(&global_fs_errors, 1); } @@ -237,13 +183,6 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc uv_fs_req_cleanup(&req); ++ctx->stats.journalfile_deletions; - ++ctx->stats.journalfile_deletions; - - if (journalfile->journal_data) { - if (munmap(journalfile->journal_data, journalfile->journal_data_size)) { - error("Failed to unmap index file %s", path_v2); - } - } return ret; } @@ -288,7 +227,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng uv_fs_req_cleanup(&req); posix_memfree(superblock); if (ret < 0) { - destroy_journal_file_unsafe(journalfile, datafile); + destroy_journal_file(journalfile, datafile); return ret; } @@ -427,67 +366,36 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden uv_rwlock_wrlock(&pg_cache->metrics_index.lock); PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0); fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ - *PValue = page_index = create_page_index(temp_id, ctx); - page_index->prev = pg_cache->metrics_index.last_page_index; - pg_cache->metrics_index.last_page_index = page_index; + *PValue = page_index = create_page_index(temp_id, ctx); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); } - // Lookup this descriptor - Word_t p_time = start_time_ut / USEC_PER_SEC; - uv_rwlock_rdlock(&page_index->lock); - PValue = JudyLFirst(page_index->JudyL_array, &p_time, PJE0); - descr = (NULL == PValue) ? NULL: *PValue; - uv_rwlock_rdunlock(&page_index->lock); - - bool descr_found = false; - if (unlikely(descr && descr->start_time_ut == start_time_ut)) { - // We have this descriptor already - descr_found = true; - -#ifdef NETDATA_INTERNAL_CHECKS - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower(page_index->id, uuid_str); - internal_error(true, "REMOVING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u", - uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, descr->extent->datafile->fileno, - descr->extent->offset, descr->extent->size); - internal_error(true, "APPLYING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u", - uuid_str, start_time_ut, end_time_ut, jf_metric_data->descr[i].page_length, extent->datafile->fileno, - extent->offset, extent->size); -#endif - // Remove entry from previous extent - unlink_descriptor_extent_unsafe(descr); - internal_error(true, "REMOVING UUID %s with %lu OK", uuid_str, start_time_ut); - } - else { - descr = pg_cache_create_descr(); - descr->id = &page_index->id; - } - + descr = pg_cache_create_descr(); descr->page_length = jf_metric_data->descr[i].page_length; descr->start_time_ut = start_time_ut; descr->end_time_ut = end_time_ut; descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s); - + descr->id = &page_index->id; descr->extent = extent; descr->type = page_type; extent->pages[valid_pages++] = descr; - if (likely(!descr_found)) - (void)pg_cache_insert(ctx, page_index, descr, true); + pg_cache_insert(ctx, page_index, descr); - if (page_index->latest_time_ut == descr->end_time_ut) + if(page_index->latest_time_ut == descr->end_time_ut) page_index->latest_update_every_s = descr->update_every_s; if(descr->update_every_s == 0) - fatal("DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu", + fatal( + "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu", (unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries); } extent->number_of_pages = valid_pages; - if (likely(valid_pages)) { + if (likely(valid_pages)) df_extent_insert(extent); - } else { freez(extent); ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++; @@ -536,13 +444,13 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng return size_bytes; } switch (jf_header->type) { - case STORE_DATA: - debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); - restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); - break; - default: - error("Unknown transaction type. Skipping record."); - break; + case STORE_DATA: + debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); + restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); + break; + default: + error("Unknown transaction type. Skipping record."); + break; } return size_bytes; @@ -610,905 +518,12 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde if (likely(journal_is_mmapped)) buf += size_bytes; } - skip_file: +skip_file: if (unlikely(!journal_is_mmapped)) posix_memfree(buf); return max_id; } -bool unlink_descriptor_extent_unsafe(struct rrdeng_page_descr *descr) -{ - if (unlikely(!descr || !descr->extent)) - return true; - - struct extent_info *extent = descr->extent; - for (uint8_t index = 0; index < extent->number_of_pages; index++) { - if (extent->pages[index] == descr) { - extent->pages[index] = NULL; - return true; - } - } - return false; -} - -// Checks that the extent list checksum is valid -static int check_journal_v2_extent_list (void *data_start, size_t file_size) -{ - UNUSED(file_size); - uLong crc; - - struct journal_v2_header *j2_header = (void *) data_start; - struct journal_v2_block_trailer *journal_v2_trailer; - - journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset); - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list)); - if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { - error("Extent list CRC32 check: FAILED"); - return 1; - } - - return 0; -} - -// Checks that the metric list (UUIDs) checksum is valid -static int check_journal_v2_metric_list(void *data_start, size_t file_size) -{ - UNUSED(file_size); - uLong crc; - - struct journal_v2_header *j2_header = (void *) data_start; - struct journal_v2_block_trailer *journal_v2_trailer; - - journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->metric_trailer_offset); - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list)); - if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { - error("Metric list CRC32 check: FAILED"); - return 1; - } - return 0; -} - -static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t original_size) -{ - int rc; - uLong crc; - - struct journal_v2_header *j2_header = (void *) data_start; - struct journal_v2_block_trailer *journal_v2_trailer; - - if (j2_header->magic == JOURVAL_V2_REBUILD_MAGIC) - return 2; - - // Magic failure - if (j2_header->magic != JOURVAL_V2_MAGIC) - return 1; - - if (j2_header->total_file_size != file_size) - return 1; - - if (original_size && j2_header->original_file_size != original_size) - return 1; - - journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + file_size - sizeof(*journal_v2_trailer)); - - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, (void *) j2_header, sizeof(*j2_header)); - - rc = crc32cmp(journal_v2_trailer->checksum, crc); - if (unlikely(rc)) { - error("File CRC32 check: FAILED"); - return 1; - } - - rc = check_journal_v2_extent_list(data_start, file_size); - if (rc) return 1; - - rc = check_journal_v2_metric_list(data_start, file_size); - if (rc) return 1; - - if (!db_engine_journal_check) - return 0; - - // Verify complete UUID chain - - struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset); - - unsigned verified = 0; - unsigned entries; - unsigned total_pages = 0; - - info("Checking %u metrics that exist in the journal", j2_header->metric_count); - for (entries = 0; entries < j2_header->metric_count; entries++) { - - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower(metric->uuid, uuid_str); - struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset); - struct journal_page_header local_metric_list_header = *metric_list_header; - - local_metric_list_header.crc = JOURVAL_V2_MAGIC; - - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, (void *) &local_metric_list_header, sizeof(local_metric_list_header)); - rc = crc32cmp(metric_list_header->checksum, crc); - - internal_error(true, "Index %u : %s entries %u at offset %u (%llu -- %llu) verified, HEADER CRC computed %lu, stored %u", entries, uuid_str, metric->entries, metric->page_offset, - j2_header->start_time_ut + (usec_t) metric->delta_start * USEC_PER_SEC, j2_header->start_time_ut + (usec_t) metric->delta_end * USEC_PER_SEC, - crc, metric_list_header->crc); - if (!rc) { - struct journal_v2_block_trailer *journal_trailer = - (void *) data_start + metric->page_offset + sizeof(struct journal_page_header) + (metric_list_header->entries * sizeof(struct journal_page_list)); - - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, (uint8_t *) metric_list_header + sizeof(struct journal_page_header), metric_list_header->entries * sizeof(struct journal_page_list)); - rc = crc32cmp(journal_trailer->checksum, crc); - internal_error(rc, "Index %u : %s entries %u at offset %u verified, DATA CRC computed %lu, stored %u", entries, uuid_str, metric->entries, metric->page_offset, - crc, metric_list_header->crc); - if (!rc) { - total_pages += metric_list_header->entries; - verified++; - } - } - - metric++; - if (((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) file_size) { - info("Verification failed EOF reached -- total entries %u, verified %u", entries, verified); - return 1; - } - } - - if (entries != verified) { - info("Verification failed -- total entries %u, verified %u", entries, verified); - return 1; - } - info("Verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages); - - return 0; -} - -int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct pg_cache_page_index *page_index; - int ret, fd; - uint64_t file_size; - char path[RRDENG_PATH_MAX]; - struct stat statbuf; - uint32_t original_file_size = 0; - - generate_journalfilepath(datafile, path, sizeof(path)); - ret = stat(path, &statbuf); - if (!ret) - original_file_size = (uint32_t)statbuf.st_size; - - generate_journalfilepath_v2(datafile, path, sizeof(path)); - - fd = open(path, O_RDONLY); - if (fd < 0) { - if (errno == ENOENT) - return 1; - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); - error("Failed to open %s", path); - return 1; - } - - ret = fstat(fd, &statbuf); - if (ret) { - error("Failed to get file information for %s", path); - close(fd); - return 1; - } - - file_size = (size_t)statbuf.st_size; - - if (file_size < sizeof(struct journal_v2_header)) { - error_report("Invalid file %s. Not the expected size", path); - close(fd); - return 1; - } - - usec_t start_loading = now_realtime_usec(); - uint8_t *data_start = mmap(NULL, file_size, PROT_READ, MAP_SHARED, fd, 0); - if (data_start == MAP_FAILED) { - close(fd); - return 1; - } - close(fd); - - info("Checking integrity of %s", path); - int rc = check_journal_v2_file(data_start, file_size, original_file_size); - if (rc) { - if (rc == 2) - error_report("File %s needs to be rebuilt", path); - else - error_report("File %s is invalid", path); - if (unlikely(munmap(data_start, file_size))) - error("Failed to unmap %s", path); - return 1; - } - - struct journal_v2_header *j2_header = (void *) data_start; - - size_t entries = j2_header->metric_count; - - if (!entries) { - if (unlikely(munmap(data_start, file_size))) - error("Failed to unmap %s", path); - return 1; - } - - rc = madvise(data_start, file_size, MADV_DONTFORK); - if (rc) - error("MADV_DONTFORK: setting failed"); - - rc = madvise(data_start, file_size, MADV_DONTDUMP); - if (rc) - error("MADV_DONTDUMP: setting failed"); - - struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset); - - uv_rwlock_wrlock(&pg_cache->metrics_index.lock); - - // Initialize the journal file to be able to access the data - journalfile->journal_data = data_start; - journalfile->journal_data_size = file_size; - - usec_t header_start_time = j2_header->start_time_ut; - for (size_t i=0; i < entries; i++) { - Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, metric->uuid, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - page_index = *PValue; - } - else { - PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, metric->uuid, sizeof(uuid_t), PJE0); - fatal_assert(NULL == *PValue); - *PValue = page_index = create_page_index(&metric->uuid, ctx); - page_index->oldest_time_ut = LLONG_MAX; - page_index->latest_time_ut = 0; - page_index->prev = pg_cache->metrics_index.last_page_index; - pg_cache->metrics_index.last_page_index = page_index; - } - - usec_t metric_start_ut = header_start_time + (usec_t ) metric->delta_start * USEC_PER_SEC; - usec_t metric_end_ut = header_start_time + (usec_t ) metric->delta_end * USEC_PER_SEC; - - if (page_index->oldest_time_ut > metric_start_ut) - page_index->oldest_time_ut = metric_start_ut; - - if (page_index->latest_time_ut < metric_end_ut) - page_index->latest_time_ut = metric_end_ut; - - ++page_index->page_count; - ++pg_cache->page_descriptors; - metric++; - } - - uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); - - info("Journal file \"%s\" loaded (size:%"PRIu64") with %lu metrics in %d ms", - path, file_size, entries, - (int) ((now_realtime_usec() - start_loading) / USEC_PER_MS)); - return 0; -} - - -struct metric_info_s { - uuid_t *id; - struct pg_cache_page_index *page_index; - uint32_t entries; - time_t min_index_time_s; - time_t max_index_time_s; - usec_t min_time_ut; - usec_t max_time_ut; - uint32_t page_list_header; - Pvoid_t JudyL_array; -}; - -struct journal_metric_list_to_sort { - struct metric_info_s *metric_info; -}; - -static int journal_metric_compare (const void *item1, const void *item2) -{ - const struct metric_info_s *metric1 = ((struct journal_metric_list_to_sort *) item1)->metric_info; - const struct metric_info_s *metric2 = ((struct journal_metric_list_to_sort *) item2)->metric_info; - - return uuid_compare(*(metric1->id), *(metric2->id)); -} - - -// Write list of extents for the journalfile -void *journal_v2_write_extent_list(struct rrdengine_journalfile *journalfile, void *data) -{ - struct extent_info *extent = journalfile->datafile->extents.first; - struct journal_extent_list *j2_extent = (void *) data; - while (extent) { - j2_extent->datafile_offset = extent->offset; - j2_extent->datafile_size = extent->size; - j2_extent->pages = extent->number_of_pages; - j2_extent->file_index = journalfile->file_index; - j2_extent++; - extent = extent->next; - }; - return j2_extent; -} - -static int verify_journal_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes) -{ - if ((unsigned long)(((uint8_t *) data - (uint8_t *) j2_header->data) + bytes) > (j2_header->total_file_size - sizeof(struct journal_v2_block_trailer))) - return 1; - - return 0; -} - -void *journal_v2_write_metric_page(struct journal_v2_header *j2_header, void *data, struct metric_info_s *metric_info, uint32_t pages_offset) -{ - struct journal_metric_list *metric = (void *) data; - - if (verify_journal_space(j2_header, data, sizeof(*metric))) - return NULL; - - uuid_copy(metric->uuid, *metric_info->id); - metric->entries = metric_info->entries; - metric->page_offset = pages_offset; - metric->delta_start = (metric_info->min_time_ut - j2_header->start_time_ut) / USEC_PER_SEC; - metric->delta_end = (metric_info->max_time_ut - j2_header->start_time_ut) / USEC_PER_SEC; - - return ++metric; -} - -void *journal_v2_write_data_page_header(struct journal_v2_header *j2_header __maybe_unused, void *data, struct metric_info_s *metric_info, uint32_t uuid_offset) -{ - struct journal_page_header *data_page_header = (void *) data; - uLong crc; - - uuid_copy(data_page_header->uuid, *metric_info->id); - data_page_header->entries = metric_info->entries; - data_page_header->uuid_offset = uuid_offset; // data header OFFSET poings to METRIC in the directory - data_page_header->crc = JOURVAL_V2_MAGIC; - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, (void *) data_page_header, sizeof(*data_page_header)); - crc32set(data_page_header->checksum, crc); - return ++data_page_header; -} - -void *journal_v2_write_data_page_trailer(struct journal_v2_header *j2_header __maybe_unused, void *data, void *page_header) -{ - struct journal_page_header *data_page_header = (void *) page_header; - struct journal_v2_block_trailer *journal_trailer = (void *) data; - uLong crc; - - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, (uint8_t *) page_header + sizeof(struct journal_page_header), data_page_header->entries * sizeof(struct journal_page_list)); - crc32set(journal_trailer->checksum, crc); - return ++journal_trailer; -} - -void *journal_v2_write_data_page(struct journal_v2_header *j2_header, void *data, struct rrdeng_page_descr *descr) -{ - if (unlikely(!descr)) - return data; - - struct journal_page_list *data_page = data; - - // verify that we can write number of bytes - if (verify_journal_space(j2_header, data, sizeof(*data_page))) - return NULL; - - fatal_assert(descr->extent != NULL); - - uint32_t extent_index = unlikely(NULL == descr->extent) ? UINT32_MAX : descr->extent->index; - - data_page->delta_start_s = (descr->start_time_ut - j2_header->start_time_ut) / USEC_PER_SEC; - data_page->delta_end_s = (descr->end_time_ut - j2_header->start_time_ut) / USEC_PER_SEC; - data_page->extent_index = extent_index; - data_page->update_every_s = (uint16_t) descr->update_every_s; - data_page->page_length = descr->page_length; - data_page->type = descr->type; - - // Rebuild on start to resolve unknown entry - if (unlikely(UINT32_MAX == extent_index)) - j2_header->magic = JOURVAL_V2_REBUILD_MAGIC; - - return ++data_page; -} - -// For a page_index write all descr @ time entries -// Must be recorded in metric_info->entries -void *journal_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct metric_info_s *metric_info, struct rrdengine_journalfile *journalfile) -{ - struct rrdeng_page_descr *descr; - Pvoid_t *PValue; - - struct journal_page_list *data_page = (void *)data; - struct page_cache *pg_cache = &journalfile->datafile->ctx->pg_cache; - struct pg_cache_page_index *page_index; - - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, metric_info->id, sizeof(uuid_t)); - page_index = (NULL == PValue) ? NULL : *PValue; - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - - if (page_index == NULL) - return data_page; - - // We need to write all descriptors with index metric_info->min_index_time_s, metric_info->max_index_time_s - // that belong to this journal file - - Word_t index_time = metric_info->min_index_time_s; - unsigned entries = 0; - - uv_rwlock_rdlock(&page_index->lock); - - Pvoid_t JudyL_array = metric_info->JudyL_array ? metric_info->JudyL_array : page_index->JudyL_array; - - // Need page_index lock if running live - for (PValue = JudyLFirst(JudyL_array, &index_time, PJE0), - descr = unlikely(NULL == PValue) ? NULL : *PValue; - descr != NULL; - PValue = JudyLNext(JudyL_array, &index_time, PJE0), - descr = unlikely(NULL == PValue) ? NULL : *PValue) { - - if (unlikely((time_t) index_time > metric_info->max_index_time_s) || entries == metric_info->entries) - break; - - // Write one descriptor and return the next data page location - data_page = journal_v2_write_data_page(j2_header, (void *)data_page, descr); - - entries++; - if (unlikely(!data_page)) - break; - } - - uv_rwlock_rdunlock(&page_index->lock); - return data_page; -} - -static void journal_v2_remove_active_descriptors(struct rrdengine_journalfile *journalfile, struct metric_info_s *metric_info, bool startup) -{ - if (true == startup) { - // This is during startup, so we are the only ones accessing the structures - // thats why we can safely remote the entire page_index->JudyL_array - struct rrdeng_page_descr *descr; - Word_t index_time; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index; - - page_index = metric_info->page_index; - - for (index_time = 0, PValue = JudyLFirst(page_index->JudyL_array, &index_time, PJE0), - descr = unlikely(NULL == PValue) ? NULL : *PValue; descr != NULL; - PValue = JudyLNext(page_index->JudyL_array, &index_time, PJE0), - descr = unlikely(NULL == PValue) ? NULL : *PValue) { - - rrdeng_page_descr_freez(descr); - } - (void)JudyLFreeArray(&page_index->JudyL_array, PJE0); - } - else { - // This is during runtime - struct rrdeng_page_descr *descr; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index = metric_info->page_index; - struct page_cache *pg_cache = &page_index->ctx->pg_cache; - struct rrdengine_instance *ctx = page_index->ctx; - - Word_t index_time = metric_info->min_index_time_s; - uint32_t metric_info_offset = metric_info->page_list_header; - - struct journal_page_header *page_list_header = (struct journal_page_header *) ((uint8_t *) journalfile->journal_data + metric_info_offset); - struct journal_v2_header *journal_header = (struct journal_v2_header *) journalfile->journal_data; - // Sanity check that we refer to the same UUID - fatal_assert(uuid_compare(page_list_header->uuid, *metric_info->id) == 0); - - struct journal_page_list *page_list = (struct journal_page_list *)((uint8_t *) page_list_header + sizeof(*page_list_header)); - struct journal_extent_list *extent_list = (void *)((uint8_t *)journal_header + journal_header->extent_offset); - - uint32_t index = 0; - uint32_t entries = page_list_header->entries; - - uv_rwlock_rdlock(&page_index->lock); - - bool mark_journalfile_for_expiration_check = false; - for (PValue = JudyLFirst(metric_info->JudyL_array, &index_time, PJE0), - descr = unlikely(NULL == PValue) ? NULL : *PValue; - descr != NULL; - PValue = JudyLNext(metric_info->JudyL_array, &index_time, PJE0), - descr = unlikely(NULL == PValue) ? NULL : *PValue) { - - if (unlikely((time_t) index_time > metric_info->max_index_time_s) || index == entries) - break; - - if (descr->extent_entry || (!descr->extent_entry && descr->extent && descr->extent->datafile->journalfile != journalfile)) - continue; - - struct journal_page_list *page_entry = &page_list[index++]; - - if (likely(page_entry->extent_index != UINT32_MAX)) { - - fatal_assert(descr->extent->offset == extent_list[page_entry->extent_index].datafile_offset); - fatal_assert(descr->extent->size == extent_list[page_entry->extent_index].datafile_size); - - rrdeng_page_descr_mutex_lock(ctx, descr); - while (!pg_cache_try_get_unsafe(descr, 1)) { - pg_cache_wait_event_unsafe(descr); - } - - descr->extent_entry = &extent_list[page_entry->extent_index]; - descr->extent = NULL; - descr->file = journalfile->datafile->file; - ++pg_cache->active_descriptors; - pg_cache_put_unsafe(descr); - rrdeng_try_deallocate_pg_cache_descr(ctx, descr); - rrdeng_page_descr_mutex_unlock(ctx, descr); - mark_journalfile_for_expiration_check = true; - } - } - - if (mark_journalfile_for_expiration_check) { - uint32_t page_offset = (uint8_t *)page_list_header - (uint8_t *)journalfile->journal_data; - mark_journalfile_descriptor(pg_cache, journalfile, page_offset, 1); - } - - uv_rwlock_rdunlock(&page_index->lock); - } -} - -bool descriptor_is_corrupted(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct pg_cache_page_index *page_index = get_page_index(pg_cache, descr->id); - - if (unlikely(!page_index)) - return true; - - time_t index_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC); - struct rrdeng_page_descr *idx_descr = get_descriptor(page_index, index_time_s); - - bool is_corrupted = (idx_descr != descr); - -#ifdef NETDATA_INTERNAL_CHECKS - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower(page_index->id, uuid_str); - internal_error(is_corrupted, "Descriptor corrupted (Extent %p Judy %p) @ %ld", descr, idx_descr, index_time_s); -#endif - - return is_corrupted; -} - -static bool journalfile_ready_to_index(struct rrdengine_datafile *datafile) -{ - struct extent_info *extent = datafile->extents.first; - while (extent) { - uint8_t extent_pages = extent->number_of_pages; - for (uint8_t index = 0; index < extent_pages; index++) { - struct rrdeng_page_descr *descr = extent->pages[index]; - if (unlikely(!descr)) - continue; - if (unlikely(!descr->extent)) - return false; - } - extent = extent->next; - } - return true; -} - -// Migrate the journalfile pointed by datafile -// activate : make the new file active immediately -// journafile data will be set and descriptors (if deleted) will be repopulated as needed -// startup : if the migration is done during agent startup -// this will allow us to optimize certain things -void migrate_journal_file_v2(struct rrdengine_datafile *datafile, bool activate, bool startup) -{ - char path[RRDENG_PATH_MAX]; - size_t number_of_extents = 0; // Number of extents - size_t number_of_metrics = 0; // Number of unique metrics (UUIDS) - size_t number_of_pages = 0; // Total number of descriptors @ time - Pvoid_t |