summaryrefslogtreecommitdiffstats
path: root/exporting/send_data.c
diff options
context:
space:
mode:
authorVladimir Kobal <vlad@prokk.net>2019-12-12 21:41:11 +0200
committerGitHub <noreply@github.com>2019-12-12 21:41:11 +0200
commit6f270819121afb743d783f9a72786e367d1048a3 (patch)
tree6e17d6764e5c6d72bb53bbfcc35cecce4b18d325 /exporting/send_data.c
parent7278d5bcd987fb9646da4a8a837173bae3b68459 (diff)
Implement the main flow for the Exporting Engine (#7149)
* Add top level tests * Add a skeleton for preparing buffers * Initialize graphite instance * Prepare buffers for all instances * Add Grafite collected value formatter * Add support for exporting.conf read and parsing * - Use new exporting_config instead of netdata_config * Implement Grafite worker * Disable exporting engine compilation if libuv is not available * Add mutex locks - Configure connectors as connector_<type> in sections of exporting.conf - Change exporting_select_type to check for connector_ fields * - Override exporting_config structure if there no exporting.conf so that look ups don't fail and we maintain backwards compatibility * Separate fixtures in unit tests * Test exporting_discard_responce * Test response receiving * Test buffer sending * Test simple connector worker - Instance section has the format connector:instance_name e.g graphite:my_graphite_instance - Connectors with : in their name e.g graphite:plaintext are reserved So graphite:plaintext is not accepted because it would activate an instance with name "plaintext" It should be graphite:plaintext:instance_name * - Enable the add_connector_instance to cleanup the internal structure by passing NULL,not NULL arguments * Implement configurable update interval - Add additional check to verify instance uniqueness across connectors * Add host and chart filters * Add the value calculation over a database series * Add the calculated over stored data graphite connector * Add tests for graphite connector * Add JSON connector * Add tests for JSON formatting functions * Add OpenTSDB connector * Add tests for the OpenTSDB connector * Add temporaty notes to the documentation
Diffstat (limited to 'exporting/send_data.c')
-rw-r--r--exporting/send_data.c203
1 files changed, 203 insertions, 0 deletions
diff --git a/exporting/send_data.c b/exporting/send_data.c
new file mode 100644
index 0000000000..f7271219f4
--- /dev/null
+++ b/exporting/send_data.c
@@ -0,0 +1,203 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "exporting_engine.h"
+
+/**
+ * Discard response
+ *
+ * Discards a response received by an exporting connector instance after logging a sample of it to error.log
+ *
+ * @param buffer buffer with response data.
+ * @param instance an instance data structure.
+ * @return Always returns 0.
+ */
+int exporting_discard_response(BUFFER *buffer, struct instance *instance) {
+ char sample[1024];
+ const char *s = buffer_tostring(buffer);
+ char *d = sample, *e = &sample[sizeof(sample) - 1];
+
+ for(; *s && d < e ;s++) {
+ char c = *s;
+ if(unlikely(!isprint(c))) c = ' ';
+ *d++ = c;
+ }
+ *d = '\0';
+
+ info(
+ "EXPORTING: received %zu bytes from %s connector instance. Ignoring them. Sample: '%s'",
+ buffer_strlen(buffer),
+ instance->config.name,
+ sample);
+ buffer_flush(buffer);
+ return 0;
+}
+
+/**
+ * Receive response
+ *
+ * @param sock communication socket.
+ * @param instance an instance data structure.
+ */
+void simple_connector_receive_response(int *sock, struct instance *instance)
+{
+ static BUFFER *response = NULL;
+ if (!response)
+ response = buffer_create(1);
+
+ struct stats *stats = &instance->stats;
+
+ errno = 0;
+
+ // loop through to collect all data
+ while (*sock != -1 && errno != EWOULDBLOCK) {
+ buffer_need_bytes(response, 4096);
+
+ ssize_t r;
+ r = recv(*sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
+ if (likely(r > 0)) {
+ // we received some data
+ response->len += r;
+ stats->chart_received_bytes += r;
+ stats->chart_receptions++;
+ } else if (r == 0) {
+ error("EXPORTING: '%s' closed the socket", instance->config.destination);
+ close(*sock);
+ *sock = -1;
+ } else {
+ // failed to receive data
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ error("EXPORTING: cannot receive data from '%s'.", instance->config.destination);
+ }
+ }
+#ifdef UNIT_TESTING
+ break;
+#endif
+ }
+
+ // if we received data, process them
+ if (buffer_strlen(response))
+ exporting_discard_response(response, instance);
+}
+
+/**
+ * Send buffer to a server
+ *
+ * @param sock communication socket.
+ * @param failures the number of communication failures.
+ * @param instance an instance data structure.
+ */
+void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance)
+{
+ BUFFER *buffer = (BUFFER *)instance->buffer;
+ size_t len = buffer_strlen(buffer);
+
+ int flags = 0;
+#ifdef MSG_NOSIGNAL
+ flags += MSG_NOSIGNAL;
+#endif
+
+ struct stats *stats = &instance->stats;
+
+ ssize_t written;
+ written = send(*sock, buffer_tostring(buffer), len, flags);
+
+ if(written != -1 && (size_t)written == len) {
+ // we sent the data successfully
+ stats->chart_transmission_successes++;
+ stats->chart_sent_bytes += written;
+ stats->chart_sent_metrics = stats->chart_buffered_metrics;
+
+ // reset the failures count
+ *failures = 0;
+
+ // empty the buffer
+ buffer_flush(buffer);
+ }
+ else {
+ // oops! we couldn't send (all or some of the) data
+ error(
+ "EXPORTING: failed to write data to '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.",
+ instance->config.destination,
+ len,
+ written);
+ stats->chart_transmission_failures++;
+
+ if(written != -1)
+ stats->chart_sent_bytes += written;
+
+ // increment the counter we check for data loss
+ (*failures)++;
+
+ // close the socket - we will re-open it next time
+ close(*sock);
+ *sock = -1;
+ }
+}
+
+/**
+ * Simple connector worker
+ *
+ * Runs in a separate thread for every instance.
+ *
+ * @param instance_p an instance data structure.
+ */
+void simple_connector_worker(void *instance_p)
+{
+ struct instance *instance = (struct instance*)instance_p;
+
+ struct simple_connector_config *connector_specific_config = instance->connector->config.connector_specific_config;
+ struct stats *stats = &instance->stats;
+
+ int sock = -1;
+ struct timeval timeout = {.tv_sec = (instance->config.timeoutms * 1000) / 1000000,
+ .tv_usec = (instance->config.timeoutms * 1000) % 1000000};
+ int failures = 0;
+
+ while(!netdata_exit) {
+ // ------------------------------------------------------------------------
+ // if we are connected, receive a response, without blocking
+
+ if(likely(sock != -1))
+ simple_connector_receive_response(&sock, instance);
+
+ // ------------------------------------------------------------------------
+ // if we are not connected, connect to a data collecting server
+
+ if(unlikely(sock == -1)) {
+ size_t reconnects = 0;
+
+ sock = connect_to_one_of(
+ instance->config.destination,
+ connector_specific_config->default_port,
+ &timeout,
+ &reconnects,
+ NULL,
+ 0);
+ stats->chart_reconnects += reconnects;
+ }
+
+ if(unlikely(netdata_exit)) break;
+
+ // ------------------------------------------------------------------------
+ // if we are connected, send our buffer to the data collecting server
+
+ uv_mutex_lock(&instance->mutex);
+ uv_cond_wait(&instance->cond_var, &instance->mutex);
+
+ if (likely(sock != -1)) {
+ simple_connector_send_buffer(&sock, &failures, instance);
+ } else {
+ error("EXPORTING: failed to update '%s'", instance->config.destination);
+ stats->chart_transmission_failures++;
+
+ // increment the counter we check for data loss
+ failures++;
+ }
+
+ uv_mutex_unlock(&instance->mutex);
+
+#ifdef UNIT_TESTING
+ break;
+#endif
+ }
+}