diff options
author | Austin S. Hemmelgarn <austin@netdata.cloud> | 2020-04-13 10:32:33 -0400 |
---|---|---|
committer | Austin S. Hemmelgarn <austin@netdata.cloud> | 2020-04-13 10:32:33 -0400 |
commit | 983a26d1a2c110b35db252b4b79c3f03eb4eeb4b (patch) | |
tree | 85d42c30bc81514bd5c18aa564497e439290523b /exporting/mongodb | |
parent | 5a12b4a7e42587058b9b42871a1316545d527a57 (diff) |
Revert "Revert changes since v1.21 in pereparation for hotfix release."
This reverts commit e2874320fc027f7ab51ab3e115d5b1889b8fd747.
Diffstat (limited to 'exporting/mongodb')
-rw-r--r-- | exporting/mongodb/mongodb.c | 60 | ||||
-rw-r--r-- | exporting/mongodb/mongodb.h | 3 |
2 files changed, 51 insertions, 12 deletions
diff --git a/exporting/mongodb/mongodb.c b/exporting/mongodb/mongodb.c index b10a8fa664..f20c4f1c80 100644 --- a/exporting/mongodb/mongodb.c +++ b/exporting/mongodb/mongodb.c @@ -183,8 +183,10 @@ int format_batch_mongodb(struct instance *instance) // ring buffer is full, reuse the oldest element connector_specific_data->first_buffer = connector_specific_data->first_buffer->next; free_bson(insert, connector_specific_data->last_buffer->documents_inserted); + connector_specific_data->total_documents_inserted -= connector_specific_data->last_buffer->documents_inserted; + stats->buffered_bytes -= connector_specific_data->last_buffer->buffered_bytes; } - insert = callocz((size_t)stats->chart_buffered_metrics, sizeof(bson_t *)); + insert = callocz((size_t)stats->buffered_metrics, sizeof(bson_t *)); connector_specific_data->last_buffer->insert = insert; BUFFER *buffer = (BUFFER *)instance->buffer; @@ -193,7 +195,7 @@ int format_batch_mongodb(struct instance *instance) size_t documents_inserted = 0; - while (*end && documents_inserted <= (size_t)stats->chart_buffered_metrics) { + while (*end && documents_inserted <= (size_t)stats->buffered_metrics) { while (*end && *end != '\n') end++; @@ -208,7 +210,8 @@ int format_batch_mongodb(struct instance *instance) insert[documents_inserted] = bson_new_from_json((const uint8_t *)start, -1, &bson_error); if (unlikely(!insert[documents_inserted])) { - error("EXPORTING: %s", bson_error.message); + error( + "EXPORTING: Failed creating a BSON document from a JSON string \"%s\" : %s", start, bson_error.message); free_bson(insert, documents_inserted); return 1; } @@ -218,8 +221,16 @@ int format_batch_mongodb(struct instance *instance) documents_inserted++; } + stats->buffered_bytes += connector_specific_data->last_buffer->buffered_bytes = buffer_strlen(buffer); + buffer_flush(buffer); + // The stats->buffered_metrics is used in the MongoDB batch formatting as a variable for the number + // of metrics, added in the current iteration, so we are clearing it here. We will use the + // connector_specific_data->total_documents_inserted in the worker to show the statistics. + stats->buffered_metrics = 0; + connector_specific_data->total_documents_inserted += documents_inserted; + connector_specific_data->last_buffer->documents_inserted = documents_inserted; connector_specific_data->last_buffer = connector_specific_data->last_buffer->next; @@ -246,11 +257,25 @@ void mongodb_connector_worker(void *instance_p) uv_mutex_lock(&instance->mutex); uv_cond_wait(&instance->cond_var, &instance->mutex); + // reset the monitoring chart counters + stats->received_bytes = + stats->sent_bytes = + stats->sent_metrics = + stats->lost_metrics = + stats->receptions = + stats->transmission_successes = + stats->transmission_failures = + stats->data_lost_events = + stats->lost_bytes = + stats->reconnects = 0; + bson_t **insert = connector_specific_data->first_buffer->insert; size_t documents_inserted = connector_specific_data->first_buffer->documents_inserted; + size_t buffered_bytes = connector_specific_data->first_buffer->buffered_bytes; connector_specific_data->first_buffer->insert = NULL; connector_specific_data->first_buffer->documents_inserted = 0; + connector_specific_data->first_buffer->buffered_bytes = 0; connector_specific_data->first_buffer = connector_specific_data->first_buffer->next; uv_mutex_unlock(&instance->mutex); @@ -279,9 +304,10 @@ void mongodb_connector_worker(void *instance_p) NULL, NULL, &bson_error))) { - stats->chart_sent_bytes += data_size; - stats->chart_transmission_successes++; - stats->chart_receptions++; + stats->sent_metrics = documents_inserted; + stats->sent_bytes += data_size; + stats->transmission_successes++; + stats->receptions++; } else { // oops! we couldn't send (all or some of the) data error("EXPORTING: %s", bson_error.message); @@ -290,10 +316,10 @@ void mongodb_connector_worker(void *instance_p) "Willing to write %zu bytes, wrote %zu bytes.", instance->config.destination, data_size, 0UL); - stats->chart_transmission_failures++; - stats->chart_data_lost_events++; - stats->chart_lost_bytes += data_size; - stats->chart_lost_metrics += stats->chart_buffered_metrics; + stats->transmission_failures++; + stats->data_lost_events++; + stats->lost_bytes += buffered_bytes; + stats->lost_metrics += documents_inserted; } free_bson(insert, documents_inserted); @@ -301,8 +327,18 @@ void mongodb_connector_worker(void *instance_p) if (unlikely(netdata_exit)) break; - stats->chart_sent_bytes += data_size; - stats->chart_sent_metrics = stats->chart_buffered_metrics; + uv_mutex_lock(&instance->mutex); + + stats->buffered_metrics = connector_specific_data->total_documents_inserted; + + send_internal_metrics(instance); + + connector_specific_data->total_documents_inserted -= documents_inserted; + + stats->buffered_metrics = 0; + stats->buffered_bytes -= buffered_bytes; + + uv_mutex_unlock(&instance->mutex); #ifdef UNIT_TESTING break; diff --git a/exporting/mongodb/mongodb.h b/exporting/mongodb/mongodb.h index 0f23705f55..5116e66fab 100644 --- a/exporting/mongodb/mongodb.h +++ b/exporting/mongodb/mongodb.h @@ -10,6 +10,7 @@ struct bson_buffer { bson_t **insert; size_t documents_inserted; + size_t buffered_bytes; struct bson_buffer *next; }; @@ -18,6 +19,8 @@ struct mongodb_specific_data { mongoc_client_t *client; mongoc_collection_t *collection; + size_t total_documents_inserted; + bson_t **current_insert; struct bson_buffer *first_buffer; struct bson_buffer *last_buffer; |