Event I/O abstraction layer. (#28)

* Finalize Event I/O abstraction layer.
* Fix possible fd leakage, Gitlab-CI build and error logging.
* Fixed possible uninitialized signalfd variable.
* Fixed possible memory leak.
* Fixed some SonarCloud complaints.
* Fixed nDPId-test nDPIsrvd-arpa-mockup stuck indefinitely.
* Add nDPId / nDPIsrvd command line option to use poll() on Linux instead of the default epoll().

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni
2023-11-06 12:38:15 +01:00
committed by GitHub
parent 17c21e1d27
commit 1b67927169
8 changed files with 449 additions and 342 deletions

View File

@@ -147,7 +147,7 @@ jobs:
- name: Build single nDPId executable (invoke CC directly) - name: Build single nDPId executable (invoke CC directly)
if: startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON') if: startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON')
run: | run: |
cc -fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak nDPId.c utils.c -I./build/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz cc -fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak nDPId.c nio.c utils.c -I./build/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz
- name: Test EXEC - name: Test EXEC
run: | run: |
./build/nDPId-test ./build/nDPId-test

View File

@@ -67,7 +67,7 @@ build_and_test_static_libndpi:
- > - >
if ldd ./build-cmake-submodule/nDPId | grep -qoEi libndpi; then \ if ldd ./build-cmake-submodule/nDPId | grep -qoEi libndpi; then \
echo 'nDPId linked against a static libnDPI should not contain a shared linked libnDPI.' >&2; false; fi echo 'nDPId linked against a static libnDPI should not contain a shared linked libnDPI.' >&2; false; fi
- cc nDPId.c utils.c -I./build-cmake-submodule/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build-cmake-submodule/libnDPI/lib/libndpi.a -pthread -lm -lz - cc nDPId.c nio.c utils.c -I./build-cmake-submodule/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build-cmake-submodule/libnDPI/lib/libndpi.a -pthread -lm -lz
artifacts: artifacts:
expire_in: 1 week expire_in: 1 week
paths: paths:

View File

@@ -38,7 +38,10 @@ if(HAS_EPOLL)
option(FORCE_POLL "Force the use of poll() instead of epoll()." OFF) option(FORCE_POLL "Force the use of poll() instead of epoll()." OFF)
if(NOT FORCE_POLL) if(NOT FORCE_POLL)
set(EPOLL_DEFS "-DENABLE_EPOLL=1") set(EPOLL_DEFS "-DENABLE_EPOLL=1")
set(EPOLL_SRCS "nio.c") endif()
else()
if(BUILD_EXAMPLES)
message(FATAL_ERROR "Examples are using epoll event I/O. Without epoll available, you can not build/run those.")
endif() endif()
endif() endif()
@@ -100,8 +103,8 @@ else()
unset(NDPI_WITH_MAXMINDDB CACHE) unset(NDPI_WITH_MAXMINDDB CACHE)
endif() endif()
add_executable(nDPId nDPId.c ${EPOLL_SRCS} utils.c) add_executable(nDPId nDPId.c nio.c utils.c)
add_executable(nDPIsrvd nDPIsrvd.c ${EPOLL_SRCS} utils.c) add_executable(nDPIsrvd nDPIsrvd.c nio.c utils.c)
add_executable(nDPId-test nDPId-test.c) add_executable(nDPId-test nDPId-test.c)
add_custom_target(dist) add_custom_target(dist)

View File

