diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2023-05-02 17:43:51 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-02 17:43:51 +0200 |
commit | 2a491f7932b21768149a787c8ef94c5b817471aa (patch) | |
tree | 2f4884c386d27bd6b79849c8c2ff3ee2f2cb0ae0 /aclk | |
parent | cd5230b596bdc58f5dbdf97fbc0bee0dc6ea56ab (diff) |
Add Cancel Pending Request Message (#14953)
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk_capas.c | 2 | ||||
-rw-r--r-- | aclk/aclk_query.c | 81 | ||||
-rw-r--r-- | aclk/aclk_query.h | 2 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 19 | ||||
-rw-r--r-- | aclk/schema-wrappers/agent_cmds.cc | 38 | ||||
-rw-r--r-- | aclk/schema-wrappers/agent_cmds.h | 27 | ||||
-rw-r--r-- | aclk/schema-wrappers/proto_2_json.cc | 3 | ||||
-rw-r--r-- | aclk/schema-wrappers/schema_wrappers.h | 1 |
8 files changed, 173 insertions, 0 deletions
diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c index 824a438993..55f6fd3b4f 100644 --- a/aclk/aclk_capas.c +++ b/aclk/aclk_capas.c @@ -15,6 +15,7 @@ const struct capability *aclk_get_agent_capas() { .name = "funcs", .version = 1, .enabled = 1 }, { .name = "http_api_v2", .version = 1, .enabled = 1 }, { .name = "health", .version = 1, .enabled = 0 }, + { .name = "req_cancel", .version = 1, .enabled = 1 }, { .name = NULL, .version = 0, .enabled = 0 } }; agent_capabilities[2].version = ml_capable() ? 1 : 0; @@ -40,6 +41,7 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host) { .name = "funcs", .version = 0, .enabled = 0 }, { .name = "http_api_v2", .version = 2, .enabled = 1 }, { .name = "health", .version = 1, .enabled = host->health.health_enabled }, + { .name = "req_cancel", .version = 1, .enabled = 1 }, { .name = NULL, .version = 0, .enabled = 0 } }; diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index fd6f8555bc..46d1e1e5e3 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -13,6 +13,82 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) +struct pending_req_list { + const char *msg_id; + uint32_t hash; + + int canceled; + + struct pending_req_list *next; +}; + +static struct pending_req_list *pending_req_list_head = NULL; +static pthread_mutex_t pending_req_list_lock = PTHREAD_MUTEX_INITIALIZER; + +static struct pending_req_list *pending_req_list_add(const char *msg_id) +{ + struct pending_req_list *new = callocz(1, sizeof(struct pending_req_list)); + new->msg_id = msg_id; + new->hash = simple_hash(msg_id); + + pthread_mutex_lock(&pending_req_list_lock); + new->next = pending_req_list_head; + pending_req_list_head = new; + pthread_mutex_unlock(&pending_req_list_lock); + return new; +} + +void pending_req_list_rm(const char *msg_id) +{ + uint32_t hash = simple_hash(msg_id); + struct pending_req_list *prev = NULL; + + pthread_mutex_lock(&pending_req_list_lock); + struct pending_req_list *curr = pending_req_list_head; + + while (curr) { + if (curr->hash == hash && strcmp(curr->msg_id, msg_id) == 0) { + if (prev) + prev->next = curr->next; + else + pending_req_list_head = curr->next; + + freez(curr); + break; + } + + prev = curr; + curr = curr->next; + } + pthread_mutex_unlock(&pending_req_list_lock); +} + +int mark_pending_req_cancelled(const char *msg_id) +{ + uint32_t hash = simple_hash(msg_id); + + pthread_mutex_lock(&pending_req_list_lock); + struct pending_req_list *curr = pending_req_list_head; + + while (curr) { + if (curr->hash == hash && strcmp(curr->msg_id, msg_id) == 0) { + curr->canceled = 1; + pthread_mutex_unlock(&pending_req_list_lock); + return 0; + } + + curr = curr->next; + } + pthread_mutex_unlock(&pending_req_list_lock); + return 1; +} + +static bool aclk_web_client_interrupt_cb(struct web_client *w __maybe_unused, void *data) +{ + struct pending_req_list *req = (struct pending_req_list *)data; + return req->canceled; +} + static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) { int retval = 0; BUFFER *local_buffer = NULL; @@ -30,6 +106,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) w->mode = WEB_CLIENT_MODE_GET; w->timings.tv_in = query->created_tv; + w->interrupt.callback = aclk_web_client_interrupt_cb; + w->interrupt.callback_data = pending_req_list_add(query->msg_id); + usec_t t; web_client_timeout_checkpoint_set(w, query->timeout); if(web_client_timeout_checkpoint_and_check(w, &t)) { @@ -168,6 +247,8 @@ cleanup: web_client_release_to_cache(w); + pending_req_list_rm(query->msg_id); + #ifdef NETDATA_WITH_ZLIB buffer_free(z_buffer); #endif diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h index c006b0138c..dbe6f9e5e6 100644 --- a/aclk/aclk_query.h +++ b/aclk/aclk_query.h @@ -33,4 +33,6 @@ void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok); +int mark_pending_req_cancelled(const char *msg_id); + #endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index b4dda5c425..60bff9ba1f 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -6,6 +6,7 @@ #include "aclk_query_queue.h" #include "aclk.h" #include "aclk_capas.h" +#include "aclk_query.h" #include "schema-wrappers/proto_2_json.h" @@ -446,6 +447,23 @@ int stop_streaming_contexts(const char *msg, size_t msg_len) return 0; } +int cancel_pending_req(const char *msg, size_t msg_len) +{ + struct aclk_cancel_pending_req cmd; + if(parse_cancel_pending_req(msg, msg_len, &cmd)) { + error_report("Error parsing CancelPendingReq"); + return 1; + } + + log_access("ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id); + + if (mark_pending_req_cancelled(cmd.request_id)) + error_report("CancelPending Request for %s failed. No such pending request.", cmd.request_id); + + free_cancel_pending_req(&cmd); + return 0; +} + typedef struct { const char *name; simple_hash_t name_hash; @@ -466,6 +484,7 @@ new_cloud_rx_msg_t rx_msgs[] = { { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req }, { .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint }, { .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts }, + { .name = "CancelPendingRequest", .name_hash = 0, .fnc = cancel_pending_req }, { .name = NULL, .name_hash = 0, .fnc = NULL }, }; diff --git a/aclk/schema-wrappers/agent_cmds.cc b/aclk/schema-wrappers/agent_cmds.cc new file mode 100644 index 0000000000..6950f4025d --- /dev/null +++ b/aclk/schema-wrappers/agent_cmds.cc @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/agent/v1/cmds.pb.h" + +#include "agent_cmds.h" + +#include "schema_wrapper_utils.h" + +using namespace agent::v1; + +int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req) +{ + CancelPendingRequest msg_parsed; + + if (!msg_parsed.ParseFromArray(msg, msg_len)) { + error_report("Failed to parse CancelPendingRequest message"); + return 1; + } + + if (msg_parsed.request_id().c_str() == NULL) { + error_report("CancelPendingRequest message missing request_id"); + return 1; + } + req->request_id = strdupz(msg_parsed.request_id().c_str()); + + if (msg_parsed.trace_id().c_str()) + req->trace_id = strdupz(msg_parsed.trace_id().c_str()); + + set_timeval_from_google_timestamp(msg_parsed.timestamp(), &req->timestamp); + + return 0; +} + +void free_cancel_pending_req(struct aclk_cancel_pending_req *req) +{ + freez(req->request_id); + freez(req->trace_id); +} diff --git a/aclk/schema-wrappers/agent_cmds.h b/aclk/schema-wrappers/agent_cmds.h new file mode 100644 index 0000000000..7e01f86c57 --- /dev/null +++ b/aclk/schema-wrappers/agent_cmds.h @@ -0,0 +1,27 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H +#define ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H + +#include "libnetdata/libnetdata.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct aclk_cancel_pending_req { + char *request_id; + + struct timeval timestamp; + + char *trace_id; +}; + +int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req); +void free_cancel_pending_req(struct aclk_cancel_pending_req *req); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H */ diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc index 4294f7efe5..8543965107 100644 --- a/aclk/schema-wrappers/proto_2_json.cc +++ b/aclk/schema-wrappers/proto_2_json.cc @@ -11,6 +11,7 @@ #include "proto/nodeinstance/info/v1/info.pb.h" #include "proto/context/v1/stream.pb.h" #include "proto/context/v1/context.pb.h" +#include "proto/agent/v1/cmds.pb.h" #include "libnetdata/libnetdata.h" @@ -63,6 +64,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new context::v1::ContextsCheckpoint; if (!strcmp(msgname, "StopStreamingContexts")) return new context::v1::StopStreamingContexts; + if (!strcmp(msgname, "CancelPendingRequest")) + return new agent::v1::CancelPendingRequest; return NULL; } diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h index a96f7ea7ac..b651b88457 100644 --- a/aclk/schema-wrappers/schema_wrappers.h +++ b/aclk/schema-wrappers/schema_wrappers.h @@ -14,5 +14,6 @@ #include "capability.h" #include "context_stream.h" #include "context.h" +#include "agent_cmds.h" #endif /* SCHEMA_WRAPPERS_H */ |