Added zLib compression parameters to control compression conditions.

* more structs are now "compressable"
 * fixed missing DAEMON_RECONNECT event
 * improved memory profiler

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2021-06-16 19:25:27 +02:00
parent fac7648326
commit db87d45edb
2 changed files with 239 additions and 125 deletions

View File

@@ -21,6 +21,9 @@
#define nDPId_MAX_IDLE_FLOWS_PER_THREAD 512u
#define nDPId_TICK_RESOLUTION 1000u
#define nDPId_MAX_READER_THREADS 32u
#define nDPId_LOG_MEMORY_USAGE_EVERY 5000u /* 5 sec */
#define nDPId_COMPRESSION_SCAN_PERIOD 20000u /* 20 sec */
#define nDPId_COMPRESSION_FLOW_INACTIVITY 30000u /* 30 sec */
#define nDPId_IDLE_SCAN_PERIOD 10000u /* 10 sec */
#define nDPId_GENERIC_IDLE_TIME 600000u /* 600 */
#define nDPId_ICMP_IDLE_TIME 30000u /* 30 sec */

361
nDPId.c
View File

@@ -104,15 +104,22 @@ struct nDPId_flow_extended
};
/*
* Structures related to certain flow states.
* Skipped flows need at least some information.
*/
struct nDPId_flow_skipped
{
struct nDPId_flow_basic flow_basic;
};
struct nDPI_data
/*
* Structure which is important for the detection process.
* The structure is also a compression target, if activated.
*/
struct nDPId_detection_data
{
uint32_t last_ndpi_flow_struct_hash;
struct ndpi_proto detected_l7_protocol;
struct ndpi_proto guessed_l7_protocol;
struct ndpi_flow_struct flow;
struct ndpi_id_struct src;
struct ndpi_id_struct dst;
@@ -126,16 +133,11 @@ struct nDPId_flow_info
uint8_t reserved_00 : 7;
uint8_t reserved_01[1];
#ifdef ENABLE_ZLIB
uint16_t ndpi_compressed_size;
uint16_t detection_data_compressed_size;
#else
uint16_t reserved_02;
#endif
uint32_t last_ndpi_flow_struct_hash;
struct ndpi_proto detected_l7_protocol;
struct ndpi_proto guessed_l7_protocol;
struct nDPI_data * ndpi;
struct nDPId_detection_data * detection_data;
};
struct nDPId_flow_finished
@@ -156,6 +158,12 @@ struct nDPId_workflow
unsigned long long int total_l4_data_len;
unsigned long long int detected_flow_protocols;
#ifdef ENABLE_MEMORY_PROFILING
uint64_t last_memory_usage_log_time;
#endif
#ifdef ENABLE_ZLIB
uint64_t last_compression_scan_time;
#endif
uint64_t last_idle_scan_time;
uint64_t last_time;
@@ -329,6 +337,10 @@ static struct
unsigned long long int max_idle_flows_per_thread;
unsigned long long int tick_resolution;
unsigned long long int reader_thread_count;
#ifdef ENABLE_ZLIB
unsigned long long int compression_scan_period;
unsigned long long int compression_flow_inactivity;
#endif
unsigned long long int idle_scan_period;
unsigned long long int generic_max_idle_time;
unsigned long long int icmp_max_idle_time;
@@ -344,6 +356,10 @@ static struct
.max_idle_flows_per_thread = nDPId_MAX_IDLE_FLOWS_PER_THREAD / 2,
.tick_resolution = nDPId_TICK_RESOLUTION,
.reader_thread_count = nDPId_MAX_READER_THREADS / 2,
#ifdef ENABLE_ZLIB
.compression_scan_period = nDPId_COMPRESSION_SCAN_PERIOD,
.compression_flow_inactivity = nDPId_COMPRESSION_FLOW_INACTIVITY,
#endif
.idle_scan_period = nDPId_IDLE_SCAN_PERIOD,
.generic_max_idle_time = nDPId_GENERIC_IDLE_TIME,
.icmp_max_idle_time = nDPId_ICMP_IDLE_TIME,
@@ -360,6 +376,10 @@ enum nDPId_subopts
TICK_RESOLUTION,
MAX_READER_THREADS,
IDLE_SCAN_PERIOD,
#ifdef ENABLE_ZLIB
COMPRESSION_SCAN_PERIOD,
COMPRESSION_FLOW_INACTIVITY,
#endif
GENERIC_MAX_IDLE_TIME,
ICMP_MAX_IDLE_TIME,
UDP_MAX_IDLE_TIME,
@@ -372,6 +392,10 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th
[MAX_IDLE_FLOWS_PER_THREAD] = "max-idle-flows-per-thread",
[TICK_RESOLUTION] = "tick-resolution",
[MAX_READER_THREADS] = "max-reader-threads",
#ifdef ENABLE_ZLIB
[COMPRESSION_SCAN_PERIOD] = "compression-scan-period",
[COMPRESSION_FLOW_INACTIVITY] = "compression-flow-activity",
#endif
[IDLE_SCAN_PERIOD] = "idle-scan-period",
[GENERIC_MAX_IDLE_TIME] = "generic-max-idle-time",
[ICMP_MAX_IDLE_TIME] = "icmp-max-idle-time",
@@ -456,6 +480,7 @@ static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen)
ret = strm.total_out;
#ifdef ENABLE_MEMORY_PROFILING
__sync_fetch_and_add(&zlib_decompressions, 1);
__sync_fetch_and_sub(&zlib_compression_diff, ret - srcLen);
#endif
}
else
@@ -474,75 +499,137 @@ static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen)
return ret;
}
static int ndpi_data_deflate(struct nDPId_flow_info * const flow_info)
static int detection_data_deflate(struct nDPId_flow_info * const flow_info)
{
uint8_t tmpOut[sizeof(*flow_info->ndpi)];
uint8_t tmpOut[sizeof(*flow_info->detection_data)];
int ret;
if (flow_info->ndpi_compressed_size != 0)
if (flow_info->detection_data_compressed_size > 0)
{
return -7;
}
ret = zlib_deflate(flow_info->ndpi, sizeof(*flow_info->ndpi), tmpOut, sizeof(tmpOut));
ret = zlib_deflate(flow_info->detection_data, sizeof(*flow_info->detection_data), tmpOut, sizeof(tmpOut));
if (ret <= 0)
{
return ret;
}
struct nDPI_data * const new_ndpi_data = ndpi_malloc(ret);
if (new_ndpi_data == NULL)
struct nDPId_detection_data * const new_det_data = ndpi_malloc(ret);
if (new_det_data == NULL)
{
return -8;
}
ndpi_free(flow_info->ndpi);
flow_info->ndpi = new_ndpi_data;
ndpi_free(flow_info->detection_data);
flow_info->detection_data = new_det_data;
memcpy(flow_info->ndpi, tmpOut, ret);
flow_info->ndpi_compressed_size = ret;
memcpy(flow_info->detection_data, tmpOut, ret);
flow_info->detection_data_compressed_size = ret;
return ret;
}
static int ndpi_data_inflate(struct nDPId_flow_info * const flow_info)
static int detection_data_inflate(struct nDPId_flow_info * const flow_info)
{
uint8_t tmpOut[sizeof(*flow_info->ndpi)];
uint8_t tmpOut[sizeof(*flow_info->detection_data)];
int ret;
if (flow_info->ndpi_compressed_size == 0)
if (flow_info->detection_data_compressed_size == 0)
{
return -7;
}
ret = zlib_inflate(flow_info->ndpi, flow_info->ndpi_compressed_size, tmpOut, sizeof(tmpOut));
ret = zlib_inflate(flow_info->detection_data, flow_info->detection_data_compressed_size, tmpOut, sizeof(tmpOut));
if (ret <= 0)
{
return ret;
}
struct nDPI_data * const new_ndpi_data = ndpi_malloc(ret);
if (new_ndpi_data == NULL)
struct nDPId_detection_data * const new_det_data = ndpi_malloc(ret);
if (new_det_data == NULL)
{
return -8;
}
ndpi_free(flow_info->ndpi);
flow_info->ndpi = new_ndpi_data;
ndpi_free(flow_info->detection_data);
flow_info->detection_data = new_det_data;
memcpy(flow_info->ndpi, tmpOut, ret);
flow_info->ndpi_compressed_size = 0;
memcpy(flow_info->detection_data, tmpOut, ret);
flow_info->detection_data_compressed_size = 0;
/*
* Do not use ndpi_id_struct's from ndpi_flow
* as they may not be valid anymore.
* The ndpi_id_struct's from ndpi_flow may not be valid anymore.
* nDPI only updates those pointers while processing packets!
* This is especially important when using compression
* to prevent use of dangling pointers.
*/
flow_info->ndpi->flow.src = &flow_info->ndpi->src;
flow_info->ndpi->flow.dst = &flow_info->ndpi->dst;
flow_info->detection_data->flow.src = &flow_info->detection_data->src;
flow_info->detection_data->flow.dst = &flow_info->detection_data->dst;
return ret;
}
static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
{
struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data;
struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A;
(void)depth;
if (workflow == NULL || flow_basic == NULL)
{
return;
}
if (which == ndpi_preorder || which == ndpi_leaf)
{
switch (flow_basic->type)
{
case FT_UNKNOWN:
case FT_SKIPPED:
case FT_FINISHED:
break;
case FT_INFO:
{
if (flow_basic->last_seen + nDPId_options.compression_flow_inactivity < workflow->last_time)
{
struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic;
if (flow_info->detection_data_compressed_size > 0)
{
break;
}
int ret = detection_data_deflate(flow_info);
if (ret <= 0)
{
syslog(LOG_DAEMON | LOG_ERR,
"zLib compression failed for flow %u with error code: %d",
flow_info->flow_extended.flow_id,
ret);
}
}
break;
}
}
}
}
static void check_for_compressable_flows(struct nDPId_reader_thread * const reader_thread)
{
struct nDPId_workflow * const workflow = reader_thread->workflow;
if (workflow->last_compression_scan_time + nDPId_options.compression_scan_period < workflow->last_time)
{
for (size_t comp_scan_index = 0; comp_scan_index < workflow->max_active_flows; ++comp_scan_index)
{
ndpi_twalk(workflow->ndpi_flows_active[comp_scan_index], ndpi_comp_scan_walker, workflow);
}
workflow->last_compression_scan_time = workflow->last_time;
}
}
#endif
static void ip_netmask_to_subnet(union nDPId_ip const * const ip,
@@ -829,6 +916,49 @@ static void ndpi_free_wrapper(void * const freeable)
free(p);
}
static void log_memory_usage(struct nDPId_reader_thread * const reader_thread)
{
struct nDPId_workflow * const workflow = reader_thread->workflow;
if (workflow->last_memory_usage_log_time + nDPId_LOG_MEMORY_USAGE_EVERY < workflow->last_time)
{
if (reader_thread->array_index == 0)
{
uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0);
uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0);
uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0);
uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0);
syslog(LOG_DAEMON,
"MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in "
"use, "
"%llu bytes in use",
(long long unsigned int)alloc_count,
(long long unsigned int)free_count,
(long long unsigned int)alloc_bytes,
(long long unsigned int)free_bytes,
(long long unsigned int)(alloc_count - free_count),
(long long unsigned int)(alloc_bytes - free_bytes));
#ifdef ENABLE_ZLIB
uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0);
uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0);
uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0);
syslog(LOG_DAEMON,
"MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu compressed blocks in use, %llu "
"bytes "
"diff",
(long long unsigned int)zlib_compression_count,
(long long unsigned int)zlib_decompression_count,
(long long unsigned int)zlib_compression_count - (long long unsigned int)zlib_decompression_count,
(long long unsigned int)zlib_bytes_diff);
#endif
}
workflow->last_memory_usage_log_time = workflow->last_time;
}
}
#endif
static struct nDPId_workflow * init_workflow(char const * const file_or_device)
@@ -950,25 +1080,27 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
return workflow;
}
static void free_ndpi_structs(struct nDPId_flow_info * const flow_info)
static void free_detection_data(struct nDPId_flow_info * const flow_info)
{
ndpi_free_flow_data(&flow_info->ndpi->flow);
ndpi_free(flow_info->ndpi);
flow_info->ndpi = NULL;
ndpi_free_flow_data(&flow_info->detection_data->flow);
ndpi_free(flow_info->detection_data);
flow_info->detection_data = NULL;
}
static int alloc_ndpi_structs(struct nDPId_flow_info * const flow_info)
static int alloc_detection_data(struct nDPId_flow_info * const flow_info)
{
flow_info->ndpi = (struct nDPI_data *)ndpi_flow_malloc(sizeof(*flow_info->ndpi));
flow_info->detection_data = (struct nDPId_detection_data *)ndpi_flow_malloc(sizeof(*flow_info->detection_data));
if (flow_info->ndpi == NULL)
if (flow_info->detection_data == NULL)
{
goto error;
}
memset(flow_info->detection_data, 0, sizeof(*flow_info->detection_data));
return 0;
error:
free_ndpi_structs(flow_info);
free_detection_data(flow_info);
return 1;
}
@@ -986,7 +1118,7 @@ static void ndpi_flow_info_freer(void * const node)
case FT_INFO:
{
struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic;
free_ndpi_structs(flow_info);
free_detection_data(flow_info);
break;
}
}
@@ -1295,9 +1427,9 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic;
#ifdef ENABLE_ZLIB
if (nDPId_options.enable_zlib_compression != 0 && flow_info->ndpi_compressed_size > 0)
if (nDPId_options.enable_zlib_compression != 0 && flow_info->detection_data_compressed_size > 0)
{
int ret = ndpi_data_inflate(flow_info);
int ret = detection_data_inflate(flow_info);
if (ret <= 0)
{
syslog(LOG_DAEMON | LOG_ERR, "zLib decompression failed with error code: %d", ret);
@@ -1310,12 +1442,11 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
{
uint8_t protocol_was_guessed = 0;
if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_info->guessed_l7_protocol) == 0)
if (ndpi_is_protocol_detected(workflow->ndpi_struct,
flow_info->detection_data->guessed_l7_protocol) == 0)
{
flow_info->guessed_l7_protocol = ndpi_detection_giveup(workflow->ndpi_struct,
&flow_info->ndpi->flow,
1,
&protocol_was_guessed);
flow_info->detection_data->guessed_l7_protocol = ndpi_detection_giveup(
workflow->ndpi_struct, &flow_info->detection_data->flow, 1, &protocol_was_guessed);
}
else
{
@@ -1355,38 +1486,6 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa
if (workflow->last_idle_scan_time + nDPId_options.idle_scan_period < workflow->last_time)
{
#ifdef ENABLE_MEMORY_PROFILING
if (reader_thread->array_index == 0)
{
uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0);
uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0);
uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0);
uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0);
syslog(LOG_DAEMON,
"MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in "
"use, "
"%llu bytes in use",
(long long unsigned int)alloc_count,
(long long unsigned int)free_count,
(long long unsigned int)alloc_bytes,
(long long unsigned int)free_bytes,
(long long unsigned int)(alloc_count - free_count),
(long long unsigned int)(alloc_bytes - free_bytes));
#ifdef ENABLE_ZLIB
uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0);
uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0);
uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0);
syslog(LOG_DAEMON,
"MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu bytes difference",
(long long unsigned int)zlib_compression_count,
(long long unsigned int)zlib_decompression_count,
(long long unsigned int)zlib_bytes_diff);
#endif
}
#endif
for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index)
{
ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow);
@@ -1615,6 +1714,7 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
"[%8llu, %d] Reconnected to JSON sink",
workflow->packets_captured,
reader_thread->array_index);
jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT);
}
}
@@ -1883,8 +1983,8 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_NOT_DETECTED:
case FLOW_EVENT_GUESSED:
if (ndpi_dpi2json(workflow->ndpi_struct,
&flow->ndpi->flow,
flow->guessed_l7_protocol,
&flow->detection_data->flow,
flow->detection_data->guessed_l7_protocol,
&workflow->ndpi_serializer) != 0)
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -1897,8 +1997,8 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_DETECTED:
case FLOW_EVENT_DETECTION_UPDATE:
if (ndpi_dpi2json(workflow->ndpi_struct,
&flow->ndpi->flow,
flow->detected_l7_protocol,
&flow->detection_data->flow,
flow->detection_data->detected_l7_protocol,
&workflow->ndpi_serializer) != 0)
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -2471,6 +2571,9 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->last_time = time_ms;
check_for_idle_flows(reader_thread);
#ifdef ENABLE_MEMORY_PROFILING
log_memory_usage(reader_thread);
#endif
if (process_datalink_layer(reader_thread, header, packet, &ip_offset, &type) != 0)
{
@@ -2806,7 +2909,7 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->total_active_flows++;
flow_to_process->flow_extended.flow_id = __sync_fetch_and_add(&global_flow_id, 1);
if (alloc_ndpi_structs(flow_to_process) != 0)
if (alloc_detection_data(flow_to_process) != 0)
{
jsonize_packet_event(
reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD);
@@ -2815,10 +2918,8 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
memset(flow_to_process->ndpi, 0, sizeof(*flow_to_process->ndpi));
ndpi_src = &flow_to_process->ndpi->src;
ndpi_dst = &flow_to_process->ndpi->dst;
ndpi_src = &flow_to_process->detection_data->src;
ndpi_dst = &flow_to_process->detection_data->dst;
is_new_flow = 1;
}
@@ -2849,9 +2950,9 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process = (struct nDPId_flow_info *)flow_basic_to_process;
#ifdef ENABLE_ZLIB
if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->ndpi_compressed_size > 0)
if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->detection_data_compressed_size > 0)
{
int ret = ndpi_data_inflate(flow_to_process);
int ret = detection_data_inflate(flow_to_process);
if (ret <= 0)
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -2865,13 +2966,13 @@ static void ndpi_process_packet(uint8_t * const args,
if (direction_changed != 0)
{
ndpi_src = &flow_to_process->ndpi->dst;
ndpi_dst = &flow_to_process->ndpi->src;
ndpi_src = &flow_to_process->detection_data->dst;
ndpi_dst = &flow_to_process->detection_data->src;
}
else
{
ndpi_src = &flow_to_process->ndpi->src;
ndpi_dst = &flow_to_process->ndpi->dst;
ndpi_src = &flow_to_process->detection_data->src;
ndpi_dst = &flow_to_process->detection_data->dst;
}
}
@@ -2907,7 +3008,7 @@ static void ndpi_process_packet(uint8_t * const args,
&flow_to_process->flow_extended,
PACKET_EVENT_PAYLOAD_FLOW);
if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1)
if (flow_to_process->detection_data->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1)
{
if (flow_to_process->detection_completed != 0)
{
@@ -2917,8 +3018,8 @@ static void ndpi_process_packet(uint8_t * const args,
{
/* last chance to guess something, better then nothing */
uint8_t protocol_was_guessed = 0;
flow_to_process->guessed_l7_protocol =
ndpi_detection_giveup(workflow->ndpi_struct, &flow_to_process->ndpi->flow, 1, &protocol_was_guessed);
flow_to_process->detection_data->guessed_l7_protocol = ndpi_detection_giveup(
workflow->ndpi_struct, &flow_to_process->detection_data->flow, 1, &protocol_was_guessed);
if (protocol_was_guessed != 0)
{
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED);
@@ -2930,48 +3031,43 @@ static void ndpi_process_packet(uint8_t * const args,
}
}
flow_to_process->detected_l7_protocol = ndpi_detection_process_packet(workflow->ndpi_struct,
&flow_to_process->ndpi->flow,
ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
ip_size,
time_ms,
ndpi_src,
ndpi_dst);
flow_to_process->detection_data->detected_l7_protocol =
ndpi_detection_process_packet(workflow->ndpi_struct,
&flow_to_process->detection_data->flow,
ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
ip_size,
time_ms,
ndpi_src,
ndpi_dst);
if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->detected_l7_protocol) != 0 &&
if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->detection_data->detected_l7_protocol) != 0 &&
flow_to_process->detection_completed == 0)
{
flow_to_process->detection_completed = 1;
workflow->detected_flow_protocols++;
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED);
flow_to_process->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow);
flow_to_process->detection_data->last_ndpi_flow_struct_hash =
calculate_ndpi_flow_struct_hash(&flow_to_process->detection_data->flow);
}
else if (flow_to_process->detection_completed == 1)
{
uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow);
if (hash != flow_to_process->last_ndpi_flow_struct_hash)
uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->detection_data->flow);
if (hash != flow_to_process->detection_data->last_ndpi_flow_struct_hash)
{
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE);
flow_to_process->last_ndpi_flow_struct_hash = hash;
flow_to_process->detection_data->last_ndpi_flow_struct_hash = hash;
}
}
if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process)
if (flow_to_process->detection_data->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process)
{
free_ndpi_structs(flow_to_process);
free_detection_data(flow_to_process);
flow_to_process->flow_extended.flow_basic.type = FT_FINISHED;
}
#ifdef ENABLE_ZLIB
else if (nDPId_options.enable_zlib_compression != 0)
{
int ret = ndpi_data_deflate(flow_to_process);
if (ret <= 0)
{
syslog(LOG_DAEMON | LOG_ERR,
"zLib compression failed for flow %u with error code: %d",
flow_to_process->flow_extended.flow_id,
ret);
}
check_for_compressable_flows(reader_thread);
}
#endif
}
@@ -3255,7 +3351,8 @@ static void print_subopt_usage(void)
if (*token != NULL)
{
fprintf(stderr, "\t\t%s = ", *token);
switch (index++)
enum nDPId_subopts subopts = index++;
switch (subopts)
{
case MAX_FLOWS_PER_THREAD:
fprintf(stderr, "%llu\n", nDPId_options.max_flows_per_thread);
@@ -3272,6 +3369,14 @@ static void print_subopt_usage(void)
case IDLE_SCAN_PERIOD:
fprintf(stderr, "%llu\n", nDPId_options.idle_scan_period);
break;
#ifdef ENABLE_ZLIB
case COMPRESSION_SCAN_PERIOD:
fprintf(stderr, "%llu\n", nDPId_options.compression_scan_period);
break;
case COMPRESSION_FLOW_INACTIVITY:
fprintf(stderr, "%llu\n", nDPId_options.compression_flow_inactivity);
break;
#endif
case GENERIC_MAX_IDLE_TIME:
fprintf(stderr, "%llu\n", nDPId_options.generic_max_idle_time);
break;
@@ -3293,8 +3398,6 @@ static void print_subopt_usage(void)
case MAX_PACKETS_PER_FLOW_TO_PROCESS:
fprintf(stderr, "%llu\n", nDPId_options.max_packets_per_flow_to_process);
break;
default:
break;
}
}
else
@@ -3468,6 +3571,14 @@ static int nDPId_parse_options(int argc, char ** argv)
case IDLE_SCAN_PERIOD:
nDPId_options.idle_scan_period = value_llu;
break;
#ifdef ENABLE_ZLIB
case COMPRESSION_SCAN_PERIOD:
nDPId_options.compression_scan_period = value_llu;
break;
case COMPRESSION_FLOW_INACTIVITY:
nDPId_options.compression_flow_inactivity = value_llu;
break;
#endif
case GENERIC_MAX_IDLE_TIME:
nDPId_options.generic_max_idle_time = value_llu;
break;
@@ -3665,7 +3776,7 @@ int main(int argc, char ** argv)
openlog("nDPId", LOG_CONS | LOG_PERROR, LOG_DAEMON);
#ifdef ENABLE_MEMORY_PROFILING
syslog(LOG_DAEMON, "size/flow: %zu bytes\n", sizeof(struct nDPId_flow_info) + sizeof(struct nDPI_data));
syslog(LOG_DAEMON, "size/flow: %zu bytes\n", sizeof(struct nDPId_flow_info) + sizeof(struct nDPId_detection_data));
#endif
if (setup_reader_threads() != 0)