diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-22 22:42:16 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-22 22:42:16 +0200 |
commit | 4e61a4244e2ab45c29de0ddd84bfec8d9339f388 (patch) | |
tree | 4882db8ee7bd9de3251960320c7f553edc52b6c9 /streaming | |
parent | 77a304f52e4c6aadef0eac06b4869b7e1c829175 (diff) |
Replication fixes #3 (#14035)
* cleanup and additional information about replication
* fix deadlock on sender mutex
* do not ignore start streaming empty requests; when there are duplicate requests, merge them
* flipped the flag
* final touch
* added queued flag on the charts to prevent them from being obsoleted by the service thread
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/receiver.c | 28 | ||||
-rw-r--r-- | streaming/replication.c | 151 | ||||
-rw-r--r-- | streaming/replication.h | 2 | ||||
-rw-r--r-- | streaming/rrdpush.h | 34 | ||||
-rw-r--r-- | streaming/sender.c | 74 |
5 files changed, 189 insertions, 100 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 32b510e7ce..61ee33bc45 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -431,6 +431,15 @@ done: return result; } +static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) { + RRDSET *st; + rrdset_foreach_read(st, rpt->host) { + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + } + rrdset_foreach_done(st); + rrdhost_receiver_replicating_charts_zero(rpt->host); +} static int rrdpush_receive(struct receiver_state *rpt) { @@ -721,14 +730,7 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_set_is_parent_label(++localhost->senders_count); - if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) { - RRDSET *st; - rrdset_foreach_read(st, rpt->host) { - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - } - rrdset_foreach_done(st); - } - + rrdpush_receiver_replication_reset(rpt); rrdcontext_host_child_connected(rpt->host); rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); @@ -750,16 +752,8 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, rpt->client_port, count); - if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) { - RRDSET *st; - rrdset_foreach_read(st, rpt->host) { - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); - rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - } - rrdset_foreach_done(st); - } - rrdcontext_host_child_disconnected(rpt->host); + rrdpush_receiver_replication_reset(rpt); #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud diff --git a/streaming/replication.c b/streaming/replication.c index 3cb8424113..3f2f7939c6 100644 --- a/streaming/replication.c +++ 
b/streaming/replication.c @@ -290,7 +290,7 @@ static bool send_replay_chart_cmd(send_command callback, void *callback_data, RR #ifdef NETDATA_INTERNAL_CHECKS internal_error( st->replay.after != 0 || st->replay.before != 0, - "REPLAY: host '%s', chart '%s': sending replication request, while there is another inflight", + "REPLAY ERROR: host '%s', chart '%s': sending replication request, while there is another inflight", rrdhost_hostname(st->rrdhost), rrdset_id(st) ); @@ -403,15 +403,16 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST // replication request in sender DICTIONARY // used for de-duplicating the requests struct replication_request { - struct sender_state *sender; - usec_t sender_last_flush_ut; - STRING *chart_id; - time_t after; // key for sorting (JudyL) - time_t before; - Word_t unique_id; - bool start_streaming; - bool found; - bool index_in_judy; + struct sender_state *sender; // the sender we should put the reply at + STRING *chart_id; // the chart of the request + time_t after; // the start time of the query (maybe zero) key for sorting (JudyL) + time_t before; // the end time of the query (maybe zero) + bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming + + usec_t sender_last_flush_ut; // the timestamp of the sender, at the time we indexed this request + Word_t unique_id; // auto-increment, later requests have bigger + bool found; // used as a result boolean for the find call + bool indexed_in_judy; // true when the request is indexed in judy }; // replication sort entry in JudyL array @@ -486,13 +487,15 @@ static void replication_recursive_unlock() { static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry)); + rrdpush_sender_pending_replication_requests_plus_one(rq->sender); + // copy the request rse->rq 
= rq; rse->unique_id = rep.next_unique_id++; // save the unique id into the request, to be able to delete it later rq->unique_id = rse->unique_id; - rq->index_in_judy = false; + rq->indexed_in_judy = false; return rse; } @@ -524,7 +527,7 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat // add it to the inner judy, using unique_id as key Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); *item = rse; - rq->index_in_judy = true; + rq->indexed_in_judy = true; if(!rep.first_time_t || rq->after < rep.first_time_t) rep.first_time_t = rq->after; @@ -540,7 +543,9 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor rep.removed++; rep.requests_count--; - rse->rq->index_in_judy = false; + rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender); + + rse->rq->indexed_in_judy = false; // delete it from the inner judy JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0); @@ -562,7 +567,7 @@ static void replication_sort_entry_del(struct replication_request *rq) { struct replication_sort_entry *rse_to_delete = NULL; replication_recursive_lock(); - if(rq->index_in_judy) { + if(rq->indexed_in_judy) { inner_judy_pptr = JudyLGet(rep.JudyL_array, rq->after, PJE0); if (inner_judy_pptr) { @@ -582,6 +587,13 @@ static void replication_sort_entry_del(struct replication_request *rq) { replication_recursive_unlock(); } +static inline PPvoid_t JudyLFirstOrNext(Pcvoid_t PArray, Word_t * PIndex, bool first) { + if(unlikely(first)) + return JudyLFirst(PArray, PIndex, PJE0); + + return JudyLNext(PArray, PIndex, PJE0); +} + static struct replication_request replication_request_get_first_available() { Pvoid_t *inner_judy_pptr; @@ -590,16 +602,13 @@ static struct replication_request replication_request_get_first_available() { struct replication_request rq = (struct replication_request){ .found = false }; - if(rep.last_after && rep.last_unique_id) { - rep.last_after--; - rep.last_unique_id--; - } - else { + 
if(unlikely(!rep.last_after || !rep.last_unique_id)) { rep.last_after = 0; rep.last_unique_id = 0; } - while(!rq.found && (inner_judy_pptr = JudyLNext(rep.JudyL_array, &rep.last_after, PJE0))) { + bool find_same_after = true; + while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(rep.JudyL_array, &rep.last_after, find_same_after))) { Pvoid_t *our_item_pptr; while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &rep.last_unique_id, PJE0))) { @@ -610,10 +619,10 @@ static struct replication_request replication_request_get_first_available() { rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); bool sender_has_been_flushed_since_this_request = - rse->rq->sender_last_flush_ut != __atomic_load_n(&s->last_flush_time_ut, __ATOMIC_SEQ_CST); + rse->rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s); bool sender_has_room_to_spare = - s->replication_sender_buffer_percent_used <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED; + s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED; if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) { rep.skipped_not_connected++; @@ -635,6 +644,9 @@ static struct replication_request replication_request_get_first_available() { rep.skipped_no_room++; } + // call JudyLNext from now on + find_same_after = false; + // prepare for the next iteration on the outer loop rep.last_unique_id = 0; } @@ -650,6 +662,14 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item __may struct sender_state *s = sender_state; (void)s; struct replication_request *rq = value; + RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); + if(!st) { + internal_error(true, "REPLAY: chart '%s' not found on host '%s'", + string2str(rq->chart_id), rrdhost_hostname(rq->sender->host)); + } + else + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED); + // IMPORTANT: // We use the react instead of the insert callback // because we want the item to be atomically visible 
@@ -661,7 +681,9 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item __may // related to it. replication_sort_entry_add(rq); - __atomic_fetch_add(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST); + + // this request is about a unique chart for this sender + rrdpush_sender_replicating_charts_plus_one(s); } static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) { @@ -671,24 +693,63 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ internal_error( true, - "STREAM %s [send to %s]: ignoring duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + "STREAM %s [send to %s]: REPLAY ERROR: merging duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])", rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item), (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? 
"true" : "false"); - string_freez(rq_new->chart_id); + bool updated_after = false, updated_before = false, updated_start_streaming = false, updated = false; - return false; + if(rq_new->after < rq->after && rq_new->after != 0) + updated_after = true; + + if(rq_new->before > rq->before) + updated_before = true; + + if(rq_new->start_streaming != rq->start_streaming) + updated_start_streaming = true; + + if(updated_after || updated_before || updated_start_streaming) { + replication_recursive_lock(); + + if(rq->indexed_in_judy) + replication_sort_entry_del(rq); + + if(rq_new->after < rq->after && rq_new->after != 0) + rq->after = rq_new->after; + + if(rq->after == 0) + rq->before = 0; + else if(rq_new->before > rq->before) + rq->before = rq_new->before; + + rq->start_streaming = rq->start_streaming; + replication_sort_entry_add(rq); + + replication_recursive_unlock(); + updated = true; + + internal_error( + true, + "STREAM %s [send to %s]: REPLAY ERROR: updated duplicate replication command for chart '%s' (from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item), + (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? 
"true" : "false"); + } + + string_freez(rq_new->chart_id); + return updated; } static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) { struct replication_request *rq = value; - if(rq->index_in_judy) + // this request is about a unique chart for this sender + rrdpush_sender_replicating_charts_minus_one(rq->sender); + + if(rq->indexed_in_judy) replication_sort_entry_del(rq); string_freez(rq->chart_id); - __atomic_fetch_sub(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST); } @@ -702,13 +763,13 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, .after = after, .before = before, .start_streaming = start_streaming, - .sender_last_flush_ut = __atomic_load_n(&sender->last_flush_time_ut, __ATOMIC_SEQ_CST), + .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender), }; dictionary_set(sender->replication_requests, chart_id, &rq, sizeof(struct replication_request)); } -void replication_flush_sender(struct sender_state *sender) { +void replication_sender_delete_pending_requests(struct sender_state *sender) { // allow the dictionary destructor to go faster on locks replication_recursive_lock(); dictionary_flush(sender->replication_requests); @@ -746,7 +807,7 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { replication_recursive_unlock(); } - s->replication_sender_buffer_percent_used = percentage; + s->buffer_used_percentage = percentage; } // ---------------------------------------------------------------------------- @@ -843,12 +904,19 @@ void *replication_thread_main(void *ptr __maybe_unused) { worker_is_busy(WORKER_JOB_FIND_CHART); RRDSET *st = rrdset_find(rq.sender->host, string2str(rq.chart_id)); if(!st) { - internal_error(true, "REPLAY: chart '%s' not found on host '%s'", + internal_error(true, "REPLAY ERROR: chart '%s' not found on host '%s'", string2str(rq.chart_id), rrdhost_hostname(rq.sender->host)); 
continue; } + if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_plus_one(st->rrdhost); + } + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED); + worker_is_busy(WORKER_JOB_QUERYING); latest_first_time_t = rq.after; @@ -856,8 +924,8 @@ void *replication_thread_main(void *ptr __maybe_unused) { if(rq.after < rq.sender->replication_first_time || !rq.sender->replication_first_time) rq.sender->replication_first_time = rq.after; - if(rq.before < rq.sender->replication_min_time || !rq.sender->replication_min_time) - rq.sender->replication_min_time = rq.before; + if(rq.before < rq.sender->replication_current_time || !rq.sender->replication_current_time) + rq.sender->replication_current_time = rq.before; netdata_thread_disable_cancelability(); @@ -869,17 +937,20 @@ void *replication_thread_main(void *ptr __maybe_unused) { rep.executed++; - if(start_streaming && rq.sender_last_flush_ut == __atomic_load_n(&rq.sender->last_flush_time_ut, __ATOMIC_SEQ_CST)) { + if(start_streaming && rq.sender_last_flush_ut == rrdpush_sender_get_flush_time(rq.sender)) { worker_is_busy(WORKER_JOB_ACTIVATE_ENABLE_STREAMING); - __atomic_fetch_add(&rq.sender->receiving_metrics, 1, __ATOMIC_SEQ_CST); // enable normal streaming if we have to // but only if the sender buffer has not been flushed since we started - debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s", - rrdhost_hostname(rq.sender->host), rrdset_id(st)); - - rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + } + else + internal_error(true, "REPLAY 
ERROR: received start streaming command for chart '%s' or host '%s', but the chart is not in progress replicating", + string2str(rq.chart_id), rrdhost_hostname(st->rrdhost)); } } diff --git a/streaming/replication.h b/streaming/replication.h index 7542e43d08..dde3cc1a96 100644 --- a/streaming/replication.h +++ b/streaming/replication.h @@ -16,7 +16,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, void replication_init_sender(struct sender_state *sender); void replication_cleanup_sender(struct sender_state *sender); -void replication_flush_sender(struct sender_state *sender); +void replication_sender_delete_pending_requests(struct sender_state *sender); void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming); void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s); diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index ed3b30bc5c..079c63acc5 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -159,20 +159,36 @@ struct sender_state { struct compressor_state *compressor; #endif #ifdef ENABLE_HTTPS - struct netdata_ssl ssl; // Structure used to encrypt the connection + struct netdata_ssl ssl; // structure used to encrypt the connection #endif - DICTIONARY *replication_requests; - size_t replication_pending_requests; - time_t replication_first_time; - time_t replication_min_time; - size_t replication_sender_buffer_percent_used; - bool replication_reached_max; + DICTIONARY *replication_requests; // de-duplication of replication requests, per chart - usec_t last_flush_time_ut; - size_t receiving_metrics; + size_t replication_pending_requests; // the currently outstanding replication requests + size_t replication_charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart) + + 
time_t replication_first_time; // the oldest time that has been requested to be replicated + time_t replication_current_time; // the minimum(before) of the executed replication requests + + bool replication_reached_max; // used to avoid resetting the replication thread too frequently + + size_t buffer_used_percentage; // the current utilization of the sending buffer + usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC }; +#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED); +#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->last_flush_time_ut), __ATOMIC_RELAXED) + +#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication_charts_replicating), __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication_charts_replicating), 0, __ATOMIC_RELAXED) + +#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication_pending_requests), __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication_pending_requests), 0, __ATOMIC_RELAXED) + struct receiver_state { RRDHOST *host; netdata_thread_t thread; diff --git a/streaming/sender.c b/streaming/sender.c index 
72affc2907..85aad3a3e5 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -169,19 +169,6 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { rrdpush_signal_sender_to_wake_up(s); } - -static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { - if(host->sender->rrdpush_sender_socket != -1) { - close(host->sender->rrdpush_sender_socket); - host->sender->rrdpush_sender_socket = -1; - } - - rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); - rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); - - replication_flush_sender(host->sender); -} - static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) { buffer_sprintf( wb @@ -236,21 +223,10 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { error("Clearing stream_collected_metrics flag in charts of host %s", rrdhost_hostname(host)); - bool receive_has_replication = host != localhost && host->receiver && stream_has_capability(host->receiver, STREAM_CAP_REPLICATION); - bool send_has_replication = host->sender && stream_has_capability(host->sender, STREAM_CAP_REPLICATION); - RRDSET *st; rrdset_foreach_read(st, host) { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); - - if(!receive_has_replication) - rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - - if(send_has_replication) - // it will be enabled once replication is done on the sending side - rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); - else - rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_QUEUED); + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); st->upstream_resync_time = 0; @@ -260,21 +236,52 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { 
rrddim_foreach_done(rd); } rrdset_foreach_done(st); + + rrdhost_sender_replicating_charts_zero(host); } -static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) { - __atomic_store_n(&host->sender->last_flush_time_ut, now_realtime_usec(), __ATOMIC_SEQ_CST); +static void rrdpush_sender_cbuffer_flush(RRDHOST *host) { + rrdpush_sender_set_flush_time(host->sender); netdata_mutex_lock(&host->sender->mutex); + + // flush the output buffer from any data it may have cbuffer_flush(host->sender->buffer); replication_recalculate_buffer_used_ratio_unsafe(host->sender); + netdata_mutex_unlock(&host->sender->mutex); +} + +static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) { + rrdpush_sender_set_flush_time(host->sender); + // stop all replication commands inflight + replication_sender_delete_pending_requests(host->sender); + + // reset the state of all charts rrdpush_sender_thread_reset_all_charts(host); + + rrdpush_sender_replicating_charts_zero(host->sender); +} + +static void rrdpush_sender_on_connect(RRDHOST *host) { + rrdpush_sender_cbuffer_flush(host); + rrdpush_sender_charts_and_replication_reset(host); rrdpush_sender_thread_send_custom_host_variables(host); - replication_flush_sender(host->sender); +} + +static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { + if(host->sender->rrdpush_sender_socket != -1) { + close(host->sender->rrdpush_sender_socket); + host->sender->rrdpush_sender_socket = -1; + } + + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); - __atomic_store_n(&host->sender->receiving_metrics, 0, __ATOMIC_SEQ_CST); + // do not flush the circular buffer here + // this function is called sometimes with the mutex lock, sometimes without the lock + rrdpush_sender_charts_and_replication_reset(host); } void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) @@ -704,7 +711,7 @@ static bool attempt_to_connect(struct 
sender_state *state) if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) { // reset the buffer, to properly send charts and metrics - rrdpush_sender_thread_data_flush(state->host); + rrdpush_sender_on_connect(state->host); // send from the beginning state->begin = 0; @@ -1185,8 +1192,9 @@ void *rrdpush_sender_thread(void *ptr) { // If the TCP window never opened then something is wrong, restart connection if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout && - __atomic_load_n(&s->replication_pending_requests, __ATOMIC_SEQ_CST) == 0) && - __atomic_load_n(&s->receiving_metrics, __ATOMIC_SEQ_CST) != 0) { + !rrdpush_sender_pending_replication_requests(s) && + !rrdpush_sender_replicating_charts(s) + )) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts); rrdpush_sender_thread_close_socket(s->host); |