summaryrefslogtreecommitdiffstats
path: root/exporting/send_data.c
diff options
context:
space:
mode:
Diffstat (limited to 'exporting/send_data.c')
-rw-r--r--exporting/send_data.c278
1 files changed, 160 insertions, 118 deletions
diff --git a/exporting/send_data.c b/exporting/send_data.c
index cc107ea2dd..1618c1bc09 100644
--- a/exporting/send_data.c
+++ b/exporting/send_data.c
@@ -3,6 +3,22 @@
#include "exporting_engine.h"
/**
+ * Check if TLS is enabled in the configuration
+ *
+ * @param type buffer with response data.
+ * @param options an instance data structure.
+ * @return Returns 1 if TLS should be enabled, 0 otherwise.
+ */
+static int exporting_tls_is_enabled(EXPORTING_CONNECTOR_TYPE type, EXPORTING_OPTIONS options)
+{
+ return (type == EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP ||
+ type == EXPORTING_CONNECTOR_TYPE_JSON_HTTP ||
+ type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP ||
+ type == EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE) &&
+ options & EXPORTING_OPTION_USE_TLS;
+}
+
+/**
* Discard response
*
* Discards a response received by an exporting connector instance after logging a sample of it to error.log
@@ -12,6 +28,7 @@
* @return Always returns 0.
*/
int exporting_discard_response(BUFFER *buffer, struct instance *instance) {
+#if NETDATA_INTERNAL_CHECKS
char sample[1024];
const char *s = buffer_tostring(buffer);
char *d = sample, *e = &sample[sizeof(sample) - 1];
@@ -23,11 +40,16 @@ int exporting_discard_response(BUFFER *buffer, struct instance *instance) {
}
*d = '\0';
- info(
+ debug(
+ D_BACKEND,
"EXPORTING: received %zu bytes from %s connector instance. Ignoring them. Sample: '%s'",
buffer_strlen(buffer),
instance->config.name,
sample);
+#else
+ UNUSED(instance);
+#endif /* NETDATA_INTERNAL_CHECKS */
+
buffer_flush(buffer);
return 0;
}
@@ -47,7 +69,7 @@ void simple_connector_receive_response(int *sock, struct instance *instance)
struct stats *stats = &instance->stats;
#ifdef ENABLE_HTTPS
uint32_t options = (uint32_t)instance->config.options;
- struct opentsdb_specific_data *connector_specific_data = instance->connector_specific_data;
+ struct simple_connector_data *connector_specific_data = instance->connector_specific_data;
if (options & EXPORTING_OPTION_USE_TLS)
ERR_clear_error();
@@ -61,8 +83,7 @@ void simple_connector_receive_response(int *sock, struct instance *instance)
ssize_t r;
#ifdef ENABLE_HTTPS
- if (instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP &&
- options & EXPORTING_OPTION_USE_TLS &&
+ if (exporting_tls_is_enabled(instance->config.type, options) &&
connector_specific_data->conn &&
connector_specific_data->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
r = (ssize_t)SSL_read(connector_specific_data->conn,
@@ -132,11 +153,9 @@ endloop:
* @param failures the number of communication failures.
* @param instance an instance data structure.
*/
-void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance)
+void simple_connector_send_buffer(
+ int *sock, int *failures, struct instance *instance, BUFFER *header, BUFFER *buffer, size_t buffered_metrics)
{
- BUFFER *buffer = (BUFFER *)instance->buffer;
- size_t len = buffer_strlen(buffer);
-
int flags = 0;
#ifdef MSG_NOSIGNAL
flags += MSG_NOSIGNAL;
@@ -144,58 +163,61 @@ void simple_connector_send_buffer(int *sock, int *failures, struct instance *ins
#ifdef ENABLE_HTTPS
uint32_t options = (uint32_t)instance->config.options;
- struct opentsdb_specific_data *connector_specific_data = instance->connector_specific_data;
+ struct simple_connector_data *connector_specific_data = instance->connector_specific_data;
if (options & EXPORTING_OPTION_USE_TLS)
ERR_clear_error();
#endif
struct stats *stats = &instance->stats;
+ ssize_t header_sent_bytes = 0;
+ ssize_t buffer_sent_bytes = 0;
+ size_t header_len = buffer_strlen(header);
+ size_t buffer_len = buffer_strlen(buffer);
- int ret = 0;
- if (instance->send_header)
- ret = instance->send_header(sock, instance);
-
- ssize_t written = -1;
-
- if (!ret) {
#ifdef ENABLE_HTTPS
- if (instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP &&
- options & EXPORTING_OPTION_USE_TLS &&
- connector_specific_data->conn &&
- connector_specific_data->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
- written = (ssize_t)SSL_write(connector_specific_data->conn, buffer_tostring(buffer), len);
- } else {
- written = send(*sock, buffer_tostring(buffer), len, flags);
- }
+ if (exporting_tls_is_enabled(instance->config.type, options) &&
+ connector_specific_data->conn &&
+ connector_specific_data->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
+ if (header_len)
+ header_sent_bytes = (ssize_t)SSL_write(connector_specific_data->conn, buffer_tostring(header), header_len);
+ if ((size_t)header_sent_bytes == header_len)
+ buffer_sent_bytes = (ssize_t)SSL_write(connector_specific_data->conn, buffer_tostring(buffer), buffer_len);
+ } else {
+ if (header_len)
+ header_sent_bytes = send(*sock, buffer_tostring(header), header_len, flags);
+ if ((size_t)header_sent_bytes == header_len)
+ buffer_sent_bytes = send(*sock, buffer_tostring(buffer), buffer_len, flags);
+ }
#else
- written = send(*sock, buffer_tostring(buffer), len, flags);
+ if (header_len)
+ header_sent_bytes = send(*sock, buffer_tostring(header), header_len, flags);
+ if ((size_t)header_sent_bytes == header_len)
+ buffer_sent_bytes = send(*sock, buffer_tostring(buffer), buffer_len, flags);
#endif
- }
- if(written != -1 && (size_t)written == len) {
+ if ((size_t)buffer_sent_bytes == buffer_len) {
// we sent the data successfully
stats->transmission_successes++;
- stats->sent_bytes += written;
- stats->sent_metrics = stats->buffered_metrics;
+ stats->sent_metrics += buffered_metrics;
+ stats->sent_bytes += buffer_sent_bytes;
// reset the failures count
*failures = 0;
// empty the buffer
buffer_flush(buffer);
- }
- else {
+ } else {
// oops! we couldn't send (all or some of the) data
error(
"EXPORTING: failed to write data to '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.",
instance->config.destination,
- len,
- written);
+ buffer_len,
+ buffer_sent_bytes);
stats->transmission_failures++;
- if(written != -1)
- stats->sent_bytes += written;
+ if(buffer_sent_bytes != -1)
+ stats->sent_bytes += buffer_sent_bytes;
// increment the counter we check for data loss
(*failures)++;
@@ -207,22 +229,6 @@ void simple_connector_send_buffer(int *sock, int *failures, struct instance *ins
}
/**
- * Clean up a simple connector instance on Netdata exit
- *
- * @param instance an instance data structure.
- */
-void simple_connector_cleanup(struct instance *instance)
-{
- info("EXPORTING: cleaning up instance %s ...", instance->config.name);
-
- buffer_free(instance->buffer);
- freez(instance->config.connector_specific_config);
-
- info("EXPORTING: instance %s exited", instance->config.name);
- instance->exited = 1;
-}
-
-/**
* Simple connector worker
*
* Runs in a separate thread for every instance.
@@ -235,60 +241,97 @@ void simple_connector_worker(void *instance_p)
#ifdef ENABLE_HTTPS
uint32_t options = (uint32_t)instance->config.options;
- struct opentsdb_specific_data *connector_specific_data = instance->connector_specific_data;
+ struct simple_connector_data *connector_specific_data = instance->connector_specific_data;
if (options & EXPORTING_OPTION_USE_TLS)
ERR_clear_error();
#endif
struct simple_connector_config *connector_specific_config = instance->config.connector_specific_config;
- struct stats *stats = &instance->stats;
int sock = -1;
- struct timeval timeout = {.tv_sec = (instance->config.timeoutms * 1000) / 1000000,
- .tv_usec = (instance->config.timeoutms * 1000) % 1000000};
+ struct timeval timeout = { .tv_sec = (instance->config.timeoutms * 1000) / 1000000,
+ .tv_usec = (instance->config.timeoutms * 1000) % 1000000 };
int failures = 0;
- while(!instance->engine->exit) {
+ BUFFER *spare_header = buffer_create(0);
+ BUFFER *spare_buffer = buffer_create(0);
+
+ while (!instance->engine->exit) {
+ struct stats *stats = &instance->stats;
+ int send_stats = 0;
+
+ if (instance->data_is_ready)
+ send_stats = 1;
+
+ uv_mutex_lock(&instance->mutex);
+ if (!connector_specific_data->first_buffer->used || failures) {
+ while (!instance->data_is_ready)
+ uv_cond_wait(&instance->cond_var, &instance->mutex);
+ instance->data_is_ready = 0;
+ send_stats = 1;
+ }
+
+ if (unlikely(instance->engine->exit)) {
+ uv_mutex_unlock(&instance->mutex);
+ break;
+ }
+
+ // ------------------------------------------------------------------------
+ // detach buffer
+
+ BUFFER *header;
+ BUFFER *buffer;
+ size_t buffered_metrics;
+
+ if (!connector_specific_data->previous_buffer ||
+ (connector_specific_data->previous_buffer == connector_specific_data->first_buffer &&
+ connector_specific_data->first_buffer->used == 1)) {
+ connector_specific_data->header = connector_specific_data->first_buffer->header;
+ connector_specific_data->buffer = connector_specific_data->first_buffer->buffer;
+ connector_specific_data->buffered_metrics = connector_specific_data->first_buffer->buffered_metrics;
+ connector_specific_data->buffered_bytes = connector_specific_data->first_buffer->buffered_bytes;
+
+ header = connector_specific_data->header;
+ buffer = connector_specific_data->buffer;
+ buffered_metrics = connector_specific_data->buffered_metrics;
+
+ buffer_flush(spare_header);
+ connector_specific_data->first_buffer->header = spare_header;
+ spare_header = header;
+
+ buffer_flush(spare_buffer);
+ connector_specific_data->first_buffer->buffer = spare_buffer;
+ spare_buffer = buffer;
+ } else {
+ header = connector_specific_data->header;
+ buffer = connector_specific_data->buffer;
+ buffered_metrics = connector_specific_data->buffered_metrics;
+ }
- // reset the monitoring chart counters
- stats->received_bytes =
- stats->sent_bytes =
- stats->sent_metrics =
- stats->lost_metrics =
- stats->receptions =
- stats->transmission_successes =
- stats->transmission_failures =
- stats->data_lost_events =
- stats->lost_bytes =
- stats->reconnects = 0;
+ uv_mutex_unlock(&instance->mutex);
// ------------------------------------------------------------------------
// if we are connected, receive a response, without blocking
- if(likely(sock != -1))
+ if (likely(sock != -1))
simple_connector_receive_response(&sock, instance);
// ------------------------------------------------------------------------
// if we are not connected, connect to a data collecting server
- if(unlikely(sock == -1)) {
+ if (unlikely(sock == -1)) {
size_t reconnects = 0;
sock = connect_to_one_of(
- instance->config.destination,
- connector_specific_config->default_port,
- &timeout,
- &reconnects,
- NULL,
- 0);
+ instance->config.destination, connector_specific_config->default_port, &timeout, &reconnects, NULL, 0);
#ifdef ENABLE_HTTPS
- if(instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && sock != -1) {
- if (netdata_opentsdb_ctx) {
- if ( sock_delnonblock(sock) < 0 )
+ if (exporting_tls_is_enabled(instance->config.type, options) && sock != -1) {
+ if (netdata_exporting_ctx) {
+ if (sock_delnonblock(sock) < 0)
error("Exporting cannot remove the non-blocking flag from socket %d", sock);
if (connector_specific_data->conn == NULL) {
- connector_specific_data->conn = SSL_new(netdata_opentsdb_ctx);
+ connector_specific_data->conn = SSL_new(netdata_exporting_ctx);
if (connector_specific_data->conn == NULL) {
error("Failed to allocate SSL structure to socket %d.", sock);
connector_specific_data->flags = NETDATA_SSL_NO_HANDSHAKE;
@@ -307,50 +350,42 @@ void simple_connector_worker(void *instance_p)
int err = SSL_connect(connector_specific_data->conn);
if (err != 1) {
err = SSL_get_error(connector_specific_data->conn, err);
- error("SSL cannot connect with the server: %s ",
- ERR_error_string((long)SSL_get_error(connector_specific_data->conn, err), NULL));
+ error(
+ "SSL cannot connect with the server: %s ",
+ ERR_error_string((long)SSL_get_error(connector_specific_data->conn, err), NULL));
connector_specific_data->flags = NETDATA_SSL_NO_HANDSHAKE;
} else {
info("Exporting established a SSL connection.");
struct timeval tv;
- tv.tv_sec = timeout.tv_sec /4;
+ tv.tv_sec = timeout.tv_sec / 4;
tv.tv_usec = 0;
if (!tv.tv_sec)
tv.tv_sec = 2;
- if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv)))
+ if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof(tv)))
error("Cannot set timeout to socket %d, this can block communication", sock);
-
}
}
}
}
-
}
#endif
stats->reconnects += reconnects;
}
- if(unlikely(instance->engine->exit)) break;
+ if (unlikely(instance->engine->exit))
+ break;
// ------------------------------------------------------------------------
// if we are connected, send our buffer to the data collecting server
- uv_mutex_lock(&instance->mutex);
- while (!instance->data_is_ready)
- uv_cond_wait(&instance->cond_var, &instance->mutex);
- instance->data_is_ready = 0;
-
- if (unlikely(instance->engine->exit)) {
- uv_mutex_unlock(&instance->mutex);
- break;
- }
+ failures = 0;
if (likely(sock != -1)) {
- simple_connector_send_buffer(&sock, &failures, instance);
+ simple_connector_send_buffer(&sock, &failures, instance, header, buffer, buffered_metrics);
} else {
error("EXPORTING: failed to update '%s'", instance->config.destination);
stats->transmission_failures++;
@@ -359,26 +394,40 @@ void simple_connector_worker(void *instance_p)
failures++;
}
- BUFFER *buffer = instance->buffer;
-
- if (failures > instance->config.buffer_on_failures) {
- stats->lost_bytes += buffer_strlen(buffer);
- error(
- "EXPORTING: connector instance %s reached %d exporting failures. "
- "Flushing buffers to protect this host - this results in data loss on server '%s'",
- instance->config.name, failures, instance->config.destination);
- buffer_flush(buffer);
- failures = 0;
- stats->data_lost_events++;
- stats->lost_metrics = stats->buffered_metrics;
+ if (!failures) {
+ connector_specific_data->first_buffer->buffered_metrics =
+ connector_specific_data->first_buffer->buffered_bytes = connector_specific_data->first_buffer->used = 0;
+ connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
}
- send_internal_metrics(instance);
+ if (unlikely(instance->engine->exit))
+ break;
+
+ if (send_stats) {
+ uv_mutex_lock(&instance->mutex);
+
+ stats->buffered_metrics = connector_specific_data->total_buffered_metrics;
+
+ send_internal_metrics(instance);
- if(likely(buffer_strlen(buffer) == 0))
stats->buffered_metrics = 0;
- uv_mutex_unlock(&instance->mutex);
+ // reset the internal monitoring chart counters
+ connector_specific_data->total_buffered_metrics =
+ stats->buffered_bytes =
+ stats->receptions =
+ stats->received_bytes =
+ stats->sent_metrics =
+ stats->sent_bytes =
+ stats->transmission_successes =
+ stats->transmission_failures =
+ stats->reconnects =
+ stats->data_lost_events =
+ stats->lost_metrics =
+ stats->lost_bytes = 0;
+
+ uv_mutex_unlock(&instance->mutex);
+ }
#ifdef UNIT_TESTING
return;
@@ -390,12 +439,5 @@ void simple_connector_worker(void *instance_p)
clean_prometheus_remote_write(instance);
#endif
-#ifdef ENABLE_HTTPS
- if (instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && options & EXPORTING_OPTION_USE_TLS) {
- SSL_free(connector_specific_data->conn);
- freez(instance->connector_specific_data);
- }
-#endif
-
simple_connector_cleanup(instance);
}