summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
authoravstrakhov <57530959+avstrakhov@users.noreply.github.com>2022-01-19 18:57:49 +0300
committerGitHub <noreply@github.com>2022-01-19 17:57:49 +0200
commitb003e5fd40c3b42dfacc87db5a7730b76eff0e92 (patch)
treeda105b1b74c98cb567ff18aa73d282566bbff454 /streaming/receiver.c
parent8a01dabf89add0ef473d37bc8f2310cabfc1db43 (diff)
Add code for LZ4 streaming data compression (#11821)
* Add code for LZ4 streaming data compression * Fix LGTM alert * Add lz4 library for link when compression enabled * Add LZ4_resetStream_fast presence detection * Disable compression for older LZ4 libraries * Correct LZ4 API check * [Testing Stream Compression] Debug msgs and report.md * Add LZ4 library version using LZ4_initStream * Fixed bug in SSL mode * [Testing compression] - Add compression info messages * Set compression enabled by default, update doc * Update streaming/README.md Co-authored-by: DShreve2 <david@netdata.cloud> * [Agent Negotiation] Compression as separate capability * [Agent Negotiation] Compression as separate capability - default compression variable always active * Add code to negotiate compression * [Agent Negotiation] Based on stream version * [Agent Negotiation] Version based - fix compilation error * [Agent Negotiation] Fix glob var default_compression_enbaled=0 affects all the connections - Handle compression - stream version based * [Agent Negotiation - Compression] - Add control flag in 1. sender/receiver state & 2. stream.conf per child * [Agent Negotiation - Compression] Fix stream.conf key, mguid control * [Agent Negotiate Compression] Fine control on stream.conf per key,mguid for each child * [Agent Negotiation Compression] Stop destroying compressor for runtime configuration + Update Readme.md * [Agent Negotiation Compression] Use stream_version 4 if compression is disabled * Correct child's compression check * [Agent Negotiation Compression] Create streaming compression section in docs. * [Agent Negotiation Compresion] Remove redundant debug msgs * [Stream Compression] - integrate compression build info & config info in api/v1/info endpoint. * [Agent Negotiation] Finalize README.md * [Agent Stream Compression] Fix buildinfo json, Finalize readme.md * [Agent Stream Compression] Negotiate compression based on stream version * [Agent Stream Compression] Stream compression control per child in stream.conf | per AP_KEY, MACHINE_GUID * [Agent Stream Compression] Avoid destroying compressor enabling runtime configuration + Update Readme.md * [Agent Stream Compression] - Provide stream compression build info & config info in api/v1/info endpoint + Update Readme.md * [Agent Stream Compression] Fix rebase conflicts * [Agent Stream Compression] Fix more rebase conflicts * [Agent Stream Compression] 1. Stream version based negotiation 2. per child stream.conf control 3. finalize docs 4. stream compression build info in web api * [Agent Stream Compression] 1. Stream version based negotiation 2. per child stream.conf control 3. finalize docs 4. stream compression build info in web api * [Agent Stream Compression] Change unsuccessful buffer check to error * [Agent Stream Compression] Readme.md proof-read corrections, downgrade to stream_version_clabels, add shields for supported versions, EOF lint * [Agent Stream Compression] Fix missed lz4 library on Alpine Linux * Phrasal review Co-authored-by: odynik <odynik.ee@gmail.com> Co-authored-by: DShreve2 <david@netdata.cloud> Co-authored-by: Tina Lüdtke <tina@kickoke.com>
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c156
1 files changed, 152 insertions, 4 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index a0920e563d..5b8d7d7048 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -22,6 +22,10 @@ void destroy_receiver_state(struct receiver_state *rpt) {
SSL_free(rpt->ssl.conn);
}
#endif
+#ifdef ENABLE_COMPRESSION
+ if (rpt->decompressor)
+ rpt->decompressor->destroy(&rpt->decompressor);
+#endif
freez(rpt);
}
@@ -159,6 +163,8 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin
return PARSER_RC_OK;
}
+
+#ifndef ENABLE_COMPRESSION
/* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
*/
static int receiver_read(struct receiver_state *r, FILE *fp) {
@@ -186,6 +192,130 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
r->read_len = strlen(r->read_buffer);
return 0;
}
+#else
+/*
+ * The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
+ * if SSL encryption is on, then use SSL API for reading stream data.
+ * Use line oriented fgets() in buffer from receiver_state is provided.
+ * In other cases use fread to read binary data from socket.
+ * Return zero on success and the number of bytes were read using pointer in the last argument.
+ */
+static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t size, int* ret) {
+ if (!ret)
+ return 1;
+ *ret = 0;
+#ifdef ENABLE_HTTPS
+ if (r->ssl.conn && !r->ssl.flags) {
+ ERR_clear_error();
+ if (buffer != r->read_buffer + r->read_len) {
+ *ret = SSL_read(r->ssl.conn, buffer, size);
+ if (*ret > 0 )
+ return 0;
+ } else {
+ // we need to receive data with LF to parse compression header
+ size_t ofs = 0;
+ int res = 0;
+ while (ofs < size) {
+ do {
+ res = SSL_read(r->ssl.conn, buffer + ofs, 1);
+ } while (res == 0);
+
+ if (res < 0)
+ break;
+ if (buffer[ofs] == '\n')
+ break;
+ ofs += res;
+ }
+ if (res > 0) {
+ ofs += res;
+ *ret = ofs;
+ buffer[ofs] = 0;
+ return 0;
+ }
+ }
+ // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket
+ u_long err;
+ char buf[256];
+ while ((err = ERR_get_error()) != 0) {
+ ERR_error_string_n(err, buf, sizeof(buf));
+ error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf);
+ }
+ return 1;
+ }
+#endif
+ if (buffer != r->read_buffer + r->read_len) {
+ // read to external buffer
+ *ret = fread(buffer, 1, size, fp);
+ if (!*ret)
+ return 1;
+ } else {
+ if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp))
+ return 1;
+ *ret = strlen(r->read_buffer);
+ }
+ return 0;
+}
+
+/*
+ * Get the next line of data for parsing.
+ * Return data from the decompressor buffer if available.
+ * Otherwise read next line from the socket and check for compression header.
+ * Return the line was read If no compression header was found.
+ * Otherwise read the entire block of compressed data, decompress it
+ * and return it in receiver_state buffer.
+ * Return zero on success.
+ */
+static int receiver_read(struct receiver_state *r, FILE *fp) {
+ // check any decompressed data present
+ if (r->decompressor &&
+ r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) {
+ size_t available = sizeof(r->read_buffer) - r->read_len;
+ if (available) {
+ size_t len = r->decompressor->get(r->decompressor,
+ r->read_buffer + r->read_len, available);
+ if (!len)
+ return 1;
+ r->read_len += len;
+ }
+ return 0;
+ }
+ int ret = 0;
+ if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret))
+ return 1;
+
+ if (!is_compressed_data(r->read_buffer, ret)) {
+ r->read_len += ret;
+ return 0;
+ }
+
+ if (unlikely(!r->decompressor))
+ r->decompressor = create_decompressor();
+
+ size_t bytes_to_read = r->decompressor->start(r->decompressor,
+ r->read_buffer, ret);
+
+ // Read the entire block of compressed data because
+ // we're unable to decompress incomplete block
+ char compressed[bytes_to_read];
+ do {
+ if (read_stream(r, fp, compressed, bytes_to_read, &ret))
+ return 1;
+ // Send input data to decompressor
+ if (ret)
+ r->decompressor->put(r->decompressor, compressed, ret);
+ bytes_to_read -= ret;
+ } while (bytes_to_read > 0);
+ // Decompress
+ size_t bytes_to_parse = r->decompressor->decompress(r->decompressor);
+ if (!bytes_to_parse)
+ return 1;
+ // Fill read buffer with decompressed data
+ r->read_len = r->decompressor->get(r->decompressor,
+ r->read_buffer, sizeof(r->read_buffer));
+ return 0;
+}
+
+#endif
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
@@ -208,7 +338,6 @@ static char *receiver_next_line(struct receiver_state *r, int *pos) {
return NULL;
}
-
size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) {
size_t result;
PARSER_USER_OBJECT *user = callocz(1, sizeof(*user));
@@ -239,12 +368,14 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
parser->plugins_action->chart_action = &pluginsd_chart_action;
parser->plugins_action->set_action = &pluginsd_set_action;
- parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action;
- parser->plugins_action->clabel_action = &pluginsd_clabel_action;
user->parser = parser;
- do {
+#ifdef ENABLE_COMPRESSION
+ if (rpt->decompressor)
+ rpt->decompressor->reset(rpt->decompressor);
+#endif
+ do{
if (receiver_read(rpt, fp))
break;
int pos = 0;
@@ -311,6 +442,13 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching);
rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
+#ifdef ENABLE_COMPRESSION
+ unsigned int rrdpush_compression = default_compression_enabled;
+ rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression);
+ rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression);
+ rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled);
+#endif //ENABLE_COMPRESSION
+
(void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
@@ -432,6 +570,16 @@ static int rrdpush_receive(struct receiver_state *rpt)
info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
char initial_response[HTTP_HEADER_SIZE];
if (rpt->stream_version > 1) {
+ if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){
+#ifdef ENABLE_COMPRESSION
+ if(!rpt->rrdpush_compression)
+ rpt->stream_version = STREAM_VERSION_CLABELS;
+#else
+ if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) {
+ rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION;
+ }
+#endif
+ }
info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version);
} else if (rpt->stream_version == 1) {