summaryrefslogtreecommitdiffstats
path: root/exporting
diff options
context:
space:
mode:
authorVladimir Kobal <vlad@prokk.net>2019-12-12 21:41:11 +0200
committerGitHub <noreply@github.com>2019-12-12 21:41:11 +0200
commit6f270819121afb743d783f9a72786e367d1048a3 (patch)
tree6e17d6764e5c6d72bb53bbfcc35cecce4b18d325 /exporting
parent7278d5bcd987fb9646da4a8a837173bae3b68459 (diff)
Implement the main flow for the Exporting Engine (#7149)
* Add top level tests * Add a skeleton for preparing buffers * Initialize graphite instance * Prepare buffers for all instances * Add Grafite collected value formatter * Add support for exporting.conf read and parsing * - Use new exporting_config instead of netdata_config * Implement Grafite worker * Disable exporting engine compilation if libuv is not available * Add mutex locks - Configure connectors as connector_<type> in sections of exporting.conf - Change exporting_select_type to check for connector_ fields * - Override exporting_config structure if there no exporting.conf so that look ups don't fail and we maintain backwards compatibility * Separate fixtures in unit tests * Test exporting_discard_responce * Test response receiving * Test buffer sending * Test simple connector worker - Instance section has the format connector:instance_name e.g graphite:my_graphite_instance - Connectors with : in their name e.g graphite:plaintext are reserved So graphite:plaintext is not accepted because it would activate an instance with name "plaintext" It should be graphite:plaintext:instance_name * - Enable the add_connector_instance to cleanup the internal structure by passing NULL,not NULL arguments * Implement configurable update interval - Add additional check to verify instance uniqueness across connectors * Add host and chart filters * Add the value calculation over a database series * Add the calculated over stored data graphite connector * Add tests for graphite connector * Add JSON connector * Add tests for JSON formatting functions * Add OpenTSDB connector * Add tests for the OpenTSDB connector * Add temporaty notes to the documentation
Diffstat (limited to 'exporting')
-rw-r--r--exporting/Makefile.am15
-rw-r--r--exporting/README.md45
-rw-r--r--exporting/check_filters.c78
-rw-r--r--exporting/exporting_engine.c60
-rw-r--r--exporting/exporting_engine.h185
-rw-r--r--exporting/graphite/Makefile.am4
-rw-r--r--exporting/graphite/graphite.c138
-rw-r--r--exporting/graphite/graphite.h13
-rw-r--r--exporting/init_connectors.c72
-rw-r--r--exporting/json/Makefile.am4
-rw-r--r--exporting/json/json.c194
-rw-r--r--exporting/json/json.h13
-rw-r--r--exporting/opentsdb/Makefile.am4
-rw-r--r--exporting/opentsdb/opentsdb.c305
-rw-r--r--exporting/opentsdb/opentsdb.h18
-rw-r--r--exporting/process_data.c404
-rw-r--r--exporting/read_config.c354
-rw-r--r--exporting/send_data.c203
-rw-r--r--exporting/send_internal_metrics.c18
-rw-r--r--exporting/tests/Makefile.am4
-rw-r--r--exporting/tests/exporting_doubles.c162
-rw-r--r--exporting/tests/exporting_fixtures.c110
-rw-r--r--exporting/tests/netdata_doubles.c134
-rw-r--r--exporting/tests/system_doubles.c61
-rw-r--r--exporting/tests/test_exporting_engine.c738
-rw-r--r--exporting/tests/test_exporting_engine.h110
26 files changed, 3446 insertions, 0 deletions
diff --git a/exporting/Makefile.am b/exporting/Makefile.am
new file mode 100644
index 0000000000..ce6282989a
--- /dev/null
+++ b/exporting/Makefile.am
@@ -0,0 +1,15 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+SUBDIRS = \
+ tests \
+ graphite \
+ json \
+ opentsdb \
+ $(NULL)
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/exporting/README.md b/exporting/README.md
new file mode 100644
index 0000000000..84cbb760ac
--- /dev/null
+++ b/exporting/README.md
@@ -0,0 +1,45 @@
+# Exporting metrics to external databases (experimental)
+
+The exporting engine is an update for the former [backends](../backends/). It's still work in progress. It has a
+modular structure and supports metric exporting via multiple exporting connector instances at the same time. You can
+have different update intervals and filters configured for every exporting connector instance. The exporting engine has
+its own configuration file `exporting.conf`. Configuration is almost similar to [backends](../backends/#configuration).
+The only difference is that the type of a connector should be specified in a section name before a colon and a name after
+the colon. At the moment only four types of connectors are supported: `graphite`, `json`, `opentsdb`, `opentsdb:http`.
+
+An example configuration:
+```conf
+[exporting:global]
+enabled = yes
+
+[graphite:my_instance1]
+enabled = yes
+destination = localhost:2003
+data source = sum
+update every = 5
+send charts matching = system.load
+
+[json:my_instance2]
+enabled = yes
+destination = localhost:5448
+data source = as collected
+update every = 2
+send charts matching = system.active_processes
+
+[opentsdb:my_instance3]
+enabled = yes
+destination = localhost:4242
+data source = sum
+update every = 10
+send charts matching = system.cpu
+
+[opentsdb:http:my_instance4]
+enabled = yes
+destination = localhost:4243
+data source = average
+update every = 3
+send charts matching = system.active_processes
+
+```
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fexporting%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)
diff --git a/exporting/check_filters.c b/exporting/check_filters.c
new file mode 100644
index 0000000000..f1ed9e828c
--- /dev/null
+++ b/exporting/check_filters.c
@@ -0,0 +1,78 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "exporting_engine.h"
+
+/**
+ * Check if the connector instance should export the host metrics
+ *
+ * @param instance an exporting connector instance.
+ * @param host a data collecting host.
+ * @return Returns 1 if the connector instance should export the host metrics
+ */
+int rrdhost_is_exportable(struct instance *instance, RRDHOST *host)
+{
+ if (host->exporting_flags == NULL)
+ host->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t));
+
+ RRDHOST_FLAGS *flags = &host->exporting_flags[instance->index];
+
+ if (unlikely((*flags & (RRDHOST_FLAG_BACKEND_SEND | RRDHOST_FLAG_BACKEND_DONT_SEND)) == 0)) {
+ char *host_name = (host == localhost) ? "localhost" : host->hostname;
+
+ if (!instance->config.hosts_pattern || simple_pattern_matches(instance->config.hosts_pattern, host_name)) {
+ *flags |= RRDHOST_FLAG_BACKEND_SEND;
+ info("enabled exporting of host '%s' for instance '%s'", host_name, instance->config.name);
+ } else {
+ *flags |= RRDHOST_FLAG_BACKEND_DONT_SEND;
+ info("disabled exporting of host '%s' for instance '%s'", host_name, instance->config.name);
+ }
+ }
+
+ if (likely(*flags & RRDHOST_FLAG_BACKEND_SEND))
+ return 1;
+ else
+ return 0;
+}
+
+/**
+ * Check if the connector instance should export the chart
+ *
+ * @param instance an exporting connector instance.
+ * @param st a chart.
+ * @return Returns 1 if the connector instance should export the chart
+ */
+int rrdset_is_exportable(struct instance *instance, RRDSET *st)
+{
+ RRDHOST *host = st->rrdhost;
+
+ if (st->exporting_flags == NULL)
+ st->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t));
+
+ RRDSET_FLAGS *flags = &st->exporting_flags[instance->index];
+
+ if(unlikely(*flags & RRDSET_FLAG_BACKEND_IGNORE))
+ return 0;
+
+ if(unlikely(!(*flags & RRDSET_FLAG_BACKEND_SEND))) {
+ // we have not checked this chart
+ if(simple_pattern_matches(instance->config.charts_pattern, st->id) || simple_pattern_matches(instance->config.charts_pattern, st->name))
+ *flags |= RRDSET_FLAG_BACKEND_SEND;
+ else {
+ *flags |= RRDSET_FLAG_BACKEND_IGNORE;
+ debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s', because it is disabled for backends.", st->id, host->hostname);
+ return 0;
+ }
+ }
+
+ if(unlikely(!rrdset_is_available_for_backends(st))) {
+ debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s', because it is not available for backends.", st->id, host->hostname);
+ return 0;
+ }
+
+ if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE && !(EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED))) {
+ debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s' because its memory mode is '%s' and the backend requires database access.", st->id, host->hostname, rrd_memory_mode_name(host->rrd_memory_mode));
+ return 0;
+ }
+
+ return 1;
+}
diff --git a/exporting/exporting_engine.c b/exporting/exporting_engine.c
new file mode 100644
index 0000000000..24d8756dc2
--- /dev/null
+++ b/exporting/exporting_engine.c
@@ -0,0 +1,60 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "exporting_engine.h"
+
+/**
+ * Exporting engine main
+ *
+ * The main thread used to control the exporting engine.
+ *
+ * @param ptr a pointer to netdata_static_structure.
+ *
+ * @return It always returns NULL.
+ */
+void *exporting_main(void *ptr)
+{
+ (void)ptr;
+
+ struct engine *engine = read_exporting_config();
+ if (!engine) {
+ info("EXPORTING: no exporting connectors configured");
+ return NULL;
+ }
+
+ if (init_connectors(engine) != 0) {
+ error("EXPORTING: cannot initialize exporting connectors");
+ return NULL;
+ }
+
+ usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+
+ while (!netdata_exit) {
+ heartbeat_next(&hb, step_ut);
+ engine->now = now_realtime_sec();
+
+ if (mark_scheduled_instances(engine)) {
+ if (prepare_buffers(engine) != 0) {
+ error("EXPORTING: cannot prepare data to send");
+ return NULL;
+ }
+ }
+
+ if (notify_workers(engine) != 0) {
+ error("EXPORTING: cannot communicate with exporting connector instance working threads");
+ return NULL;
+ }
+
+ if (send_internal_metrics(engine) != 0) {
+ error("EXPORTING: cannot send metrics for the operation of exporting engine");
+ return NULL;
+ }
+
+#ifdef UNIT_TESTING
+ break;
+#endif
+ }
+
+ return NULL;
+}
diff --git a/exporting/exporting_engine.h b/exporting/exporting_engine.h
new file mode 100644
index 0000000000..02d0c89bfe
--- /dev/null
+++ b/exporting/exporting_engine.h
@@ -0,0 +1,185 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_ENGINE_H
+#define NETDATA_EXPORTING_ENGINE_H 1
+
+#include "daemon/common.h"
+
+#include <uv.h>
+
+#define exporter_get(section, name, value) expconfig_get(&exporting_config, section, name, value)
+#define exporter_get_number(section, name, value) expconfig_get_number(&exporting_config, section, name, value)
+#define exporter_get_boolean(section, name, value) expconfig_get_boolean(&exporting_config, section, name, value)
+
+extern struct config exporting_config;
+
+#define EXPORTER_DATA_SOURCE "data source"
+#define EXPORTER_DATA_SOURCE_DEFAULT "average"
+
+#define EXPORTER_DESTINATION "destination"
+#define EXPORTER_DESTINATION_DEFAULT "localhost"
+
+#define EXPORTER_UPDATE_EVERY "update every"
+#define EXPORTER_UPDATE_EVERY_DEFAULT 10
+
+#define EXPORTER_BUF_ONFAIL "buffer on failures"
+#define EXPORTER_BUF_ONFAIL_DEFAULT 10
+
+#define EXPORTER_TIMEOUT_MS "timeout ms"
+#define EXPORTER_TIMEOUT_MS_DEFAULT 10000
+
+#define EXPORTER_SEND_CHART_MATCH "send charts matching"
+#define EXPORTER_SEND_CHART_MATCH_DEFAULT "*"
+
+#define EXPORTER_SEND_HOST_MATCH "send hosts matching"
+#define EXPORTER_SEND_HOST_MATCH_DEFAULT "localhost *"
+
+#define EXPORTER_SEND_NAMES "send names instead of ids"
+#define EXPORTER_SEND_NAMES_DEFAULT CONFIG_BOOLEAN_YES
+
+typedef enum exporting_options {
+ EXPORTING_OPTION_NONE = 0,
+
+ EXPORTING_SOURCE_DATA_AS_COLLECTED = (1 << 0),
+ EXPORTING_SOURCE_DATA_AVERAGE = (1 << 1),
+ EXPORTING_SOURCE_DATA_SUM = (1 << 2),
+
+ EXPORTING_OPTION_SEND_NAMES = (1 << 16)
+} EXPORTING_OPTIONS;
+
+#define EXPORTING_OPTIONS_SOURCE_BITS \
+ (EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_SOURCE_DATA_SUM)
+#define EXPORTING_OPTIONS_DATA_SOURCE(exporting_options) (exporting_options & EXPORTING_OPTIONS_SOURCE_BITS)
+
+struct engine;
+
+struct instance_config {
+ const char *name;
+ const char *destination;
+
+ int update_every;
+ int buffer_on_failures;
+ long timeoutms;
+
+ EXPORTING_OPTIONS options;
+ SIMPLE_PATTERN *charts_pattern;
+ SIMPLE_PATTERN *hosts_pattern;
+
+ void *connector_specific_config;
+};
+
+struct simple_connector_config {
+ int default_port;
+};
+
+struct connector_config {
+ BACKEND_TYPE type;
+ void *connector_specific_config;
+};
+
+struct engine_config {
+ const char *prefix;
+ const char *hostname;
+ int update_every;
+};
+
+struct stats {
+ collected_number chart_buffered_metrics;
+ collected_number chart_lost_metrics;
+ collected_number chart_sent_metrics;
+ collected_number chart_buffered_bytes;
+ collected_number chart_received_bytes;
+ collected_number chart_sent_bytes;
+ collected_number chart_receptions;
+ collected_number chart_transmission_successes;
+ collected_number chart_transmission_failures;
+ collected_number chart_data_lost_events;
+ collected_number chart_lost_bytes;
+ collected_number chart_reconnects;
+};
+
+struct instance {
+ struct instance_config config;
+ void *buffer;
+ struct stats stats;
+
+ int scheduled;
+ int skip_host;
+ int skip_chart;
+
+ time_t after;
+ time_t before;
+
+ uv_thread_t thread;
+ uv_mutex_t mutex;
+ uv_cond_t cond_var;
+
+ int (*start_batch_formatting)(struct instance *instance);
+ int (*start_host_formatting)(struct instance *instance, RRDHOST *host);
+ int (*start_chart_formatting)(struct instance *instance, RRDSET *st);
+ int (*metric_formatting)(struct instance *instance, RRDDIM *rd);
+ int (*end_chart_formatting)(struct instance *instance, RRDSET *st);
+ int (*end_host_formatting)(struct instance *instance, RRDHOST *host);
+ int (*end_batch_formatting)(struct instance *instance);
+
+ size_t index;
+ struct instance *next;
+ struct connector *connector;
+};
+
+struct connector {
+ struct connector_config config;
+
+ void (*worker)(void *instance_p);
+
+ struct instance *instance_root;
+ struct connector *next;
+ struct engine *engine;
+};
+
+struct engine {
+ struct engine_config config;
+
+ size_t instance_num;
+ time_t now;
+
+ struct connector *connector_root;
+};
+
+void *exporting_main(void *ptr);
+
+struct engine *read_exporting_config();
+BACKEND_TYPE exporting_select_type(const char *type);
+
+int init_connectors(struct engine *engine);
+
+int mark_scheduled_instances(struct engine *engine);
+int prepare_buffers(struct engine *engine);
+int notify_workers(struct engine *engine);
+
+size_t exporting_name_copy(char *dst, const char *src, size_t max_len);
+
+int rrdhost_is_exportable(struct instance *instance, RRDHOST *host);
+int rrdset_is_exportable(struct instance *instance, RRDSET *st);
+
+calculated_number exporting_calculate_value_from_stored_data(
+ struct instance *instance,
+ RRDDIM *rd,
+ time_t *last_timestamp);
+
+int start_batch_formatting(struct engine *engine);
+int start_host_formatting(struct engine *engine, RRDHOST *host);
+int start_chart_formatting(struct engine *engine, RRDSET *st);
+int metric_formatting(struct engine *engine, RRDDIM *rd);
+int end_chart_formatting(struct engine *engine, RRDSET *st);
+int end_host_formatting(struct engine *engine, RRDHOST *host);
+int end_batch_formatting(struct engine *engine);
+
+int exporting_discard_response(BUFFER *buffer, struct instance *instance);
+void simple_connector_receive_response(int *sock, struct instance *instance);
+void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance);
+void simple_connector_worker(void *instance_p);
+
+int send_internal_metrics(struct engine *engine);
+
+#endif /* NETDATA_EXPORTING_ENGINE_H */
diff --git a/exporting/graphite/Makefile.am b/exporting/graphite/Makefile.am
new file mode 100644
index 0000000000..babdcf0df3
--- /dev/null
+++ b/exporting/graphite/Makefile.am
@@ -0,0 +1,4 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
diff --git a/exporting/graphite/graphite.c b/exporting/graphite/graphite.c
new file mode 100644
index 0000000000..ec36b298e4
--- /dev/null
+++ b/exporting/graphite/graphite.c
@@ -0,0 +1,138 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "graphite.h"
+
+/**
+ * Initialize Graphite connector
+ *
+ * @param instance a connector data structure.
+ * @return Always returns 0.
+ */
+int init_graphite_connector(struct connector *connector)
+{
+ connector->worker = simple_connector_worker;
+
+ struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config));
+ connector->config.connector_specific_config = (void *)connector_specific_config;
+ connector_specific_config->default_port = 2003;
+
+ return 0;
+}
+
+/**
+ * Initialize Graphite connector instance
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int init_graphite_instance(struct instance *instance)
+{
+ instance->start_batch_formatting = NULL;
+ instance->start_host_formatting = NULL;
+ instance->start_chart_formatting = NULL;
+
+ if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
+ instance->metric_formatting = format_dimension_collected_graphite_plaintext;
+ else
+ instance->metric_formatting = format_dimension_stored_graphite_plaintext;
+
+ instance->end_chart_formatting = NULL;
+ instance->end_host_formatting = NULL;
+ instance->end_batch_formatting = NULL;
+
+ instance->buffer = (void *)buffer_create(0);
+ if (!instance->buffer) {
+ error("EXPORTING: cannot create buffer for graphite exporting connector instance %s", instance->config.name);
+ return 1;
+ }
+ uv_mutex_init(&instance->mutex);
+ uv_cond_init(&instance->cond_var);
+
+ return 0;
+}
+
+/**
+ * Format dimension using collected data for Graphite connector
+ *
+ * @param instance an instance data structure.
+ * @param rd a dimension.
+ * @return Always returns 0.
+ */
+int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd)
+{
+ struct engine *engine = instance->connector->engine;
+ RRDSET *st = rd->rrdset;
+ RRDHOST *host = st->rrdhost;
+
+ char chart_name[RRD_ID_LENGTH_MAX + 1];
+ exporting_name_copy(
+ chart_name,
+ (instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id,
+ RRD_ID_LENGTH_MAX);
+
+ char dimension_name[RRD_ID_LENGTH_MAX + 1];
+ exporting_name_copy(
+ dimension_name,
+ (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
+ RRD_ID_LENGTH_MAX);
+
+ buffer_sprintf(
+ instance->buffer,
+ "%s.%s.%s.%s%s%s " COLLECTED_NUMBER_FORMAT " %llu\n",
+ engine->config.prefix,
+ engine->config.hostname,
+ chart_name,
+ dimension_name,
+ (host->tags) ? ";" : "",
+ (host->tags) ? host->tags : "",
+ rd->last_collected_value,
+ (unsigned long long)rd->last_collected_time.tv_sec);
+
+ return 0;
+}
+
+/**
+ * Format dimension using a calculated value from stored data for Graphite connector
+ *
+ * @param instance an instance data structure.
+ * @param rd a dimension.
+ * @return Always returns 0.
+ */
+int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd)
+{
+ struct engine *engine = instance->connector->engine;
+ RRDSET *st = rd->rrdset;
+ RRDHOST *host = st->rrdhost;
+
+ char chart_name[RRD_ID_LENGTH_MAX + 1];
+ exporting_name_copy(
+ chart_name,
+ (instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id,
+ RRD_ID_LENGTH_MAX);
+
+ char dimension_name[RRD_ID_LENGTH_MAX + 1];
+ exporting_name_copy(
+ dimension_name,
+ (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
+ RRD_ID_LENGTH_MAX);
+
+ time_t last_t;
+ calculated_number value = exporting_calculate_value_from_stored_data(instance, rd, &last_t);
+
+ if(isnan(value))
+ return 0;
+
+ buffer_sprintf(
+ instance->buffer,
+ "%s.%s.%s.%s%s%s " CALCULATED_NUMBER_FORMAT " %llu\n",
+ engine->config.prefix,
+ engine->config.hostname,
+ chart_name,
+ dimension_name,
+ (host->tags) ? ";" : "",
+ (host->tags) ? host->tags : "",
+ value,
+ (unsigned long long)last_t);
+
+ return 0;
+}
diff --git a/exporting/graphite/graphite.h b/exporting/graphite/graphite.h
new file mode 100644
index 0000000000..e5a2001c28
--- /dev/null
+++ b/exporting/graphite/graphite.h
@@ -0,0 +1,13 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_GRAPHITE_H
+#define NETDATA_EXPORTING_GRAPHITE_H
+
+#include "exporting/exporting_engine.h"
+
+int init_graphite_connector(struct connector *connector);
+int init_graphite_instance(struct instance *instance);
+int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd);
+int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd);
+
+#endif //NETDATA_EXPORTING_GRAPHITE_H
diff --git a/exporting/init_connectors.c b/exporting/init_connectors.c
new file mode 100644
index 0000000000..a15665a966
--- /dev/null
+++ b/exporting/init_connectors.c
@@ -0,0 +1,72 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "exporting_engine.h"
+#include "graphite/graphite.h"
+#include "json/json.h"
+#include "opentsdb/opentsdb.h"
+
+/**
+ * Initialize connectors
+ *
+ * @param engine an engine data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int init_connectors(struct engine *engine)
+{
+ engine->now = now_realtime_sec();
+
+ for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
+ switch (connector->config.type) {
+ case BACKEND_TYPE_GRAPHITE:
+ if (init_graphite_connector(connector) != 0)
+ return 1;
+ break;
+ case BACKEND_TYPE_JSON:
+ if (init_json_connector(connector) != 0)
+ return 1;
+ break;
+ case BACKEND_TYPE_OPENTSDB_USING_TELNET:
+ if (init_opentsdb_connector(connector) != 0)
+ return 1;
+ break;
+ case BACKEND_TYPE_OPENTSDB_USING_HTTP:
+ if (init_opentsdb_connector(connector) != 0)
+ return 1;
+ break;
+ default:
+ error("EXPORTING: unknown exporting connector type");
+ return 1;
+ }
+ for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
+ instance->index = engine->instance_num++;
+ instance->after = engine->now;
+
+ switch (connector->config.type) {
+ case BACKEND_TYPE_GRAPHITE:
+ if (init_graphite_instance(instance) != 0)
+ return 1;
+ break;
+ case BACKEND_TYPE_JSON:
+ if (init_json_instance(instance) != 0)
+ return 1;
+ break;
+ case BACKEND_TYPE_OPENTSDB_USING_TELNET:
+ if (init_opentsdb_telnet_instance(instance) != 0)
+ return 1;
+ break;
+ case BACKEND_TYPE_OPENTSDB_USING_HTTP:
+ if (init_opentsdb_http_instance(instance) != 0)
+ return 1;
+ break;
+ default:
+ error("EXPORTING: unknown exporting connector type");
+ return 1;
+ }
+
+ // dispatch the instance worker thread
+ uv_thread_create(&instance->thread, connector->worker, instance);
+ }
+ }
+
+ return 0;
+}
diff --git a/exporting/json/Makefile.am b/exporting/json/Makefile.am
new file mode 100644
index 0000000000..babdcf0df3
--- /dev/null
+++ b/exporting/json/Makefile.am
@@ -0,0 +1,4 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
diff --git a/exporting/json/json.c b/exporting/json/json.c
new file mode 100644
index 0000000000..59ceb046be
--- /dev/null
+++ b/exporting/json/json.c
@@ -0,0 +1,194 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "json.h"
+
+/**
+ * Initialize JSON connector
+ *
+ * @param instance a connector data structure.
+ * @return Always returns 0.
+ */
+int init_json_connector(struct connector *connector)
+{
+ connector->worker = simple_connector_worker;
+
+ struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config));
+ connector->config.connector_specific_config = (void *)connector_specific_config;
+ connector_specific_config->default_port = 5448;
+
+ return 0;
+}
+
+/**
+ * Initialize JSON connector instance
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int init_json_instance(struct instance *instance)
+{
+ instance->start_batch_formatting = NULL;
+ instance->start_host_formatting = NULL;
+ instance->start_chart_formatting = NULL;
+
+ if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
+ instance->metric_formatting = format_dimension_collected_json_plaintext;
+ else
+ instance->metric_formatting = format_dimension_stored_json_plaintext;
+
+ instance->end_chart_formatting = NULL;
+ instance->end_host_formatting = NULL;
+ instance->end_batch_formatting = NULL;
+
+ instance->buffer = (void *)buffer_create(0);
+ if (!instance->buffer) {
+ error("EXPORTING: cannot create buffer for json exporting connector instance %s", instance->config.name);
+ return 1;
+ }
+ uv_mutex_init(&instance->mutex);
+ uv_cond_init(&instance->cond_var);
+
+ return 0;
+}
+
+/**
+ * Format dimension using collected data for JSON connector
+ *
+ * @param instance an instance data structure.
+ * @param rd a dimension.
+ * @return Always returns 0.
+ */
+int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM *rd)
+{
+ struct engine *engine = instance->connector->engine;
+ RRDSET *st = rd->rrdset;
+ RRDHOST *host = st->rrdhost;
+
+ const char *tags_pre = "", *tags_post = "", *tags = host->tags;
+ if (!tags)
+ tags = "";
+
+ if (*tags) {
+ if (*tags == '{' || *tags == '[' || *tags == '"') {
+ tags_pre = "\"host_tags\":";
+ tags_post = ",";
+ } else {
+ tags_pre = "\"host_tags\":\"";
+ tags_post = "\",";
+ }
+ }
+
+ buffer_sprintf(
+ instance->buffer,
+ "{"
+ "\"prefix\":\"%s\","
+ "\"hostname\":\"%s\","
+ "%s%s%s"
+
+ "\"chart_id\":\"%s\","
+ "\"chart_name\":\"%s\","
+ "\"chart_family\":\"%s\","
+ "\"chart_context\": \"%s\","
+ "\"chart_type\":\"%s\","
+ "\"units\": \"%s\","
+
+ "\"id\":\"%s\","
+ "\"name\":\"%s\","
+ "\"value\":" COLLECTED_NUMBER_FORMAT ","
+
+ "\"timestamp\": %llu}\n",