diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-14 00:09:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-14 00:09:02 +0200 |
commit | f10f19c8c8bba37e8364fa6138d6ea0f6160c3ea (patch) | |
tree | 6d71bc40d56362dd4de892239a633bdcfa2183be /streaming/replication.c | |
parent | 3e2924fd466dbc05338518412f426d34b215ef0b (diff) |
More 32bit fixes (#14264)
* query planer weight calculation using long long
* adjust replication query ahead pipeline for smaller systems
* do not generate huge replication messages
* add message to indicate replication message was interrupted
* improved message
* max replication size 25% of sender buffer
* fix for last commit
* use less cache and smaller page sizes and fewer threads on 32-bits
* fix reserved libuv workers for 32bits
* fix detection of 32/64 bit
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 59 |
1 files changed, 38 insertions, 21 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index 5bfcb7540d..96859723a9 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -3,9 +3,10 @@ #include "replication.h" #include "Judy.h" -#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 -#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 -#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10 +#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL +#define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL +#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL +#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL #define WORKER_JOB_FIND_NEXT 1 #define WORKER_JOB_QUERYING 2 @@ -80,6 +81,7 @@ struct replication_query { bool locked_data_collection; bool execute; + bool interrupted; } query; time_t wall_clock_time; @@ -196,8 +198,7 @@ static struct replication_query *replication_query_prepare( return q; } -static time_t replication_query_finalize(struct replication_query *q, bool executed) { - time_t query_before = q->query.before; +static void replication_query_finalize(struct replication_query *q, bool executed) { size_t dimensions = q->dimensions; // release all the dictionary items acquired @@ -231,8 +232,6 @@ static time_t replication_query_finalize(struct replication_query *q, bool execu } freez(q); - - return query_before; } static void replication_query_align_to_optimal_before(struct replication_query *q) { @@ -258,10 +257,7 @@ static void replication_query_align_to_optimal_before(struct replication_query * q->query.before = expanded_before; } -static time_t replication_query_execute_and_finalize(BUFFER *wb, struct replication_query *q) { - if(!q->query.execute) - return replication_query_finalize(q, false); - +static void replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) { replication_query_align_to_optimal_before(q); time_t after = q->query.after; @@ -277,6 +273,7 @@ static time_t replication_query_execute_and_finalize(BUFFER *wb, struct replicat #endif time_t now = after + 1; + time_t last_end_time_in_buffer = 0; while(now <= before) { time_t min_start_time = 0, min_end_time = 0; for (size_t i = 0; i < dimensions ;i++) { @@ -332,6 +329,20 @@ static time_t replication_query_execute_and_finalize(BUFFER *wb, struct replicat actual_before = min_end_time; #endif + if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) { + internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'." + "Interrupting replication query at %ld, before the expected %ld.", + buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost), + last_end_time_in_buffer, q->query.before); + + q->query.before = last_end_time_in_buffer; + q->query.enable_streaming = false; + q->query.interrupted = true; + + break; + } + last_end_time_in_buffer = min_end_time; + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n", (unsigned long long) min_start_time, (unsigned long long) min_end_time, @@ -381,7 +392,6 @@ static time_t replication_query_execute_and_finalize(BUFFER *wb, struct replicat q->points_read = points_read; q->points_generated = points_generated; - return replication_query_finalize(q, true); } static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) { @@ -445,13 +455,10 @@ void replication_response_cancel_and_finalize(struct replication_query *q) { static bool sender_is_still_connected_for_this_request(struct replication_request *rq); -bool replication_response_execute_and_finalize(struct replication_query *q) { +bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) { struct replication_request *rq = q->rq; RRDSET *st = q->st; RRDHOST *host = st->rrdhost; - time_t after = q->request.after; - time_t before; // the query will report this - bool enable_streaming = q->query.enable_streaming; // we might want to optimize this by filling a temporary buffer // and copying the result to the host's buffer in order to avoid @@ -463,10 +470,15 @@ bool replication_response_execute_and_finalize(struct replication_query *q) { bool locked_data_collection = q->query.locked_data_collection; q->query.locked_data_collection = false; - before = replication_query_execute_and_finalize(wb, q); + if(q->query.execute) + replication_query_execute(wb, q, max_msg_size); - // IMPORTANT: q is invalid now - q = NULL; + time_t after = q->request.after; + time_t before = q->query.before; + bool enable_streaming = q->query.enable_streaming; + + replication_query_finalize(q, q->query.execute); + q = NULL; // IMPORTANT: q is invalid now // get again the world clock time if(enable_streaming) @@ -1204,7 +1216,8 @@ static bool replication_execute_request(struct replication_request *rq, bool wor // send the replication data rq->q->rq = rq; - replication_response_execute_and_finalize(rq->q); + replication_response_execute_and_finalize( + rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL)); netdata_thread_enable_cancelability(); @@ -1404,7 +1417,11 @@ static int replication_execute_next_pending_request(void) { struct replication_request *rq; if(unlikely(!rqs)) { - max_requests_ahead = libuv_worker_threads * 2; + max_requests_ahead = get_system_cpus() / 2; + + if(max_requests_ahead > libuv_worker_threads * 2) + max_requests_ahead = libuv_worker_threads * 2; + if(max_requests_ahead < 2) max_requests_ahead = 2; |