summaryrefslogtreecommitdiffstats
path: root/web
diff options
context:
space:
mode:
author Costa Tsaousis <costa@netdata.cloud> 2022-11-20 23:47:53 +0200
committer GitHub <noreply@github.com> 2022-11-20 23:47:53 +0200
commit 284f6f3aa4f36cefad2601c490510621496c2b53 (patch)
tree 97a7d55627ef7477f431c53a20d0e6f1f738a419 /web
parent 2d02484954f68bf7e3015cb649e2f10a9f3c5c95 (diff)
streaming compression, query planner and replication fixes (#14023)
* streaming compression, query planner and replication fixes * remove journal v2 stats from global statistics * disable sql for checking past sql UUIDs * single threaded replication * final replication thread using dictionaries and JudyL for sorting the pending requests * do not timeout the sending socket when there are pending replication requests * streaming receiver using read() instead of fread() * remove FILE * from streaming - now using POSIX read() and write() * increase timeouts to 10 minutes * apply sender timeout only when there are metrics that are supposed to be streamed * error handling in replication * remove retries on socket read timeout; better error messages * take into account inbound traffic too to detect that a connection is stale * remove race conditions from replication thread * make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed * 2 minutes timeout to retry streaming to a parent that already has this node * remove unnecessary condition check * fix compilation warnings * include judy in replication * wrappers to handle retries for SSL_read and SSL_write * compressed bytes read monitoring * recursive locks on replication to make it faster during flush or cleanup * replication completion chart at the receiver side * simplified recursive mutex * simplified recursive mutex again
Diffstat (limited to 'web')
-rw-r--r-- web/api/queries/query.c 189
-rw-r--r-- web/api/web_api_v1.c 2
-rw-r--r-- web/server/web_client.c 6
3 files changed, 109 insertions, 88 deletions
diff --git a/web/api/queries/query.c b/web/api/queries/query.c
index b6a27755bf..052ab944d7 100644
--- a/web/api/queries/query.c
+++ b/web/api/queries/query.c
@@ -718,16 +718,22 @@ static size_t query_metric_first_working_tier(QUERY_METRIC *qm) {
return 0;
}
-static long query_plan_points_coverage_weight(time_t db_first_t, time_t db_last_t, time_t db_update_every, time_t after_wanted, time_t before_wanted, size_t points_wanted) {
+static long query_plan_points_coverage_weight(time_t db_first_t, time_t db_last_t, time_t db_update_every, time_t after_wanted, time_t before_wanted, size_t points_wanted, size_t tier __maybe_unused) {
if(db_first_t == 0 || db_last_t == 0 || db_update_every == 0)
return -LONG_MAX;
time_t common_first_t = MAX(db_first_t, after_wanted);
time_t common_last_t = MIN(db_last_t, before_wanted);
+ long time_coverage = (common_last_t - common_first_t) * 1000000 / (before_wanted - after_wanted);
+ size_t points_wanted_in_coverage = points_wanted * time_coverage / 1000000;
+
long points_available = (common_last_t - common_first_t) / db_update_every;
- long points_delta = (long)(points_available - points_wanted);
- long points_coverage = (points_delta < 0) ? (long)(points_available * 1000 / points_wanted): 1000;
+ long points_delta = (long)(points_available - points_wanted_in_coverage);
+ long points_coverage = (points_delta < 0) ? (long)(points_available * time_coverage / points_wanted_in_coverage) : time_coverage;
+
+ // a way to benefit higher tiers
+ // points_coverage += (long)tier * 10000;
if(points_available <= 0)
return -LONG_MAX;
@@ -757,7 +763,7 @@ static size_t query_metric_best_tier_for_timeframe(QUERY_METRIC *qm, time_t afte
continue;
}
- weight[tier] = query_plan_points_coverage_weight(first_t, last_t, update_every, after_wanted, before_wanted, points_wanted);
+ weight[tier] = query_plan_points_coverage_weight(first_t, last_t, update_every, after_wanted, before_wanted, points_wanted, tier);
}
size_t best_tier = 0;
@@ -813,7 +819,7 @@ static size_t rrddim_find_best_tier_for_timeframe(QUERY_TARGET *qt, time_t after
common_update_every = MIN(update_every, common_update_every);
}
- weight[tier] = query_plan_points_coverage_weight(common_first_t, common_last_t, common_update_every, after_wanted, before_wanted, points_wanted);
+ weight[tier] = query_plan_points_coverage_weight(common_first_t, common_last_t, common_update_every, after_wanted, before_wanted, points_wanted, tier);
}
size_t best_tier = 0;
@@ -1094,8 +1100,11 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before
qsort(&ops->plan.data, ops->plan.entries, sizeof(QUERY_PLAN_ENTRY), compare_query_plan_entries_on_start_time);
// make sure it has the whole timeframe we need
- ops->plan.data[0].after = after_wanted;
- ops->plan.data[ops->plan.entries - 1].before = before_wanted;
+ if(ops->plan.data[0].after < after_wanted)
+ ops->plan.data[0].after = after_wanted;
+
+ if(ops->plan.data[ops->plan.entries - 1].before > before_wanted)
+ ops->plan.data[ops->plan.entries - 1].before = before_wanted;
//buffer_sprintf(wb, ": FINAL STEPS %zu", ops->plan.entries);
@@ -1191,15 +1200,18 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
time_t now_start_time = after_wanted - ops.query_granularity;
time_t now_end_time = after_wanted + ops.view_update_every - ops.query_granularity;
+ size_t db_points_read_since_plan_switch = 0; (void)db_points_read_since_plan_switch;
+
// The main loop, based on the query granularity we need
for( ; points_added < points_wanted ; now_start_time = now_end_time, now_end_time += ops.view_update_every) {
- if(query_plan_should_switch_plan(ops, now_end_time))
+ if(unlikely(query_plan_should_switch_plan(ops, now_end_time))) {
query_planer_next_plan(&ops, now_end_time, new_point.end_time);
+ db_points_read_since_plan_switch = 0;
+ }
// read all the points of the db, prior to the time we need (now_end_time)
-
size_t count_same_end_time = 0;
while(count_same_end_time < 100) {
if(likely(count_same_end_time == 0)) {
@@ -1223,6 +1235,7 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
// fetch the new point
{
+ db_points_read_since_plan_switch++;
STORAGE_POINT sp = ops.next_metric(&ops.handle);
ops.db_points_read_per_tier[ops.tier]++;
@@ -1280,9 +1293,10 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
// check if the db is advancing the query
if(unlikely(new_point.end_time <= last1_point.end_time)) {
- internal_error(true, "QUERY: '%s', dimension '%s' next_metric() returned point %zu from %ld to %ld, before the last point %zu end time %ld, now is %ld to %ld",
+ internal_error(db_points_read_since_plan_switch > 1,
+ "QUERY: '%s', dimension '%s' next_metric() returned point %zu from %ld to %ld, before the last point %zu from %ld to %ld, now is %ld to %ld",
qt->id, string2str(qm->dimension.id), new_point.id, new_point.start_time, new_point.end_time,
- last1_point.id, last1_point.end_time, now_start_time, now_end_time);
+ last1_point.id, last1_point.start_time, last1_point.end_time, now_start_time, now_end_time);
count_same_end_time++;
continue;
@@ -1350,35 +1364,35 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
current_point = new_point;
query_interpolate_point(current_point, last1_point, now_end_time);
- internal_error(current_point.id > 0
- && last1_point.id == 0
- && current_point.end_time > after_wanted
- && current_point.end_time > now_end_time,
- "QUERY: '%s', dimension '%s', after %ld, before %ld, view update every %ld,"
- " query granularity %ld, interpolating point %zu (from %ld to %ld) at %ld,"
- " but we could really favor by having last_point1 in this query.",
- qt->id, string2str(qm->dimension.id),
- after_wanted, before_wanted,
- ops.view_update_every, ops.query_granularity,
- current_point.id, current_point.start_time, current_point.end_time,
- now_end_time);
+// internal_error(current_point.id > 0
+// && last1_point.id == 0
+// && current_point.end_time > after_wanted
+// && current_point.end_time > now_end_time,
+// "QUERY: '%s', dimension '%s', after %ld, before %ld, view update every %ld,"
+// " query granularity %ld, interpolating point %zu (from %ld to %ld) at %ld,"
+// " but we could really favor by having last_point1 in this query.",
+// qt->id, string2str(qm->dimension.id),
+// after_wanted, before_wanted,
+// ops.view_update_every, ops.query_granularity,
+// current_point.id, current_point.start_time, current_point.end_time,
+// now_end_time);
}
else if(likely(now_end_time <= last1_point.end_time)) {
// our LAST point is still valid
current_point = last1_point;
query_interpolate_point(current_point, last2_point, now_end_time);
- internal_error(current_point.id > 0
- && last2_point.id == 0
- && current_point.end_time > after_wanted
- && current_point.end_time > now_end_time,
- "QUERY: '%s', dimension '%s', after %ld, before %ld, view update every %ld,"
- " query granularity %ld, interpolating point %zu (from %ld to %ld) at %ld,"
- " but we could really favor by having last_point2 in this query.",
- qt->id, string2str(qm->dimension.id),
- after_wanted, before_wanted, ops.view_update_every, ops.query_granularity,
- current_point.id, current_point.start_time, current_point.end_time,
- now_end_time);
+// internal_error(current_point.id > 0
+// && last2_point.id == 0
+// && current_point.end_time > after_wanted
+// && current_point.end_time > now_end_time,
+// "QUERY: '%s', dimension '%s', after %ld, before %ld, view update every %ld,"
+// " query granularity %ld, interpolating point %zu (from %ld to %ld) at %ld,"
+// " but we could really favor by having last_point2 in this query.",
+// qt->id, string2str(qm->dimension.id),
+// after_wanted, before_wanted, ops.view_update_every, ops.query_granularity,
+// current_point.id, current_point.start_time, current_point.end_time,
+// now_end_time);
}
else {
// a GAP, we don't have a value this time
@@ -1647,6 +1661,21 @@ bool rrdr_relative_window_to_absolute(time_t *after, time_t *before) {
after_requested -= delta;
}
+ time_t absolute_minimum_time = now - (10 * 365 * 86400);
+ time_t absolute_maximum_time = now + (1 * 365 * 86400);
+
+ if (after_requested < absolute_minimum_time && !unittest_running)
+ after_requested = absolute_minimum_time;
+
+ if (after_requested > absolute_maximum_time && !unittest_running)
+ after_requested = absolute_maximum_time;
+
+ if (before_requested < absolute_minimum_time && !unittest_running)
+ before_requested = absolute_minimum_time;
+
+ if (before_requested > absolute_maximum_time && !unittest_running)
+ before_requested = absolute_maximum_time;
+
*before = before_requested;
*after = after_requested;
@@ -1836,6 +1865,11 @@ bool query_target_calculate_window(QUERY_TARGET *qt) {
query_debug_log(":max points_wanted %zu", points_wanted);
}
+ if(points_wanted > 86400 && !unittest_running) {
+ points_wanted = 86400;
+ query_debug_log(":absolute max points_wanted %zu", points_wanted);
+ }
+
// calculate the desired grouping of source data points
size_t group = points_available / points_wanted;
if (group == 0) group = 1;
@@ -1971,37 +2005,23 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
return NULL;
}
- time_t timeout = qt->request.timeout;
- time_t resampling_time_requested = qt->request.resampling_time;
- time_t after_requested = qt->request.after;
- time_t before_requested = qt->request.before;
- size_t points_requested = qt->request.points;
-
- RRDR_OPTIONS options = qt->window.options;
- size_t points_wanted = qt->window.points;
- time_t after_wanted = qt->window.after;
- time_t before_wanted = qt->window.before;
- bool relative_period_requested = qt->window.relative;
- bool aligned = qt->window.aligned;
- RRDR_GROUPING group_method = qt->window.group_method;
- size_t group = qt->window.group;
- size_t resampling_group = qt->window.resampling_group;
- time_t query_granularity = qt->window.query_granularity;
+ // qt.window members are the WANTED ones.
+ // qt.request members are the REQUESTED ones.
RRDR *r = rrdr_create(owa, qt);
if(unlikely(!r)) {
internal_error(true, "QUERY: cannot create RRDR for %s, after=%ld, before=%ld, points=%zu",
- qt->id, after_wanted, before_wanted, points_wanted);
+ qt->id, qt->window.after, qt->window.before, qt->window.points);
return NULL;
}
- if(unlikely(!r->d || !points_wanted)) {
+ if(unlikely(!r->d || !qt->window.points)) {
internal_error(true, "QUERY: returning empty RRDR (no dimensions in RRDSET) for %s, after=%ld, before=%ld, points=%zu",
- qt->id, after_wanted, before_wanted, points_wanted);
+ qt->id, qt->window.after, qt->window.before, qt->window.points);
return r;
}
- if(relative_period_requested)
+ if(qt->window.relative)
r->result_options |= RRDR_RESULT_OPTION_RELATIVE;
else
r->result_options |= RRDR_RESULT_OPTION_ABSOLUTE;
@@ -2034,7 +2054,8 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
long dimensions_used = 0, dimensions_nonzero = 0;
struct timeval query_start_time;
struct timeval query_current_time;
- if (timeout) now_realtime_timeval(&query_start_time);
+ if (qt->request.timeout)
+ now_realtime_timeval(&query_start_time);
for(size_t c = 0, max = qt->query.used; c < max ; c++) {
// set the query target dimension options to rrdr
@@ -2046,7 +2067,7 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
r->internal.grouping_reset(r);
rrd2rrdr_do_dimension(r, c);
- if (timeout)
+ if (qt->request.timeout)
now_realtime_timeval(&query_current_time);
if(r->od[c] & RRDR_DIMENSION_NONZERO)
@@ -2082,9 +2103,9 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
}
dimensions_used++;
- if (timeout && ((NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0) > (NETDATA_DOUBLE)timeout) {
+ if (qt->request.timeout && ((NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0) > (NETDATA_DOUBLE)qt->request.timeout) {
log_access("QUERY CANCELED RUNTIME EXCEEDED %0.2f ms (LIMIT %lld ms)",
- (NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0, (long long)timeout);
+ (NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0, (long long)qt->request.timeout);
r->result_options |= RRDR_RESULT_OPTION_CANCEL;
break;
}
@@ -2093,44 +2114,44 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
#ifdef NETDATA_INTERNAL_CHECKS
if (dimensions_used) {
if(r->internal.log)
- rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group,
- after_wanted, after_requested, before_wanted, before_requested,
- points_requested, points_wanted, /*after_slot, before_slot,*/
+ rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+ qt->window.after, qt->request.after, qt->window.before, qt->request.before,
+ qt->request.points, qt->window.points, /*after_slot, before_slot,*/
r->internal.log);
- if(r->rows != points_wanted)
- rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group,
- after_wanted, after_requested, before_wanted, before_requested,
- points_requested, points_wanted, /*after_slot, before_slot,*/
+ if(r->rows != qt->window.points)
+ rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+ qt->window.after, qt->request.after, qt->window.before, qt->request.before,
+ qt->request.points, qt->window.points, /*after_slot, before_slot,*/
"got 'points' is not wanted 'points'");
- if(aligned && (r->before % (group * query_granularity)) != 0)
- rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group,
- after_wanted, after_requested, before_wanted,before_wanted,
- points_requested, points_wanted, /*after_slot, before_slot,*/
+ if(qt->window.aligned && (r->before % (qt->window.group * qt->window.query_granularity)) != 0)
+ rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+ qt->window.after, qt->request.after, qt->window.before,qt->request.before,
+ qt->request.points, qt->window.points, /*after_slot, before_slot,*/
"'before' is not aligned but alignment is required");
// 'after' should not be aligned, since we start inside the first group
- //if(aligned && (r->after % group) != 0)
- // rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, after_slot, before_slot, "'after' is not aligned but alignment is required");
+ //if(qt->window.aligned && (r->after % group) != 0)
+ // rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group, qt->window.after, after_requested, before_wanted, before_requested, points_requested, points_wanted, after_slot, before_slot, "'after' is not aligned but alignment is required");
- if(r->before != before_wanted)
- rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group,
- after_wanted, after_requested, before_wanted, before_requested,
- points_requested, points_wanted, /*after_slot, before_slot,*/
+ if(r->before != qt->window.before)
+ rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+ qt->window.after, qt->request.after, qt->window.before, qt->request.before,
+ qt->request.points, qt->window.points, /*after_slot, before_slot,*/
"chart is not aligned to requested 'before'");
- if(r->before != before_wanted)
- rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group,
- after_wanted, after_requested, before_wanted, before_requested,
- points_requested, points_wanted, /*after_slot, before_slot,*/
+ if(r->before != qt->window.before)
+ rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+ qt->window.after, qt->request.after, qt->window.before, qt->request.before,
+ qt->request.points, qt->window.points, /*after_slot, before_slot,*/
"got 'before' is not wanted 'before'");
// reported 'after' varies, depending on group
- if(r->after != after_wanted)
- rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group,
- after_wanted, after_requested, before_wanted, before_requested,
- points_requested, points_wanted, /*after_slot, before_slot,*/
+ if(r->after != qt->window.after)
+ rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group,
+ qt->window.after, qt->request.after, qt->window.before, qt->request.before,
+ qt->request.points, qt->window.points, /*after_slot, before_slot,*/
"got 'after' is not wanted 'after'");
}
@@ -2140,7 +2161,7 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
r->internal.grouping_free(r);
// when all the dimensions are zero, we should return all of them
- if(unlikely(options & RRDR_OPTION_NONZERO && !dimensions_nonzero && !(r->result_options & RRDR_RESULT_OPTION_CANCEL))) {
+ if(unlikely((qt->window.options & RRDR_OPTION_NONZERO) && !dimensions_nonzero && !(r->result_options & RRDR_RESULT_OPTION_CANCEL))) {
// all the dimensions are zero
// mark them as NONZERO to send them all
for(size_t c = 0, max = qt->query.used; c < max ; c++) {
diff --git a/web/api/web_api_v1.c b/web/api/web_api_v1.c
index e8ca176a9d..c93da27a80 100644
--- a/web/api/web_api_v1.c
+++ b/web/api/web_api_v1.c
@@ -1226,7 +1226,7 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb)
#ifdef ENABLE_COMPRESSION
if(host->sender){
buffer_strcat(wb, "\t\"stream-compression\": ");
- buffer_strcat(wb, (host->sender->flags & SENDER_FLAG_COMPRESSION) ? "true" : "false");
+ buffer_strcat(wb, stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) ? "true" : "false");
buffer_strcat(wb, ",\n");
}else{
buffer_strcat(wb, "\t\"stream-compression\": null,\n");
diff --git a/web/server/web_client.c b/web/server/web_client.c
index ff87041836..b569360f48 100644
--- a/web/server/web_client.c
+++ b/web/server/web_client.c
@@ -1056,7 +1056,7 @@ static inline ssize_t web_client_send_data(struct web_client *w,const void *buf,
#ifdef ENABLE_HTTPS
if ( (!web_client_check_unix(w)) && (netdata_ssl_srv_ctx) ) {
if ( ( w->ssl.conn ) && ( !w->ssl.flags ) ){
- bytes = SSL_write(w->ssl.conn,buf, len) ;
+ bytes = netdata_ssl_write(w->ssl.conn, buf, len) ;
} else {
bytes = send(w->ofd,buf, len , flags);
}
@@ -1213,7 +1213,7 @@ static inline void web_client_send_http_header(struct web_client *w) {
#ifdef ENABLE_HTTPS
if ( (!web_client_check_unix(w)) && (netdata_ssl_srv_ctx) ) {
if ( ( w->ssl.conn ) && ( !w->ssl.flags ) ){
- while((bytes = SSL_write(w->ssl.conn, buffer_tostring(w->response.header_output), buffer_strlen(w->response.header_output))) < 0) {
+ while((bytes = netdata_ssl_write(w->ssl.conn, buffer_tostring(w->response.header_output), buffer_strlen(w->response.header_output))) < 0) {
count++;
if(count > 100 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
error("Cannot send HTTPS headers to web client.");
@@ -1909,7 +1909,7 @@ ssize_t web_client_receive(struct web_client *w)
#ifdef ENABLE_HTTPS
if ( (!web_client_check_unix(w)) && (netdata_ssl_srv_ctx) ) {
if ( ( w->ssl.conn ) && (!w->ssl.flags)) {
- bytes = SSL_read(w->ssl.conn, &w->response.data->buffer[w->response.data->len], (size_t) (left - 1));
+ bytes = netdata_ssl_read(w->ssl.conn, &w->response.data->buffer[w->response.data->len], (size_t) (left - 1));
}else {
bytes = recv(w->ifd, &w->response.data->buffer[w->response.data->len], (size_t) (left - 1), MSG_DONTWAIT);
}