diff options
author | Vladimir Kobal <vlad@prokk.net> | 2019-12-12 21:41:11 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-12 21:41:11 +0200 |
commit | 6f270819121afb743d783f9a72786e367d1048a3 (patch) | |
tree | 6e17d6764e5c6d72bb53bbfcc35cecce4b18d325 /exporting | |
parent | 7278d5bcd987fb9646da4a8a837173bae3b68459 (diff) |
Implement the main flow for the Exporting Engine (#7149)
* Add top level tests
* Add a skeleton for preparing buffers
* Initialize graphite instance
* Prepare buffers for all instances
* Add Grafite collected value formatter
* Add support for exporting.conf read and parsing
* - Use new exporting_config instead of netdata_config
* Implement Grafite worker
* Disable exporting engine compilation if libuv is not available
* Add mutex locks
- Configure connectors as connector_<type> in sections of exporting.conf
- Change exporting_select_type to check for connector_ fields
* - Override exporting_config structure if there no exporting.conf so that
look ups don't fail and we maintain backwards compatibility
* Separate fixtures in unit tests
* Test exporting_discard_responce
* Test response receiving
* Test buffer sending
* Test simple connector worker
- Instance section has the format connector:instance_name
e.g graphite:my_graphite_instance
- Connectors with : in their name e.g graphite:plaintext are reserved
So graphite:plaintext is not accepted because it would activate an
instance with name "plaintext"
It should be graphite:plaintext:instance_name
* - Enable the add_connector_instance to cleanup the internal structure
by passing NULL,not NULL arguments
* Implement configurable update interval
- Add additional check to verify instance uniqueness across connectors
* Add host and chart filters
* Add the value calculation over a database series
* Add the calculated over stored data graphite connector
* Add tests for graphite connector
* Add JSON connector
* Add tests for JSON formatting functions
* Add OpenTSDB connector
* Add tests for the OpenTSDB connector
* Add temporaty notes to the documentation
Diffstat (limited to 'exporting')
26 files changed, 3446 insertions, 0 deletions
diff --git a/exporting/Makefile.am b/exporting/Makefile.am new file mode 100644 index 0000000000..ce6282989a --- /dev/null +++ b/exporting/Makefile.am @@ -0,0 +1,15 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +SUBDIRS = \ + tests \ + graphite \ + json \ + opentsdb \ + $(NULL) + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/exporting/README.md b/exporting/README.md new file mode 100644 index 0000000000..84cbb760ac --- /dev/null +++ b/exporting/README.md @@ -0,0 +1,45 @@ +# Exporting metrics to external databases (experimental) + +The exporting engine is an update for the former [backends](../backends/). It's still work in progress. It has a +modular structure and supports metric exporting via multiple exporting connector instances at the same time. You can +have different update intervals and filters configured for every exporting connector instance. The exporting engine has +its own configuration file `exporting.conf`. Configuration is almost similar to [backends](../backends/#configuration). +The only difference is that the type of a connector should be specified in a section name before a colon and a name after +the colon. At the moment only four types of connectors are supported: `graphite`, `json`, `opentsdb`, `opentsdb:http`. + +An example configuration: +```conf +[exporting:global] +enabled = yes + +[graphite:my_instance1] +enabled = yes +destination = localhost:2003 +data source = sum +update every = 5 +send charts matching = system.load + +[json:my_instance2] +enabled = yes +destination = localhost:5448 +data source = as collected +update every = 2 +send charts matching = system.active_processes + +[opentsdb:my_instance3] +enabled = yes +destination = localhost:4242 +data source = sum +update every = 10 +send charts matching = system.cpu + +[opentsdb:http:my_instance4] +enabled = yes +destination = localhost:4243 +data source = average +update every = 3 +send charts matching = system.active_processes + +``` + +[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fexporting%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) diff --git a/exporting/check_filters.c b/exporting/check_filters.c new file mode 100644 index 0000000000..f1ed9e828c --- /dev/null +++ b/exporting/check_filters.c @@ -0,0 +1,78 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "exporting_engine.h" + +/** + * Check if the connector instance should export the host metrics + * + * @param instance an exporting connector instance. + * @param host a data collecting host. + * @return Returns 1 if the connector instance should export the host metrics + */ +int rrdhost_is_exportable(struct instance *instance, RRDHOST *host) +{ + if (host->exporting_flags == NULL) + host->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t)); + + RRDHOST_FLAGS *flags = &host->exporting_flags[instance->index]; + + if (unlikely((*flags & (RRDHOST_FLAG_BACKEND_SEND | RRDHOST_FLAG_BACKEND_DONT_SEND)) == 0)) { + char *host_name = (host == localhost) ? "localhost" : host->hostname; + + if (!instance->config.hosts_pattern || simple_pattern_matches(instance->config.hosts_pattern, host_name)) { + *flags |= RRDHOST_FLAG_BACKEND_SEND; + info("enabled exporting of host '%s' for instance '%s'", host_name, instance->config.name); + } else { + *flags |= RRDHOST_FLAG_BACKEND_DONT_SEND; + info("disabled exporting of host '%s' for instance '%s'", host_name, instance->config.name); + } + } + + if (likely(*flags & RRDHOST_FLAG_BACKEND_SEND)) + return 1; + else + return 0; +} + +/** + * Check if the connector instance should export the chart + * + * @param instance an exporting connector instance. + * @param st a chart. + * @return Returns 1 if the connector instance should export the chart + */ +int rrdset_is_exportable(struct instance *instance, RRDSET *st) +{ + RRDHOST *host = st->rrdhost; + + if (st->exporting_flags == NULL) + st->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t)); + + RRDSET_FLAGS *flags = &st->exporting_flags[instance->index]; + + if(unlikely(*flags & RRDSET_FLAG_BACKEND_IGNORE)) + return 0; + + if(unlikely(!(*flags & RRDSET_FLAG_BACKEND_SEND))) { + // we have not checked this chart + if(simple_pattern_matches(instance->config.charts_pattern, st->id) || simple_pattern_matches(instance->config.charts_pattern, st->name)) + *flags |= RRDSET_FLAG_BACKEND_SEND; + else { + *flags |= RRDSET_FLAG_BACKEND_IGNORE; + debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s', because it is disabled for backends.", st->id, host->hostname); + return 0; + } + } + + if(unlikely(!rrdset_is_available_for_backends(st))) { + debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s', because it is not available for backends.", st->id, host->hostname); + return 0; + } + + if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE && !(EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED))) { + debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s' because its memory mode is '%s' and the backend requires database access.", st->id, host->hostname, rrd_memory_mode_name(host->rrd_memory_mode)); + return 0; + } + + return 1; +} diff --git a/exporting/exporting_engine.c b/exporting/exporting_engine.c new file mode 100644 index 0000000000..24d8756dc2 --- /dev/null +++ b/exporting/exporting_engine.c @@ -0,0 +1,60 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "exporting_engine.h" + +/** + * Exporting engine main + * + * The main thread used to control the exporting engine. + * + * @param ptr a pointer to netdata_static_structure. + * + * @return It always returns NULL. + */ +void *exporting_main(void *ptr) +{ + (void)ptr; + + struct engine *engine = read_exporting_config(); + if (!engine) { + info("EXPORTING: no exporting connectors configured"); + return NULL; + } + + if (init_connectors(engine) != 0) { + error("EXPORTING: cannot initialize exporting connectors"); + return NULL; + } + + usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC; + heartbeat_t hb; + heartbeat_init(&hb); + + while (!netdata_exit) { + heartbeat_next(&hb, step_ut); + engine->now = now_realtime_sec(); + + if (mark_scheduled_instances(engine)) { + if (prepare_buffers(engine) != 0) { + error("EXPORTING: cannot prepare data to send"); + return NULL; + } + } + + if (notify_workers(engine) != 0) { + error("EXPORTING: cannot communicate with exporting connector instance working threads"); + return NULL; + } + + if (send_internal_metrics(engine) != 0) { + error("EXPORTING: cannot send metrics for the operation of exporting engine"); + return NULL; + } + +#ifdef UNIT_TESTING + break; +#endif + } + + return NULL; +} diff --git a/exporting/exporting_engine.h b/exporting/exporting_engine.h new file mode 100644 index 0000000000..02d0c89bfe --- /dev/null +++ b/exporting/exporting_engine.h @@ -0,0 +1,185 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_EXPORTING_ENGINE_H +#define NETDATA_EXPORTING_ENGINE_H 1 + +#include "daemon/common.h" + +#include <uv.h> + +#define exporter_get(section, name, value) expconfig_get(&exporting_config, section, name, value) +#define exporter_get_number(section, name, value) expconfig_get_number(&exporting_config, section, name, value) +#define exporter_get_boolean(section, name, value) expconfig_get_boolean(&exporting_config, section, name, value) + +extern struct config exporting_config; + +#define EXPORTER_DATA_SOURCE "data source" +#define EXPORTER_DATA_SOURCE_DEFAULT "average" + +#define EXPORTER_DESTINATION "destination" +#define EXPORTER_DESTINATION_DEFAULT "localhost" + +#define EXPORTER_UPDATE_EVERY "update every" +#define EXPORTER_UPDATE_EVERY_DEFAULT 10 + +#define EXPORTER_BUF_ONFAIL "buffer on failures" +#define EXPORTER_BUF_ONFAIL_DEFAULT 10 + +#define EXPORTER_TIMEOUT_MS "timeout ms" +#define EXPORTER_TIMEOUT_MS_DEFAULT 10000 + +#define EXPORTER_SEND_CHART_MATCH "send charts matching" +#define EXPORTER_SEND_CHART_MATCH_DEFAULT "*" + +#define EXPORTER_SEND_HOST_MATCH "send hosts matching" +#define EXPORTER_SEND_HOST_MATCH_DEFAULT "localhost *" + +#define EXPORTER_SEND_NAMES "send names instead of ids" +#define EXPORTER_SEND_NAMES_DEFAULT CONFIG_BOOLEAN_YES + +typedef enum exporting_options { + EXPORTING_OPTION_NONE = 0, + + EXPORTING_SOURCE_DATA_AS_COLLECTED = (1 << 0), + EXPORTING_SOURCE_DATA_AVERAGE = (1 << 1), + EXPORTING_SOURCE_DATA_SUM = (1 << 2), + + EXPORTING_OPTION_SEND_NAMES = (1 << 16) +} EXPORTING_OPTIONS; + +#define EXPORTING_OPTIONS_SOURCE_BITS \ + (EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_SOURCE_DATA_SUM) +#define EXPORTING_OPTIONS_DATA_SOURCE(exporting_options) (exporting_options & EXPORTING_OPTIONS_SOURCE_BITS) + +struct engine; + +struct instance_config { + const char *name; + const char *destination; + + int update_every; + int buffer_on_failures; + long timeoutms; + + EXPORTING_OPTIONS options; + SIMPLE_PATTERN *charts_pattern; + SIMPLE_PATTERN *hosts_pattern; + + void *connector_specific_config; +}; + +struct simple_connector_config { + int default_port; +}; + +struct connector_config { + BACKEND_TYPE type; + void *connector_specific_config; +}; + +struct engine_config { + const char *prefix; + const char *hostname; + int update_every; +}; + +struct stats { + collected_number chart_buffered_metrics; + collected_number chart_lost_metrics; + collected_number chart_sent_metrics; + collected_number chart_buffered_bytes; + collected_number chart_received_bytes; + collected_number chart_sent_bytes; + collected_number chart_receptions; + collected_number chart_transmission_successes; + collected_number chart_transmission_failures; + collected_number chart_data_lost_events; + collected_number chart_lost_bytes; + collected_number chart_reconnects; +}; + +struct instance { + struct instance_config config; + void *buffer; + struct stats stats; + + int scheduled; + int skip_host; + int skip_chart; + + time_t after; + time_t before; + + uv_thread_t thread; + uv_mutex_t mutex; + uv_cond_t cond_var; + + int (*start_batch_formatting)(struct instance *instance); + int (*start_host_formatting)(struct instance *instance, RRDHOST *host); + int (*start_chart_formatting)(struct instance *instance, RRDSET *st); + int (*metric_formatting)(struct instance *instance, RRDDIM *rd); + int (*end_chart_formatting)(struct instance *instance, RRDSET *st); + int (*end_host_formatting)(struct instance *instance, RRDHOST *host); + int (*end_batch_formatting)(struct instance *instance); + + size_t index; + struct instance *next; + struct connector *connector; +}; + +struct connector { + struct connector_config config; + + void (*worker)(void *instance_p); + + struct instance *instance_root; + struct connector *next; + struct engine *engine; +}; + +struct engine { + struct engine_config config; + + size_t instance_num; + time_t now; + + struct connector *connector_root; +}; + +void *exporting_main(void *ptr); + +struct engine *read_exporting_config(); +BACKEND_TYPE exporting_select_type(const char *type); + +int init_connectors(struct engine *engine); + +int mark_scheduled_instances(struct engine *engine); +int prepare_buffers(struct engine *engine); +int notify_workers(struct engine *engine); + +size_t exporting_name_copy(char *dst, const char *src, size_t max_len); + +int rrdhost_is_exportable(struct instance *instance, RRDHOST *host); +int rrdset_is_exportable(struct instance *instance, RRDSET *st); + +calculated_number exporting_calculate_value_from_stored_data( + struct instance *instance, + RRDDIM *rd, + time_t *last_timestamp); + +int start_batch_formatting(struct engine *engine); +int start_host_formatting(struct engine *engine, RRDHOST *host); +int start_chart_formatting(struct engine *engine, RRDSET *st); +int metric_formatting(struct engine *engine, RRDDIM *rd); +int end_chart_formatting(struct engine *engine, RRDSET *st); +int end_host_formatting(struct engine *engine, RRDHOST *host); +int end_batch_formatting(struct engine *engine); + +int exporting_discard_response(BUFFER *buffer, struct instance *instance); +void simple_connector_receive_response(int *sock, struct instance *instance); +void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance); +void simple_connector_worker(void *instance_p); + +int send_internal_metrics(struct engine *engine); + +#endif /* NETDATA_EXPORTING_ENGINE_H */ diff --git a/exporting/graphite/Makefile.am b/exporting/graphite/Makefile.am new file mode 100644 index 0000000000..babdcf0df3 --- /dev/null +++ b/exporting/graphite/Makefile.am @@ -0,0 +1,4 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in diff --git a/exporting/graphite/graphite.c b/exporting/graphite/graphite.c new file mode 100644 index 0000000000..ec36b298e4 --- /dev/null +++ b/exporting/graphite/graphite.c @@ -0,0 +1,138 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "graphite.h" + +/** + * Initialize Graphite connector + * + * @param instance a connector data structure. + * @return Always returns 0. + */ +int init_graphite_connector(struct connector *connector) +{ + connector->worker = simple_connector_worker; + + struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config)); + connector->config.connector_specific_config = (void *)connector_specific_config; + connector_specific_config->default_port = 2003; + + return 0; +} + +/** + * Initialize Graphite connector instance + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int init_graphite_instance(struct instance *instance) +{ + instance->start_batch_formatting = NULL; + instance->start_host_formatting = NULL; + instance->start_chart_formatting = NULL; + + if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED) + instance->metric_formatting = format_dimension_collected_graphite_plaintext; + else + instance->metric_formatting = format_dimension_stored_graphite_plaintext; + + instance->end_chart_formatting = NULL; + instance->end_host_formatting = NULL; + instance->end_batch_formatting = NULL; + + instance->buffer = (void *)buffer_create(0); + if (!instance->buffer) { + error("EXPORTING: cannot create buffer for graphite exporting connector instance %s", instance->config.name); + return 1; + } + uv_mutex_init(&instance->mutex); + uv_cond_init(&instance->cond_var); + + return 0; +} + +/** + * Format dimension using collected data for Graphite connector + * + * @param instance an instance data structure. + * @param rd a dimension. + * @return Always returns 0. + */ +int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd) +{ + struct engine *engine = instance->connector->engine; + RRDSET *st = rd->rrdset; + RRDHOST *host = st->rrdhost; + + char chart_name[RRD_ID_LENGTH_MAX + 1]; + exporting_name_copy( + chart_name, + (instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id, + RRD_ID_LENGTH_MAX); + + char dimension_name[RRD_ID_LENGTH_MAX + 1]; + exporting_name_copy( + dimension_name, + (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id, + RRD_ID_LENGTH_MAX); + + buffer_sprintf( + instance->buffer, + "%s.%s.%s.%s%s%s " COLLECTED_NUMBER_FORMAT " %llu\n", + engine->config.prefix, + engine->config.hostname, + chart_name, + dimension_name, + (host->tags) ? ";" : "", + (host->tags) ? host->tags : "", + rd->last_collected_value, + (unsigned long long)rd->last_collected_time.tv_sec); + + return 0; +} + +/** + * Format dimension using a calculated value from stored data for Graphite connector + * + * @param instance an instance data structure. + * @param rd a dimension. + * @return Always returns 0. + */ +int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd) +{ + struct engine *engine = instance->connector->engine; + RRDSET *st = rd->rrdset; + RRDHOST *host = st->rrdhost; + + char chart_name[RRD_ID_LENGTH_MAX + 1]; + exporting_name_copy( + chart_name, + (instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id, + RRD_ID_LENGTH_MAX); + + char dimension_name[RRD_ID_LENGTH_MAX + 1]; + exporting_name_copy( + dimension_name, + (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id, + RRD_ID_LENGTH_MAX); + + time_t last_t; + calculated_number value = exporting_calculate_value_from_stored_data(instance, rd, &last_t); + + if(isnan(value)) + return 0; + + buffer_sprintf( + instance->buffer, + "%s.%s.%s.%s%s%s " CALCULATED_NUMBER_FORMAT " %llu\n", + engine->config.prefix, + engine->config.hostname, + chart_name, + dimension_name, + (host->tags) ? ";" : "", + (host->tags) ? host->tags : "", + value, + (unsigned long long)last_t); + + return 0; +} diff --git a/exporting/graphite/graphite.h b/exporting/graphite/graphite.h new file mode 100644 index 0000000000..e5a2001c28 --- /dev/null +++ b/exporting/graphite/graphite.h @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_EXPORTING_GRAPHITE_H +#define NETDATA_EXPORTING_GRAPHITE_H + +#include "exporting/exporting_engine.h" + +int init_graphite_connector(struct connector *connector); +int init_graphite_instance(struct instance *instance); +int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd); +int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd); + +#endif //NETDATA_EXPORTING_GRAPHITE_H diff --git a/exporting/init_connectors.c b/exporting/init_connectors.c new file mode 100644 index 0000000000..a15665a966 --- /dev/null +++ b/exporting/init_connectors.c @@ -0,0 +1,72 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "exporting_engine.h" +#include "graphite/graphite.h" +#include "json/json.h" +#include "opentsdb/opentsdb.h" + +/** + * Initialize connectors + * + * @param engine an engine data structure. + * @return Returns 0 on success, 1 on failure. + */ +int init_connectors(struct engine *engine) +{ + engine->now = now_realtime_sec(); + + for (struct connector *connector = engine->connector_root; connector; connector = connector->next) { + switch (connector->config.type) { + case BACKEND_TYPE_GRAPHITE: + if (init_graphite_connector(connector) != 0) + return 1; + break; + case BACKEND_TYPE_JSON: + if (init_json_connector(connector) != 0) + return 1; + break; + case BACKEND_TYPE_OPENTSDB_USING_TELNET: + if (init_opentsdb_connector(connector) != 0) + return 1; + break; + case BACKEND_TYPE_OPENTSDB_USING_HTTP: + if (init_opentsdb_connector(connector) != 0) + return 1; + break; + default: + error("EXPORTING: unknown exporting connector type"); + return 1; + } + for (struct instance *instance = connector->instance_root; instance; instance = instance->next) { + instance->index = engine->instance_num++; + instance->after = engine->now; + + switch (connector->config.type) { + case BACKEND_TYPE_GRAPHITE: + if (init_graphite_instance(instance) != 0) + return 1; + break; + case BACKEND_TYPE_JSON: + if (init_json_instance(instance) != 0) + return 1; + break; + case BACKEND_TYPE_OPENTSDB_USING_TELNET: + if (init_opentsdb_telnet_instance(instance) != 0) + return 1; + break; + case BACKEND_TYPE_OPENTSDB_USING_HTTP: + if (init_opentsdb_http_instance(instance) != 0) + return 1; + break; + default: + error("EXPORTING: unknown exporting connector type"); + return 1; + } + + // dispatch the instance worker thread + uv_thread_create(&instance->thread, connector->worker, instance); + } + } + + return 0; +} diff --git a/exporting/json/Makefile.am b/exporting/json/Makefile.am new file mode 100644 index 0000000000..babdcf0df3 --- /dev/null +++ b/exporting/json/Makefile.am @@ -0,0 +1,4 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in diff --git a/exporting/json/json.c b/exporting/json/json.c new file mode 100644 index 0000000000..59ceb046be --- /dev/null +++ b/exporting/json/json.c @@ -0,0 +1,194 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "json.h" + +/** + * Initialize JSON connector + * + * @param instance a connector data structure. + * @return Always returns 0. + */ +int init_json_connector(struct connector *connector) +{ + connector->worker = simple_connector_worker; + + struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config)); + connector->config.connector_specific_config = (void *)connector_specific_config; + connector_specific_config->default_port = 5448; + + return 0; +} + +/** + * Initialize JSON connector instance + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int init_json_instance(struct instance *instance) +{ + instance->start_batch_formatting = NULL; + instance->start_host_formatting = NULL; + instance->start_chart_formatting = NULL; + + if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED) + instance->metric_formatting = format_dimension_collected_json_plaintext; + else + instance->metric_formatting = format_dimension_stored_json_plaintext; + + instance->end_chart_formatting = NULL; + instance->end_host_formatting = NULL; + instance->end_batch_formatting = NULL; + + instance->buffer = (void *)buffer_create(0); + if (!instance->buffer) { + error("EXPORTING: cannot create buffer for json exporting connector instance %s", instance->config.name); + return 1; + } + uv_mutex_init(&instance->mutex); + uv_cond_init(&instance->cond_var); + + return 0; +} + +/** + * Format dimension using collected data for JSON connector + * + * @param instance an instance data structure. + * @param rd a dimension. + * @return Always returns 0. + */ +int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM *rd) +{ + struct engine *engine = instance->connector->engine; + RRDSET *st = rd->rrdset; + RRDHOST *host = st->rrdhost; + + const char *tags_pre = "", *tags_post = "", *tags = host->tags; + if (!tags) + tags = ""; + + if (*tags) { + if (*tags == '{' || *tags == '[' || *tags == '"') { + tags_pre = "\"host_tags\":"; + tags_post = ","; + } else { + tags_pre = "\"host_tags\":\""; + tags_post = "\","; + } + } + + buffer_sprintf( + instance->buffer, + "{" + "\"prefix\":\"%s\"," + "\"hostname\":\"%s\"," + "%s%s%s" + + "\"chart_id\":\"%s\"," + "\"chart_name\":\"%s\"," + "\"chart_family\":\"%s\"," + "\"chart_context\": \"%s\"," + "\"chart_type\":\"%s\"," + "\"units\": \"%s\"," + + "\"id\":\"%s\"," + "\"name\":\"%s\"," + "\"value\":" COLLECTED_NUMBER_FORMAT "," + + "\"timestamp\": %llu}\n", |