summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-05-07 23:00:44 +0300
committerGitHub <noreply@github.com>2022-05-07 23:00:44 +0300
commit79444d36459b105f093c5626eea8f0b45af7f421 (patch)
treeb260e2268c3ec6deff4abf32aff9ef20b85a2ce8 /streaming/receiver.c
parent0cbf49860b14a04a618345f7029cd2000e651f89 (diff)
fix memory leaks and mismatches of the use of the z functions for allocations (#12841)
* fix mismatches of the use of the z functions for allocations * when there was no memory; the original name of the dimensions was freed, and with mismatching deallocator.. * fixed memory leak at rrdeng_load_metric_*() functions * fixed memory leak on exit of plugins.d parser * fixed memory leak on plugins and streaming receiver threads exit * fixed compiler warnings
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c47
1 files changed, 28 insertions, 19 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 5cba2c9267..c777ea54e9 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -338,26 +338,31 @@ static char *receiver_next_line(struct receiver_state *r, int *pos) {
return NULL;
}
+static void streaming_parser_thread_cleanup(void *ptr) {
+ PARSER *parser = (PARSER *)ptr;
+ parser_destroy(parser);
+}
+
size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) {
size_t result;
- PARSER_USER_OBJECT *user = callocz(1, sizeof(*user));
- user->enabled = cd->enabled;
- user->host = rpt->host;
- user->opaque = rpt;
- user->cd = cd;
- user->trust_durations = 0;
-
- PARSER *parser = parser_init(rpt->host, user, fp, PARSER_INPUT_SPLIT);
+
+ PARSER_USER_OBJECT user = {
+ .enabled = cd->enabled,
+ .host = rpt->host,
+ .opaque = rpt,
+ .cd = cd,
+ .trust_durations = 0
+ };
+
+ PARSER *parser = parser_init(rpt->host, &user, fp, PARSER_INPUT_SPLIT);
+
+ // this keeps the parser with its current value
+ // so, parser needs to be allocated before pushing it
+ netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser);
+
parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp);
parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
- if (unlikely(!parser)) {
- error("Failed to initialize parser");
- cd->serial_failures++;
- freez(user);
- return 0;
- }
-
parser->plugins_action->begin_action = &pluginsd_begin_action;
parser->plugins_action->flush_action = &pluginsd_flush_action;
parser->plugins_action->end_action = &pluginsd_end_action;
@@ -371,12 +376,13 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action;
parser->plugins_action->clabel_action = &pluginsd_clabel_action;
- user->parser = parser;
+ user.parser = parser;
#ifdef ENABLE_COMPRESSION
if (rpt->decompressor)
rpt->decompressor->reset(rpt->decompressor);
#endif
+
do{
if (receiver_read(rpt, fp))
break;
@@ -389,10 +395,13 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
rpt->last_msg_t = now_realtime_sec();
}
while(!netdata_exit);
+
done:
- result= user->count;
- freez(user);
- parser_destroy(parser);
+ result = user.count;
+
+ // free parser with the pop function
+ netdata_thread_cleanup_pop(1);
+
return result;
}