summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2023-05-02 17:43:51 +0200
committerGitHub <noreply@github.com>2023-05-02 17:43:51 +0200
commit2a491f7932b21768149a787c8ef94c5b817471aa (patch)
tree2f4884c386d27bd6b79849c8c2ff3ee2f2cb0ae0
parentcd5230b596bdc58f5dbdf97fbc0bee0dc6ea56ab (diff)
Add Cancel Pending Request Message (#14953)
-rw-r--r--CMakeLists.txt3
-rw-r--r--Makefile.am9
-rw-r--r--aclk/aclk_capas.c2
-rw-r--r--aclk/aclk_query.c81
-rw-r--r--aclk/aclk_query.h2
-rw-r--r--aclk/aclk_rx_msgs.c19
-rw-r--r--aclk/schema-wrappers/agent_cmds.cc38
-rw-r--r--aclk/schema-wrappers/agent_cmds.h27
-rw-r--r--aclk/schema-wrappers/proto_2_json.cc3
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h1
10 files changed, 185 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6c7c8f593b..1feb4314b2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -936,6 +936,8 @@ set(ACLK_FILES
aclk/schema-wrappers/schema_wrappers.h
aclk/schema-wrappers/schema_wrapper_utils.cc
aclk/schema-wrappers/schema_wrapper_utils.h
+ aclk/schema-wrappers/agent_cmds.cc \
+ aclk/schema-wrappers/agent_cmds.h \
aclk/helpers/mqtt_wss_pal.h
aclk/helpers/ringbuffer_pal.h
)
@@ -1259,6 +1261,7 @@ set(ACLK_PROTO_DEFS
aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto
aclk/aclk-schemas/proto/context/v1/context.proto
aclk/aclk-schemas/proto/context/v1/stream.proto
+ aclk/aclk-schemas/proto/agent/v1/cmds.proto
)
PROTOBUF_ACLK_GENERATE_CPP(ACLK_PROTO_BUILT_SRCS ACLK_PROTO_BUILT_HDRS ${ACLK_PROTO_DEFS})
diff --git a/Makefile.am b/Makefile.am
index fe979cc8d4..7d9abd549c 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -726,6 +726,8 @@ ACLK_FILES = \
aclk/schema-wrappers/context_stream.h \
aclk/schema-wrappers/context.cc \
aclk/schema-wrappers/context.h \
+ aclk/schema-wrappers/agent_cmds.cc \
+ aclk/schema-wrappers/agent_cmds.h \
$(NULL)
noinst_LIBRARIES += libmqttwebsockets.a
@@ -768,6 +770,7 @@ ACLK_PROTO_DEFINITIONS = \
aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto \
aclk/aclk-schemas/proto/context/v1/context.proto \
aclk/aclk-schemas/proto/context/v1/stream.proto \
+ aclk/aclk-schemas/proto/agent/v1/cmds.proto \
$(NULL)
dist_noinst_DATA += $(ACLK_PROTO_DEFINITIONS)
@@ -792,6 +795,8 @@ ACLK_PROTO_BUILT_FILES = aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \
aclk/aclk-schemas/proto/context/v1/context.pb.h \
aclk/aclk-schemas/proto/context/v1/stream.pb.cc \
aclk/aclk-schemas/proto/context/v1/stream.pb.h \
+ aclk/aclk-schemas/proto/agent/v1/cmds.pb.cc \
+ aclk/aclk-schemas/proto/agent/v1/cmds.pb.h \
$(NULL)
BUILT_SOURCES += $(ACLK_PROTO_BUILT_FILES)
@@ -838,6 +843,10 @@ aclk/aclk-schemas/proto/context/v1/stream.pb.cc \
aclk/aclk-schemas/proto/context/v1/stream.pb.h: aclk/aclk-schemas/proto/context/v1/stream.proto
$(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+aclk/aclk-schemas/proto/agent/v1/cmds.pb.cc \
+aclk/aclk-schemas/proto/agent/v1/cmds.pb.h: aclk/aclk-schemas/proto/agent/v1/cmds.proto
+ $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
endif #ENABLE_ACLK
ACLK_ALWAYS_BUILD_FILES = \
diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c
index 824a438993..55f6fd3b4f 100644
--- a/aclk/aclk_capas.c
+++ b/aclk/aclk_capas.c
@@ -15,6 +15,7 @@ const struct capability *aclk_get_agent_capas()
{ .name = "funcs", .version = 1, .enabled = 1 },
{ .name = "http_api_v2", .version = 1, .enabled = 1 },
{ .name = "health", .version = 1, .enabled = 0 },
+ { .name = "req_cancel", .version = 1, .enabled = 1 },
{ .name = NULL, .version = 0, .enabled = 0 }
};
agent_capabilities[2].version = ml_capable() ? 1 : 0;
@@ -40,6 +41,7 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host)
{ .name = "funcs", .version = 0, .enabled = 0 },
{ .name = "http_api_v2", .version = 2, .enabled = 1 },
{ .name = "health", .version = 1, .enabled = host->health.health_enabled },
+ { .name = "req_cancel", .version = 1, .enabled = 1 },
{ .name = NULL, .version = 0, .enabled = 0 }
};
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index fd6f8555bc..46d1e1e5e3 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -13,6 +13,82 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
+struct pending_req_list {
+ const char *msg_id;
+ uint32_t hash;
+
+ int canceled;
+
+ struct pending_req_list *next;
+};
+
+static struct pending_req_list *pending_req_list_head = NULL;
+static pthread_mutex_t pending_req_list_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static struct pending_req_list *pending_req_list_add(const char *msg_id)
+{
+ struct pending_req_list *new = callocz(1, sizeof(struct pending_req_list));
+ new->msg_id = msg_id;
+ new->hash = simple_hash(msg_id);
+
+ pthread_mutex_lock(&pending_req_list_lock);
+ new->next = pending_req_list_head;
+ pending_req_list_head = new;
+ pthread_mutex_unlock(&pending_req_list_lock);
+ return new;
+}
+
+void pending_req_list_rm(const char *msg_id)
+{
+ uint32_t hash = simple_hash(msg_id);
+ struct pending_req_list *prev = NULL;
+
+ pthread_mutex_lock(&pending_req_list_lock);
+ struct pending_req_list *curr = pending_req_list_head;
+
+ while (curr) {
+ if (curr->hash == hash && strcmp(curr->msg_id, msg_id) == 0) {
+ if (prev)
+ prev->next = curr->next;
+ else
+ pending_req_list_head = curr->next;
+
+ freez(curr);
+ break;
+ }
+
+ prev = curr;
+ curr = curr->next;
+ }
+ pthread_mutex_unlock(&pending_req_list_lock);
+}
+
+int mark_pending_req_cancelled(const char *msg_id)
+{
+ uint32_t hash = simple_hash(msg_id);
+
+ pthread_mutex_lock(&pending_req_list_lock);
+ struct pending_req_list *curr = pending_req_list_head;
+
+ while (curr) {
+ if (curr->hash == hash && strcmp(curr->msg_id, msg_id) == 0) {
+ curr->canceled = 1;
+ pthread_mutex_unlock(&pending_req_list_lock);
+ return 0;
+ }
+
+ curr = curr->next;
+ }
+ pthread_mutex_unlock(&pending_req_list_lock);
+ return 1;
+}
+
+static bool aclk_web_client_interrupt_cb(struct web_client *w __maybe_unused, void *data)
+{
+ struct pending_req_list *req = (struct pending_req_list *)data;
+ return req->canceled;
+}
+
static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) {
int retval = 0;
BUFFER *local_buffer = NULL;
@@ -30,6 +106,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
w->mode = WEB_CLIENT_MODE_GET;
w->timings.tv_in = query->created_tv;
+ w->interrupt.callback = aclk_web_client_interrupt_cb;
+ w->interrupt.callback_data = pending_req_list_add(query->msg_id);
+
usec_t t;
web_client_timeout_checkpoint_set(w, query->timeout);
if(web_client_timeout_checkpoint_and_check(w, &t)) {
@@ -168,6 +247,8 @@ cleanup:
web_client_release_to_cache(w);
+ pending_req_list_rm(query->msg_id);
+
#ifdef NETDATA_WITH_ZLIB
buffer_free(z_buffer);
#endif
diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h
index c006b0138c..dbe6f9e5e6 100644
--- a/aclk/aclk_query.h
+++ b/aclk/aclk_query.h
@@ -33,4 +33,6 @@ void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok);
+int mark_pending_req_cancelled(const char *msg_id);
+
#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index b4dda5c425..60bff9ba1f 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -6,6 +6,7 @@
#include "aclk_query_queue.h"
#include "aclk.h"
#include "aclk_capas.h"
+#include "aclk_query.h"
#include "schema-wrappers/proto_2_json.h"
@@ -446,6 +447,23 @@ int stop_streaming_contexts(const char *msg, size_t msg_len)
return 0;
}
+int cancel_pending_req(const char *msg, size_t msg_len)
+{
+ struct aclk_cancel_pending_req cmd;
+ if(parse_cancel_pending_req(msg, msg_len, &cmd)) {
+ error_report("Error parsing CancelPendingReq");
+ return 1;
+ }
+
+ log_access("ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id);
+
+ if (mark_pending_req_cancelled(cmd.request_id))
+ error_report("CancelPending Request for %s failed. No such pending request.", cmd.request_id);
+
+ free_cancel_pending_req(&cmd);
+ return 0;
+}
+
typedef struct {
const char *name;
simple_hash_t name_hash;
@@ -466,6 +484,7 @@ new_cloud_rx_msg_t rx_msgs[] = {
{ .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req },
{ .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint },
{ .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts },
+ { .name = "CancelPendingRequest", .name_hash = 0, .fnc = cancel_pending_req },
{ .name = NULL, .name_hash = 0, .fnc = NULL },
};
diff --git a/aclk/schema-wrappers/agent_cmds.cc b/aclk/schema-wrappers/agent_cmds.cc
new file mode 100644
index 0000000000..6950f4025d
--- /dev/null
+++ b/aclk/schema-wrappers/agent_cmds.cc
@@ -0,0 +1,38 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/agent/v1/cmds.pb.h"
+
+#include "agent_cmds.h"
+
+#include "schema_wrapper_utils.h"
+
+using namespace agent::v1;
+
+int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req)
+{
+ CancelPendingRequest msg_parsed;
+
+ if (!msg_parsed.ParseFromArray(msg, msg_len)) {
+ error_report("Failed to parse CancelPendingRequest message");
+ return 1;
+ }
+
+ if (msg_parsed.request_id().c_str() == NULL) {
+ error_report("CancelPendingRequest message missing request_id");
+ return 1;
+ }
+ req->request_id = strdupz(msg_parsed.request_id().c_str());
+
+ if (msg_parsed.trace_id().c_str())
+ req->trace_id = strdupz(msg_parsed.trace_id().c_str());
+
+ set_timeval_from_google_timestamp(msg_parsed.timestamp(), &req->timestamp);
+
+ return 0;
+}
+
+void free_cancel_pending_req(struct aclk_cancel_pending_req *req)
+{
+ freez(req->request_id);
+ freez(req->trace_id);
+}
diff --git a/aclk/schema-wrappers/agent_cmds.h b/aclk/schema-wrappers/agent_cmds.h
new file mode 100644
index 0000000000..7e01f86c57
--- /dev/null
+++ b/aclk/schema-wrappers/agent_cmds.h
@@ -0,0 +1,27 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H
+#define ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H
+
+#include "libnetdata/libnetdata.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aclk_cancel_pending_req {
+ char *request_id;
+
+ struct timeval timestamp;
+
+ char *trace_id;
+};
+
+int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req);
+void free_cancel_pending_req(struct aclk_cancel_pending_req *req);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H */
diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc
index 4294f7efe5..8543965107 100644
--- a/aclk/schema-wrappers/proto_2_json.cc
+++ b/aclk/schema-wrappers/proto_2_json.cc
@@ -11,6 +11,7 @@
#include "proto/nodeinstance/info/v1/info.pb.h"
#include "proto/context/v1/stream.pb.h"
#include "proto/context/v1/context.pb.h"
+#include "proto/agent/v1/cmds.pb.h"
#include "libnetdata/libnetdata.h"
@@ -63,6 +64,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
return new context::v1::ContextsCheckpoint;
if (!strcmp(msgname, "StopStreamingContexts"))
return new context::v1::StopStreamingContexts;
+ if (!strcmp(msgname, "CancelPendingRequest"))
+ return new agent::v1::CancelPendingRequest;
return NULL;
}
diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h
index a96f7ea7ac..b651b88457 100644
--- a/aclk/schema-wrappers/schema_wrappers.h
+++ b/aclk/schema-wrappers/schema_wrappers.h
@@ -14,5 +14,6 @@
#include "capability.h"
#include "context_stream.h"
#include "context.h"
+#include "agent_cmds.h"
#endif /* SCHEMA_WRAPPERS_H */