summaryrefslogtreecommitdiffstats
path: root/exporting/send_data.c
diff options
context:
space:
mode:
Diffstat (limited to 'exporting/send_data.c')
-rw-r--r--exporting/send_data.c203
1 files changed, 203 insertions, 0 deletions
diff --git a/exporting/send_data.c b/exporting/send_data.c
new file mode 100644
index 0000000000..f7271219f4
--- /dev/null
+++ b/exporting/send_data.c
@@ -0,0 +1,203 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "exporting_engine.h"
+
+/**
+ * Discard response
+ *
+ * Discards a response received by an exporting connector instance after logging a sample of it to error.log
+ *
+ * @param buffer buffer with response data.
+ * @param instance an instance data structure.
+ * @return Always returns 0.
+ */
+int exporting_discard_response(BUFFER *buffer, struct instance *instance) {
+ char sample[1024];
+ const char *s = buffer_tostring(buffer);
+ char *d = sample, *e = &sample[sizeof(sample) - 1];
+
+ for(; *s && d < e ;s++) {
+ char c = *s;
+ if(unlikely(!isprint(c))) c = ' ';
+ *d++ = c;
+ }
+ *d = '\0';
+
+ info(
+ "EXPORTING: received %zu bytes from %s connector instance. Ignoring them. Sample: '%s'",
+ buffer_strlen(buffer),
+ instance->config.name,
+ sample);
+ buffer_flush(buffer);
+ return 0;
+}
+
+/**
+ * Receive response
+ *
+ * @param sock communication socket.
+ * @param instance an instance data structure.
+ */
+void simple_connector_receive_response(int *sock, struct instance *instance)
+{
+ static BUFFER *response = NULL;
+ if (!response)
+ response = buffer_create(1);
+
+ struct stats *stats = &instance->stats;
+
+ errno = 0;
+
+ // loop through to collect all data
+ while (*sock != -1 && errno != EWOULDBLOCK) {
+ buffer_need_bytes(response, 4096);
+
+ ssize_t r;
+ r = recv(*sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
+ if (likely(r > 0)) {
+ // we received some data
+ response->len += r;
+ stats->chart_received_bytes += r;
+ stats->chart_receptions++;
+ } else if (r == 0) {
+ error("EXPORTING: '%s' closed the socket", instance->config.destination);
+ close(*sock);
+ *sock = -1;
+ } else {
+ // failed to receive data
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ error("EXPORTING: cannot receive data from '%s'.", instance->config.destination);
+ }
+ }
+#ifdef UNIT_TESTING
+ break;
+#endif
+ }
+
+ // if we received data, process them
+ if (buffer_strlen(response))
+ exporting_discard_response(response, instance);
+}
+
+/**
+ * Send buffer to a server
+ *
+ * @param sock communication socket.
+ * @param failures the number of communication failures.
+ * @param instance an instance data structure.
+ */
+void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance)
+{
+ BUFFER *buffer = (BUFFER *)instance->buffer;
+ size_t len = buffer_strlen(buffer);
+
+ int flags = 0;
+#ifdef MSG_NOSIGNAL
+ flags += MSG_NOSIGNAL;
+#endif
+
+ struct stats *stats = &instance->stats;
+
+ ssize_t written;
+ written = send(*sock, buffer_tostring(buffer), len, flags);
+
+ if(written != -1 && (size_t)written == len) {
+ // we sent the data successfully
+ stats->chart_transmission_successes++;
+ stats->chart_sent_bytes += written;
+ stats->chart_sent_metrics = stats->chart_buffered_metrics;
+
+ // reset the failures count
+ *failures = 0;
+
+ // empty the buffer
+ buffer_flush(buffer);
+ }
+ else {
+ // oops! we couldn't send (all or some of the) data
+ error(
+ "EXPORTING: failed to write data to '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.",
+ instance->config.destination,
+ len,
+ written);
+ stats->chart_transmission_failures++;
+
+ if(written != -1)
+ stats->chart_sent_bytes += written;
+
+ // increment the counter we check for data loss
+ (*failures)++;
+
+ // close the socket - we will re-open it next time
+ close(*sock);
+ *sock = -1;
+ }
+}
+
+/**
+ * Simple connector worker
+ *
+ * Runs in a separate thread for every instance.
+ *
+ * @param instance_p an instance data structure.
+ */
+void simple_connector_worker(void *instance_p)
+{
+ struct instance *instance = (struct instance*)instance_p;
+
+ struct simple_connector_config *connector_specific_config = instance->connector->config.connector_specific_config;
+ struct stats *stats = &instance->stats;
+
+ int sock = -1;
+ struct timeval timeout = {.tv_sec = (instance->config.timeoutms * 1000) / 1000000,
+ .tv_usec = (instance->config.timeoutms * 1000) % 1000000};
+ int failures = 0;
+
+ while(!netdata_exit) {
+ // ------------------------------------------------------------------------
+ // if we are connected, receive a response, without blocking
+
+ if(likely(sock != -1))
+ simple_connector_receive_response(&sock, instance);
+
+ // ------------------------------------------------------------------------
+ // if we are not connected, connect to a data collecting server
+
+ if(unlikely(sock == -1)) {
+ size_t reconnects = 0;
+
+ sock = connect_to_one_of(
+ instance->config.destination,
+ connector_specific_config->default_port,
+ &timeout,
+ &reconnects,
+ NULL,
+ 0);
+ stats->chart_reconnects += reconnects;
+ }
+
+ if(unlikely(netdata_exit)) break;
+
+ // ------------------------------------------------------------------------
+ // if we are connected, send our buffer to the data collecting server
+
+ uv_mutex_lock(&instance->mutex);
+ uv_cond_wait(&instance->cond_var, &instance->mutex);
+
+ if (likely(sock != -1)) {
+ simple_connector_send_buffer(&sock, &failures, instance);
+ } else {
+ error("EXPORTING: failed to update '%s'", instance->config.destination);
+ stats->chart_transmission_failures++;
+
+ // increment the counter we check for data loss
+ failures++;
+ }
+
+ uv_mutex_unlock(&instance->mutex);
+
+#ifdef UNIT_TESTING
+ break;
+#endif
+ }
+}