// SPDX-License-Identifier: GPL-3.0-or-later
#include "libnetdata/libnetdata.h"
#include "agent_cloud_link.h"
// Connection settings read from the config file -- new section [agent_cloud_link].
// The variables start out as 0/NULL here; the default noted next to each one is
// the value the configuration is expected to supply when the option is not set.
// NOTE(review): the code that loads these settings is not in this part of the
// file -- confirm the defaults there.
int aclk_recv_maximum = 0; // default 20
int aclk_send_maximum = 0; // default 20
int aclk_port = 0; // default 1883
char *aclk_hostname = NULL; // default localhost
// State flags; only waiting_init starts non-zero.
// NOTE(review): their exact meaning is inferred from the names only -- verify
// against the code that sets and clears them.
int aclk_subscribed = 0;
int aclk_metadata_submitted = 0;
int waiting_init = 1;
int cmdpause = 0; // Used to pause query processing
// Both pointers start as NULL; nothing is allocated at definition time.
BUFFER *aclk_buffer = NULL;
char *global_base_topic = NULL;
/*
 * JSON walker callback for a message received from the cloud.
 *
 * Copies the recognised members of the incoming JSON into the
 * struct aclk_request passed through e->callback_data:
 *
 *   ACLK_JSON_IN_MSGID   -> msg_id   (string, duplicated with strdupz)
 *   ACLK_JSON_IN_TYPE    -> type_id  (string, duplicated with strdupz)
 *   ACLK_JSON_IN_TOPIC   -> topic    (string, duplicated with strdupz)
 *   ACLK_JSON_IN_URL     -> url      (string, duplicated with strdupz)
 *   ACLK_JSON_IN_VERSION -> version  (number)
 *
 * Members with any other name are ignored. For objects and arrays the
 * callback registers itself again so that nested members are delivered to
 * this same function.
 *
 * The duplicated strings belong to the aclk_request; nothing in this
 * function releases them.
 * NOTE(review): a member that appears twice in one message overwrites the
 * earlier pointer without freeing it. Freeing here would require the caller
 * to hand in a zero-initialised request, which is not visible from this
 * function -- confirm before changing.
 *
 * Always returns 0.
 */
int cloud_to_agent_parse(JSON_ENTRY *e)
{
    struct aclk_request *data = e->callback_data;

    switch (e->type) {
        case JSON_OBJECT:
        case JSON_ARRAY:
            // Keep walking nested containers with this same callback.
            e->callback_function = cloud_to_agent_parse;
            break;

        case JSON_STRING:
            if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) {
                data->msg_id = strdupz(e->data.string);
                break;
            }
            if (!strcmp(e->name, ACLK_JSON_IN_TYPE)) {
                data->type_id = strdupz(e->data.string);
                break;
            }
            if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) {
                data->topic = strdupz(e->data.string);
                break;
            }
            if (!strcmp(e->name, ACLK_JSON_IN_URL)) {
                data->url = strdupz(e->data.string);
                break;
            }
            break;

        case JSON_NUMBER:
            if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) {
                // For JSON_NUMBER entries the parser stores the value in
                // data.number. data.string is another member of the same
                // union and does not hold a valid pointer here, so it must
                // not be handed to atol().
                data->version = (long)e->data.number;
            }
            break;

        case JSON_BOOLEAN:
        case JSON_NULL:
        default:
            // Nothing to extract from these entry types.
            break;
    }

    return 0;
}
//char *send_http_request(char *host, char *port, char *url, BUFFER *b)
//{
// struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
//
// buffer_flush(b);
// buffer_sprintf(
// b,
// "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n",
// url, host);
// int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout);
//
// if (unlikely(sock == -1)) {
// error("Handshake failed");
// return NULL;
// }
//
// SSL_CTX *ctx = security_initialize_openssl_client();
// // Certificate chain: not updating the stores - do we need private CA roots?
// // Calls to SSL_CTX_load_verify_locations would go here.
// SSL *ssl = SSL_new(ctx);
// SSL_set_fd(ssl, sock);
// int err = SSL_connect(ssl);
// SSL_write(ssl, b->buffer, b->len); // Timeout options?
// int bytes_read = SSL_read(ssl, b->buffer, b->len);
// SSL_shutdown(ssl);
// close(sock);
//}
// Set when we have connection up and running from the connection callback
int aclk_connection_initialized = 0;
// Two file-local mutexes, statically initialized. The ACLK_* and QUERY_*
// macros below are shorthands for locking/unlocking them.
// NOTE(review): which data each mutex guards is not stated here -- check the call sites.
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
#define QUERY_LOCK netdata_mutex_lock(&query_mutex)
#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
// Condition variable and its companion mutex, both statically initialized.
// QUERY_THREAD_WAKEUP signals the condition variable; QUERY_THREAD_LOCK and
// QUERY_THREAD_UNLOCK lock/unlock the companion mutex.
pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
// NOTE(review): unlike its siblings, QUERY_THREAD_LOCK expands with a trailing
// ';'. Writing `QUERY_THREAD_LOCK;` therefore yields an extra empty statement,
// and the macro cannot be the sole body of an unbraced if/else. Left as is
// because the call sites are not visible from here.
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait);
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
/*
 * One query entry. Entries are chained through `next`.
 * topic, data, msg_id and query are plain char pointers; data and msg_id may
 * be NULL as noted on each field.
 */
struct aclk_query {
time_t created; // presumably the time the entry was created -- TODO confirm where it is set
time_t run_after; // Delay run until after this time
char *topic; // Topic to respond to
char *data; // Internal data (NULL if request from the cloud)
char *msg_id; // msg_id generated by the cloud (NULL if internal)
char *query; // The actual query
u_char deleted; // Mark deleted for garbage collect
struct aclk_query *next; // Next entry in the chain
};
/*
 * Head and tail pointers plus an element counter for a chain of
 * struct aclk_query entries. aclk_queue is the instance defined here; it
 * starts empty (both pointers NULL, count 0).
 * NOTE(review): the locking rules for this queue are not visible here --
 * check the functions that modify it.
 */
struct aclk_query_queue {
struct aclk_query *aclk_query_head; // First entry, NULL when empty
struct aclk_query *aclk_query_tail; // Last entry, NULL when empty
u_int64_t count; // Entry counter, starts at 0
} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
/*
* Free a query structure when done
*/
void aclk_query_free(struct aclk_query *this_query)
{
if (unlikely(!this_query))
return;
freez(this_query->topic);
freez(this_query->query);
if (this_query->data)
freez(this_query->data);
if (this_query->msg_id)
freez(this_query->msg_id);<