@@ -292,14 +292,12 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
{ {
int nDPIsrvd_distributor_disconnects = 0; int nDPIsrvd_distributor_disconnects = 0;
int const nDPIsrvd_distributor_expected_disconnects = 5; int const nDPIsrvd_distributor_expected_disconnects = 5;
int epollfd; struct nio io;
struct remote_desc * mock_json_desc = NULL; struct remote_desc * mock_json_desc = NULL;
struct remote_desc * mock_test_desc = NULL; struct remote_desc * mock_test_desc = NULL;
struct remote_desc * mock_buff_desc = NULL; struct remote_desc * mock_buff_desc = NULL;
struct remote_desc * mock_null_desc = NULL; struct remote_desc * mock_null_desc = NULL;
struct remote_desc * mock_arpa_desc = NULL; struct remote_desc * mock_arpa_desc = NULL;
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
logger(0, "nDPIsrvd thread started, init.."); logger(0, "nDPIsrvd thread started, init..");
@@ -308,11 +306,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
logger(1, "nDPIsrvd block signals failed: %s", strerror(errno)); logger(1, "nDPIsrvd block signals failed: %s", strerror(errno));
} }
errno = 0; nio_init(&io);
epollfd = create_evq();
if (epollfd < 0) #ifdef ENABLE_EPOLL
if (nio_use_epoll(&io, 32) != NIO_SUCCESS)
#else
if (nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)
#endif
{ {
logger(1, "nDPIsrvd epollfd invalid: %d", epollfd); logger(1, "%s", "Error creating nDPIsrvd poll/epoll event I/O");
THREAD_ERROR_GOTO(arg); THREAD_ERROR_GOTO(arg);
} }
@@ -356,9 +358,9 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
mock_arpa_desc->event_distributor_in.peer.sin_port = 0; mock_arpa_desc->event_distributor_in.peer.sin_port = 0;
errno = 0; errno = 0;
if (add_in_event(epollfd, mock_json_desc) != 0 || add_in_event(epollfd, mock_test_desc) != 0 || if (add_in_event(&io, mock_json_desc) != 0 || add_in_event(&io, mock_test_desc) != 0 ||
add_in_event(epollfd, mock_buff_desc) != 0 || add_in_event(epollfd, mock_null_desc) != 0 || add_in_event(&io, mock_buff_desc) != 0 || add_in_event(&io, mock_null_desc) != 0 ||
add_in_event(epollfd, mock_arpa_desc) != 0) add_in_event(&io, mock_arpa_desc) != 0)
{ {
logger(1, "%s", "nDPIsrvd add input event failed"); logger(1, "%s", "nDPIsrvd add input event failed");
THREAD_ERROR_GOTO(arg); THREAD_ERROR_GOTO(arg);
@@ -370,28 +372,25 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
while (nDPIsrvd_distributor_disconnects < nDPIsrvd_distributor_expected_disconnects) while (nDPIsrvd_distributor_disconnects < nDPIsrvd_distributor_expected_disconnects)
{ {
errno = 0; if (nio_run(&io, -1) != NIO_SUCCESS)
int nready = epoll_wait(epollfd, events, events_size, -1);
if (nready < 0 && errno != EINTR)
{ {
logger(1, "%s", "nDPIsrvd epoll wait failed."); logger(1, "nDPIsrvd event I/O returned: %s", strerror(errno));
THREAD_ERROR_GOTO(arg); THREAD_ERROR_GOTO(arg);
} }
else if (errno == EINTR)
{ int nready = nio_get_nready(&io);
continue;
}
for (int i = 0; i < nready; i++) for (int i = 0; i < nready; i++)
{ {
if (events[i].data.ptr == mock_json_desc || events[i].data.ptr == mock_test_desc || struct remote_desc * remote = (struct remote_desc *)nio_get_ptr(&io, i);
events[i].data.ptr == mock_buff_desc || events[i].data.ptr == mock_null_desc ||
events[i].data.ptr == mock_arpa_desc) if (remote == mock_json_desc || remote == mock_test_desc ||
remote == mock_buff_desc || remote == mock_null_desc ||
remote == mock_arpa_desc)
{ {
if ((events[i].events & EPOLLHUP) != 0 || (events[i].events & EPOLLERR) != 0) if (nio_has_error(&io, i) == NIO_SUCCESS)
{ {
char const * remote_desc_name; char const * remote_desc_name;
struct remote_desc * remote = (struct remote_desc *)events[i].data.ptr;
if (remote == mock_json_desc) if (remote == mock_json_desc)
{ {
remote_desc_name = "Mock JSON"; remote_desc_name = "Mock JSON";
@@ -405,7 +404,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
drain_write_buffers_blocking(mock_null_desc); drain_write_buffers_blocking(mock_null_desc);
if (mock_arpa_desc->fd >= 0) if (mock_arpa_desc->fd >= 0)
drain_write_buffers_blocking(mock_arpa_desc); drain_write_buffers_blocking(mock_arpa_desc);
} while (handle_data_event(epollfd, &events[i]) == 0); } while (handle_data_event(&io, i) == 0);
} }
else if (remote == mock_test_desc) else if (remote == mock_test_desc)
{ {
@@ -433,16 +432,22 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
remote_desc_name, remote_desc_name,
nDPIsrvd_distributor_disconnects, nDPIsrvd_distributor_disconnects,
nDPIsrvd_distributor_expected_disconnects); nDPIsrvd_distributor_expected_disconnects);
free_remote(epollfd, remote); free_remote(&io, remote);
} }
else else
{ {
if (handle_data_event(epollfd, &events[i]) != 0) if (handle_data_event(&io, i) != 0)
{ {
if (mock_arpa_desc == events[i].data.ptr) if (mock_arpa_desc == remote)
{ {
// arpa mock does not care about shutdown events // arpa mock does not care about shutdown events
disconnect_client(epollfd, mock_arpa_desc); free_remote(&io, mock_arpa_desc);
nDPIsrvd_distributor_disconnects++;
logger(1,
"nDPIsrvd distributor '%s' connection closed (%d/%d)",
"Mock ARPA",
nDPIsrvd_distributor_disconnects,
nDPIsrvd_distributor_expected_disconnects);
continue; continue;
} }
logger(1, "%s", "nDPIsrvd data event handler failed"); logger(1, "%s", "nDPIsrvd data event handler failed");
@@ -453,17 +458,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
else else
{ {
logger(1, logger(1,
"nDPIsrvd epoll returned unexpected event data: %d (%p)", "nDPIsrvd epoll returned unexpected event data: %p", remote);
events[i].data.fd,
events[i].data.ptr);
THREAD_ERROR_GOTO(arg); THREAD_ERROR_GOTO(arg);
} }
} }
} }
error: error:
free_remotes(epollfd); free_remotes(&io);
close(epollfd); nio_free(&io);
logger(0, "%s", "nDPIsrvd worker thread exits.."); logger(0, "%s", "nDPIsrvd worker thread exits..");
return NULL; return NULL;
@@ -919,10 +922,8 @@ static enum nDPIsrvd_callback_return distributor_json_printer(struct nDPIsrvd_so
static void * distributor_client_mainloop_thread(void * const arg) static void * distributor_client_mainloop_thread(void * const arg)
{ {
int dis_epollfd = create_evq(); struct nio io;
int signalfd = setup_signalfd(dis_epollfd); int signalfd = -1;
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
struct distributor_return_value * const drv = (struct distributor_return_value *)arg; struct distributor_return_value * const drv = (struct distributor_return_value *)arg;
struct thread_return_value * const trv = &drv->thread_return_value; struct thread_return_value * const trv = &drv->thread_return_value;
struct nDPIsrvd_socket * mock_sock = nDPIsrvd_socket_init(sizeof(struct distributor_global_user_data), struct nDPIsrvd_socket * mock_sock = nDPIsrvd_socket_init(sizeof(struct distributor_global_user_data),
@@ -947,9 +948,29 @@ static void * distributor_client_mainloop_thread(void * const arg)
logger(0, "Distributor thread started, init.."); logger(0, "Distributor thread started, init..");
nio_init(&io);
#ifdef ENABLE_EPOLL
if (nio_use_epoll(&io, 32) != NIO_SUCCESS)
#else
if (nio_use_poll(&io, 32) != NIO_SUCCESS)
#endif
{
logger(1, "%s", "Error creating Distributor poll/epoll event I/O");
THREAD_ERROR_GOTO(trv);
}
signalfd = setup_signalfd(&io);
if (signalfd < 0)
{
logger(1, "Distributor signal fd setup failed: %s", strerror(errno));
THREAD_ERROR_GOTO(trv);
}
if (thread_block_signals() != 0) if (thread_block_signals() != 0)
{ {
logger(1, "Distributor block signals failed: %s", strerror(errno)); logger(1, "Distributor block signals failed: %s", strerror(errno));
THREAD_ERROR_GOTO(trv);
} }
errno = 0; errno = 0;
@@ -962,31 +983,26 @@ static void * distributor_client_mainloop_thread(void * const arg)
mock_buff->fd = mock_bufffds[PIPE_BUFFER_READ]; mock_buff->fd = mock_bufffds[PIPE_BUFFER_READ];
mock_null->fd = mock_nullfds[PIPE_NULL_READ]; mock_null->fd = mock_nullfds[PIPE_NULL_READ];
if (dis_epollfd < 0 || signalfd < 0) errno = 0;
if (add_in_event_fd(&io, mock_testfds[PIPE_TEST_READ]) != 0)
{ {
THREAD_ERROR_GOTO(trv); THREAD_ERROR_GOTO(trv);
} }
errno = 0; errno = 0;
if (add_in_event_fd(dis_epollfd, mock_testfds[PIPE_TEST_READ]) != 0) if (add_in_event_fd(&io, mock_bufffds[PIPE_BUFFER_READ]) != 0)
{ {
THREAD_ERROR_GOTO(trv); THREAD_ERROR_GOTO(trv);
} }
errno = 0; errno = 0;
if (add_in_event_fd(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]) != 0) if (add_in_event_fd(&io, mock_nullfds[PIPE_NULL_READ]) != 0)
{ {
THREAD_ERROR_GOTO(trv); THREAD_ERROR_GOTO(trv);
} }
errno = 0; errno = 0;
if (add_in_event_fd(dis_epollfd, mock_nullfds[PIPE_NULL_READ]) != 0) if (add_in_event_fd(&io, mock_arpafds[PIPE_ARPA_READ]) != 0)
{
THREAD_ERROR_GOTO(trv);
}
errno = 0;
if (add_in_event_fd(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]) != 0)
{ {
THREAD_ERROR_GOTO(trv); THREAD_ERROR_GOTO(trv);
} }
@@ -1006,31 +1022,30 @@ static void * distributor_client_mainloop_thread(void * const arg)
while (sock_stats->shutdown_events == 0 || buff_stats->shutdown_events == 0 || *mock_null_shutdown_events == 0) while (sock_stats->shutdown_events == 0 || buff_stats->shutdown_events == 0 || *mock_null_shutdown_events == 0)
{ {
int nready = epoll_wait(dis_epollfd, events, events_size, -1); if (nio_run(&io, -1) != NIO_SUCCESS)
if (nready < 0 && errno != EINTR)
{ {
logger(1, "%s", "Distributor epoll wait failed."); logger(1, "nDPIsrvd event I/O returned: %s", strerror(errno));
THREAD_ERROR_GOTO(trv); THREAD_ERROR_GOTO(trv);
} }
else if (nready < 0 && errno == EINTR)
{ int nready = nio_get_nready(&io);
continue;
}
for (int i = 0; i < nready; i++) for (int i = 0; i < nready; i++)
{ {
if ((events[i].events & EPOLLIN) == 0 && (events[i].events & EPOLLHUP) == 0) int fd = nio_get_fd(&io, i);
if (nio_has_input(&io, i) != NIO_SUCCESS && nio_has_error(&io, i) != NIO_SUCCESS)
{ {
logger(1, "Invalid epoll event received: %d", events[i].events & (~EPOLLIN & ~EPOLLHUP)); logger(1, "Invalid epoll event received for fd: %d", fd);
THREAD_ERROR_GOTO(trv); THREAD_ERROR_GOTO(trv);
} }
if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0) if (nio_has_error(&io, i) == NIO_SUCCESS)
{ {
logger(1, "Distributor disconnected: %d", events[i].data.fd); logger(1, "Distributor disconnected: %d", fd);
del_event(dis_epollfd, events[i].data.fd); del_event(&io, fd);
} }
if (events[i].data.fd == mock_testfds[PIPE_TEST_READ]) if (fd == mock_testfds[PIPE_TEST_READ])
{ {
switch (nDPIsrvd_read(mock_sock)) switch (nDPIsrvd_read(mock_sock))
{ {
@@ -1065,7 +1080,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(trv); THREAD_ERROR_GOTO(trv);
} }
} }
else if (events[i].data.fd == mock_bufffds[PIPE_BUFFER_READ]) else if (fd == mock_bufffds[PIPE_BUFFER_READ])
{ {
switch (nDPIsrvd_read(mock_buff)) switch (nDPIsrvd_read(mock_buff))
{ {
@@ -1100,7 +1115,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(trv); THREAD_ERROR_GOTO(trv);
} }
} }
else if (events[i].data.fd == mock_nullfds[PIPE_NULL_READ]) else if (fd == mock_nullfds[PIPE_NULL_READ])
{ {
switch (nDPIsrvd_read(mock_null)) switch (nDPIsrvd_read(mock_null))
{ {
@@ -1129,7 +1144,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(trv); THREAD_ERROR_GOTO(trv);
} }
} }
else if (events[i].data.fd == mock_arpafds[PIPE_ARPA_READ]) else if (fd == mock_arpafds[PIPE_ARPA_READ])
{ {
char buf[NETWORK_BUFFER_MAX_SIZE]; char buf[NETWORK_BUFFER_MAX_SIZE];
ssize_t bytes_read = read(mock_arpafds[PIPE_ARPA_READ], buf, sizeof(buf)); ssize_t bytes_read = read(mock_arpafds[PIPE_ARPA_READ], buf, sizeof(buf));
@@ -1144,7 +1159,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
* I am just here to trigger some IP code paths. * I am just here to trigger some IP code paths.
*/ */
} }
else if (events[i].data.fd == signalfd) else if (fd == signalfd)
{ {
struct signalfd_siginfo fdsi; struct signalfd_siginfo fdsi;
ssize_t s; ssize_t s;
@@ -1166,9 +1181,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
else else
{ {
logger(1, logger(1,
"Distributor epoll returned unexpected event data: %d (%p)", "Distributor epoll returned unexpected event data: %p", nio_get_ptr(&io, i));
events[i].data.fd,
events[i].data.ptr);
THREAD_ERROR_GOTO(trv); THREAD_ERROR_GOTO(trv);
} }
} }
@@ -1238,17 +1251,17 @@ static void * distributor_client_mainloop_thread(void * const arg)
} }
error: error:
del_event(dis_epollfd, signalfd); del_event(&io, signalfd);
del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]); del_event(&io, mock_testfds[PIPE_TEST_READ]);
del_event(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]); del_event(&io, mock_bufffds[PIPE_BUFFER_READ]);
del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]); del_event(&io, mock_nullfds[PIPE_NULL_READ]);
del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]); del_event(&io, mock_arpafds[PIPE_ARPA_READ]);
close(mock_testfds[PIPE_TEST_READ]); close(mock_testfds[PIPE_TEST_READ]);
close(mock_bufffds[PIPE_BUFFER_READ]); close(mock_bufffds[PIPE_BUFFER_READ]);
close(mock_nullfds[PIPE_NULL_READ]); close(mock_nullfds[PIPE_NULL_READ]);
close(mock_arpafds[PIPE_ARPA_READ]); close(mock_arpafds[PIPE_ARPA_READ]);
close(dis_epollfd);
close(signalfd); close(signalfd);
nio_free(&io);
nDPIsrvd_socket_free(&mock_sock); nDPIsrvd_socket_free(&mock_sock);
nDPIsrvd_socket_free(&mock_buff); nDPIsrvd_socket_free(&mock_buff);
@@ -1501,9 +1514,9 @@ static int nio_selftest()
#endif #endif
#ifdef ENABLE_EPOLL #ifdef ENABLE_EPOLL
if (nio_use_epoll(&io, 5) != NIO_ERROR_SUCCESS) if (nio_use_epoll(&io, 32) != NIO_SUCCESS)
#else #else
if (nio_use_poll(&io, 3) != NIO_ERROR_SUCCESS) if (nio_use_poll(&io, 32) != NIO_SUCCESS)
#endif #endif
{ {
logger(1, "%s", "Could not use poll/epoll for nio"); logger(1, "%s", "Could not use poll/epoll for nio");
@@ -1518,8 +1531,8 @@ static int nio_selftest()
goto error; goto error;
} }
if (nio_add_fd(&io, pipefds[1], NIO_EVENT_OUTPUT, NULL) != NIO_ERROR_SUCCESS || if (nio_add_fd(&io, pipefds[1], NIO_EVENT_OUTPUT, NULL) != NIO_SUCCESS ||
nio_add_fd(&io, pipefds[0], NIO_EVENT_INPUT, NULL) != NIO_ERROR_SUCCESS) nio_add_fd(&io, pipefds[0], NIO_EVENT_INPUT, NULL) != NIO_SUCCESS)
{ {
logger(1, "%s", "Could not add pipe fds to nio"); logger(1, "%s", "Could not add pipe fds to nio");
goto error; goto error;
@@ -1535,26 +1548,26 @@ static int nio_selftest()
size_t const wlen = strnlen(wbuf, sizeof(wbuf)); size_t const wlen = strnlen(wbuf, sizeof(wbuf));
write(pipefds[1], wbuf, wlen); write(pipefds[1], wbuf, wlen);
if (nio_run(&io, 1000) != NIO_ERROR_SUCCESS) if (nio_run(&io, 1000) != NIO_SUCCESS)
{ {
logger(1, "%s", "Event notification failed"); logger(1, "%s", "Event notification failed");
goto error; goto error;
} }
if (nio_can_output(&io, 0) != NIO_ERROR_SUCCESS) if (nio_can_output(&io, 0) != NIO_SUCCESS)
{ {
logger(1, "%s", "Pipe fd (write) can not output"); logger(1, "%s", "Pipe fd (write) can not output");
goto error; goto error;
} }
if (nio_has_input(&io, 1) != NIO_ERROR_SUCCESS) if (nio_has_input(&io, 1) != NIO_SUCCESS)
{ {
logger(1, "%s", "Pipe fd (read) has no input"); logger(1, "%s", "Pipe fd (read) has no input");
goto error; goto error;
} }
if (nio_is_valid(&io, 0) != NIO_ERROR_SUCCESS || nio_is_valid(&io, 1) != NIO_ERROR_SUCCESS || if (nio_is_valid(&io, 0) != NIO_SUCCESS || nio_is_valid(&io, 1) != NIO_SUCCESS ||
nio_has_error(&io, 0) == NIO_ERROR_SUCCESS || nio_has_error(&io, 1) == NIO_ERROR_SUCCESS) nio_has_error(&io, 0) == NIO_SUCCESS || nio_has_error(&io, 1) == NIO_SUCCESS)
{ {
logger(1, "%s", "Event validation failed"); logger(1, "%s", "Event validation failed");
goto error; goto error;
@@ -1567,31 +1580,38 @@ static int nio_selftest()
goto error; goto error;
} }
if (nio_run(&io, 1000) != NIO_ERROR_SUCCESS) if (nio_run(&io, 1000) != NIO_SUCCESS)
{ {
logger(1, "%s", "Event notification failed"); logger(1, "%s", "Event notification failed");
goto error; goto error;
} }
if (nio_can_output(&io, 0) != NIO_ERROR_SUCCESS) if (nio_can_output(&io, 0) != NIO_SUCCESS)
{ {
logger(1, "%s", "Pipe fd (write) can not output"); logger(1, "%s", "Pipe fd (write) can not output");
goto error; goto error;
} }
if (nio_has_input(&io, 1) == NIO_ERROR_SUCCESS) if (nio_has_input(&io, 1) == NIO_SUCCESS)
{ {
logger(1, "%s", "Pipe fd (read) has input"); logger(1, "%s", "Pipe fd (read) has input");
goto error; goto error;
} }
if (nio_is_valid(&io, 0) != NIO_ERROR_SUCCESS || nio_is_valid(&io, 1) == NIO_ERROR_SUCCESS || if (nio_is_valid(&io, 0) != NIO_SUCCESS || nio_is_valid(&io, 1) == NIO_SUCCESS ||
nio_has_error(&io, 0) == NIO_ERROR_SUCCESS || nio_has_error(&io, 1) == NIO_ERROR_SUCCESS) nio_has_error(&io, 0) == NIO_SUCCESS || nio_has_error(&io, 1) == NIO_SUCCESS)
{ {
logger(1, "%s", "Event validation failed"); logger(1, "%s", "Event validation failed");
goto error; goto error;
} }
if (nio_del_fd(&io, pipefds[0]) != NIO_SUCCESS
|| nio_del_fd(&io, pipefds[1]) != NIO_SUCCESS)
{
logger(1, "%s", "Event delete failed");
goto error;
}
nio_free(&io); nio_free(&io);
return 0; return 0;
error: error:

81
nDPId.c
View File

@@ -16,7 +16,6 @@
#include <stdarg.h> #include <stdarg.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <sys/epoll.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
#include <sys/signalfd.h> #include <sys/signalfd.h>
#include <sys/un.h> #include <sys/un.h>
@@ -27,6 +26,7 @@
#include "config.h" #include "config.h"
#include "nDPIsrvd.h" #include "nDPIsrvd.h"
#include "nio.h"
#include "utils.h" #include "utils.h"
#ifndef UNIX_PATH_MAX #ifndef UNIX_PATH_MAX
@@ -467,6 +467,9 @@ static struct
uint8_t enable_zlib_compression; uint8_t enable_zlib_compression;
#endif #endif
uint8_t enable_data_analysis; uint8_t enable_data_analysis;
#ifdef ENABLE_EPOLL
uint8_t use_poll;
#endif
/* subopts */ /* subopts */
unsigned long long int max_flows_per_thread; unsigned long long int max_flows_per_thread;
unsigned long long int max_idle_flows_per_thread; unsigned long long int max_idle_flows_per_thread;
@@ -4405,53 +4408,57 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
return; return;
} }
int epoll_fd = epoll_create1(EPOLL_CLOEXEC); struct nio io;
if (epoll_fd < 0) nio_init(&io);
#ifdef ENABLE_EPOLL
if ((nDPId_options.use_poll == 0 && nio_use_epoll(&io, 32) != NIO_SUCCESS)
|| (nDPId_options.use_poll != 0 && nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS))
#else
if (nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)
#endif
{ {
logger(1, "Got an invalid epoll fd: %s", strerror(errno)); logger(1, "%s", "Event I/O poll/epoll setup failed");
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
nio_free(&io);
return; return;
} }
struct epoll_event event = {}; errno = 0;
event.events = EPOLLIN; if (nio_add_fd(&io, pcap_fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS)
event.data.fd = pcap_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pcap_fd, &event) != 0)
{ {
logger(1, "Could not add pcap fd %d to epoll fd %d: %s", pcap_fd, epoll_fd, strerror(errno)); logger(1,
"Could not add pcap fd to event queue: %s",
(errno != 0 ? strerror(errno) : "Internal Error"));
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
nio_free(&io);
return; return;
} }
event.data.fd = signal_fd; errno = 0;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &event) != 0) if (nio_add_fd(&io, signal_fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS)
{ {
logger(1, "Could not add signal fd %d to epoll fd %d: %s", signal_fd, epoll_fd, strerror(errno)); logger(1,
"Could not add signal fd to event queue: %s",
(errno != 0 ? strerror(errno) : "Internal Error"));
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
nio_free(&io);
return; return;
} }
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
int const timeout_ms = 1000; /* TODO: Configurable? */ int const timeout_ms = 1000; /* TODO: Configurable? */
int nready;
struct timeval tval_before_epoll, tval_after_epoll; struct timeval tval_before_epoll, tval_after_epoll;
while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0)
{ {
get_current_time(&tval_before_epoll); get_current_time(&tval_before_epoll);
errno = 0; errno = 0;
nready = epoll_wait(epoll_fd, events, events_size, timeout_ms); if (nio_run(&io, timeout_ms) != NIO_SUCCESS)
if (errno != 0)
{ {
if (errno == EINTR) logger(1, "Event I/O returned error: %s", strerror(errno));
{
continue;
}
logger(1, "Epoll returned error: %s", strerror(errno));
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
break; break;
} }
int nready = nio_get_nready(&io);
if (nready == 0) if (nready == 0)
{ {
struct timeval tval_diff; struct timeval tval_diff;
@@ -4467,13 +4474,15 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
for (int i = 0; i < nready; ++i) for (int i = 0; i < nready; ++i)
{ {
if ((events[i].events & EPOLLERR) != 0) if (nio_has_error(&io, i) == NIO_SUCCESS)
{ {
logger(1, "%s", "Epoll error event"); logger(1, "%s", "Event I/O error");
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
} }
if (events[i].data.fd == signal_fd) int fd = nio_get_fd(&io, i);
if (fd == signal_fd)
{ {
struct signalfd_siginfo fdsi; struct signalfd_siginfo fdsi;
if (read(signal_fd, &fdsi, sizeof(fdsi)) != sizeof(fdsi)) if (read(signal_fd, &fdsi, sizeof(fdsi)) != sizeof(fdsi))
@@ -4504,7 +4513,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
logger(1, "Received signal %d (%s)", fdsi.ssi_signo, signame); logger(1, "Received signal %d (%s)", fdsi.ssi_signo, signame);
} }
} }
else if (events[i].data.fd == pcap_fd) else if (fd == pcap_fd)
{ {
switch (pcap_dispatch( switch (pcap_dispatch(
reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread)) reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread))
@@ -4517,6 +4526,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
break; break;
case PCAP_ERROR_BREAK: case PCAP_ERROR_BREAK:
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
nio_free(&io);
return; return;
default: default:
break; break;
@@ -4524,10 +4534,12 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
} }
else else
{ {
logger(1, "Unknown event data 0x%llx returned", (unsigned long long int)events[i].data.u64); logger(1, "Unknown event descriptor or data returned: %p", nio_get_ptr(&io, i));
} }
} }
} }
nio_free(&io);
} }
} }
} }
@@ -4900,7 +4912,7 @@ static void print_usage(char const * const arg0)
"Usage: %s " "Usage: %s "
"[-i pcap-file/interface] [-I] [-E] [-B bpf-filter]\n" "[-i pcap-file/interface] [-I] [-E] [-B bpf-filter]\n"
"\t \t" "\t \t"
"[-l] [-L logfile] [-c address] " "[-l] [-L logfile] [-c address] [-e]"
"[-d] [-p pidfile]\n" "[-d] [-p pidfile]\n"
"\t \t" "\t \t"
"[-u user] [-g group] " "[-u user] [-g group] "
@@ -4921,6 +4933,8 @@ static void print_usage(char const * const arg0)
"\t-L\tLog all messages to a log file.\n" "\t-L\tLog all messages to a log file.\n"
"\t-c\tPath to a UNIX socket (nDPIsrvd Collector) or a custom UDP endpoint.\n" "\t-c\tPath to a UNIX socket (nDPIsrvd Collector) or a custom UDP endpoint.\n"
"\t \tDefault: %s\n" "\t \tDefault: %s\n"
"\t-e\tUse poll() instead of epoll().\n"
"\t \tDefault: epoll() on Linux, poll() otherwise\n"
"\t-d\tFork into background after initialization.\n" "\t-d\tFork into background after initialization.\n"
"\t-p\tWrite the daemon PID to the given file path.\n" "\t-p\tWrite the daemon PID to the given file path.\n"
"\t \tDefault: %s\n" "\t \tDefault: %s\n"
@@ -4982,7 +4996,7 @@ static int nDPId_parse_options(int argc, char ** argv)
{ {
int opt; int opt;
while ((opt = getopt(argc, argv, "i:IEB:lL:c:dp:u:g:P:C:J:S:a:Azo:vh")) != -1) while ((opt = getopt(argc, argv, "i:IEB:lL:c:edp:u:g:P:C:J:S:a:Azo:vh")) != -1)
{ {
switch (opt) switch (opt)
{ {
@@ -5010,6 +5024,13 @@ static int nDPId_parse_options(int argc, char ** argv)
case 'c': case 'c':
set_cmdarg(&nDPId_options.collector_address, optarg); set_cmdarg(&nDPId_options.collector_address, optarg);
break; break;
case 'e':
#ifdef ENABLE_EPOLL
nDPId_options.use_poll = 1;
#else
logger_early(1, "%s", "nDPId was built w/o epoll() support, poll() is already the default");
#endif
break;
case 'd': case 'd':
daemonize_enable(); daemonize_enable();
break; break;

View File

@@ -9,7 +9,6 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdint.h> #include <stdint.h>
#include <string.h> #include <string.h>
#include <sys/epoll.h>
#include <sys/signalfd.h> #include <sys/signalfd.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/types.h> #include <sys/types.h>
@@ -17,6 +16,7 @@
#include "config.h" #include "config.h"
#include "nDPIsrvd.h" #include "nDPIsrvd.h"
#include "nio.h"
#include "utils.h" #include "utils.h"
enum sock_type enum sock_type
@@ -92,7 +92,10 @@ static struct
struct cmdarg group; struct cmdarg group;
nDPIsrvd_ull max_remote_descriptors; nDPIsrvd_ull max_remote_descriptors;
nDPIsrvd_ull max_write_buffers; nDPIsrvd_ull max_write_buffers;
int bufferbloat_fallback_to_blocking; uint8_t bufferbloat_fallback_to_blocking;
#ifdef ENABLE_EPOLL
uint8_t use_poll;
#endif
} nDPIsrvd_options = {.pidfile = CMDARG(nDPIsrvd_PIDFILE), } nDPIsrvd_options = {.pidfile = CMDARG(nDPIsrvd_PIDFILE),
.collector_un_sockpath = CMDARG(COLLECTOR_UNIX_SOCKET), .collector_un_sockpath = CMDARG(COLLECTOR_UNIX_SOCKET),
.distributor_un_sockpath = CMDARG(DISTRIBUTOR_UNIX_SOCKET), .distributor_un_sockpath = CMDARG(DISTRIBUTOR_UNIX_SOCKET),
@@ -109,11 +112,11 @@ static void logger_nDPIsrvd(struct remote_desc const * const remote,
...); ...);
static int fcntl_add_flags(int fd, int flags); static int fcntl_add_flags(int fd, int flags);
static int fcntl_del_flags(int fd, int flags); static int fcntl_del_flags(int fd, int flags);
static int add_in_event_fd(int epollfd, int fd); static int add_in_event_fd(struct nio * const io, int fd);
static int add_in_event(int epollfd, struct remote_desc * const remote); static int add_in_event(struct nio * const io, struct remote_desc * const remote);
static int del_event(int epollfd, int fd); static int del_event(struct nio * const io, int fd);
static int del_out_event(int epollfd, struct remote_desc * const remote); static int set_in_event(struct nio * const io, struct remote_desc * const remote);
static void disconnect_client(int epollfd, struct remote_desc * const current); static void disconnect_client(struct nio * const io, struct remote_desc * const current);
static int drain_write_buffers_blocking(struct remote_desc * const remote); static int drain_write_buffers_blocking(struct remote_desc * const remote);
static void nDPIsrvd_buffer_array_copy(void * dst, const void * src) static void nDPIsrvd_buffer_array_copy(void * dst, const void * src)
@@ -414,7 +417,7 @@ static int drain_write_buffers_blocking(struct remote_desc * const remote)
return retval; return retval;
} }
static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) static int handle_outgoing_data(struct nio * const io, struct remote_desc * const remote)
{ {
UT_array * const additional_write_buffers = get_additional_write_buffers(remote); UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
@@ -425,7 +428,7 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
if (drain_write_buffers(remote) != 0) if (drain_write_buffers(remote) != 0)
{ {
logger_nDPIsrvd(remote, "Could not drain buffers for", ": %s", strerror(errno)); logger_nDPIsrvd(remote, "Could not drain buffers for", ": %s", strerror(errno));
disconnect_client(epollfd, remote); disconnect_client(io, remote);
return -1; return -1;
} }
if (utarray_len(additional_write_buffers) == 0) if (utarray_len(additional_write_buffers) == 0)
@@ -433,7 +436,7 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(remote); struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(remote);
if (write_buffer->buf.used == 0) { if (write_buffer->buf.used == 0) {
return del_out_event(epollfd, remote); return set_in_event(io, remote);
} else { } else {
return drain_main_buffer(remote); return drain_main_buffer(remote);
} }
@@ -679,15 +682,17 @@ static struct remote_desc * get_remote_descriptor(enum sock_type type, int remot
return NULL; return NULL;
} }
static void free_remote(int epollfd, struct remote_desc * remote) static void free_remote(struct nio * const io, struct remote_desc * remote)
{ {
if (remote->fd > -1) if (remote->fd > -1)
{ {
errno = 0; errno = 0;
del_event(epollfd, remote->fd); if (del_event(io, remote->fd) != 0)
if (errno != 0)
{ {
logger_nDPIsrvd(remote, "Could not delete event from epoll for connection", ": %s", strerror(errno)); logger_nDPIsrvd(remote,
"Could not delete event from queue for connection",
": %s",
(errno != 0 ? strerror(errno) : "Internal Error"));
} }
errno = 0; errno = 0;
close(remote->fd); close(remote->fd);
@@ -732,11 +737,11 @@ static void free_remote(int epollfd, struct remote_desc * remote)
} }
} }
static void free_remotes(int epollfd) static void free_remotes(struct nio * const io)
{ {
for (size_t i = 0; i < remotes.desc_size; ++i) for (size_t i = 0; i < remotes.desc_size; ++i)
{ {
free_remote(epollfd, &remotes.desc[i]); free_remote(io, &remotes.desc[i]);
} }
nDPIsrvd_free(remotes.desc); nDPIsrvd_free(remotes.desc);
remotes.desc = NULL; remotes.desc = NULL;
@@ -744,68 +749,34 @@ static void free_remotes(int epollfd)
remotes.desc_size = 0; remotes.desc_size = 0;
} }
static int add_event(int epollfd, int events, int fd, void * ptr) static int add_in_event_fd(struct nio * const io, int fd)
{ {
int retval; return nio_add_fd(io, fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS;
struct epoll_event event = {};
if (ptr != NULL)
{
event.data.ptr = ptr;
}
else
{
event.data.fd = fd;
}
event.events = events;
while ((retval = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event)) != 0 && errno == EINTR) {}
return retval;
} }
static int add_in_event_fd(int epollfd, int fd) static int add_in_event(struct nio * const io, struct remote_desc * const remote)
{ {
return add_event(epollfd, EPOLLIN, fd, NULL); return nio_add_fd(io, remote->fd, NIO_EVENT_INPUT, remote) != NIO_SUCCESS;
} }
static int add_in_event(int epollfd, struct remote_desc * const remote) static int set_out_event(struct nio * const io, struct remote_desc * const remote)
{ {
return add_event(epollfd, EPOLLIN, remote->fd, remote); return nio_mod_fd(io, remote->fd, NIO_EVENT_OUTPUT, remote) != NIO_SUCCESS;
} }
static int mod_event(int epollfd, int events, int fd, void * ptr) static int set_in_event(struct nio * const io, struct remote_desc * const remote)
{ {
int retval; return nio_mod_fd(io, remote->fd, NIO_EVENT_INPUT, remote) != NIO_SUCCESS;
struct epoll_event event = {};
event.data.ptr = ptr;
event.events = events;
while ((retval = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event)) != 0 && errno == EINTR) {}
return retval;
} }
static int add_out_event(int epollfd, struct remote_desc * const remote) static int del_event(struct nio * const io, int fd)
{ {
return mod_event(epollfd, EPOLLIN | EPOLLOUT, remote->fd, remote); return nio_del_fd(io, fd) != NIO_SUCCESS;
} }
static int del_out_event(int epollfd, struct remote_desc * const remote) static void disconnect_client(struct nio * const io, struct remote_desc * const remote)
{ {
return mod_event(epollfd, EPOLLIN, remote->fd, remote); free_remote(io, remote);
}
static int del_event(int epollfd, int fd)
{
int retval;
while ((retval = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) != 0 && errno == EINTR) {}
return retval;
}
static void disconnect_client(int epollfd, struct remote_desc * const remote)
{
free_remote(epollfd, remote);
} }
static int nDPIsrvd_parse_options(int argc, char ** argv) static int nDPIsrvd_parse_options(int argc, char ** argv)
@@ -828,6 +799,13 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
case 'c': case 'c':
set_cmdarg(&nDPIsrvd_options.collector_un_sockpath, optarg); set_cmdarg(&nDPIsrvd_options.collector_un_sockpath, optarg);
break; break;
case 'e':
#ifdef ENABLE_EPOLL
nDPIsrvd_options.use_poll = 1;
#else
logger_early(1, "%s", "nDPIsrvd was built w/o epoll() support, poll() is already the default");
#endif
break;
case 'd': case 'd':
daemonize_enable(); daemonize_enable();
break; break;
@@ -870,7 +848,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
default: default:
fprintf(stderr, "%s\n", get_nDPId_version()); fprintf(stderr, "%s\n", get_nDPId_version());
fprintf(stderr, fprintf(stderr,
"Usage: %s [-l] [-L logfile] [-c path-to-unix-sock] [-d] [-p pidfile]\n" "Usage: %s [-l] [-L logfile] [-c path-to-unix-sock] [-e] [-d] [-p pidfile]\n"
"\t[-s path-to-distributor-unix-socket] [-S distributor-host:port]\n" "\t[-s path-to-distributor-unix-socket] [-S distributor-host:port]\n"
"\t[-m max-remote-descriptors] [-u user] [-g group]\n" "\t[-m max-remote-descriptors] [-u user] [-g group]\n"
"\t[-C max-buffered-json-lines] [-D]\n" "\t[-C max-buffered-json-lines] [-D]\n"
@@ -879,6 +857,8 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
"\t-L\tLog all messages to a log file.\n" "\t-L\tLog all messages to a log file.\n"
"\t-c\tPath to a listening UNIX socket (nDPIsrvd Collector).\n" "\t-c\tPath to a listening UNIX socket (nDPIsrvd Collector).\n"
"\t \tDefault: %s\n" "\t \tDefault: %s\n"
"\t-e\tUse poll() instead of epoll().\n"
"\t \tDefault: epoll() on Linux, poll() otherwise\n"
"\t-d\tFork into background after initialization.\n" "\t-d\tFork into background after initialization.\n"
"\t-p\tWrite the daemon PID to the given file path.\n" "\t-p\tWrite the daemon PID to the given file path.\n"
"\t \tDefault: %s\n" "\t \tDefault: %s\n"
@@ -970,7 +950,7 @@ static struct remote_desc * accept_remote(int server_fd,
return current; return current;
} }
static int new_connection(int epollfd, int eventfd) static int new_connection(struct nio * const io, int eventfd)
{ {
union union
{ {
@@ -1112,7 +1092,7 @@ static int new_connection(int epollfd, int eventfd)
if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0) if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0)
{ {
logger(1, "Error setting fd flags to non-blocking mode: %s", strerror(errno)); logger(1, "Error setting fd flags to non-blocking mode: %s", strerror(errno));
disconnect_client(epollfd, current); disconnect_client(io, current);
return 1; return 1;
} }
@@ -1123,18 +1103,19 @@ static int new_connection(int epollfd, int eventfd)
/* shutdown reading end for distributor clients does not work due to epoll usage */ /* shutdown reading end for distributor clients does not work due to epoll usage */
} }
/* setup epoll event */ /* setup event I/O */
if (add_in_event(epollfd, current) != 0) errno = 0;
if (add_in_event(io, current) != NIO_SUCCESS)
{ {
logger(1, "Error adding input event to %d: %s", current->fd, strerror(errno)); logger(1, "Error adding input event to %d: %s", current->fd, (errno != 0 ? strerror(errno) : "Internal Error"));
disconnect_client(epollfd, current); disconnect_client(io, current);
return 1; return 1;
} }
return 0; return 0;
} }
static int handle_collector_protocol(int epollfd, struct remote_desc * const current) static int handle_collector_protocol(struct nio * const io, struct remote_desc * const current)
{ {
struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current);
char * json_str_start = NULL; char * json_str_start = NULL;
@@ -1150,7 +1131,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
"BUG: Collector connection", "BUG: Collector connection",
"JSON invalid opening character: '%c'", "JSON invalid opening character: '%c'",
json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]);
disconnect_client(epollfd, current); disconnect_client(io, current);
return 1; return 1;
} }
@@ -1161,7 +1142,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
if (errno == ERANGE) if (errno == ERANGE)
{ {
logger_nDPIsrvd(current, "BUG: Collector connection", "JSON string length exceeds numceric limits"); logger_nDPIsrvd(current, "BUG: Collector connection", "JSON string length exceeds numceric limits");
disconnect_client(epollfd, current); disconnect_client(io, current);
return 1; return 1;
} }
@@ -1172,7 +1153,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
"missing JSON string length in protocol preamble: \"%.*s\"", "missing JSON string length in protocol preamble: \"%.*s\"",
NETWORK_BUFFER_LENGTH_DIGITS, NETWORK_BUFFER_LENGTH_DIGITS,
json_read_buffer->buf.ptr.text); json_read_buffer->buf.ptr.text);
disconnect_client(epollfd, current); disconnect_client(io, current);
return 1; return 1;
} }
@@ -1194,7 +1175,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
"JSON string too big: %llu > %zu", "JSON string too big: %llu > %zu",
current->event_collector_un.json_bytes, current->event_collector_un.json_bytes,
json_read_buffer->buf.max); json_read_buffer->buf.max);
disconnect_client(epollfd, current); disconnect_client(io, current);
return 1; return 1;
} }
@@ -1212,14 +1193,14 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
(int)current->event_collector_un.json_bytes > 512 ? 512 (int)current->event_collector_un.json_bytes > 512 ? 512
: (int)current->event_collector_un.json_bytes, : (int)current->event_collector_un.json_bytes,
json_read_buffer->buf.ptr.text); json_read_buffer->buf.ptr.text);
disconnect_client(epollfd, current); disconnect_client(io, current);
return 1; return 1;
} }
return 0; return 0;
} }
static int handle_incoming_data(int epollfd, struct remote_desc * const current) static int handle_incoming_data(struct nio * const io, struct remote_desc * const current)
{ {
struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current);
@@ -1235,7 +1216,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{ {
logger_nDPIsrvd(current, "Distributor connection", "closed"); logger_nDPIsrvd(current, "Distributor connection", "closed");
} }
disconnect_client(epollfd, current); disconnect_client(io, current);
return 1; return 1;
} }
@@ -1261,13 +1242,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
if (bytes_read < 0 || errno != 0) if (bytes_read < 0 || errno != 0)
{ {
logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno)); logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno));
disconnect_client(epollfd, current); disconnect_client(io, current);
return 1; return 1;
} }
if (bytes_read == 0) if (bytes_read == 0)
{ {
logger_nDPIsrvd(current, "Collector connection", "closed during read"); logger_nDPIsrvd(current, "Collector connection", "closed during read");
disconnect_client(epollfd, current); disconnect_client(io, current);
return 1; return 1;
} }
json_read_buffer->buf.used += bytes_read; json_read_buffer->buf.used += bytes_read;
@@ -1275,7 +1256,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
while (json_read_buffer->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) while (json_read_buffer->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
{ {
if (handle_collector_protocol(epollfd, current) != 0) if (handle_collector_protocol(io, current) != 0)
{ {
break; break;
} }
@@ -1296,13 +1277,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
if (utarray_len(additional_write_buffers) == 0) if (utarray_len(additional_write_buffers) == 0)
{ {
errno = 0; errno = 0;
if (add_out_event(epollfd, &remotes.desc[i]) != 0) if (set_out_event(io, &remotes.desc[i]) != 0)
{ {
logger_nDPIsrvd(&remotes.desc[i], logger_nDPIsrvd(&remotes.desc[i],
"Could not add event to", "Could not add event to",
", disconnecting: %s", ", disconnecting: %s",
strerror(errno)); (errno != 0 ? strerror(errno) : "Internal Error"));
disconnect_client(epollfd, &remotes.desc[i]); disconnect_client(io, &remotes.desc[i]);
continue; continue;
} }
} }
@@ -1310,7 +1291,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
json_read_buffer->buf.ptr.raw, json_read_buffer->buf.ptr.raw,
current->event_collector_un.json_bytes) != 0) current->event_collector_un.json_bytes) != 0)
{ {
disconnect_client(epollfd, &remotes.desc[i]); disconnect_client(io, &remotes.desc[i]);
continue; continue;
} }
} }
@@ -1324,7 +1305,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
if (drain_main_buffer(&remotes.desc[i]) != 0) if (drain_main_buffer(&remotes.desc[i]) != 0)
{ {
disconnect_client(epollfd, &remotes.desc[i]); disconnect_client(io, &remotes.desc[i]);
} }
} }
@@ -1338,13 +1319,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
return 0; return 0;
} }
static int handle_data_event(int epollfd, struct epoll_event * const event) static int handle_data_event(struct nio * const io, int index)
{ {
struct remote_desc * current = (struct remote_desc *)event->data.ptr; struct remote_desc * const current = (struct remote_desc *)nio_get_ptr(io, index);
if ((event->events & EPOLLIN) == 0 && (event->events & EPOLLOUT) == 0) if (nio_has_input(io, index) != NIO_SUCCESS && nio_can_output(io, index) != NIO_SUCCESS)
{ {
logger(1, "Can not handle event mask: %d", event->events); logger(1, "%s", "Neither input nor output event set.");
return 1; return 1;
} }
@@ -1360,17 +1341,17 @@ static int handle_data_event(int epollfd, struct epoll_event * const event)
return 1; return 1;
} }
if ((event->events & EPOLLIN) != 0) if (nio_has_input(io, index) == NIO_SUCCESS)
{ {
return handle_incoming_data(epollfd, current); return handle_incoming_data(io, current);
} }
else else
{ {
return handle_outgoing_data(epollfd, current); return handle_outgoing_data(io, current);
} }
} }
static int setup_signalfd(int epollfd) static int setup_signalfd(struct nio * const io)
{ {
sigset_t mask; sigset_t mask;
int sfd; int sfd;
@@ -1390,7 +1371,7 @@ static int setup_signalfd(int epollfd)
return -1; return -1;
} }
if (add_in_event_fd(epollfd, sfd) != 0) if (add_in_event_fd(io, sfd) != 0)
{ {
return -1; return -1;
} }
@@ -1403,24 +1384,28 @@ static int setup_signalfd(int epollfd)
return sfd; return sfd;
} }
static int mainloop(int epollfd) static int mainloop(struct nio * const io)
{ {
struct epoll_event events[32]; int signalfd = setup_signalfd(io);
size_t const events_size = sizeof(events) / sizeof(events[0]);
int signalfd = setup_signalfd(epollfd);
while (nDPIsrvd_main_thread_shutdown == 0) while (nDPIsrvd_main_thread_shutdown == 0)
{ {
int nready = epoll_wait(epollfd, events, events_size, 1000); if (nio_run(io, 1000) != NIO_SUCCESS)
{
logger(1, "Event I/O returned error: %s", strerror(errno));
}
int nready = nio_get_nready(io);
for (int i = 0; i < nready; i++) for (int i = 0; i < nready; i++)
{ {
if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0) int fd = nio_get_fd(io, i);
if (nio_has_error(io, i) == NIO_SUCCESS)
{ {
if (events[i].data.fd != collector_un_sockfd && events[i].data.fd != distributor_un_sockfd && if (fd != collector_un_sockfd && fd != distributor_un_sockfd && fd != distributor_in_sockfd)
events[i].data.fd != distributor_in_sockfd)
{ {
struct remote_desc * const current = (struct remote_desc *)events[i].data.ptr; struct remote_desc * const current = (struct remote_desc *)nio_get_ptr(io, i);
switch (current->sock_type) switch (current->sock_type)
{ {
case COLLECTOR_UN: case COLLECTOR_UN:
@@ -1431,25 +1416,24 @@ static int mainloop(int epollfd)
logger_nDPIsrvd(current, "Distributor connection", "closed"); logger_nDPIsrvd(current, "Distributor connection", "closed");
break; break;
} }
disconnect_client(epollfd, current); disconnect_client(io, current);
} }
else else
{ {
logger(1, "Epoll event error: %s", (errno != 0 ? strerror(errno) : "unknown")); logger(1, "Event I/O error: %s", (errno != 0 ? strerror(errno) : "unknown"));
} }
break; break;
} }
if (events[i].data.fd == collector_un_sockfd || events[i].data.fd == distributor_un_sockfd || if (fd == collector_un_sockfd || fd == distributor_un_sockfd || fd == distributor_in_sockfd)
events[i].data.fd == distributor_in_sockfd)
{ {
/* New connection to collector / distributor. */ /* New connection to collector / distributor. */
if (new_connection(epollfd, events[i].data.fd) != 0) if (new_connection(io, fd) != 0)
{ {
continue; continue;
} }
} }
else if (events[i].data.fd == signalfd) else if (fd == signalfd)
{ {
struct signalfd_siginfo fdsi; struct signalfd_siginfo fdsi;
ssize_t s; ssize_t s;
@@ -1473,7 +1457,7 @@ static int mainloop(int epollfd)
else else
{ {
/* Incoming data / Outoing data ready to receive / send. */ /* Incoming data / Outoing data ready to receive / send. */
if (handle_data_event(epollfd, &events[i]) != 0) if (handle_data_event(io, i) != 0)
{ {
/* do nothing */ /* do nothing */
} }
@@ -1481,57 +1465,57 @@ static int mainloop(int epollfd)
} }
} }
free_remotes(epollfd); free_remotes(io);
nio_free(io);
close(signalfd); close(signalfd);
return 0; return 0;
} }
static int create_evq(void) static int setup_event_queue(struct nio * const io)
{ {
return epoll_create1(EPOLL_CLOEXEC); #ifdef ENABLE_EPOLL
} if ((nDPIsrvd_options.use_poll == 0 && nio_use_epoll(io, 32) != NIO_SUCCESS)
|| (nDPIsrvd_options.use_poll != 0 && nio_use_poll(io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS))
static int setup_event_queue(void) #else
{ if (nio_use_poll(io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)
int epollfd = create_evq(); #endif
if (epollfd < 0)
{ {
logger(1, "Error creating epoll: %s", strerror(errno)); logger(1, "%s", "Event I/O poll/epoll setup failed");
return -1; return -1;
} }
if (add_in_event_fd(epollfd, collector_un_sockfd) != 0) errno = 0;
if (add_in_event_fd(io, collector_un_sockfd) != 0)
{ {
logger(1, "Error adding collector UNIX socket fd to epoll: %s", strerror(errno)); logger(1,
"Error adding collector UNIX socket fd to event I/O: %s",
(errno != 0 ? strerror(errno) : "Internal Error"));
return -1; return -1;
} }
if (add_in_event_fd(epollfd, distributor_un_sockfd) != 0) errno = 0;
if (add_in_event_fd(io, distributor_un_sockfd) != 0)
{ {
logger(1, "Error adding distributor UNIX socket fd to epoll: %s", strerror(errno)); logger(1,
"Error adding distributor UNIX socket fd to event I/O: %s",
(errno != 0 ? strerror(errno) : "Internal Error"));
return -1; return -1;
} }
if (distributor_in_sockfd >= 0) if (distributor_in_sockfd >= 0)
{ {
if (add_in_event_fd(epollfd, distributor_in_sockfd) != 0) errno = 0;
if (add_in_event_fd(io, distributor_in_sockfd) != 0)
{ {
logger(1, "Error adding distributor TCP/IP socket fd to epoll: %s", strerror(errno)); logger(1,
"Error adding distributor TCP/IP socket fd to event I/O: %s",
(errno != 0 ? strerror(errno) : "Internal Error"));
return -1; return -1;
} }
} }
return epollfd; return 0;
}
static void close_event_queue(int epollfd)
{
for (size_t i = 0; i < remotes.desc_size; ++i)
{
disconnect_client(epollfd, &remotes.desc[i]);
}
close(epollfd);
} }
static int setup_remote_descriptors(nDPIsrvd_ull max_remote_descriptors) static int setup_remote_descriptors(nDPIsrvd_ull max_remote_descriptors)
@@ -1555,13 +1539,14 @@ static int setup_remote_descriptors(nDPIsrvd_ull max_remote_descriptors)
int main(int argc, char ** argv) int main(int argc, char ** argv)
{ {
int retval = 1; int retval = 1;
int epollfd; struct nio io;
if (argc == 0) if (argc == 0)
{ {
return 1; return 1;
} }
nio_init(&io);
init_logging("nDPIsrvd"); init_logging("nDPIsrvd");
if (nDPIsrvd_parse_options(argc, argv) != 0) if (nDPIsrvd_parse_options(argc, argv) != 0)
@@ -1677,14 +1662,12 @@ int main(int argc, char ** argv)
signal(SIGTERM, SIG_IGN); signal(SIGTERM, SIG_IGN);
signal(SIGQUIT, SIG_IGN); signal(SIGQUIT, SIG_IGN);
epollfd = setup_event_queue(); if (setup_event_queue(&io) != 0)
if (epollfd < 0)
{ {
goto error_unlink_sockets; goto error_unlink_sockets;
} }
retval = mainloop(epollfd); retval = mainloop(&io);
close_event_queue(epollfd);
error_unlink_sockets: error_unlink_sockets:
if (unlink(get_cmdarg(&nDPIsrvd_options.collector_un_sockpath)) != 0) if (unlink(get_cmdarg(&nDPIsrvd_options.collector_un_sockpath)) != 0)

210
nio.c
View File

@@ -1,5 +1,6 @@
#include "nio.h" #include "nio.h"
#include <errno.h>
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
#ifdef ENABLE_EPOLL #ifdef ENABLE_EPOLL
@@ -11,9 +12,9 @@ void nio_init(struct nio * io)
{ {
io->nready = -1; io->nready = -1;
io->poll_max_fds = 0; io->poll_max_fds = 0;
io->poll_cur_fds = 0;
io->poll_fds = NULL; io->poll_fds = NULL;
io->poll_ptrs = NULL; io->poll_ptrs = NULL;
io->poll_fds_set = NULL;
io->epoll_fd = -1; io->epoll_fd = -1;
io->max_events = 0; io->max_events = 0;
io->events = NULL; io->events = NULL;
@@ -27,13 +28,15 @@ int nio_use_poll(struct nio * io, nfds_t max_fds)
io->poll_max_fds = max_fds; io->poll_max_fds = max_fds;
io->poll_fds = (struct pollfd *)calloc(max_fds, sizeof(*io->poll_fds)); io->poll_fds = (struct pollfd *)calloc(max_fds, sizeof(*io->poll_fds));
io->poll_ptrs = calloc(max_fds, sizeof(*io->poll_ptrs)); io->poll_ptrs = calloc(max_fds, sizeof(*io->poll_ptrs));
io->poll_fds_set = calloc(max_fds, sizeof(*io->poll_fds_set));
for (size_t i = 0; i < max_fds; ++i) for (size_t i = 0; i < max_fds; ++i)
{ {
io->poll_fds[i].fd = -1; io->poll_fds[i].fd = -1;
} }
return io->poll_fds == NULL || io->poll_ptrs == NULL; // return NIO_ERROR_INTERNAL on error return io->poll_fds == NULL || io->poll_ptrs == NULL || io->poll_fds_set == NULL; // return NIO_ERROR_INTERNAL on
// error
} }
int nio_use_epoll(struct nio * io, int max_events) int nio_use_epoll(struct nio * io, int max_events)
@@ -46,7 +49,7 @@ int nio_use_epoll(struct nio * io, int max_events)
io->max_events = max_events; io->max_events = max_events;
io->events = calloc(max_events, sizeof(struct epoll_event)); io->events = calloc(max_events, sizeof(struct epoll_event));
return io->events == NULL || io->epoll_fd < 0; return io->events == NULL || io->epoll_fd < 0; // return NIO_ERROR_INTERNAL on error
#else #else
(void)io; (void)io;
(void)max_events; (void)max_events;
@@ -57,9 +60,13 @@ int nio_use_epoll(struct nio * io, int max_events)
int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr) int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr)
{ {
if (fd < 0)
return NIO_ERROR_INTERNAL;
#ifdef ENABLE_EPOLL #ifdef ENABLE_EPOLL
if (io->epoll_fd >= 0) if (io->epoll_fd >= 0)
{ {
int rv;
struct epoll_event event = {}; struct epoll_event event = {};
if (ptr == NULL) if (ptr == NULL)
@@ -78,7 +85,11 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr)
if (event.events == 0) if (event.events == 0)
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
return epoll_ctl(io->epoll_fd, EPOLL_CTL_ADD, fd, &event); while ((rv = epoll_ctl(io->epoll_fd, EPOLL_CTL_ADD, fd, &event)) != 0 && errno == EINTR)
{
/* If epoll_ctl() was interrupted by the system, repeat. */
}
return rv;
} }
else else
#endif #endif
@@ -87,9 +98,6 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr)
struct pollfd * unused_pollfd = NULL; struct pollfd * unused_pollfd = NULL;
void ** unused_ptr = NULL; void ** unused_ptr = NULL;
if (io->poll_cur_fds == io->poll_max_fds || fd < 0)
return NIO_ERROR_INTERNAL;
for (size_t i = 0; i < io->poll_max_fds; ++i) for (size_t i = 0; i < io->poll_max_fds; ++i)
{ {
if (io->poll_fds[i].fd < 0) if (io->poll_fds[i].fd < 0)
@@ -112,9 +120,8 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr)
unused_pollfd->fd = fd; unused_pollfd->fd = fd;
*unused_ptr = ptr; *unused_ptr = ptr;
io->poll_cur_fds++;
return NIO_ERROR_SUCCESS; return NIO_SUCCESS;
} }
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
@@ -122,9 +129,13 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr)
int nio_mod_fd(struct nio * io, int fd, int event_flags, void * ptr) int nio_mod_fd(struct nio * io, int fd, int event_flags, void * ptr)
{ {
if (fd < 0)
return NIO_ERROR_INTERNAL;
#ifdef ENABLE_EPOLL #ifdef ENABLE_EPOLL
if (io->epoll_fd >= 0) if (io->epoll_fd >= 0)
{ {
int rv;
struct epoll_event event = {}; struct epoll_event event = {};
if (ptr == NULL) if (ptr == NULL)
@@ -143,43 +154,43 @@ int nio_mod_fd(struct nio * io, int fd, int event_flags, void * ptr)
if (event.events == 0) if (event.events == 0)
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
return epoll_ctl(io->epoll_fd, EPOLL_CTL_MOD, fd, &event); while ((rv = epoll_ctl(io->epoll_fd, EPOLL_CTL_MOD, fd, &event)) != 0 && errno == EINTR)
{
/* If epoll_ctl() was interrupted by the system, repeat. */
}
return rv;
} }
else else
#endif #endif
if (io->poll_max_fds > 0) if (io->poll_max_fds > 0)
{ {
struct pollfd * unused_pollfd = NULL; struct pollfd * used_pollfd = NULL;
void ** unused_ptr = NULL; void ** used_ptr = NULL;
if (io->poll_cur_fds == io->poll_max_fds || fd < 0)
return NIO_ERROR_INTERNAL;
for (size_t i = 0; i < io->poll_max_fds; ++i) for (size_t i = 0; i < io->poll_max_fds; ++i)
{ {
if (io->poll_fds[i].fd < 0) if (io->poll_fds[i].fd == fd)
{ {
unused_pollfd = &io->poll_fds[i]; used_pollfd = &io->poll_fds[i];
unused_ptr = &io->poll_ptrs[i]; used_ptr = &io->poll_ptrs[i];
break; break;
} }
} }
if (unused_pollfd == NULL) if (used_pollfd == NULL)
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
unused_pollfd->events = 0; used_pollfd->events = 0;
if ((event_flags & NIO_EVENT_INPUT) != 0) if ((event_flags & NIO_EVENT_INPUT) != 0)
unused_pollfd->events |= POLLIN; used_pollfd->events |= POLLIN;
if ((event_flags & NIO_EVENT_OUTPUT) != 0) if ((event_flags & NIO_EVENT_OUTPUT) != 0)
unused_pollfd->events |= POLLOUT; used_pollfd->events |= POLLOUT;
if (unused_pollfd->events == 0) if (used_pollfd->events == 0)
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
unused_pollfd->fd = fd; used_pollfd->fd = fd;
*unused_ptr = ptr; *used_ptr = ptr;
io->poll_cur_fds++;
return NIO_ERROR_SUCCESS; return NIO_SUCCESS;
} }
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
@@ -187,38 +198,43 @@ int nio_mod_fd(struct nio * io, int fd, int event_flags, void * ptr)
int nio_del_fd(struct nio * io, int fd) int nio_del_fd(struct nio * io, int fd)
{ {
if (fd < 0)
return NIO_ERROR_INTERNAL;
#ifdef ENABLE_EPOLL #ifdef ENABLE_EPOLL
if (io->epoll_fd >= 0) if (io->epoll_fd >= 0)
{ {
return epoll_ctl(io->epoll_fd, EPOLL_CTL_DEL, fd, NULL); int rv;
while ((rv = epoll_ctl(io->epoll_fd, EPOLL_CTL_DEL, fd, NULL)) != 0 && errno == EINTR)
{
/* If epoll_ctl() was interrupted by the system, repeat. */
}
return rv;
} }
else else
#endif #endif
if (io->poll_max_fds > 0) if (io->poll_max_fds > 0)
{ {
struct pollfd * unused_pollfd = NULL; struct pollfd * used_pollfd = NULL;
void ** unused_ptr = NULL; void ** used_ptr = NULL;
if (io->poll_cur_fds == io->poll_max_fds || fd < 0)
return NIO_ERROR_INTERNAL;
for (size_t i = 0; i < io->poll_max_fds; ++i) for (size_t i = 0; i < io->poll_max_fds; ++i)
{ {
if (io->poll_fds[i].fd < 0) if (io->poll_fds[i].fd == fd)
{ {
unused_pollfd = &io->poll_fds[i]; used_pollfd = &io->poll_fds[i];
unused_ptr = &io->poll_ptrs[i]; used_ptr = &io->poll_ptrs[i];
break; break;
} }
} }
if (unused_pollfd == NULL) if (used_pollfd == NULL)
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
unused_pollfd->fd = -1; used_pollfd->fd = -1;
*unused_ptr = NULL; *used_ptr = NULL;
io->poll_cur_fds--;
return NIO_ERROR_SUCCESS; return NIO_SUCCESS;
} }
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
@@ -229,7 +245,11 @@ int nio_run(struct nio * io, int timeout)
#ifdef ENABLE_EPOLL #ifdef ENABLE_EPOLL
if (io->epoll_fd >= 0) if (io->epoll_fd >= 0)
{ {
io->nready = epoll_wait(io->epoll_fd, io->events, io->max_events, timeout); do
{
io->nready = epoll_wait(io->epoll_fd, io->events, io->max_events, timeout);
} while (io->nready < 0 && errno == EINTR);
if (io->nready < 0) if (io->nready < 0)
return NIO_ERROR_SYSTEM; return NIO_ERROR_SYSTEM;
} }
@@ -237,60 +257,73 @@ int nio_run(struct nio * io, int timeout)
#endif #endif
if (io->poll_max_fds > 0) if (io->poll_max_fds > 0)
{ {
io->nready = poll(io->poll_fds, io->poll_max_fds, timeout); do
{
io->nready = poll(io->poll_fds, io->poll_max_fds, timeout);
} while (io->nready < 0 && errno == EINTR);
if (io->nready < 0) if (io->nready < 0)
return NIO_ERROR_SYSTEM; return NIO_ERROR_SYSTEM;
else
io->nready = io->poll_max_fds; if (io->nready > 0)
{
for (nfds_t i = 0, j = 0; i < io->poll_max_fds; ++i)
{
if (io->poll_fds[i].fd >= 0 && io->poll_fds[i].revents != 0)
{
io->poll_fds_set[j++] = i;
}
}
}
} }
return NIO_ERROR_SUCCESS; return NIO_SUCCESS;
} }
int nio_check(struct nio * io, int index, int events) int nio_check(struct nio * io, int index, int event_flags)
{ {
if (index < 0 || index >= io->nready) if (nio_is_valid(io, index) != NIO_SUCCESS)
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
#ifdef ENABLE_EPOLL #ifdef ENABLE_EPOLL
if (io->epoll_fd >= 0 && index < io->max_events) if (io->epoll_fd >= 0)
{ {
uint32_t epoll_events = 0; uint32_t epoll_events = 0;
if ((events & NIO_EVENT_INPUT) != 0) if ((event_flags & NIO_EVENT_INPUT) != 0)
epoll_events |= EPOLLIN; epoll_events |= EPOLLIN;
if ((events & NIO_EVENT_OUTPUT) != 0) if ((event_flags & NIO_EVENT_OUTPUT) != 0)
epoll_events |= EPOLLOUT; epoll_events |= EPOLLOUT;
if ((events & NIO_EVENT_ERROR) != 0) if ((event_flags & NIO_EVENT_ERROR) != 0)
epoll_events |= EPOLLERR | EPOLLHUP; epoll_events |= EPOLLERR | EPOLLHUP;
if (epoll_events == 0) if (epoll_events == 0)
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
struct epoll_event * ee = (struct epoll_event *)io->events; struct epoll_event const * const events = (struct epoll_event *)io->events;
if ((ee[index].events & epoll_events) == 0) if ((events[index].events & epoll_events) == 0)
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
return NIO_ERROR_SUCCESS; return NIO_SUCCESS;
} }
else else
#endif #endif
if (io->poll_max_fds > 0 && index < (int)io->poll_max_fds) if (io->poll_max_fds > 0)
{ {
short int poll_events = 0; short int poll_events = 0;
if ((events & NIO_EVENT_INPUT) != 0) if ((event_flags & NIO_EVENT_INPUT) != 0)
poll_events |= POLLIN; poll_events |= POLLIN;
if ((events & NIO_EVENT_OUTPUT) != 0) if ((event_flags & NIO_EVENT_OUTPUT) != 0)
poll_events |= POLLOUT; poll_events |= POLLOUT;
if ((events & NIO_EVENT_ERROR) != 0) if ((event_flags & NIO_EVENT_ERROR) != 0)
poll_events |= POLLERR | POLLHUP; poll_events |= POLLERR | POLLHUP;
if (poll_events == 0) if (poll_events == 0)
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
if ((io->poll_fds[index].revents & poll_events) == 0) if ((io->poll_fds[io->poll_fds_set[index]].revents & poll_events) == 0)
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
return NIO_ERROR_SUCCESS; return NIO_SUCCESS;
} }
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
@@ -302,34 +335,62 @@ int nio_is_valid(struct nio * io, int index)
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
#ifdef ENABLE_EPOLL #ifdef ENABLE_EPOLL
if (io->epoll_fd >= 0 && index <= io->max_events) if (io->epoll_fd >= 0)
{ {
return NIO_ERROR_SUCCESS; return NIO_SUCCESS;
} }
else else
#endif #endif
if (io->poll_max_fds > 0 && index < (int)io->poll_max_fds) if (io->poll_max_fds > 0 && io->poll_fds[io->poll_fds_set[index]].fd >= 0)
{ {
if (io->poll_fds[index].revents != 0) return NIO_SUCCESS;
return NIO_ERROR_SUCCESS;
} }
return NIO_ERROR_INTERNAL; return NIO_ERROR_INTERNAL;
} }
int nio_has_input(struct nio * io, int index) int nio_get_fd(struct nio * io, int index)
{ {
return nio_check(io, index, NIO_EVENT_INPUT); if (nio_is_valid(io, index) != NIO_SUCCESS)
return -1;
#ifdef ENABLE_EPOLL
if (io->epoll_fd >= 0)
{
struct epoll_event const * const events = (struct epoll_event *)io->events;
return events[index].data.fd;
}
else
#endif
if (io->poll_max_fds > 0)
{
return io->poll_fds[io->poll_fds_set[index]].fd;
}
return -1;
} }
int nio_can_output(struct nio * io, int index) void * nio_get_ptr(struct nio * io, int index)
{ {
return nio_check(io, index, NIO_EVENT_OUTPUT); if (nio_is_valid(io, index) != NIO_SUCCESS)
} return NULL;
int nio_has_error(struct nio * io, int index) #ifdef ENABLE_EPOLL
{ if (io->epoll_fd >= 0)
return nio_check(io, index, NIO_EVENT_ERROR); {
struct epoll_event * const events = (struct epoll_event *)io->events;
return events[index].data.ptr;
}
else
#endif
if (io->poll_max_fds > 0)
{
return io->poll_ptrs[io->poll_fds_set[index]];
}
return NULL;
} }
void nio_free(struct nio * io) void nio_free(struct nio * io)
@@ -351,5 +412,6 @@ void nio_free(struct nio * io)
#endif #endif
free(io->poll_fds); free(io->poll_fds);
free(io->poll_ptrs); free(io->poll_ptrs);
free(io->poll_fds_set);
free(io->events); free(io->events);
} }

28
nio.h
View File

@@ -5,7 +5,7 @@
enum enum
{ {
NIO_ERROR_SUCCESS = 0, NIO_SUCCESS = 0,
NIO_ERROR_INTERNAL = 1, NIO_ERROR_INTERNAL = 1,
NIO_ERROR_SYSTEM = -1 NIO_ERROR_SYSTEM = -1
}; };
@@ -23,9 +23,9 @@ struct nio
int nready; int nready;
nfds_t poll_max_fds; nfds_t poll_max_fds;
nfds_t poll_cur_fds;
struct pollfd * poll_fds; struct pollfd * poll_fds;
void ** poll_ptrs; void ** poll_ptrs;
nfds_t * poll_fds_set;
int epoll_fd; int epoll_fd;
int max_events; int max_events;
@@ -46,15 +46,33 @@ int nio_del_fd(struct nio * io, int fd);
int nio_run(struct nio * io, int timeout); int nio_run(struct nio * io, int timeout);
static inline int nio_get_nready(struct nio const * const io)
{
return io->nready;
}
int nio_check(struct nio * io, int index, int events); int nio_check(struct nio * io, int index, int events);
int nio_is_valid(struct nio * io, int index); int nio_is_valid(struct nio * io, int index);
int nio_has_input(struct nio * io, int index); int nio_get_fd(struct nio * io, int index);
int nio_can_output(struct nio * io, int index); void * nio_get_ptr(struct nio * io, int index);
int nio_has_error(struct nio * io, int index); static inline int nio_has_input(struct nio * io, int index)
{
return nio_check(io, index, NIO_EVENT_INPUT);
}
static inline int nio_can_output(struct nio * io, int index)
{
return nio_check(io, index, NIO_EVENT_OUTPUT);
}
static inline int nio_has_error(struct nio * io, int index)
{
return nio_check(io, index, NIO_EVENT_ERROR);
}
void nio_free(struct nio * io); void nio_free(struct nio * io);