summaryrefslogtreecommitdiffstats
path: root/exporting/pubsub
diff options
context:
space:
mode:
authorVladimir Kobal <vlad@prokk.net>2020-05-21 23:29:14 +0300
committerGitHub <noreply@github.com>2020-05-21 23:29:14 +0300
commite7428a1d31da1c695cd869e0bcc8dd535383b10e (patch)
tree564e7fe1524fa2d6b40359644f0c9338820bdba4 /exporting/pubsub
parentc93038d7f98afe8b288fce33f7ad5b808c24eef1 (diff)
Dynamic memory cleanup for Pub/Sub exporting connector (#9112)
* Update dynamic memory cleanup * Fix unit tests * Rename a function * Delete GRPC objects * Unlock a mutex * Delete an odd file
Diffstat (limited to 'exporting/pubsub')
-rw-r--r--exporting/pubsub/pubsub.c40
-rw-r--r--exporting/pubsub/pubsub.h1
-rw-r--r--exporting/pubsub/pubsub_publish.cc29
-rw-r--r--exporting/pubsub/pubsub_publish.h1
4 files changed, 69 insertions, 2 deletions
diff --git a/exporting/pubsub/pubsub.c b/exporting/pubsub/pubsub.c
index fd03f72c8f..95a87422a6 100644
--- a/exporting/pubsub/pubsub.c
+++ b/exporting/pubsub/pubsub.c
@@ -57,6 +57,35 @@ int init_pubsub_instance(struct instance *instance)
}
/**
+ * Clean a PubSub connector instance
+ *
+ * @param instance an instance data structure.
+ */
+void clean_pubsub_instance(struct instance *instance)
+{
+ info("EXPORTING: cleaning up instance %s ...", instance->config.name);
+
+ struct pubsub_specific_data *connector_specific_data =
+ (struct pubsub_specific_data *)instance->connector_specific_data;
+ pubsub_cleanup(connector_specific_data);
+ freez(connector_specific_data);
+
+ buffer_free(instance->buffer);
+
+ struct pubsub_specific_config *connector_specific_config =
+ (struct pubsub_specific_config *)instance->config.connector_specific_config;
+ freez(connector_specific_config->credentials_file);
+ freez(connector_specific_config->project_id);
+ freez(connector_specific_config->topic_id);
+ freez(connector_specific_config);
+
+ info("EXPORTING: instance %s exited", instance->config.name);
+ instance->exited = 1;
+
+ return;
+}
+
+/**
* Pub/Sub connector worker
*
* Runs in a separate thread for every instance.
@@ -69,13 +98,18 @@ void pubsub_connector_worker(void *instance_p)
struct pubsub_specific_config *connector_specific_config = instance->config.connector_specific_config;
struct pubsub_specific_data *connector_specific_data = instance->connector_specific_data;
- while (!netdata_exit) {
+ while (!instance->engine->exit) {
struct stats *stats = &instance->stats;
char error_message[ERROR_LINE_MAX + 1] = "";
uv_mutex_lock(&instance->mutex);
uv_cond_wait(&instance->cond_var, &instance->mutex);
+ if (unlikely(instance->engine->exit)) {
+ uv_mutex_unlock(&instance->mutex);
+ break;
+ }
+
// reset the monitoring chart counters
stats->received_bytes =
stats->sent_bytes =
@@ -149,7 +183,9 @@ void pubsub_connector_worker(void *instance_p)
uv_mutex_unlock(&instance->mutex);
#ifdef UNIT_TESTING
- break;
+ return;
#endif
}
+
+ clean_pubsub_instance(instance);
}
diff --git a/exporting/pubsub/pubsub.h b/exporting/pubsub/pubsub.h
index 90a60682d9..0bcb76f9ba 100644
--- a/exporting/pubsub/pubsub.h
+++ b/exporting/pubsub/pubsub.h
@@ -8,6 +8,7 @@
#include "pubsub_publish.h"
int init_pubsub_instance(struct instance *instance);
+void clean_pubsub_instance(struct instance *instance);
void pubsub_connector_worker(void *instance_p);
#endif //NETDATA_EXPORTING_PUBSUB_H
diff --git a/exporting/pubsub/pubsub_publish.cc b/exporting/pubsub/pubsub_publish.cc
index 49ace6b777..dc237cf22f 100644
--- a/exporting/pubsub/pubsub_publish.cc
+++ b/exporting/pubsub/pubsub_publish.cc
@@ -80,6 +80,35 @@ int pubsub_init(
}
/**
+ * Clean the PubSub connector instance specific data
+ */
+void pubsub_cleanup(void *pubsub_specific_data_p)
+{
+ struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
+
+ std::list<struct response> *responses = (std::list<struct response> *)connector_specific_data->responses;
+ std::list<struct response>::iterator response;
+ for (response = responses->begin(); response != responses->end(); ++response) {
+ // TODO: If we do this, there are a huge amount of possibly lost records. We need to find a right way of
+ // cleaning up contexts
+ // delete response->context;
+ delete response->publish_response;
+ delete response->status;
+ }
+ delete responses;
+
+ ((grpc::CompletionQueue *)connector_specific_data->completion_queue)->Shutdown();
+ delete (grpc::CompletionQueue *)connector_specific_data->completion_queue;
+ delete (google::pubsub::v1::PublishRequest *)connector_specific_data->request;
+ delete (google::pubsub::v1::Publisher::Stub *)connector_specific_data->stub;
+
+ // TODO: Find how to shutdown grpc gracefully. grpc_shutdown() doesn't seem to work.
+ // grpc_shutdown();
+
+ return;
+}
+
+/**
* Add data to a Pub/Sub request message.
*
* @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
diff --git a/exporting/pubsub/pubsub_publish.h b/exporting/pubsub/pubsub_publish.h
index 76654f0baf..567a262f0d 100644
--- a/exporting/pubsub/pubsub_publish.h
+++ b/exporting/pubsub/pubsub_publish.h
@@ -21,6 +21,7 @@ struct pubsub_specific_data {
int pubsub_init(
void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file,
const char *project_id, const char *topic_id);
+void pubsub_cleanup(void *pubsub_specific_data_p);
int pubsub_add_message(void *pubsub_specific_data_p, char *data);