Increased JSON buffer size to 12288 (libnDPI serializes more and more information).

* Making Compare&Fetch mandatory.
 * Added some more Compare&Fetch to prevent TSAN complaining about data races.
   Fixed possible but more ore less harmless data races during shutdown process.
 * Shrink SIGNAL handler to a minimum. SYSV Signal handling and MT-safety is awkward.

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2021-01-27 17:10:06 +01:00
parent 102b61175c
commit 9564b0ce2c
6 changed files with 113 additions and 42 deletions

View File

@@ -39,6 +39,10 @@ endif
endif # PKG_CONFIG_BIN
ifeq ($(ENABLE_MEMORY_PROFILING),yes)
PROJECT_CFLAGS += -DENABLE_MEMORY_PROFILING=1
endif
ifeq ($(ENABLE_DEBUG),yes)
PROJECT_CFLAGS += -O0 -g3 -fno-omit-frame-pointer -fno-inline
endif
@@ -120,6 +124,11 @@ else
@echo 'NDPI_WITH_PCRE = no'
endif
endif # PKG_CONFIG_BIN
ifeq ($(ENABLE_MEMORY_PROFILING),yes)
@echo 'ENABLE_MEMORY_PROFILING = yes'
else
@echo 'ENABLE_MEMORY_PROFILING = no'
endif
ifeq ($(ENABLE_DEBUG),yes)
@echo 'ENABLE_DEBUG = yes'
else

View File

@@ -11,7 +11,7 @@
* NOTE: Buffer size needs to keep in sync with other implementations
* e.g. dependencies/nDPIsrvd.py
*/
#define NETWORK_BUFFER_MAX_SIZE 9728 /* 8192 + 1024 + 512 */
#define NETWORK_BUFFER_MAX_SIZE 12288 /* 8192 + 4096 */
/* nDPId default config options */
#define nDPId_PIDFILE "/tmp/ndpid.pid"

View File

@@ -20,7 +20,7 @@ DEFAULT_PORT = 7000
DEFAULT_UNIX = '/tmp/ndpid-distributor.sock'
NETWORK_BUFFER_MIN_SIZE = 5
NETWORK_BUFFER_MAX_SIZE = 9728 # Please keep this value in sync with the one in config.h
NETWORK_BUFFER_MAX_SIZE = 12288 # Please keep this value in sync with the one in config.h
PKT_TYPE_ETH_IP4 = 0x0800
PKT_TYPE_ETH_IP6 = 0x86DD

View File

