author    Costa Tsaousis <costa@netdata.cloud>  2023-01-14 00:09:02 +0200
committer GitHub <noreply@github.com>           2023-01-14 00:09:02 +0200
commit    f10f19c8c8bba37e8364fa6138d6ea0f6160c3ea (patch)
tree      6d71bc40d56362dd4de892239a633bdcfa2183be /streaming/replication.c
parent    3e2924fd466dbc05338518412f426d34b215ef0b (diff)
More 32bit fixes (#14264)
* query planner weight calculation using long long
* adjust replication query ahead pipeline for smaller systems
* do not generate huge replication messages
* add message to indicate replication message was interrupted
* improved message
* max replication size 25% of sender buffer
* fix for last commit
* use less cache and smaller page sizes and fewer threads on 32-bit
* fix reserved libuv workers for 32-bit
* fix detection of 32/64 bit
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- streaming/replication.c | 59
1 file changed, 38 insertions(+), 21 deletions(-)
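The ULL suffixes added to the percentage macros are the heart of the 32-bit fix: on an ILP32 platform, an expression such as max_size * 25 / 100 is evaluated in 32-bit arithmetic, and the intermediate product can wrap before the division happens. A minimal standalone sketch of the hazard and of the 64-bit promotion a ULL operand forces (hypothetical buffer size, not netdata code):

#include <stdio.h>
#include <stdint.h>

#define PERCENT_PLAIN 25     /* int: the product stays 32-bit on ILP32 */
#define PERCENT_ULL   25ULL  /* unsigned long long: forces 64-bit arithmetic */

int main(void) {
    uint32_t max_size = 400u * 1024u * 1024u;          /* hypothetical 400 MiB sender buffer */
    uint32_t wrong = max_size * PERCENT_PLAIN / 100;   /* 400 MiB * 25 wraps at 2^32 before / 100 */
    uint64_t right = max_size * PERCENT_ULL / 100ULL;  /* the ULL operand promotes the whole expression */
    printf("wrong: %u  right: %llu\n", wrong, (unsigned long long) right);
    return 0;
}

This mirrors the call near the bottom of the diff, where the sender buffer's max_size is multiplied by MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER before being divided by 100ULL.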
diff --git a/streaming/replication.c b/streaming/replication.c
index 5bfcb7540d..96859723a9 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -3,9 +3,10 @@
#include "replication.h"
#include "Judy.h"
-#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
-#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
-#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
+#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
+#define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL
+#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
+#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL
#define WORKER_JOB_FIND_NEXT 1
#define WORKER_JOB_QUERYING 2
@@ -80,6 +81,7 @@ struct replication_query {
bool locked_data_collection;
bool execute;
+ bool interrupted;
} query;
time_t wall_clock_time;
@@ -196,8 +198,7 @@ static struct replication_query *replication_query_prepare(
return q;
}
-static time_t replication_query_finalize(struct replication_query *q, bool executed) {
- time_t query_before = q->query.before;
+static void replication_query_finalize(struct replication_query *q, bool executed) {
size_t dimensions = q->dimensions;
// release all the dictionary items acquired
@@ -231,8 +232,6 @@ static time_t replication_query_finalize(struct replication_query *q, bool executed) {
}
freez(q);
-
- return query_before;
}
static void replication_query_align_to_optimal_before(struct replication_query *q) {
@@ -258,10 +257,7 @@ static void replication_query_align_to_optimal_before(struct replication_query *q) {
q->query.before = expanded_before;
}
-static time_t replication_query_execute_and_finalize(BUFFER *wb, struct replication_query *q) {
- if(!q->query.execute)
- return replication_query_finalize(q, false);
-
+static void replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
replication_query_align_to_optimal_before(q);
time_t after = q->query.after;
@@ -277,6 +273,7 @@ static time_t replication_query_execute_and_finalize(BUFFER *wb, struct replication_query *q) {
#endif
time_t now = after + 1;
+ time_t last_end_time_in_buffer = 0;
while(now <= before) {
time_t min_start_time = 0, min_end_time = 0;
for (size_t i = 0; i < dimensions ;i++) {
@@ -332,6 +329,20 @@ static time_t replication_query_execute_and_finalize(BUFFER *wb, struct replication_query *q) {
actual_before = min_end_time;
#endif
+ if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) {
+ internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. "
+ "Interrupting replication query at %ld, before the expected %ld.",
+ buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
+ last_end_time_in_buffer, q->query.before);
+
+ q->query.before = last_end_time_in_buffer;
+ q->query.enable_streaming = false;
+ q->query.interrupted = true;
+
+ break;
+ }
+ last_end_time_in_buffer = min_end_time;
+
buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n",
(unsigned long long) min_start_time,
(unsigned long long) min_end_time,
@@ -381,7 +392,6 @@ static time_t replication_query_execute_and_finalize(BUFFER *wb, struct replication_query *q) {
q->points_read = points_read;
q->points_generated = points_generated;
- return replication_query_finalize(q, true);
}
static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
@@ -445,13 +455,10 @@ void replication_response_cancel_and_finalize(struct replication_query *q) {
static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
-bool replication_response_execute_and_finalize(struct replication_query *q) {
+bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
struct replication_request *rq = q->rq;
RRDSET *st = q->st;
RRDHOST *host = st->rrdhost;
- time_t after = q->request.after;
- time_t before; // the query will report this
- bool enable_streaming = q->query.enable_streaming;
// we might want to optimize this by filling a temporary buffer
// and copying the result to the host's buffer in order to avoid
@@ -463,10 +470,15 @@ bool replication_response_execute_and_finalize(struct replication_query *q) {
bool locked_data_collection = q->query.locked_data_collection;
q->query.locked_data_collection = false;
- before = replication_query_execute_and_finalize(wb, q);
+ if(q->query.execute)
+ replication_query_execute(wb, q, max_msg_size);
- // IMPORTANT: q is invalid now
- q = NULL;
+ time_t after = q->request.after;
+ time_t before = q->query.before;
+ bool enable_streaming = q->query.enable_streaming;
+
+ replication_query_finalize(q, q->query.execute);
+ q = NULL; // IMPORTANT: q is invalid now
// get again the world clock time
if(enable_streaming)
@@ -1204,7 +1216,8 @@ static bool replication_execute_request(struct replication_request *rq, bool workers) {
// send the replication data
rq->q->rq = rq;
- replication_response_execute_and_finalize(rq->q);
+ replication_response_execute_and_finalize(
+ rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL));
netdata_thread_enable_cancelability();
@@ -1404,7 +1417,11 @@ static int replication_execute_next_pending_request(void) {
struct replication_request *rq;
if(unlikely(!rqs)) {
- max_requests_ahead = libuv_worker_threads * 2;
+ max_requests_ahead = get_system_cpus() / 2;
+
+ if(max_requests_ahead > libuv_worker_threads * 2)
+ max_requests_ahead = libuv_worker_threads * 2;
+
if(max_requests_ahead < 2)
max_requests_ahead = 2;
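The last hunk shrinks the replication look-ahead on small machines: instead of always queueing twice as many requests as there are libuv worker threads, it now starts from half the system CPUs and only then applies the old cap. A standalone sketch of the resulting rule (the helper name is hypothetical; get_system_cpus() and libuv_worker_threads are the symbols used in the diff):

/* Half the CPUs, capped at twice the libuv workers, with a floor of 2. */
static size_t replication_max_requests_ahead(size_t system_cpus, size_t libuv_worker_threads) {
    size_t max_requests_ahead = system_cpus / 2;

    if (max_requests_ahead > libuv_worker_threads * 2)
        max_requests_ahead = libuv_worker_threads * 2;

    if (max_requests_ahead < 2)
        max_requests_ahead = 2;

    return max_requests_ahead;
}

On a 2-core 32-bit box this yields the floor of 2 requests ahead, while a many-core server is still bounded by its libuv worker pool.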