summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r-- backends/backends.c | 3
-rw-r--r-- daemon/unit_test.c | 354
-rw-r--r-- database/README.md | 2
-rw-r--r-- database/engine/README.md | 2
-rw-r--r-- database/engine/pagecache.c | 284
-rw-r--r-- database/engine/pagecache.h | 33
-rw-r--r-- database/engine/rrdengine.c | 3
-rw-r--r-- database/engine/rrdengineapi.c | 298
-rw-r--r-- database/engine/rrdengineapi.h | 11
-rw-r--r-- database/engine/rrdenginelib.h | 2
-rw-r--r-- database/rrd.h | 7
-rw-r--r-- database/rrddim.c | 3
-rw-r--r-- web/api/queries/average/average.c | 9
-rw-r--r-- web/api/queries/query.c | 876
-rw-r--r-- web/api/queries/rrdr.h | 11
15 files changed, 1604 insertions, 294 deletions
diff --git a/backends/backends.c b/backends/backends.c
index 24f84b63c8..120c6e7033 100644
--- a/backends/backends.c
+++ b/backends/backends.c
@@ -131,7 +131,8 @@ calculated_number backend_calculate_value_from_stored_data(
}
*/
for(rd->state->query_ops.init(rd, &handle, after, before) ; !rd->state->query_ops.is_finished(&handle) ; ) {
- n = rd->state->query_ops.next_metric(&handle);
+ time_t curr_t;
+ n = rd->state->query_ops.next_metric(&handle, &curr_t);
if(unlikely(!does_storage_number_exist(n))) {
// not collected
diff --git a/daemon/unit_test.c b/daemon/unit_test.c
index f9b58ce6b8..1c84022c00 100644
--- a/daemon/unit_test.c
+++ b/daemon/unit_test.c
@@ -1581,34 +1581,12 @@ static inline void rrddim_set_by_pointer_fake_time(RRDDIM *rd, collected_number
if(unlikely(v > rd->collected_value_max)) rd->collected_value_max = v;
}
-int test_dbengine(void)
+static RRDHOST *dbengine_rrdhost_find_or_create(char *name)
{
- const int CHARTS = 128;
- const int DIMS = 16; /* That gives us 2048 metrics */
- const int POINTS = 16384; /* This produces 128MiB of metric data */
- const int QUERY_BATCH = 4096;
- uint8_t same;
- int i, j, k, c, errors;
- RRDHOST *host = NULL;
- RRDSET *st[CHARTS];
- RRDDIM *rd[CHARTS][DIMS];
- char name[101];
- time_t time_now;
- collected_number last;
- struct rrddim_query_handle handle;
- calculated_number value, expected;
- storage_number n;
-
- error_log_limit_unlimited();
- fprintf(stderr, "\nRunning DB-engine test\n");
-
- default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
-
- debug(D_RRDHOST, "Initializing localhost with hostname 'unittest-dbengine'");
- host = rrdhost_find_or_create(
- "unittest-dbengine"
- , "unittest-dbengine"
- , "unittest-dbengine"
+ return rrdhost_find_or_create(
+ name
+ , name
+ , name
, os_type
, netdata_configured_timezone
, config_get(CONFIG_SECTION_BACKEND, "host tags", "")
@@ -1624,15 +1602,33 @@ int test_dbengine(void)
, default_rrdpush_send_charts_matching
, NULL
);
- if (NULL == host)
- return 1;
+}
+
+// constants for test_dbengine
+static const int CHARTS = 64;
+static const int DIMS = 16; // That gives us 64 * 16 = 1024 metrics
+#define REGIONS (3) // 3 regions of update_every
+// first region update_every is 2, second is 3, third is 1
+static const int REGION_UPDATE_EVERY[REGIONS] = {2, 3, 1};
+static const int REGION_POINTS[REGIONS] = {
+ 16384, // This produces 64MiB of metric data for the first region: update_every = 2
+ 16384, // This produces 64MiB of metric data for the second region: update_every = 3
+ 16384, // This produces 64MiB of metric data for the third region: update_every = 1
+};
+static const int QUERY_BATCH = 4096;
+
+static void test_dbengine_create_charts(RRDHOST *host, RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
+ int update_every)
+{
+ int i, j;
+ char name[101];
for (i = 0 ; i < CHARTS ; ++i) {
snprintfz(name, 100, "dbengine-chart-%d", i);
// create the chart
st[i] = rrdset_create(host, "netdata", name, name, "netdata", NULL, "Unit Testing", "a value", "unittest",
- NULL, 1, 1, RRDSET_TYPE_LINE);
+ NULL, 1, update_every, RRDSET_TYPE_LINE);
rrdset_flag_set(st[i], RRDSET_FLAG_DEBUG);
rrdset_flag_set(st[i], RRDSET_FLAG_STORE_FIRST);
for (j = 0 ; j < DIMS ; ++j) {
@@ -1642,50 +1638,103 @@ int test_dbengine(void)
}
}
+ // Initialize DB with the very first entries
+ for (i = 0 ; i < CHARTS ; ++i) {
+ for (j = 0 ; j < DIMS ; ++j) {
+ rd[i][j]->last_collected_time.tv_sec =
+ st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = 2 * API_RELATIVE_TIME_MAX - 1;
+ rd[i][j]->last_collected_time.tv_usec =
+ st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0;
+ }
+ }
+ for (i = 0 ; i < CHARTS ; ++i) {
+ st[i]->usec_since_last_update = USEC_PER_SEC;
+
+ for (j = 0; j < DIMS; ++j) {
+ rrddim_set_by_pointer_fake_time(rd[i][j], 69, 2 * API_RELATIVE_TIME_MAX); // set first value to 69
+ }
+ rrdset_done(st[i]);
+ }
+ // Flush pages for subsequent real values
+ for (i = 0 ; i < CHARTS ; ++i) {
+ for (j = 0; j < DIMS; ++j) {
+ rrdeng_store_metric_flush_current_page(rd[i][j]);
+ }
+ }
+}
+
+// Feeds the database region with test data, returns last timestamp of region
+static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
+ int current_region, time_t time_start)
+{
+ time_t time_now;
+ int i, j, c, update_every;
+ collected_number next;
+
+ update_every = REGION_UPDATE_EVERY[current_region];
+ time_now = time_start + update_every;
// feed it with the test data
- time_now = 1;
- last = 0;
for (i = 0 ; i < CHARTS ; ++i) {
for (j = 0 ; j < DIMS ; ++j) {
rd[i][j]->last_collected_time.tv_sec =
st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = time_now;
rd[i][j]->last_collected_time.tv_usec =
- st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0;
+ st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0;
}
}
- for(c = 0; c < POINTS ; ++c) {
- ++time_now; // time_now = c + 2
+ for (c = 0; c < REGION_POINTS[current_region] ; ++c) {
+ time_now += update_every; // time_now = start + (c + 2) * update_every
for (i = 0 ; i < CHARTS ; ++i) {
- st[i]->usec_since_last_update = USEC_PER_SEC;
+ st[i]->usec_since_last_update = USEC_PER_SEC * update_every;
for (j = 0; j < DIMS; ++j) {
- last = i * DIMS * POINTS + j * POINTS + c;
- rrddim_set_by_pointer_fake_time(rd[i][j], last, time_now);
+ next = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c;
+ rrddim_set_by_pointer_fake_time(rd[i][j], next, time_now);
}
rrdset_done(st[i]);
}
}
+ return time_now; //time_end
+}
- // check the result
+// Checks the metric data for the given region, returns number of errors
+static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
+ int current_region, time_t time_start)
+{
+ uint8_t same;
+ time_t time_now, time_retrieved;
+ int i, j, k, c, errors, update_every;
+ collected_number last;
+ calculated_number value, expected;
+ storage_number n;
+ struct rrddim_query_handle handle;
+
+ update_every = REGION_UPDATE_EVERY[current_region];
errors = 0;
- for(c = 0; c < POINTS ; c += QUERY_BATCH) {
- time_now = c + 2;
+ // check the result
+ for (c = 0; c < REGION_POINTS[current_region] ; c += QUERY_BATCH) {
+ time_now = time_start + (c + 2) * update_every;
for (i = 0 ; i < CHARTS ; ++i) {
for (j = 0; j < DIMS; ++j) {
- rd[i][j]->state->query_ops.init(rd[i][j], &handle, time_now, time_now + QUERY_BATCH);
+ rd[i][j]->state->query_ops.init(rd[i][j], &handle, time_now, time_now + QUERY_BATCH * update_every);
for (k = 0; k < QUERY_BATCH; ++k) {
- last = i * DIMS * POINTS + j * POINTS + c + k;
+ last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c + k;
expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS));
- n = rd[i][j]->state->query_ops.next_metric(&handle);
+ n = rd[i][j]->state->query_ops.next_metric(&handle, &time_retrieved);
value = unpack_storage_number(n);
same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0;
if(!same) {
fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
- st[i]->name, rd[i][j]->name, (unsigned long)time_now + k, expected, value);
+ st[i]->name, rd[i][j]->name, (unsigned long)time_now + k * update_every, expected, value);
+ errors++;
+ }
+ if(time_retrieved != time_now + k * update_every) {
+ fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
+ st[i]->name, rd[i][j]->name, (unsigned long)time_now + k * update_every, (unsigned long)time_retrieved);
errors++;
}
}
@@ -1693,7 +1742,184 @@ int test_dbengine(void)
}
}
}
+ return errors;
+}
+
+// Check rrdr transformations
+static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
+ int current_region, time_t time_start, time_t time_end)
+{
+ uint8_t same;
+ time_t time_now, time_retrieved;
+ int i, j, errors, update_every;
+ long c;
+ collected_number last;
+ calculated_number value, expected;
+
+ errors = 0;
+ update_every = REGION_UPDATE_EVERY[current_region];
+ long points = (time_end - time_start) / update_every - 1;
+ for (i = 0 ; i < CHARTS ; ++i) {
+ RRDR *r = rrd2rrdr(st[i], points, time_start + update_every, time_end, RRDR_GROUPING_AVERAGE, 0, 0, NULL);
+ if (!r) {
+ fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", st[i]->name);
+ return ++errors;
+ } else {
+ assert(r->st == st[i]);
+ for (c = 0; c != rrdr_rows(r) ; ++c) {
+ RRDDIM *d;
+ time_now = time_start + (c + 2) * update_every;
+ time_retrieved = r->t[c];
+
+ // for each dimension
+ for (j = 0, d = r->st->dimensions ; d && j < r->d ; ++j, d = d->next) {
+ calculated_number *cn = &r->v[ c * r->d ];
+ value = cn[j];
+ assert(rd[i][j] == d);
+
+ last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c;
+ expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS));
+
+ same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0;
+ if(!same) {
+ fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
+ CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
+ st[i]->name, rd[i][j]->name, (unsigned long)time_now, expected, value);
+ errors++;
+ }
+ if(time_retrieved != time_now) {
+ fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n",
+ st[i]->name, rd[i][j]->name, (unsigned long)time_now, (unsigned long)time_retrieved);
+ errors++;
+ }
+ }
+ }
+ rrdr_free(r);
+ }
+ }
+ return errors;
+}
+
+int test_dbengine(void)
+{
+ int i, j, errors, update_every, current_region;
+ RRDHOST *host = NULL;
+ RRDSET *st[CHARTS];
+ RRDDIM *rd[CHARTS][DIMS];
+ time_t time_start[REGIONS], time_end[REGIONS];
+
+ error_log_limit_unlimited();
+ fprintf(stderr, "\nRunning DB-engine test\n");
+
+ default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
+
+ debug(D_RRDHOST, "Initializing localhost with hostname 'unittest-dbengine'");
+ host = dbengine_rrdhost_find_or_create("unittest-dbengine");
+ if (NULL == host)
+ return 1;
+
+ current_region = 0; // this is the first region of data
+ update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 2 seconds
+ test_dbengine_create_charts(host, st, rd, update_every);
+
+ time_start[current_region] = 2 * API_RELATIVE_TIME_MAX;
+ time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]);
+
+ errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]);
+ if (errors)
+ goto error_out;
+
+ current_region = 1; //this is the second region of data
+ update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 3 seconds
+ // Align pages for frequency change
+ for (i = 0 ; i < CHARTS ; ++i) {
+ st[i]->update_every = update_every;
+ for (j = 0; j < DIMS; ++j) {
+ rrdeng_store_metric_flush_current_page(rd[i][j]);
+ }
+ }
+
+ time_start[current_region] = time_end[current_region - 1] + update_every;
+ if (0 != time_start[current_region] % update_every) // align to update_every
+ time_start[current_region] += update_every - time_start[current_region] % update_every;
+ time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]);
+
+ errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]);
+ if (errors)
+ goto error_out;
+
+ current_region = 2; //this is the third region of data
+ update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 1 second
+ // Align pages for frequency change
+ for (i = 0 ; i < CHARTS ; ++i) {
+ st[i]->update_every = update_every;
+ for (j = 0; j < DIMS; ++j) {
+ rrdeng_store_metric_flush_current_page(rd[i][j]);
+ }
+ }
+
+ time_start[current_region] = time_end[current_region - 1] + update_every;
+ if (0 != time_start[current_region] % update_every) // align to update_every
+ time_start[current_region] += update_every - time_start[current_region] % update_every;
+ time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]);
+ errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]);
+ if (errors)
+ goto error_out;
+
+ for (current_region = 0 ; current_region < REGIONS ; ++current_region) {
+ errors = test_dbengine_check_rrdr(st, rd, current_region, time_start[current_region], time_end[current_region]);
+ if (errors)
+ goto error_out;
+ }
+
+ current_region = 1;
+ update_every = REGION_UPDATE_EVERY[current_region]; // use the maximum update_every = 3
+ errors = 0;
+ long points = (time_end[REGIONS - 1] - time_start[0]) / update_every - 1; // cover all time regions with RRDR
+ long point_offset = (time_start[current_region] - time_start[0]) / update_every;
+ for (i = 0 ; i < CHARTS ; ++i) {
+ RRDR *r = rrd2rrdr(st[i], points, time_start[0] + update_every, time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0, 0, NULL);
+ if (!r) {
+ fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", st[i]->name);
+ ++errors;
+ } else {
+ long c;
+
+ assert(r->st == st[i]);
+ // test current region values only, since they must be left unchanged
+ for (c = point_offset ; c < point_offset + rrdr_rows(r) / REGIONS / 2 ; ++c) {
+ RRDDIM *d;
+ time_t time_now = time_start[current_region] + (c - point_offset + 2) * update_every;
+ time_t time_retrieved = r->t[c];
+
+ // for each dimension
+ for(j = 0, d = r->st->dimensions ; d && j < r->d ; ++j, d = d->next) {
+ calculated_number *cn = &r->v[ c * r->d ];
+ calculated_number value = cn[j];
+ assert(rd[i][j] == d);
+
+ collected_number last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c - point_offset;
+ calculated_number expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS));
+
+ uint8_t same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0;
+ if(!same) {
+ fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
+ CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
+ st[i]->name, rd[i][j]->name, (unsigned long)time_now, expected, value);
+ errors++;
+ }
+ if(time_retrieved != time_now) {
+ fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n",
+ st[i]->name, rd[i][j]->name, (unsigned long)time_now, (unsigned long)time_retrieved);
+ errors++;
+ }
+ }
+ }
+ rrdr_free(r);
+ }
+ }
+error_out:
rrdeng_exit(host->rrdeng_ctx);
rrd_wrlock();
rrdhost_delete_charts(host);
@@ -1704,43 +1930,25 @@ int test_dbengine(void)
void generate_dbengine_dataset(unsigned history_seconds)
{
- const int DIMS = 128;
+ const int DSET_DIMS = 128;
const uint64_t EXPECTED_COMPRESSION_RATIO = 94;
- int j;
+ int j, update_every = 1;
RRDHOST *host = NULL;
RRDSET *st;
- RRDDIM *rd[DIMS];
+ RRDDIM *rd[DSET_DIMS];
char name[101];
time_t time_current, time_present;
default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
default_rrdeng_page_cache_mb = 128;
- /* Worst case for uncompressible data */
- default_rrdeng_disk_quota_mb = (((uint64_t)DIMS) * sizeof(storage_number) * history_seconds) / (1024 * 1024);
+ // Worst case for uncompressible data
+ default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS) * sizeof(storage_number) * history_seconds) / (1024 * 1024);
default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;
error_log_limit_unlimited();
debug(D_RRDHOST, "Initializing localhost with hostname 'dbengine-dataset'");
- host = rrdhost_find_or_create(
- "dbengine-dataset"
- , "dbengine-dataset"
- , "dbengine-dataset"
- , os_type
- , netdata_configured_timezone
- , config_get(CONFIG_SECTION_BACKEND, "host tags", "")
- , program_name
- , program_version
- , default_rrd_update_every
- , default_rrd_history_entries
- , RRD_MEMORY_MODE_DBENGINE
- , default_health_enabled
- , default_rrdpush_enabled
- , default_rrdpush_destination
- , default_rrdpush_api_key
- , default_rrdpush_send_charts_matching
- , NULL
- );
+ host = dbengine_rrdhost_find_or_create("dbengine-dataset");
if (NULL == host)
return;
@@ -1748,8 +1956,8 @@ void generate_dbengine_dataset(unsigned history_seconds)
// create the chart
st = rrdset_create(host, "example", "random", "random", "example", NULL, "random", "random", "random",
- NULL, 1, 1, RRDSET_TYPE_LINE);
- for (j = 0 ; j < DIMS ; ++j) {
+ NULL, 1, update_every, RRDSET_TYPE_LINE);
+ for (j = 0 ; j < DSET_DIMS ; ++j) {
snprintfz(name, 100, "random%d", j);
rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
@@ -1758,7 +1966,7 @@ void generate_dbengine_dataset(unsigned history_seconds)
time_present = now_realtime_sec();
// feed it with the test data
time_current = time_present - history_seconds;
- for (j = 0 ; j < DIMS ; ++j) {
+ for (j = 0 ; j < DSET_DIMS ; ++j) {
rd[j]->last_collected_time.tv_sec =
st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current;
rd[j]->last_collected_time.tv_usec =
@@ -1767,7 +1975,7 @@ void generate_dbengine_dataset(unsigned history_seconds)
for( ; time_current < time_present; ++time_current) {
st->usec_since_last_update = USEC_PER_SEC;
- for (j = 0; j < DIMS; ++j) {
+ for (j = 0; j < DSET_DIMS; ++j) {
rrddim_set_by_pointer_fake_time(rd[j], (time_current + j) % 128, time_current);
}
rrdset_done(st);
diff --git a/database/README.md b/database/README.md
index 90ae502012..2fcb69b679 100644
--- a/database/README.md
+++ b/database/README.md
@@ -47,6 +47,8 @@ Currently Netdata supports 6 memory modes:
database. There is some amount of RAM dedicated to data caching and indexing and the rest of
the data reside compressed on disk. The number of history entries is not fixed in this case,
but depends on the configured disk space and the effective compression ratio of the data stored.
+ This is the **only mode** that supports changing the data collection update frequency
+ (`update_every`) **without losing** the previously stored metrics.
For more details see [here](engine/).
You can select the memory mode by editing `netdata.conf` and setting:
diff --git a/database/engine/README.md b/database/engine/README.md
index eb6d5a3c54..7791a549f8 100644
--- a/database/engine/README.md
+++ b/database/engine/README.md
@@ -4,6 +4,8 @@ The Database Engine works like a traditional
database. There is some amount of RAM dedicated to data caching and indexing and the rest of
the data reside compressed on disk. The number of history entries is not fixed in this case,
but depends on the configured disk space and the effective compression ratio of the data stored.
+This is the **only mode** that supports changing the data collection update frequency
+(`update_every`) **without losing** the previously stored metrics.
## Files
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index 124f2448b1..1bd4c94361 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -419,6 +419,35 @@ static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec
return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
}
+/* The caller must hold the page index lock */
+static inline struct rrdeng_page_descr *
+ find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
+{
+ struct rrdeng_page_descr *descr = NULL;
+ Pvoid_t *PValue;
+ Word_t Index;
+
+ Index = (Word_t)(start_time / USEC_PER_SEC);
+ PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ if (is_page_in_time_range(descr, start_time, end_time)) {
+ return descr;
+ }
+ }
+
+ Index = (Word_t)(start_time / USEC_PER_SEC);
+ PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
+ if (likely(NULL != PValue)) {
+ descr = *PValue;
+ if (is_page_in_time_range(descr, start_time, end_time)) {
+ return descr;
+ }
+ }
+
+ return NULL;
+}
+
/* Update metric oldest and latest timestamps efficiently when adding new values */
void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
{
@@ -510,70 +539,144 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
-/*
- * Searches for a page and triggers disk I/O if necessary and possible.
+usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ Pvoid_t *PValue;
+ struct pg_cache_page_index *page_index;
+
+ uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
+ PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
+ if (likely(NULL != PValue)) {
+ page_index = *PValue;
+ }
+ uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (NULL == PValue) {
+ return INVALID_TIME;
+ }
+
+ uv_rwlock_rdlock(&page_index->lock);
+ descr = find_first_page_in_time_range(page_index, start_time, end_time);
+ if (NULL == descr) {
+ uv_rwlock_rdunlock(&page_index->lock);
+ return INVALID_TIME;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+ return descr->start_time;
+}
+
+/**
+ * Return page information for the first page before point_in_time that satisfies the filter.
+ * @param ctx DB context
+ * @param page_index page index of a metric
+ * @param point_in_time the pages that are searched must be older than this timestamp
+ * @param filter decides if the page satisfies the caller's criteria
+ * @param page_info the result of the search is set in this pointer
+ */
+void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
+ usec_t point_in_time, pg_cache_page_info_filter_t *filter,
+ struct rrdeng_page_info *page_info)
+{
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ struct rrdeng_page_descr *descr = NULL;
+ Pvoid_t *PValue;
+ Word_t Index;
+
+ (void)pg_cache;
+ assert(NULL != page_index);
+
+ Index = (Word_t)(point_in_time / USEC_PER_SEC);
+ uv_rwlock_rdlock(&page_index->lock);
+ do {
+ PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0);
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+ } while (descr != NULL && !filter(descr));
+ if (unlikely(NULL == descr)) {
+ page_info->page_length = 0;
+ page_info->start_time = INVALID_TIME;
+ page_info->end_time = INVALID_TIME;
+ } else {
+ page_info->page_length = descr->page_length;
+ page_info->start_time = descr->start_time;
+ page_info->end_time = descr->end_time;
+ }
+ uv_rwlock_rdunlock(&page_index->lock);
+}
+/**
+ * Searches for pages in a time range and triggers disk I/O if necessary and possible.
* Does not get a reference.
- * Returns page index pointer for given metric UUID.
+ * @param ctx DB context
+ * @param id UUID
+ * @param start_time inclusive starting time in usec
+ * @param end_time inclusive ending time in usec
+ * @param page_info_arrayp It allocates (*page_info_arrayp) and populates it with information of pages that overlap
+ * with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez().
+ * If page_info_arrayp is set to NULL nothing was allocated.
+ * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
+ * @return the number of pages that overlap with the time range [start_time,end_time].
*/
-struct pg_cache_page_index *
- pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
+unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
+ struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
struct page_cache_descr *pg_cache_descr = NULL;
- int i, j, k, count, found;
+ unsigned i, j, k, preload_count, count, page_info_array_max_size;
unsigned long flags;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
Word_t Index;
uint8_t failed_to_reserve;
+ assert(NULL != ret_page_indexp);
+
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
if (likely(NULL != PValue)) {
- page_index = *PValue;
+ *ret_page_indexp = page_index = *PValue;
}
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
if (NULL == PValue) {
debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
- return NULL;
+ *ret_page_indexp = NULL;
+ return 0;
}
uv_rwlock_rdlock(&page_index->lock);
- /* Find first page in range */
- found = 0;
- Index = (Word_t)(start_time / USEC_PER_SEC);
- PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- if (is_page_in_time_range(descr, start_time, end_time)) {
- found = 1;
- }
- }
- if (!found) {
- Index = (Word_t)(start_time / USEC_PER_SEC);
- PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- if (is_page_in_time_range(descr, start_time, end_time)) {
- found = 1;
- }
- }
- }
- if (!found) {
+ descr = find_first_page_in_time_range(page_index, start_time, end_time);
+ if (NULL == descr) {
uv_rwlock_rdunlock(&page_index->lock);
debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
- return page_index;
+ *ret_page_indexp = NULL;
+ return 0;
+ } else {
+ Index = (Word_t)(descr->start_time / USEC_PER_SEC);
+ }
+ if (page_info_arrayp) {
+ page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
+ *page_info_arrayp = mallocz(page_info_array_max_size);
}
- for (count = 0 ;
- descr != NULL && is_page_in_time_range(descr, start_time, end_time);
+ for (count = 0, preload_count = 0 ;
+ descr != NULL && is_page_in_time_range(descr, start_time, end_time) ;
PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
/* Iterate all pages in range */
if (unlikely(0 == descr->page_length))
continue;
+ if (page_info_arrayp) {
+ if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) {
+ page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
+ *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
+ }
+ (*page_info_arrayp)[count].start_time = descr->start_time;
+ (*page_info_arrayp)[count].end_time = descr->end_time;
+ (*page_info_arrayp)[count].page_length = descr->page_length;
+ }
+ ++count;
+
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
flags = pg_cache_descr->flags;
@@ -586,8 +689,8 @@ struct pg_cache_page_index *
}
}
if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
- preload_array[count++] = descr;
- if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) {
+ preload_array[preload_count++] = descr;
+ if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) {
rrdeng_page_descr_mutex_unlock(ctx, descr);
break;
}
@@ -598,7 +701,7 @@ struct pg_cache_page_index *
uv_rwlock_rdunlock(&page_index->lock);
failed_to_reserve = 0;
- for (i = 0 ; i < count && !failed_to_reserve ; ++i) {
+ for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) {
struct rrdeng_cmd cmd;
struct rrdeng_page_descr *next;
@@ -614,7 +717,7 @@ struct pg_cache_page_index *
cmd.read_extent.page_cache_descr[0] = descr;
/* don't use this page again */
preload_array[i] = NULL;
- for (j = 0, k = 1 ; j < count ; ++j) {
+ for (j = 0, k = 1 ; j < preload_count ; ++j) {
next = preload_array[j];
if (NULL == next) {
continue;
@@ -635,7 +738,7 @@ struct pg_cache_page_index *
}
if (failed_to_reserve) {
debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
- for (i = 0 ; i < count ; ++i) {
+ for (i = 0 ; i < preload_count ; ++i) {
descr = preload_array[i];
if (NULL == descr) {
continue;
@@ -643,11 +746,15 @@ struct pg_cache_page_index *
pg_cache_put(ctx, descr);
}
}
- if (!count) {
+ if (!preload_count) {
/* no such page */
debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
}
- return page_index;
+ if (unlikely(0 == count && page_info_arrayp)) {
+ freez(*page_info_arrayp);
+ *page_info_arrayp = NULL;
+ }
+ return count;
}