summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladimir Kobal <vlad@prokk.net>2020-05-21 10:11:05 +0300
committerGitHub <noreply@github.com>2020-05-21 10:11:05 +0300
commit83bc63694a79dc25dda26723bed8cb12e700da50 (patch)
treeecfd73c33c3126919ce8db24801603e7cd924e95
parent5675f2f48ae4b66526e739c4c5094da1b85ce0e9 (diff)
Dynamic memory cleanup for MongoDB exporting connector (#9103)
-rw-r--r--exporting/exporting_engine.c1
-rw-r--r--exporting/mongodb/mongodb.c125
2 files changed, 80 insertions, 46 deletions
diff --git a/exporting/exporting_engine.c b/exporting/exporting_engine.c
index 656e5ada78..17f2327c4d 100644
--- a/exporting/exporting_engine.c
+++ b/exporting/exporting_engine.c
@@ -30,6 +30,7 @@ static void exporting_main_cleanup(void *ptr)
if (!instance->exited) {
found++;
info("stopping worker for instance %s", instance->config.name);
+ uv_mutex_unlock(&instance->mutex);
uv_cond_signal(&instance->cond_var);
} else
info("found stopped worker for instance %s", instance->config.name);
diff --git a/exporting/mongodb/mongodb.c b/exporting/mongodb/mongodb.c
index f20c4f1c80..38e1d01378 100644
--- a/exporting/mongodb/mongodb.c
+++ b/exporting/mongodb/mongodb.c
@@ -80,22 +80,6 @@ int mongodb_init(struct instance *instance)
}
/**
- * Clean a MongoDB connector instance up
- *
- * @param instance an instance data structure.
- */
-void mongodb_cleanup(struct instance *instance)
-{
- struct mongodb_specific_data *connector_specific_data =
- (struct mongodb_specific_data *)instance->connector_specific_data;
-
- mongoc_collection_destroy(connector_specific_data->collection);
- mongoc_client_destroy(connector_specific_data->client);
-
- return;
-}
-
-/**
* Initialize a MongoDB connector instance
*
* @param instance an instance data structure.
@@ -238,6 +222,51 @@ int format_batch_mongodb(struct instance *instance)
}
/**
+ * Clean a MongoDB connector instance up
+ *
+ * @param instance an instance data structure.
+ */
+void mongodb_cleanup(struct instance *instance)
+{
+ info("EXPORTING: cleaning up instance %s ...", instance->config.name);
+
+ struct mongodb_specific_data *connector_specific_data =
+ (struct mongodb_specific_data *)instance->connector_specific_data;
+
+ mongoc_collection_destroy(connector_specific_data->collection);
+ mongoc_client_destroy(connector_specific_data->client);
+ if (instance->engine->mongoc_initialized) {
+ mongoc_cleanup();
+ instance->engine->mongoc_initialized = 0;
+ }
+
+ buffer_free(instance->buffer);
+
+ struct bson_buffer *next_buffer = connector_specific_data->first_buffer;
+ for (int i = 0; i < instance->config.buffer_on_failures; i++) {
+ struct bson_buffer *current_buffer = next_buffer;
+ next_buffer = next_buffer->next;
+
+ if (current_buffer->insert)
+ free_bson(current_buffer->insert, current_buffer->documents_inserted);
+ freez(current_buffer);
+ }
+
+ freez(connector_specific_data);
+
+ struct mongodb_specific_config *connector_specific_config =
+ (struct mongodb_specific_config *)instance->config.connector_specific_config;
+ freez(connector_specific_config->database);
+ freez(connector_specific_config->collection);
+ freez(connector_specific_config);
+
+ info("EXPORTING: instance %s exited", instance->config.name);
+ instance->exited = 1;
+
+ return;
+}
+
+/**
* MongoDB connector worker
*
* Runs in a separate thread for every instance.
@@ -251,12 +280,17 @@ void mongodb_connector_worker(void *instance_p)
struct mongodb_specific_data *connector_specific_data =
(struct mongodb_specific_data *)instance->connector_specific_data;
- while (!netdata_exit) {
+ while (!instance->engine->exit) {
struct stats *stats = &instance->stats;
uv_mutex_lock(&instance->mutex);
uv_cond_wait(&instance->cond_var, &instance->mutex);
+ if (unlikely(instance->engine->exit)) {
+ uv_mutex_unlock(&instance->mutex);
+ break;
+ }
+
// reset the monitoring chart counters
stats->received_bytes =
stats->sent_bytes =
@@ -293,38 +327,37 @@ void mongodb_connector_worker(void *instance_p)
connector_specific_config->collection,
data_size);
- if (unlikely(documents_inserted == 0))
- continue;
-
- bson_error_t bson_error;
- if (likely(mongoc_collection_insert_many(
- connector_specific_data->collection,
- (const bson_t **)insert,
- documents_inserted,
- NULL,
- NULL,
- &bson_error))) {
- stats->sent_metrics = documents_inserted;
- stats->sent_bytes += data_size;
- stats->transmission_successes++;
- stats->receptions++;
- } else {
- // oops! we couldn't send (all or some of the) data
- error("EXPORTING: %s", bson_error.message);
- error(
- "EXPORTING: failed to write data to the database '%s'. "
- "Willing to write %zu bytes, wrote %zu bytes.",
- instance->config.destination, data_size, 0UL);
-
- stats->transmission_failures++;
- stats->data_lost_events++;
- stats->lost_bytes += buffered_bytes;
- stats->lost_metrics += documents_inserted;
+ if (likely(documents_inserted != 0)) {
+ bson_error_t bson_error;
+ if (likely(mongoc_collection_insert_many(
+ connector_specific_data->collection,
+ (const bson_t **)insert,
+ documents_inserted,
+ NULL,
+ NULL,
+ &bson_error))) {
+ stats->sent_metrics = documents_inserted;
+ stats->sent_bytes += data_size;
+ stats->transmission_successes++;
+ stats->receptions++;
+ } else {
+ // oops! we couldn't send (all or some of the) data
+ error("EXPORTING: %s", bson_error.message);
+ error(
+ "EXPORTING: failed to write data to the database '%s'. "
+ "Willing to write %zu bytes, wrote %zu bytes.",
+ instance->config.destination, data_size, 0UL);
+
+ stats->transmission_failures++;
+ stats->data_lost_events++;
+ stats->lost_bytes += buffered_bytes;
+ stats->lost_metrics += documents_inserted;
+ }
}
free_bson(insert, documents_inserted);
- if (unlikely(netdata_exit))
+ if (unlikely(instance->engine->exit))
break;
uv_mutex_lock(&instance->mutex);
@@ -341,7 +374,7 @@ void mongodb_connector_worker(void *instance_p)
uv_mutex_unlock(&instance->mutex);
#ifdef UNIT_TESTING
- break;
+ return;
#endif
}