Improved InfluxDB push daemon.

* fixed missing flow active gauge
 * fixed invalid flow risk severity gauges
 * fixed missing flow risk gauges

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2023-12-10 23:14:00 +01:00
parent 142a435bf6
commit 5432b06665

View File

@@ -11,6 +11,8 @@
#include "nDPIsrvd.h"
#include "utils.h"
#define MAX_RISKS_PER_FLOW 8
static int main_thread_shutdown = 0;
static int influxd_timerfd = -1;
@@ -27,17 +29,19 @@ struct flow_user_data
{
nDPIsrvd_ull last_flow_src_l4_payload_len;
nDPIsrvd_ull last_flow_dst_l4_payload_len;
nDPIsrvd_ull detected_risks;
uint8_t risks[MAX_RISKS_PER_FLOW];
uint8_t category;
uint8_t breed;
uint8_t confidence;
uint8_t severity;
// "fallthroughs" if we are not in sync with nDPI
uint8_t risk_ndpid_invalid : 1;
uint8_t category_ndpid_invalid : 1;
uint8_t breed_ndpid_invalid : 1;
uint8_t confidence_ndpid_invalid : 1;
uint8_t severity_ndpid_invalid : 1;
// detection status
uint8_t new_seen : 1;
uint8_t is_detected : 1;
uint8_t is_guessed : 1;
uint8_t is_not_detected : 1;
@@ -58,7 +62,7 @@ struct influx_ctx
struct curl_slist * http_header;
};
static struct influxd_statistics
static struct
{
pthread_mutex_t rw_lock;
@@ -730,6 +734,11 @@ static void influxd_unmap_flow_from_stat(struct flow_user_data * const flow_user
influxd_statistics.gauges.flow_l4_other_count--;
}
if (flow_user_data->new_seen != 0)
{
influxd_statistics.gauges.flow_active_count--;
}
if (flow_user_data->is_detected != 0)
{
influxd_statistics.gauges.flow_detected_count--;
@@ -768,6 +777,19 @@ static void influxd_unmap_flow_from_stat(struct flow_user_data * const flow_user
{
(*severity_map[flow_user_data->severity - 1].global_stat)--;
}
for (uint8_t i = 0; i < MAX_RISKS_PER_FLOW; ++i)
{
if (flow_user_data->risks[i] > 0)
{
influxd_statistics.gauges.flow_risk_count[flow_user_data->risks[i]]--;
}
}
if (flow_user_data->risk_ndpid_invalid != 0)
{
influxd_statistics.gauges.flow_risk_unknown_count--;
}
}
static ssize_t influxd_map_index(char const * const json_key,
@@ -856,6 +878,7 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
if (TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "new") != 0)
{
flow_user_data->new_seen = 1;
influxd_statistics.gauges.flow_active_count++;
struct nDPIsrvd_json_token const * const l3_proto = TOKEN_GET_SZ(sock, "l3_proto");
@@ -914,15 +937,15 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
struct nDPIsrvd_json_token const * current = NULL;
int next_child_index = -1;
if (flow_risk != NULL && flow_user_data != NULL)
if (flow_user_data->is_detected == 0)
{
if (flow_user_data->is_detected == 0)
{
flow_user_data->is_detected = 1;
influxd_statistics.gauges.flow_detected_count++;
}
flow_user_data->is_detected = 1;
influxd_statistics.gauges.flow_detected_count++;
}
if (flow_user_data->detected_risks == 0)
if (flow_risk != NULL)
{
if (flow_user_data->risks[0] == 0)
{
influxd_statistics.counters.flow_risky_count++;
}
@@ -939,45 +962,62 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
strncpy(numeric_risk_buf, numeric_risk_str, numeric_risk_len);
numeric_risk_buf[numeric_risk_len] = '\0';
struct nDPIsrvd_json_token const * const severity =
TOKEN_GET_SZ(sock, "ndpi", "flow_risk", numeric_risk_buf, "severity");
if (influxd_map_flow_u8(sock,
severity,
severity_map,
nDPIsrvd_ARRAY_LENGTH(severity_map),
&flow_user_data->severity) != 0 ||
influxd_map_value_to_stat(sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map)) !=
0)
if (flow_user_data->severity == 0 && flow_user_data->severity_ndpid_invalid == 0)
{
size_t value_len = 0;
char const * const value_str = TOKEN_GET_VALUE(sock, severity, &value_len);
flow_user_data->severity = 0;
flow_user_data->severity_ndpid_invalid = 1;
if (value_len > 0 && value_str != NULL)
struct nDPIsrvd_json_token const * const severity =
TOKEN_GET_SZ(sock, "ndpi", "flow_risk", numeric_risk_buf, "severity");
if (influxd_map_flow_u8(sock,
severity,
severity_map,
nDPIsrvd_ARRAY_LENGTH(severity_map),
&flow_user_data->severity) != 0 ||
influxd_map_value_to_stat(
sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map)) != 0)
{
logger(1,
"Unknown/Invalid JSON value for key 'ndpi','breed': %.*s",
(int)value_len,
value_str);
size_t value_len = 0;
char const * const value_str = TOKEN_GET_VALUE(sock, severity, &value_len);
flow_user_data->severity = 0;
flow_user_data->severity_ndpid_invalid = 1;
if (value_len > 0 && value_str != NULL)
{
logger(1,
"Unknown/Invalid JSON value for key 'ndpi','breed': %.*s",
(int)value_len,
value_str);
}
}
}
if (str_value_to_ull(numeric_risk_str, &numeric_risk_value) == CONVERSION_OK)
{
if ((flow_user_data->detected_risks & (1ull << numeric_risk_value)) == 0)
if (numeric_risk_value < NDPI_MAX_RISK && numeric_risk_value > 0)
{
if (numeric_risk_value < NDPI_MAX_RISK && numeric_risk_value > 0)
for (uint8_t i = 0; i < MAX_RISKS_PER_FLOW; ++i)
{
influxd_statistics.gauges.flow_risk_count[numeric_risk_value - 1]++;
flow_user_data->detected_risks |= (1ull << (numeric_risk_value - 1));
}
else
{
influxd_statistics.gauges.flow_risk_unknown_count++;
if (flow_user_data->risks[i] != 0)
{
continue;
}
influxd_statistics.gauges.flow_risk_count[numeric_risk_value]++;
flow_user_data->risks[i] = numeric_risk_value;
}
}
else if (flow_user_data->risk_ndpid_invalid == 0)
{
flow_user_data->risk_ndpid_invalid = 1;
influxd_statistics.gauges.flow_risk_unknown_count++;
}
}
else
{
logger(1, "Invalid numeric risk value: %s", numeric_risk_buf);
}
}
else
{
logger(1, "%s", "Missing numeric risk value");
}
}
}
@@ -1063,7 +1103,6 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
TOKEN_VALUE_EQUALS_SZ(sock, flow_event_name, "idle") != 0)
{
influxd_unmap_flow_from_stat(flow_user_data);
influxd_statistics.gauges.flow_active_count--;
}
}