summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
Diffstat (limited to 'database')
-rw-r--r--database/engine/metadata_log/logfile.c4
-rw-r--r--database/engine/pagecache.c2
-rw-r--r--database/rrd.h137
-rw-r--r--database/rrdcontext.c5
-rw-r--r--database/rrddim.c27
-rw-r--r--database/rrdfunctions.c758
-rw-r--r--database/rrdfunctions.h35
-rw-r--r--database/rrdhost.c109
-rw-r--r--database/rrdlabels.c39
-rw-r--r--database/rrdset.c35
-rw-r--r--database/sqlite/sqlite_aclk_node.c3
-rw-r--r--database/sqlite/sqlite_functions.c4
12 files changed, 978 insertions, 180 deletions
diff --git a/database/engine/metadata_log/logfile.c b/database/engine/metadata_log/logfile.c
index 07eb9b6fe3..db8886f8cb 100644
--- a/database/engine/metadata_log/logfile.c
+++ b/database/engine/metadata_log/logfile.c
@@ -369,7 +369,7 @@ static int scan_metalog_files(struct metalog_instance *ctx)
.obsolete = 0,
.started_t = INVALID_TIME,
.next = NULL,
- .version = 0,
+ .capabilities = 0,
};
struct metalog_pluginsd_state metalog_parser_state;
@@ -383,7 +383,7 @@ static int scan_metalog_files(struct metalog_instance *ctx)
.private = &metalog_parser_state
};
- PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, PARSER_INPUT_SPLIT);
+ PARSER *parser = parser_init(metalog_parser_object.host, &metalog_parser_object, NULL, NULL, PARSER_INPUT_SPLIT|PARSER_NO_ACTION_INIT);
parser_add_keyword(parser, PLUGINSD_KEYWORD_HOST, metalog_pluginsd_host);
parser_add_keyword(parser, PLUGINSD_KEYWORD_GUID, pluginsd_guid);
parser_add_keyword(parser, PLUGINSD_KEYWORD_CONTEXT, pluginsd_context);
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index c105a4fca9..c9c4ec4628 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -1251,7 +1251,7 @@ void free_page_cache(struct rrdengine_instance *ctx)
// Do the cleanup if we are compiling with NETDATA_INTERNAL_CHECKS
// This affects the reporting of dbengine statistics which are available in real time
// via the /api/v1/dbengine_stats endpoint
-#ifndef NETDATA_INTERNAL_CHECKS
+#ifndef NETDATA_DBENGINE_FREE
if (netdata_exit)
return;
#endif
diff --git a/database/rrd.h b/database/rrd.h
index 2b219fcfe3..b8877ad67b 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -178,20 +178,21 @@ typedef enum rrddim_options {
// this is 8-bit
} RRDDIM_OPTIONS;
-#define rrddim_option_check(rd, flag) ((rd)->flags & (flag))
-#define rrddim_option_set(rd, flag) (rd)->flags |= (flag)
-#define rrddim_option_clear(rd, flag) (rd)->flags &= ~(flag)
+#define rrddim_option_check(rd, option) ((rd)->options & (option))
+#define rrddim_option_set(rd, option) (rd)->options |= (option)
+#define rrddim_option_clear(rd, option) (rd)->options &= ~(option)
// flags are runtime changing status flags (atomics are required to alter/access them)
typedef enum rrddim_flags {
RRDDIM_FLAG_NONE = 0,
+ RRDDIM_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 0),
+
RRDDIM_FLAG_OBSOLETE = (1 << 2), // this is marked by the collector/module as obsolete
// No new values have been collected for this dimension since agent start, or it was marked RRDDIM_FLAG_OBSOLETE at
// least rrdset_free_obsolete_time seconds ago.
RRDDIM_FLAG_ARCHIVED = (1 << 3),
RRDDIM_FLAG_ACLK = (1 << 4),
- RRDDIM_FLAG_PENDING_FOREACH_ALARMS = (1 << 5), // set when foreach alarm has not been initialized yet
RRDDIM_FLAG_META_HIDDEN = (1 << 6), // Status of hidden option in the metadata database
// this is 8 bit
@@ -216,6 +217,8 @@ typedef enum rrdlabel_source {
#define RRDLABEL_FLAG_INTERNAL (RRDLABEL_FLAG_OLD | RRDLABEL_FLAG_NEW | RRDLABEL_FLAG_PERMANENT)
+extern size_t text_sanitize(unsigned char *dst, const unsigned char *src, size_t dst_size, unsigned char *char_map, bool utf, const char *empty, size_t *multibyte_length);
+
extern DICTIONARY *rrdlabels_create(void);
extern void rrdlabels_destroy(DICTIONARY *labels_dict);
extern void rrdlabels_add(DICTIONARY *dict, const char *name, const char *value, RRDLABEL_SRC ls);
@@ -491,13 +494,12 @@ typedef enum rrdset_flags {
// No new values have been collected for this chart since agent start, or it was marked RRDSET_FLAG_OBSOLETE at
// least rrdset_free_obsolete_time seconds ago.
RRDSET_FLAG_ARCHIVED = (1 << 15),
-// RRDSET_FLAG_ACLK = (1 << 16), // not used anymore
- RRDSET_FLAG_PENDING_FOREACH_ALARMS = (1 << 17), // contains dims with uninitialized foreach alarms
RRDSET_FLAG_ANOMALY_DETECTION = (1 << 18), // flag to identify anomaly detection charts.
RRDSET_FLAG_INDEXED_ID = (1 << 19), // the rrdset is indexed by its id
RRDSET_FLAG_INDEXED_NAME = (1 << 20), // the rrdset is indexed by its name
RRDSET_FLAG_ANOMALY_RATE_CHART = (1 << 21), // the rrdset is for storing anomaly rates for all dimensions
+ RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 22),
} RRDSET_FLAGS;
#define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag))
@@ -548,9 +550,6 @@ struct rrdset {
RRDSET_FLAGS flags; // flags
RRD_MEMORY_MODE rrd_memory_mode; // the db mode of this rrdset
- uuid_t hash_uuid; // hash_id for syncing with cloud
- // TODO - obsolete now - cleanup
-
DICTIONARY *rrddim_root_index; // dimensions index
int gap_when_lost_iterations_above; // after how many lost iterations a gap should be stored
@@ -583,6 +582,8 @@ struct rrdset {
size_t rrdlabels_last_saved_version;
+ DICTIONARY *functions_view; // collector functions this rrdset supports, can be NULL
+
// ------------------------------------------------------------------------
// data collection - streaming to parents, temp variables
@@ -684,6 +685,8 @@ extern void rrdset_memory_file_update(RRDSET *st);
extern const char *rrdset_cache_filename(RRDSET *st);
extern bool rrdset_memory_load_or_create_map_save(RRDSET *st_on_file, RRD_MEMORY_MODE memory_mode);
+#include "rrdfunctions.h"
+
// ----------------------------------------------------------------------------
// RRDHOST flags
// use this for configuration flags, not for state control
@@ -691,23 +694,30 @@ extern bool rrdset_memory_load_or_create_map_save(RRDSET *st_on_file, RRD_MEMORY
// and may lead to missing information.
typedef enum rrdhost_flags {
- RRDHOST_FLAG_ORPHAN = (1 << 0), // this host is orphan (not receiving data)
- RRDHOST_FLAG_DELETE_OBSOLETE_CHARTS = (1 << 1), // delete files of obsolete charts
- RRDHOST_FLAG_DELETE_ORPHAN_HOST = (1 << 2), // delete the entire host when orphan
- RRDHOST_FLAG_EXPORTING_SEND = (1 << 3), // send it to external databases
- RRDHOST_FLAG_EXPORTING_DONT_SEND = (1 << 4), // don't send it to external databases
- RRDHOST_FLAG_ARCHIVED = (1 << 5), // The host is archived, no collected charts yet
- RRDHOST_FLAG_PENDING_FOREACH_ALARMS = (1 << 7), // contains dims with uninitialized foreach alarms
- RRDHOST_FLAG_STREAM_LABELS_UPDATE = (1 << 8),
- RRDHOST_FLAG_STREAM_LABELS_STOP = (1 << 9),
- RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 10), // when set, we should send ACLK stream context updates
- RRDHOST_FLAG_INDEXED_MACHINE_GUID = (1 << 11), // when set, we have indexed its machine guid
- RRDHOST_FLAG_INDEXED_HOSTNAME = (1 << 12), // when set, we have indexed its hostname
- RRDHOST_FLAG_STREAM_COLLECTED_METRICS = (1 << 13), // when set, rrdset_done() should push metrics to parent
- RRDHOST_FLAG_INITIALIZED_HEALTH = (1 << 14), // the host has initialized health structures
- RRDHOST_FLAG_INITIALIZED_RRDPUSH = (1 << 15), // the host has initialized rrdpush structures
- RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS = (1 << 16), // the host has pending chart obsoletions
- RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS = (1 << 17), // the host has pending dimension obsoletions
+ // Orphan, Archived and Obsolete flags
+ RRDHOST_FLAG_ORPHAN = (1 << 10), // this host is orphan (not receiving data)
+ RRDHOST_FLAG_ARCHIVED = (1 << 11), // The host is archived, no collected charts yet
+ RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS = (1 << 12), // the host has pending chart obsoletions
+ RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS = (1 << 13), // the host has pending dimension obsoletions
+
+ // Streaming sender
+ RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED = (1 << 14), // the host has initialized rrdpush structures
+ RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN = (1 << 15), // When set, the sender thread is running
+ RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED = (1 << 16), // When set, the host is connected to a parent
+ RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS = (1 << 17), // when set, rrdset_done() should push metrics to parent
+ RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS = (1 << 18), // when set, we have logged the status of metrics streaming
+ RRDHOST_FLAG_RRDPUSH_SENDER_JOIN = (1 << 19), // When set, we want to join the sender thread
+
+ // Health
+ RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 20), // contains charts and dims with uninitialized variables
+ RRDHOST_FLAG_INITIALIZED_HEALTH = (1 << 21), // the host has initialized health structures
+
+ // Exporting
+ RRDHOST_FLAG_EXPORTING_SEND = (1 << 22), // send it to external databases
+ RRDHOST_FLAG_EXPORTING_DONT_SEND = (1 << 23), // don't send it to external databases
+
+ // ACLK
+ RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 24), // when set, we should send ACLK stream context updates
} RRDHOST_FLAGS;
#define rrdhost_flag_check(host, flag) (__atomic_load_n(&((host)->flags), __ATOMIC_SEQ_CST) & (flag))
@@ -721,6 +731,27 @@ typedef enum rrdhost_flags {
#define rrdset_debug(st, fmt, args...) debug_dummy()
#endif
+typedef enum {
+ // Indexing
+ RRDHOST_OPTION_INDEXED_MACHINE_GUID = (1 << 0), // when set, we have indexed its machine guid
+ RRDHOST_OPTION_INDEXED_HOSTNAME = (1 << 1), // when set, we have indexed its hostname
+
+ // Streaming configuration
+ RRDHOST_OPTION_SENDER_ENABLED = (1 << 2), // set when the host is configured to send metrics to a parent
+
+ // Configuration options
+ RRDHOST_OPTION_DELETE_OBSOLETE_CHARTS = (1 << 3), // delete files of obsolete charts
+ RRDHOST_OPTION_DELETE_ORPHAN_HOST = (1 << 4), // delete the entire host when orphan
+} RRDHOST_OPTIONS;
+
+#define rrdhost_option_check(host, flag) ((host)->options & (flag))
+#define rrdhost_option_set(host, flag) (host)->options |= flag
+#define rrdhost_option_clear(host, flag) (host)->options &= ~(flag)
+
+#define rrdhost_has_rrdpush_sender_enabled(host) (rrdhost_option_check(host, RRDHOST_OPTION_SENDER_ENABLED) && (host)->sender)
+
+#define rrdhost_can_send_definitions_to_parent(host) (rrdhost_has_rrdpush_sender_enabled(host) && rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED))
+
// ----------------------------------------------------------------------------
// Health data
@@ -860,7 +891,8 @@ struct rrdhost {
int32_t utc_offset; // the offset in seconds from utc
- RRDHOST_FLAGS flags; // flags about this RRDHOST
+ RRDHOST_OPTIONS options; // configuration option for this RRDHOST (no atomics on this)
+ RRDHOST_FLAGS flags; // runtime flags about this RRDHOST (atomics on this)
RRDHOST_FLAGS *exporting_flags; // array of flags for exporting connector instances
int rrd_update_every; // the update frequency of the host
@@ -873,36 +905,22 @@ struct rrdhost {
struct rrdhost_system_info *system_info; // information collected from the host environment
// ------------------------------------------------------------------------
- // streaming of data to remote hosts - rrdpush
+ // streaming of data to remote hosts - rrdpush sender
- unsigned int rrdpush_send_enabled; // 1 when this host sends metrics to another netdata
char *rrdpush_send_destination; // where to send metrics to
char *rrdpush_send_api_key; // the api key at the receiving netdata
struct rrdpush_destinations *destinations; // a linked list of possible destinations
struct rrdpush_destinations *destination; // the current destination from the above list
+ SIMPLE_PATTERN *rrdpush_send_charts_matching; // pattern to match the charts to be sent
// the following are state information for the threading
// streaming metrics from this netdata to an upstream netdata
struct sender_state *sender;
- volatile unsigned int rrdpush_sender_spawn; // 1 when the sender thread has been spawn
netdata_thread_t rrdpush_sender_thread; // the sender thread
void *dbsync_worker;
- bool rrdpush_sender_connected; // 1 when the sender is ready to push metrics
- int rrdpush_sender_socket; // the fd of the socket to the remote host, or -1
-
- volatile unsigned int rrdpush_sender_error_shown; // 1 when we have logged a communication error
- volatile unsigned int rrdpush_sender_join; // 1 when we have to join the sending thread
-
- SIMPLE_PATTERN *rrdpush_send_charts_matching; // pattern to match the charts to be sent
-
- int rrdpush_sender_pipe[2]; // collector to sender thread signaling
- //BUFFER *rrdpush_sender_buffer; // collector fills it, sender sends it
-
- //uint32_t stream_version; //Set the current version of the stream.
-
// ------------------------------------------------------------------------
- // streaming of data from remote hosts - rrdpush
+ // streaming of data from remote hosts - rrdpush receiver
time_t senders_connect_time; // the time the last sender was connected
time_t senders_last_chart_command; // the time of the last CHART streaming command
@@ -916,25 +934,21 @@ struct rrdhost {
// ------------------------------------------------------------------------
// health monitoring options
- unsigned int health_enabled; // 1 when this host has health enabled
- time_t health_delay_up_to; // a timestamp to delay alarms processing up to
- STRING *health_default_exec; // the full path of the alarms notifications program
- STRING *health_default_recipient; // the default recipient for all alarms
- char *health_log_filename; // the alarms event log filename
- size_t health_log_entries_written; // the number of alarm events written to the alarms event log
- FILE *health_log_fp; // the FILE pointer to the open alarms event log file
- uint32_t health_default_warn_repeat_every; // the default value for the interval between repeating warning notifications
- uint32_t health_default_crit_repeat_every; // the default value for the interval between repeating critical notifications
+ unsigned int health_enabled; // 1 when this host has health enabled
+ time_t health_delay_up_to; // a timestamp to delay alarms processing up to
+ STRING *health_default_exec; // the full path of the alarms notifications program
+ STRING *health_default_recipient; // the default recipient for all alarms
+ char *health_log_filename; // the alarms event log filename
+ size_t health_log_entries_written; // the number of alarm events written to the alarms event log
+ FILE *health_log_fp; // the FILE pointer to the open alarms event log file
+ uint32_t health_default_warn_repeat_every; // the default value for the interval between repeating warning notifications
+ uint32_t health_default_crit_repeat_every; // the default value for the interval between repeating critical notifications
// all RRDCALCs are primarily allocated and linked here
- // RRDCALCs may be linked to charts at any point
- // (charts may or may not exist when these are loaded)
DICTIONARY *rrdcalc_root_index;
// templates of alarms
- // these are used to create alarms when charts
- // are created or renamed, that match them
DICTIONARY *rrdcalctemplate_root_index;
ALARM_LOG health_log; // alarms historical events (event log)
@@ -956,6 +970,10 @@ struct rrdhost {
DICTIONARY *rrdlabels;
// ------------------------------------------------------------------------
+ // Support for functions
+ DICTIONARY *functions; // collector functions this rrdset supports, can be NULL
+
+ // ------------------------------------------------------------------------
// indexes
DICTIONARY *rrdset_root_index; // the host's charts index (by id)
@@ -974,11 +992,6 @@ struct rrdhost {
uuid_t host_uuid; // Global GUID for this host
uuid_t *node_id; // Cloud node_id
-#ifdef ENABLE_HTTPS
- struct netdata_ssl ssl; //Structure used to encrypt the connection
- struct netdata_ssl stream_ssl; //Structure used to encrypt the stream
-#endif
-
netdata_mutex_t aclk_state_lock;
aclk_rrdhost_state aclk_state;
diff --git a/database/rrdcontext.c b/database/rrdcontext.c
index bd5fe4c72a..9a9d4f1582 100644
--- a/database/rrdcontext.c
+++ b/database/rrdcontext.c
@@ -11,6 +11,8 @@
#define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC (1000 * USEC_PER_MS)
#define RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY 10
+#define LOG_TRANSITIONS false
+
#define WORKER_JOB_HOSTS 1
#define WORKER_JOB_CHECK 2
#define WORKER_JOB_SEND 3
@@ -2822,7 +2824,8 @@ static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending _
if(unlikely(id_changed || title_changed || units_changed || family_changed || chart_type_changed || priority_changed || first_time_changed || last_time_changed || deleted_changed)) {
- internal_error(true, "RRDCONTEXT: %s NEW VERSION '%s'%s, version %"PRIu64", title '%s'%s, units '%s'%s, family '%s'%s, chart type '%s'%s, priority %u%s, first_time_t %ld%s, last_time_t %ld%s, deleted '%s'%s, (queued for %llu ms, expected %llu ms)",
+ internal_error(LOG_TRANSITIONS,
+ "RRDCONTEXT: %s NEW VERSION '%s'%s, version %"PRIu64", title '%s'%s, units '%s'%s, family '%s'%s, chart type '%s'%s, priority %u%s, first_time_t %ld%s, last_time_t %ld%s, deleted '%s'%s, (queued for %llu ms, expected %llu ms)",
sending?"SENDING":"QUEUE",
string2str(rc->id), id_changed ? " (CHANGED)" : "",
rc->version,
diff --git a/database/rrddim.c b/database/rrddim.c
index 7d15911b11..d5832a1dab 100644
--- a/database/rrddim.c
+++ b/database/rrddim.c
@@ -27,20 +27,6 @@ struct rrddim_constructor {
};
-static void rrddim_update_rrddimvars_unsafe(RRDDIM *rd) {
- RRDSET *st = rd->rrdset;
- RRDHOST *host = st->rrdhost;
-
- if(host->health_enabled && !rrdset_is_ar_chart(st)) {
- rrddimvar_add_and_leave_released(rd, RRDVAR_TYPE_CALCULATED, NULL, NULL, &rd->last_stored_value, RRDVAR_FLAG_NONE);
- rrddimvar_add_and_leave_released(rd, RRDVAR_TYPE_COLLECTED, NULL, "_raw", &rd->last_collected_value, RRDVAR_FLAG_NONE);
- rrddimvar_add_and_leave_released(rd, RRDVAR_TYPE_TIME_T, NULL, "_last_collected_t", &rd->last_collected_time.tv_sec, RRDVAR_FLAG_NONE);
- rrddim_flag_set(rd, RRDDIM_FLAG_PENDING_FOREACH_ALARMS);
- rrdset_flag_set(st, RRDSET_FLAG_PENDING_FOREACH_ALARMS);
- rrdhost_flag_set(host, RRDHOST_FLAG_PENDING_FOREACH_ALARMS);
- }
-}
-
static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrddim, void *constructor_data) {
struct rrddim_constructor *ctr = constructor_data;
RRDDIM *rd = rrddim;
@@ -160,7 +146,11 @@ static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
}
}
- rrddim_update_rrddimvars_unsafe(rd);
+ if(!rrdset_is_ar_chart(st)) {
+ rrddim_flag_set(rd, RRDDIM_FLAG_PENDING_HEALTH_INITIALIZATION);
+ rrdset_flag_set(rd->rrdset, RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION);
+ rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION);
+ }
// let the chart resync
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
@@ -264,7 +254,12 @@ static bool rrddim_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused,
}
rrddim_flag_clear(rd, RRDDIM_FLAG_ARCHIVED);
- rrddim_update_rrddimvars_unsafe(rd);
+
+ if(!rrdset_is_ar_chart(st)) {
+ rrddim_flag_set(rd, RRDDIM_FLAG_PENDING_HEALTH_INITIALIZATION);
+ rrdset_flag_set(rd->rrdset, RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION);
+ rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION);
+ }
}
if(unlikely(rc))
diff --git a/database/rrdfunctions.c b/database/rrdfunctions.c
new file mode 100644
index 0000000000..69d88f53b4
--- /dev/null
+++ b/database/rrdfunctions.c
@@ -0,0 +1,758 @@
+#define NETDATA_RRD_INTERNALS
+#include "rrd.h"
+
+#define MAX_FUNCTION_LENGTH (PLUGINSD_LINE_MAX - 512) // we need some space for the rest of the line
+
+static unsigned char functions_allowed_chars[256] = {
+ [0] = '\0', //
+ [1] = '_', //
+ [2] = '_', //
+ [3] = '_', //
+ [4] = '_', //
+ [5] = '_', //
+ [6] = '_', //
+ [7] = '_', //
+ [8] = '_', //
+ [9] = ' ', // Horizontal Tab
+ [10] = ' ', // Line Feed
+ [11] = ' ', // Vertical Tab
+ [12] = ' ', // Form Feed
+ [13] = ' ', // Carriage Return
+ [14] = '_', //
+ [15] = '_', //
+ [16] = '_', //
+ [17] = '_', //
+ [18] = '_', //
+ [19] = '_', //
+ [20] = '_', //
+ [21] = '_', //
+ [22] = '_', //
+ [23] = '_', //
+ [24] = '_', //
+ [25] = '_', //
+ [26] = '_', //
+ [27] = '_', //
+ [28] = '_', //
+ [29] = '_', //
+ [30] = '_', //
+ [31] = '_', //
+ [32] = ' ', // SPACE keep
+ [33] = '_', // !
+ [34] = '_', // "
+ [35] = '_', // #
+ [36] = '_', // $
+ [37] = '_', // %
+ [38] = '_', // &
+ [39] = '_', // '
+ [40] = '_', // (
+ [41] = '_', // )
+ [42] = '_', // *
+ [43] = '_', // +
+ [44] = ',', // , keep
+ [45] = '-', // - keep
+ [46] = '.', // . keep
+ [47] = '/', // / keep
+ [48] = '0', // 0 keep
+ [49] = '1', // 1 keep
+ [50] = '2', // 2 keep
+ [51] = '3', // 3 keep
+ [52] = '4', // 4 keep
+ [53] = '5', // 5 keep
+ [54] = '6', // 6 keep
+ [55] = '7', // 7 keep
+ [56] = '8', // 8 keep
+ [57] = '9', // 9 keep
+ [58] = ':', // : keep
+ [59] = ':', // ; convert ; to :
+ [60] = '_', // <
+ [61] = ':', // = convert = to :
+ [62] = '_', // >
+ [63] = '_', // ?
+ [64] = '_', // @
+ [65] = 'A', // A keep
+ [66] = 'B', // B keep
+ [67] = 'C', // C keep
+ [68] = 'D', // D keep
+ [69] = 'E', // E keep
+ [70] = 'F', // F keep
+ [71] = 'G', // G keep
+ [72] = 'H', // H keep
+ [73] = 'I', // I keep
+ [74] = 'J', // J keep
+ [75] = 'K', // K keep
+ [76] = 'L', // L keep
+ [77] = 'M', // M keep
+ [78] = 'N', // N keep
+ [79] = 'O', // O keep
+ [80] = 'P', // P keep
+ [81] = 'Q', // Q keep
+ [82] = 'R', // R keep
+ [83] = 'S', // S keep
+ [84] = 'T', // T keep
+ [85] = 'U', // U keep
+ [86] = 'V', // V keep
+ [87] = 'W', // W keep
+ [88] = 'X', // X keep
+ [89] = 'Y', // Y keep
+ [90] = 'Z', // Z keep
+ [91] = '_', // [
+ [92] = '/', // backslash convert \ to /
+ [93] = '_', // ]
+ [94] = '_', // ^
+ [95] = '_', // _ keep
+ [96] = '_', // `
+ [97] = 'a', // a keep
+ [98] = 'b', // b keep
+ [99] = 'c', // c keep
+ [100] = 'd', // d keep
+ [101] = 'e', // e keep
+ [102] = 'f', // f keep
+ [103] = 'g', // g keep
+ [104] = 'h', // h keep
+ [105] = 'i', // i keep
+ [106] = 'j', // j keep
+ [107] = 'k', // k keep
+ [108] = 'l', // l keep
+ [109] = 'm', // m keep
+ [110] = 'n', // n keep
+ [111] = 'o', // o keep
+ [112] = 'p', // p keep
+ [113] = 'q', // q keep
+ [114] = 'r', // r keep
+ [115] = 's', // s keep
+ [116] = 't', // t keep
+ [117] = 'u', // u keep
+ [118] = 'v', // v keep
+ [119] = 'w', // w keep
+ [120] = 'x', // x keep
+ [121] = 'y', // y keep
+ [122] = 'z', // z keep
+ [123] = '_', // {
+ [124] = '_', // |
+ [125] = '_', // }
+ [126] = '_', // ~
+ [127] = '_', //
+ [128] = '_', //
+ [129] = '_', //
+ [130] = '_', //
+ [131] = '_', //
+ [132] = '_', //
+ [133] = '_', //
+ [134] = '_', //
+ [135] = '_', //
+ [136] = '_', //
+ [137] = '_', //
+ [138] = '_', //
+ [139] = '_', //
+ [140] = '_', //
+ [141] = '_', //
+ [142] = '_', //
+ [143] = '_', //
+ [144] = '_', //
+ [145] = '_', //
+ [146] = '_', //
+ [147] = '_', //
+ [148] = '_', //
+ [149] = '_', //
+ [150] = '_', //
+ [151] = '_', //
+ [152] = '_', //
+ [153] = '_', //
+ [154] = '_', //
+ [155] = '_', //
+ [156] = '_', //
+ [157] = '_', //
+ [158] = '_', //
+ [159] = '_', //
+ [160] = '_', //
+ [161] = '_', //
+ [162] = '_', //
+ [163] = '_', //
+ [164] = '_', //
+ [165] = '_', //
+ [166] = '_', //
+ [167] = '_', //
+ [168] = '_', //
+ [169] = '_', //
+ [170] = '_', //
+ [171] = '_', //
+ [172] = '_', //
+ [173] = '_', //
+ [174] = '_', //
+ [175] = '_', //
+ [176] = '_', //
+ [177] = '_', //
+ [178] = '_', //
+ [179] = '_', //
+ [180] = '_', //
+ [181] = '_', //
+ [182] = '_', //
+ [183] = '_', //
+ [184] = '_', //
+ [185] = '_', //
+ [186] = '_', //
+ [187] = '_', //
+ [188] = '_', //
+ [189] = '_', //
+ [190] = '_', //
+ [191] = '_', //
+ [192] = '_', //
+ [193] = '_', //
+ [194] = '_', //
+ [195] = '_', //
+ [196] = '_', //
+ [197] = '_', //
+ [198] = '_', //
+ [199] = '_', //
+ [200] = '_', //
+ [201] = '_', //
+ [202] = '_', //
+ [203] = '_', //
+ [204] = '_', //
+ [205] = '_', //
+ [206] = '_', //
+ [207] = '_', //
+ [208] = '_', //
+ [209] = '_', //
+ [210] = '_', //
+ [211] = '_', //
+ [212] = '_', //
+ [213] = '_', //
+ [214] = '_', //
+ [215] = '_', //
+ [216] = '_', //
+ [217] = '_', //
+ [218] = '_', //
+ [219] = '_', //
+ [220] = '_', //
+ [221] = '_', //
+ [222] = '_', //
+ [223] = '_', //
+ [224] = '_', //
+ [225] = '_', //
+ [226] = '_', //
+ [227] = '_', //
+ [228] = '_', //
+ [229] = '_', //
+ [230] = '_', //
+ [231] = '_', //
+ [232] = '_', //
+ [233] = '_', //
+ [234] = '_', //
+ [235] = '_', //
+ [236] = '_', //
+ [237] = '_', //
+ [238] = '_', //
+ [239] = '_', //
+ [240] = '_', //
+ [241] = '_', //
+ [242] = '_', //
+ [243] = '_', //
+ [244] = '_', //
+ [245] = '_', //
+ [246] = '_', //
+ [247] = '_', //
+ [248] = '_', //
+ [249] = '_', //
+ [250] = '_', //
+ [251] = '_', //
+ [252] = '_', //
+ [253] = '_', //
+ [254] = '_', //
+ [255] = '_' //
+};
+
+static inline size_t sanitize_function_text(char *dst, const char *src, size_t dst_len) {
+ return text_sanitize((unsigned char *)dst, (const unsigned char *)src, dst_len,
+ functions_allowed_chars, true, "", NULL);
+}
+
+// we keep a dictionary per RRDSET with these functions
+// the dictionary is created on demand (only when a function is added to an RRDSET)
+
+typedef enum {
+ RRD_FUNCTION_LOCAL = (1 << 0),
+ RRD_FUNCTION_GLOBAL = (1 << 1),
+
+ // this is 8-bit
+} RRD_FUNCTION_OPTIONS;
+
+struct rrd_collector_function {
+ bool sync; // when true, the function is called synchronously
+ uint8_t options; // RRD_FUNCTION_OPTIONS
+ STRING *help;
+ int timeout; // the default timeout of the function
+
+ int (*function)(BUFFER *wb, int timeout, const char *function, void *collector_data,
+ function_data_ready_callback callback, void *callback_data);
+
+ void *collector_data;
+ struct rrd_collector *collector;
+};
+
+// Each function points to this collector structure
+// so that when the collector exits, all of them will
+// be invalidated (running == false)
+// The last function that is using this collector
+// frees the structure too (or when the collector calls
+// rrdset_collector_finished()).
+
+struct rrd_collector {
+ int32_t refcount;
+ pid_t tid;
+ bool running;
+};
+
+// Each thread that adds RRDSET functions, has to call
+// rrdset_collector_started() and rrdset_collector_finished()
+// to create the collector structure.
+
+static __thread struct rrd_collector *thread_rrd_collector = NULL;
+
+static void rrd_collector_free(struct rrd_collector *rdc) {
+ int32_t expected = 0;
+ if(likely(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))) {
+ // the collector is still referenced by charts.
+ // leave it hanging there, the last chart will actually free it.
+ return;
+ }
+
+ // we can free it now
+ freez(rdc);
+}
+
+// called once per collector
+void rrd_collector_started(void) {
+ if(likely(thread_rrd_collector)) return;
+
+ thread_rrd_collector = callocz(1, sizeof(struct rrd_collector));
+ thread_rrd_collector->tid = gettid();
+ thread_rrd_collector->running = true;
+}
+
+// called once per collector
+void rrd_collector_finished(void) {
+ if(!thread_rrd_collector)
+ return;
+
+ thread_rrd_collector->running = false;
+ rrd_collector_free(thread_rrd_collector);
+ thread_rrd_collector = NULL;
+}
+
+static struct rrd_collector *rrd_collector_acquire(void) {
+ __atomic_add_fetch(&thread_rrd_collector->refcount, 1, __ATOMIC_SEQ_CST);
+ return thread_rrd_collector;
+}
+
+static void rrd_collector_release(struct rrd_collector *rdc) {
+ if(unlikely(!rdc)) return;
+
+ int32_t refcount = __atomic_sub_fetch(&rdc->refcount, 1, __ATOMIC_SEQ_CST);
+ if(refcount == 0 && !rdc->running)
+ rrd_collector_free(rdc);
+}
+
+static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
+ void *rrdhost __maybe_unused) {
+ struct rrd_collector_function *rdcf = func;
+
+ if(!thread_rrd_collector)
+ fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.",
+ __FUNCTION__, dictionary_acquired_item_name(item));
+
+ rdcf->collector = rrd_collector_acquire();
+}
+
+static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
+ void *rrdhost __maybe_unused) {
+ struct rrd_collector_function *rdcf = func;
+ rrd_collector_release(rdcf->collector);
+}
+
+static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
+ void *new_func __maybe_unused, void *rrdhost __maybe_unused) {
+ struct rrd_collector_function *rdcf = func;
+ struct rrd_collector_function *new_rdcf = new_func;
+
+ if(!thread_rrd_collector)
+ fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.",
+ __FUNCTION__, dictionary_acquired_item_name(item));
+
+ bool changed = false;
+
+ if(rdcf->collector != thread_rrd_collector) {
+ struct rrd_collector *old_rdc = rdcf->collector;
+ rdcf->collector = rrd_collector_acquire();
+ rrd_collector_release(old_rdc);
+ changed = true;
+ }
+
+ if(rdcf->function != new_rdcf->function) {
+ rdcf->function = new_rdcf->function;
+ changed = true;
+ }
+
+ if(rdcf->help != new_rdcf->help) {
+ STRING *old = rdcf->help;
+ rdcf->help = new_rdcf->help;
+ string_freez(old);
+ changed = true;
+ }
+ else
+ string_freez(new_rdcf->help);
+
+ if(rdcf->timeout != new_rdcf->timeout) {
+ rdcf->timeout = new_rdcf->timeout;
+ changed = true;
+ }
+
+ if(rdcf->sync != new_rdcf->sync) {
+ rdcf->sync = new_rdcf->sync;
+ changed = true;
+ }
+
+ if(rdcf->collector_data != new_rdcf->collector_data) {
+ rdcf->collector_data = new_rdcf->collector_data;
+ changed = true;
+ }
+
+ return changed;
+}
+
+
+void rrdfunctions_init(RRDHOST *host) {
+ if(host->functions) return;
+
+ host->functions = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_insert_callback(host->functions, rrd_functions_insert_callback, host);
+ dictionary_register_delete_callback(host->functions, rrd_functions_delete_callback, host);
+ dictionary_register_conflict_callback(host->functions, rrd_functions_conflict_callback, host);
+}
+
+void