summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-09-18 19:21:12 +0300
committerGitHub <noreply@github.com>2023-09-18 19:21:12 +0300
commited3ba445145a8403e77b1d4d00bbe944460a4530 (patch)
treedcc400da44a8c5801b80128e410b2614f63a3d27 /streaming
parent717ba3e9b201e43e0b4064bae5bdc762b31ebf93 (diff)
functions cancelling (#15977)
Diffstat (limited to 'streaming')
-rw-r--r--streaming/rrdpush.h1
-rw-r--r--streaming/sender.c12
2 files changed, 12 insertions, 1 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 73bd438c9d..09df8e711f 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -6,6 +6,7 @@
#include "libnetdata/libnetdata.h"
#include "daemon/common.h"
#include "web/server/web_client.h"
+#include "database/rrdfunctions.h"
#include "database/rrd.h"
#define CONNECTED_TO_SIZE 100
diff --git a/streaming/sender.c b/streaming/sender.c
index d26181020c..591611e71b 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -940,6 +940,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
buffer_strlen(func_wb),
now_realtime_usec() - tmp->received_ut);
}
+
string_freez(tmp->transaction);
buffer_free(func_wb);
freez(tmp);
@@ -989,7 +990,9 @@ void execute_commands(struct sender_state *s) {
tmp->transaction = string_strdupz(transaction);
BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
- int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp);
+ int code = rrd_function_run(s->host, wb, timeout, function, false, transaction,
+ stream_execute_function_callback, tmp, NULL, NULL);
+
if(code != HTTP_RESP_OK) {
if (!buffer_strlen(wb))
rrd_call_function_error(wb, "Failed to route request to collector", code);
@@ -998,6 +1001,13 @@ void execute_commands(struct sender_state *s) {
}
}
}
+ else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+
+ char *transaction = get_word(words, num_words, 1);
+ if(transaction && *transaction)
+ rrd_function_cancel(transaction);
+ }
else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);