summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authoravstrakhov <57530959+avstrakhov@users.noreply.github.com>2022-01-19 18:57:49 +0300
committerGitHub <noreply@github.com>2022-01-19 17:57:49 +0200
commitb003e5fd40c3b42dfacc87db5a7730b76eff0e92 (patch)
treeda105b1b74c98cb567ff18aa73d282566bbff454 /streaming
parent8a01dabf89add0ef473d37bc8f2310cabfc1db43 (diff)
Add code for LZ4 streaming data compression (#11821)
* Add code for LZ4 streaming data compression * Fix LGTM alert * Add lz4 library for link when compression enabled * Add LZ4_resetStream_fast presence detection * Disable compression for older LZ4 libraries * Correct LZ4 API check * [Testing Stream Compression] Debug msgs and report.md * Add LZ4 library version using LZ4_initStream * Fixed bug in SSL mode * [Testing compression] - Add compression info messages * Set compression enabled by default, update doc * Update streaming/README.md Co-authored-by: DShreve2 <david@netdata.cloud> * [Agent Negotiation] Compression as separate capability * [Agent Negotiation] Compression as separate capability - default compression variable always active * Add code to negotiate compression * [Agent Negotiation] Based on stream version * [Agent Negotiation] Version based - fix compilation error * [Agent Negotiation] Fix glob var default_compression_enbaled=0 affects all the connections - Handle compression - stream version based * [Agent Negotiation - Compression] - Add control flag in 1. sender/receiver state & 2. stream.conf per child * [Agent Negotiation - Compression] Fix stream.conf key, mguid control * [Agent Negotiate Compression] Fine control on stream.conf per key,mguid for each child * [Agent Negotiation Compression] Stop destroying compressor for runtime configuration + Update Readme.md * [Agent Negotiation Compression] Use stream_version 4 if compression is disabled * Correct child's compression check * [Agent Negotiation Compression] Create streaming compression section in docs. * [Agent Negotiation Compresion] Remove redundant debug msgs * [Stream Compression] - integrate compression build info & config info in api/v1/info endpoint. * [Agent Negotiation] Finalize README.md * [Agent Stream Compression] Fix buildinfo json, Finalize readme.md * [Agent Stream Compression] Negotiate compression based on stream version * [Agent Stream Compression] Stream compression control per child in stream.conf | per AP_KEY, MACHINE_GUID * [Agent Stream Compression] Avoid destroying compressor enabling runtime configuration + Update Readme.md * [Agent Stream Compression] - Provide stream compression build info & config info in api/v1/info endpoint + Update Readme.md * [Agent Stream Compression] Fix rebase conflicts * [Agent Stream Compression] Fix more rebase conflicts * [Agent Stream Compression] 1. Stream version based negotiation 2. per child stream.conf control 3. finalize docs 4. stream compression build info in web api * [Agent Stream Compression] 1. Stream version based negotiation 2. per child stream.conf control 3. finalize docs 4. stream compression build info in web api * [Agent Stream Compression] Change unsuccessful buffer check to error * [Agent Stream Compression] Readme.md proof-read corrections, downgrade to stream_version_clabels, add shields for supported versions, EOF lint * [Agent Stream Compression] Fix missed lz4 library on Alpine Linux * Phrasal review Co-authored-by: odynik <odynik.ee@gmail.com> Co-authored-by: DShreve2 <david@netdata.cloud> Co-authored-by: Tina Lüdtke <tina@kickoke.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/README.md219
-rw-r--r--streaming/compression.c345
-rw-r--r--streaming/receiver.c156
-rw-r--r--streaming/rrdpush.c10
-rw-r--r--streaming/rrdpush.h69
-rw-r--r--streaming/sender.c100
-rw-r--r--streaming/stream.conf19
7 files changed, 816 insertions, 102 deletions
diff --git a/streaming/README.md b/streaming/README.md
index 7f74fb31f5..c90b7a77aa 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -1,89 +1,90 @@
-<!--
+---
title: "Streaming and replication"
description: "Replicate and mirror Netdata's metrics through real-time streaming from child to parent nodes. Then combine, correlate, and export."
custom_edit_url: https://github.com/netdata/netdata/edit/master/streaming/README.md
--->
+---
-# Streaming and replication
-Each Netdata is able to replicate/mirror its database to another Netdata, by streaming the collected
-metrics in real-time to it. This is quite different to [data archiving to third party time-series
+Each Netdata node is able to replicate/mirror its database to another Netdata node, by streaming the collected
+metrics in real-time. This is quite different to [data archiving to third party time-series
databases](/exporting/README.md).
+The nodes that send metrics are called **child** nodes, and the nodes that receive metrics are called **parent** nodes.
+
+There are also **proxy** nodes, which collect metrics from a child and sends it to a parent.
-When Netdata streams metrics to another Netdata, the receiving one is able to perform everything a Netdata instance is
-capable of. This includes the following:
+When one Netdata node streams metrics another, the receiving instance can use the data for all features of a typical Netdata node, for example:
- Visualize metrics with a dashboard
- Run health checks that trigger alarms and send alarm notifications
- Export metrics to an external time-series database
-The nodes that send metrics are called **child** nodes, and the nodes that receive metrics are called **parent** nodes.
-There are also **proxy** nodes, which collects metrics from a child and sends it to a parent.
+
+
## Supported configurations
### Netdata without a database or web API (headless collector)
-Local Netdata (child), **without any database or alarms**, collects metrics and sends them to another Netdata
+A local Netdata Agent (child), **without any database or alarms**, collects metrics and sends them to another Netdata node
(parent).
+The same parent can collect data for any number of child nodes and serves alerts for each child.
The node menu shows a list of all "databases streamed to" the parent. Clicking one of those links allows the user to
view the full dashboard of the child node. The URL has the form
`http://parent-host:parent-port/host/child-host/`.
-Alarms for the child are served by the parent.
-In this mode the child is just a plain data collector. It spawns all external plugins, but instead of maintaining a
-local database and accepting dashboard requests, it streams all metrics to the parent. The memory footprint is reduced
-significantly, to between 6 MiB and 40 MiB, depending on the enabled plugins. To reduce the memory usage as much as
+In a headless setup, the child acts as a plain data collector. It spawns all external plugins, but instead of maintaining a
+local database and accepting dashboard requests, it streams all metrics to the parent.
+
+This setup works great to reduce the memory footprint. Depending on the enabled plugins, memory usage is between 6 MiB and 40 MiB. To reduce the memory usage as much as
possible, refer to the [performance optimization guide](/docs/guides/configure/performance.md).
-The same parent can collect data for any number of child nodes.
### Database Replication
-Local Netdata (child), **with a local database (and possibly alarms)**, collects metrics and
-sends them to another Netdata (parent).
+The local Netdata Agent (child), **with a local database (and possibly alarms)**, collects metrics and
+sends them to another Netdata node (parent).
The user can use all the functions **at both** `http://child-ip:child-port/` and
`http://parent-host:parent-port/host/child-host/`.
The child and the parent may have different data retention policies for the same metrics.
-Alarms for the child are triggered by **both** the child and the parent (and actually
-each can have different alarms configurations or have alarms disabled).
+Alerts for the child are triggered by **both** the child and the parent.
+It is possible to enable different alert configurations on the parent and the child.
-Take a note, that custom chart names, configured on the child, should be in the form `type.name` to work correctly. The parent will truncate the `type` part and substitute the original chart `type` to store the name in the database.
+In order for custom chart names on the child to work correctly, follow the form `type.name`. The parent will truncate the `type` part and substitute the original chart `type` to store the name in the database.
### Netdata proxies
-Local Netdata (child), with or without a database, collects metrics and sends them to another
-Netdata (**proxy**), which may or may not maintain a database, which forwards them to another
+The local Netdata Agent(child), with or without a database, collects metrics and sends them to another
+Netdata node(**proxy**), which may or may not maintain a database, which forwards them to another
Netdata (parent).
-Alarms for the child can be triggered by any of the involved hosts that maintains a database.
+Alerts for the child can be triggered by any of the involved hosts that maintains a database.
-Any number of daisy chaining Netdata servers are supported, each with or without a database and
-with or without alarms for the child metrics.
+You can daisy-chain any number of Netdata, each with or without a database and
+with or without alerts for the child metrics.
-### mix and match with backends
+### Mix and match with backends
All nodes that maintain a database can also send their data to a backend database.
This allows quite complex setups.
Example:
-1. Netdata `A`, `B` do not maintain a database and stream metrics to Netdata `C`(live streaming functionality, i.e. this PR)
-2. Netdata `C` maintains a database for `A`, `B`, `C` and archives all metrics to `graphite` with 10 second detail (backends functionality)
-3. Netdata `C` also streams data for `A`, `B`, `C` to Netdata `D`, which also collects data from `E`, `F` and `G` from another DMZ (live streaming functionality, i.e. this PR)
-4. Netdata `D` is just a proxy, without a database, that streams all data to a remote site at Netdata `H`
-5. Netdata `H` maintains a database for `A`, `B`, `C`, `D`, `E`, `F`, `G`, `H` and sends all data to `opentsdb` with 5 seconds detail (backends functionality)
-6. alarms are triggered by `H` for all hosts
-7. users can use all the Netdata that maintain a database to view metrics (i.e. at `H` all hosts can be viewed).
+1. Netdata nodes `A` and `B` do not maintain a database and stream metrics to Netdata node `C`(live streaming functionality).
+2. Netdata node `C` maintains a database for `A`, `B`, `C` and archives all metrics to `graphite` with 10 second detail (backends functionality).
+3. Netdata node `C` also streams data for `A`, `B`, `C` to Netdata `D`, which also collects data from `E`, `F` and `G` from another DMZ (live streaming functionality).
+4. Netdata node `D` is just a proxy, without a database, that streams all data to a remote site at Netdata `H`.
+5. Netdata node `H` maintains a database for `A`, `B`, `C`, `D`, `E`, `F`, `G`, `H` and sends all data to `opentsdb` with 5 seconds detail (backends functionality)
+6. Alerts are triggered by `H` for all hosts.
+7. Users can use all Netdata nodes that maintain a database to view metrics (i.e. at `H` all hosts can be viewed).
## Configuration
-These are options that affect the operation of Netdata in this area:
+The following options affect how Netdata streams:
```
[global]
@@ -91,7 +92,7 @@ These are options that affect the operation of Netdata in this area:
```
`[global].memory mode = none` disables the database at this host. This also disables health
-monitoring (there cannot be health monitoring without a database).
+monitoring because a node can't have health monitoring without a database.
```
[web]
@@ -117,22 +118,27 @@ entries "... too busy to accept new streaming request. Will be allowed in X secs
`[backend]` configures data archiving to a backend (it archives all databases maintained on
this host).
-### streaming configuration
+### Streaming configuration
+
+The new file `stream.conf` contains streaming configuration for a sending and a receiving Netdata node.
+
+To configure streaming on your system:
+1. Generate an API key using `uuidgen`. Note: API keys are just random GUIDs. You can use the same API key on all your Netdata, or use a different API key for any pair of sending-receiving Netdata nodes.
-A new file is introduced: `stream.conf` (to edit it on your system run
-`/etc/netdata/edit-config stream.conf`). This file holds streaming configuration for both the
-sending and the receiving Netdata.
+2. Authorize the communication between a pair of sending-receiving Netdata nodes using the generated API key.
+Once the communication is authorized, the sending Netdata node can push metrics for any number of hosts.
-API keys are used to authorize the communication of a pair of sending-receiving Netdata.
-Once the communication is authorized, the sending Netdata can push metrics for any number of hosts.
+3. To edit `stream.conf`, run `/etc/netdata/edit-config stream.conf`
-You can generate an API key with the command `uuidgen`. API keys are just random GUIDs.
-You can use the same API key on all your Netdata, or use a different API key for any pair of
-sending-receiving Netdata.
+The following sections describe how you can configure sending and receiving Netdata nodes.
-##### options for the sending node
-This is the section for the sending Netdata. On the receiving node, `[stream].enabled` can be `no`.
+
+
+
+##### Options for the sending node
+
+This is the section for the sending Netdata node. On the receiving node, `[stream].enabled` can be `no`.
If it is `yes`, the receiving node will also stream the metrics to another node (i.e. it will be
a proxy).
@@ -141,8 +147,13 @@ a proxy).
enabled = yes | no
destination = IP:PORT[:SSL] ...
api key = XXXXXXXXXXX
-```
+[API_KEY]
+ enabled = yes | no
+
+[MACHINE_GUID]
+ enabled = yes | no
+```
This is an overview of how these options can be combined:
| target|memory<br/>mode|web<br/>mode|stream<br/>enabled|backend|alarms|dashboard|
@@ -154,9 +165,10 @@ This is an overview of how these options can be combined:
For the options to encrypt the data stream between the child and the parent, refer to [securing the communication](#securing-streaming-communications)
-##### options for the receiving node
-`stream.conf` looks like this:
+##### Options for the receiving node
+
+For a receiving Netdata node, the `stream.conf` looks like this:
```sh
# replace API_KEY with your uuidgen generated GUID
@@ -192,7 +204,7 @@ You can also use `default memory mode = dbengine` for an API key or `memory mode
a single host. The additional `page cache size` and `dbengine multihost disk space` configuration options
are inherited from the global Netdata configuration.
-##### allow from
+##### Allow from
`allow from` settings are [Netdata simple patterns](/libnetdata/simple_pattern/README.md): string matches
that use `*` as wildcard (any number of times) and a `!` prefix for a negative match.
@@ -201,7 +213,7 @@ important: left to right, the first positive or negative match is used.
`allow from` is available in Netdata v1.9+
-##### tracing
+##### Tracing
When a child is trying to push metrics to a parent or proxy, it logs entries like these:
@@ -229,7 +241,10 @@ For Netdata v1.9+, streaming can also be monitored via `access.log`.
### Securing streaming communications
-Netdata does not activate TLS encryption by default. To encrypt streaming connections, you first need to [enable TLS support](/web/server/README.md#enabling-tls-support) on the parent. With encryption enabled on the receiving side, you need to instruct the child to use TLS/SSL as well. On the child's `stream.conf`, configure the destination as follows:
+Netdata does not activate TLS encryption by default. To encrypt streaming connections:
+1. On the parent node (receiving node), [enable TLS support](/web/server/README.md#enabling-tls-support).
+2. On the child node (sending node), [enable TLS support](/web/server/README.md#enabling-tls-support).
+3. On the child's `stream.conf`, configure the destination as follows:
```
[stream]
@@ -316,6 +331,80 @@ With the introduction of TLS/SSL, the parent-child communication behaves as show
| Yes|-/force/optional|Yes|no|The parent-child stream is encrypted, provided that the parent has a valid TLS/SSL certificate. Otherwise, the child refuses to connect.|
| Yes|-/force/optional|Yes|yes|The parent-child stream is encrypted.|
+### Streaming compression
+<a href="https://github.com/netdata/netdata/releases/latest"><img src="https://img.shields.io/badge/Supported%20Netdata%20Agent-v1.33%2B-brightgreen" alt="Supported version Netdata agent release"></a>
+
+<a href="https://github.com/netdata/netdata/releases/latest"><img src="https://img.shields.io/badge/Supported%20Netdata%20stream%20version-v5%2B-blue" alt="Supported version Netdata agent release"></a>
+
+#### OS dependencies
+* Streaming compression is based on [lz4 v1.9.0+](https://github.com/lz4/lz4). The [lz4 v1.9.0+](https://github.com/lz4/lz4) library must be installed in your OS in order to enable streaming compression. Any lower version will disable Netdata streaming compression for compatibility purposes between the older versions of Netdata agents.
+
+To check if your Netdata Agent supports stream compression run the following GET request in your browser or terminal:
+
+```
+curl -X GET http://localhost:19999/api/v1/info | grep 'Stream Compression'
+```
+
+**Output**
+```
+"buildinfo": "dbengine|Native HTTPS|Netdata Cloud|ACLK Next Generation|New Cloud Protocol Support|ACLK Legacy|TLS Host Verification|Machine Learning|Stream Compression|protobuf|JSON-C|libcrypto|libm|LWS v3.2.2|mosquitto|zlib|apps|cgroup Network Tracking|EBPF|perf|slabinfo",
+```
+> Note: If your OS doesn't support Netdata compression the `buildinfo` will not contain the `Stream Compression` statement.
+
+To check if your Netdata Agent has stream compression enabled, run the following GET request in your browser or terminal:
+
+```
+ curl -X GET http://localhost:19999/api/v1/info | grep 'stream-compression'
+```
+**Output**
+```
+"stream-compression": "enabled"
+```
+Note: The `stream-compression` status can be `"enabled" | "disabled" | "N/A"`.
+
+Stream data compression is enabled by default on systems where LZ4 library v1.9.0+ is installed. A compressed data packet is determined and decompressed on the fly.
+
+#### Limitations
+The current implementation of streaming data compression has the limitation that the size of single data block transmitted must not exceed 16384 bytes. If single data block size exceeds this limit, stream data compression should be disabled.
+
+#### How to enable stream compression
+Netdata Agents are shipped with data compression enabled by default. You can also configure which streams will use compression.
+
+With enabled stream compression, a Netdata Agent can negotiate streaming compression with other Netdata Agents. During the negotiation of streaming compression both Netdata Agents should support and enable compression in order to communicate over a compressed stream. The negotiation will result into an uncompressed stream, if one of the Netdata Agents doesn't support **or** has compression disabled.
+
+To enable stream compression:
+
+1. Edit `stream.conf` by using the `edit-config` script:
+`/etc/netdata/edit-config stream.conf`.
+
+2. In the `[stream]` section, set `enable compression` to `yes`.
+```
+# This is the default stream compression flag for an agent.
+
+[stream]
+ enable compression = yes | no
+```
+
+
+| Parent | Stream compression | Child |
+|----------------------|--------------------|----------------------|
+| Supported & Enabled | compressed | Supported & Enabled |
+| (Supported & Disabled)/Not supported | uncompressed | Supported & Enabled |
+| Supported & Enabled | uncompressed | (Supported & Disabled)/Not supported |
+| (Supported & Disabled)/Not supported | uncompressed | (Supported & Disabled)/Not supported |
+
+In case of parents with multiple children you can select which streams will be compressed by using the same configuration under the `[API_KEY]`, `[MACHINE_GUID]` section.
+
+This configuration uses AND logic with the default stream compression configuration under the `[stream]` section. This means the stream compression from child to parent will be enabled only if the outcome of the AND logic operation is true (`default compression enabled` && `api key compression enabled`). So both should be enabled to get stream compression otherwise stream compression is disabled.
+```
+[API_KEY]
+ enable compression = yes | no
+```
+Same thing applies with the `[MACHINE_GUID]` configuration.
+```
+[MACHINE_GUID]
+ enable compression = yes | no
+```
## Viewing remote host dashboards, using mirrored databases
On any receiving Netdata, that maintains remote databases and has its web server enabled,
@@ -385,7 +474,12 @@ I will use this API_KEY: `11111111-2222-3333-4444-555555555555`. Replace it with
#### Configuring the parent
-On the parent, edit `/etc/netdata/stream.conf` (to edit it on your system run `/etc/netdata/edit-config stream.conf`) and set these:
+To configure the parent node:
+
+1. On the parent node, edit `stream.conf` by using the `edit-config` script:
+`/etc/netdata/edit-config stream.conf`
+
+2. Set the following parameters:
```bash
[11111111-2222-3333-4444-555555555555]
@@ -414,7 +508,12 @@ the `netdata` process, but a system power issue would leave the connection open
#### Configuring the child nodes
-On each of the child nodes, edit `/etc/netdata/stream.conf` (to edit it on your system run `/etc/netdata/edit-config stream.conf`) and set these:
+To configure the child node:
+
+1. On the child node, edit `stream.conf` by using the `edit-config` script:
+`/etc/netdata/edit-config stream.conf`
+
+2. Set the following parameters:
```bash
[stream]
@@ -446,9 +545,9 @@ _`netdata.conf` configuration on child nodes, to disable the local database and
Keep in mind that setting `memory mode = none` will also force `[health].enabled = no` (health checks require access to a local database). But you can keep the database and disable health checks if you need to. You are however sending all the metrics to the parent node, which can handle the health checking (`[health].enabled = yes`)
-#### Netdata unique id
+#### Netdata unique ID
-The file `/var/lib/netdata/registry/netdata.public.unique.id` contains a random GUID that **uniquely identifies each Netdata**. This file is automatically generated, by Netdata, the first time it is started and remains unaltered forever.
+The file `/var/lib/netdata/registry/netdata.public.unique.id` contains a random GUID that **uniquely identifies each Netdata Agent**. This file is automatically generated, by Netdata, the first time it is started and remains unaltered forever.
> If you are building an image to be used for automated provisioning of autoscaled VMs, it important to delete that file from the image, so that each instance of your image will generate its own.
@@ -456,7 +555,7 @@ The file `/var/lib/netdata/registry/netdata.public.unique.id` contains a random
Both parent and child nodes log information at `/var/log/netdata/error.log`.
-Run the following on both the parent and child nodes:
+To obtain the error logs, run the following on both the parent and child nodes:
```
tail -f /var/log/netdata/error.log | grep STREAM
@@ -511,7 +610,7 @@ This means a setup like the following is also possible:
## Proxies
-A proxy is a Netdata instance that is receiving metrics from a Netdata, and streams them to another Netdata.
+A proxy is a Netdata node that is receiving metrics from a Netdata node, and streams them to another Netdata node.
Netdata proxies may or may not maintain a database for the metrics passing through them.
When they maintain a database, they can also run health checks (alarms and notifications)
@@ -571,11 +670,11 @@ down), you will see the following in the child's `error.log`.
ERROR : STREAM_SENDER[HOSTNAME] : Failed to connect to 'PARENT IP', port 'PARENT PORT' (errno 113, No route to host)
```
-### 'Is this a Netdata?'
+### 'Is this a Netdata node?'
This question can appear when Netdata starts the stream and receives an unexpected response. This error can appear when
the parent is using SSL and the child tries to connect using plain text. You will also see this message when
-Netdata connects to another server that isn't Netdata. The complete error message will look like this:
+Netdata connects to another server that isn't a Netdata node. The complete error message will look like this:
```
ERROR : STREAM_SENDER[CHILD HOSTNAME] : STREAM child HOSTNAME [send to PARENT HOSTNAME:PARENT PORT]: server is not replying properly (is it a netdata?).
diff --git a/streaming/compression.c b/streaming/compression.c
new file mode 100644
index 0000000000..917f05bd66
--- /dev/null
+++ b/streaming/compression.c
@@ -0,0 +1,345 @@
+#include "rrdpush.h"
+#include "lz4.h"
+
+#ifdef ENABLE_COMPRESSION
+
+#define LZ4_MAX_MSG_SIZE 0x4000
+#define LZ4_STREAM_BUFFER_SIZE (0x10000 + LZ4_MAX_MSG_SIZE)
+
+#define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
+#define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
+#define SIGNATURE_SIZE 4
+
+
+/*
+ * LZ4 streaming API compressor specific data
+ */
+struct compressor_data {
+ LZ4_stream_t *stream;
+ char *stream_buffer;
+ size_t stream_buffer_pos;
+};
+
+
+/*
+ * Reset compressor state for a new stream
+ */
+static void lz4_compressor_reset(struct compressor_state *state)
+{
+ if (state->data) {
+ if (state->data->stream) {
+ LZ4_resetStream_fast(state->data->stream);
+ info("STREAM_COMPRESSION: Compressor resets stream fast!");
+ }
+ state->data->stream_buffer_pos = 0;
+ }
+}
+
+/*
+ * Destroy compressor state and all related data
+ */
+static void lz4_compressor_destroy(struct compressor_state **state)
+{
+ if (state && *state) {
+ struct compressor_state *s = *state;
+ if (s->data) {
+ if (s->data->stream)
+ LZ4_freeStream(s->data->stream);
+ freez(s->data->stream_buffer);
+ }
+ freez(s->buffer);
+ freez(s);
+ *state = NULL;
+ debug(D_STREAM, "STREAM_COMPRESSION: Compressor destroyed!");
+ }
+}
+
+/*
+ * Compress the given block of data
+ * Comprecced data will remain in the internal buffer until the next invokation
+ * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
+ * or 0 in case of error
+ */
+static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out)
+{
+ if (!state || !size || !out)
+ return 0;
+ if (size > LZ4_MAX_MSG_SIZE) {
+ error("Message size above limit: %lu", size);
+ return 0;
+ }
+ size_t max_dst_size = LZ4_COMPRESSBOUND(size);
+ size_t data_size = max_dst_size + SIGNATURE_SIZE;
+
+ if (!state->buffer) {
+ state->buffer = mallocz(data_size);
+ state->buffer_size = data_size;
+ } else if (state->buffer_size < data_size) {
+ state->buffer = reallocz(state->buffer, data_size);
+ state->buffer_size = data_size;
+ }
+
+ memcpy(state->data->stream_buffer + state->data->stream_buffer_pos, data, size);
+ long int compressed_data_size = LZ4_compress_fast_continue(state->data->stream,
+ state->data->stream_buffer + state->data->stream_buffer_pos,
+ state->buffer + SIGNATURE_SIZE, size, max_dst_size, 1);
+ if (compressed_data_size < 0) {
+ error("Date compression error: %ld", compressed_data_size);
+ return 0;
+ }
+ state->data->stream_buffer_pos += size;
+ if (state->data->stream_buffer_pos >= LZ4_STREAM_BUFFER_SIZE - LZ4_MAX_MSG_SIZE)
+ state->data->stream_buffer_pos = 0;
+ uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
+ *(uint32_t *)state->buffer = len | SIGNATURE;
+ *out = state->buffer;
+ debug(D_STREAM, "STREAM: Compressed data header: %ld", compressed_data_size);
+ return compressed_data_size + SIGNATURE_SIZE;
+}
+
+/*
+ * Create and initalize compressor state
+ * Return the pointer to compressor_state structure created
+ */
+struct compressor_state *create_compressor()
+{
+ struct compressor_state *state = callocz(1, sizeof(struct compressor_state));
+
+ state->reset = lz4_compressor_reset;
+ state->compress = lz4_compressor_compress;
+ state->destroy = lz4_compressor_destroy;
+
+ state->data = callocz(1, sizeof(struct compressor_data));
+ state->data->stream = LZ4_createStream();
+ state->data->stream_buffer = callocz(1, LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE));
+ state->buffer_size = LZ4_STREAM_BUFFER_SIZE;
+ state->reset(state);
+ debug(D_STREAM, "STREAM_COMPRESSION: Initialize streaming compression!");
+ return state;
+}
+
+/*
+ * LZ4 streaming API decompressor specific data
+ */
+struct decompressor_data {
+ LZ4_streamDecode_t *stream;
+ char *stream_buffer;
+ size_t stream_buffer_size;
+ size_t stream_buffer_pos;
+};
+
+/*
+ * Reset decompressor state for a new stream
+ */
+static void lz4_decompressor_reset(struct decompressor_state *state)
+{
+ if (state->data) {
+ if (state->data->stream)
+ LZ4_setStreamDecode(state->data->stream, NULL, 0);
+ state->data->stream_buffer_pos = 0;
+ state->buffer_len = 0;
+ state->out_buffer_len = 0;
+ }
+}
+
+/*
+ * Destroy decompressor state and all related data
+ */
+static void lz4_decompressor_destroy(struct decompressor_state **state)
+{
+ if (state && *state) {
+ struct decompressor_state *s = *state;
+ if (s->data) {
+ debug(D_STREAM, "STREAM_COMPRESSION: Destroying decompressor.");
+ if (s->data->stream)
+ LZ4_freeStreamDecode(s->data->stream);
+ freez(s->data->stream_buffer);
+ }
+ freez(s->buffer);
+ freez(s);
+ *state = NULL;
+ }
+}
+
+static size_t decode_compress_header(const char *data, size_t data_size)
+{
+ if (!data || !data_size)
+ return 0;
+ if (data_size < SIGNATURE_SIZE)
+ return 0;
+ uint32_t sign = *(uint32_t *)data;
+ if ((sign & SIGNATURE_MASK) != SIGNATURE)
+ return 0;
+ size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
+ return length;
+}
+
+/*
+ * Check input data for the compression header
+ * Return the size of compressed data or 0 for uncompressed data
+ */
+size_t is_compressed_data(const char *data, size_t data_size)
+{
+ return decode_compress_header(data, data_size);
+}
+
+/*
+ * Start the collection of compressed data in an internal buffer
+ * Return the size of compressed data or 0 for uncompressed data
+ */
+static size_t lz4_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size)
+{
+ size_t length = decode_compress_header(header, header_size);
+ if (!length)
+ return 0;
+
+ if (!state->buffer) {
+ state->buffer = mallocz(length);
+ state->buffer_size = length;
+ } else if (state->buffer_size < length) {
+ state->buffer = reallocz(state->buffer, length);
+ state->buffer_size = length;
+ }
+ state->buffer_len = length;
+ state->buffer_pos = 0;
+ state->out_buffer_pos = 0;
+ state->out_buffer_len = 0;
+ return length;
+}
+
+/*
+ * Add a chunk of compressed data to the internal buffer
+ * Return the current size of compressed data or 0 for error
+ */
+static size_t lz4_decompressor_put(struct decompressor_state *state, const char *data, size_t size)
+{
+ if (!state || !size || !data)
+ return 0;
+ if (!state->buffer)
+ fatal("STREAM: No decompressor buffer allocated");
+
+ if (state->buffer_pos + size > state->buffer_len) {
+ error("STREAM: Decompressor buffer overflow %lu + %lu > %lu",
+ state->buffer_pos, size, state->buffer_len);
+ size = state->buffer_len - state->buffer_pos;
+ }
+ memcpy(state->buffer + state->buffer_pos, data, size);
+ state->buffer_pos += size;
+ return state->buffer_pos;
+}
+
+static size_t saving_percent(size_t comp_len, size_t src_len)
+{
+ if (comp_len > src_len)
+ comp_len = src_len;
+ if (!src_len)
+ return 0;
+ return 100 - comp_len * 100 / src_len;
+}
+
+/*
+ * Decompress the compressed data in the internal buffer
+ * Return the size of uncompressed data or 0 for error
+ */
+static size_t lz4_decompressor_decompress(struct decompressor_state *state)
+{
+ if (!state)
+ return 0;
+ if (!state->buffer) {
+ error("STREAM: No decompressor buffer allocated");
+ return 0;
+ }
+
+ long int decompressed_size = LZ4_decompress_safe_continue(state->data->stream, state->buffer,
+ state->data->stream_buffer + state->data->stream_buffer_pos,
+ state->buffer_len, state->data->stream_buffer_size - state->data->stream_buffer_pos);
+ if (decompressed_size < 0) {
+ error("STREAM: Decompressor error %ld", decompressed_size);
+ return 0;
+ }
+
+ state->out_buffer = state->data->stream_buffer + state->data->stream_buffer_pos;
+ state->data->stream_buffer_pos += decompressed_size;
+ if (state->data->stream_buffer_pos >= state->data->stream_buffer_size - LZ4_MAX_MSG_SIZE)
+ state->data->stream_buffer_pos = 0;
+ state->out_buffer_len = decompressed_size;
+ state->out_buffer_pos = 0;
+
+ // Some compression statistics
+ size_t old_avg_saving = saving_percent(state->total_compressed, state->total_uncompressed);
+ size_t old_avg_size = state->packet_count ? state->total_uncompressed / state->packet_count : 0;
+
+ state->total_compressed += state->buffer_len + SIGNATURE_SIZE;
+ state->total_uncompressed += decompressed_size;
+ state->packet_count++;
+
+ size_t saving = saving_percent(state->buffer_len, decompressed_size);
+ size_t avg_saving = saving_percent(state->total_compressed, state->total_uncompressed);
+ size_t avg_size = state->total_uncompressed / state->packet_count;
+
+ if (old_avg_saving != avg_saving || old_avg_size != avg_size){
+ debug(D_STREAM, "STREAM: Saving: %lu%% (avg. %lu%%), avg.size: %lu", saving, avg_saving, avg_size);
+ }
+ return decompressed_size;
+}
+
+/*
+ * Return the size of uncompressed data left in the internal buffer or 0 for error
+ */
+static size_t lz4_decompressor_decompressed_bytes_in_buffer(struct decompressor_state *state)
+{
+ return state->out_buffer_len ?
+ state->out_buffer_len - state->out_buffer_pos : 0;
+}
+
+/*
+ * Fill the buffer provided with uncompressed data from the internal buffer
+ * Return the size of uncompressed data copied or 0 for error
+ */
+static size_t lz4_decompressor_get(struct decompressor_state *state, char *data, size_t size)
+{
+ if (!state || !size || !data)
+ return 0;
+ if (!state->out_buffer)
+ fatal("STREAM: No decompressor output buffer allocated");
+ if (state->out_buffer_pos + size > state->out_buffer_len)
+ size = state->out_buffer_len - state->out_buffer_pos;
+
+ char *p = state->out_buffer + state->out_buffer_pos, *endp = p + size, *last_lf = NULL;
+ for (; p < endp; ++p)
+ if (*p == '\n' || *p == 0)
+ last_lf = p;
+ if (last_lf)
+ size = last_lf + 1 - (state->out_buffer + state->out_buffer_pos);
+
+ memcpy(data, state->out_buffer + state->out_buffer_pos, size);
+ state->out_buffer_pos += size;
+ return size;
+}
+
+/*
+ * Create and initalize decompressor state
+ * Return the pointer to decompressor_state structure created
+ */
+struct decompressor_state *create_decompressor()
+{
+ struct decompressor_state *state = callocz(1, sizeof(struct decompressor_state));
+ state->reset = lz4_decompressor_reset;
+ state->start = lz4_decompressor_start;
+ state->put = lz4_decompressor_put;
+ state->decompress = lz4_decompressor_decompress;
+ state->get = lz4_decompressor_get;
+ state->decompressed_bytes_in_buffer = lz4_decompressor_decompressed_bytes_in_buffer;
+ state->destroy = lz4_decompressor_destroy;
+
+ state->data = callocz(1, sizeof(struct decompressor_data));
+ fatal_assert(state->data);
+ state->data->stream = LZ4_createStreamDecode();
+ state->data->stream_buffer_size = LZ4_decoderRingBufferSize(LZ4_MAX_MSG