summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-02-24 12:10:10 +0100
committerGitHub <noreply@github.com>2020-02-24 12:10:10 +0100
commitc6d945200f201b05c2b019fa862cdf080a39a9d4 (patch)
treef061f87ff75a898ee7de82018a2c18935ada09cf
parenteeff346ca13af40091980d95de79ad5df50592e1 (diff)
Merging the feature branch for the ACLK in the previous sprint. (#8179)
* ACLK connection and protocol improvements (#8139) * Adding ACLK retry on connection failure (#8147) * Fixed reconnect issues on the ACLK. (#8163) * Cleaning up ACLK - part 1 (#8167) Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
-rw-r--r--aclk/aclk_lws_wss_client.c151
-rw-r--r--aclk/aclk_lws_wss_client.h12
-rw-r--r--aclk/agent_cloud_link.c1182
-rw-r--r--aclk/agent_cloud_link.h79
-rw-r--r--aclk/mqtt.c202
-rw-r--r--aclk/mqtt.h4
-rwxr-xr-xbuild_external/bin/clean-install.sh5
-rw-r--r--build_external/clean-install-arch-extras.Dockerfile56
-rw-r--r--build_external/clean-install-arch.Dockerfile2
-rw-r--r--build_external/projects/aclk-testing/agent-valgrind-compose.yml19
-rw-r--r--build_external/projects/aclk-testing/docker-compose.yml51
-rw-r--r--build_external/projects/aclk-testing/paho-compose.yml6
-rw-r--r--build_external/projects/aclk-testing/paho-inspection.py33
-rw-r--r--build_external/projects/aclk-testing/paho.Dockerfile12
-rw-r--r--database/rrddim.c14
-rw-r--r--database/rrdset.c7
-rw-r--r--health/health.c8
-rw-r--r--health/health_json.c2
-rw-r--r--health/health_log.c3
-rw-r--r--web/api/formatters/charts2json.c4
-rw-r--r--web/api/formatters/charts2json.h2
-rw-r--r--web/api/formatters/rrd2json.c2
-rw-r--r--web/api/formatters/rrdset2json.c105
-rw-r--r--web/api/formatters/rrdset2json.h2
-rw-r--r--web/api/web_api_v1.c2
25 files changed, 1286 insertions, 679 deletions
diff --git a/aclk/aclk_lws_wss_client.c b/aclk/aclk_lws_wss_client.c
index a4a21f45bf..2a4badd4ab 100644
--- a/aclk/aclk_lws_wss_client.c
+++ b/aclk/aclk_lws_wss_client.c
@@ -161,8 +161,13 @@ void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst) {
#endif
}
-void _aclk_wss_connect(struct aclk_lws_wss_engine_instance *inst){
- struct lws_client_connect_info i;
+void aclk_lws_wss_connect(struct aclk_lws_wss_engine_instance *inst){
+ struct lws_client_connect_info i;
+
+ if(inst->lws_wsi) {
+ error("Already Connected. Only one connection supported at a time.");
+ return;
+ }
memset(&i, 0, sizeof(i));
i.context = inst->lws_context;
@@ -186,7 +191,37 @@ static inline int received_data_to_ringbuff(struct lws_ring *buffer, void* data,
}
return 1;
}
-
+
+static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
+{
+ switch(reason)
+ {
+ case LWS_CALLBACK_CLIENT_WRITEABLE:
+ return "LWS_CALLBACK_CLIENT_WRITEABLE";
+ case LWS_CALLBACK_CLIENT_RECEIVE:
+ return "LWS_CALLBACK_CLIENT_RECEIVE";
+ case LWS_CALLBACK_PROTOCOL_INIT:
+ return "LWS_CALLBACK_PROTOCOL_INIT";
+ case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
+ return "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED";
+ case LWS_CALLBACK_USER:
+ return "LWS_CALLBACK_USER";
+ case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
+ return "LWS_CALLBACK_CLIENT_CONNECTION_ERROR";
+ case LWS_CALLBACK_CLIENT_CLOSED:
+ return "LWS_CALLBACK_CLIENT_CLOSED";
+ case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
+ return "LWS_CALLBACK_WS_PEER_INITIATED_CLOSE";
+ case LWS_CALLBACK_WSI_DESTROY:
+ return "LWS_CALLBACK_WSI_DESTROY";
+ case LWS_CALLBACK_CLIENT_ESTABLISHED:
+ return "LWS_CALLBACK_CLIENT_ESTABLISHED";
+ default:
+ // Not using an internal buffer here for thread-safety with unknown calling context.
+ error("Unknown LWS callback %u", reason);
+ return "unknown";
+ }
+}
static int
aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
@@ -201,6 +236,7 @@ aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason,
return -1;
}
+ // Callback servicing is forced when we are closed from above.
if( inst->upstream_reconnect_request ) {
error("Closing lws connectino due to libmosquitto error.");
char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection.";
@@ -209,74 +245,83 @@ aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason,
inst->upstream_reconnect_request = 0;
}
+ // Don't log to info - volume is proportional to message flow on ACLK.
+ switch (reason) {
+ case LWS_CALLBACK_CLIENT_WRITEABLE:
+ aclk_lws_mutex_lock(&inst->write_buf_mutex);
+ data = lws_wss_packet_buffer_pop(&inst->write_buffer_head);
+ if(likely(data)) {
+ lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY);
+ lws_wss_packet_buffer_free(data);
+ if(inst->write_buffer_head)
+ lws_callback_on_writable(inst->lws_wsi);
+ }
+ aclk_lws_mutex_unlock(&inst->write_buf_mutex);
+ return retval;
+
+ case LWS_CALLBACK_CLIENT_RECEIVE:
+ aclk_lws_mutex_lock(&inst->read_buf_mutex);
+ if(!received_data_to_ringbuff(inst->read_ringbuffer, in, len))
+ retval = 1;
+ aclk_lws_mutex_unlock(&inst->read_buf_mutex);
+
+ if(likely(inst->callbacks.data_rcvd_callback))
+ // to future myself -> do not call this while read lock is active as it will eventually
+ // want to acquire same lock later in aclk_lws_wss_client_read() function
+ inst->callbacks.data_rcvd_callback();
+ else
+ inst->data_to_read = 1; //to inform logic above there is reason to call mosquitto_loop_read
+ return retval;
+
+ case LWS_CALLBACK_WSI_CREATE:
+ case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
+ case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
+ case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
+ case LWS_CALLBACK_GET_THREAD_ID: // ?
+ case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
+ // Expected and safe to ignore.
+ return retval;
+
+ default:
+ // Pass to next switch, this case removes compiler warnings.
+ break;
+
+ }
+ // Log to info - volume is proportional to connection attempts.
+ info("Processing callback %s", aclk_lws_callback_name(reason));
switch (reason) {
- case LWS_CALLBACK_CLIENT_WRITEABLE:
- aclk_lws_mutex_lock(&inst->write_buf_mutex);
- data = lws_wss_packet_buffer_pop(&inst->write_buffer_head);
- if(likely(data)) {
- lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY);
- lws_wss_packet_buffer_free(data);
- if(inst->write_buffer_head)
- lws_callback_on_writable(inst->lws_wsi);
- }
- aclk_lws_mutex_unlock(&inst->write_buf_mutex);
- break;
- case LWS_CALLBACK_CLIENT_RECEIVE:
- aclk_lws_mutex_lock(&inst->read_buf_mutex);
- if(!received_data_to_ringbuff(inst->read_ringbuffer, in, len))
- retval = 1;
- aclk_lws_mutex_unlock(&inst->read_buf_mutex);
-
- if(likely(inst->callbacks.data_rcvd_callback))
- // to future myself -> do not call this while read lock is active as it will eventually
- // want to acquire same lock later in aclk_lws_wss_client_read() function
- inst->callbacks.data_rcvd_callback();
- else
- inst->data_to_read = 1; //to inform logic above there is reason to call mosquitto_loop_read
- break;
case LWS_CALLBACK_PROTOCOL_INIT:
- //initial connection here
- //later we will reconnect with delay od ACLK_LWS_WSS_RECONNECT_TIMEOUT
- //in case this connection fails or drops
- _aclk_wss_connect(inst);
- break;
- case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
- //TODO if already active make some error noise
- //currently we expect only one connection per netdata
+ aclk_lws_wss_connect(inst); // Makes the outgoing connection
+ break;
+ case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
+ if (inst->lws_wsi != NULL && inst->lws_wsi != wsi)
+ error("Multiple connections on same WSI? %p vs %p", inst->lws_wsi, wsi);
inst->lws_wsi = wsi;
break;
-#ifdef AUTO_RECONNECT_ON_LWS_LAYER
- case LWS_CALLBACK_USER:
- inst->reconnect_timeout_running = 0;
- _aclk_wss_connect(inst);
- break;
-#endif
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
error("Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", inst->host, inst->port, (in ? (char*)in : "not given"));
- /* FALLTHRU */
+ // Fall-through
case LWS_CALLBACK_CLIENT_CLOSED:
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
-#ifdef AUTO_RECONNECT_ON_LWS_LAYER
- if(!inst->reconnect_timeout_running) {
- lws_timed_callback_vh_protocol(lws_get_vhost(wsi),
- lws_get_protocol(wsi),
- LWS_CALLBACK_USER, ACLK_LWS_WSS_RECONNECT_TIMEOUT);
- inst->reconnect_timeout_running = 1;
- }
- /* FALLTHRU */
-#endif
- //no break here on purpose we want to continue with LWS_CALLBACK_WSI_DESTROY
+ inst->lws_wsi = NULL; // inside libwebsockets lws_close_free_wsi is called after callback
+ if (inst->callbacks.connection_closed)
+ inst->callbacks.connection_closed();
+ return -1; // the callback response is ignored, hope the above remains true
case LWS_CALLBACK_WSI_DESTROY:
aclk_lws_wss_clear_io_buffers(inst);
inst->lws_wsi = NULL;
inst->websocket_connection_up = 0;
- break;
+ if (inst->callbacks.connection_closed)
+ inst->callbacks.connection_closed();
+ break;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
inst->websocket_connection_up = 1;
if(inst->callbacks.connection_established_callback)
inst->callbacks.connection_established_callback();
break;
+
default:
+ error("Unexecpted callback from libwebsockets %s",aclk_lws_callback_name(reason));
break;
}
return retval; //0-OK, other connection should be closed!
diff --git a/aclk/aclk_lws_wss_client.h b/aclk/aclk_lws_wss_client.h
index 00d51c3985..27c1b96153 100644
--- a/aclk/aclk_lws_wss_client.h
+++ b/aclk/aclk_lws_wss_client.h
@@ -30,6 +30,7 @@ struct aclk_lws_wss_engine_callbacks {
void (*connection_established_callback)();
void (*data_rcvd_callback)();
void (*data_writable_callback)();
+ void (*connection_closed)();
};
struct lws_wss_packet_buffer;
@@ -57,14 +58,7 @@ struct aclk_lws_wss_engine_instance {
int websocket_connection_up;
// currently this is by default disabled
-// as decision has been made that reconnection
-// will have to be done from top layer
-// (after getting the new MQTT auth data)
-// for now i keep it here as it is usefull for
-// some of my internall testing
-#ifdef AUTO_RECONNECT_ON_LWS_LAYER
- int reconnect_timeout_running;
-#endif
+
int data_to_read;
int upstream_reconnect_request;
};
@@ -72,6 +66,8 @@ struct aclk_lws_wss_engine_instance {
struct aclk_lws_wss_engine_instance* aclk_lws_wss_client_init (const struct aclk_lws_wss_engine_callbacks *callbacks, const char *target_hostname, int target_port);
void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst);
+void aclk_lws_wss_connect(struct aclk_lws_wss_engine_instance *inst);
+
int aclk_lws_wss_client_write(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count);
int aclk_lws_wss_client_read (struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count);
int aclk_lws_wss_service_loop(struct aclk_lws_wss_engine_instance *inst);
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index 2f75a65009..dca6ca7fcd 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -5,19 +5,30 @@
// Read from the config file -- new section [agent_cloud_link]
// Defaults are supplied
-int aclk_recv_maximum = 0; // default 20
-int aclk_send_maximum = 0; // default 20
-int aclk_port = 0; // default 1883
-char *aclk_hostname = NULL; //default localhost
+int aclk_port = ACLK_DEFAULT_PORT;
+char *aclk_hostname = ACLK_DEFAULT_HOST;
int aclk_subscribed = 0;
+int aclk_disable_single_updates = 0;
int aclk_metadata_submitted = 0;
+int agent_state = 0;
+time_t last_init_sequence = 0;
int waiting_init = 1;
-int cmdpause = 0; // Used to pause query processing
-BUFFER *aclk_buffer = NULL;
char *global_base_topic = NULL;
+int aclk_connecting = 0;
+
+char *create_uuid()
+{
+ uuid_t uuid;
+ char *uuid_str = mallocz(36 + 1);
+
+ uuid_generate(uuid);
+ uuid_unparse(uuid, uuid_str);
+
+ return uuid_str;
+}
int cloud_to_agent_parse(JSON_ENTRY *e)
{
@@ -25,11 +36,8 @@ int cloud_to_agent_parse(JSON_ENTRY *e)
switch(e->type) {
case JSON_OBJECT:
- e->callback_function = cloud_to_agent_parse;
- break;
case JSON_ARRAY:
- e->callback_function = cloud_to_agent_parse;
- break;
+ break;
case JSON_STRING:
if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) {
data->msg_id = strdupz(e->data.string);
@@ -40,17 +48,17 @@ int cloud_to_agent_parse(JSON_ENTRY *e)
break;
}
if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) {
- data->topic = strdupz(e->data.string);
+ data->callback_topic = strdupz(e->data.string);
break;
}
if (!strcmp(e->name, ACLK_JSON_IN_URL)) {
- data->url = strdupz(e->data.string);
+ data->payload = strdupz(e->data.string);
break;
}
break;
case JSON_NUMBER:
if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) {
- data->version = atol(e->data.string);
+ data->version = atoi(e->original_string);
break;
}
break;
@@ -64,33 +72,6 @@ int cloud_to_agent_parse(JSON_ENTRY *e)
return 0;
}
-//char *send_http_request(char *host, char *port, char *url, BUFFER *b)
-//{
-// struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
-//
-// buffer_flush(b);
-// buffer_sprintf(
-// b,
-// "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n",
-// url, host);
-// int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout);
-//
-// if (unlikely(sock == -1)) {
-// error("Handshake failed");
-// return NULL;
-// }
-//
-// SSL_CTX *ctx = security_initialize_openssl_client();
-// // Certificate chain: not updating the stores - do we need private CA roots?
-// // Calls to SSL_CTX_load_verify_locations would go here.
-// SSL *ssl = SSL_new(ctx);
-// SSL_set_fd(ssl, sock);
-// int err = SSL_connect(ssl);
-// SSL_write(ssl, b->buffer, b->len); // Timeout options?
-// int bytes_read = SSL_read(ssl, b->buffer, b->len);
-// SSL_shutdown(ssl);
-// close(sock);
-//}
// Set when we have connection up and running from the connection callback
int aclk_connection_initialized = 0;
@@ -101,10 +82,14 @@ int aclk_mqtt_connected = 0;
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
+static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
+#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
+#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
+
#define QUERY_LOCK netdata_mutex_lock(&query_mutex)
#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
@@ -116,14 +101,35 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
+/*
+ * Maintain a list of collectors and chart count
+ * If all the charts of a collector are deleted
+ * then a new metadata dataset must be send to the cloud
+ *
+ */
+struct _collector {
+ time_t created;
+ u_int32_t count; //chart count
+ u_int32_t hostname_hash;
+ u_int32_t plugin_hash;
+ u_int32_t module_hash;
+ char *hostname;
+ char *plugin_name;
+ char *module_name;
+ struct _collector *next;
+};
+
+struct _collector *collector_list = NULL;
+
struct aclk_query {
time_t created;
time_t run_after; // Delay run until after this time
+ ACLK_CMD cmd; // What command is this
char *topic; // Topic to respond to
char *data; // Internal data (NULL if request from the cloud)
char *msg_id; // msg_id generated by the cloud (NULL if internal)
char *query; // The actual query
- u_char deleted; // Mark deleted for garbage collect
+ u_char deleted; // Mark deleted for garbage collect
struct aclk_query *next;
};
@@ -134,6 +140,41 @@ struct aclk_query_queue {
} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
/*
+ * After a connection failure -- delay in milliseconds
+ * When a connection is established, the delay function
+ * should be called with
+ *
+ * mode 0 to reset the delay
+ * mode 1 to sleep for the calculated amount of time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
+ *
+ */
+unsigned long int aclk_reconnect_delay(int mode)
+{
+ static int fail = -1;
+ unsigned long int delay;
+
+ if (!mode || fail == -1) {
+ srandom(time(NULL));
+ fail = mode-1;
+ return 0;
+ }
+
+ delay = (1 << fail);
+
+ if (delay >= ACLK_MAX_BACKOFF_DELAY) {
+ delay = ACLK_MAX_BACKOFF_DELAY * 1000;
+ }
+ else {
+ fail++;
+ delay = (delay * 1000) + (random() % 1000);
+ }
+
+// sleep_usec(USEC_PER_MS * delay);
+
+ return delay;
+}
+
+/*
* Free a query structure when done
*/
@@ -143,23 +184,29 @@ void aclk_query_free(struct aclk_query *this_query)
return;
freez(this_query->topic);
- freez(this_query->query);
- if (this_query->data)
+ if (likely(this_query->query))
+ freez(this_query->query);
+ if (likely(this_query->data))
freez(this_query->data);
- if (this_query->msg_id)
+ if (likely(this_query->msg_id))
freez(this_query->msg_id);
freez(this_query);
- return;
}
// Returns the entry after which we need to create a new entry to run at the specified time
// If NULL is returned we need to add to HEAD
-// Called with locked entries
+// Need to have a QUERY lock before calling this
struct aclk_query *aclk_query_find_position(time_t time_to_run)
{
struct aclk_query *tmp_query, *last_query;
+ // Quick check if we will add to the end
+ if (likely(aclk_queue.aclk_query_tail)) {
+ if (aclk_queue.aclk_query_tail->run_after <= time_to_run)
+ return aclk_queue.aclk_query_tail;
+ }
+
last_query = NULL;
tmp_query = aclk_queue.aclk_query_head;
@@ -172,21 +219,27 @@ struct aclk_query *aclk_query_find_position(time_t time_to_run)
return last_query;
}
-// Need to have a lock before calling this
-struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query)
+// Need to have a QUERY lock before calling this
+struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
{
- struct aclk_query *tmp_query;
+ struct aclk_query *tmp_query, *prev_query;
+ UNUSED(cmd);
tmp_query = aclk_queue.aclk_query_head;
-
+ prev_query = NULL;
while (tmp_query) {
if (likely(!tmp_query->deleted)) {
- if (strcmp(tmp_query->topic, topic) == 0 && (strcmp(tmp_query->query, query) == 0)) {
+ if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) {
if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
- (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0)))
+ (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) {
+
+ if (likely(last_query))
+ *last_query = prev_query;
return tmp_query;
+ }
}
}
+ prev_query = tmp_query;
tmp_query = tmp_query->next;
}
return NULL;
@@ -196,7 +249,7 @@ struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *
* Add a query to execute, the result will be send to the specified topic
*/
-int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal)
+int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
{
struct aclk_query *new_query, *tmp_query;
@@ -204,23 +257,42 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run
if (unlikely(waiting_init))
return 0;
+ // Ignore all commands if agent not stable and reset the last_init_sequence mark
+ if (agent_state == 0) {
+ last_init_sequence = now_realtime_sec();
+ return 0;
+ }
+
run_after = now_realtime_sec() + run_after;
QUERY_LOCK;
- tmp_query = aclk_query_find(topic, data, msg_id, query);
+ struct aclk_query *last_query = NULL;
+
+ //last_query = NULL;
+ tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
if (unlikely(tmp_query)) {
if (tmp_query->run_after == run_after) {
QUERY_UNLOCK;
QUERY_THREAD_WAKEUP;
return 0;
}
- tmp_query->deleted = 1;
+
+ if (last_query)
+ last_query->next = tmp_query->next;
+ else
+ aclk_queue.aclk_query_head = tmp_query->next;
+
+ debug(D_ACLK, "Removing double entry");
+ aclk_query_free(tmp_query);
+ aclk_queue.count--;
}
new_query = callocz(1, sizeof(struct aclk_query));
+ new_query->cmd = aclk_cmd;
if (internal) {
new_query->topic = strdupz(topic);
- new_query->query = strdupz(query);
+ if (likely(query))
+ new_query->query = strdupz(query);
} else {
new_query->topic = topic;
new_query->query = query;
@@ -234,7 +306,7 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run
new_query->created = now_realtime_sec();
new_query->run_after = run_after;
- info("Added query (%s) (%s)", topic, query);
+ debug(D_ACLK, "Added query (%s) (%s)", topic, query?query:"");
tmp_query = aclk_query_find_position(run_after);
@@ -256,29 +328,11 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run
QUERY_UNLOCK;
QUERY_THREAD_WAKEUP;
return 0;
-
-// if (likely(aclk_queue.aclk_query_tail)) {
-// aclk_queue.aclk_query_tail->next = new_query;
-// aclk_queue.aclk_query_tail = new_query;
-// aclk_queue.count++;
-// QUERY_UNLOCK;
-// return 0;
-// }
-//
-// if (likely(!aclk_queue.aclk_query_head)) {
-// aclk_queue.aclk_query_head = new_query;
-// aclk_queue.aclk_query_tail = new_query;
-// aclk_queue.count++;
-// QUERY_UNLOCK;
-// return 0;
-// }
-// QUERY_UNLOCK;
-// return 0;
}
inline int aclk_submit_request(struct aclk_request *request)
{
- return aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0);
+ return aclk_queue_query(request->callback_topic, NULL, request->msg_id, request->payload, 0, 0, ACLK_CMD_CLOUD);
}
/*
@@ -303,7 +357,27 @@ struct aclk_query *aclk_queue_pop()
this_query = aclk_queue.aclk_query_head;
- if (this_query->run_after > now_realtime_sec()) {
+ // Get rid of the deleted entries
+ while (this_query && this_query->deleted) {
+ aclk_queue.count--;
+
+ aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
+
+ if (likely(!aclk_queue.aclk_query_head)) {
+ aclk_queue.aclk_query_tail = NULL;
+ }
+
+ aclk_query_free(this_query);
+
+ this_query = aclk_queue.aclk_query_head;
+ }
+
+ if (likely(!this_query)) {
+ QUERY_UNLOCK;
+ return NULL;
+ }
+
+ if (!this_query->deleted && this_query->run_after > now_realtime_sec()) {
info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
QUERY_UNLOCK;
return NULL;
@@ -327,59 +401,295 @@ struct aclk_query *aclk_queue_pop()
// Need to check if additional logic should be added to make sure that there
// is enough information to determine the base topic at init time
-// TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch
-// that on the fly
-char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action)
+char *create_publish_base_topic()
{
- static char *topic = NULL;
-
if (unlikely(!is_agent_claimed()))
return NULL;
ACLK_LOCK;
- if (unlikely(action == PUBLICH_TOPIC_FREE)) {
- if (likely(topic)) {
- freez(topic);
- topic = NULL;
- }
+ if (unlikely(!global_base_topic)) {
+ char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
+
+ snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed());
+ tmp = strchr(tmp_topic, '\n');
+ if (unlikely(tmp))
+ *tmp = '\0';
+ global_base_topic = strdupz(tmp_topic);
+ }
+
+ ACLK_UNLOCK;
+ return global_base_topic;
+}
+
+/*
+ * Build a topic based on sub_topic and final_topic
+ * if the sub topic starts with / assume that is an absolute topic
+ *
+ */
+
+char *get_topic(char *sub_topic, char *final_topic, int max_size)
+{
+ int rc;
+
+ if (likely(sub_topic && sub_topic[0] == '/'))
+ return sub_topic;
+
+ if (unlikely(!global_base_topic))
+ return sub_topic;
- ACLK_UNLOCK;
+ rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
+ if (unlikely(rc >= max_size))
+ debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);
+
+ return final_topic;
+}
+
+
+/*
+ * Free a collector structure
+ */
+
+static void _free_collector(struct _collector *collector)
+{
+
+ if (likely(collector->plugin_name))
+ freez(collector->plugin_name);
+
+ if (likely(collector->module_name))
+ freez(collector->module_name);
+
+ if (likely(collector->hostname))
+ freez(collector->hostname);
+
+ freez(collector);
+}
+
+/*
+ * This will report the collector list
+ *
+ */
+#ifdef ACLK_DEBUG
+static void _dump_connector_list()
+{
+
+ struct _collector *tmp_collector;
+
+ COLLECTOR_LOCK;
+
+ info("DUMPING ALL COLLECTORS");
+
+ if (unlikely(!collector_list || !collector_list->next)) {
+ COLLECTOR_UNLOCK;
+ info("DUMPING ALL COLLECTORS -- nothing found");
+ return;
+ }
+ // Note that the first entry is "dummy"
+ tmp_collector = collector_list->next;
+
+ while (tmp_collector) {
+ info(
+ "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
+ tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
+ tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
+
+ tmp_collector = tmp_collector->next;
+
+ }
+ info("DUMPING ALL COLLECTORS DONE");
+ COLLECTOR_UNLOCK;
+}
+#endif
+
+/*
+ * This will cleanup the collector list
+ *
+ */
+static void _reset_connector_list()
+{
+ struct _collector *tmp_collector, *next_collector;
+
+ COLLECTOR_LOCK;
+
+ if (unlikely(!collector_list || !collector_list->next)) {
+ COLLECTOR_UNLOCK;
+ return;
+ }
+
+ // Note that the first entry is "dummy"
+ tmp_collector = collector_list->next;
+ collector_list->count = 0;
+ collector_list->next = NULL;
+
+ // We broke the link; we can unlock
+ COLLECTOR_UNLOCK;
+
+ while (tmp_collector) {
+ next_collector = tmp_collector->next;
+ _free_collector(tmp_collector);
+ tmp_collector = next_collector;
+ }
+}
+
+
+/*
+ * Find a collector (if it exists)
+ * Must lock before calling this
+ * If last_collector is not null, it will return the previous collector in the linked
+ * list (used in collector delete)
+ */
+static struct _collector *_find_collector(const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
+{
+ struct _collector *tmp_collector, *prev_collector;
+ uint32_t plugin_hash;
+ uint32_t module_hash;
+ uint32_t hostname_hash;
+
+ if (unlikely(!collector_list)) {
+ collector_list = callocz(1, sizeof(struct _collector));
return NULL;
}
- if (unlikely(action == PUBLICH_TOPIC_REBUILD)) {
- ACLK_UNLOCK;
- get_publish_base_topic(PUBLICH_TOPIC_FREE);
- return get_publish_base_topic(PUBLICH_TOPIC_GET);
+ if (unlikely(!collector_list->next))
+ return NULL;
+
+ plugin_hash = plugin_name?simple_hash(plugin_name):1;
+ module_hash = module_name?simple_hash(module_name):1;
+ hostname_hash = simple_hash(hostname);
+
+ // Note that the first entry is "dummy"
+ tmp_collector = collector_list->next;
+ prev_collector = collector_list;
+ while (tmp_collector) {
+ if (plugin_hash == tmp_collector->plugin_hash &&
+ module_hash == tmp_collector->module_hash &&
+ hostname_hash == tmp_collector->hostname_hash &&
+ (!strcmp(hostname, tmp_collector->hostname