diff options
author | Andrew Moss <1043609+amoss@users.noreply.github.com> | 2020-02-24 12:10:10 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-24 12:10:10 +0100 |
commit | c6d945200f201b05c2b019fa862cdf080a39a9d4 (patch) | |
tree | f061f87ff75a898ee7de82018a2c18935ada09cf | |
parent | eeff346ca13af40091980d95de79ad5df50592e1 (diff) |
Merging the feature branch for the ACLK in the previous sprint. (#8179)
* ACLK connection and protocol improvements (#8139)
* Adding ACLK retry on connection failure (#8147)
* Fixed reconnect issues on the ACLK. (#8163)
* Cleaning up ACLK - part 1 (#8167)
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
25 files changed, 1286 insertions, 679 deletions
diff --git a/aclk/aclk_lws_wss_client.c b/aclk/aclk_lws_wss_client.c index a4a21f45bf..2a4badd4ab 100644 --- a/aclk/aclk_lws_wss_client.c +++ b/aclk/aclk_lws_wss_client.c @@ -161,8 +161,13 @@ void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst) { #endif } -void _aclk_wss_connect(struct aclk_lws_wss_engine_instance *inst){ - struct lws_client_connect_info i; +void aclk_lws_wss_connect(struct aclk_lws_wss_engine_instance *inst){ + struct lws_client_connect_info i; + + if(inst->lws_wsi) { + error("Already Connected. Only one connection supported at a time."); + return; + } memset(&i, 0, sizeof(i)); i.context = inst->lws_context; @@ -186,7 +191,37 @@ static inline int received_data_to_ringbuff(struct lws_ring *buffer, void* data, } return 1; } - + +static const char *aclk_lws_callback_name(enum lws_callback_reasons reason) +{ + switch(reason) + { + case LWS_CALLBACK_CLIENT_WRITEABLE: + return "LWS_CALLBACK_CLIENT_WRITEABLE"; + case LWS_CALLBACK_CLIENT_RECEIVE: + return "LWS_CALLBACK_CLIENT_RECEIVE"; + case LWS_CALLBACK_PROTOCOL_INIT: + return "LWS_CALLBACK_PROTOCOL_INIT"; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + return "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED"; + case LWS_CALLBACK_USER: + return "LWS_CALLBACK_USER"; + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + return "LWS_CALLBACK_CLIENT_CONNECTION_ERROR"; + case LWS_CALLBACK_CLIENT_CLOSED: + return "LWS_CALLBACK_CLIENT_CLOSED"; + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: + return "LWS_CALLBACK_WS_PEER_INITIATED_CLOSE"; + case LWS_CALLBACK_WSI_DESTROY: + return "LWS_CALLBACK_WSI_DESTROY"; + case LWS_CALLBACK_CLIENT_ESTABLISHED: + return "LWS_CALLBACK_CLIENT_ESTABLISHED"; + default: + // Not using an internal buffer here for thread-safety with unknown calling context. + error("Unknown LWS callback %u", reason); + return "unknown"; + } +} static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) @@ -201,6 +236,7 @@ aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, return -1; } + // Callback servicing is forced when we are closed from above. if( inst->upstream_reconnect_request ) { error("Closing lws connectino due to libmosquitto error."); char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection."; @@ -209,74 +245,83 @@ aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, inst->upstream_reconnect_request = 0; } + // Don't log to info - volume is proportional to message flow on ACLK. + switch (reason) { + case LWS_CALLBACK_CLIENT_WRITEABLE: + aclk_lws_mutex_lock(&inst->write_buf_mutex); + data = lws_wss_packet_buffer_pop(&inst->write_buffer_head); + if(likely(data)) { + lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY); + lws_wss_packet_buffer_free(data); + if(inst->write_buffer_head) + lws_callback_on_writable(inst->lws_wsi); + } + aclk_lws_mutex_unlock(&inst->write_buf_mutex); + return retval; + + case LWS_CALLBACK_CLIENT_RECEIVE: + aclk_lws_mutex_lock(&inst->read_buf_mutex); + if(!received_data_to_ringbuff(inst->read_ringbuffer, in, len)) + retval = 1; + aclk_lws_mutex_unlock(&inst->read_buf_mutex); + + if(likely(inst->callbacks.data_rcvd_callback)) + // to future myself -> do not call this while read lock is active as it will eventually + // want to acquire same lock later in aclk_lws_wss_client_read() function + inst->callbacks.data_rcvd_callback(); + else + inst->data_to_read = 1; //to inform logic above there is reason to call mosquitto_loop_read + return retval; + + case LWS_CALLBACK_WSI_CREATE: + case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: + case LWS_CALLBACK_GET_THREAD_ID: // ? + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + // Expected and safe to ignore. + return retval; + + default: + // Pass to next switch, this case removes compiler warnings. + break; + + } + // Log to info - volume is proportional to connection attempts. + info("Processing callback %s", aclk_lws_callback_name(reason)); switch (reason) { - case LWS_CALLBACK_CLIENT_WRITEABLE: - aclk_lws_mutex_lock(&inst->write_buf_mutex); - data = lws_wss_packet_buffer_pop(&inst->write_buffer_head); - if(likely(data)) { - lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY); - lws_wss_packet_buffer_free(data); - if(inst->write_buffer_head) - lws_callback_on_writable(inst->lws_wsi); - } - aclk_lws_mutex_unlock(&inst->write_buf_mutex); - break; - case LWS_CALLBACK_CLIENT_RECEIVE: - aclk_lws_mutex_lock(&inst->read_buf_mutex); - if(!received_data_to_ringbuff(inst->read_ringbuffer, in, len)) - retval = 1; - aclk_lws_mutex_unlock(&inst->read_buf_mutex); - - if(likely(inst->callbacks.data_rcvd_callback)) - // to future myself -> do not call this while read lock is active as it will eventually - // want to acquire same lock later in aclk_lws_wss_client_read() function - inst->callbacks.data_rcvd_callback(); - else - inst->data_to_read = 1; //to inform logic above there is reason to call mosquitto_loop_read - break; case LWS_CALLBACK_PROTOCOL_INIT: - //initial connection here - //later we will reconnect with delay od ACLK_LWS_WSS_RECONNECT_TIMEOUT - //in case this connection fails or drops - _aclk_wss_connect(inst); - break; - case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: - //TODO if already active make some error noise - //currently we expect only one connection per netdata + aclk_lws_wss_connect(inst); // Makes the outgoing connection + break; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + if (inst->lws_wsi != NULL && inst->lws_wsi != wsi) + error("Multiple connections on same WSI? %p vs %p", inst->lws_wsi, wsi); inst->lws_wsi = wsi; break; -#ifdef AUTO_RECONNECT_ON_LWS_LAYER - case LWS_CALLBACK_USER: - inst->reconnect_timeout_running = 0; - _aclk_wss_connect(inst); - break; -#endif case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: error("Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", inst->host, inst->port, (in ? (char*)in : "not given")); - /* FALLTHRU */ + // Fall-through case LWS_CALLBACK_CLIENT_CLOSED: case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: -#ifdef AUTO_RECONNECT_ON_LWS_LAYER - if(!inst->reconnect_timeout_running) { - lws_timed_callback_vh_protocol(lws_get_vhost(wsi), - lws_get_protocol(wsi), - LWS_CALLBACK_USER, ACLK_LWS_WSS_RECONNECT_TIMEOUT); - inst->reconnect_timeout_running = 1; - } - /* FALLTHRU */ -#endif - //no break here on purpose we want to continue with LWS_CALLBACK_WSI_DESTROY + inst->lws_wsi = NULL; // inside libwebsockets lws_close_free_wsi is called after callback + if (inst->callbacks.connection_closed) + inst->callbacks.connection_closed(); + return -1; // the callback response is ignored, hope the above remains true case LWS_CALLBACK_WSI_DESTROY: aclk_lws_wss_clear_io_buffers(inst); inst->lws_wsi = NULL; inst->websocket_connection_up = 0; - break; + if (inst->callbacks.connection_closed) + inst->callbacks.connection_closed(); + break; case LWS_CALLBACK_CLIENT_ESTABLISHED: inst->websocket_connection_up = 1; if(inst->callbacks.connection_established_callback) inst->callbacks.connection_established_callback(); break; + default: + error("Unexecpted callback from libwebsockets %s",aclk_lws_callback_name(reason)); break; } return retval; //0-OK, other connection should be closed! diff --git a/aclk/aclk_lws_wss_client.h b/aclk/aclk_lws_wss_client.h index 00d51c3985..27c1b96153 100644 --- a/aclk/aclk_lws_wss_client.h +++ b/aclk/aclk_lws_wss_client.h @@ -30,6 +30,7 @@ struct aclk_lws_wss_engine_callbacks { void (*connection_established_callback)(); void (*data_rcvd_callback)(); void (*data_writable_callback)(); + void (*connection_closed)(); }; struct lws_wss_packet_buffer; @@ -57,14 +58,7 @@ struct aclk_lws_wss_engine_instance { int websocket_connection_up; // currently this is by default disabled -// as decision has been made that reconnection -// will have to be done from top layer -// (after getting the new MQTT auth data) -// for now i keep it here as it is usefull for -// some of my internall testing -#ifdef AUTO_RECONNECT_ON_LWS_LAYER - int reconnect_timeout_running; -#endif + int data_to_read; int upstream_reconnect_request; }; @@ -72,6 +66,8 @@ struct aclk_lws_wss_engine_instance { struct aclk_lws_wss_engine_instance* aclk_lws_wss_client_init (const struct aclk_lws_wss_engine_callbacks *callbacks, const char *target_hostname, int target_port); void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst); +void aclk_lws_wss_connect(struct aclk_lws_wss_engine_instance *inst); + int aclk_lws_wss_client_write(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count); int aclk_lws_wss_client_read (struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count); int aclk_lws_wss_service_loop(struct aclk_lws_wss_engine_instance *inst); diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index 2f75a65009..dca6ca7fcd 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -5,19 +5,30 @@ // Read from the config file -- new section [agent_cloud_link] // Defaults are supplied -int aclk_recv_maximum = 0; // default 20 -int aclk_send_maximum = 0; // default 20 -int aclk_port = 0; // default 1883 -char *aclk_hostname = NULL; //default localhost +int aclk_port = ACLK_DEFAULT_PORT; +char *aclk_hostname = ACLK_DEFAULT_HOST; int aclk_subscribed = 0; +int aclk_disable_single_updates = 0; int aclk_metadata_submitted = 0; +int agent_state = 0; +time_t last_init_sequence = 0; int waiting_init = 1; -int cmdpause = 0; // Used to pause query processing -BUFFER *aclk_buffer = NULL; char *global_base_topic = NULL; +int aclk_connecting = 0; + +char *create_uuid() +{ + uuid_t uuid; + char *uuid_str = mallocz(36 + 1); + + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + + return uuid_str; +} int cloud_to_agent_parse(JSON_ENTRY *e) { @@ -25,11 +36,8 @@ int cloud_to_agent_parse(JSON_ENTRY *e) switch(e->type) { case JSON_OBJECT: - e->callback_function = cloud_to_agent_parse; - break; case JSON_ARRAY: - e->callback_function = cloud_to_agent_parse; - break; + break; case JSON_STRING: if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) { data->msg_id = strdupz(e->data.string); @@ -40,17 +48,17 @@ int cloud_to_agent_parse(JSON_ENTRY *e) break; } if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) { - data->topic = strdupz(e->data.string); + data->callback_topic = strdupz(e->data.string); break; } if (!strcmp(e->name, ACLK_JSON_IN_URL)) { - data->url = strdupz(e->data.string); + data->payload = strdupz(e->data.string); break; } break; case JSON_NUMBER: if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) { - data->version = atol(e->data.string); + data->version = atoi(e->original_string); break; } break; @@ -64,33 +72,6 @@ int cloud_to_agent_parse(JSON_ENTRY *e) return 0; } -//char *send_http_request(char *host, char *port, char *url, BUFFER *b) -//{ -// struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 }; -// -// buffer_flush(b); -// buffer_sprintf( -// b, -// "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n", -// url, host); -// int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout); -// -// if (unlikely(sock == -1)) { -// error("Handshake failed"); -// return NULL; -// } -// -// SSL_CTX *ctx = security_initialize_openssl_client(); -// // Certificate chain: not updating the stores - do we need private CA roots? -// // Calls to SSL_CTX_load_verify_locations would go here. -// SSL *ssl = SSL_new(ctx); -// SSL_set_fd(ssl, sock); -// int err = SSL_connect(ssl); -// SSL_write(ssl, b->buffer, b->len); // Timeout options? -// int bytes_read = SSL_read(ssl, b->buffer, b->len); -// SSL_shutdown(ssl); -// close(sock); -//} // Set when we have connection up and running from the connection callback int aclk_connection_initialized = 0; @@ -101,10 +82,14 @@ int aclk_mqtt_connected = 0; static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) +#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) +#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) + #define QUERY_LOCK netdata_mutex_lock(&query_mutex) #define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex) @@ -116,14 +101,35 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) +/* + * Maintain a list of collectors and chart count + * If all the charts of a collector are deleted + * then a new metadata dataset must be send to the cloud + * + */ +struct _collector { + time_t created; + u_int32_t count; //chart count + u_int32_t hostname_hash; + u_int32_t plugin_hash; + u_int32_t module_hash; + char *hostname; + char *plugin_name; + char *module_name; + struct _collector *next; +}; + +struct _collector *collector_list = NULL; + struct aclk_query { time_t created; time_t run_after; // Delay run until after this time + ACLK_CMD cmd; // What command is this char *topic; // Topic to respond to char *data; // Internal data (NULL if request from the cloud) char *msg_id; // msg_id generated by the cloud (NULL if internal) char *query; // The actual query - u_char deleted; // Mark deleted for garbage collect + u_char deleted; // Mark deleted for garbage collect struct aclk_query *next; }; @@ -134,6 +140,41 @@ struct aclk_query_queue { } aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; /* + * After a connection failure -- delay in milliseconds + * When a connection is established, the delay function + * should be called with + * + * mode 0 to reset the delay + * mode 1 to sleep for the calculated amount of time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms + * + */ +unsigned long int aclk_reconnect_delay(int mode) +{ + static int fail = -1; + unsigned long int delay; + + if (!mode || fail == -1) { + srandom(time(NULL)); + fail = mode-1; + return 0; + } + + delay = (1 << fail); + + if (delay >= ACLK_MAX_BACKOFF_DELAY) { + delay = ACLK_MAX_BACKOFF_DELAY * 1000; + } + else { + fail++; + delay = (delay * 1000) + (random() % 1000); + } + +// sleep_usec(USEC_PER_MS * delay); + + return delay; +} + +/* * Free a query structure when done */ @@ -143,23 +184,29 @@ void aclk_query_free(struct aclk_query *this_query) return; freez(this_query->topic); - freez(this_query->query); - if (this_query->data) + if (likely(this_query->query)) + freez(this_query->query); + if (likely(this_query->data)) freez(this_query->data); - if (this_query->msg_id) + if (likely(this_query->msg_id)) freez(this_query->msg_id); freez(this_query); - return; } // Returns the entry after which we need to create a new entry to run at the specified time // If NULL is returned we need to add to HEAD -// Called with locked entries +// Need to have a QUERY lock before calling this struct aclk_query *aclk_query_find_position(time_t time_to_run) { struct aclk_query *tmp_query, *last_query; + // Quick check if we will add to the end + if (likely(aclk_queue.aclk_query_tail)) { + if (aclk_queue.aclk_query_tail->run_after <= time_to_run) + return aclk_queue.aclk_query_tail; + } + last_query = NULL; tmp_query = aclk_queue.aclk_query_head; @@ -172,21 +219,27 @@ struct aclk_query *aclk_query_find_position(time_t time_to_run) return last_query; } -// Need to have a lock before calling this -struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query) +// Need to have a QUERY lock before calling this +struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query) { - struct aclk_query *tmp_query; + struct aclk_query *tmp_query, *prev_query; + UNUSED(cmd); tmp_query = aclk_queue.aclk_query_head; - + prev_query = NULL; while (tmp_query) { if (likely(!tmp_query->deleted)) { - if (strcmp(tmp_query->topic, topic) == 0 && (strcmp(tmp_query->query, query) == 0)) { + if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) { if ((!data || (data && strcmp(data, tmp_query->data) == 0)) && - (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) + (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) { + + if (likely(last_query)) + *last_query = prev_query; return tmp_query; + } } } + prev_query = tmp_query; tmp_query = tmp_query->next; } return NULL; @@ -196,7 +249,7 @@ struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char * * Add a query to execute, the result will be send to the specified topic */ -int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal) +int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) { struct aclk_query *new_query, *tmp_query; @@ -204,23 +257,42 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run if (unlikely(waiting_init)) return 0; + // Ignore all commands if agent not stable and reset the last_init_sequence mark + if (agent_state == 0) { + last_init_sequence = now_realtime_sec(); + return 0; + } + run_after = now_realtime_sec() + run_after; QUERY_LOCK; - tmp_query = aclk_query_find(topic, data, msg_id, query); + struct aclk_query *last_query = NULL; + + //last_query = NULL; + tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query); if (unlikely(tmp_query)) { if (tmp_query->run_after == run_after) { QUERY_UNLOCK; QUERY_THREAD_WAKEUP; return 0; } - tmp_query->deleted = 1; + + if (last_query) + last_query->next = tmp_query->next; + else + aclk_queue.aclk_query_head = tmp_query->next; + + debug(D_ACLK, "Removing double entry"); + aclk_query_free(tmp_query); + aclk_queue.count--; } new_query = callocz(1, sizeof(struct aclk_query)); + new_query->cmd = aclk_cmd; if (internal) { new_query->topic = strdupz(topic); - new_query->query = strdupz(query); + if (likely(query)) + new_query->query = strdupz(query); } else { new_query->topic = topic; new_query->query = query; @@ -234,7 +306,7 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run new_query->created = now_realtime_sec(); new_query->run_after = run_after; - info("Added query (%s) (%s)", topic, query); + debug(D_ACLK, "Added query (%s) (%s)", topic, query?query:""); tmp_query = aclk_query_find_position(run_after); @@ -256,29 +328,11 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run QUERY_UNLOCK; QUERY_THREAD_WAKEUP; return 0; - -// if (likely(aclk_queue.aclk_query_tail)) { -// aclk_queue.aclk_query_tail->next = new_query; -// aclk_queue.aclk_query_tail = new_query; -// aclk_queue.count++; -// QUERY_UNLOCK; -// return 0; -// } -// -// if (likely(!aclk_queue.aclk_query_head)) { -// aclk_queue.aclk_query_head = new_query; -// aclk_queue.aclk_query_tail = new_query; -// aclk_queue.count++; -// QUERY_UNLOCK; -// return 0; -// } -// QUERY_UNLOCK; -// return 0; } inline int aclk_submit_request(struct aclk_request *request) { - return aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0); + return aclk_queue_query(request->callback_topic, NULL, request->msg_id, request->payload, 0, 0, ACLK_CMD_CLOUD); } /* @@ -303,7 +357,27 @@ struct aclk_query *aclk_queue_pop() this_query = aclk_queue.aclk_query_head; - if (this_query->run_after > now_realtime_sec()) { + // Get rid of the deleted entries + while (this_query && this_query->deleted) { + aclk_queue.count--; + + aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; + + if (likely(!aclk_queue.aclk_query_head)) { + aclk_queue.aclk_query_tail = NULL; + } + + aclk_query_free(this_query); + + this_query = aclk_queue.aclk_query_head; + } + + if (likely(!this_query)) { + QUERY_UNLOCK; + return NULL; + } + + if (!this_query->deleted && this_query->run_after > now_realtime_sec()) { info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); QUERY_UNLOCK; return NULL; @@ -327,59 +401,295 @@ struct aclk_query *aclk_queue_pop() // Need to check if additional logic should be added to make sure that there // is enough information to determine the base topic at init time -// TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch -// that on the fly -char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action) +char *create_publish_base_topic() { - static char *topic = NULL; - if (unlikely(!is_agent_claimed())) return NULL; ACLK_LOCK; - if (unlikely(action == PUBLICH_TOPIC_FREE)) { - if (likely(topic)) { - freez(topic); - topic = NULL; - } + if (unlikely(!global_base_topic)) { + char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp; + + snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed()); + tmp = strchr(tmp_topic, '\n'); + if (unlikely(tmp)) + *tmp = '\0'; + global_base_topic = strdupz(tmp_topic); + } + + ACLK_UNLOCK; + return global_base_topic; +} + +/* + * Build a topic based on sub_topic and final_topic + * if the sub topic starts with / assume that is an absolute topic + * + */ + +char *get_topic(char *sub_topic, char *final_topic, int max_size) +{ + int rc; + + if (likely(sub_topic && sub_topic[0] == '/')) + return sub_topic; + + if (unlikely(!global_base_topic)) + return sub_topic; - ACLK_UNLOCK; + rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic); + if (unlikely(rc >= max_size)) + debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic); + + return final_topic; +} + + +/* + * Free a collector structure + */ + +static void _free_collector(struct _collector *collector) +{ + + if (likely(collector->plugin_name)) + freez(collector->plugin_name); + + if (likely(collector->module_name)) + freez(collector->module_name); + + if (likely(collector->hostname)) + freez(collector->hostname); + + freez(collector); +} + +/* + * This will report the collector list + * + */ +#ifdef ACLK_DEBUG +static void _dump_connector_list() +{ + + struct _collector *tmp_collector; + + COLLECTOR_LOCK; + + info("DUMPING ALL COLLECTORS"); + + if (unlikely(!collector_list || !collector_list->next)) { + COLLECTOR_UNLOCK; + info("DUMPING ALL COLLECTORS -- nothing found"); + return; + } + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + + while (tmp_collector) { + info( + "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, + tmp_collector->plugin_name ? tmp_collector->plugin_name : "", + tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); + + tmp_collector = tmp_collector->next; + + } + info("DUMPING ALL COLLECTORS DONE"); + COLLECTOR_UNLOCK; +} +#endif + +/* + * This will cleanup the collector list + * + */ +static void _reset_connector_list() +{ + struct _collector *tmp_collector, *next_collector; + + COLLECTOR_LOCK; + + if (unlikely(!collector_list || !collector_list->next)) { + COLLECTOR_UNLOCK; + return; + } + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + collector_list->count = 0; + collector_list->next = NULL; + + // We broke the link; we can unlock + COLLECTOR_UNLOCK; + + while (tmp_collector) { + next_collector = tmp_collector->next; + _free_collector(tmp_collector); + tmp_collector = next_collector; + } +} + + +/* + * Find a collector (if it exists) + * Must lock before calling this + * If last_collector is not null, it will return the previous collector in the linked + * list (used in collector delete) + */ +static struct _collector *_find_collector(const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector) +{ + struct _collector *tmp_collector, *prev_collector; + uint32_t plugin_hash; + uint32_t module_hash; + uint32_t hostname_hash; + + if (unlikely(!collector_list)) { + collector_list = callocz(1, sizeof(struct _collector)); return NULL; } - if (unlikely(action == PUBLICH_TOPIC_REBUILD)) { - ACLK_UNLOCK; - get_publish_base_topic(PUBLICH_TOPIC_FREE); - return get_publish_base_topic(PUBLICH_TOPIC_GET); + if (unlikely(!collector_list->next)) + return NULL; + + plugin_hash = plugin_name?simple_hash(plugin_name):1; + module_hash = module_name?simple_hash(module_name):1; + hostname_hash = simple_hash(hostname); + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + prev_collector = collector_list; + while (tmp_collector) { + if (plugin_hash == tmp_collector->plugin_hash && + module_hash == tmp_collector->module_hash && + hostname_hash == tmp_collector->hostname_hash && + (!strcmp(hostname, tmp_collector->hostname |