diff options
author | Uman Shahzad <uman@mslm.io> | 2021-07-08 22:05:45 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-07-08 22:05:45 +0500 |
commit | 3aa7be1f7859d715171bc81401c73abfe61a6fdf (patch) | |
tree | f68da5746507a2b186d597a08ae61654c64d53aa /aclk | |
parent | 2e43085c41dcb2efa457430f94e6261be13cffc3 (diff) |
Add HTTP access log messages for ACLK-NG (#11318)
* aclk: link libcap after libwebsockets for legacy build
libwebsockets requires libcap. without this, we get
errors about undefined libcap functions that libwebsockets
uses.
This is needed for building aclk-legacy.
* aclk: create log for HTTP requests similar to legacy version.
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk_query.c | 72 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 4 | ||||
-rw-r--r-- | aclk/aclk_query_queue.h | 1 |
3 files changed, 51 insertions, 26 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 2bf60532ac..531ba82249 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -17,20 +17,20 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; typedef struct aclk_query_handler { aclk_query_type_t type; char *name; // for logging purposes - int(*fnc)(mqtt_wss_client client, aclk_query_t query); + int(*fnc)(struct aclk_query_thread *query_thr, aclk_query_t query); } aclk_query_handler; -static int info_metadata(mqtt_wss_client client, aclk_query_t query) +static int info_metadata(struct aclk_query_thread *query_thr, aclk_query_t query) { - aclk_send_info_metadata(client, + aclk_send_info_metadata(query_thr->client, !query->data.metadata_info.initial_on_connect, query->data.metadata_info.host); return 0; } -static int alarms_metadata(mqtt_wss_client client, aclk_query_t query) +static int alarms_metadata(struct aclk_query_thread *query_thr, aclk_query_t query) { - aclk_send_alarm_metadata(client, + aclk_send_alarm_metadata(query_thr->client, !query->data.metadata_info.initial_on_connect); return 0; } @@ -76,7 +76,7 @@ static RRDHOST *node_id_2_rrdhost(const char *node_id) // TODO this function should be quarantied and written nicely // lots of skeletons from initial ACLK Legacy impl. // quick and dirty from the start -static int http_api_v2(mqtt_wss_client client, aclk_query_t query) +static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) { int retval = 0; usec_t t; @@ -126,7 +126,11 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) mysep = strrchr(query->data.http_api_v2.query, '/'); // execute the query + w->tv_in = query->created_tv; + now_realtime_timeval(&w->tv_ready); t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop"); + size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len; + size_t sent = size; #ifdef NETDATA_WITH_ZLIB // check if gzip encoding can and should be used @@ -175,7 +179,6 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) } #endif - now_realtime_timeval(&w->tv_ready); w->response.data->date = w->tv_ready.tv_sec; web_client_build_http_header(w); local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -189,6 +192,7 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) buffer_need_bytes(local_buffer, w->response.data->len); memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len); local_buffer->len += w->response.data->len; + sent = sent - size + w->response.data->len; } else { #endif buffer_strcat(local_buffer, w->response.data->buffer); @@ -197,7 +201,26 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) #endif } - aclk_http_msg_v2(client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len); + // send msg. + aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len); + + // log. + struct timeval tv; + now_realtime_timeval(&tv); + log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", + w->id + , gettid() + , query_thr->idx + , "DATA" + , sent + , size + , size > sent ? -(((size - sent) / (double)size) * 100.0) : ((size > 0) ? (((sent - size ) / (double)size) * 100.0) : 0.0) + , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 + , dt_usec(&tv, &w->tv_ready) / 1000.0 + , dt_usec(&tv, &w->tv_in) / 1000.0 + , w->response.code + , strip_control_characters(query->data.http_api_v2.query) + ); cleanup: #ifdef NETDATA_WITH_ZLIB @@ -213,33 +236,33 @@ cleanup: return retval; } -static int chart_query(mqtt_wss_client client, aclk_query_t query) +static int chart_query(struct aclk_query_thread *query_thr, aclk_query_t query) { - aclk_chart_msg(client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name); + aclk_chart_msg(query_thr->client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name); return 0; } -static int alarm_state_update_query(mqtt_wss_client client, aclk_query_t query) +static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_query_t query) { - aclk_alarm_state_msg(client, query->data.alarm_update); + aclk_alarm_state_msg(query_thr->client, query->data.alarm_update); // aclk_alarm_state_msg frees the json object including the header it generates query->data.alarm_update = NULL; return 0; } -static int register_node(mqtt_wss_client client, aclk_query_t query) { +static int register_node(struct aclk_query_thread *query_thr, aclk_query_t query) { // TODO create a pending registrations list // with some timeouts to detect registration requests that // go unanswered from the cloud - aclk_generate_node_registration(client, &query->data.node_creation); + aclk_generate_node_registration(query_thr->client, &query->data.node_creation); return 0; } -static int node_state_update(mqtt_wss_client client, aclk_query_t query) { +static int node_state_update(struct aclk_query_thread *query_thr, aclk_query_t query) { // TODO create a pending registrations list // with some timeouts to detect registration requests that // go unanswered from the cloud - aclk_generate_node_state_update(client, &query->data.node_update); + aclk_generate_node_state_update(query_thr->client, &query->data.node_update); return 0; } @@ -256,17 +279,17 @@ aclk_query_handler aclk_query_handlers[] = { }; -static void aclk_query_process_msg(struct aclk_query_thread *info, aclk_query_t query) +static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query) { for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { if (aclk_query_handlers[i].type == query->type) { debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name); - aclk_query_handlers[i].fnc(info->client, query); + aclk_query_handlers[i].fnc(query_thr, query); aclk_query_free(query); if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.queries_dispatched++; - aclk_queries_per_thread[info->idx]++; + aclk_queries_per_thread[query_thr->idx]++; ACLK_STATS_UNLOCK; } return; @@ -277,11 +300,11 @@ static void aclk_query_process_msg(struct aclk_query_thread *info, aclk_query_t /* Processes messages from queue. Compete for work with other threads */ -int aclk_query_process_msgs(struct aclk_query_thread *info) +int aclk_query_process_msgs(struct aclk_query_thread *query_thr) { aclk_query_t query; while ((query = aclk_queue_pop())) - aclk_query_process_msg(info, query); + aclk_query_process_msg(query_thr, query); return 0; } @@ -291,15 +314,14 @@ int aclk_query_process_msgs(struct aclk_query_thread *info) */ void *aclk_query_main_thread(void *ptr) { - struct aclk_query_thread *info = ptr; + struct aclk_query_thread *query_thr = ptr; + while (!netdata_exit) { - aclk_query_process_msgs(info); + aclk_query_process_msgs(query_thr); QUERY_THREAD_LOCK; - if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) sleep_usec(USEC_PER_SEC * 1); - QUERY_THREAD_UNLOCK; } return NULL; diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index baca4a2f5d..f3722d57d9 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -20,7 +20,9 @@ static struct aclk_query_queue { static inline int _aclk_queue_query(aclk_query_t query) { + now_realtime_timeval(&query->created_tv); query->created = now_realtime_usec(); + ACLK_QUEUE_LOCK; if (aclk_query_queue.block_push) { ACLK_QUEUE_UNLOCK; @@ -110,7 +112,7 @@ void aclk_query_free(aclk_query_t query) if (query->type == CHART_NEW) freez(query->data.chart_add_del.chart_name); - + if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update) json_object_put(query->data.alarm_update); diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index cbc31ae3cc..58d5dd0407 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -47,6 +47,7 @@ struct aclk_query { char *callback_topic; char *msg_id; + struct timeval created_tv; usec_t created; aclk_query_t next; |