diff options
author | Vladimir Kobal <vlad@prokk.net> | 2020-03-30 09:54:39 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-30 09:54:39 +0300 |
commit | 36c2e1dbf3d415880d5e5f4ccb4dbf2e882bb8c6 (patch) | |
tree | 23e89e26ee283efb3e2a07c2f6d4d85f69d98ecd /exporting/mongodb | |
parent | d5d1865469b63385d70645f710728482aed0f23d (diff) |
Add a MongoDB connector to the exporting engine (#8416)
* Copy files from the MongoDB backend
* Update the documentation
* Rename functions in the MongoDB backend
* Add the connector to the Netdata build
* Add an initializer and a worker
* Add specific configuration options
* Initialize the connector
* Add a ring buffer for inserting data to a MongoDB database
* Add unit tests
Diffstat (limited to 'exporting/mongodb')
-rw-r--r-- | exporting/mongodb/Makefile.am | 8 | ||||
-rw-r--r-- | exporting/mongodb/README.md | 31 | ||||
-rw-r--r-- | exporting/mongodb/mongodb.c | 313 | ||||
-rw-r--r-- | exporting/mongodb/mongodb.h | 33 |
4 files changed, 385 insertions, 0 deletions
diff --git a/exporting/mongodb/Makefile.am b/exporting/mongodb/Makefile.am new file mode 100644 index 0000000000..161784b8f6 --- /dev/null +++ b/exporting/mongodb/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/exporting/mongodb/README.md b/exporting/mongodb/README.md new file mode 100644 index 0000000000..f8cb82e1ab --- /dev/null +++ b/exporting/mongodb/README.md @@ -0,0 +1,31 @@ +# MongoDB exporting connector + +You can use the MongoDB connector and the experimental [exporting engine](../README.md) to archive your agent's metrics +to a MongoDB database for long-term storage, further analysis, or correlation with data from other sources. + +## Prerequisites + +To use MongoDB as an external storage for long-term archiving, you should first +[install](http://mongoc.org/libmongoc/current/installing.html) `libmongoc` 1.7.0 or higher. Next, re-install Netdata +from the source, which detects that the required library is now available. + +## Configuration + +To enable data exporting to a MongoDB database, run `./edit-config exporting.conf` +in the Netdata configuration directory and set the following options: + +```conf +[mongodb:my_instance] + enabled = yes + destination = mongodb://<hostname> + database = your_database_name + collection = your_collection_name +``` + +You can find more information about the `destination` string URI format in the MongoDB +[documentation](https://docs.mongodb.com/manual/reference/connection-string/) + +The default socket timeout depends on the exporting connector update interval. The timeout is 500 ms shorter than the +interval (but not less than 1000 ms). You can alter the timeout using the `sockettimeoutms` MongoDB URI option. + +[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fexporting%2Fmongodb%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) diff --git a/exporting/mongodb/mongodb.c b/exporting/mongodb/mongodb.c new file mode 100644 index 0000000000..b10a8fa664 --- /dev/null +++ b/exporting/mongodb/mongodb.c @@ -0,0 +1,313 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#define EXPORTING_INTERNALS +#include "mongodb.h" + +#define CONFIG_FILE_LINE_MAX ((CONFIG_MAX_NAME + CONFIG_MAX_VALUE + 1024) * 2) + +/** + * Initialize MongoDB connector specific data, including a ring buffer + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int mongodb_init(struct instance *instance) +{ + struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config; + mongoc_uri_t *uri; + bson_error_t bson_error; + + if (unlikely(!connector_specific_config->collection || !*connector_specific_config->collection)) { + error("EXPORTING: collection name is a mandatory MongoDB parameter, but it is not configured"); + return 1; + } + + uri = mongoc_uri_new_with_error(instance->config.destination, &bson_error); + if (unlikely(!uri)) { + error( + "EXPORTING: failed to parse URI: %s. Error message: %s", instance->config.destination, bson_error.message); + return 1; + } + + int32_t socket_timeout = + mongoc_uri_get_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, instance->config.timeoutms); + if (!mongoc_uri_set_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout)) { + error("EXPORTING: failed to set %s to the value %d", MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout); + return 1; + }; + + struct mongodb_specific_data *connector_specific_data = + (struct mongodb_specific_data *)instance->connector_specific_data; + + connector_specific_data->client = mongoc_client_new_from_uri(uri); + if (unlikely(!connector_specific_data->client)) { + error("EXPORTING: failed to create a new client"); + return 1; + } + + if (!mongoc_client_set_appname(connector_specific_data->client, "netdata")) { + error("EXPORTING: failed to set client appname"); + }; + + connector_specific_data->collection = mongoc_client_get_collection( + connector_specific_data->client, connector_specific_config->database, connector_specific_config->collection); + + mongoc_uri_destroy(uri); + + // create a ring buffer + struct bson_buffer *first_buffer = NULL; + + if (instance->config.buffer_on_failures < 2) + instance->config.buffer_on_failures = 1; + else + instance->config.buffer_on_failures -= 1; + + for (int i = 0; i < instance->config.buffer_on_failures; i++) { + struct bson_buffer *current_buffer = callocz(1, sizeof(struct bson_buffer)); + + if (!connector_specific_data->first_buffer) + first_buffer = current_buffer; + else + current_buffer->next = connector_specific_data->first_buffer; + + connector_specific_data->first_buffer = current_buffer; + } + + first_buffer->next = connector_specific_data->first_buffer; + connector_specific_data->last_buffer = connector_specific_data->first_buffer; + + return 0; +} + +/** + * Clean a MongoDB connector instance up + * + * @param instance an instance data structure. + */ +void mongodb_cleanup(struct instance *instance) +{ + struct mongodb_specific_data *connector_specific_data = + (struct mongodb_specific_data *)instance->connector_specific_data; + + mongoc_collection_destroy(connector_specific_data->collection); + mongoc_client_destroy(connector_specific_data->client); + + return; +} + +/** + * Initialize a MongoDB connector instance + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int init_mongodb_instance(struct instance *instance) +{ + instance->worker = mongodb_connector_worker; + + instance->start_batch_formatting = NULL; + instance->start_host_formatting = format_host_labels_json_plaintext; + instance->start_chart_formatting = NULL; + + if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED) + instance->metric_formatting = format_dimension_collected_json_plaintext; + else + instance->metric_formatting = format_dimension_stored_json_plaintext; + + instance->end_chart_formatting = NULL; + instance->end_host_formatting = flush_host_labels; + instance->end_batch_formatting = format_batch_mongodb; + + instance->send_header = NULL; + instance->check_response = NULL; + + instance->buffer = (void *)buffer_create(0); + if (!instance->buffer) { + error("EXPORTING: cannot create buffer for MongoDB exporting connector instance %s", instance->config.name); + return 1; + } + if (uv_mutex_init(&instance->mutex)) + return 1; + if (uv_cond_init(&instance->cond_var)) + return 1; + + struct mongodb_specific_data *connector_specific_data = callocz(1, sizeof(struct mongodb_specific_data)); + instance->connector_specific_data = (void *)connector_specific_data; + + instance->config.timeoutms = + (instance->config.update_every >= 2) ? (instance->engine->config.update_every * MSEC_PER_SEC - 500) : 1000; + + if (!instance->engine->mongoc_initialized) { + mongoc_init(); + instance->engine->mongoc_initialized = 1; + } + + if (unlikely(mongodb_init(instance))) { + error("EXPORTING: cannot initialize MongoDB exporting connector"); + return 1; + } + + return 0; +} + +/** + * Free an array of BSON structures + * + * @param insert an array of documents. + * @param documents_inserted the number of documents inserted. + */ +void free_bson(bson_t **insert, size_t documents_inserted) +{ + size_t i; + + for (i = 0; i < documents_inserted; i++) + bson_destroy(insert[i]); + + freez(insert); +} + +/** + * Format a batch for the MongoDB connector + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int format_batch_mongodb(struct instance *instance) +{ + struct mongodb_specific_data *connector_specific_data = + (struct mongodb_specific_data *)instance->connector_specific_data; + struct stats *stats = &instance->stats; + + bson_t **insert = connector_specific_data->last_buffer->insert; + if (insert) { + // ring buffer is full, reuse the oldest element + connector_specific_data->first_buffer = connector_specific_data->first_buffer->next; + free_bson(insert, connector_specific_data->last_buffer->documents_inserted); + } + insert = callocz((size_t)stats->chart_buffered_metrics, sizeof(bson_t *)); + connector_specific_data->last_buffer->insert = insert; + + BUFFER *buffer = (BUFFER *)instance->buffer; + char *start = (char *)buffer_tostring(buffer); + char *end = start; + + size_t documents_inserted = 0; + + while (*end && documents_inserted <= (size_t)stats->chart_buffered_metrics) { + while (*end && *end != '\n') + end++; + + if (likely(*end)) { + *end = '\0'; + end++; + } else { + break; + } + + bson_error_t bson_error; + insert[documents_inserted] = bson_new_from_json((const uint8_t *)start, -1, &bson_error); + + if (unlikely(!insert[documents_inserted])) { + error("EXPORTING: %s", bson_error.message); + free_bson(insert, documents_inserted); + return 1; + } + + start = end; + + documents_inserted++; + } + + buffer_flush(buffer); + + connector_specific_data->last_buffer->documents_inserted = documents_inserted; + connector_specific_data->last_buffer = connector_specific_data->last_buffer->next; + + return 0; +} + +/** + * MongoDB connector worker + * + * Runs in a separate thread for every instance. + * + * @param instance_p an instance data structure. + */ +void mongodb_connector_worker(void *instance_p) +{ + struct instance *instance = (struct instance *)instance_p; + struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config; + struct mongodb_specific_data *connector_specific_data = + (struct mongodb_specific_data *)instance->connector_specific_data; + + while (!netdata_exit) { + struct stats *stats = &instance->stats; + + uv_mutex_lock(&instance->mutex); + uv_cond_wait(&instance->cond_var, &instance->mutex); + + bson_t **insert = connector_specific_data->first_buffer->insert; + size_t documents_inserted = connector_specific_data->first_buffer->documents_inserted; + + connector_specific_data->first_buffer->insert = NULL; + connector_specific_data->first_buffer->documents_inserted = 0; + connector_specific_data->first_buffer = connector_specific_data->first_buffer->next; + + uv_mutex_unlock(&instance->mutex); + + size_t data_size = 0; + for (size_t i = 0; i < documents_inserted; i++) { + data_size += insert[i]->len; + } + + debug( + D_BACKEND, + "EXPORTING: mongodb_insert(): destination = %s, database = %s, collection = %s, data size = %zu", + instance->config.destination, + connector_specific_config->database, + connector_specific_config->collection, + data_size); + + if (unlikely(documents_inserted == 0)) + continue; + + bson_error_t bson_error; + if (likely(mongoc_collection_insert_many( + connector_specific_data->collection, + (const bson_t **)insert, + documents_inserted, + NULL, + NULL, + &bson_error))) { + stats->chart_sent_bytes += data_size; + stats->chart_transmission_successes++; + stats->chart_receptions++; + } else { + // oops! we couldn't send (all or some of the) data + error("EXPORTING: %s", bson_error.message); + error( + "EXPORTING: failed to write data to the database '%s'. " + "Willing to write %zu bytes, wrote %zu bytes.", + instance->config.destination, data_size, 0UL); + + stats->chart_transmission_failures++; + stats->chart_data_lost_events++; + stats->chart_lost_bytes += data_size; + stats->chart_lost_metrics += stats->chart_buffered_metrics; + } + + free_bson(insert, documents_inserted); + + if (unlikely(netdata_exit)) + break; + + stats->chart_sent_bytes += data_size; + stats->chart_sent_metrics = stats->chart_buffered_metrics; + +#ifdef UNIT_TESTING + break; +#endif + } + + mongodb_cleanup(instance); +} diff --git a/exporting/mongodb/mongodb.h b/exporting/mongodb/mongodb.h new file mode 100644 index 0000000000..0f23705f55 --- /dev/null +++ b/exporting/mongodb/mongodb.h @@ -0,0 +1,33 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_EXPORTING_MONGODB_H +#define NETDATA_EXPORTING_MONGODB_H + +#include "exporting/exporting_engine.h" +#include "exporting/json/json.h" +#include <mongoc.h> + +struct bson_buffer { + bson_t **insert; + size_t documents_inserted; + + struct bson_buffer *next; +}; + +struct mongodb_specific_data { + mongoc_client_t *client; + mongoc_collection_t *collection; + + bson_t **current_insert; + struct bson_buffer *first_buffer; + struct bson_buffer *last_buffer; +}; + +int mongodb_init(struct instance *instance); +void mongodb_cleanup(struct instance *instance); + +int init_mongodb_instance(struct instance *instance); +int format_batch_mongodb(struct instance *instance); +void mongodb_connector_worker(void *instance_p); + +#endif //NETDATA_EXPORTING_MONGODB_H |