diff options
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 83 |
1 files changed, 47 insertions, 36 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 2bfe28dfd6..c96e9732a1 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -4,14 +4,40 @@ extern struct config stream_config; +void destroy_receiver_state(struct receiver_state *rpt) { + freez(rpt->key); + freez(rpt->hostname); + freez(rpt->registry_hostname); + freez(rpt->machine_guid); + freez(rpt->os); + freez(rpt->timezone); + freez(rpt->tags); + freez(rpt->client_ip); + freez(rpt->client_port); + freez(rpt->program_name); + freez(rpt->program_version); +#ifdef ENABLE_HTTPS + if(rpt->ssl.conn){ + SSL_free(rpt->ssl.conn); + } +#endif + freez(rpt); +} + static void rrdpush_receiver_thread_cleanup(void *ptr) { static __thread int executed = 0; if(!executed) { executed = 1; struct receiver_state *rpt = (struct receiver_state *) ptr; + // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch + // the host pointer as it is unpredicable when the RRDHOST is deleted. Do the cleanup from rrdhost_free(). + if (netdata_exit && rpt->host) { + rpt->exited = 1; + return; + } // Make sure that we detach this thread and don't kill a freshly arriving receiver - if (rpt->host) { + if (!netdata_exit && rpt->host) { netdata_mutex_lock(&rpt->host->receiver_lock); if (rpt->host->receiver == rpt) rpt->host->receiver = NULL; @@ -19,25 +45,7 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) { } info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); - - freez(rpt->key); - freez(rpt->hostname); - freez(rpt->registry_hostname); - freez(rpt->machine_guid); - freez(rpt->os); - freez(rpt->timezone); - freez(rpt->tags); - freez(rpt->client_ip); - freez(rpt->client_port); - freez(rpt->program_name); - freez(rpt->program_version); -#ifdef ENABLE_HTTPS - if(rpt->ssl.conn){ - SSL_free(rpt->ssl.conn); - } -#endif - freez(rpt); - + destroy_receiver_state(rpt); } } @@ -413,26 +421,29 @@ static int rrdpush_receive(struct receiver_state *rpt) size_t count = streaming_parser(rpt, &cd, fp); - //size_t count = pluginsd_process(host, &cd, fp, 1); - - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "DISCONNECTED"); - error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->host->hostname, rpt->client_ip, rpt->client_port, count); - - netdata_mutex_lock(&rpt->host->receiver_lock); - if (rpt->host->receiver == rpt) { - rrdhost_wrlock(rpt->host); - rpt->host->senders_disconnected_time = now_realtime_sec(); - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN); - if(health_enabled == CONFIG_BOOLEAN_AUTO) - rpt->host->health_enabled = 0; - rrdhost_unlock(rpt->host); - rrdpush_sender_thread_stop(rpt->host); + + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname, + "DISCONNECTED"); + error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, + rpt->client_port, count); + + // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread + if (!netdata_exit && rpt->host) { + netdata_mutex_lock(&rpt->host->receiver_lock); + if (rpt->host->receiver == rpt) { + rrdhost_wrlock(rpt->host); + rpt->host->senders_disconnected_time = now_realtime_sec(); + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN); + if(health_enabled == CONFIG_BOOLEAN_AUTO) + rpt->host->health_enabled = 0; + rrdhost_unlock(rpt->host); + rrdpush_sender_thread_stop(rpt->host); + } + netdata_mutex_unlock(&rpt->host->receiver_lock); } - netdata_mutex_unlock(&rpt->host->receiver_lock); // cleanup fclose(fp); - return (int)count; } |