Use blocking I/O to prevent data loss if nDPIsrvd is too slow.

* Fixed MemoryProfiler stack overflow.

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Toni Uhlig
2021-11-16 15:53:29 +01:00
parent d389f04135
commit 25b974af67
2 changed files with 44 additions and 17 deletions
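
For context: nDPId serializes each event as a JSON line and write()s it to the nDPIsrvd collector over a non-blocking socket. The sketch below is an illustration only, not nDPId code (sockfd, buf and len are placeholder names); it shows how such a write loses data once the collector stops keeping up, which is the situation this commit addresses.

/* Illustration only (not nDPId code): why a non-blocking collector socket can
 * lose data. When the peer (nDPIsrvd) drains the socket too slowly, the kernel
 * send buffer fills up and write() either returns a short count or fails with
 * EAGAIN/EWOULDBLOCK; whatever is not retried is silently dropped. */
#include <errno.h>
#include <unistd.h>

/* Try to queue one serialized JSON line on a non-blocking stream socket.
 * Returns the number of bytes actually accepted by the kernel; the caller is
 * responsible for the remainder. */
static ssize_t try_send_nonblocking(int sockfd, char const * buf, size_t len)
{
    ssize_t ret = write(sockfd, buf, len);

    if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
    {
        return 0; /* send buffer full: nothing of the line was queued */
    }
    return ret; /* may be shorter than len: the tail of the line is still unsent */
}

Before this change, send_to_json_sink() handled that case by logging "Possible data loss detected" and moving on; the diff below replaces that with a blocking retry.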

CMakeLists.txt

@@ -232,8 +232,7 @@ find_package(PCAP "1.8.1" REQUIRED)
 target_compile_options(nDPId PRIVATE "-pthread")
 target_compile_definitions(nDPId PRIVATE -DGIT_VERSION=\"${GIT_VERSION}\" ${NDPID_DEFS} ${ZLIB_DEFS})
 target_include_directories(nDPId PRIVATE
-    "${STATIC_LIBNDPI_INC}" "${NDPI_INCLUDEDIR}" "${NDPI_INCLUDEDIR}/ndpi"
-    "${CMAKE_SOURCE_DIR}/dependencies/uthash/src")
+    "${STATIC_LIBNDPI_INC}" "${NDPI_INCLUDEDIR}" "${NDPI_INCLUDEDIR}/ndpi")
 target_link_libraries(nDPId "${STATIC_LIBNDPI_LIB}" "${pkgcfg_lib_NDPI_ndpi}"
     "${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}" "${pkgcfg_lib_ZLIB_z}"
     "${GCRYPT_LIBRARY}" "${GCRYPT_ERROR_LIBRARY}" "${PCAP_LIBRARY}" "${LIBM_LIB}"

nDPId.c

@@ -39,6 +39,10 @@
 #error "Compare and Swap aka __sync_fetch_and_add not available on your platform!"
 #endif
 
+#if nDPId_MAX_READER_THREADS < 0
+#error "Invalid value for nDPId_MAX_READER_THREADS"
+#endif
+
 enum nDPId_l3_type
 {
     L3_IP,
@@ -1039,6 +1043,7 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F
         {
             case FT_UNKNOWN:
             case FT_SKIPPED:
+                written = 0;
                 break;
             case FT_FINISHED:
@@ -1046,11 +1051,14 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F
                 struct nDPId_flow_finished const * const flow_finished = (struct nDPId_flow_finished const *)flow_basic;
 #if 1
-                written = snprintf(output + output_used, BUFSIZ, "%u,", flow_finished->flow_info.flow_extended.flow_id);
+                written = snprintf(output + output_used,
+                                   BUFSIZ - output_used,
+                                   "%u,",
+                                   flow_finished->flow_info.flow_extended.flow_id);
 #else
                 written =
                     snprintf(output + output_used,
-                             BUFSIZ,
+                             BUFSIZ - output_used,
                              "[%u, %u, %llu],",
                              flow_finished->flow_info.flow_extended.flow_id,
                              flow_finished->flow_info.flow_extended.flow_basic.l4_protocol,
@@ -1064,10 +1072,10 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F
                 struct nDPId_flow_info const * const flow_info = (struct nDPId_flow_info const *)flow_basic;
 #if 1
-                written = snprintf(output + output_used, BUFSIZ, "%u,", flow_info->flow_extended.flow_id);
+                written = snprintf(output + output_used, BUFSIZ - output_used, "%u,", flow_info->flow_extended.flow_id);
 #else
                 written = snprintf(output + output_used,
-                                   BUFSIZ,
+                                   BUFSIZ - output_used,
                                    "[%u, %u, %llu],",
                                    flow_info->flow_extended.flow_id,
                                    flow_info->flow_extended.flow_basic.l4_protocol,
@@ -1081,7 +1089,14 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F
         {
             break;
         }
-        output_used += written;
+        else
+        {
+            output_used += written;
+            if (output_used >= BUFSIZ)
+            {
+                break;
+            }
+        }
     }
     return (output_used > 0 ? output_used - 1 : 0);
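
All of the log_flows_to_str() hunks above apply the same two fixes: initialize written for the FT_UNKNOWN/FT_SKIPPED cases so that the later output_used += written never adds a stale value, and pass the remaining capacity (BUFSIZ - output_used) rather than BUFSIZ to snprintf() so that repeated appends cannot run past the end of the buffer. A standalone sketch of that bounded-append pattern follows; append_flow_id() is an illustrative helper, not an nDPId function.

#include <stdio.h>

/* Sketch of the bounded snprintf append pattern used by this commit; not
 * taken from nDPId. Appends "<flow_id>," to out (total capacity cap) and
 * advances *used. Returns 0 on success, -1 on error or truncation. */
static int append_flow_id(char * out, size_t cap, size_t * used, unsigned int flow_id)
{
    if (*used >= cap)
    {
        return -1; /* buffer already full */
    }
    /* Pass the remaining space, never the full capacity: snprintf limits
     * itself to that many bytes including the terminating NUL. */
    int written = snprintf(out + *used, cap - *used, "%u,", flow_id);
    if (written < 0)
    {
        return -1; /* encoding error */
    }
    if ((size_t)written >= cap - *used)
    {
        *used = cap - 1; /* output was truncated at the NUL terminator */
        return -1;
    }
    *used += (size_t)written;
    return 0;
}

The commit keeps its original layout (output_used += written followed by an output_used >= BUFSIZ check), but the effect is the same: once the buffer is full, appending stops instead of writing out of bounds.
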
@@ -1899,14 +1914,11 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
     }
     errno = 0;
-    if (reader_thread->json_sock_reconnect == 0 && write(reader_thread->json_sockfd, newline_json_str, s_ret) != s_ret)
+    ssize_t written;
+    if (reader_thread->json_sock_reconnect == 0 &&
+        (written = write(reader_thread->json_sockfd, newline_json_str, s_ret)) != s_ret)
     {
         saved_errno = errno;
-        syslog(LOG_DAEMON | LOG_ERR,
-               "[%8llu, %d] Send data to nDPIsrvd Collector failed: %s",
-               workflow->packets_captured,
-               reader_thread->array_index,
-               strerror(saved_errno));
         if (saved_errno == EPIPE)
         {
             syslog(LOG_DAEMON | LOG_ERR,
@@ -1920,10 +1932,26 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
         }
         else
         {
-            syslog(LOG_DAEMON | LOG_ERR,
-                   "[%8llu, %d] Possible data loss detected",
-                   workflow->packets_captured,
-                   reader_thread->array_index);
+            fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
+            off_t pos = (written < 0 ? 0 : written);
+            while ((written = write(reader_thread->json_sockfd, newline_json_str + pos, s_ret - pos)) != s_ret - pos)
+            {
+                if (written < 0)
+                {
+                    syslog(LOG_DAEMON | LOG_ERR,
+                           "[%8llu, %d] Send data (blocking I/O) to nDPIsrvd Collector failed: %s",
+                           workflow->packets_captured,
+                           reader_thread->array_index,
+                           strerror(saved_errno));
+                    reader_thread->json_sock_reconnect = 1;
+                    break;
+                }
+                else
+                {
+                    pos += written;
+                }
+            }
+            fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & O_NONBLOCK);
         }
     }
 }
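
Put together, the new else branch in send_to_json_sink() implements the pattern sketched below: temporarily clear O_NONBLOCK on the collector socket, keep writing the unsent part of the line until it is flushed or write() reports a real error (in which case a reconnect is scheduled), then make the descriptor non-blocking again. The sketch is a condensed illustration under those assumptions (sockfd, buf, len and already_sent are placeholders; logging and reconnect handling are left out), not the exact nDPId code.

#include <fcntl.h>
#include <unistd.h>

/* Flush the unsent tail of a len byte message in buf, of which already_sent
 * bytes went out via the earlier non-blocking write(). Blocks until the peer
 * accepts the rest or a write error occurs. Returns 0 on success, -1 on error.
 * Illustration of the commit's approach, not nDPId code. */
static int flush_blocking(int sockfd, char const * buf, size_t len, size_t already_sent)
{
    int const flags = fcntl(sockfd, F_GETFL, 0);
    size_t pos = already_sent;
    int ret = 0;

    /* Switch to blocking mode so write() waits for the collector instead of
     * failing with EAGAIN and forcing the caller to drop the rest of the line. */
    fcntl(sockfd, F_SETFL, flags & ~O_NONBLOCK);

    while (pos < len)
    {
        ssize_t const written = write(sockfd, buf + pos, len - pos);
        if (written < 0)
        {
            ret = -1; /* real error: the caller should reconnect to nDPIsrvd */
            break;
        }
        pos += (size_t)written;
    }

    /* Restore the original flags, i.e. put the socket back into non-blocking mode. */
    fcntl(sockfd, F_SETFL, flags);
    return ret;
}

The trade-off is deliberate: when nDPIsrvd falls behind, the reader thread now stalls on the socket instead of discarding JSON events, which is exactly what the commit subject describes.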