summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2022-03-09 14:08:20 +0100
committerGitHub <noreply@github.com>2022-03-09 14:08:20 +0100
commitd8aba23d0f471ccb3f0107c4aac119cf35753f94 (patch)
tree0d2d3632e4a04058912378fb90286634cedfb612
parent5b8737361aac20a9887ae97b95a861d7c4f3deab (diff)
Adds more info to aclk-state API call (#12231)
-rw-r--r--aclk/aclk.c272
-rw-r--r--database/sqlite/sqlite_aclk_alert.c41
-rw-r--r--database/sqlite/sqlite_aclk_alert.h10
-rw-r--r--database/sqlite/sqlite_aclk_chart.c86
-rw-r--r--database/sqlite/sqlite_aclk_chart.h17
-rw-r--r--web/api/netdata-swagger.json19
-rw-r--r--web/api/netdata-swagger.yaml18
7 files changed, 427 insertions, 36 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index a2159d9179..ac6db78ac1 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -24,6 +24,8 @@
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
+int aclk_rcvd_cloud_msgs = 0;
+int aclk_connection_counter = 0;
int disconnect_req = 0;
int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload
@@ -43,8 +45,6 @@ struct aclk_shared_state aclk_shared_state = {
.mqtt_shutdown_msg_rcvd = 0
};
-//ENDTODO
-
static RSA *aclk_private_key = NULL;
static int load_private_key()
{
@@ -266,6 +266,7 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t
}
static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) {
+ aclk_rcvd_cloud_msgs++;
if (aclk_use_new_cloud_arch)
msg_callback_new_protocol(topic, msg, msglen, qos);
else
@@ -402,6 +403,8 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
+ aclk_rcvd_cloud_msgs = 0;
+ aclk_connection_counter++;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!aclk_use_new_cloud_arch) {
@@ -1117,6 +1120,64 @@ void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, con
aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
+{
+ struct proto_alert_status status;
+ memset(&status, 0, sizeof(status));
+ if (get_proto_alert_status(host, &status)) {
+ buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
+ return;
+ }
+ buffer_sprintf(wb,
+ "\n\t\tUpdates: %d"
+ "\n\t\tBatch ID: %"PRIu64
+ "\n\t\tLast Acked Seq ID: %"PRIu64
+ "\n\t\tPending Min Seq ID: %"PRIu64
+ "\n\t\tPending Max Seq ID: %"PRIu64
+ "\n\t\tLast Submitted Seq ID: %"PRIu64,
+ status.alert_updates,
+ status.alerts_batch_id,
+ status.last_acked_sequence_id,
+ status.pending_min_sequence_id,
+ status.pending_max_sequence_id,
+ status.last_submitted_sequence_id
+ );
+}
+
+static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host)
+{
+ struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
+ if (!stats) {
+ buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host");
+ return;
+ }
+ buffer_sprintf(wb,
+ "\n\t\tUpdates: %d"
+ "\n\t\tBatch ID: %"PRIu64
+ "\n\t\tMin Seq ID: %"PRIu64
+ "\n\t\tMax Seq ID: %"PRIu64
+ "\n\t\tPending Min Seq ID: %"PRIu64
+ "\n\t\tPending Max Seq ID: %"PRIu64
+ "\n\t\tSent Min Seq ID: %"PRIu64
+ "\n\t\tSent Max Seq ID: %"PRIu64
+ "\n\t\tAcked Min Seq ID: %"PRIu64
+ "\n\t\tAcked Max Seq ID: %"PRIu64,
+ stats->updates,
+ stats->batch_id,
+ stats->min_seqid,
+ stats->max_seqid,
+ stats->min_seqid_pend,
+ stats->max_seqid_pend,
+ stats->min_seqid_sent,
+ stats->max_seqid_sent,
+ stats->min_seqid_ack,
+ stats->max_seqid_ack
+ );
+ freez(stats);
+}
+#endif
+
char *ng_aclk_state(void)
{
BUFFER *wb = buffer_create(1024);
@@ -1124,46 +1185,160 @@ char *ng_aclk_state(void)
buffer_strcat(wb,
"ACLK Available: Yes\n"
- "ACLK Implementation: Next Generation\n"
+ "ACLK Version: 2\n"
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- "New Cloud Protocol Support: Yes\n"
+ "Protocols Supported: Legacy, Protobuf\n"
#else
- "New Cloud Protocol Support: No\n"
+ "Protocols Supported: Legacy\n"
#endif
- "Claimed: "
);
+ buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
char *agent_id = is_agent_claimed();
if (agent_id == NULL)
buffer_strcat(wb, "No\n");
else {
- buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id);
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
freez(agent_id);
}
- buffer_sprintf(wb, "Online: %s\nUsed Cloud Protocol: %s", aclk_connected ? "Yes" : "No", aclk_use_new_cloud_arch ? "New" : "Legacy");
+ buffer_sprintf(wb, "Online: %s\nReconnect count: %d\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0);
+
+ if (aclk_connected) {
+ buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ RRDHOST *host;
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname);
+
+ buffer_strcat(wb, "\tClaimed ID: ");
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id)
+ buffer_strcat(wb, host->aclk_state.claimed_id);
+ else
+ buffer_strcat(wb, "null");
+ rrdhost_aclk_state_unlock(host);
+
+
+ if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ buffer_strcat(wb, "\n\tNode ID: null\n");
+ } else {
+ char node_id[GUID_LEN + 1];
+ uuid_unparse_lower(*host->node_id, node_id);
+ buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id);
+ }
+
+ buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child");
+
+ if (host != localhost)
+ buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false");
+
+ buffer_strcat(wb, "\n\tAlert Streaming Status:");
+ fill_alert_status_for_host(wb, host);
+
+ buffer_strcat(wb, "\n\tChart Streaming Status:");
+ fill_chart_status_for_host(wb, host);
+ }
+ rrd_unlock();
+#endif
+ }
ret = strdupz(buffer_tostring(wb));
buffer_free(wb);
return ret;
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
+{
+ struct proto_alert_status status;
+ memset(&status, 0, sizeof(status));
+ if (get_proto_alert_status(host, &status))
+ return;
+
+ json_object *tmp = json_object_new_int(status.alert_updates);
+ json_object_object_add(obj, "updates", tmp);
+
+ tmp = json_object_new_int(status.alerts_batch_id);
+ json_object_object_add(obj, "batch-id", tmp);
+
+ tmp = json_object_new_int(status.last_acked_sequence_id);
+ json_object_object_add(obj, "last-acked-seq-id", tmp);
+
+ tmp = json_object_new_int(status.pending_min_sequence_id);
+ json_object_object_add(obj, "pending-min-seq-id", tmp);
+
+ tmp = json_object_new_int(status.pending_max_sequence_id);
+ json_object_object_add(obj, "pending-max-seq-id", tmp);
+
+ tmp = json_object_new_int(status.last_submitted_sequence_id);
+ json_object_object_add(obj, "last-submitted-seq-id", tmp);
+}
+
+static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host)
+{
+ struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
+ if (!stats)
+ return;
+
+ json_object *tmp = json_object_new_int(stats->updates);
+ json_object_object_add(obj, "updates", tmp);
+
+ tmp = json_object_new_int(stats->batch_id);
+ json_object_object_add(obj, "batch-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid);
+ json_object_object_add(obj, "min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid);
+ json_object_object_add(obj, "max-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid_pend);
+ json_object_object_add(obj, "pending-min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid_pend);
+ json_object_object_add(obj, "pending-max-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid_sent);
+ json_object_object_add(obj, "sent-min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid_sent);
+ json_object_object_add(obj, "sent-max-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid_ack);
+ json_object_object_add(obj, "acked-min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid_ack);
+ json_object_object_add(obj, "acked-max-seq-id", tmp);
+
+ freez(stats);
+}
+#endif
+
char *ng_aclk_state_json(void)
{
- json_object *tmp, *msg = json_object_new_object();
+ json_object *tmp, *grp, *msg = json_object_new_object();
tmp = json_object_new_boolean(1);
json_object_object_add(msg, "aclk-available", tmp);
- tmp = json_object_new_string("Next Generation");
- json_object_object_add(msg, "aclk-implementation", tmp);
+ tmp = json_object_new_int(2);
+ json_object_object_add(msg, "aclk-version", tmp);
+ grp = json_object_new_array();
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- tmp = json_object_new_boolean(1);
+ tmp = json_object_new_string("Legacy");
+ json_object_array_add(grp, tmp);
+ tmp = json_object_new_string("Protobuf");
+ json_object_array_add(grp, tmp);
#else
- tmp = json_object_new_boolean(0);
+ tmp = json_object_new_string("Legacy");
+ json_object_array_add(grp, tmp);
#endif
- json_object_object_add(msg, "new-cloud-protocol-supported", tmp);
+ json_object_object_add(msg, "protocols-supported", grp);
char *agent_id = is_agent_claimed();
tmp = json_object_new_boolean(agent_id != NULL);
@@ -1176,12 +1351,79 @@ char *ng_aclk_state_json(void)
tmp = NULL;
json_object_object_add(msg, "claimed-id", tmp);
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL;
+ json_object_object_add(msg, "cloud-url", tmp);
+
tmp = json_object_new_boolean(aclk_connected);
json_object_object_add(msg, "online", tmp);
- tmp = json_object_new_string(aclk_use_new_cloud_arch ? "New" : "Legacy");
+ tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
json_object_object_add(msg, "used-cloud-protocol", tmp);
+ tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
+ json_object_object_add(msg, "received-app-layer-msgs", tmp);
+
+ tmp = json_object_new_int(aclk_pubacks_per_conn);
+ json_object_object_add(msg, "received-mqtt-pubacks", tmp);
+
+ tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0);
+ json_object_object_add(msg, "reconnect-count", tmp);
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ grp = json_object_new_array();
+
+ RRDHOST *host;
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ json_object *nodeinstance = json_object_new_object();
+
+ tmp = json_object_new_string(host->hostname);
+ json_object_object_add(nodeinstance, "hostname", tmp);
+
+ tmp = json_object_new_string(host->machine_guid);
+ json_object_object_add(nodeinstance, "mguid", tmp);
+
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id) {
+ tmp = json_object_new_string(host->aclk_state.claimed_id);
+ json_object_object_add(nodeinstance, "claimed_id", tmp);
+ } else
+ json_object_object_add(nodeinstance, "claimed_id", NULL);
+ rrdhost_aclk_state_unlock(host);
+
+ if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ json_object_object_add(nodeinstance, "node-id", NULL);
+ } else {
+ char node_id[GUID_LEN + 1];
+ uuid_unparse_lower(*host->node_id, node_id);
+ tmp = json_object_new_string(node_id);
+ json_object_object_add(nodeinstance, "node-id", tmp);
+ }
+
+ tmp = json_object_new_int(host->system_info->hops);
+ json_object_object_add(nodeinstance, "streaming-hops", tmp);
+
+ tmp = json_object_new_string(host == localhost ? "self" : "child");
+ json_object_object_add(nodeinstance, "relationship", tmp);
+
+ tmp = json_object_new_boolean((host->receiver || host == localhost));
+ json_object_object_add(nodeinstance, "streaming-online", tmp);
+
+ tmp = json_object_new_object();
+ fill_alert_status_for_host_json(tmp, host);
+ json_object_object_add(nodeinstance, "alert-sync-status", tmp);
+
+ tmp = json_object_new_object();
+ fill_chart_status_for_host_json(tmp, host);
+ json_object_object_add(nodeinstance, "chart-sync-status", tmp);
+
+ json_object_array_add(grp, nodeinstance);
+ }
+ rrd_unlock();
+ json_object_object_add(msg, "node-instances", grp);
+#endif
+
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
json_object_put(msg);
return str;
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index d6773b3dc7..f57111408f 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -908,3 +908,44 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
UNUSED(host);
#endif
}
+
+int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
+{
+ int rc;
+ struct aclk_database_worker_config *wc = NULL;
+ wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ if (!wc)
+ return 1;
+
+ proto_alert_status->alert_updates = wc->alert_updates;
+ proto_alert_status->alerts_batch_id = wc->alerts_batch_id;
+
+ BUFFER *sql = buffer_create(1024);
+ sqlite3_stmt *res = NULL;
+
+ buffer_sprintf(sql, "SELECT MIN(sequence_id), MAX(sequence_id), " \
+ "(select MAX(sequence_id) from aclk_alert_%s where date_cloud_ack is not NULL), " \
+ "(select MAX(sequence_id) from aclk_alert_%s where date_submitted is not NULL) " \
+ "FROM aclk_alert_%s where date_submitted is null;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
+
+ rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement to get alert log status from the database.");
+ buffer_free(sql);
+ return 1;
+ }
+
+ while (sqlite3_step(res) == SQLITE_ROW) {
+ proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
+ proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
+ proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
+ proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0;
+ }
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc);
+
+ buffer_free(sql);
+ return 0;
+}
diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h
index 1aaaa5d23a..957cb94ac8 100644
--- a/database/sqlite/sqlite_aclk_alert.h
+++ b/database/sqlite/sqlite_aclk_alert.h
@@ -5,6 +5,15 @@
extern sqlite3 *db_meta;
+struct proto_alert_status {
+ int alert_updates;
+ uint64_t alerts_batch_id;
+ uint64_t last_acked_sequence_id;
+ uint64_t pending_min_sequence_id;
+ uint64_t pending_max_sequence_id;
+ uint64_t last_submitted_sequence_id;
+};
+
int aclk_add_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_send_alarm_health_log(char *node_id);
@@ -16,5 +25,6 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host);
void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id);
+int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status);
#endif //NETDATA_SQLITE_ACLK_ALERT_H
diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c
index f4e8aea23a..f6f3ebeb32 100644
--- a/database/sqlite/sqlite_aclk_chart.c
+++ b/database/sqlite/sqlite_aclk_chart.c
@@ -1014,6 +1014,91 @@ void aclk_send_dimension_update(RRDDIM *rd)
return;
}
+#define SQL_SEQ_NULL(result, n) sqlite3_column_type(result, n) == SQLITE_NULL ? 0 : sqlite3_column_int64(result, n)
+
+struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host)
+{
+ struct aclk_chart_sync_stats *aclk_statistics = NULL;
+
+ struct aclk_database_worker_config *wc = NULL;
+ wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ if (!wc)
+ return NULL;
+
+ aclk_statistics = callocz(1, sizeof(struct aclk_chart_sync_stats));
+
+ aclk_statistics->updates = wc->chart_updates;
+ aclk_statistics->batch_id = wc->batch_id;
+
+ char host_uuid_fixed[GUID_LEN + 1];
+
+ strncpy(host_uuid_fixed, host->machine_guid, GUID_LEN);
+ host_uuid_fixed[GUID_LEN] = 0;
+
+ host_uuid_fixed[8] = '_';
+ host_uuid_fixed[13] = '_';
+ host_uuid_fixed[18] = '_';
+ host_uuid_fixed[23] = '_';
+
+ sqlite3_stmt *res = NULL;
+ BUFFER *sql = buffer_create(1024);
+ buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s;", host_uuid_fixed);
+ buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s WHERE date_submitted IS NULL;", host_uuid_fixed);
+ buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s WHERE date_submitted IS NOT NULL;", host_uuid_fixed);
+ buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s WHERE date_updated IS NOT NULL;", host_uuid_fixed);
+ buffer_sprintf(sql, "SELECT max(date_created), max(date_submitted), max(date_updated), 0 FROM aclk_chart_%s;", host_uuid_fixed);
+
+ int rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ buffer_free(sql);
+ freez(aclk_statistics);
+ return NULL;
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->min_seqid = SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_seqid = SQL_SEQ_NULL(res, 1);
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->min_seqid_pend = SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_seqid_pend = SQL_SEQ_NULL(res, 1);
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->min_seqid_sent = SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_seqid_sent = SQL_SEQ_NULL(res, 1);
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->min_seqid_ack = SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_seqid_ack = SQL_SEQ_NULL(res, 1);
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->min_seqid_ack = SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_seqid_ack = SQL_SEQ_NULL(res, 1);
+ }
+
+ rc = sqlite3_step(res);
+ if (rc == SQLITE_ROW) {
+ aclk_statistics->max_date_created = (time_t) SQL_SEQ_NULL(res, 0);
+ aclk_statistics->max_date_submitted = (time_t) SQL_SEQ_NULL(res, 1);
+ aclk_statistics->max_date_ack = (time_t) SQL_SEQ_NULL(res, 2);
+ }
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement when fetching aclk sync statistics, rc = %d", rc);
+
+ buffer_free(sql);
+ return aclk_statistics;
+}
#endif //ENABLE_NEW_CLOUD_PROTOCOL
// ST is read locked
@@ -1035,4 +1120,3 @@ int queue_chart_to_aclk(RRDSET *st)
st, ACLK_DATABASE_ADD_CHART);
#endif
}
-
diff --git a/database/sqlite/sqlite_aclk_chart.h b/database/sqlite/sqlite_aclk_chart.h
index 370af4729a..fee5ecca2f 100644
--- a/database/sqlite/sqlite_aclk_chart.h
+++ b/database/sqlite/sqlite_aclk_chart.h
@@ -16,6 +16,22 @@ extern sqlite3 *db_meta;
#define RRDSET_MINIMUM_LIVE_MULTIPLIER (1.5)
#endif
+struct aclk_chart_sync_stats {
+ int updates;
+ uint64_t batch_id;
+ uint64_t min_seqid;
+ uint64_t max_seqid;
+ uint64_t min_seqid_pend;
+ uint64_t max_seqid_pend;
+ uint64_t min_seqid_sent;
+ uint64_t max_seqid_sent;
+ uint64_t min_seqid_ack;
+ uint64_t max_seqid_ack;
+ time_t max_date_created;
+ time_t max_date_submitted;
+ time_t max_date_ack;
+};
+
extern int queue_chart_to_aclk(RRDSET *st);
extern int queue_dimension_to_aclk(RRDDIM *rd);
extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
@@ -35,4 +51,5 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_
void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc);
void aclk_send_dimension_update(RRDDIM *rd);
+struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host);
#endif //NETDATA_SQLITE_ACLK_CHART_H
diff --git a/web/api/netdata-swagger.json b/web/api/netdata-swagger.json
index cf2aca4c00..410ce19ced 100644
--- a/web/api/netdata-swagger.json
+++ b/web/api/netdata-swagger.json
@@ -2085,17 +2085,16 @@
"type": "boolean",
"description": "Describes whether this agent is capable of connection to the Cloud. False means agent has been built without ACLK component either on purpose (user choice) or due to missing dependency."
},
- "aclk-implementation": {
- "type": "string",
- "description": "Describes which ACLK implementation is currently used.",
- "enum": [
- "Next Generation",
- "Legacy"
- ]
+ "aclk-version": {
+ "type": "integer",
+ "description": "Describes which ACLK version is currently used."
},
- "new-cloud-protocol-supported": {
- "type": "boolean",
- "description": "Informs about new protobuf based Cloud/Agent protocol support of this agent. If false agent has to be compiled with protobuf and protoc available."
+ "protocols-supported": {
+ "type": "array",
+ "description": "List of supported protocols for communication with Cloud.",
+ "items": {
+ "type": "string"
+ }
},
"agent-claimed": {
"type": "boolean",
diff --git a/web/api/netdata-swagger.yaml b/web/api/netdata-swagger.yaml
index 7fc736b644..194e47747a 100644
--- a/web/api/netdata-swagger.yaml
+++ b/web/api/netdata-swagger.yaml
@@ -1628,16 +1628,14 @@ components:
type: string
description: Describes whether this agent is capable of connection to the Cloud.
False means agent has been built without ACLK component either on purpose (user choice) or due to missing dependency.
- aclk-implementation:
- type: string
- description: Describes which ACLK implementation is currently used.
- enum:
- - Next Generation
- - Legacy
- new-cloud-protocol-supported:
- type: boolean
- description: Informs about new protobuf based Cloud/Agent protocol support of this agent.
- If false agent has to be compiled with protobuf and protoc available.
+ aclk-version:
+ type: integer
+ description: Describes which ACLK version is currently used.
+ protocols-supported:
+ type: array
+ description: List of supported protocols for communication with Cloud.
+ items:
+ type: string
agent-claimed:
type: boolean
description: Informs whether this agent has been added to a space in the cloud (User has to perform claiming).