From 284f6f3aa4f36cefad2601c490510621496c2b53 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Sun, 20 Nov 2022 23:47:53 +0200 Subject: streaming compression, query planner and replication fixes (#14023) * streaming compression, query planner and replication fixes * remove journal v2 stats from global statistics * disable sql for checking past sql UUIDs * single threaded replication * final replication thread using dictionaries and JudyL for sorting the pending requests * do not timeout the sending socket when there are pending replication requests * streaming receiver using read() instead of fread() * remove FILE * from streaming - now using posix read() and write() * increase timeouts to 10 minutes * apply sender timeout only when there are metrics that are supposed to be streamed * error handling in replication * remove retries on socket read timeout; better error messages * take into account inbound traffic too to detect that a connection is stale * remove race conditions from replication thread * make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed * 2 minutes timeout to retry streaming to a parent that already has this node * remove unecessary condition check * fix compilation warnings * include judy in replication * wrappers to handle retries for SSL_read and SSL_write * compressed bytes read monitoring * recursive locks on replication to make it faster during flush or cleanup * replication completion chart at the receiver side * simplified recursive mutex * simplified recursive mutex again --- web/api/queries/query.c | 189 +++++++++++++++++++++++++++--------------------- web/api/web_api_v1.c | 2 +- web/server/web_client.c | 6 +- 3 files changed, 109 insertions(+), 88 deletions(-) (limited to 'web') diff --git a/web/api/queries/query.c b/web/api/queries/query.c index b6a27755bf..052ab944d7 100644 --- a/web/api/queries/query.c +++ b/web/api/queries/query.c @@ -718,16 +718,22 @@ static size_t query_metric_first_working_tier(QUERY_METRIC *qm) { return 0; } -static long query_plan_points_coverage_weight(time_t db_first_t, time_t db_last_t, time_t db_update_every, time_t after_wanted, time_t before_wanted, size_t points_wanted) { +static long query_plan_points_coverage_weight(time_t db_first_t, time_t db_last_t, time_t db_update_every, time_t after_wanted, time_t before_wanted, size_t points_wanted, size_t tier __maybe_unused) { if(db_first_t == 0 || db_last_t == 0 || db_update_every == 0) return -LONG_MAX; time_t common_first_t = MAX(db_first_t, after_wanted); time_t common_last_t = MIN(db_last_t, before_wanted); + long time_coverage = (common_last_t - common_first_t) * 1000000 / (before_wanted - after_wanted); + size_t points_wanted_in_coverage = points_wanted * time_coverage / 1000000; + long points_available = (common_last_t - common_first_t) / db_update_every; - long points_delta = (long)(points_available - points_wanted); - long points_coverage = (points_delta < 0) ? (long)(points_available * 1000 / points_wanted): 1000; + long points_delta = (long)(points_available - points_wanted_in_coverage); + long points_coverage = (points_delta < 0) ? (long)(points_available * time_coverage / points_wanted_in_coverage) : time_coverage; + + // a way to benefit higher tiers + // points_coverage += (long)tier * 10000; if(points_available <= 0) return -LONG_MAX; @@ -757,7 +763,7 @@ static size_t query_metric_best_tier_for_timeframe(QUERY_METRIC *qm, time_t afte continue; } - weight[tier] = query_plan_points_coverage_weight(first_t, last_t, update_every, after_wanted, before_wanted, points_wanted); + weight[tier] = query_plan_points_coverage_weight(first_t, last_t, update_every, after_wanted, before_wanted, points_wanted, tier); } size_t best_tier = 0; @@ -813,7 +819,7 @@ static size_t rrddim_find_best_tier_for_timeframe(QUERY_TARGET *qt, time_t after common_update_every = MIN(update_every, common_update_every); } - weight[tier] = query_plan_points_coverage_weight(common_first_t, common_last_t, common_update_every, after_wanted, before_wanted, points_wanted); + weight[tier] = query_plan_points_coverage_weight(common_first_t, common_last_t, common_update_every, after_wanted, before_wanted, points_wanted, tier); } size_t best_tier = 0; @@ -1094,8 +1100,11 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before qsort(&ops->plan.data, ops->plan.entries, sizeof(QUERY_PLAN_ENTRY), compare_query_plan_entries_on_start_time); // make sure it has the whole timeframe we need - ops->plan.data[0].after = after_wanted; - ops->plan.data[ops->plan.entries - 1].before = before_wanted; + if(ops->plan.data[0].after < after_wanted) + ops->plan.data[0].after = after_wanted; + + if(ops->plan.data[ops->plan.entries - 1].before > before_wanted) + ops->plan.data[ops->plan.entries - 1].before = before_wanted; //buffer_sprintf(wb, ": FINAL STEPS %zu", ops->plan.entries); @@ -1191,15 +1200,18 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { time_t now_start_time = after_wanted - ops.query_granularity; time_t now_end_time = after_wanted + ops.view_update_every - ops.query_granularity; + size_t db_points_read_since_plan_switch = 0; (void)db_points_read_since_plan_switch; + // The main loop, based on the query granularity we need for( ; points_added < points_wanted ; now_start_time = now_end_time, now_end_time += ops.view_update_every) { - if(query_plan_should_switch_plan(ops, now_end_time)) + if(unlikely(query_plan_should_switch_plan(ops, now_end_time))) { query_planer_next_plan(&ops, now_end_time, new_point.end_time); + db_points_read_since_plan_switch = 0; + } // read all the points of the db, prior to the time we need (now_end_time) - size_t count_same_end_time = 0; while(count_same_end_time < 100) { if(likely(count_same_end_time == 0)) { @@ -1223,6 +1235,7 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { // fetch the new point { + db_points_read_since_plan_switch++; STORAGE_POINT sp = ops.next_metric(&ops.handle); ops.db_points_read_per_tier[ops.tier]++; @@ -1280,9 +1293,10 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { // check if the db is advancing the query if(unlikely(new_point.end_time <= last1_point.end_time)) { - internal_error(true, "QUERY: '%s', dimension '%s' next_metric() returned point %zu from %ld to %ld, before the last point %zu end time %ld, now is %ld to %ld", + internal_error(db_points_read_since_plan_switch > 1, + "QUERY: '%s', dimension '%s' next_metric() returned point %zu from %ld to %ld, before the last point %zu from %ld to %ld, now is %ld to %ld", qt->id, string2str(qm->dimension.id), new_point.id, new_point.start_time, new_point.end_time, - last1_point.id, last1_point.end_time, now_start_time, now_end_time); + last1_point.id, last1_point.start_time, last1_point.end_time, now_start_time, now_end_time); count_same_end_time++; continue; @@ -1350,35 +1364,35 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) { current_point = new_point; query_interpolate_point(current_point, last1_point, now_end_time); - internal_error(current_point.id > 0 - && last1_point.id == 0 - && current_point.end_time > after_wanted - && current_point.end_time > now_end_time, - "QUERY: '%s', dimension '%s', after %ld, before %ld, view update every %ld," - " query granularity %ld, interpolating point %zu (from %ld to %ld) at %ld," - " but we could really favor by having last_point1 in this query.", - qt->id, string2str(qm->dimension.id), - after_wanted, before_wanted, - ops.view_update_every, ops.query_granularity, - current_point.id, current_point.start_time, current_point.end_time, - now_end_time); +// internal_error(current_point.id > 0 +// && last1_point.id == 0 +// && current_point.end_time > after_wanted +// && current_point.end_time > now_end_time, +// "QUERY: '%s', dimension '%s', after %ld, before %ld, view update every %ld," +// " query granularity %ld, interpolating point %zu (from %ld to %ld) at %ld," +// " but we could really favor by having last_point1 in this query.", +// qt->id, string2str(qm->dimension.id), +// after_wanted, before_wanted, +// ops.view_update_every, ops.query_granularity, +// current_point.id, current_point.start_time, current_point.end_time, +// now_end_time); } else if(likely(now_end_time <= last1_point.end_time)) { // our LAST point is still valid current_point = last1_point; query_interpolate_point(current_point, last2_point, now_end_time); - internal_error(current_point.id > 0 - && last2_point.id == 0 - && current_point.end_time > after_wanted - && current_point.end_time > now_end_time, - "QUERY: '%s', dimension '%s', after %ld, before %ld, view update every %ld," - " query granularity %ld, interpolating point %zu (from %ld to %ld) at %ld," - " but we could really favor by having last_point2 in this query.", - qt->id, string2str(qm->dimension.id), - after_wanted, before_wanted, ops.view_update_every, ops.query_granularity, - current_point.id, current_point.start_time, current_point.end_time, - now_end_time); +// internal_error(current_point.id > 0 +// && last2_point.id == 0 +// && current_point.end_time > after_wanted +// && current_point.end_time > now_end_time, +// "QUERY: '%s', dimension '%s', after %ld, before %ld, view update every %ld," +// " query granularity %ld, interpolating point %zu (from %ld to %ld) at %ld," +// " but we could really favor by having last_point2 in this query.", +// qt->id, string2str(qm->dimension.id), +// after_wanted, before_wanted, ops.view_update_every, ops.query_granularity, +// current_point.id, current_point.start_time, current_point.end_time, +// now_end_time); } else { // a GAP, we don't have a value this time @@ -1647,6 +1661,21 @@ bool rrdr_relative_window_to_absolute(time_t *after, time_t *before) { after_requested -= delta; } + time_t absolute_minimum_time = now - (10 * 365 * 86400); + time_t absolute_maximum_time = now + (1 * 365 * 86400); + + if (after_requested < absolute_minimum_time && !unittest_running) + after_requested = absolute_minimum_time; + + if (after_requested > absolute_maximum_time && !unittest_running) + after_requested = absolute_maximum_time; + + if (before_requested < absolute_minimum_time && !unittest_running) + before_requested = absolute_minimum_time; + + if (before_requested > absolute_maximum_time && !unittest_running) + before_requested = absolute_maximum_time; + *before = before_requested; *after = after_requested; @@ -1836,6 +1865,11 @@ bool query_target_calculate_window(QUERY_TARGET *qt) { query_debug_log(":max points_wanted %zu", points_wanted); } + if(points_wanted > 86400 && !unittest_running) { + points_wanted = 86400; + query_debug_log(":absolute max points_wanted %zu", points_wanted); + } + // calculate the desired grouping of source data points size_t group = points_available / points_wanted; if (group == 0) group = 1; @@ -1971,37 +2005,23 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { return NULL; } - time_t timeout = qt->request.timeout; - time_t resampling_time_requested = qt->request.resampling_time; - time_t after_requested = qt->request.after; - time_t before_requested = qt->request.before; - size_t points_requested = qt->request.points; - - RRDR_OPTIONS options = qt->window.options; - size_t points_wanted = qt->window.points; - time_t after_wanted = qt->window.after; - time_t before_wanted = qt->window.before; - bool relative_period_requested = qt->window.relative; - bool aligned = qt->window.aligned; - RRDR_GROUPING group_method = qt->window.group_method; - size_t group = qt->window.group; - size_t resampling_group = qt->window.resampling_group; - time_t query_granularity = qt->window.query_granularity; + // qt.window members are the WANTED ones. + // qt.request members are the REQUESTED ones. RRDR *r = rrdr_create(owa, qt); if(unlikely(!r)) { internal_error(true, "QUERY: cannot create RRDR for %s, after=%ld, before=%ld, points=%zu", - qt->id, after_wanted, before_wanted, points_wanted); + qt->id, qt->window.after, qt->window.before, qt->window.points); return NULL; } - if(unlikely(!r->d || !points_wanted)) { + if(unlikely(!r->d || !qt->window.points)) { internal_error(true, "QUERY: returning empty RRDR (no dimensions in RRDSET) for %s, after=%ld, before=%ld, points=%zu", - qt->id, after_wanted, before_wanted, points_wanted); + qt->id, qt->window.after, qt->window.before, qt->window.points); return r; } - if(relative_period_requested) + if(qt->window.relative) r->result_options |= RRDR_RESULT_OPTION_RELATIVE; else r->result_options |= RRDR_RESULT_OPTION_ABSOLUTE; @@ -2034,7 +2054,8 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { long dimensions_used = 0, dimensions_nonzero = 0; struct timeval query_start_time; struct timeval query_current_time; - if (timeout) now_realtime_timeval(&query_start_time); + if (qt->request.timeout) + now_realtime_timeval(&query_start_time); for(size_t c = 0, max = qt->query.used; c < max ; c++) { // set the query target dimension options to rrdr @@ -2046,7 +2067,7 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { r->internal.grouping_reset(r); rrd2rrdr_do_dimension(r, c); - if (timeout) + if (qt->request.timeout) now_realtime_timeval(&query_current_time); if(r->od[c] & RRDR_DIMENSION_NONZERO) @@ -2082,9 +2103,9 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { } dimensions_used++; - if (timeout && ((NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0) > (NETDATA_DOUBLE)timeout) { + if (qt->request.timeout && ((NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0) > (NETDATA_DOUBLE)qt->request.timeout) { log_access("QUERY CANCELED RUNTIME EXCEEDED %0.2f ms (LIMIT %lld ms)", - (NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0, (long long)timeout); + (NETDATA_DOUBLE)dt_usec(&query_start_time, &query_current_time) / 1000.0, (long long)qt->request.timeout); r->result_options |= RRDR_RESULT_OPTION_CANCEL; break; } @@ -2093,44 +2114,44 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { #ifdef NETDATA_INTERNAL_CHECKS if (dimensions_used) { if(r->internal.log) - rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group, - after_wanted, after_requested, before_wanted, before_requested, - points_requested, points_wanted, /*after_slot, before_slot,*/ + rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group, + qt->window.after, qt->request.after, qt->window.before, qt->request.before, + qt->request.points, qt->window.points, /*after_slot, before_slot,*/ r->internal.log); - if(r->rows != points_wanted) - rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group, - after_wanted, after_requested, before_wanted, before_requested, - points_requested, points_wanted, /*after_slot, before_slot,*/ + if(r->rows != qt->window.points) + rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group, + qt->window.after, qt->request.after, qt->window.before, qt->request.before, + qt->request.points, qt->window.points, /*after_slot, before_slot,*/ "got 'points' is not wanted 'points'"); - if(aligned && (r->before % (group * query_granularity)) != 0) - rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group, - after_wanted, after_requested, before_wanted,before_wanted, - points_requested, points_wanted, /*after_slot, before_slot,*/ + if(qt->window.aligned && (r->before % (qt->window.group * qt->window.query_granularity)) != 0) + rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group, + qt->window.after, qt->request.after, qt->window.before,qt->request.before, + qt->request.points, qt->window.points, /*after_slot, before_slot,*/ "'before' is not aligned but alignment is required"); // 'after' should not be aligned, since we start inside the first group - //if(aligned && (r->after % group) != 0) - // rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group, after_wanted, after_requested, before_wanted, before_requested, points_requested, points_wanted, after_slot, before_slot, "'after' is not aligned but alignment is required"); + //if(qt->window.aligned && (r->after % group) != 0) + // rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group, qt->window.after, after_requested, before_wanted, before_requested, points_requested, points_wanted, after_slot, before_slot, "'after' is not aligned but alignment is required"); - if(r->before != before_wanted) - rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group, - after_wanted, after_requested, before_wanted, before_requested, - points_requested, points_wanted, /*after_slot, before_slot,*/ + if(r->before != qt->window.before) + rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group, + qt->window.after, qt->request.after, qt->window.before, qt->request.before, + qt->request.points, qt->window.points, /*after_slot, before_slot,*/ "chart is not aligned to requested 'before'"); - if(r->before != before_wanted) - rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group, - after_wanted, after_requested, before_wanted, before_requested, - points_requested, points_wanted, /*after_slot, before_slot,*/ + if(r->before != qt->window.before) + rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group, + qt->window.after, qt->request.after, qt->window.before, qt->request.before, + qt->request.points, qt->window.points, /*after_slot, before_slot,*/ "got 'before' is not wanted 'before'"); // reported 'after' varies, depending on group - if(r->after != after_wanted) - rrd2rrdr_log_request_response_metadata(r, options, group_method, aligned, group, resampling_time_requested, resampling_group, - after_wanted, after_requested, before_wanted, before_requested, - points_requested, points_wanted, /*after_slot, before_slot,*/ + if(r->after != qt->window.after) + rrd2rrdr_log_request_response_metadata(r, qt->window.options, qt->window.group_method, qt->window.aligned, qt->window.group, qt->request.resampling_time, qt->window.resampling_group, + qt->window.after, qt->request.after, qt->window.before, qt->request.before, + qt->request.points, qt->window.points, /*after_slot, before_slot,*/ "got 'after' is not wanted 'after'"); } @@ -2140,7 +2161,7 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { r->internal.grouping_free(r); // when all the dimensions are zero, we should return all of them - if(unlikely(options & RRDR_OPTION_NONZERO && !dimensions_nonzero && !(r->result_options & RRDR_RESULT_OPTION_CANCEL))) { + if(unlikely((qt->window.options & RRDR_OPTION_NONZERO) && !dimensions_nonzero && !(r->result_options & RRDR_RESULT_OPTION_CANCEL))) { // all the dimensions are zero // mark them as NONZERO to send them all for(size_t c = 0, max = qt->query.used; c < max ; c++) { diff --git a/web/api/web_api_v1.c b/web/api/web_api_v1.c index e8ca176a9d..c93da27a80 100644 --- a/web/api/web_api_v1.c +++ b/web/api/web_api_v1.c @@ -1226,7 +1226,7 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb) #ifdef ENABLE_COMPRESSION if(host->sender){ buffer_strcat(wb, "\t\"stream-compression\": "); - buffer_strcat(wb, (host->sender->flags & SENDER_FLAG_COMPRESSION) ? "true" : "false"); + buffer_strcat(wb, stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) ? "true" : "false"); buffer_strcat(wb, ",\n"); }else{ buffer_strcat(wb, "\t\"stream-compression\": null,\n"); diff --git a/web/server/web_client.c b/web/server/web_client.c index ff87041836..b569360f48 100644 --- a/web/server/web_client.c +++ b/web/server/web_client.c @@ -1056,7 +1056,7 @@ static inline ssize_t web_client_send_data(struct web_client *w,const void *buf, #ifdef ENABLE_HTTPS if ( (!web_client_check_unix(w)) && (netdata_ssl_srv_ctx) ) { if ( ( w->ssl.conn ) && ( !w->ssl.flags ) ){ - bytes = SSL_write(w->ssl.conn,buf, len) ; + bytes = netdata_ssl_write(w->ssl.conn, buf, len) ; } else { bytes = send(w->ofd,buf, len , flags); } @@ -1213,7 +1213,7 @@ static inline void web_client_send_http_header(struct web_client *w) { #ifdef ENABLE_HTTPS if ( (!web_client_check_unix(w)) && (netdata_ssl_srv_ctx) ) { if ( ( w->ssl.conn ) && ( !w->ssl.flags ) ){ - while((bytes = SSL_write(w->ssl.conn, buffer_tostring(w->response.header_output), buffer_strlen(w->response.header_output))) < 0) { + while((bytes = netdata_ssl_write(w->ssl.conn, buffer_tostring(w->response.header_output), buffer_strlen(w->response.header_output))) < 0) { count++; if(count > 100 || (errno != EAGAIN && errno != EWOULDBLOCK)) { error("Cannot send HTTPS headers to web client."); @@ -1909,7 +1909,7 @@ ssize_t web_client_receive(struct web_client *w) #ifdef ENABLE_HTTPS if ( (!web_client_check_unix(w)) && (netdata_ssl_srv_ctx) ) { if ( ( w->ssl.conn ) && (!w->ssl.flags)) { - bytes = SSL_read(w->ssl.conn, &w->response.data->buffer[w->response.data->len], (size_t) (left - 1)); + bytes = netdata_ssl_read(w->ssl.conn, &w->response.data->buffer[w->response.data->len], (size_t) (left - 1)); }else { bytes = recv(w->ifd, &w->response.data->buffer[w->response.data->len], (size_t) (left - 1), MSG_DONTWAIT); } -- cgit v1.2.3