diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2020-02-06 17:58:51 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-06 17:58:51 +0200 |
commit | b2b3c182548fe81e6d1c9a599b2571dabfdabcaa (patch) | |
tree | be40bfd7605ebf2b453114e2d904a0252abdfc21 | |
parent | 9f6c2556ecbbcfeab5049b3299094796457c97b1 (diff) |
ACLK agent 1 (#7894)
* - Add initial mqtt support
* [WIP] Agent cloud link
- Setup main mqtt thread to connect to a broker using V5 of the MQTT protocol (TBD)
- Send alarms to "netdata/alarm"
- Add error checks to handle connection failures
- Add params for
Broker, port
Maximum concurrent sent / recev messages
- Dummy function to check claiming status
- Generic mqtt_send command to publish message to a base topic , sub topic
It will end up in the form base_topic/sub_topic
- Add host/port in the connection failure error message
* Test libmosquitto libs
* connect to broker locally (assume localhost:1883)
* subscribe to channel netdata/command
* Test try a reload command to trigger health reload
* publish alerts to netdata/alarm
* - Fix compile issues
* - Use sleep_usec instead of usleep
* - Delay reconnection on failure due to misconfiguration (high cpu usage)
* - Remove the TLS connection config
* - Fix NETDATA_MQTT_INITIALIZATION_SLEEP_WAIT to use seconds
* - Gather ACLK related code under aclk folder
- Add aclk_ functions for abstract layer
- Moved low level libs intergration in mqtt.c
* - Add README.md file with initial comment
* - Clean MQTT v5
* - Code cleanup
* - Remove alarm log for now
- Remove the heart beat
* - Remove message properties for V5
* - Remove message properties for V5 (header)
* Fixed the netdata target to use a local static version of libmosquitto.
The installer does not yet have steps to pull and build the local library.
cd project_root
git clone ssh://git@github.com/netdata/mosquitto mosquitto/
(cd mosquitto/lib && make) # Ignore the cpp error
This will leave mosquitto/lib/libmosquitto.a for the build process to use.
* - Fix compile issues with older < 1.6 libmosquitto lib
* - Enable alarm events to check it works
- Re arrange includes
- Rework topic to be agent/guid/. Actual id will be
returned by the is_agent_claimed
* - Add initial metadata info
- Added helper function in web_api
- Added a debug command (info)
* Update the claiming state to retrieve the claimed id.
* - Use define for constants like command and metadata topics
- Function to wait for initialization of the ACLK link
- New aclk_subscribe command with QOS parameter for the mqtt subscription
- Use the is_agent_claimed function to get the real claim id and use it to build the topics
that will be used for the cloud communication
- Change in netdata-claim.sh.in to write the claim id without a trailing \n
* - Use define for constants like command and metadata topics
- Function to wait for initialization of the ACLK link
- New aclk_subscribe command with QOS parameter for the mqtt subscription
- Use the is_agent_claimed function to get the real claim id and use it to build the topics
that will be used for the cloud communication
- Change in netdata-claim.sh.in to write the claim id without a trailing \n
* - Remove the alarm log for now
- Add code (but disabled) to send charts
* - Use dummy anon, anon as username and password for testing purposes
* - Use client id anon as well
* Testing without TLS
* Switching TLS back on to fix docker environment.
* - Added query processing
An incoming URL now calls web_client_api_request_v1_data to handle a request and push the results
back to the "data" topic
- Move the above processing from the message callback to the query handle loop
- Added helper "pause" , "resume" commands to stop and resume query processing to stress test loading the queue
with queries before executing them
- Changed the endpoint topics to "meta", and "cmd" (previously metadata and command)
* make info message follow protocol
* move metadata msg generation into new func
* move metadata msg generation into new func
* - Add metadata to the responses
- Add hook to queue chart changes on creation and dimensions
- Changed the queue mechanism to include delay for X seconds
- Add delayed submittion of charts to the cloud so that all DIMs are defined to avoid resubmission
* - Add additional data info for aclk_queue command
* - Use web_clinet_api_request_v1 to handle the incoming request
This will handle all requests coming from the cloud
* - Cleanup and aclk_query structure
- Add msg_id parameter
- Enable the incoming JSON request
- Enable the outgoing JSON response
* - Added new thread to handle query processing
- Add lock and cond wait to wakeup thread when queries are submitted
- Cleanup on the main init function
* - Add wait time on agent init, to allow for chart, alarms and other definitions to be completed.
- During the wait time, no queries will be queued
* - Send metadata on query thread init
- New generic create header function for the JSON response
- Pack info and charts into one message
- Modified chart to remove entries (test)
- Modified charts mod to remove entries e.g alarms and volatile info
- Change input to aclk_update_chart (RRDHOST / instead of hostname)
* - When a request fails, add to the payload
- We may need to handle in a different key
- Error check in json parsing
* - Add dummy aclk_update_alarm command
* - Move incoming request JSON parsing code away from mqtt.c
- Added #ifdef ACLK_ENABLE so that we can have code merged but disabled by default
- Added version in incoming and outgoing JSON dict
* - Disable code if ACLK_ENABLE is not defined
- Remove references to the mqtt (mosquitto) lib
- Add dummy stubs in mqtt.c for completeness if ACLK_ENABLE is not defined
* - Disable challenge sample code for now
* - Remove libmosquitto from makefile
* - Fix spaces in Makefile.am
- Remove ifdef to avoid warning from LGTM
* - Remove for now the code that builds an along log test message to send to the cloud
* - Add check for ACLK_ENABLE definition and avoid calling the chart update functions
* - Remove commented code
* - Move source files to the correct place (ACLK_PLUGIN_FILES)
* - Remove include file thats not needed
* - Remove include file thats not needed
- Add improved checks for load_claiming_state()
* - Fix error message. Used error() that also logs errno and message
* - Fix some codacy issues
* - Fix more codacy issues, code cleanup
* - Revert code to address codacy warnings
* - Revert spaces added in a previous commit by mistake
* clean up if/else nest
* print error if fopen fails
* minor - error already logs errno
* - Fix version formatting
* - Cleanup all ACLK related compiler warnings
- Re-arrange include files
- Removed unused defines
* - More compilation warnings fixed
- Bug with thread creation fixed
* - Add condition to skip compilation of the ACLK code entirely. Add env variable ACLK="yes" to enable
* - Add condition to skip the libmosquitto
* - Change feature flag from ACLK_ENABLE to ENABLE_ACLK in accordance with the rest of ENABLE_xx flags
- Typo in info message fix
Co-authored-by: Andrew Moss <1043609+amoss@users.noreply.github.com>
Co-authored-by: Timo <6674623+underhood@users.noreply.github.com>
-rw-r--r-- | CMakeLists.txt | 12 | ||||
-rw-r--r-- | Makefile.am | 25 | ||||
-rw-r--r-- | aclk/Makefile.am | 18 | ||||
-rw-r--r-- | aclk/README.md | 1 | ||||
-rw-r--r-- | aclk/agent_cloud_link.c | 995 | ||||
-rw-r--r-- | aclk/agent_cloud_link.h | 103 | ||||
-rw-r--r-- | aclk/mqtt.c | 318 | ||||
-rw-r--r-- | aclk/mqtt.h | 21 | ||||
-rw-r--r-- | claim/claim.c | 46 | ||||
-rw-r--r-- | claim/claim.h | 2 | ||||
-rwxr-xr-x | claim/netdata-claim.sh.in | 4 | ||||
-rw-r--r-- | configure.ac | 16 | ||||
-rw-r--r-- | daemon/common.h | 3 | ||||
-rw-r--r-- | daemon/main.c | 5 | ||||
-rw-r--r-- | database/rrddim.c | 22 | ||||
-rw-r--r-- | database/rrdset.c | 4 | ||||
-rw-r--r-- | health/health_log.c | 1 | ||||
-rw-r--r-- | web/api/web_api_v1.c | 21 | ||||
-rw-r--r-- | web/api/web_api_v1.h | 2 |
19 files changed, 1587 insertions, 32 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index f715aeecb4..d43cb8a1f9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -339,8 +339,7 @@ set(LIBNETDATA_FILES add_library(libnetdata OBJECT ${LIBNETDATA_FILES}) set(APPS_PLUGIN_FILES - collectors/apps.plugin/apps_plugin.c - ) + collectors/apps.plugin/apps_plugin.c) set(CHECKS_PLUGIN_FILES collectors/checks.plugin/plugin_checks.c @@ -606,6 +605,14 @@ set(CLAIM_PLUGIN_FILES claim/claim.h ) +set(ACLK_PLUGIN_FILES + aclk/agent_cloud_link.c + aclk/agent_cloud_link.h + aclk/agent_cloud_link.c + aclk/mqtt.c + aclk/mqtt.h + ) + set(EXPORTING_ENGINE_FILES exporting/exporting_engine.c exporting/exporting_engine.h @@ -673,6 +680,7 @@ set(NETDATA_FILES ${STREAMING_PLUGIN_FILES} ${WEB_PLUGIN_FILES} ${CLAIM_PLUGIN_FILES} + ${ACLK_PLUGIN_FILES} ) set(NETDATACLI_FILES diff --git a/Makefile.am b/Makefile.am index 30c2eb67b5..483a9c22db 100644 --- a/Makefile.am +++ b/Makefile.am @@ -100,6 +100,7 @@ SUBDIRS += \ streaming \ web \ claim \ + aclk \ $(NULL) @@ -108,6 +109,7 @@ AM_CFLAGS = \ $(OPTIONAL_NFACCT_CFLAGS) \ $(OPTIONAL_ZLIB_CFLAGS) \ $(OPTIONAL_UUID_CFLAGS) \ + $(OPTIONAL_MQTT_CFLAGS) \ $(OPTIONAL_LIBCAP_LIBS) \ $(OPTIONAL_IPMIMONITORING_CFLAGS) \ $(OPTIONAL_CUPS_CFLAGS) \ @@ -456,6 +458,13 @@ CLAIM_PLUGIN_FILES = \ claim/claim.h \ $(NULL) +ACLK_PLUGIN_FILES = \ + aclk/agent_cloud_link.c \ + aclk/agent_cloud_link.h \ + aclk/mqtt.c \ + aclk/mqtt.h \ + $(NULL) + EXPORTING_ENGINE_FILES = \ exporting/exporting_engine.c \ exporting/exporting_engine.h \ @@ -526,6 +535,12 @@ NETDATA_FILES = \ $(CLAIM_PLUGIN_FILES) \ $(NULL) +if ENABLE_ACLK + NETDATA_FILES += \ + $(ACLK_PLUGIN_FILES) \ + $(NULL) +endif + if FREEBSD NETDATA_FILES += \ $(FREEBSD_PLUGIN_FILES) \ @@ -559,6 +574,7 @@ NETDATA_COMMON_LIBS = \ $(OPTIONAL_ZLIB_LIBS) \ $(OPTIONAL_SSL_LIBS) \ $(OPTIONAL_UUID_LIBS) \ + $(OPTIONAL_MQTT_LIBS) \ $(OPTIONAL_UV_LIBS) \ $(OPTIONAL_LZ4_LIBS) \ $(OPTIONAL_JUDY_LIBS) \ @@ -575,9 +591,18 @@ NETDATACLI_FILES = \ sbin_PROGRAMS += netdata netdata_SOURCES = $(NETDATA_FILES) + +if ENABLE_ACLK netdata_LDADD = \ + mosquitto/lib/libmosquitto.a \ $(NETDATA_COMMON_LIBS) \ $(NULL) +else +netdata_LDADD = \ + $(NETDATA_COMMON_LIBS) \ + $(NULL) +endif + if ENABLE_CXX_LINKER netdata_LINK = $(CXXLD) $(CXXFLAGS) $(LDFLAGS) -o $@ else diff --git a/aclk/Makefile.am b/aclk/Makefile.am new file mode 100644 index 0000000000..ebc2a18d95 --- /dev/null +++ b/aclk/Makefile.am @@ -0,0 +1,18 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +CLEANFILES = \ + $(NULL) + +include $(top_srcdir)/build/subst.inc +SUFFIXES = .in + +sbin_SCRIPTS = \ + $(NULL) + +dist_noinst_DATA = \ + README.md \ + $(NULL) + diff --git a/aclk/README.md b/aclk/README.md new file mode 100644 index 0000000000..287d3b2606 --- /dev/null +++ b/aclk/README.md @@ -0,0 +1 @@ +This is the agent cloud link (ACLK) information file
\ No newline at end of file diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c new file mode 100644 index 0000000000..dc90c6d14e --- /dev/null +++ b/aclk/agent_cloud_link.c @@ -0,0 +1,995 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "libnetdata/libnetdata.h" +#include "agent_cloud_link.h" + +// Read from the config file -- new section [agent_cloud_link] +// Defaults are supplied +int aclk_recv_maximum = 0; // default 20 +int aclk_send_maximum = 0; // default 20 + +int aclk_port = 0; // default 1883 +char *aclk_hostname = NULL; //default localhost +int aclk_subscribed = 0; + +int aclk_metadata_submitted = 0; +int waiting_init = 1; +int cmdpause = 0; // Used to pause query processing + +BUFFER *aclk_buffer = NULL; +char *global_base_topic = NULL; + +int cloud_to_agent_parse(JSON_ENTRY *e) +{ + struct aclk_request *data = e->callback_data; + + switch(e->type) { + case JSON_OBJECT: + e->callback_function = cloud_to_agent_parse; + break; + case JSON_ARRAY: + e->callback_function = cloud_to_agent_parse; + break; + case JSON_STRING: + if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) { + data->msg_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_TYPE)) { + data->type_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) { + data->topic = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_URL)) { + data->url = strdupz(e->data.string); + break; + } + break; + case JSON_NUMBER: + if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) { + data->version = atol(e->data.string); + break; + } + break; + + case JSON_BOOLEAN: + break; + + case JSON_NULL: + break; + } + return 0; +} + +//char *send_http_request(char *host, char *port, char *url, BUFFER *b) +//{ +// struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 }; +// +// buffer_flush(b); +// buffer_sprintf( +// b, +// "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n", +// url, host); +// int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout); +// +// if (unlikely(sock == -1)) { +// error("Handshake failed"); +// return NULL; +// } +// +// SSL_CTX *ctx = security_initialize_openssl_client(); +// // Certificate chain: not updating the stores - do we need private CA roots? +// // Calls to SSL_CTX_load_verify_locations would go here. +// SSL *ssl = SSL_new(ctx); +// SSL_set_fd(ssl, sock); +// int err = SSL_connect(ssl); +// SSL_write(ssl, b->buffer, b->len); // Timeout options? +// int bytes_read = SSL_read(ssl, b->buffer, b->len); +// SSL_shutdown(ssl); +// close(sock); +//} + +// Set when we have connection up and running from the connection callback +int aclk_connection_initialized = 0; + +static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; + +#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) +#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) + +#define QUERY_LOCK netdata_mutex_lock(&query_mutex) +#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex) + +pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; +pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; + +#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait); +#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) +#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) + + +struct aclk_query { + time_t created; + time_t run_after; // Delay run until after this time + char *topic; // Topic to respond to + char *data; // Internal data (NULL if request from the cloud) + char *msg_id; // msg_id generated by the cloud (NULL if internal) + char *query; // The actual query + u_char deleted; // Mark deleted for garbage collect + struct aclk_query *next; +}; + +struct aclk_query_queue { + struct aclk_query *aclk_query_head; + struct aclk_query *aclk_query_tail; + u_int64_t count; +} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; + +/* + * Free a query structure when done + */ + +void aclk_query_free(struct aclk_query *this_query) +{ + if (unlikely(!this_query)) + return; + + freez(this_query->topic); + freez(this_query->query); + if (this_query->data) + freez(this_query->data); + if (this_query->msg_id) + freez(this_query->msg_id); + freez(this_query); + return; +} + +// Returns the entry after which we need to create a new entry to run at the specified time +// If NULL is returned we need to add to HEAD +// Called with locked entries + +struct aclk_query *aclk_query_find_position(time_t time_to_run) +{ + struct aclk_query *tmp_query, *last_query; + + last_query = NULL; + tmp_query = aclk_queue.aclk_query_head; + + while (tmp_query) { + if (tmp_query->run_after > time_to_run) + return last_query; + last_query = tmp_query; + tmp_query = tmp_query->next; + } + return last_query; +} + +// Need to have a lock before calling this +struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query) +{ + struct aclk_query *tmp_query; + + tmp_query = aclk_queue.aclk_query_head; + + while (tmp_query) { + if (likely(!tmp_query->deleted)) { + if (strcmp(tmp_query->topic, topic) == 0 && (strcmp(tmp_query->query, query) == 0)) { + if ((!data || (data && strcmp(data, tmp_query->data) == 0)) && + (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) + return tmp_query; + } + } + tmp_query = tmp_query->next; + } + return NULL; +} + +/* + * Add a query to execute, the result will be send to the specified topic + */ + +int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal) +{ + struct aclk_query *new_query, *tmp_query; + + // Ignore all commands while we wait for the agent to initialize + if (unlikely(waiting_init)) + return 0; + + run_after = now_realtime_sec() + run_after; + + QUERY_LOCK; + tmp_query = aclk_query_find(topic, data, msg_id, query); + if (unlikely(tmp_query)) { + if (tmp_query->run_after == run_after) { + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + tmp_query->deleted = 1; + } + + new_query = callocz(1, sizeof(struct aclk_query)); + if (internal) { + new_query->topic = strdupz(topic); + new_query->query = strdupz(query); + } else { + new_query->topic = topic; + new_query->query = query; + new_query->msg_id = msg_id; + } + + if (data) + new_query->data = strdupz(data); + + new_query->next = NULL; + new_query->created = now_realtime_sec(); + new_query->run_after = run_after; + + info("Added query (%s) (%s)", topic, query); + + tmp_query = aclk_query_find_position(run_after); + + if (tmp_query) { + new_query->next = tmp_query->next; + tmp_query->next = new_query; + if (tmp_query == aclk_queue.aclk_query_tail) + aclk_queue.aclk_query_tail = new_query; + aclk_queue.count++; + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + + new_query->next = aclk_queue.aclk_query_head; + aclk_queue.aclk_query_head = new_query; + aclk_queue.count++; + + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + +// if (likely(aclk_queue.aclk_query_tail)) { +// aclk_queue.aclk_query_tail->next = new_query; +// aclk_queue.aclk_query_tail = new_query; +// aclk_queue.count++; +// QUERY_UNLOCK; +// return 0; +// } +// +// if (likely(!aclk_queue.aclk_query_head)) { +// aclk_queue.aclk_query_head = new_query; +// aclk_queue.aclk_query_tail = new_query; +// aclk_queue.count++; +// QUERY_UNLOCK; +// return 0; +// } +// QUERY_UNLOCK; +// return 0; +} + +inline int aclk_submit_request(struct aclk_request *request) +{ + return aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0); +} + +/* + * Get the next query to process - NULL if nothing there + * The caller needs to free memory by calling aclk_query_free() + * + * topic + * query + * The structure itself + * + */ +struct aclk_query *aclk_queue_pop() +{ + struct aclk_query *this_query; + + QUERY_LOCK; + + if (likely(!aclk_queue.aclk_query_head)) { + QUERY_UNLOCK; + return NULL; + } + + this_query = aclk_queue.aclk_query_head; + + if (this_query->run_after > now_realtime_sec()) { + info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); + QUERY_UNLOCK; + return NULL; + } + + aclk_queue.count--; + aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; + + if (likely(!aclk_queue.aclk_query_head)) { + aclk_queue.aclk_query_tail = NULL; + } + + QUERY_UNLOCK; + return this_query; +} + +// This will give the base topic that the agent will publish messages. +// subtopics will be sent under the base topic e.g. base_topic/subtopic +// This is called by aclk_init(), to compute the base topic once and have +// it stored internally. +// Need to check if additional logic should be added to make sure that there +// is enough information to determine the base topic at init time + +// TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch +// that on the fly + +char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action) +{ + static char *topic = NULL; + + if (unlikely(!is_agent_claimed())) + return NULL; + + ACLK_LOCK; + + if (unlikely(action == PUBLICH_TOPIC_FREE)) { + if (likely(topic)) { + freez(topic); + topic = NULL; + } + + ACLK_UNLOCK; + + return NULL; + } + + if (unlikely(action == PUBLICH_TOPIC_REBUILD)) { + ACLK_UNLOCK; + get_publish_base_topic(PUBLICH_TOPIC_FREE); + return get_publish_base_topic(PUBLICH_TOPIC_GET); + } + + if (unlikely(!topic)) { + char tmp_topic[ACLK_MAX_TOPIC + 1]; + + sprintf(tmp_topic, ACLK_TOPIC_STRUCTURE, is_agent_claimed()); + topic = strdupz(tmp_topic); + } + + ACLK_UNLOCK; + return topic; +} + +char *get_topic(char *sub_topic, char *final_topic, int max_size) +{ + if (unlikely(!global_base_topic)) + global_base_topic = GET_PUBLISH_BASE_TOPIC; + + if (unlikely(!global_base_topic)) + return sub_topic; + + snprintfz(final_topic, max_size, "%s/%s", global_base_topic, sub_topic); + + return final_topic; +} + +// Wait for ACLK connection to be established +int aclk_wait_for_initialization() +{ + if (unlikely(!aclk_connection_initialized)) { + time_t now = now_realtime_sec(); + + while (!aclk_connection_initialized && (now_realtime_sec() - now) < ACLK_INITIALIZATION_WAIT) { + sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT); + _link_event_loop(0); + } + + if (unlikely(!aclk_connection_initialized)) { + error("ACLK connection cannot be established"); + return 1; + } + } + return 0; +} + +/* + * This function will fetch the next pending command and process it + * + */ +int aclk_process_query() +{ + struct aclk_query *this_query; + static u_int64_t query_count = 0; + //int rc; + + if (unlikely(cmdpause)) + return 0; + + if (!aclk_connection_initialized) + return 0; + + this_query = aclk_queue_pop(); + if (likely(!this_query)) { + //info("No pending queries"); + return 0; + } + + if (unlikely(this_query->deleted)) { + info("Garbage collect query %s:%s", this_query->topic, this_query->query); + aclk_query_free(this_query); + return 1; + } + + query_count++; + info( + "Query #%d (%s) (%s) in queue %d seconds", (int) query_count, this_query->topic, this_query->query, + (int) (now_realtime_sec() - this_query->created)); + + if (strncmp((char *)this_query->query, "/api/v1/", 8) == 0) { + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = 0x1f; + + char *mysep = strchr(this_query->query, '?'); + if (mysep) { + strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); + *mysep = '\0'; + } else + strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); + + mysep = strrchr(this_query->query, '/'); + + // TODO: ignore return code for now + web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); + + //TODO: handle bad response perhaps in a different way. For now it does to the payload + //if (rc == HTTP_RESP_OK || 1) { + buffer_flush(aclk_buffer); + + aclk_create_metadata_message(aclk_buffer, mysep ? mysep + 1 : "noop", this_query->msg_id, w->response.data); + aclk_buffer->contenttype = CT_APPLICATION_JSON; + aclk_send_message(this_query->topic, aclk_buffer->buffer); + //} else + // error("Query RESP: %s", w->response.data->buffer); + + buffer_free(w->response.data); + freez(w); + aclk_query_free(this_query); + return 1; + } + + if (strcmp((char *)this_query->topic, "_chart") == 0) { + aclk_send_single_chart(this_query->data, this_query->query); + } + + aclk_query_free(this_query); + + return 1; +} + +// Launch a query processing thread + +/* + * Process all pending queries + * Return 0 if no queries were processed, 1 otherwise + * + */ + +int aclk_process_queries() +{ + if (unlikely(cmdpause)) + return 0; + + // Return if no queries pending + if (likely(!aclk_queue.count)) + return 0; + + info("Processing %d queries", (int ) aclk_queue.count); + + while (aclk_process_query()) { + //rc = _link_event_loop(0); + }; + + return 1; +} + +static void aclk_query_thread_cleanup(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + info("cleaning up..."); + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +/** + * MAin query processing thread + * + */ +void *aclk_query_main_thread(void *ptr) +{ + netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr); + + while (!netdata_exit) { + + QUERY_THREAD_LOCK; + + if (unlikely(!aclk_metadata_submitted)) { + aclk_send_metadata(); + aclk_metadata_submitted = 1; + } + + if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + sleep_usec(USEC_PER_SEC * 1); + + if (likely(aclk_connection_initialized && !netdata_exit)) { + while (aclk_process_queries()) { + // Sleep for a few ms and retry maybe we have something to process + // before going to sleep + // TODO: This needs improvement to avoid missed queries + sleep_usec(USEC_PER_MS * 100); + } + } + + QUERY_THREAD_UNLOCK; + + } // forever + info("Shutting down query processing thread"); + netdata_thread_cleanup_pop(1); + return NULL; +} + +// Thread cleanup +static void aclk_main_cleanup(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + info("cleaning up..."); + + QUERY_THREAD_WAKEUP; + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +/** + * Main agent cloud link thread + * + * This thread will simply call the main event loop that handles + * pending requests - both inbound and outbound + * + * @param ptr is a pointer to the netdata_static_thread structure. + * + * @return It always returns NULL + */ +void *aclk_main(void *ptr) +{ + //netdata_thread_t *query_thread; + struct netdata_static_thread query_thread; + + memset(&query_thread, 0, sizeof(query_thread)); + + netdata_thread_cleanup_push(aclk_main_cleanup, ptr); + + if (unlikely(!aclk_buffer)) + aclk_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + + assert(aclk_buffer != NULL); + + //netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr); + //netdata_thread_create(&query_thread.thread , "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread); + info("Waiting for netdata to be ready"); + while (!netdata_ready) { + sleep_usec(USEC_PER_MS * 300); + } + info("Waiting %d seconds for the agent to initialize", ACLK_STARTUP_WAIT); + sleep_usec(USEC_PER_SEC * ACLK_STARTUP_WAIT); + + // Ok mark we are ready to accept incoming requests + waiting_init = 0; + + while (!netdata_exit) { + // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds + // TODO: Handle the unclaim command as well -- we may need to shutdown the connection + if (likely(!is_agent_claimed())) { + sleep_usec(USEC_PER_SEC * 60); + info("Checking agent claiming status"); + continue; + } + + if (unlikely(!aclk_connection_initialized)) { + static int initializing = 0; + + if (likely(initializing)) { + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + continue; + } + initializing = 1; + info("Initializing connection"); + //send_http_request(aclk_hostname, "443", "/auth/challenge?id=blah", aclk_buffer); + if (unlikely(aclk_init(ACLK_INIT))) { + // TODO: TBD how to handle. We are claimed and we cant init the connection. For now keep trying. + sleep_usec(USEC_PER_SEC * 60); + continue; + } else { + sleep_usec(USEC_PER_SEC * 1); + } + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + continue; + } + + if (unlikely(!aclk_subscribed)) { + aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2); + } + if (unlikely(!query_thread.thread)) { + query_thread.thread = mallocz(sizeof(netdata_thread_t)); + netdata_thread_create( + query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread); + } + + //TODO: Check if there is a return code + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + + } // forever + aclk_shutdown(); + + netdata_thread_cleanup_pop(1); + return NULL; +} + +/* + * Send a message to the cloud, using a base topic and sib_topic + * The final topic will be in the form <base_topic>/<sub_topic> + * If base_topic is missing then the global_base_topic will be used (if available) + * + */ +int aclk_send_message(char *sub_topic, char *message) +{ + int rc; + static int skip_due_to_shutdown = 0; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + if (! |