diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2022-03-09 14:08:20 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-09 14:08:20 +0100 |
commit | d8aba23d0f471ccb3f0107c4aac119cf35753f94 (patch) | |
tree | 0d2d3632e4a04058912378fb90286634cedfb612 /aclk | |
parent | 5b8737361aac20a9887ae97b95a861d7c4f3deab (diff) |
Adds more info to aclk-state API call (#12231)
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 272 |
1 files changed, 257 insertions, 15 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index a2159d9179..ac6db78ac1 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -24,6 +24,8 @@ #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. +int aclk_rcvd_cloud_msgs = 0; +int aclk_connection_counter = 0; int disconnect_req = 0; int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload @@ -43,8 +45,6 @@ struct aclk_shared_state aclk_shared_state = { .mqtt_shutdown_msg_rcvd = 0 }; -//ENDTODO - static RSA *aclk_private_key = NULL; static int load_private_key() { @@ -266,6 +266,7 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t } static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { + aclk_rcvd_cloud_msgs++; if (aclk_use_new_cloud_arch) msg_callback_new_protocol(topic, msg, msglen, qos); else @@ -402,6 +403,8 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_stats_upd_online(1); aclk_connected = 1; aclk_pubacks_per_conn = 0; + aclk_rcvd_cloud_msgs = 0; + aclk_connection_counter++; #ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!aclk_use_new_cloud_arch) { @@ -1117,6 +1120,64 @@ void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, con aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname); } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) +{ + struct proto_alert_status status; + memset(&status, 0, sizeof(status)); + if (get_proto_alert_status(host, &status)) { + buffer_strcat(wb, "\nFailed to get alert streaming status for this host"); + return; + } + buffer_sprintf(wb, + "\n\t\tUpdates: %d" + "\n\t\tBatch ID: %"PRIu64 + "\n\t\tLast Acked Seq ID: %"PRIu64 + "\n\t\tPending Min Seq ID: %"PRIu64 + "\n\t\tPending Max Seq ID: %"PRIu64 + "\n\t\tLast Submitted Seq ID: %"PRIu64, + status.alert_updates, + status.alerts_batch_id, + status.last_acked_sequence_id, + status.pending_min_sequence_id, + status.pending_max_sequence_id, + status.last_submitted_sequence_id + ); +} + +static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host) +{ + struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); + if (!stats) { + buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host"); + return; + } + buffer_sprintf(wb, + "\n\t\tUpdates: %d" + "\n\t\tBatch ID: %"PRIu64 + "\n\t\tMin Seq ID: %"PRIu64 + "\n\t\tMax Seq ID: %"PRIu64 + "\n\t\tPending Min Seq ID: %"PRIu64 + "\n\t\tPending Max Seq ID: %"PRIu64 + "\n\t\tSent Min Seq ID: %"PRIu64 + "\n\t\tSent Max Seq ID: %"PRIu64 + "\n\t\tAcked Min Seq ID: %"PRIu64 + "\n\t\tAcked Max Seq ID: %"PRIu64, + stats->updates, + stats->batch_id, + stats->min_seqid, + stats->max_seqid, + stats->min_seqid_pend, + stats->max_seqid_pend, + stats->min_seqid_sent, + stats->max_seqid_sent, + stats->min_seqid_ack, + stats->max_seqid_ack + ); + freez(stats); +} +#endif + char *ng_aclk_state(void) { BUFFER *wb = buffer_create(1024); @@ -1124,46 +1185,160 @@ char *ng_aclk_state(void) buffer_strcat(wb, "ACLK Available: Yes\n" - "ACLK Implementation: Next Generation\n" + "ACLK Version: 2\n" #ifdef ENABLE_NEW_CLOUD_PROTOCOL - "New Cloud Protocol Support: Yes\n" + "Protocols Supported: Legacy, Protobuf\n" #else - "New Cloud Protocol Support: No\n" + "Protocols Supported: Legacy\n" #endif - "Claimed: " ); + buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); char *agent_id = is_agent_claimed(); if (agent_id == NULL) buffer_strcat(wb, "No\n"); else { - buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id); + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null"); freez(agent_id); } - buffer_sprintf(wb, "Online: %s\nUsed Cloud Protocol: %s", aclk_connected ? "Yes" : "No", aclk_use_new_cloud_arch ? "New" : "Legacy"); + buffer_sprintf(wb, "Online: %s\nReconnect count: %d\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0); + + if (aclk_connected) { + buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + RRDHOST *host; + rrd_rdlock(); + rrdhost_foreach_read(host) { + buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname); + + buffer_strcat(wb, "\tClaimed ID: "); + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) + buffer_strcat(wb, host->aclk_state.claimed_id); + else + buffer_strcat(wb, "null"); + rrdhost_aclk_state_unlock(host); + + + if (host->node_id == NULL || uuid_is_null(*host->node_id)) { + buffer_strcat(wb, "\n\tNode ID: null\n"); + } else { + char node_id[GUID_LEN + 1]; + uuid_unparse_lower(*host->node_id, node_id); + buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id); + } + + buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child"); + + if (host != localhost) + buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false"); + + buffer_strcat(wb, "\n\tAlert Streaming Status:"); + fill_alert_status_for_host(wb, host); + + buffer_strcat(wb, "\n\tChart Streaming Status:"); + fill_chart_status_for_host(wb, host); + } + rrd_unlock(); +#endif + } ret = strdupz(buffer_tostring(wb)); buffer_free(wb); return ret; } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) +{ + struct proto_alert_status status; + memset(&status, 0, sizeof(status)); + if (get_proto_alert_status(host, &status)) + return; + + json_object *tmp = json_object_new_int(status.alert_updates); + json_object_object_add(obj, "updates", tmp); + + tmp = json_object_new_int(status.alerts_batch_id); + json_object_object_add(obj, "batch-id", tmp); + + tmp = json_object_new_int(status.last_acked_sequence_id); + json_object_object_add(obj, "last-acked-seq-id", tmp); + + tmp = json_object_new_int(status.pending_min_sequence_id); + json_object_object_add(obj, "pending-min-seq-id", tmp); + + tmp = json_object_new_int(status.pending_max_sequence_id); + json_object_object_add(obj, "pending-max-seq-id", tmp); + + tmp = json_object_new_int(status.last_submitted_sequence_id); + json_object_object_add(obj, "last-submitted-seq-id", tmp); +} + +static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host) +{ + struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); + if (!stats) + return; + + json_object *tmp = json_object_new_int(stats->updates); + json_object_object_add(obj, "updates", tmp); + + tmp = json_object_new_int(stats->batch_id); + json_object_object_add(obj, "batch-id", tmp); + + tmp = json_object_new_int(stats->min_seqid); + json_object_object_add(obj, "min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid); + json_object_object_add(obj, "max-seq-id", tmp); + + tmp = json_object_new_int(stats->min_seqid_pend); + json_object_object_add(obj, "pending-min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid_pend); + json_object_object_add(obj, "pending-max-seq-id", tmp); + + tmp = json_object_new_int(stats->min_seqid_sent); + json_object_object_add(obj, "sent-min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid_sent); + json_object_object_add(obj, "sent-max-seq-id", tmp); + + tmp = json_object_new_int(stats->min_seqid_ack); + json_object_object_add(obj, "acked-min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid_ack); + json_object_object_add(obj, "acked-max-seq-id", tmp); + + freez(stats); +} +#endif + char *ng_aclk_state_json(void) { - json_object *tmp, *msg = json_object_new_object(); + json_object *tmp, *grp, *msg = json_object_new_object(); tmp = json_object_new_boolean(1); json_object_object_add(msg, "aclk-available", tmp); - tmp = json_object_new_string("Next Generation"); - json_object_object_add(msg, "aclk-implementation", tmp); + tmp = json_object_new_int(2); + json_object_object_add(msg, "aclk-version", tmp); + grp = json_object_new_array(); #ifdef ENABLE_NEW_CLOUD_PROTOCOL - tmp = json_object_new_boolean(1); + tmp = json_object_new_string("Legacy"); + json_object_array_add(grp, tmp); + tmp = json_object_new_string("Protobuf"); + json_object_array_add(grp, tmp); #else - tmp = json_object_new_boolean(0); + tmp = json_object_new_string("Legacy"); + json_object_array_add(grp, tmp); #endif - json_object_object_add(msg, "new-cloud-protocol-supported", tmp); + json_object_object_add(msg, "protocols-supported", grp); char *agent_id = is_agent_claimed(); tmp = json_object_new_boolean(agent_id != NULL); @@ -1176,12 +1351,79 @@ char *ng_aclk_state_json(void) tmp = NULL; json_object_object_add(msg, "claimed-id", tmp); + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL; + json_object_object_add(msg, "cloud-url", tmp); + tmp = json_object_new_boolean(aclk_connected); json_object_object_add(msg, "online", tmp); - tmp = json_object_new_string(aclk_use_new_cloud_arch ? "New" : "Legacy"); + tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); json_object_object_add(msg, "used-cloud-protocol", tmp); + tmp = json_object_new_int(aclk_rcvd_cloud_msgs); + json_object_object_add(msg, "received-app-layer-msgs", tmp); + + tmp = json_object_new_int(aclk_pubacks_per_conn); + json_object_object_add(msg, "received-mqtt-pubacks", tmp); + + tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0); + json_object_object_add(msg, "reconnect-count", tmp); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + grp = json_object_new_array(); + + RRDHOST *host; + rrd_rdlock(); + rrdhost_foreach_read(host) { + json_object *nodeinstance = json_object_new_object(); + + tmp = json_object_new_string(host->hostname); + json_object_object_add(nodeinstance, "hostname", tmp); + + tmp = json_object_new_string(host->machine_guid); + json_object_object_add(nodeinstance, "mguid", tmp); + + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) { + tmp = json_object_new_string(host->aclk_state.claimed_id); + json_object_object_add(nodeinstance, "claimed_id", tmp); + } else + json_object_object_add(nodeinstance, "claimed_id", NULL); + rrdhost_aclk_state_unlock(host); + + if (host->node_id == NULL || uuid_is_null(*host->node_id)) { + json_object_object_add(nodeinstance, "node-id", NULL); + } else { + char node_id[GUID_LEN + 1]; + uuid_unparse_lower(*host->node_id, node_id); + tmp = json_object_new_string(node_id); + json_object_object_add(nodeinstance, "node-id", tmp); + } + + tmp = json_object_new_int(host->system_info->hops); + json_object_object_add(nodeinstance, "streaming-hops", tmp); + + tmp = json_object_new_string(host == localhost ? "self" : "child"); + json_object_object_add(nodeinstance, "relationship", tmp); + + tmp = json_object_new_boolean((host->receiver || host == localhost)); + json_object_object_add(nodeinstance, "streaming-online", tmp); + + tmp = json_object_new_object(); + fill_alert_status_for_host_json(tmp, host); + json_object_object_add(nodeinstance, "alert-sync-status", tmp); + + tmp = json_object_new_object(); + fill_chart_status_for_host_json(tmp, host); + json_object_object_add(nodeinstance, "chart-sync-status", tmp); + + json_object_array_add(grp, nodeinstance); + } + rrd_unlock(); + json_object_object_add(msg, "node-instances", grp); +#endif + char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); return str; |