summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-07-06 01:49:32 +0300
committerGitHub <noreply@github.com>2023-07-06 01:49:32 +0300
commitc74bf56ee2910b5c90e5be2e31128580b85b9ca8 (patch)
tree6ef495ac5821ce2c4984d87feb65d79851dd38ec /streaming
parentb45570d251980309e7c2d956dea8886f8aa13bdc (diff)
Code reorg and cleanup - enrichment of /api/v2 (#15294)
* claim script now accepts the same params as the kickstart * rewrote buildinfo to unify all methods * added cloud unavailable in cloud status * added all exporters * renamed httpd to h2o * rename ENABLE_COMPRESSION to ENABLE_LZ4 * rename global variable * rename ENABLE_HTTPS to ENABLE_OPENSSL * fix coverity-scan for openssl * add lz4 to coverity-scan * added all plugins and most of the features * added all plugins and most of the features * generalize bitmap code so that we can have any size of bitmaps * cleanup * fix compilation without protobuf * fix compilation with others allocators * fix bitmap * comprehensive bitmaps unit test * bitmap as macros * added developer mode * added system info to build info * cloud available/unavailable * added /api/v2/info * added units and ni to transitions * when showing instances and transitions, show only the instances that have transitions * cleanup * add missing quotes * add anchor to transitions * added more to build info * calculate retention per tier and expose it to /api/v2/info * added currently collected metrics * do not show space and retention when no numbers are available * fix impossible overflow * Add function for transitions and execute callback * In case of error, reset and try next dictionary entry * Fix error message * simpler logic to maintain retention per tier * /api/v2/alert_transitions * Handle case of recipient null Convert after and before to usec * Add classification, type and component * working /api/v2/alert_transitions * Fix query to properly handle context and alert name * cleanup * Add search with transition * accept transition in /api/v2/alert_transitions * totaly dynamic facets * fixed debug info * restructured facets * cleanup; removal of options=transitions * updated alert entries flags * method to exec * Return also exec run timestamp Temp table cleanup only when we don't execute with a transition * cleanup obsolete anchor parameter * Add sql_get_alert_configuration function * added options=config to alert_transitions * added /api/v2/alert_config * preliminary work for /api/v2/claim * initialize variables; do not expose expected retention if no disk space info is available; do not report aclk as initializing when not claimed * fix claim session key filename * put a newline into the session key file * more progress on claiming * final /api/v2/claim endpoint * after claiming, refresh our state at the output * Fix query to fetch config * Remove debug log * add configuration objects * add configuration objects - fixed * respect the NETDATA_DISABLE_CLOUD env variable * NETDATA_DISABLE_CLOUD env variable sets the default, but the config sets the final value * use a new claimed_id on every claiming * regenerate random key on claiming and wait for online status * ignore write() return value when writing a newline * dont show cloud status disabled when claimed_id is missing * added ctx to alert instances * cleanup config and transitions from /api/v2/alerts * fix unused variable * in /api/v2/alert_config show 1 config without an array * show alert values conditionally, by appending options=values * When storing host info if the key value is empty, store unknown * added options=summary to control when the alerts summary is shown * increased http_api_v2 to version 5 * claming random key file is now not world readable * added local-listeners binary that detects all the listening ports, their IPs and their command lines --------- Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/compression.c2
-rw-r--r--streaming/receiver.c20
-rw-r--r--streaming/rrdpush.c10
-rw-r--r--streaming/rrdpush.h24
-rw-r--r--streaming/sender.c12
5 files changed, 34 insertions, 34 deletions
diff --git a/streaming/compression.c b/streaming/compression.c
index 29bf88af37..5a19f46a68 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -1,6 +1,6 @@
#include "rrdpush.h"
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
#include "lz4.h"
#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 424c983316..e7233f6097 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -28,7 +28,7 @@ void receiver_state_free(struct receiver_state *rpt) {
close(rpt->fd);
}
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
rrdpush_decompressor_destroy(&rpt->decompressor);
#endif
@@ -111,7 +111,7 @@ static inline bool receiver_read_uncompressed(struct receiver_state *r) {
return true;
}
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
static inline bool receiver_read_compressed(struct receiver_state *r) {
internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0',
@@ -217,11 +217,11 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
return true;
}
-#else // !ENABLE_COMPRESSION
+#else // !ENABLE_RRDPUSH_COMPRESSION
static inline bool receiver_read_compressed(struct receiver_state *r) {
return receiver_read_uncompressed(r);
}
-#endif // ENABLE_COMPRESSION
+#endif // ENABLE_RRDPUSH_COMPRESSION
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
@@ -331,7 +331,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
bool compressed_connection = false;
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
compressed_connection = true;
rrdpush_decompressor_reset(&rpt->decompressor);
@@ -609,11 +609,11 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step);
rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step);
-#ifdef ENABLE_COMPRESSION
- rpt->config.rrdpush_compression = default_compression_enabled;
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ rpt->config.rrdpush_compression = default_rrdpush_compression_enabled;
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression);
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression);
-#endif //ENABLE_COMPRESSION
+#endif // ENABLE_RRDPUSH_COMPRESSION
(void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
@@ -707,12 +707,12 @@ static void rrdpush_receive(struct receiver_state *rpt)
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
if (!rpt->config.rrdpush_compression)
rpt->capabilities &= ~STREAM_CAP_COMPRESSION;
}
-#endif
+#endif // ENABLE_RRDPUSH_COMPRESSION
{
// netdata_log_info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index ca6a23c896..29d5a981b7 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -39,8 +39,8 @@ struct config stream_config = {
};
unsigned int default_rrdpush_enabled = 0;
-#ifdef ENABLE_COMPRESSION
-unsigned int default_compression_enabled = 1;
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+unsigned int default_rrdpush_compression_enabled = 1;
#endif
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
@@ -142,9 +142,9 @@ int rrdpush_init() {
rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s);
-#ifdef ENABLE_COMPRESSION
- default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
- "enable compression", default_compression_enabled);
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
+ "enable compression", default_rrdpush_compression_enabled);
#endif
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index f97b7f6912..6781f1cfe3 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -50,11 +50,11 @@ typedef enum {
// needed for negotiating errors between parent and child
} STREAM_CAPABILITIES;
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
#else
#define STREAM_HAS_COMPRESSION 0
-#endif // ENABLE_COMPRESSION
+#endif // ENABLE_RRDPUSH_COMPRESSION
STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
@@ -118,7 +118,7 @@ typedef struct {
char *kernel_version;
} stream_encoded_t;
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
// signature MUST end with a newline
#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
@@ -262,9 +262,9 @@ struct sender_state {
uint16_t hops;
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
struct compressor_state compressor;
-#endif
+#endif // ENABLE_RRDPUSH_COMPRESSION
#ifdef ENABLE_HTTPS
NETDATA_SSL ssl; // structure used to encrypt the connection
@@ -416,9 +416,9 @@ struct receiver_state {
time_t replication_first_time_t;
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
struct decompressor_state decompressor;
-#endif
+#endif // ENABLE_RRDPUSH_COMPRESSION
/*
struct {
uint32_t count;
@@ -440,9 +440,9 @@ struct rrdpush_destinations {
};
extern unsigned int default_rrdpush_enabled;
-#ifdef ENABLE_COMPRESSION
-extern unsigned int default_compression_enabled;
-#endif
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+extern unsigned int default_rrdpush_compression_enabled;
+#endif // ENABLE_RRDPUSH_COMPRESSION
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
@@ -500,9 +500,9 @@ int connect_to_one_of_destinations(
void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
struct compressor_state *create_compressor();
-#endif
+#endif // ENABLE_RRDPUSH_COMPRESSION
void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
diff --git a/streaming/sender.c b/streaming/sender.c
index ff1fca5792..68a1d3885f 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -66,7 +66,7 @@ BUFFER *sender_start(struct sender_state *s) {
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
/*
* In case of stream compression buffer overflow
* Inform the user through the error log file and
@@ -117,7 +117,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
}
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) {
while(src_len) {
size_t size_to_compress = src_len;
@@ -590,11 +590,11 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
// reset our capabilities to default
s->capabilities = stream_our_capabilities(host, true);
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
// If we don't want compression, remove it from our capabilities
if(!(s->flags & SENDER_FLAG_COMPRESSION))
s->capabilities &= ~STREAM_CAP_COMPRESSION;
-#endif // ENABLE_COMPRESSION
+#endif // ENABLE_RRDPUSH_COMPRESSION
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
version negotiation resulted in a high enough version.
@@ -756,12 +756,12 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
if(!rrdpush_sender_validate_response(host, s, http, bytes))
return false;
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
if(stream_has_capability(s, STREAM_CAP_COMPRESSION))
rrdpush_compressor_reset(&s->compressor);
else
rrdpush_compressor_destroy(&s->compressor);
-#endif //ENABLE_COMPRESSION
+#endif // ENABLE_RRDPUSH_COMPRESSION
log_sender_capabilities(s);