summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-07 21:10:27 +0300
committerGitHub <noreply@github.com>2023-06-07 21:10:27 +0300
commit66c85460199dbf65aad09cdfcdbae25c6bde265b (patch)
treea77e1f19d21f429fbc73ff8c71660cfb97c934ed /streaming/sender.c
parent892255b23728fde076402b7300f13c80de32e5fc (diff)
Re-write of SSL support in Netdata; restoration of SIGCHLD; detection of stale plugins; streaming improvements (#15113)
* add information about streaming connections to /api/v2/nodes; reset defer time when sender or receivers connect or disconnect * make each streaming destination respect its SSL settings * to not send SSL traffic over non-SSL connection * keep track of outgoing streaming connection attempts * retry SSL reads when SSL_read() returns SSL_ERROR_WANT_READ * Revert "retry SSL reads when SSL_read() returns SSL_ERROR_WANT_READ" This reverts commit 14c858677c6f2d3b08c94f298e2f45ecdb74c801. * cleanup SSL connections properly * initialize SSL in rpt before takeover * sender should free SSL when talking to a non-SSL destination * do not shutdown SSL when receiver exits * restore operation of SIGCHLD when the reaper is not enabled * create an fgets function that checks for data and times out * work on error handling of plugins exiting * remove newlines from logs * global call to waitid(), caching the result for netdata_pclose() to process * receiver tid * parser timeouts in 2 minutes instead of 10 * fix crash when UUID is NULL in SQLite * abstract sqlite3 parsing for uuid and text * write proper ssl errors on read and write * fix for SSL_ERROR_WANT_RETRY_VERIFY * SSL WANT per function * unified SSL error logging * fix compilation warning * additional logging about parser cleanup * streaming parser should call the pluginsd parser cleanup * SSL error handling work * SSL initialization unification * check for pending data when receiving SSL response with timeout * macro to check if an SSL connection has been established * remove SSL_pending() * check for SSL macros * use SSL_peek() to find if there is a response * SSL renames * more SSL renames & cleanup * rrdpush ssl connection function * abstract all SSL functions into security.c * keep track of SSL connections and always attempt to use SSL read/write when on SSL connection * signal openssl to skip certificate validation when configured to do so * better SSL error handling and logging * SSL code cleanup * SSL retry on 
SSL_connect and SSL_accept * SSL provide default return value for old compilers * SSL read/write functions emulate system read/write functions * fix receive/send timeout and switch from SSL_peek() to SSL_pending() * remove SSL_pending() * removed sender auto-retry and debug info for initial receiver response * ssl skip certificate verification config for web server * ssl errors log ip and port of the peer * keep ssl with web_client for its whole lifetime * thread safe socket peers to text * use error_limit() for common ssl errors * cleanup * more cleanup * coverity fixes * ssl error logs include both local and remote ip/port info * remove obsolete code
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c212
1 files changed, 108 insertions, 104 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 179c2dc603..cedf58ce2d 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -29,7 +29,6 @@
#endif
extern struct config stream_config;
-extern int netdata_use_ssl_on_stream;
extern char *netdata_ssl_ca_path;
extern char *netdata_ssl_ca_file;
@@ -320,6 +319,10 @@ static void rrdpush_sender_after_connect(RRDHOST *host) {
}
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
+#ifdef ENABLE_HTTPS
+ netdata_ssl_close(&host->sender->ssl);
+#endif
+
if(host->sender->rrdpush_sender_socket != -1) {
close(host->sender->rrdpush_sender_socket);
host->sender->rrdpush_sender_socket = -1;
@@ -480,6 +483,53 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
return false;
}
+/*
+ * Bring the sender's SSL state in line with what the current destination asks for.
+ *
+ * The sender's previous SSL state is always closed first. When the destination
+ * does not request SSL the function stops there and reports success, so the
+ * stream carries on over the plain socket. Otherwise an SSL session is opened
+ * on s->rrdpush_sender_socket, the handshake is attempted and, when
+ * netdata_ssl_validate_certificate_sender is set, the peer certificate is
+ * tested.
+ *
+ * Returns true when the connection may be used (SSL established, SSL not
+ * required, or SSL support not compiled in). Returns false on any SSL failure;
+ * in that case the socket has been closed and the destination's last_error /
+ * last_handshake describe the failure.
+ */
+static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
+#ifdef ENABLE_HTTPS
+    RRDHOST *host = s->host;
+    bool ssl_required = host->destination && host->destination->ssl;
+
+    // start from a clean SSL state, even when the new destination is plain
+    netdata_ssl_close(&host->sender->ssl);
+
+    if(!ssl_required)
+        return true;
+
+    if(!netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) {
+        // failed to establish connection
+        //
+        // Close the socket and record the reason, like the other failure
+        // paths of this function do. No reconnection delay is imposed here,
+        // so the retry timing of this path stays as it was.
+
+        worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
+        rrdpush_sender_thread_close_socket(host);
+        host->destination->last_error = "SSL error";
+        host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
+        return false;
+    }
+
+    if(!netdata_ssl_connect(&host->sender->ssl)) {
+        // couldn't connect
+
+        worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
+        rrdpush_sender_thread_close_socket(host);
+        host->destination->last_error = "SSL error";
+        host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
+        // back off for 5 minutes before trying this destination again
+        host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
+        return false;
+    }
+
+    if(netdata_ssl_validate_certificate_sender &&
+       security_test_certificate(host->sender->ssl.conn)) {
+        // certificate is not valid
+
+        worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
+        error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
+        rrdpush_sender_thread_close_socket(host);
+        host->destination->last_error = "invalid SSL certificate";
+        host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
+        // back off for 5 minutes before trying this destination again
+        host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
+        return false;
+    }
+
+    return true;
+
+#else
+    // SSL is not enabled
+    (void)s; // the parameter is only used by the SSL code path
+    return true;
+#endif
+}
+
+
static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) {
struct timeval tv = {
@@ -507,35 +557,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
// info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
-#ifdef ENABLE_HTTPS
- if(netdata_ssl_client_ctx){
- host->sender->ssl.flags = NETDATA_SSL_START;
- if (!host->sender->ssl.conn){
- host->sender->ssl.conn = SSL_new(netdata_ssl_client_ctx);
- if(!host->sender->ssl.conn){
- error("Failed to allocate SSL structure.");
- host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- }
- }
- else{
- SSL_clear(host->sender->ssl.conn);
- }
-
- if (host->sender->ssl.conn)
- {
- if (SSL_set_fd(host->sender->ssl.conn, s->rrdpush_sender_socket) != 1) {
- error("Failed to set the socket to the SSL on socket fd %d.", s->rrdpush_sender_socket);
- host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- } else{
- host->sender->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
- }
- }
- }
- else {
- host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- }
-#endif
-
// reset our capabilities to default
s->capabilities = stream_our_capabilities();
@@ -651,43 +672,8 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
http[eol] = 0x00;
rrdpush_clean_encoded(&se);
-#ifdef ENABLE_HTTPS
- if (!host->sender->ssl.flags) {
- ERR_clear_error();
- SSL_set_connect_state(host->sender->ssl.conn);
- int err = SSL_connect(host->sender->ssl.conn);
- if (err != 1){
- err = SSL_get_error(host->sender->ssl.conn, err);
- error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->sender->ssl.conn,err),NULL));
- if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
- rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = "SSL error";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
- host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
- return false;
- }
- else {
- host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
- }
- }
- else {
- if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
- if (netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
- if ( security_test_certificate(host->sender->ssl.conn)) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
- error("Closing the stream connection, because the server SSL certificate is not valid.");
- rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = "invalid SSL certificate";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
- host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
- return false;
- }
- }
- }
- }
- }
-#endif
+ if(!rrdpush_sender_connect_ssl(s))
+ return false;
ssize_t bytes;
@@ -733,6 +719,12 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
return false;
}
+ if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
+
+ if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
+
http[bytes] = '\0';
debug(D_STREAM, "Response to sender from far end: %s", http);
if(!rrdpush_sender_validate_response(host, s, http, bytes))
@@ -749,12 +741,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
log_sender_capabilities(s);
- if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
-
- if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
-
debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
return true;
@@ -819,9 +805,8 @@ static ssize_t attempt_to_send(struct sender_state *s) {
debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
#ifdef ENABLE_HTTPS
- SSL *conn = s->ssl.conn ;
- if(conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
- ret = netdata_ssl_write(conn, chunk, outstanding);
+ if(SSL_connection(&s->ssl))
+ ret = netdata_ssl_write(&s->ssl, chunk, outstanding);
else
ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
#else
@@ -852,25 +837,17 @@ static ssize_t attempt_to_send(struct sender_state *s) {
}
static ssize_t attempt_read(struct sender_state *s) {
- ssize_t ret = 0;
+ ssize_t ret;
#ifdef ENABLE_HTTPS
- if (s->ssl.conn && s->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
- size_t desired = sizeof(s->read_buffer) - s->read_len - 1;
- ret = netdata_ssl_read(s->ssl.conn, s->read_buffer, desired);
- if (ret > 0 ) {
- s->read_len += (int)ret;
- return ret;
- }
-
- if (ret == -1) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
- rrdpush_sender_thread_close_socket(s->host);
- }
- return ret;
- }
-#endif
+ if (SSL_connection(&s->ssl))
+ ret = netdata_ssl_read(&s->ssl, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1);
+ else
+ ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
+#else
ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
+#endif
+
if (ret > 0) {
s->read_len += ret;
return ret;
@@ -879,6 +856,12 @@ static ssize_t attempt_read(struct sender_state *s) {
if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
return ret;
+#ifdef ENABLE_HTTPS
+ if (SSL_connection(&s->ssl))
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
+ else
+#endif
+
if (ret == 0 || errno == ECONNRESET) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED);
error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
@@ -887,6 +870,7 @@ static ssize_t attempt_read(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR);
error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
}
+
rrdpush_sender_thread_close_socket(s->host);
return ret;
@@ -1096,6 +1080,8 @@ static bool rrdhost_set_sender(RRDHOST *host) {
}
netdata_mutex_unlock(&host->sender->mutex);
+ rrdpush_reset_destinations_postpone_time(host);
+
return ret;
}
@@ -1108,6 +1094,8 @@ static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) {
host->sender->exit.reason = NULL;
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
}
+
+ rrdpush_reset_destinations_postpone_time(host);
}
static bool rrdhost_sender_should_exit(struct sender_state *s) {
@@ -1134,7 +1122,7 @@ static bool rrdhost_sender_should_exit(struct sender_state *s) {
if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) {
if(!s->exit.reason)
- s->exit.reason = "RECEIVER LEFT";
+ s->exit.reason = "RECEIVER LEFT (ORPHAN HOST)";
return true;
}
@@ -1162,6 +1150,32 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
freez(s);
}
+/*
+ * Set up the SSL context used by streaming senders, the first time it is needed.
+ *
+ * Nothing happens when netdata_ssl_streaming_sender_ctx is already set or when
+ * host is NULL. Otherwise the destinations of the host are scanned and, as soon
+ * as one with its ssl flag set is found, the streaming sender context is
+ * initialized and given the configured CA file / CA path. A host without any
+ * SSL destination leaves the context untouched.
+ *
+ * The whole check-and-initialize sequence runs under a function-local
+ * spinlock. Without ENABLE_HTTPS the function does nothing.
+ */
+void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
+#ifdef ENABLE_HTTPS
+    static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
+    netdata_spinlock_lock(&sp);
+
+    if(!netdata_ssl_streaming_sender_ctx && host) {
+        for(struct rrdpush_destinations *d = host->destinations; d ; d = d->next) {
+            if(!d->ssl)
+                continue;
+
+            // we need to initialize SSL
+            // NOTE(review): presumably this call is what sets
+            // netdata_ssl_streaming_sender_ctx - confirm in security.c
+            netdata_ssl_initialize_ctx(NETDATA_SSL_STREAMING_SENDER_CTX);
+            ssl_security_location_for_context(netdata_ssl_streaming_sender_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
+
+            // one SSL destination is enough, stop the loop
+            break;
+        }
+    }
+
+    netdata_spinlock_unlock(&sp);
+#else
+    (void)host; // the parameter is only used by the SSL code path
+#endif
+}
+
+
void *rrdpush_sender_thread(void *ptr) {
worker_register("STREAMSND");
worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
@@ -1206,17 +1220,7 @@ void *rrdpush_sender_thread(void *ptr) {
return NULL;
}
-#ifdef ENABLE_HTTPS
- if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ) {
- static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
- netdata_spinlock_lock(&sp);
- if(!netdata_ssl_client_ctx) {
- security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING);
- ssl_security_location_for_context(netdata_ssl_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
- }
- netdata_spinlock_unlock(&sp);
- }
-#endif
+ rrdpush_initialize_ssl_ctx(s->host);
info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid());