Provide thread sync via locking on architectures that do not support Compare&Swap.

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2022-09-17 18:27:17 +02:00
parent b3e9af495c
commit efaa76e978
2 changed files with 129 additions and 76 deletions

View File

@@ -1026,31 +1026,35 @@ int main(int argc, char ** argv)
unsigned long long int total_alloc_bytes = unsigned long long int total_alloc_bytes =
#ifdef ENABLE_ZLIB #ifdef ENABLE_ZLIB
(unsigned long long int)(ndpi_memory_alloc_bytes - zlib_compression_bytes - (unsigned long long int)(MT_GET_AND_ADD(ndpi_memory_alloc_bytes, 0) -
(zlib_compressions * sizeof(struct nDPId_detection_data))); MT_GET_AND_ADD(zlib_compression_bytes, 0) -
(MT_GET_AND_ADD(zlib_compressions, 0) * sizeof(struct nDPId_detection_data)));
#else #else
(unsigned long long int)ndpi_memory_alloc_bytes; (unsigned long long int)MT_GET_AND_ADD(ndpi_memory_alloc_bytes, 0);
#endif #endif
unsigned long long int total_free_bytes = unsigned long long int total_free_bytes =
#ifdef ENABLE_ZLIB #ifdef ENABLE_ZLIB
(unsigned long long int)(ndpi_memory_free_bytes - zlib_compression_bytes - (unsigned long long int)(MT_GET_AND_ADD(ndpi_memory_free_bytes, 0) -
(zlib_compressions * sizeof(struct nDPId_detection_data))); MT_GET_AND_ADD(zlib_compression_bytes, 0) -
(MT_GET_AND_ADD(zlib_compressions, 0) * sizeof(struct nDPId_detection_data)));
#else #else
(unsigned long long int)ndpi_memory_free_bytes; (unsigned long long int)MT_GET_AND_ADD(ndpi_memory_free_bytes, 0);
#endif #endif
unsigned long long int total_alloc_count = unsigned long long int total_alloc_count =
#ifdef ENABLE_ZLIB #ifdef ENABLE_ZLIB
(unsigned long long int)(ndpi_memory_alloc_count - zlib_compressions * 2); (unsigned long long int)(MT_GET_AND_ADD(ndpi_memory_alloc_count, 0) -
MT_GET_AND_ADD(zlib_compressions, 0) * 2);
#else #else
(unsigned long long int)ndpi_memory_alloc_count; (unsigned long long int)MT_GET_AND_ADD(ndpi_memory_alloc_count, 0);
#endif #endif
unsigned long long int total_free_count = unsigned long long int total_free_count =
#ifdef ENABLE_ZLIB #ifdef ENABLE_ZLIB
(unsigned long long int)(ndpi_memory_free_count - zlib_decompressions * 2); (unsigned long long int)(MT_GET_AND_ADD(ndpi_memory_free_count, 0) -
MT_GET_AND_ADD(zlib_decompressions, 0) * 2);
#else #else
(unsigned long long int)ndpi_memory_free_count; (unsigned long long int)MT_GET_AND_ADD(ndpi_memory_free_count, 0);
#endif #endif
printf( printf(
@@ -1076,20 +1080,21 @@ int main(int argc, char ** argv)
(unsigned long long int)distributor_return.stats.json_string_len_avg); (unsigned long long int)distributor_return.stats.json_string_len_avg);
} }
if (ndpi_memory_alloc_bytes != ndpi_memory_free_bytes || ndpi_memory_alloc_count != ndpi_memory_free_count || if (MT_GET_AND_ADD(ndpi_memory_alloc_bytes, 0) != MT_GET_AND_ADD(ndpi_memory_free_bytes, 0) ||
MT_GET_AND_ADD(ndpi_memory_alloc_count, 0) != MT_GET_AND_ADD(ndpi_memory_free_count, 0) ||
nDPId_return.total_active_flows != nDPId_return.total_idle_flows) nDPId_return.total_active_flows != nDPId_return.total_idle_flows)
{ {
logger(1, "%s: %s", argv[0], "Memory / Flow leak detected."); logger(1, "%s: %s", argv[0], "Memory / Flow leak detected.");
logger(1, logger(1,
"%s: Allocated / Free'd bytes: %llu / %llu", "%s: Allocated / Free'd bytes: %llu / %llu",
argv[0], argv[0],
(unsigned long long int)ndpi_memory_alloc_bytes, (unsigned long long int)MT_GET_AND_ADD(ndpi_memory_alloc_bytes, 0),
(unsigned long long int)ndpi_memory_free_bytes); (unsigned long long int)MT_GET_AND_ADD(ndpi_memory_free_bytes, 0));
logger(1, logger(1,
"%s: Allocated / Free'd count: %llu / %llu", "%s: Allocated / Free'd count: %llu / %llu",
argv[0], argv[0],
(unsigned long long int)ndpi_memory_alloc_count, (unsigned long long int)MT_GET_AND_ADD(ndpi_memory_alloc_count, 0),
(unsigned long long int)ndpi_memory_free_count); (unsigned long long int)MT_GET_AND_ADD(ndpi_memory_free_count, 0));
logger(1, logger(1,
"%s: Total Active / Idle Flows: %llu / %llu", "%s: Total Active / Idle Flows: %llu / %llu",
argv[0], argv[0],
@@ -1301,14 +1306,14 @@ int main(int argc, char ** argv)
} }
#ifdef ENABLE_ZLIB #ifdef ENABLE_ZLIB
if (zlib_compressions != zlib_decompressions) if (MT_GET_AND_ADD(zlib_compressions, 0) != MT_GET_AND_ADD(zlib_decompressions, 0))
{ {
logger(1, logger(1,
"%s: %s (%llu != %llu)", "%s: %s (%llu != %llu)",
argv[0], argv[0],
"ZLib compression / decompression inconsistency detected.", "ZLib compression / decompression inconsistency detected.",
(unsigned long long int)zlib_compressions, (unsigned long long int)MT_GET_AND_ADD(zlib_compressions, 0),
(unsigned long long int)zlib_decompressions); (unsigned long long int)MT_GET_AND_ADD(zlib_decompressions, 0));
return 1; return 1;
} }
if (nDPId_return.current_compression_diff != 0) if (nDPId_return.current_compression_diff != 0)
@@ -1320,23 +1325,23 @@ int main(int argc, char ** argv)
nDPId_return.current_compression_diff); nDPId_return.current_compression_diff);
return 1; return 1;
} }
if (nDPId_return.total_compressions != zlib_compressions) if (nDPId_return.total_compressions != MT_GET_AND_ADD(zlib_compressions, 0))
{ {
logger(1, logger(1,
"%s: %s (%llu != %llu)", "%s: %s (%llu != %llu)",
argv[0], argv[0],
"ZLib global<->workflow compression / decompression inconsistency detected.", "ZLib global<->workflow compression / decompression inconsistency detected.",
(unsigned long long int)zlib_compressions, (unsigned long long int)MT_GET_AND_ADD(zlib_compressions, 0),
nDPId_return.current_compression_diff); nDPId_return.current_compression_diff);
return 1; return 1;
} }
if (nDPId_return.total_compression_diff != zlib_compression_bytes) if (nDPId_return.total_compression_diff != MT_GET_AND_ADD(zlib_compression_bytes, 0))
{ {
logger(1, logger(1,
"%s: %s (%llu bytes != %llu bytes)", "%s: %s (%llu bytes != %llu bytes)",
argv[0], argv[0],
"ZLib global<->workflow compression / decompression inconsistency detected.", "ZLib global<->workflow compression / decompression inconsistency detected.",
(unsigned long long int)zlib_compression_bytes, (unsigned long long int)MT_GET_AND_ADD(zlib_compression_bytes, 0),
nDPId_return.total_compression_diff); nDPId_return.total_compression_diff);
return 1; return 1;
} }

