summaryrefslogtreecommitdiffstats
path: root/exporting/mongodb
diff options
context:
space:
mode:
authorAustin S. Hemmelgarn <austin@netdata.cloud>2020-04-13 10:32:33 -0400
committerAustin S. Hemmelgarn <austin@netdata.cloud>2020-04-13 10:32:33 -0400
commit983a26d1a2c110b35db252b4b79c3f03eb4eeb4b (patch)
tree85d42c30bc81514bd5c18aa564497e439290523b /exporting/mongodb
parent5a12b4a7e42587058b9b42871a1316545d527a57 (diff)
Revert "Revert changes since v1.21 in pereparation for hotfix release."
Diffstat (limited to 'exporting/mongodb')
-rw-r--r--exporting/mongodb/mongodb.c60
-rw-r--r--exporting/mongodb/mongodb.h3
2 files changed, 51 insertions, 12 deletions
diff --git a/exporting/mongodb/mongodb.c b/exporting/mongodb/mongodb.c
index b10a8fa664..f20c4f1c80 100644
--- a/exporting/mongodb/mongodb.c
+++ b/exporting/mongodb/mongodb.c
@@ -183,8 +183,10 @@ int format_batch_mongodb(struct instance *instance)
// ring buffer is full, reuse the oldest element
connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
free_bson(insert, connector_specific_data->last_buffer->documents_inserted);
+ connector_specific_data->total_documents_inserted -= connector_specific_data->last_buffer->documents_inserted;
+ stats->buffered_bytes -= connector_specific_data->last_buffer->buffered_bytes;
}
- insert = callocz((size_t)stats->chart_buffered_metrics, sizeof(bson_t *));
+ insert = callocz((size_t)stats->buffered_metrics, sizeof(bson_t *));
connector_specific_data->last_buffer->insert = insert;
BUFFER *buffer = (BUFFER *)instance->buffer;
@@ -193,7 +195,7 @@ int format_batch_mongodb(struct instance *instance)
size_t documents_inserted = 0;
- while (*end && documents_inserted <= (size_t)stats->chart_buffered_metrics) {
+ while (*end && documents_inserted <= (size_t)stats->buffered_metrics) {
while (*end && *end != '\n')
end++;
@@ -208,7 +210,8 @@ int format_batch_mongodb(struct instance *instance)
insert[documents_inserted] = bson_new_from_json((const uint8_t *)start, -1, &bson_error);
if (unlikely(!insert[documents_inserted])) {
- error("EXPORTING: %s", bson_error.message);
+ error(
+ "EXPORTING: Failed creating a BSON document from a JSON string \"%s\" : %s", start, bson_error.message);
free_bson(insert, documents_inserted);
return 1;
}
@@ -218,8 +221,16 @@ int format_batch_mongodb(struct instance *instance)
documents_inserted++;
}
+ stats->buffered_bytes += connector_specific_data->last_buffer->buffered_bytes = buffer_strlen(buffer);
+
buffer_flush(buffer);
+ // The stats->buffered_metrics is used in the MongoDB batch formatting as a variable for the number
+ // of metrics, added in the current iteration, so we are clearing it here. We will use the
+ // connector_specific_data->total_documents_inserted in the worker to show the statistics.
+ stats->buffered_metrics = 0;
+ connector_specific_data->total_documents_inserted += documents_inserted;
+
connector_specific_data->last_buffer->documents_inserted = documents_inserted;
connector_specific_data->last_buffer = connector_specific_data->last_buffer->next;
@@ -246,11 +257,25 @@ void mongodb_connector_worker(void *instance_p)
uv_mutex_lock(&instance->mutex);
uv_cond_wait(&instance->cond_var, &instance->mutex);
+ // reset the monitoring chart counters
+ stats->received_bytes =
+ stats->sent_bytes =
+ stats->sent_metrics =
+ stats->lost_metrics =
+ stats->receptions =
+ stats->transmission_successes =
+ stats->transmission_failures =
+ stats->data_lost_events =
+ stats->lost_bytes =
+ stats->reconnects = 0;
+
bson_t **insert = connector_specific_data->first_buffer->insert;
size_t documents_inserted = connector_specific_data->first_buffer->documents_inserted;
+ size_t buffered_bytes = connector_specific_data->first_buffer->buffered_bytes;
connector_specific_data->first_buffer->insert = NULL;
connector_specific_data->first_buffer->documents_inserted = 0;
+ connector_specific_data->first_buffer->buffered_bytes = 0;
connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
uv_mutex_unlock(&instance->mutex);
@@ -279,9 +304,10 @@ void mongodb_connector_worker(void *instance_p)
NULL,
NULL,
&bson_error))) {
- stats->chart_sent_bytes += data_size;
- stats->chart_transmission_successes++;
- stats->chart_receptions++;
+ stats->sent_metrics = documents_inserted;
+ stats->sent_bytes += data_size;
+ stats->transmission_successes++;
+ stats->receptions++;
} else {
// oops! we couldn't send (all or some of the) data
error("EXPORTING: %s", bson_error.message);
@@ -290,10 +316,10 @@ void mongodb_connector_worker(void *instance_p)
"Willing to write %zu bytes, wrote %zu bytes.",
instance->config.destination, data_size, 0UL);
- stats->chart_transmission_failures++;
- stats->chart_data_lost_events++;
- stats->chart_lost_bytes += data_size;
- stats->chart_lost_metrics += stats->chart_buffered_metrics;
+ stats->transmission_failures++;
+ stats->data_lost_events++;
+ stats->lost_bytes += buffered_bytes;
+ stats->lost_metrics += documents_inserted;
}
free_bson(insert, documents_inserted);
@@ -301,8 +327,18 @@ void mongodb_connector_worker(void *instance_p)
if (unlikely(netdata_exit))
break;
- stats->chart_sent_bytes += data_size;
- stats->chart_sent_metrics = stats->chart_buffered_metrics;
+ uv_mutex_lock(&instance->mutex);
+
+ stats->buffered_metrics = connector_specific_data->total_documents_inserted;
+
+ send_internal_metrics(instance);
+
+ connector_specific_data->total_documents_inserted -= documents_inserted;
+
+ stats->buffered_metrics = 0;
+ stats->buffered_bytes -= buffered_bytes;
+
+ uv_mutex_unlock(&instance->mutex);
#ifdef UNIT_TESTING
break;
diff --git a/exporting/mongodb/mongodb.h b/exporting/mongodb/mongodb.h
index 0f23705f55..5116e66fab 100644
--- a/exporting/mongodb/mongodb.h
+++ b/exporting/mongodb/mongodb.h
@@ -10,6 +10,7 @@
struct bson_buffer {
bson_t **insert;
size_t documents_inserted;
+ size_t buffered_bytes;
struct bson_buffer *next;
};
@@ -18,6 +19,8 @@ struct mongodb_specific_data {
mongoc_client_t *client;
mongoc_collection_t *collection;
+ size_t total_documents_inserted;
+
bson_t **current_insert;
struct bson_buffer *first_buffer;
struct bson_buffer *last_buffer;