summaryrefslogtreecommitdiffstats
path: root/exporting/exporting_engine.h
blob: 02d0c89bfee522a082a9359effe1660687f35393 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
// SPDX-License-Identifier: GPL-3.0-or-later

#ifndef NETDATA_EXPORTING_ENGINE_H
#define NETDATA_EXPORTING_ENGINE_H 1

#include "daemon/common.h"

#include <uv.h>

#define exporter_get(section, name, value) expconfig_get(&exporting_config, section, name, value)
#define exporter_get_number(section, name, value) expconfig_get_number(&exporting_config, section, name, value)
#define exporter_get_boolean(section, name, value) expconfig_get_boolean(&exporting_config, section, name, value)

extern struct config exporting_config;

#define EXPORTER_DATA_SOURCE                 "data source"
#define EXPORTER_DATA_SOURCE_DEFAULT         "average"

#define EXPORTER_DESTINATION                "destination"
#define EXPORTER_DESTINATION_DEFAULT        "localhost"

#define EXPORTER_UPDATE_EVERY               "update every"
#define EXPORTER_UPDATE_EVERY_DEFAULT       10

#define EXPORTER_BUF_ONFAIL                 "buffer on failures"
#define EXPORTER_BUF_ONFAIL_DEFAULT         10

#define EXPORTER_TIMEOUT_MS                 "timeout ms"
#define EXPORTER_TIMEOUT_MS_DEFAULT         10000

#define EXPORTER_SEND_CHART_MATCH           "send charts matching"
#define EXPORTER_SEND_CHART_MATCH_DEFAULT   "*"

#define EXPORTER_SEND_HOST_MATCH            "send hosts matching"
#define EXPORTER_SEND_HOST_MATCH_DEFAULT    "localhost *"

#define EXPORTER_SEND_NAMES                 "send names instead of ids"
#define EXPORTER_SEND_NAMES_DEFAULT         CONFIG_BOOLEAN_YES

typedef enum exporting_options {
    EXPORTING_OPTION_NONE              = 0,

    EXPORTING_SOURCE_DATA_AS_COLLECTED = (1 << 0),
    EXPORTING_SOURCE_DATA_AVERAGE      = (1 << 1),
    EXPORTING_SOURCE_DATA_SUM          = (1 << 2),

    EXPORTING_OPTION_SEND_NAMES        = (1 << 16)
} EXPORTING_OPTIONS;

#define EXPORTING_OPTIONS_SOURCE_BITS                                                                                  \
    (EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_SOURCE_DATA_SUM)
#define EXPORTING_OPTIONS_DATA_SOURCE(exporting_options) (exporting_options & EXPORTING_OPTIONS_SOURCE_BITS)

struct engine;

struct instance_config {
    const char *name;
    const char *destination;

    int update_every;
    int buffer_on_failures;
    long timeoutms;

    EXPORTING_OPTIONS options;
    SIMPLE_PATTERN *charts_pattern;
    SIMPLE_PATTERN *hosts_pattern;

    void *connector_specific_config;
};

struct simple_connector_config {
    int default_port;
};

struct connector_config {
    BACKEND_TYPE type;
    void *connector_specific_config;
};

struct engine_config {
    const char *prefix;
    const char *hostname;
    int update_every;
};

struct stats {
    collected_number chart_buffered_metrics;
    collected_number chart_lost_metrics;
    collected_number chart_sent_metrics;
    collected_number chart_buffered_bytes;
    collected_number chart_received_bytes;
    collected_number chart_sent_bytes;
    collected_number chart_receptions;
    collected_number chart_transmission_successes;
    collected_number chart_transmission_failures;
    collected_number chart_data_lost_events;
    collected_number chart_lost_bytes;
    collected_number chart_reconnects;
};

struct instance {
    struct instance_config config;
    void *buffer;
    struct stats stats;

    int scheduled;
    int skip_host;
    int skip_chart;

    time_t after;
    time_t before;

    uv_thread_t thread;
    uv_mutex_t mutex;
    uv_cond_t cond_var;

    int (*start_batch_formatting)(struct instance *instance);
    int (*start_host_formatting)(struct instance *instance, RRDHOST *host);
    int (*start_chart_formatting)(struct instance *instance, RRDSET *st);
    int (*metric_formatting)(struct instance *instance, RRDDIM *rd);
    int (*end_chart_formatting)(struct instance *instance, RRDSET *st);
    int (*end_host_formatting)(struct instance *instance, RRDHOST *host);
    int (*end_batch_formatting)(struct instance *instance);

    size_t index;
    struct instance *next;
    struct connector *connector;
};

struct connector {
    struct connector_config config;

    void (*worker)(void *instance_p);

    struct instance *instance_root;
    struct connector *next;
    struct engine *engine;
};

struct engine {
    struct engine_config config;

    size_t instance_num;
    time_t now;

    struct connector *connector_root;
};

void *exporting_main(void *ptr);

struct engine *read_exporting_config();
BACKEND_TYPE exporting_select_type(const char *type);

int init_connectors(struct engine *engine);

int mark_scheduled_instances(struct engine *engine);
int prepare_buffers(struct engine *engine);
int notify_workers(struct engine *engine);

size_t exporting_name_copy(char *dst, const char *src, size_t max_len);

int rrdhost_is_exportable(struct instance *instance, RRDHOST *host);
int rrdset_is_exportable(struct instance *instance, RRDSET *st);

calculated_number exporting_calculate_value_from_stored_data(
    struct instance *instance,
    RRDDIM *rd,
    time_t *last_timestamp);

int start_batch_formatting(struct engine *engine);
int start_host_formatting(struct engine *engine, RRDHOST *host);
int start_chart_formatting(struct engine *engine, RRDSET *st);
int metric_formatting(struct engine *engine, RRDDIM *rd);
int end_chart_formatting(struct engine *engine, RRDSET *st);
int end_host_formatting(struct engine *engine, RRDHOST *host);
int end_batch_formatting(struct engine *engine);

int exporting_discard_response(BUFFER *buffer, struct instance *instance);
void simple_connector_receive_response(int *sock, struct instance *instance);
void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance);
void simple_connector_worker(void *instance_p);

int send_internal_metrics(struct engine *engine);

#endif /* NETDATA_EXPORTING_ENGINE_H */