summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--aclk/agent_cloud_link.c119
-rw-r--r--aclk/agent_cloud_link.h2
-rw-r--r--aclk/mqtt.c2
-rw-r--r--claim/claim.c4
-rw-r--r--daemon/commands.c4
-rw-r--r--web/api/web_api_v1.c2
6 files changed, 82 insertions, 51 deletions
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index 30199be2c0..225c773711 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -117,7 +117,12 @@ int cloud_to_agent_parse(JSON_ENTRY *e)
break;
}
if (!strcmp(e->name, "payload")) {
- data->payload = strdupz(e->data.string);
+ if (likely(e->data.string)) {
+ size_t len = strlen(e->data.string);
+ data->payload = mallocz(len+1);
+ if (!url_decode_r(data->payload, e->data.string, len + 1))
+ strcpy(data->payload, e->data.string);
+ }
break;
}
break;
@@ -302,20 +307,19 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run
// Ignore all commands while we wait for the agent to initialize
if (unlikely(waiting_init))
- return 0;
+ return 1;
run_after = now_realtime_sec() + run_after;
QUERY_LOCK;
struct aclk_query *last_query = NULL;
- //last_query = NULL;
tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
if (unlikely(tmp_query)) {
if (tmp_query->run_after == run_after) {
QUERY_UNLOCK;
QUERY_THREAD_WAKEUP;
- return 0;
+ return 1;
}
if (last_query)
@@ -509,7 +513,7 @@ static void _free_collector(struct _collector *collector)
*
*/
#ifdef ACLK_DEBUG
-static void _dump_connector_list()
+static void _dump_collector_list()
{
struct _collector *tmp_collector;
@@ -543,7 +547,7 @@ static void _dump_connector_list()
* This will cleanup the collector list
*
*/
-static void _reset_connector_list()
+static void _reset_collector_list()
{
struct _collector *tmp_collector, *next_collector;
@@ -689,8 +693,10 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha
if (unlikely(agent_state == AGENT_INITIALIZING))
last_init_sequence = now_realtime_sec();
- else
- aclk_queue_query("connector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
+ else {
+ if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
+ }
COLLECTOR_UNLOCK;
}
@@ -724,8 +730,10 @@ void aclk_del_collector(const char *hostname, const char *plugin_name, const cha
if (unlikely(agent_state == AGENT_INITIALIZING))
last_init_sequence = now_realtime_sec();
- else
- aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
+ else {
+ if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
+ }
_free_collector(tmp_collector);
}
@@ -757,12 +765,10 @@ int aclk_execute_query(struct aclk_query *this_query)
aclk_create_header(local_buffer, "http", this_query->msg_id);
- //if (rc != HTTP_RESP_OK || strcmp(mysep ? mysep + 1 : "noop", "badge.svg") == 0)
+ char *encoded_response = aclk_encode_response(w->response.data);
+
buffer_sprintf(
- local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\"\n}", rc, aclk_encode_response(w->response.data)->buffer);
- //else
- // buffer_sprintf(local_buffer, "{\n\"code\": %d,\n\"body\": %s\n}", rc,
- // aclk_encode_response(w->response.data)->buffer);
+ local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\"\n}", rc, encoded_response);
buffer_sprintf(local_buffer, "\n}");
@@ -771,6 +777,7 @@ int aclk_execute_query(struct aclk_query *this_query)
buffer_free(w->response.data);
freez(w);
buffer_free(local_buffer);
+ freez(encoded_response);
return 0;
}
return 1;
@@ -880,7 +887,7 @@ static void aclk_query_thread_cleanup(void *ptr)
COLLECTOR_LOCK;
- _reset_connector_list();
+ _reset_collector_list();
freez(collector_list);
COLLECTOR_UNLOCK;
@@ -908,7 +915,7 @@ void *aclk_query_main_thread(void *ptr)
agent_state = AGENT_STABLE;
info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
#ifdef ACLK_DEBUG
- _dump_connector_list();
+ _dump_collector_list();
#endif
break;
}
@@ -919,7 +926,11 @@ void *aclk_query_main_thread(void *ptr)
while (!netdata_exit) {
if (unlikely(!aclk_metadata_submitted)) {
aclk_metadata_submitted = ACLK_METADATA_CMD_QUEUED;
- aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
+ if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ errno = 0;
+ error("ACLK failed to queue on_connect command");
+ aclk_metadata_submitted = 0;
+ }
}
aclk_process_queries();
@@ -1303,13 +1314,11 @@ void *aclk_main(void *ptr)
last_init_sequence = now_realtime_sec();
query_thread = NULL;
-
char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
char *aclk_port = NULL;
uint32_t port_num = 0;
char *cloud_base_url = config_get(CONFIG_SECTION_CLOUD, "cloud base url", "https://netdata.cloud");
- if( aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port))
- {
+ if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
error("Configuration error - cannot use agent cloud link");
return NULL;
}
@@ -1376,7 +1385,7 @@ void *aclk_main(void *ptr)
// TODO: Move to on-connect
if (unlikely(!aclk_subscribed)) {
- aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2);
+ aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
}
if (unlikely(!query_thread)) {
@@ -1416,7 +1425,7 @@ int aclk_send_message(char *sub_topic, char *message, char *msg_id)
UNUSED(msg_id);
- if(!aclk_connected)
+ if (!aclk_connected)
return 0;
if (unlikely(!message))
@@ -1541,20 +1550,20 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
*
*/
-BUFFER *aclk_encode_response(BUFFER *contents)
+char *aclk_encode_response(BUFFER *contents)
{
char *tmp_buffer = mallocz(contents->len * 2);
char *src, *dst;
+ size_t content_size = contents->len;
src = contents->buffer;
dst = tmp_buffer;
- while (*src) {
+ while (content_size > 0) {
switch (*src) {
case '\n':
- *dst++ = '\\';
- *dst++ = 'n';
+ case '\t':
break;
- case 0x01 ... 0x09:
+ case 0x01 ... 0x08:
case 0x0b ... 0x1F:
*dst++ = '\\';
*dst++ = '0';
@@ -1563,7 +1572,6 @@ BUFFER *aclk_encode_response(BUFFER *contents)
*dst++ = to_hex(*src);
break;
case '\"':
- case '\'':
*dst++ = '\\';
*dst++ = *src;
break;
@@ -1571,19 +1579,18 @@ BUFFER *aclk_encode_response(BUFFER *contents)
*dst++ = *src;
}
src++;
+ content_size--;
}
*dst = '\0';
- buffer_flush(contents);
- buffer_sprintf(contents, "%s", tmp_buffer);
-
- freez(tmp_buffer);
- return contents;
+ return tmp_buffer;
}
/*
- * This will send the alarms configuration
- * and
+ * This will send alarm information which includes
+ * configured alarms
+ * alarm_log
+ * active alarms
*/
void aclk_send_alarm_metadata()
{
@@ -1611,12 +1618,16 @@ void aclk_send_alarm_metadata()
buffer_sprintf(local_buffer, "\n}\n}");
aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
- debug(D_ACLK, "Metadata %s encoded has %zu bytes", msg_id, local_buffer->len);
freez(msg_id);
buffer_free(local_buffer);
}
+/*
+ * This will send the agent metadata
+ * /api/v1/info
+ * charts
+ */
int aclk_send_info_metadata()
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -1638,9 +1649,8 @@ int aclk_send_info_metadata()
debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
- debug(D_ACLK, "Metadata %s encoded has %zu bytes", msg_id, local_buffer->len);
- freez(msg_id);
+ freez(msg_id);
buffer_free(local_buffer);
return 0;
}
@@ -1691,7 +1701,12 @@ void aclk_alarm_reload()
if (unlikely(agent_state == AGENT_INITIALIZING))
return;
- aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
+ if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ if (likely(aclk_connected)) {
+ errno = 0;
+ error("ACLK failed to queue on_connect command on alarm reload");
+ }
+ }
}
//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
@@ -1742,8 +1757,14 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
if (unlikely(agent_state == AGENT_INITIALIZING))
last_init_sequence = now_realtime_sec();
- else
- aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd);
+ else {
+ if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) {
+ if (likely(aclk_connected)) {
+ errno = 0;
+ error("ACLK failed to queue chart_update command");
+ }
+ }
+ }
return 0;
#endif
}
@@ -1779,7 +1800,13 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
buffer_sprintf(local_buffer, "\n}");
- aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM);
+
+ if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
+ if (likely(aclk_connected)) {
+ errno = 0;
+ error("ACLK failed to queue alarm_command on alarm_update");
+ }
+ }
freez(msg_id);
buffer_free(local_buffer);
@@ -1796,6 +1823,7 @@ int aclk_handle_cloud_request(char *payload)
.type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0
};
+
if (unlikely(agent_state == AGENT_INITIALIZING)) {
debug(D_ACLK, "Ignoring cloud request; agent not in stable state");
return 0;
@@ -1806,7 +1834,7 @@ int aclk_handle_cloud_request(char *payload)
return 0;
}
- debug(D_ACLK, "ACLK incoming message [%s]", payload);
+ debug(D_ACLK, "ACLK incoming message (%s)", payload);
int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
@@ -1835,7 +1863,8 @@ int aclk_handle_cloud_request(char *payload)
return 1;
}
- aclk_submit_request(&cloud_to_agent);
+ if (unlikely(aclk_submit_request(&cloud_to_agent)))
+ debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);
// Note: the payload comes from the callback and it will be automatically freed
return 0;
diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h
index df1b85f05b..faf4932f84 100644
--- a/aclk/agent_cloud_link.h
+++ b/aclk/agent_cloud_link.h
@@ -106,7 +106,7 @@ void aclk_del_collector(const char *hostname, const char *plugin_name, const cha
void aclk_alarm_reload();
void aclk_send_alarm_metadata();
int aclk_execute_query(struct aclk_query *query);
-BUFFER *aclk_encode_response(BUFFER *contents);
+char *aclk_encode_response(BUFFER *contents);
unsigned long int aclk_reconnect_delay(int mode);
extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
void aclk_single_update_enable();
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
index f29a79b8cb..556d3b34cc 100644
--- a/aclk/mqtt.c
+++ b/aclk/mqtt.c
@@ -322,7 +322,7 @@ int _link_send_message(char *topic, unsigned char *message, int *mid)
return rc;
int msg_len = strlen((char*)message);
- error("Sending MQTT len=%d starts %02x %02x %02x", msg_len, message[0], message[1], message[2]);
+ info("Sending MQTT len=%d starts %02x %02x %02x", msg_len, message[0], message[1], message[2]);
rc = mosquitto_publish(mosq, mid, topic, msg_len, message, ACLK_QOS, 0);
// TODO: Add better handling -- error will flood the logfile here
diff --git a/claim/claim.c b/claim/claim.c
index a3c947fd93..5d17ae9075 100644
--- a/claim/claim.c
+++ b/claim/claim.c
@@ -41,8 +41,8 @@ extern struct registry registry;
/* rrd_init() must have been called before this function */
void claim_agent(char *claiming_arguments)
{
-#ifndef ENABLE_ACLK
- info("The claiming feature is under development and still subject to change before the next release");
+#ifndef ENABLE_CLOUD
+ info("The claiming feature has been disabled");
return;
#endif
diff --git a/daemon/commands.c b/daemon/commands.c
index c3f1f16dac..71df5a4219 100644
--- a/daemon/commands.c
+++ b/daemon/commands.c
@@ -186,8 +186,10 @@ static cmd_status_t cmd_reload_claiming_state_execute(char *args, char **message
(void)args;
(void)message;
- info("The claiming feature is still in development and subject to change before the next release");
+#ifndef ENABLE_CLOUD
+ info("The claiming feature has been disabled");
return CMD_STATUS_FAILURE;
+#endif
error_log_limit_unlimited();
info("COMMAND: Reloading Agent Claiming configuration.");
diff --git a/web/api/web_api_v1.c b/web/api/web_api_v1.c
index 478601b68a..c15197fb63 100644
--- a/web/api/web_api_v1.c
+++ b/web/api/web_api_v1.c
@@ -489,7 +489,7 @@ inline int web_client_api_request_v1_data(RRDHOST *host, struct web_client *w, c
st->last_accessed_time = now_realtime_sec();
long long before = (before_str && *before_str)?str2l(before_str):0;
- long long after = (after_str && *after_str) ?str2l(after_str):0;
+ long long after = (after_str && *after_str) ?str2l(after_str):-600;
int points = (points_str && *points_str)?str2i(points_str):0;
long group_time = (group_time_str && *group_time_str)?str2l(group_time_str):0;