156
nDPId.c
View File

@@ -53,10 +53,6 @@
#error "nDPI >= 4.4.0 or API version >= 6336 required" #error "nDPI >= 4.4.0 or API version >= 6336 required"
#endif #endif
#if !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) || !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8)
#error "Compare and Swap aka __sync_fetch_and_add not available on your platform!"
#endif
#if nDPId_MAX_READER_THREADS <= 0 #if nDPId_MAX_READER_THREADS <= 0
#error "Invalid value for nDPId_MAX_READER_THREADS" #error "Invalid value for nDPId_MAX_READER_THREADS"
#endif #endif
@@ -66,6 +62,56 @@
#error "Invalid value for nDPId_FLOW_SCAN_INTERVAL" #error "Invalid value for nDPId_FLOW_SCAN_INTERVAL"
#endif #endif
/* MIPS* does not support Compare and Swap. Use traditional locking as fallback. */
#if !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) || !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8)
#define MT_VALUE(name, type) \
struct name \
{ \
volatile uint64_t var; \
pthread_mutex_t var_mutex; \
} name
#define MT_INIT(value) \
{ \
value, PTHREAD_MUTEX_INITIALIZER \
}
#define MT_INIT2(name, value) \
do \
{ \
name.var = value; \
pthread_mutex_init(&name.var_mutex, NULL); \
} while (0)
static inline uint64_t mt_pt_get_and_add(volatile uint64_t * value, uint64_t add, pthread_mutex_t * mutex)
{
uint64_t result;
pthread_mutex_lock(mutex);
result = *value;
*value += add;
pthread_mutex_unlock(mutex);
return result;
}
#define MT_GET_AND_ADD(name, value) mt_pt_get_and_add(&name.var, value, &name.var_mutex)
static inline uint64_t mt_pt_get_and_sub(volatile uint64_t * value, uint64_t sub, pthread_mutex_t * mutex)
{
uint64_t result;
pthread_mutex_lock(mutex);
result = *value;
*value -= sub;
pthread_mutex_unlock(mutex);
return result;
}
#define MT_GET_AND_SUB(name, value) mt_pt_get_and_sub(&name.var, value, &name.var_mutex)
#else
#define MT_VALUE(name, type) volatile type name
#define MT_INIT(value) value
#define MT_INIT2(name, value) \
do \
{ \
name = value; \
} while (0)
#define MT_GET_AND_ADD(name, value) __sync_fetch_and_add(&name, value)
#define MT_GET_AND_SUB(name, value) __sync_fetch_and_sub(&name, value)
#endif
enum nDPId_l3_type enum nDPId_l3_type
{ {
L3_IP, L3_IP,
@@ -200,7 +246,7 @@ struct nDPId_workflow
{ {
pcap_t * pcap_handle; pcap_t * pcap_handle;
uint8_t error_or_eof; MT_VALUE(error_or_eof, uint8_t);
uint8_t is_pcap_file; uint8_t is_pcap_file;
uint8_t max_flow_to_track_reached : 1; uint8_t max_flow_to_track_reached : 1;
@@ -375,20 +421,20 @@ static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = {
static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {}; static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {};
static struct nDPIsrvd_address collector_address; static struct nDPIsrvd_address collector_address;
static volatile int nDPId_main_thread_shutdown = 0; static MT_VALUE(nDPId_main_thread_shutdown, int) = MT_INIT(0);
static volatile uint64_t global_flow_id = 1; static MT_VALUE(global_flow_id, uint64_t) = MT_INIT(1);
static int ip4_interface_avail = 0, ip6_interface_avail = 0; static int ip4_interface_avail = 0, ip6_interface_avail = 0;
#ifdef ENABLE_MEMORY_PROFILING #ifdef ENABLE_MEMORY_PROFILING
static volatile uint64_t ndpi_memory_alloc_count = 0; static MT_VALUE(ndpi_memory_alloc_count, uint64_t) = MT_INIT(0);
static volatile uint64_t ndpi_memory_alloc_bytes = 0; static MT_VALUE(ndpi_memory_alloc_bytes, uint64_t) = MT_INIT(0);
static volatile uint64_t ndpi_memory_free_count = 0; static MT_VALUE(ndpi_memory_free_count, uint64_t) = MT_INIT(0);
static volatile uint64_t ndpi_memory_free_bytes = 0; static MT_VALUE(ndpi_memory_free_bytes, uint64_t) = MT_INIT(0);
#ifdef ENABLE_ZLIB #ifdef ENABLE_ZLIB
static volatile uint64_t zlib_compressions = 0; static MT_VALUE(zlib_compressions, uint64_t) = MT_INIT(0);
static volatile uint64_t zlib_decompressions = 0; static MT_VALUE(zlib_decompressions, uint64_t) = MT_INIT(0);
static volatile uint64_t zlib_compression_diff = 0; static MT_VALUE(zlib_compression_diff, uint64_t) = MT_INIT(0);
static volatile uint64_t zlib_compression_bytes = 0; static MT_VALUE(zlib_compression_bytes, uint64_t) = MT_INIT(0);
#endif #endif
#endif #endif
@@ -595,9 +641,9 @@ static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstL
{ {
ret = strm.total_out; ret = strm.total_out;
#ifdef ENABLE_MEMORY_PROFILING #ifdef ENABLE_MEMORY_PROFILING
__sync_fetch_and_add(&zlib_compressions, 1); MT_GET_AND_ADD(zlib_compressions, 1);
__sync_fetch_and_add(&zlib_compression_diff, srcLen - ret); MT_GET_AND_ADD(zlib_compression_diff, srcLen - ret);
__sync_fetch_and_add(&zlib_compression_bytes, ret); MT_GET_AND_ADD(zlib_compression_bytes, ret);
#endif #endif
} }
else else
@@ -639,8 +685,8 @@ static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen)
{ {
ret = strm.total_out; ret = strm.total_out;
#ifdef ENABLE_MEMORY_PROFILING #ifdef ENABLE_MEMORY_PROFILING
__sync_fetch_and_add(&zlib_decompressions, 1); MT_GET_AND_ADD(zlib_decompressions, 1);
__sync_fetch_and_sub(&zlib_compression_diff, ret - srcLen); MT_GET_AND_SUB(zlib_compression_diff, ret - srcLen);
#endif #endif
} }
else else
@@ -1070,8 +1116,8 @@ static void * ndpi_malloc_wrapper(size_t const size)
} }
*(uint64_t *)p = size; *(uint64_t *)p = size;
__sync_fetch_and_add(&ndpi_memory_alloc_count, 1); MT_GET_AND_ADD(ndpi_memory_alloc_count, 1);
__sync_fetch_and_add(&ndpi_memory_alloc_bytes, size); MT_GET_AND_ADD(ndpi_memory_alloc_bytes, size);
return (uint8_t *)p + sizeof(uint64_t); return (uint8_t *)p + sizeof(uint64_t);
} }
@@ -1080,8 +1126,8 @@ static void ndpi_free_wrapper(void * const freeable)
{ {
void * p = (uint8_t *)freeable - sizeof(uint64_t); void * p = (uint8_t *)freeable - sizeof(uint64_t);
__sync_fetch_and_add(&ndpi_memory_free_count, 1); MT_GET_AND_ADD(ndpi_memory_free_count, 1);
__sync_fetch_and_add(&ndpi_memory_free_bytes, *(uint64_t *)p); MT_GET_AND_ADD(ndpi_memory_free_bytes, *(uint64_t *)p);
free(p); free(p);
} }
@@ -1090,10 +1136,10 @@ static void log_memory_usage(struct nDPId_reader_thread const * const reader_thr
{ {
if (reader_thread->array_index == 0) if (reader_thread->array_index == 0)
{ {
uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0); uint64_t alloc_count = MT_GET_AND_ADD(ndpi_memory_alloc_count, 0);
uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0); uint64_t free_count = MT_GET_AND_ADD(ndpi_memory_free_count, 0);
uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0); uint64_t alloc_bytes = MT_GET_AND_ADD(ndpi_memory_alloc_bytes, 0);
uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0); uint64_t free_bytes = MT_GET_AND_ADD(ndpi_memory_free_bytes, 0);
logger(0, logger(0,
"MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in " "MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in "
@@ -1106,10 +1152,10 @@ static void log_memory_usage(struct nDPId_reader_thread const * const reader_thr
(long long unsigned int)(alloc_count - free_count), (long long unsigned int)(alloc_count - free_count),
(long long unsigned int)(alloc_bytes - free_bytes)); (long long unsigned int)(alloc_bytes - free_bytes));
#ifdef ENABLE_ZLIB #ifdef ENABLE_ZLIB
uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0); uint64_t zlib_compression_count = MT_GET_AND_ADD(zlib_compressions, 0);
uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0); uint64_t zlib_decompression_count = MT_GET_AND_ADD(zlib_decompressions, 0);
uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0); uint64_t zlib_bytes_diff = MT_GET_AND_ADD(zlib_compression_diff, 0);
uint64_t zlib_bytes_total = __sync_fetch_and_add(&zlib_compression_bytes, 0); uint64_t zlib_bytes_total = MT_GET_AND_ADD(zlib_compression_bytes, 0);
logger(0, logger(0,
"MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu compressed blocks in use, %llu " "MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu compressed blocks in use, %llu "
@@ -1142,6 +1188,8 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
return NULL; return NULL;
} }
MT_INIT2(workflow->error_or_eof, 0);
errno = 0; errno = 0;
if (access(file_or_device, R_OK) != 0 && errno == ENOENT) if (access(file_or_device, R_OK) != 0 && errno == ENOENT)
{ {
@@ -2999,7 +3047,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
return 1; return 1;
} }
struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset];
*ip_offset = sizeof(struct ndpi_chdlc); *ip_offset = sizeof(struct ndpi_chdlc);
*layer3_type = ntohs(chdlc->proto_code); *layer3_type = ntohs(chdlc->proto_code);
break; break;
@@ -3021,7 +3069,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
if (packet[0] == 0x0f || packet[0] == 0x8f) if (packet[0] == 0x0f || packet[0] == 0x8f)
{ {
struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset];
*ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */ *ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */
*layer3_type = ntohs(chdlc->proto_code); *layer3_type = ntohs(chdlc->proto_code);
} }
@@ -3059,7 +3107,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
} }
struct ndpi_radiotap_header const * const radiotap = struct ndpi_radiotap_header const * const radiotap =
(struct ndpi_radiotap_header const * const)&packet[eth_offset]; (struct ndpi_radiotap_header const * const) & packet[eth_offset];
uint16_t radio_len = radiotap->len; uint16_t radio_len = radiotap->len;
/* Check Bad FCS presence */ /* Check Bad FCS presence */
@@ -3808,7 +3856,7 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->flow_allocation_already_failed = 0; workflow->flow_allocation_already_failed = 0;
workflow->total_active_flows++; workflow->total_active_flows++;
flow_to_process->flow_extended.flow_id = __sync_fetch_and_add(&global_flow_id, 1); flow_to_process->flow_extended.flow_id = MT_GET_AND_ADD(global_flow_id, 1);
if (alloc_detection_data(flow_to_process) != 0) if (alloc_detection_data(flow_to_process) != 0)
{ {
@@ -4120,10 +4168,10 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
{ {
case PCAP_ERROR: case PCAP_ERROR:
logger(1, "Error while reading pcap file: '%s'", pcap_geterr(reader_thread->workflow->pcap_handle)); logger(1, "Error while reading pcap file: '%s'", pcap_geterr(reader_thread->workflow->pcap_handle));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return; return;
case PCAP_ERROR_BREAK: case PCAP_ERROR_BREAK:
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return; return;
default: default:
return; return;
@@ -4136,7 +4184,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0) if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0)
{ {
logger(1, "pthread_sigmask: %s", strerror(errno)); logger(1, "pthread_sigmask: %s", strerror(errno));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return; return;
} }
@@ -4147,7 +4195,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if (signal_fd < 0) if (signal_fd < 0)
{ {
logger(1, "signalfd: %s", strerror(errno)); logger(1, "signalfd: %s", strerror(errno));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return; return;
} }
@@ -4155,7 +4203,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if (pcap_fd < 0) if (pcap_fd < 0)
{ {
logger(1, "%s", "Got an invalid PCAP fd"); logger(1, "%s", "Got an invalid PCAP fd");
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return; return;
} }
@@ -4163,7 +4211,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if (epoll_fd < 0) if (epoll_fd < 0)
{ {
logger(1, "Got an invalid epoll fd: %s", strerror(errno)); logger(1, "Got an invalid epoll fd: %s", strerror(errno));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return; return;
} }
@@ -4174,14 +4222,14 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pcap_fd, &event) != 0) if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pcap_fd, &event) != 0)
{ {
logger(1, "Could not add pcap fd %d to epoll fd %d: %s", pcap_fd, epoll_fd, strerror(errno)); logger(1, "Could not add pcap fd %d to epoll fd %d: %s", pcap_fd, epoll_fd, strerror(errno));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return; return;
} }
event.data.fd = signal_fd; event.data.fd = signal_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &event) != 0) if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &event) != 0)
{ {
logger(1, "Could not add signal fd %d to epoll fd %d: %s", signal_fd, epoll_fd, strerror(errno)); logger(1, "Could not add signal fd %d to epoll fd %d: %s", signal_fd, epoll_fd, strerror(errno));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return; return;
} }
@@ -4190,7 +4238,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
int const timeout_ms = 1000; /* TODO: Configurable? */ int const timeout_ms = 1000; /* TODO: Configurable? */
int nready; int nready;
struct timeval tval_before_epoll, tval_after_epoll; struct timeval tval_before_epoll, tval_after_epoll;
while (__sync_fetch_and_add(&nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0)
{ {
get_current_time(&tval_before_epoll); get_current_time(&tval_before_epoll);
errno = 0; errno = 0;
@@ -4202,7 +4250,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
continue; continue;
} }
logger(1, "Epoll returned error: %s", strerror(errno)); logger(1, "Epoll returned error: %s", strerror(errno));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
break; break;
} }
@@ -4224,7 +4272,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if ((events[i].events & EPOLLERR) != 0) if ((events[i].events & EPOLLERR) != 0)
{ {
logger(1, "%s", "Epoll error event"); logger(1, "%s", "Epoll error event");
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
} }
if (events[i].data.fd == signal_fd) if (events[i].data.fd == signal_fd)
@@ -4267,10 +4315,10 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
logger(1, logger(1,
"Error while reading from pcap device: '%s'", "Error while reading from pcap device: '%s'",
pcap_geterr(reader_thread->workflow->pcap_handle)); pcap_geterr(reader_thread->workflow->pcap_handle));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
break; break;
case PCAP_ERROR_BREAK: case PCAP_ERROR_BREAK:
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return; return;
default: default:
break; break;
@@ -4316,7 +4364,7 @@ static void * processing_thread(void * const ndpi_thread_arg)
run_pcap_loop(reader_thread); run_pcap_loop(reader_thread);
set_collector_block(reader_thread); set_collector_block(reader_thread);
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return NULL; return NULL;
} }
@@ -4324,7 +4372,7 @@ static int processing_threads_error_or_eof(void)
{ {
for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i) for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
{ {
if (__sync_fetch_and_add(&reader_threads[i].workflow->error_or_eof, 0) == 0) if (MT_GET_AND_ADD(reader_threads[i].workflow->error_or_eof, 0) == 0)
{ {
return 0; return 0;
} }
@@ -4557,9 +4605,9 @@ static void sighandler(int signum)
{ {
(void)signum; (void)signum;
if (__sync_fetch_and_add(&nDPId_main_thread_shutdown, 0) == 0) if (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0)
{ {
__sync_fetch_and_add(&nDPId_main_thread_shutdown, 1); MT_GET_AND_ADD(nDPId_main_thread_shutdown, 1);
} }
} }
@@ -5096,7 +5144,7 @@ int main(int argc, char ** argv)
signal(SIGTERM, sighandler); signal(SIGTERM, sighandler);
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
while (__sync_fetch_and_add(&nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0)
{ {
sleep(1); sleep(1);
} }