diff options
author | Vladimir Kobal <vlad@prokk.net> | 2020-05-21 10:11:05 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-21 10:11:05 +0300 |
commit | 83bc63694a79dc25dda26723bed8cb12e700da50 (patch) | |
tree | ecfd73c33c3126919ce8db24801603e7cd924e95 | |
parent | 5675f2f48ae4b66526e739c4c5094da1b85ce0e9 (diff) |
Dynamic memory cleanup for MongoDB exporting connector (#9103)
-rw-r--r-- | exporting/exporting_engine.c | 1 | ||||
-rw-r--r-- | exporting/mongodb/mongodb.c | 125 |
2 files changed, 80 insertions, 46 deletions
diff --git a/exporting/exporting_engine.c b/exporting/exporting_engine.c index 656e5ada78..17f2327c4d 100644 --- a/exporting/exporting_engine.c +++ b/exporting/exporting_engine.c @@ -30,6 +30,7 @@ static void exporting_main_cleanup(void *ptr) if (!instance->exited) { found++; info("stopping worker for instance %s", instance->config.name); + uv_mutex_unlock(&instance->mutex); uv_cond_signal(&instance->cond_var); } else info("found stopped worker for instance %s", instance->config.name); diff --git a/exporting/mongodb/mongodb.c b/exporting/mongodb/mongodb.c index f20c4f1c80..38e1d01378 100644 --- a/exporting/mongodb/mongodb.c +++ b/exporting/mongodb/mongodb.c @@ -80,22 +80,6 @@ int mongodb_init(struct instance *instance) } /** - * Clean a MongoDB connector instance up - * - * @param instance an instance data structure. - */ -void mongodb_cleanup(struct instance *instance) -{ - struct mongodb_specific_data *connector_specific_data = - (struct mongodb_specific_data *)instance->connector_specific_data; - - mongoc_collection_destroy(connector_specific_data->collection); - mongoc_client_destroy(connector_specific_data->client); - - return; -} - -/** * Initialize a MongoDB connector instance * * @param instance an instance data structure. @@ -238,6 +222,51 @@ int format_batch_mongodb(struct instance *instance) } /** + * Clean a MongoDB connector instance up + * + * @param instance an instance data structure. + */ +void mongodb_cleanup(struct instance *instance) +{ + info("EXPORTING: cleaning up instance %s ...", instance->config.name); + + struct mongodb_specific_data *connector_specific_data = + (struct mongodb_specific_data *)instance->connector_specific_data; + + mongoc_collection_destroy(connector_specific_data->collection); + mongoc_client_destroy(connector_specific_data->client); + if (instance->engine->mongoc_initialized) { + mongoc_cleanup(); + instance->engine->mongoc_initialized = 0; + } + + buffer_free(instance->buffer); + + struct bson_buffer *next_buffer = connector_specific_data->first_buffer; + for (int i = 0; i < instance->config.buffer_on_failures; i++) { + struct bson_buffer *current_buffer = next_buffer; + next_buffer = next_buffer->next; + + if (current_buffer->insert) + free_bson(current_buffer->insert, current_buffer->documents_inserted); + freez(current_buffer); + } + + freez(connector_specific_data); + + struct mongodb_specific_config *connector_specific_config = + (struct mongodb_specific_config *)instance->config.connector_specific_config; + freez(connector_specific_config->database); + freez(connector_specific_config->collection); + freez(connector_specific_config); + + info("EXPORTING: instance %s exited", instance->config.name); + instance->exited = 1; + + return; +} + +/** * MongoDB connector worker * * Runs in a separate thread for every instance. @@ -251,12 +280,17 @@ void mongodb_connector_worker(void *instance_p) struct mongodb_specific_data *connector_specific_data = (struct mongodb_specific_data *)instance->connector_specific_data; - while (!netdata_exit) { + while (!instance->engine->exit) { struct stats *stats = &instance->stats; uv_mutex_lock(&instance->mutex); uv_cond_wait(&instance->cond_var, &instance->mutex); + if (unlikely(instance->engine->exit)) { + uv_mutex_unlock(&instance->mutex); + break; + } + // reset the monitoring chart counters stats->received_bytes = stats->sent_bytes = @@ -293,38 +327,37 @@ void mongodb_connector_worker(void *instance_p) connector_specific_config->collection, data_size); - if (unlikely(documents_inserted == 0)) - continue; - - bson_error_t bson_error; - if (likely(mongoc_collection_insert_many( - connector_specific_data->collection, - (const bson_t **)insert, - documents_inserted, - NULL, - NULL, - &bson_error))) { - stats->sent_metrics = documents_inserted; - stats->sent_bytes += data_size; - stats->transmission_successes++; - stats->receptions++; - } else { - // oops! we couldn't send (all or some of the) data - error("EXPORTING: %s", bson_error.message); - error( - "EXPORTING: failed to write data to the database '%s'. " - "Willing to write %zu bytes, wrote %zu bytes.", - instance->config.destination, data_size, 0UL); - - stats->transmission_failures++; - stats->data_lost_events++; - stats->lost_bytes += buffered_bytes; - stats->lost_metrics += documents_inserted; + if (likely(documents_inserted != 0)) { + bson_error_t bson_error; + if (likely(mongoc_collection_insert_many( + connector_specific_data->collection, + (const bson_t **)insert, + documents_inserted, + NULL, + NULL, + &bson_error))) { + stats->sent_metrics = documents_inserted; + stats->sent_bytes += data_size; + stats->transmission_successes++; + stats->receptions++; + } else { + // oops! we couldn't send (all or some of the) data + error("EXPORTING: %s", bson_error.message); + error( + "EXPORTING: failed to write data to the database '%s'. " + "Willing to write %zu bytes, wrote %zu bytes.", + instance->config.destination, data_size, 0UL); + + stats->transmission_failures++; + stats->data_lost_events++; + stats->lost_bytes += buffered_bytes; + stats->lost_metrics += documents_inserted; + } } free_bson(insert, documents_inserted); - if (unlikely(netdata_exit)) + if (unlikely(instance->engine->exit)) break; uv_mutex_lock(&instance->mutex); @@ -341,7 +374,7 @@ void mongodb_connector_worker(void *instance_p) uv_mutex_unlock(&instance->mutex); #ifdef UNIT_TESTING - break; + return; #endif } |