@@ -93,7 +93,8 @@ static char * generate_pcap_filename(struct nDPIsrvd_flow const * const flow,
{
if (flow_user->guessed != 0 || flow_user->detected == 0)
{
int ret = snprintf(dest, size, "flow-%s-%s.pcap", (flow_user->guessed != 0 ? "guessed" : "undetected"), flow->id);
int ret =
snprintf(dest, size, "flow-%s-%s.pcap", (flow_user->guessed != 0 ? "guessed" : "undetected"), flow->id);
if (ret <= 0 || (size_t)ret > size)
{
return NULL;
@@ -220,16 +221,19 @@ enum nDPIsrvd_callback_return nDPIsrvd_json_callback(struct nDPIsrvd_socket * co
utarray_push_back(flow_user->packets, &cb_user_data->tmp.pkt);
}
flow_user->pkt_datalink = cb_user_data->tmp.pkt_datalink;
} else {
if (cb_user_data->tmp.guessed != 0) {
}
else
{
if (cb_user_data->tmp.guessed != 0)
{
flow_user->guessed = cb_user_data->tmp.guessed;
}
if (cb_user_data->tmp.detected != 0) {
if (cb_user_data->tmp.detected != 0)
{
flow_user->detected = cb_user_data->tmp.detected;
}
}
if (cb_user_data->tmp.flow_end_or_idle == 1 &&
(flow_user->guessed != 0 || flow_user->detected == 0))
if (cb_user_data->tmp.flow_end_or_idle == 1 && (flow_user->guessed != 0 || flow_user->detected == 0))
{
if (flow_user->packets != NULL)
{

View File

@@ -19,7 +19,7 @@ var (
InfoLogger *log.Logger
ErrorLogger *log.Logger
NETWORK_BUFFER_MAX_SIZE uint16 = 9216
NETWORK_BUFFER_MAX_SIZE uint16 = 12288
nDPIsrvd_JSON_BYTES uint16 = 4
)

124
nDPId.c
View File

@@ -26,6 +26,10 @@
#error "nDPI >= 3.3.0 requiired"
#endif
#ifndef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_8
#error "Compare and Fetch aka __sync_fetch_and_add not available on your platform!"
#endif
enum nDPId_l3_type
{
L3_IP,
@@ -113,9 +117,7 @@ struct nDPId_workflow
{
pcap_t * pcap_handle;
uint8_t error_or_eof : 1;
uint8_t reserved_00 : 7;
uint8_t reserved_01[3];
int error_or_eof;
unsigned long long int packets_captured;
unsigned long long int packets_processed;
@@ -251,7 +253,14 @@ static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = {
static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {};
int main_thread_shutdown = 0;
static uint32_t global_flow_id = 0;
static uint64_t global_flow_id = 0;
#ifdef ENABLE_MEMORY_PROFILING
static uint64_t ndpi_memory_alloc_count = 0;
static uint64_t ndpi_memory_alloc_bytes = 0;
static uint64_t ndpi_memory_free_count = 0;
static uint64_t ndpi_memory_free_bytes = 0;
#endif
static char * pcap_file_or_interface = NULL;
static union nDPId_ip pcap_dev_ip = {};
@@ -457,12 +466,48 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev)
return 1;
}
#ifdef ENABLE_MEMORY_PROFILING
static void * ndpi_malloc_wrapper(size_t const size)
{
void * p = malloc(sizeof(uint64_t) + size);
if (p == NULL)
{
return NULL;
}
*(uint64_t *)p = size;
__sync_fetch_and_add(&ndpi_memory_alloc_count, 1);
__sync_fetch_and_add(&ndpi_memory_alloc_bytes, size);
return (uint8_t *)p + sizeof(uint64_t);
}
static void ndpi_free_wrapper(void * const freeable)
{
void * p = (uint8_t *)freeable - sizeof(uint64_t);
__sync_fetch_and_add(&ndpi_memory_free_count, 1);
__sync_fetch_and_add(&ndpi_memory_free_bytes, *(uint64_t *)p);
free(p);
}
#endif
static struct nDPId_workflow * init_workflow(char const * const file_or_device)
{
int pcap_argument_is_file = 0;
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
struct nDPId_workflow * workflow = (struct nDPId_workflow *)ndpi_calloc(1, sizeof(*workflow));
struct nDPId_workflow * workflow;
#ifdef ENABLE_MEMORY_PROFILING
set_ndpi_malloc(ndpi_malloc_wrapper);
set_ndpi_free(ndpi_free_wrapper);
set_ndpi_flow_malloc(NULL);
set_ndpi_flow_free(NULL);
#endif
workflow = (struct nDPId_workflow *)ndpi_calloc(1, sizeof(*workflow));
if (workflow == NULL)
{
return NULL;
@@ -855,6 +900,27 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa
if (workflow->last_idle_scan_time + idle_scan_period < workflow->last_time)
{
#ifdef ENABLE_MEMORY_PROFILING
if (reader_thread->array_index == 0)
{
uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0);
uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0);
uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0);
uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0);
syslog(LOG_DAEMON,
"MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in "
"use, "
"%llu bytes in use",
alloc_count,
free_count,
alloc_bytes,
free_bytes,
alloc_count - free_count,
alloc_bytes - free_bytes);
}
#endif
for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index)
{
ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow);
@@ -2080,12 +2146,7 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->cur_active_flows++;
workflow->total_active_flows++;
#ifdef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4
flow_to_process->flow_id = __sync_fetch_and_add(&global_flow_id, 1);
#else
#warning "Compare and Fetch aka __sync_fetch_and_add not available on your platform/compiler, do not trust any flow_id!"
flow_to_process->flow_id = global_flow_id++;
#endif
memset(&flow_to_process->ndpi_flow,
0,
@@ -2223,7 +2284,7 @@ static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread
syslog(LOG_DAEMON | LOG_ERR,
"Error while reading pcap file: '%s'",
pcap_geterr(reader_thread->workflow->pcap_handle));
reader_thread->workflow->error_or_eof = 1;
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
}
}
}
@@ -2259,7 +2320,7 @@ static void * processing_thread(void * const ndpi_thread_arg)
run_pcap_loop(reader_thread);
fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
reader_thread->workflow->error_or_eof = 1;
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
return NULL;
}
@@ -2267,7 +2328,7 @@ static int processing_threads_error_or_eof(void)
{
for (unsigned long long int i = 0; i < reader_thread_count; ++i)
{
if (reader_threads[i].workflow->error_or_eof == 0)
if (__sync_fetch_and_add(&reader_threads[i].workflow->error_or_eof, 0) == 0)
{
return 0;
}
@@ -2460,21 +2521,11 @@ static void free_reader_threads(void)
static void sighandler(int signum)
{
syslog(LOG_DAEMON | LOG_NOTICE, "Received SIGNAL %d", signum);
(void)signum;
if (main_thread_shutdown == 0)
if (__sync_fetch_and_add(&main_thread_shutdown, 0) == 0)
{
syslog(LOG_DAEMON | LOG_NOTICE, "Stopping reader threads.");
main_thread_shutdown = 1;
if (stop_reader_threads() != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "Failed to stop reader threads!");
exit(EXIT_FAILURE);
}
}
else
{
syslog(LOG_DAEMON | LOG_NOTICE, "Reader threads are already shutting down, please be patient.");
__sync_fetch_and_add(&main_thread_shutdown, 1);
}
}
@@ -2687,18 +2738,22 @@ static int validate_options(char const * const arg0)
{
int retval = 0;
if (instance_alias == NULL) {
if (instance_alias == NULL)
{
char hname[256];
errno = 0;
if (gethostname(hname, sizeof(hname)) != 0) {
if (gethostname(hname, sizeof(hname)) != 0)
{
fprintf(stderr, "%s: Could not retrieve your hostname: %s\n", arg0, strerror(errno));
retval = 1;
} else {
}
else
{
instance_alias = strdup(hname);
fprintf(stderr,
"%s: No instance alias given, using your hostname '%s'\n", arg0, instance_alias);
if (instance_alias == NULL) {
fprintf(stderr, "%s: No instance alias given, using your hostname '%s'\n", arg0, instance_alias);
if (instance_alias == NULL)
{
retval = 1;
}
}
@@ -2807,6 +2862,9 @@ int main(int argc, char ** argv)
}
openlog("nDPId", LOG_CONS | LOG_PERROR, LOG_DAEMON);
#ifdef ENABLE_MEMORY_PROFILING
syslog(LOG_DAEMON, "size/processed-flow: %zu bytes\n", sizeof(struct nDPId_flow_info));
#endif
if (setup_reader_threads() != 0)
{
@@ -2829,7 +2887,7 @@ int main(int argc, char ** argv)
sleep(1);
}
if (main_thread_shutdown == 0 && stop_reader_threads() != 0)
if (main_thread_shutdown == 1 && stop_reader_threads() != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "stop_reader_threads");
return 1;