summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2024-03-23 12:41:03 +0200
committerGitHub <noreply@github.com>2024-03-23 12:41:03 +0200
commit4355e50bceb7c6aab409cb671c04577a31c6a868 (patch)
treed42ccd0b4035edce15fc3093842bd4b2066b799c /src
parent3240d07c435864a33bef93e5a1c3971c5398c6d1 (diff)
updated dbengine unittest (#17232)
* when a page cannot be acquired, repeat the call until it can or does not exist * updated dbengine unittest * Update dbengine-unittest.c
Diffstat (limited to 'src')
-rw-r--r--src/daemon/unit_test.c848
-rw-r--r--src/database/engine/dbengine-stresstest.c456
-rw-r--r--src/database/engine/dbengine-unittest.c419
3 files changed, 875 insertions, 848 deletions
diff --git a/src/daemon/unit_test.c b/src/daemon/unit_test.c
index 183770baf4..54586eab53 100644
--- a/src/daemon/unit_test.c
+++ b/src/daemon/unit_test.c
@@ -1804,851 +1804,3 @@ int unit_test_bitmaps(void) {
fprintf(stderr, "%s() %d errors\n", __FUNCTION__, errors);
return errors;
}
-
-#ifdef ENABLE_DBENGINE
-static inline void rrddim_set_by_pointer_fake_time(RRDDIM *rd, collected_number value, time_t now)
-{
- rd->collector.last_collected_time.tv_sec = now;
- rd->collector.last_collected_time.tv_usec = 0;
- rd->collector.collected_value = value;
- rrddim_set_updated(rd);
-
- rd->collector.counter++;
-
- collected_number v = (value >= 0) ? value : -value;
- if(unlikely(v > rd->collector.collected_value_max)) rd->collector.collected_value_max = v;
-}
-
-static RRDHOST *dbengine_rrdhost_find_or_create(char *name)
-{
- /* We don't want to drop metrics when generating load, we prefer to block data generation itself */
-
- return rrdhost_find_or_create(
- name,
- name,
- name,
- os_type,
- netdata_configured_timezone,
- netdata_configured_abbrev_timezone,
- netdata_configured_utc_offset,
- program_name,
- program_version,
- default_rrd_update_every,
- default_rrd_history_entries,
- RRD_MEMORY_MODE_DBENGINE,
- health_plugin_enabled(),
- default_rrdpush_enabled,
- default_rrdpush_destination,
- default_rrdpush_api_key,
- default_rrdpush_send_charts_matching,
- default_rrdpush_enable_replication,
- default_rrdpush_seconds_to_replicate,
- default_rrdpush_replication_step,
- NULL,
- 0);
-}
-
-// constants for test_dbengine
-static const int CHARTS = 64;
-static const int DIMS = 16; // That gives us 64 * 16 = 1024 metrics
-#define REGIONS (3) // 3 regions of update_every
-// first region update_every is 2, second is 3, third is 1
-static const int REGION_UPDATE_EVERY[REGIONS] = {2, 3, 1};
-static const int REGION_POINTS[REGIONS] = {
- 16384, // This produces 64MiB of metric data for the first region: update_every = 2
- 16384, // This produces 64MiB of metric data for the second region: update_every = 3
- 16384, // This produces 64MiB of metric data for the third region: update_every = 1
-};
-static const int QUERY_BATCH = 4096;
-
-static void test_dbengine_create_charts(RRDHOST *host, RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
- int update_every)
-{
- fprintf(stderr, "%s() running...\n", __FUNCTION__ );
- int i, j;
- char name[101];
-
- for (i = 0 ; i < CHARTS ; ++i) {
- snprintfz(name, sizeof(name) - 1, "dbengine-chart-%d", i);
-
- // create the chart
- st[i] = rrdset_create(host, "netdata", name, name, "netdata", NULL, "Unit Testing", "a value", "unittest",
- NULL, 1, update_every, RRDSET_TYPE_LINE);
- rrdset_flag_set(st[i], RRDSET_FLAG_DEBUG);
- rrdset_flag_set(st[i], RRDSET_FLAG_STORE_FIRST);
- for (j = 0 ; j < DIMS ; ++j) {
- snprintfz(name, sizeof(name) - 1, "dim-%d", j);
-
- rd[i][j] = rrddim_add(st[i], name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- }
- }
-
- // Initialize DB with the very first entries
- for (i = 0 ; i < CHARTS ; ++i) {
- for (j = 0 ; j < DIMS ; ++j) {
- rd[i][j]->collector.last_collected_time.tv_sec =
- st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = 2 * API_RELATIVE_TIME_MAX - 1;
- rd[i][j]->collector.last_collected_time.tv_usec =
- st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0;
- }
- }
- for (i = 0 ; i < CHARTS ; ++i) {
- st[i]->usec_since_last_update = USEC_PER_SEC;
-
- for (j = 0; j < DIMS; ++j) {
- rrddim_set_by_pointer_fake_time(rd[i][j], 69, 2 * API_RELATIVE_TIME_MAX); // set first value to 69
- }
-
- struct timeval now;
- now_realtime_timeval(&now);
- rrdset_timed_done(st[i], now, false);
- }
- // Flush pages for subsequent real values
- for (i = 0 ; i < CHARTS ; ++i) {
- for (j = 0; j < DIMS; ++j) {
- rrdeng_store_metric_flush_current_page((rd[i][j])->tiers[0].sch);
- }
- }
-}
-
-// Feeds the database region with test data, returns last timestamp of region
-static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
- int current_region, time_t time_start)
-{
- fprintf(stderr, "%s() running...\n", __FUNCTION__ );
- time_t time_now;
- int i, j, c, update_every;
- collected_number next;
-
- update_every = REGION_UPDATE_EVERY[current_region];
- time_now = time_start;
- // feed it with the test data
- for (i = 0 ; i < CHARTS ; ++i) {
- for (j = 0 ; j < DIMS ; ++j) {
- storage_engine_store_change_collection_frequency(rd[i][j]->tiers[0].sch, update_every);
-
- rd[i][j]->collector.last_collected_time.tv_sec =
- st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = time_now;
- rd[i][j]->collector.last_collected_time.tv_usec =
- st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0;
- }
- }
- for (c = 0; c < REGION_POINTS[current_region] ; ++c) {
- time_now += update_every; // time_now = start + (c + 1) * update_every
-
- for (i = 0 ; i < CHARTS ; ++i) {
- st[i]->usec_since_last_update = USEC_PER_SEC * update_every;
-
- for (j = 0; j < DIMS; ++j) {
- next = ((collected_number)i * DIMS) * REGION_POINTS[current_region] +
- j * REGION_POINTS[current_region] + c;
- rrddim_set_by_pointer_fake_time(rd[i][j], next, time_now);
- }
-
- struct timeval now;
- now.tv_sec = time_now;
- now.tv_usec = 0;
-
- rrdset_timed_done(st[i], now, false);
- }
- }
- return time_now; //time_end
-}
-
-// Checks the metric data for the given region, returns number of errors
-static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
- int current_region, time_t time_start)
-{
- fprintf(stderr, "%s() running...\n", __FUNCTION__ );
- uint8_t same;
- time_t time_now, time_retrieved, end_time;
- int i, j, k, c, errors, update_every;
- collected_number last;
- NETDATA_DOUBLE value, expected;
- struct storage_engine_query_handle seqh;
- size_t value_errors = 0, time_errors = 0;
-
- update_every = REGION_UPDATE_EVERY[current_region];
- errors = 0;
-
- // check the result
- for (c = 0; c < REGION_POINTS[current_region] ; c += QUERY_BATCH) {
- time_now = time_start + (c + 1) * update_every;
- for (i = 0 ; i < CHARTS ; ++i) {
- for (j = 0; j < DIMS; ++j) {
- storage_engine_query_init(rd[i][j]->tiers[0].seb, rd[i][j]->tiers[0].smh, &seqh, time_now, time_now + QUERY_BATCH * update_every, STORAGE_PRIORITY_NORMAL);
- for (k = 0; k < QUERY_BATCH; ++k) {
- last = ((collected_number)i * DIMS) * REGION_POINTS[current_region] +
- j * REGION_POINTS[current_region] + c + k;
- expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE)last, SN_DEFAULT_FLAGS));
-
- STORAGE_POINT sp = storage_engine_query_next_metric(&seqh);
- value = sp.sum;
- time_retrieved = sp.start_time_s;
- end_time = sp.end_time_s;
-
- same = (roundndd(value) == roundndd(expected)) ? 1 : 0;
- if(!same) {
- if(!value_errors)
- fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT
- ", found " NETDATA_DOUBLE_FORMAT ", ### E R R O R ###\n",
- rrdset_name(st[i]), rrddim_name(rd[i][j]), (unsigned long)time_now + k * update_every, expected, value);
- value_errors++;
- errors++;
- }
- if(end_time != time_now + k * update_every) {
- if(!time_errors)
- fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
- rrdset_name(st[i]), rrddim_name(rd[i][j]), (unsigned long)time_now + k * update_every, (unsigned long)time_retrieved);
- time_errors++;
- errors++;
- }
- }
- storage_engine_query_finalize(&seqh);
- }
- }
- }
-
- if(value_errors)
- fprintf(stderr, "%zu value errors encountered\n", value_errors);
-
- if(time_errors)
- fprintf(stderr, "%zu time errors encountered\n", time_errors);
-
- return errors;
-}
-
-// Check rrdr transformations
-static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
- int current_region, time_t time_start, time_t time_end)
-{
- int update_every = REGION_UPDATE_EVERY[current_region];
- fprintf(stderr, "%s() running on region %d, start time %lld, end time %lld, update every %d, on %d dimensions...\n",
- __FUNCTION__, current_region, (long long)time_start, (long long)time_end, update_every, CHARTS * DIMS);
- uint8_t same;
- time_t time_now, time_retrieved;
- int i, j, errors, value_errors = 0, time_errors = 0, value_right = 0, time_right = 0;
- long c;
- collected_number last;
- NETDATA_DOUBLE value, expected;
-
- errors = 0;
- long points = (time_end - time_start) / update_every;
- for (i = 0 ; i < CHARTS ; ++i) {
- ONEWAYALLOC *owa = onewayalloc_create(0);
- RRDR *r = rrd2rrdr_legacy(owa, st[i], points, time_start, time_end,
- RRDR_GROUPING_AVERAGE, 0, RRDR_OPTION_NATURAL_POINTS,
- NULL, NULL, 0, 0,
- QUERY_SOURCE_UNITTEST, STORAGE_PRIORITY_NORMAL);
- if (!r) {
- fprintf(stderr, " DB-engine unittest %s: empty RRDR on region %d ### E R R O R ###\n", rrdset_name(st[i]), current_region);
- return ++errors;
- } else {
- assert(r->internal.qt->request.st == st[i]);
- for (c = 0; c != (long)rrdr_rows(r) ; ++c) {
- RRDDIM *d;
- time_now = time_start + (c + 1) * update_every;
- time_retrieved = r->t[c];
-
- // for each dimension
- rrddim_foreach_read(d, r->internal.qt->request.st) {
- if(unlikely(d_dfe.counter >= r->d)) break; // d_counter is provided by the dictionary dfe
-
- j = (int)d_dfe.counter;
-
- NETDATA_DOUBLE *cn = &r->v[ c * r->d ];
- value = cn[j];
- assert(rd[i][j] == d);
-
- last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c;
- expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE)last, SN_DEFAULT_FLAGS));
-
- same = (roundndd(value) == roundndd(expected)) ? 1 : 0;
- if(!same) {
- if(value_errors < 20)
- fprintf(stderr, " DB-engine unittest %s/%s: point #%ld, at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT
- ", RRDR found " NETDATA_DOUBLE_FORMAT ", ### E R R O R ###\n",
- rrdset_name(st[i]), rrddim_name(rd[i][j]), (long) c+1, (unsigned long)time_now, expected, value);
- value_errors++;
- }
- else
- value_right++;
-
- if(time_retrieved != time_now) {
- if(time_errors < 20)
- fprintf(stderr, " DB-engine unittest %s/%s: point #%ld at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n",
- rrdset_name(st[i]), rrddim_name(rd[i][j]), (long)c+1, (unsigned long)time_now, (unsigned long)time_retrieved);
- time_errors++;
- }
- else
- time_right++;
- }
- rrddim_foreach_done(d);
- }
- rrdr_free(owa, r);
- }
- onewayalloc_destroy(owa);
- }
-
- if(value_errors)
- fprintf(stderr, "%d value errors encountered (%d were ok)\n", value_errors, value_right);
-
- if(time_errors)
- fprintf(stderr, "%d time errors encountered (%d were ok)\n", time_errors, value_right);
-
- return errors + value_errors + time_errors;
-}
-
-void test_dbengine_charts_and_dims_are_not_collected(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS]) {
- for(int c = 0; c < CHARTS ; c++) {
- st[c]->rrdcontexts.collected = false;
- for(int d = 0; d < DIMS ; d++)
- rd[c][d]->rrdcontexts.collected = false;
- }
-}
-
-int test_dbengine(void)
-{
- fprintf(stderr, "%s() running...\n", __FUNCTION__ );
- int i, j, errors = 0, value_errors = 0, time_errors = 0, update_every, current_region;
- RRDHOST *host = NULL;
- RRDSET *st[CHARTS];
- RRDDIM *rd[CHARTS][DIMS];
- time_t time_start[REGIONS], time_end[REGIONS];
-
- nd_log_limits_unlimited();
- fprintf(stderr, "\nRunning DB-engine test\n");
-
- default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
-
- fprintf(stderr, "Initializing localhost with hostname 'unittest-dbengine'");
- host = dbengine_rrdhost_find_or_create("unittest-dbengine");
- if (NULL == host)
- return 1;
-
- current_region = 0; // this is the first region of data
- update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 2 seconds
- test_dbengine_create_charts(host, st, rd, update_every);
-
- time_start[current_region] = 2 * API_RELATIVE_TIME_MAX;
- time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]);
-
- errors += test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]);
- test_dbengine_charts_and_dims_are_not_collected(st, rd);
-
- current_region = 1; //this is the second region of data
- update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 3 seconds
- // Align pages for frequency change
- for (i = 0 ; i < CHARTS ; ++i) {
- st[i]->update_every = update_every;
- for (j = 0; j < DIMS; ++j) {
- rrdeng_store_metric_flush_current_page((rd[i][j])->tiers[0].sch);
- }
- }
-
- time_start[current_region] = time_end[current_region - 1] + update_every;
- if (0 != time_start[current_region] % update_every) // align to update_every
- time_start[current_region] += update_every - time_start[current_region] % update_every;
- time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]);
-
- errors += test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]);
- test_dbengine_charts_and_dims_are_not_collected(st, rd);
-
- current_region = 2; //this is the third region of data
- update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 1 seconds
- // Align pages for frequency change
- for (i = 0 ; i < CHARTS ; ++i) {
- st[i]->update_every = update_every;
- for (j = 0; j < DIMS; ++j) {
- rrdeng_store_metric_flush_current_page((rd[i][j])->tiers[0].sch);
- }
- }
-
- time_start[current_region] = time_end[current_region - 1] + update_every;
- if (0 != time_start[current_region] % update_every) // align to update_every
- time_start[current_region] += update_every - time_start[current_region] % update_every;
- time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]);
-
- errors += test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]);
- test_dbengine_charts_and_dims_are_not_collected(st, rd);
-
- for (current_region = 0 ; current_region < REGIONS ; ++current_region) {
- errors += test_dbengine_check_rrdr(st, rd, current_region, time_start[current_region], time_end[current_region]);
- }
-
- current_region = 1;
- update_every = REGION_UPDATE_EVERY[current_region]; // use the maximum update_every = 3
- long points = (time_end[REGIONS - 1] - time_start[0]) / update_every; // cover all time regions with RRDR
- long point_offset = (time_start[current_region] - time_start[0]) / update_every;
- for (i = 0 ; i < CHARTS ; ++i) {
- ONEWAYALLOC *owa = onewayalloc_create(0);
- RRDR *r = rrd2rrdr_legacy(owa, st[i], points, time_start[0] + update_every,
- time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0,
- RRDR_OPTION_NATURAL_POINTS, NULL, NULL, 0, 0,
- QUERY_SOURCE_UNITTEST, STORAGE_PRIORITY_NORMAL);
-
- if (!r) {
- fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", rrdset_name(st[i]));
- ++errors;
- } else {
- long c;
-
- assert(r->internal.qt->request.st == st[i]);
- // test current region values only, since they must be left unchanged
- for (c = point_offset ; c < (long)(point_offset + rrdr_rows(r) / REGIONS / 2) ; ++c) {
- RRDDIM *d;
- time_t time_now = time_start[current_region] + (c - point_offset + 2) * update_every;
- time_t time_retrieved = r->t[c];
-
- // for each dimension
- rrddim_foreach_read(d, r->internal.qt->request.st) {
- if(unlikely(d_dfe.counter >= r->d)) break; // d_counter is provided by the dictionary dfe
-
- j = (int)d_dfe.counter;
-
- NETDATA_DOUBLE *cn = &r->v[ c * r->d ];
- NETDATA_DOUBLE value = cn[j];
- assert(rd[i][j] == d);
-
- collected_number last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c - point_offset + 1;
- NETDATA_DOUBLE expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE)last, SN_DEFAULT_FLAGS));
-
- uint8_t same = (roundndd(value) == roundndd(expected)) ? 1 : 0;
- if(!same) {
- if(!value_errors)
- fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT
- ", RRDR found " NETDATA_DOUBLE_FORMAT ", ### E R R O R ###\n",
- rrdset_name(st[i]), rrddim_name(rd[i][j]), (unsigned long)time_now, expected, value);
- value_errors++;
- }
- if(time_retrieved != time_now) {
- if(!time_errors)
- fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n",
- rrdset_name(st[i]), rrddim_name(rd[i][j]), (unsigned long)time_now, (unsigned long)time_retrieved);
- time_errors++;
- }
- }
- rrddim_foreach_done(d);
- }
- rrdr_free(owa, r);
- }
- onewayalloc_destroy(owa);
- }
-
- rrd_wrlock();
- rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].si);
- rrdeng_exit((struct rrdengine_instance *)host->db[0].si);
- rrdeng_enq_cmd(NULL, RRDENG_OPCODE_SHUTDOWN_EVLOOP, NULL, NULL, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL);
- rrd_unlock();
-
- return errors + value_errors + time_errors;
-}
-
-struct dbengine_chart_thread {
- uv_thread_t thread;
- RRDHOST *host;
- char *chartname; /* Will be prefixed by type, e.g. "example_local1.", "example_local2." etc */
- unsigned dset_charts; /* number of charts */
- unsigned dset_dims; /* dimensions per chart */
- unsigned chart_i; /* current chart offset */
- time_t time_present; /* current virtual time of the benchmark */
- volatile time_t time_max; /* latest timestamp of stored values */
- unsigned history_seconds; /* how far back in the past to go */
-
- volatile long done; /* initialize to 0, set to 1 to stop thread */
- struct completion charts_initialized;
- unsigned long errors, stored_metrics_nr; /* statistics */
-
- RRDSET *st;
- RRDDIM *rd[]; /* dset_dims elements */
-};
-
-collected_number generate_dbengine_chart_value(int chart_i, int dim_i, time_t time_current)
-{
- collected_number value;
-
- value = ((collected_number)time_current) * (chart_i + 1);
- value += ((collected_number)time_current) * (dim_i + 1);
- value %= 1024LLU;
-
- return value;
-}
-
-static void generate_dbengine_chart(void *arg)
-{
- fprintf(stderr, "%s() running...\n", __FUNCTION__ );
- struct dbengine_chart_thread *thread_info = (struct dbengine_chart_thread *)arg;
- RRDHOST *host = thread_info->host;
- char *chartname = thread_info->chartname;
- const unsigned DSET_DIMS = thread_info->dset_dims;
- unsigned history_seconds = thread_info->history_seconds;
- time_t time_present = thread_info->time_present;
-
- unsigned j, update_every = 1;
- RRDSET *st;
- RRDDIM *rd[DSET_DIMS];
- char name[RRD_ID_LENGTH_MAX + 1];
- time_t time_current;
-
- // create the chart
- snprintfz(name, RRD_ID_LENGTH_MAX, "example_local%u", thread_info->chart_i + 1);
- thread_info->st = st = rrdset_create(host, name, chartname, chartname, "example", NULL, chartname, chartname,
- chartname, NULL, 1, update_every, RRDSET_TYPE_LINE);
- for (j = 0 ; j < DSET_DIMS ; ++j) {
- snprintfz(name, RRD_ID_LENGTH_MAX, "%s%u", chartname, j + 1);
-
- thread_info->rd[j] = rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- }
- completion_mark_complete(&thread_info->charts_initialized);
-
- // feed it with the test data
- time_current = time_present - history_seconds;
- for (j = 0 ; j < DSET_DIMS ; ++j) {
- rd[j]->collector.last_collected_time.tv_sec =
- st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current - update_every;
- rd[j]->collector.last_collected_time.tv_usec =
- st->last_collected_time.tv_usec = st->last_updated.tv_usec = 0;
- }
- for( ; !thread_info->done && time_current < time_present ; time_current += update_every) {
- st->usec_since_last_update = USEC_PER_SEC * update_every;
-
- for (j = 0; j < DSET_DIMS; ++j) {
- collected_number value;
-
- value = generate_dbengine_chart_value(thread_info->chart_i, j, time_current);
- rrddim_set_by_pointer_fake_time(rd[j], value, time_current);
- ++thread_info->stored_metrics_nr;
- }
- rrdset_done(st);
- thread_info->time_max = time_current;
- }
- for (j = 0; j < DSET_DIMS; ++j) {
- rrdeng_store_metric_finalize((rd[j])->tiers[0].sch);
- }
-}
-
-void generate_dbengine_dataset(unsigned history_seconds)
-{
- fprintf(stderr, "%s() running...\n", __FUNCTION__ );
- const int DSET_CHARTS = 16;
- const int DSET_DIMS = 128;
- const uint64_t EXPECTED_COMPRESSION_RATIO = 20;
- RRDHOST *host = NULL;
- struct dbengine_chart_thread **thread_info;
- int i;
- time_t time_present;
-
- default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
- default_rrdeng_page_cache_mb = 128;
- // Worst case for uncompressible data
- default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * history_seconds) /
- (1024 * 1024);
- default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;
-
- nd_log_limits_unlimited();
- fprintf(stderr, "Initializing localhost with hostname 'dbengine-dataset'");
-
- host = dbengine_rrdhost_find_or_create("dbengine-dataset");
- if (NULL == host)
- return;
-
- thread_info = mallocz(sizeof(*thread_info) * DSET_CHARTS);
- for (i = 0 ; i < DSET_CHARTS ; ++i) {
- thread_info[i] = mallocz(sizeof(*thread_info[i]) + sizeof(RRDDIM *) * DSET_DIMS);
- }
- fprintf(stderr, "\nRunning DB-engine workload generator\n");
-
- time_present = now_realtime_sec();
- for (i = 0 ; i < DSET_CHARTS ; ++i) {
- thread_info[i]->host = host;
- thread_info[i]->chartname = "random";
- thread_info[i]->dset_charts = DSET_CHARTS;
- thread_info[i]->chart_i = i;
- thread_info[i]->dset_dims = DSET_DIMS;
- thread_info[i]->history_seconds = history_seconds;
- thread_info[i]->time_present = time_present;
- thread_info[i]->time_max = 0;
- thread_info[i]->done = 0;
- completion_init(&thread_info[i]->charts_initialized);
- fatal_assert(0 == uv_thread_create(&thread_info[i]->thread, generate_dbengine_chart, thread_info[i]));
- completion_wait_for(&thread_info[i]->charts_initialized);
- completion_destroy(&thread_info[i]->charts_initialized);
- }
- for (i = 0 ; i < DSET_CHARTS ; ++i) {
- fatal_assert(0 == uv_thread_join(&thread_info[i]->thread));
- }
-
- for (i = 0 ; i < DSET_CHARTS ; ++i) {
- freez(thread_info[i]);
- }
- freez(thread_info);
- rrd_wrlock();
- rrdhost_free___while_having_rrd_wrlock(localhost, true);
- rrd_unlock();
-}
-
-struct dbengine_query_thread {
- uv_thread_t thread;
- RRDHOST *host;
- char *chartname; /* Will be prefixed by type, e.g. "example_local1.", "example_local2." etc */
- unsigned dset_charts; /* number of charts */
- unsigned dset_dims; /* dimensions per chart */
- time_t time_present; /* current virtual time of the benchmark */
- unsigned history_seconds; /* how far back in the past to go */
- volatile long done; /* initialize to 0, set to 1 to stop thread */
- unsigned long errors, queries_nr, queried_metrics_nr; /* statistics */
- uint8_t delete_old_data; /* if non zero then data are deleted when disk space is exhausted */
-
- struct dbengine_chart_thread *chart_threads[]; /* dset_charts elements */
-};
-
-static void query_dbengine_chart(void *arg)
-{
- fprintf(stderr, "%s() running...\n", __FUNCTION__ );
- struct dbengine_query_thread *thread_info = (struct dbengine_query_thread *)arg;
- const int DSET_CHARTS = thread_info->dset_charts;
- const int DSET_DIMS = thread_info->dset_dims;
- time_t time_after, time_before, time_min, time_approx_min, time_max, duration;
- int i, j, update_every = 1;
- RRDSET *st;
- RRDDIM *rd;
- uint8_t same;
- time_t time_now, time_retrieved, end_time;
- collected_number generatedv;
- NETDATA_DOUBLE value, expected;
- struct storage_engine_query_handle seqh;
- size_t value_errors = 0, time_errors = 0;
-
- do {
- // pick a chart and dimension
- i = random() % DSET_CHARTS;
- st = thread_info->chart_threads[i]->st;
- j = random() % DSET_DIMS;
- rd = thread_info->chart_threads[i]->rd[j];
-
- time_min = thread_info->time_present - thread_info->history_seconds + 1;
- time_max = thread_info->chart_threads[i]->time_max;
-
- if (thread_info->delete_old_data) {
- /* A time window of twice the disk space is sufficient for compression space savings of up to 50% */
- time_approx_min = time_max - (default_rrdeng_disk_quota_mb * 2 * 1024 * 1024) /
- (((uint64_t) DSET_DIMS * DSET_CHARTS) * sizeof(storage_number));
- time_min = MAX(time_min, time_approx_min);
- }
- if (!time_max) {
- time_before = time_after = time_min;
- } else {
- time_after = time_min + random() % (MAX(time_max - time_min, 1));
- duration = random() % 3600;
- time_before = MIN(time_after + duration, time_max); /* up to 1 hour queries */
- }
-
- storage_engine_query_init(rd->tiers[0].seb, rd->tiers[0].smh, &seqh, time_after, time_before, STORAGE_PRIORITY_NORMAL);
- ++thread_info->queries_nr;
- for (time_now = time_after ; time_now <= time_before ; time_now += update_every) {
- generatedv = generate_dbengine_chart_value(i, j, time_now);
- expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE) generatedv, SN_DEFAULT_FLAGS));
-
- if (unlikely(storage_engine_query_is_finished(&seqh))) {
- if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
- fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT
- ", found data gap, ### E R R O R ###\n",
- rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, expected);
- ++thread_info->errors;
- }
- break;
- }
-
- STORAGE_POINT sp = storage_engine_query_next_metric(&seqh);
- value = sp.sum;
- time_retrieved = sp.start_time_s;
- end_time = sp.end_time_s;
-
- if (!netdata_double_isnumber(value)) {
- if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
- fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT
- ", found data gap, ### E R R O R ###\n",
- rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, expected);
- ++thread_info->errors;
- }
- break;
- }
- ++thread_info->queried_metrics_nr;
-
- same = (roundndd(value) == roundndd(expected)) ? 1 : 0;
- if (!same) {
- if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
- if(!value_errors)
- fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT
- ", found " NETDATA_DOUBLE_FORMAT ", ### E R R O R ###\n",
- rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, expected, value);
- value_errors++;
- thread_info->errors++;
- }
- }
- if (end_time != time_now) {
- if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
- if(!time_errors)
- fprintf(stderr,
- " DB-engine stresstest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
- rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, (unsigned long) time_retrieved);
- time_errors++;
- thread_info->errors++;
- }
- }
- }
- storage_engine_query_finalize(&seqh);
- } while(!thread_info->done);
-
- if(value_errors)
- fprintf(stderr, "%zu value errors encountered\n", value_errors);
-
- if(time_errors)
- fprintf(stderr, "%zu time errors encountered\n", time_errors);
-}
-
-void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsigned QUERY_THREADS,
- unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB, unsigned DISK_SPACE_MB)
-{
- fprintf(stderr, "%s() running...\n", __FUNCTION__ );
- const unsigned DSET_DIMS = 128;
- const uint64_t EXPECTED_COMPRESSION_RATIO = 20;
- const unsigned HISTORY_SECONDS = 3600 * 24 * 365 * 50; /* 50 year of history */
- RRDHOST *host = NULL;
- struct dbengine_chart_thread **chart_threads;
- struct dbengine_query_thread **query_threads;
- unsigned i, j;
- time_t time_start, test_duration;
-
- nd_log_limits_unlimited();
-
- if (!TEST_DURATION_SEC)
- TEST_DURATION_SEC = 10;
- if (!DSET_CHARTS)
- DSET_CHARTS = 1;
- if (!QUERY_THREADS)
- QUERY_THREADS = 1;
- if (PAGE_CACHE_MB < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
- PAGE_CACHE_MB = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
-
- default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
- default_rrdeng_page_cache_mb = PAGE_CACHE_MB;
- if (DISK_SPACE_MB) {
- fprintf(stderr, "By setting disk space limit data are allowed to be deleted. "
- "Data validation is turned off for this run.\n");
- default_rrdeng_disk_quota_mb = DISK_SPACE_MB;
- } else {
- // Worst case for uncompressible data
- default_rrdeng_disk_quota_mb =
- (((uint64_t) DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * HISTORY_SECONDS) / (1024 * 1024);
- default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;
- }
-
- fprintf(stderr, "Initializing localhost with hostname 'dbengine-stress-test'\n");
-
- (void)sql_init_meta_database(DB_CHECK_NONE, 1);
- host = dbengine_rrdhost_find_or_create("dbengine-stress-test");
- if (NULL == host)
- return;
-
- chart_threads = mallocz(sizeof(*chart_threads) * DSET_CHARTS);
- for (i = 0 ; i < DSET_CHARTS ; ++i) {
- chart_threads[i] = mallocz(sizeof(*chart_threads[i]) + sizeof(RRDDIM *) * DSET_DIMS);
- }
- query_threads = mallocz(sizeof(*query_threads) * QUERY_THREADS);
- for (i = 0 ; i < QUERY_THREADS ; ++i) {
- query_threads[i] = mallocz(sizeof(*query_threads[i]) + sizeof(struct dbengine_chart_thread *) * DSET_CHARTS);
- }
- fprintf(stderr, "\nRunning DB-engine stress test, %u seconds writers ramp-up time,\n"
- "%u seconds of concurrent readers and writers, %u writer threads, %u reader threads,\n"
- "%u MiB of page cache.\n",
- RAMP_UP_SECONDS, TEST_DURATION_SEC, DSET_CHARTS, QUERY_THREADS, PAGE_CACHE_MB);
-
- time_start = now_realtime_sec() + HISTORY_SECONDS; /* move history to the future */
- for (i = 0 ; i < DSET_CHARTS ; ++i) {
- chart_threads[i]->host = host;
- chart_threads[i]->chartname = "random";
- chart_threads[i]->dset_charts = DSET_CHARTS;
- chart_threads[i]->chart_i = i;
- chart_threads[i]->dset_dims = DSET_DIMS;
- chart_threads[i]->history_seconds = HISTORY_SECONDS;
- chart_threads[i]->time_present = time_start;
- chart_threads[i]->time_max = 0;
- chart_threads[i]->done = 0;
- chart_threads[i]->errors = chart_threads[i]->stored_metrics_nr = 0;
- completion_init(&chart_threads[i]->charts_initialized);
- fatal_assert(0 == uv_thread_create(&chart_threads[i]->thread, generate_dbengine_chart, chart_threads[i]));
- }
- /* barrier so that subsequent queries can access valid chart data */
- for (i = 0 ; i < DSET_CHARTS ; ++i) {
- completion_wait_for(&chart_threads[i]->charts_initialized);
- completion_destroy(&chart_threads[i]->charts_initialized);
- }
- sleep(RAMP_UP_SECONDS);
- /* at this point data have already began being written to the database */
- for (i = 0 ; i < QUERY_THREADS ; ++i) {
- query_threads[i]->host = host;
- query_threads[i]->chartname = "random";
- query_threads[i]->dset_charts = DSET_CHARTS;
- query_threads[i]->dset_dims = DSET_DIMS;
- query_threads[i]->history_seconds = HISTORY_SECONDS;
- query_threads[i]->time_present = time_start;
- query_threads[i]->done = 0;
- query_threads[i]->errors = query_threads[i]->queries_nr = query_threads[i]->queried_metrics_nr = 0;
- for (j = 0 ; j < DSET_CHARTS ; ++j) {
- query_threads[i]->chart_threads[j] = chart_threads[j];
- }
- query_threads[i]->delete_old_data = DISK_SPACE_MB ? 1 : 0;
- fatal_assert(0 == uv_thread_create(&query_threads[i]->thread, query_dbengine_chart, query_threads[i]));
- }
- sleep(TEST_DURATION_SEC);
- /* stop workload */
- for (i = 0 ; i < DSET_CHARTS ; ++i) {
- chart_threads[i]->done