Refactored buffer subsystem.

Signed-off-by: lns <matzeton@googlemail.com>
This commit is contained in:
lns
2022-04-16 23:21:24 +02:00
parent db83f82d29
commit c283b89afd
4 changed files with 312 additions and 141 deletions

View File

@@ -39,6 +39,6 @@
/* nDPIsrvd default config options */ /* nDPIsrvd default config options */
#define nDPIsrvd_PIDFILE "/tmp/ndpisrvd.pid" #define nDPIsrvd_PIDFILE "/tmp/ndpisrvd.pid"
#define nDPIsrvd_MAX_REMOTE_DESCRIPTORS 32 #define nDPIsrvd_MAX_REMOTE_DESCRIPTORS 32
#define nDPIsrvd_CACHE_ARRAY_LENGTH 256 #define nDPIsrvd_MAX_WRITE_BUFFERS 1024
#endif #endif

View File

@@ -196,6 +196,11 @@ struct nDPIsrvd_buffer
} ptr; } ptr;
size_t used; size_t used;
size_t max; size_t max;
};
struct nDPIsrvd_json_buffer
{
struct nDPIsrvd_buffer buf;
char * json_string; char * json_string;
size_t json_string_start; size_t json_string_start;
nDPIsrvd_ull json_string_length; nDPIsrvd_ull json_string_length;
@@ -221,7 +226,7 @@ struct nDPIsrvd_socket
instance_cleanup_callback instance_cleanup_callback; instance_cleanup_callback instance_cleanup_callback;
flow_cleanup_callback flow_cleanup_callback; flow_cleanup_callback flow_cleanup_callback;
struct nDPIsrvd_buffer buffer; struct nDPIsrvd_json_buffer buffer;
struct nDPIsrvd_jsmn jsmn; struct nDPIsrvd_jsmn jsmn;
/* easy and fast JSON key/value access via hash table and a static array */ /* easy and fast JSON key/value access via hash table and a static array */
@@ -375,9 +380,6 @@ static inline int nDPIsrvd_buffer_init(struct nDPIsrvd_buffer * const buffer, si
return 1; return 1;
} }
buffer->json_string_start = 0;
buffer->json_string_length = 0ull;
buffer->json_string = NULL;
buffer->used = 0; buffer->used = 0;
buffer->max = buffer_size; buffer->max = buffer_size;
@@ -390,6 +392,24 @@ static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer)
buffer->ptr.raw = NULL; buffer->ptr.raw = NULL;
} }
static inline int nDPIsrvd_json_buffer_init(struct nDPIsrvd_json_buffer * const json_buffer, size_t json_buffer_size)
{
int ret = nDPIsrvd_buffer_init(&json_buffer->buf, json_buffer_size);
if (ret == 0)
{
json_buffer->json_string_start = 0ul;
json_buffer->json_string_length = 0ull;
json_buffer->json_string = NULL;
}
return ret;
}
static inline void nDPIsrvd_json_buffer_free(struct nDPIsrvd_json_buffer * const json_buffer)
{
nDPIsrvd_buffer_free(&json_buffer->buf);
}
static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_data_size, static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_data_size,
size_t instance_user_data_size, size_t instance_user_data_size,
size_t thread_user_data_size, size_t thread_user_data_size,
@@ -409,7 +429,7 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d
if (sock != NULL) if (sock != NULL)
{ {
sock->fd = -1; sock->fd = -1;
if (nDPIsrvd_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0) if (nDPIsrvd_json_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0)
{ {
goto error; goto error;
} }
@@ -435,7 +455,7 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d
return sock; return sock;
error: error:
nDPIsrvd_buffer_free(&sock->buffer); nDPIsrvd_json_buffer_free(&sock->buffer);
nDPIsrvd_socket_free(&sock); nDPIsrvd_socket_free(&sock);
return NULL; return NULL;
} }
@@ -545,7 +565,7 @@ static inline void nDPIsrvd_socket_free(struct nDPIsrvd_socket ** const sock)
} }
(*sock)->instance_table = NULL; (*sock)->instance_table = NULL;
nDPIsrvd_buffer_free(&(*sock)->buffer); nDPIsrvd_json_buffer_free(&(*sock)->buffer);
nDPIsrvd_free(*sock); nDPIsrvd_free(*sock);
*sock = NULL; *sock = NULL;
@@ -641,12 +661,13 @@ static inline enum nDPIsrvd_connect_return nDPIsrvd_connect(struct nDPIsrvd_sock
static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * const sock) static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * const sock)
{ {
if (sock->buffer.used == sock->buffer.max) if (sock->buffer.buf.used == sock->buffer.buf.max)
{ {
return READ_OK; return READ_OK;
} }
ssize_t bytes_read = read(sock->fd, sock->buffer.ptr.raw + sock->buffer.used, sock->buffer.max - sock->buffer.used); ssize_t bytes_read =
read(sock->fd, sock->buffer.buf.ptr.raw + sock->buffer.buf.used, sock->buffer.buf.max - sock->buffer.buf.used);
if (bytes_read == 0) if (bytes_read == 0)
{ {
@@ -657,7 +678,7 @@ static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * c
return READ_ERROR; return READ_ERROR;
} }
sock->buffer.used += bytes_read; sock->buffer.buf.used += bytes_read;
return READ_OK; return READ_OK;
} }
@@ -1080,49 +1101,49 @@ static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock,
return 0; return 0;
} }
static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buffer * const buffer, static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_json_buffer * const json_buffer,
struct nDPIsrvd_jsmn * const jsmn) struct nDPIsrvd_jsmn * const jsmn)
{ {
if (buffer->used < NETWORK_BUFFER_LENGTH_DIGITS + 1) if (json_buffer->buf.used < NETWORK_BUFFER_LENGTH_DIGITS + 1)
{ {
return PARSE_NEED_MORE_DATA; return PARSE_NEED_MORE_DATA;
} }
if (buffer->ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') if (json_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
{ {
return PARSE_INVALID_OPENING_CHAR; return PARSE_INVALID_OPENING_CHAR;
} }
errno = 0; errno = 0;
buffer->json_string_length = strtoull((const char *)buffer->ptr.text, &buffer->json_string, 10); json_buffer->json_string_length = strtoull((const char *)json_buffer->buf.ptr.text, &json_buffer->json_string, 10);
buffer->json_string_length += buffer->json_string - buffer->ptr.text; json_buffer->json_string_length += json_buffer->json_string - json_buffer->buf.ptr.text;
buffer->json_string_start = buffer->json_string - buffer->ptr.text; json_buffer->json_string_start = json_buffer->json_string - json_buffer->buf.ptr.text;
if (errno == ERANGE) if (errno == ERANGE)
{ {
return PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT; return PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT;
} }
if (buffer->json_string == buffer->ptr.text) if (json_buffer->json_string == json_buffer->buf.ptr.text)
{ {
return PARSE_SIZE_MISSING; return PARSE_SIZE_MISSING;
} }
if (buffer->json_string_length > buffer->max) if (json_buffer->json_string_length > json_buffer->buf.max)
{ {
return PARSE_STRING_TOO_BIG; return PARSE_STRING_TOO_BIG;
} }
if (buffer->json_string_length > buffer->used) if (json_buffer->json_string_length > json_buffer->buf.used)
{ {
return PARSE_NEED_MORE_DATA; return PARSE_NEED_MORE_DATA;
} }
if (buffer->ptr.text[buffer->json_string_length - 2] != '}' || if (json_buffer->buf.ptr.text[json_buffer->json_string_length - 2] != '}' ||
buffer->ptr.text[buffer->json_string_length - 1] != '\n') json_buffer->buf.ptr.text[json_buffer->json_string_length - 1] != '\n')
{ {
return PARSE_INVALID_CLOSING_CHAR; return PARSE_INVALID_CLOSING_CHAR;
} }
jsmn_init(&jsmn->parser); jsmn_init(&jsmn->parser);
jsmn->tokens_found = jsmn_parse(&jsmn->parser, jsmn->tokens_found = jsmn_parse(&jsmn->parser,
buffer->ptr.text + buffer->json_string_start, json_buffer->buf.ptr.text + json_buffer->json_string_start,
buffer->json_string_length - buffer->json_string_start, json_buffer->json_string_length - json_buffer->json_string_start,
jsmn->tokens, jsmn->tokens,
nDPIsrvd_MAX_JSON_TOKENS); nDPIsrvd_MAX_JSON_TOKENS);
if (jsmn->tokens_found < 0 || jsmn->tokens[0].type != JSMN_OBJECT) if (jsmn->tokens_found < 0 || jsmn->tokens[0].type != JSMN_OBJECT)
@@ -1143,12 +1164,14 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buf
return PARSE_OK; return PARSE_OK;
} }
static void nDPIsrvd_drain_buffer(struct nDPIsrvd_buffer * const buffer) static void nDPIsrvd_drain_buffer(struct nDPIsrvd_json_buffer * const json_buffer)
{ {
memmove(buffer->ptr.raw, buffer->ptr.raw + buffer->json_string_length, buffer->used - buffer->json_string_length); memmove(json_buffer->buf.ptr.raw,
buffer->used -= buffer->json_string_length; json_buffer->buf.ptr.raw + json_buffer->json_string_length,
buffer->json_string_length = 0; json_buffer->buf.used - json_buffer->json_string_length);
buffer->json_string_start = 0; json_buffer->buf.used -= json_buffer->json_string_length;
json_buffer->json_string_length = 0;
json_buffer->json_string_start = 0;
} }
static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_socket * const sock) static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_socket * const sock)

View File

@@ -255,15 +255,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
error: error:
if (mock_test_desc != NULL) if (mock_test_desc != NULL)
{ {
drain_cache_blocking(mock_test_desc); drain_write_buffers_blocking(mock_test_desc);
} }
if (mock_null_desc != NULL) if (mock_null_desc != NULL)
{ {
drain_cache_blocking(mock_null_desc); drain_write_buffers_blocking(mock_null_desc);
} }
if (mock_arpa_desc != NULL) if (mock_arpa_desc != NULL)
{ {
drain_cache_blocking(mock_arpa_desc); drain_write_buffers_blocking(mock_arpa_desc);
} }
del_event(epollfd, mock_pipefds[PIPE_nDPIsrvd]); del_event(epollfd, mock_pipefds[PIPE_nDPIsrvd]);
@@ -640,7 +640,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
"Problematic JSON string (start: %zu, length: %llu, buffer usage: %zu): %.*s", "Problematic JSON string (start: %zu, length: %llu, buffer usage: %zu): %.*s",
mock_sock->buffer.json_string_start, mock_sock->buffer.json_string_start,
mock_sock->buffer.json_string_length, mock_sock->buffer.json_string_length,
mock_sock->buffer.used, mock_sock->buffer.buf.used,
(int)mock_sock->buffer.json_string_length, (int)mock_sock->buffer.json_string_length,
mock_sock->buffer.json_string); mock_sock->buffer.json_string);
THREAD_ERROR_GOTO(trv); THREAD_ERROR_GOTO(trv);
@@ -992,13 +992,15 @@ int main(int argc, char ** argv)
unsigned long long int total_alloc_bytes = unsigned long long int total_alloc_bytes =
#ifdef ENABLE_ZLIB #ifdef ENABLE_ZLIB
(unsigned long long int)(ndpi_memory_alloc_bytes - zlib_compression_bytes - (zlib_compressions * sizeof(struct nDPId_detection_data))); (unsigned long long int)(ndpi_memory_alloc_bytes - zlib_compression_bytes -
(zlib_compressions * sizeof(struct nDPId_detection_data)));
#else #else
(unsigned long long int)ndpi_memory_alloc_bytes; (unsigned long long int)ndpi_memory_alloc_bytes;
#endif #endif
unsigned long long int total_free_bytes = unsigned long long int total_free_bytes =
#ifdef ENABLE_ZLIB #ifdef ENABLE_ZLIB
(unsigned long long int)(ndpi_memory_free_bytes - zlib_compression_bytes - (zlib_compressions * sizeof(struct nDPId_detection_data))); (unsigned long long int)(ndpi_memory_free_bytes - zlib_compression_bytes -
(zlib_compressions * sizeof(struct nDPId_detection_data)));
#else #else
(unsigned long long int)ndpi_memory_free_bytes; (unsigned long long int)ndpi_memory_free_bytes;
#endif #endif
@@ -1028,7 +1030,8 @@ int main(int argc, char ** argv)
total_free_bytes - total_free_bytes -
sizeof(struct nDPId_workflow) * sizeof(struct nDPId_workflow) *
nDPId_options.reader_thread_count /* We do not want to take the workflow into account. */, nDPId_options.reader_thread_count /* We do not want to take the workflow into account. */,
total_alloc_count, total_free_count); total_alloc_count,
total_free_count);
printf( printf(
"~~ json string min len.......: %llu chars\n" "~~ json string min len.......: %llu chars\n"

View File

@@ -26,12 +26,17 @@ enum sock_type
DISTRIBUTOR_IN, DISTRIBUTOR_IN,
}; };
struct nDPIsrvd_write_buffer
{
struct nDPIsrvd_buffer buf;
size_t written;
};
struct remote_desc struct remote_desc
{ {
enum sock_type sock_type; enum sock_type sock_type;
int fd; int fd;
struct nDPIsrvd_buffer buf;
UT_array * buf_cache;
union union
{ {
struct struct
@@ -40,6 +45,8 @@ struct remote_desc
struct sockaddr_un peer; struct sockaddr_un peer;
unsigned long long int json_bytes; unsigned long long int json_bytes;
pid_t pid; pid_t pid;
struct nDPIsrvd_json_buffer main_read_buffer;
} event_collector_un; } event_collector_un;
struct struct
{ {
@@ -47,12 +54,18 @@ struct remote_desc
struct sockaddr_un peer; struct sockaddr_un peer;
pid_t pid; pid_t pid;
char * user_name; char * user_name;
struct nDPIsrvd_write_buffer main_write_buffer;
UT_array * additional_write_buffers;
} event_distributor_un; /* UNIX socket */ } event_distributor_un; /* UNIX socket */
struct struct
{ {
int distributor_sockfd; int distributor_sockfd;
struct sockaddr_in peer; struct sockaddr_in peer;
char peer_addr[INET_ADDRSTRLEN]; char peer_addr[INET_ADDRSTRLEN];
struct nDPIsrvd_write_buffer main_write_buffer;
UT_array * additional_write_buffers;
} event_distributor_in; /* TCP/IP socket */ } event_distributor_in; /* TCP/IP socket */
}; };
}; };
@@ -81,11 +94,11 @@ static struct
nDPIsrvd_ull max_remote_descriptors; nDPIsrvd_ull max_remote_descriptors;
char * user; char * user;
char * group; char * group;
nDPIsrvd_ull cache_array_length; nDPIsrvd_ull max_write_buffers;
int cache_fallback_to_blocking; int bufferbloat_fallback_to_blocking;
} nDPIsrvd_options = {.max_remote_descriptors = nDPIsrvd_MAX_REMOTE_DESCRIPTORS, } nDPIsrvd_options = {.max_remote_descriptors = nDPIsrvd_MAX_REMOTE_DESCRIPTORS,
.cache_array_length = nDPIsrvd_CACHE_ARRAY_LENGTH, .max_write_buffers = nDPIsrvd_MAX_WRITE_BUFFERS,
.cache_fallback_to_blocking = 1}; .bufferbloat_fallback_to_blocking = 1};
static void logger_nDPIsrvd(struct remote_desc const * const remote, static void logger_nDPIsrvd(struct remote_desc const * const remote,
char const * const prefix, char const * const prefix,
@@ -97,34 +110,32 @@ static int add_in_event_fd(int epollfd, int fd);
static int add_in_event(int epollfd, struct remote_desc * const remote); static int add_in_event(int epollfd, struct remote_desc * const remote);
static int del_out_event(int epollfd, struct remote_desc * const remote); static int del_out_event(int epollfd, struct remote_desc * const remote);
static void disconnect_client(int epollfd, struct remote_desc * const current); static void disconnect_client(int epollfd, struct remote_desc * const current);
static int drain_cache_blocking(struct remote_desc * const remote); static int drain_write_buffers_blocking(struct remote_desc * const remote);
static void nDPIsrvd_buffer_array_copy(void * dst, const void * src) static void nDPIsrvd_buffer_array_copy(void * dst, const void * src)
{ {
struct nDPIsrvd_buffer * const buf_dst = (struct nDPIsrvd_buffer *)dst; struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)dst;
struct nDPIsrvd_buffer const * const buf_src = (struct nDPIsrvd_buffer *)src; struct nDPIsrvd_write_buffer const * const buf_src = (struct nDPIsrvd_write_buffer *)src;
buf_dst->ptr.raw = NULL; buf_dst->buf.ptr.raw = NULL;
if (nDPIsrvd_buffer_init(buf_dst, buf_src->used) != 0) if (nDPIsrvd_buffer_init(&buf_dst->buf, buf_src->buf.used) != 0)
{ {
return; return;
} }
buf_dst->json_string_start = buf_src->json_string_start; buf_dst->written = buf_src->written;
buf_dst->json_string_length = buf_src->json_string_length; buf_dst->buf.used = buf_src->buf.used;
buf_dst->json_string = buf_src->json_string; memcpy(buf_dst->buf.ptr.raw, buf_src->buf.ptr.raw, buf_src->buf.used);
buf_dst->used = buf_src->used;
memcpy(buf_dst->ptr.raw, buf_src->ptr.raw, buf_src->used);
} }
static void nDPIsrvd_buffer_array_dtor(void * elt) static void nDPIsrvd_buffer_array_dtor(void * elt)
{ {
struct nDPIsrvd_buffer * const buf = (struct nDPIsrvd_buffer *)elt; struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)elt;
nDPIsrvd_buffer_free(buf); nDPIsrvd_buffer_free(&buf_dst->buf);
} }
static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_buffer), static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_write_buffer),
NULL, NULL,
nDPIsrvd_buffer_array_copy, nDPIsrvd_buffer_array_copy,
nDPIsrvd_buffer_array_dtor}; nDPIsrvd_buffer_array_dtor};
@@ -142,36 +153,93 @@ void nDPIsrvd_memprof_log(char const * const format, ...)
#endif #endif
#endif #endif
static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, nDPIsrvd_ull json_string_length) static struct nDPIsrvd_json_buffer * get_read_buffer(struct remote_desc * const remote)
{ {
struct nDPIsrvd_buffer buf_src = {}; switch (remote->sock_type)
if (utarray_len(remote->buf_cache) >= nDPIsrvd_options.cache_array_length)
{ {
if (nDPIsrvd_options.cache_fallback_to_blocking == 0) case COLLECTOR_UN:
return &remote->event_collector_un.main_read_buffer;
case DISTRIBUTOR_UN:
case DISTRIBUTOR_IN:
return NULL;
}
return NULL;
}
static struct nDPIsrvd_write_buffer * get_write_buffer(struct remote_desc * const remote)
{
switch (remote->sock_type)
{
case COLLECTOR_UN:
return NULL;
case DISTRIBUTOR_UN:
return &remote->event_distributor_un.main_write_buffer;
case DISTRIBUTOR_IN:
return &remote->event_distributor_in.main_write_buffer;
}
return NULL;
}
static UT_array * get_additional_write_buffers(struct remote_desc * const remote)
{
switch (remote->sock_type)
{
case COLLECTOR_UN:
return NULL;
case DISTRIBUTOR_UN:
return remote->event_distributor_un.additional_write_buffers;
case DISTRIBUTOR_IN:
return remote->event_distributor_in.additional_write_buffers;
}
return NULL;
}
static int add_to_additional_write_buffers(struct remote_desc * const remote,
uint8_t * const buf,
nDPIsrvd_ull json_string_length)
{
struct nDPIsrvd_write_buffer buf_src = {};
UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
if (additional_write_buffers == NULL)
{
return -1;
}
if (utarray_len(additional_write_buffers) >= nDPIsrvd_options.max_write_buffers)
{
if (nDPIsrvd_options.bufferbloat_fallback_to_blocking == 0)
{ {
logger_nDPIsrvd(remote, logger_nDPIsrvd(remote,
"Buffer cache limit for", "Buffer limit for",
"for reached, remote too slow: %u lines", "for reached, remote too slow: %u lines",
utarray_len(remote->buf_cache)); utarray_len(additional_write_buffers));
return -1; return -1;
} }
else else
{ {
logger_nDPIsrvd(remote, logger_nDPIsrvd(remote,
"Buffer JSON string cache limit for", "Buffer limit for",
"reached, falling back to blocking I/O: %u lines", "reached, falling back to blocking I/O: %u lines",
utarray_len(remote->buf_cache)); utarray_len(additional_write_buffers));
if (drain_cache_blocking(remote) != 0) if (drain_write_buffers_blocking(remote) != 0)
{ {
return -1; return -1;
} }
} }
} }
buf_src.ptr.raw = buf; buf_src.buf.ptr.raw = buf;
buf_src.used = buf_src.max = buf_src.json_string_length = json_string_length; buf_src.buf.used = buf_src.buf.max = json_string_length;
utarray_push_back(remote->buf_cache, &buf_src); utarray_push_back(additional_write_buffers, &buf_src);
return 0; return 0;
} }
@@ -216,13 +284,20 @@ static void logger_nDPIsrvd(struct remote_desc const * const remote,
static int drain_main_buffer(struct remote_desc * const remote) static int drain_main_buffer(struct remote_desc * const remote)
{ {
if (remote->buf.used == 0) struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(remote);
if (write_buffer == NULL)
{
return -1;
}
if (write_buffer->buf.used == 0)
{ {
return 0; return 0;
} }
errno = 0; errno = 0;
ssize_t bytes_written = write(remote->fd, remote->buf.ptr.raw, remote->buf.used); ssize_t bytes_written = write(remote->fd, write_buffer->buf.ptr.raw, write_buffer->buf.used);
if (errno == EAGAIN) if (errno == EAGAIN)
{ {
return 0; return 0;
@@ -237,32 +312,37 @@ static int drain_main_buffer(struct remote_desc * const remote)
logger_nDPIsrvd(remote, "Distributor connection", "closed"); logger_nDPIsrvd(remote, "Distributor connection", "closed");
return -1; return -1;
} }
if ((size_t)bytes_written < remote->buf.used) if ((size_t)bytes_written < write_buffer->buf.used)
{ {
#if 0 #if 0
logger_nDPIsrvd( logger_nDPIsrvd(
remote, "Distributor", "wrote less than expected: %zd < %zu", bytes_written, remote->buf.used); remote, "Distributor", "wrote less than expected: %zd < %zu", bytes_written, remote->buf.used);
#endif #endif
memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written); memmove(write_buffer->buf.ptr.raw,
write_buffer->buf.ptr.raw + bytes_written,
write_buffer->buf.used - bytes_written);
} }
remote->buf.used -= bytes_written; write_buffer->buf.used -= bytes_written;
return 0; return 0;
} }
static int drain_cache(struct remote_desc * const remote) static int drain_write_buffers(struct remote_desc * const remote)
{ {
UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
errno = 0; errno = 0;
if (drain_main_buffer(remote) != 0) if (drain_main_buffer(remote) != 0 || additional_write_buffers == NULL)
{ {
return -1; return -1;
} }
while (utarray_len(remote->buf_cache) > 0) while (utarray_len(additional_write_buffers) > 0)
{ {
struct nDPIsrvd_buffer * buf = (struct nDPIsrvd_buffer *)utarray_front(remote->buf_cache); struct nDPIsrvd_write_buffer * buf = (struct nDPIsrvd_write_buffer *)utarray_front(additional_write_buffers);
ssize_t written = write(remote->fd, buf->ptr.raw + buf->json_string_start, buf->json_string_length); ssize_t written = write(remote->fd, buf->buf.ptr.raw + buf->written, buf->buf.used - buf->written);
switch (written) switch (written)
{ {
case -1: case -1:
@@ -274,11 +354,10 @@ static int drain_cache(struct remote_desc * const remote)
case 0: case 0:
return -1; return -1;
default: default:
buf->json_string_start += written; buf->written += written;
buf->json_string_length -= written; if (buf->written == buf->buf.max)
if (buf->json_string_length == 0)
{ {
utarray_erase(remote->buf_cache, 0, 1); utarray_erase(additional_write_buffers, 0, 1);
} }
break; break;
} }
@@ -287,7 +366,7 @@ static int drain_cache(struct remote_desc * const remote)
return 0; return 0;
} }
static int drain_cache_blocking(struct remote_desc * const remote) static int drain_write_buffers_blocking(struct remote_desc * const remote)
{ {
int retval = 0; int retval = 0;
@@ -296,9 +375,9 @@ static int drain_cache_blocking(struct remote_desc * const remote)
logger_nDPIsrvd(remote, "Error setting distributor", "fd flags to blocking mode: %s", strerror(errno)); logger_nDPIsrvd(remote, "Error setting distributor", "fd flags to blocking mode: %s", strerror(errno));
return -1; return -1;
} }
if (drain_cache(remote) != 0) if (drain_write_buffers(remote) != 0)
{ {
logger_nDPIsrvd(remote, "Could not drain buffer cache for", "in blocking I/O: %s", strerror(errno)); logger_nDPIsrvd(remote, "Could not drain buffers for", "in blocking I/O: %s", strerror(errno));
retval = -1; retval = -1;
} }
if (fcntl_add_flags(remote->fd, O_NONBLOCK) != 0) if (fcntl_add_flags(remote->fd, O_NONBLOCK) != 0)
@@ -312,17 +391,19 @@ static int drain_cache_blocking(struct remote_desc * const remote)
static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
{ {
if (remote->sock_type != DISTRIBUTOR_UN && remote->sock_type != DISTRIBUTOR_IN) UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
if (additional_write_buffers == NULL)
{ {
return -1; return -1;
} }
if (drain_cache(remote) != 0) if (drain_write_buffers(remote) != 0)
{ {
logger_nDPIsrvd(remote, "Could not drain buffer cache for", ": %s", strerror(errno)); logger_nDPIsrvd(remote, "Could not drain buffers for", ": %s", strerror(errno));
disconnect_client(epollfd, remote); disconnect_client(epollfd, remote);
return -1; return -1;
} }
if (utarray_len(remote->buf_cache) == 0) if (utarray_len(additional_write_buffers) == 0)
{ {
return del_out_event(epollfd, remote); return del_out_event(epollfd, remote);
} }
@@ -517,14 +598,42 @@ static struct remote_desc * get_remote_descriptor(enum sock_type type, int remot
if (remotes.desc[i].fd == -1) if (remotes.desc[i].fd == -1)
{ {
remotes.desc_used++; remotes.desc_used++;
if (remotes.desc[i].buf_cache == NULL)
struct nDPIsrvd_write_buffer * write_buffer = NULL;
UT_array ** additional_write_buffers = NULL;
switch (type)
{ {
utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd); case COLLECTOR_UN:
} if (nDPIsrvd_json_buffer_init(&remotes.desc[i].event_collector_un.main_read_buffer,
if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, max_buffer_size) != 0 || remotes.desc[i].buf_cache == NULL) max_buffer_size) != 0)
{ {
return NULL; return NULL;
} }
break;
case DISTRIBUTOR_UN:
write_buffer = &remotes.desc[i].event_distributor_un.main_write_buffer;
additional_write_buffers = &remotes.desc[i].event_distributor_un.additional_write_buffers;
break;
case DISTRIBUTOR_IN:
write_buffer = &remotes.desc[i].event_distributor_in.main_write_buffer;
additional_write_buffers = &remotes.desc[i].event_distributor_in.additional_write_buffers;
break;
}
if (additional_write_buffers != NULL && *additional_write_buffers == NULL)
{
utarray_new(*additional_write_buffers, &nDPIsrvd_buffer_array_icd);
if (*additional_write_buffers == NULL)
{
return NULL;
}
}
if (write_buffer != NULL && nDPIsrvd_buffer_init(&write_buffer->buf, max_buffer_size) != 0)
{
return NULL;
}
remotes.desc[i].sock_type = type; remotes.desc[i].sock_type = type;
remotes.desc[i].fd = remote_fd; remotes.desc[i].fd = remote_fd;
return &remotes.desc[i]; return &remotes.desc[i];
@@ -538,12 +647,28 @@ static void free_remotes(void)
{ {
for (size_t i = 0; i < remotes.desc_size; ++i) for (size_t i = 0; i < remotes.desc_size; ++i)
{ {
if (remotes.desc[i].buf_cache != NULL) switch (remotes.desc[i].sock_type)
{ {
utarray_free(remotes.desc[i].buf_cache); case COLLECTOR_UN:
remotes.desc[i].buf_cache = NULL; nDPIsrvd_json_buffer_free(&remotes.desc[i].event_collector_un.main_read_buffer);
break;
case DISTRIBUTOR_UN:
if (remotes.desc[i].event_distributor_un.additional_write_buffers != NULL)
{
utarray_free(remotes.desc[i].event_distributor_un.additional_write_buffers);
remotes.desc[i].event_distributor_un.additional_write_buffers = NULL;
}
nDPIsrvd_buffer_free(&remotes.desc[i].event_distributor_un.main_write_buffer.buf);
break;
case DISTRIBUTOR_IN:
if (remotes.desc[i].event_distributor_in.additional_write_buffers != NULL)
{
utarray_free(remotes.desc[i].event_distributor_in.additional_write_buffers);
remotes.desc[i].event_distributor_in.additional_write_buffers = NULL;
}
nDPIsrvd_buffer_free(&remotes.desc[i].event_distributor_in.main_write_buffer.buf);
break;
} }
nDPIsrvd_buffer_free(&remotes.desc[i].buf);
} }
} }
@@ -610,10 +735,25 @@ static void disconnect_client(int epollfd, struct remote_desc * const current)
{ {
case COLLECTOR_UN: case COLLECTOR_UN:
logger_nDPIsrvd(current, "Error closing collector connection", ": %s", strerror(errno)); logger_nDPIsrvd(current, "Error closing collector connection", ": %s", strerror(errno));
nDPIsrvd_json_buffer_free(&current->event_collector_un.main_read_buffer);
break; break;
case DISTRIBUTOR_UN: case DISTRIBUTOR_UN:
logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno));
if (current->event_distributor_un.additional_write_buffers != NULL)
{
utarray_clear(current->event_distributor_un.additional_write_buffers);
}
nDPIsrvd_buffer_free(&current->event_distributor_un.main_write_buffer.buf);
current->event_distributor_un.main_write_buffer.written = 0;
break;
case DISTRIBUTOR_IN: case DISTRIBUTOR_IN:
logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno)); logger_nDPIsrvd(current, "Error closing distributor connection", ": %s", strerror(errno));
if (current->event_distributor_in.additional_write_buffers != NULL)
{
utarray_clear(current->event_distributor_in.additional_write_buffers);
}
nDPIsrvd_buffer_free(&current->event_distributor_in.main_write_buffer.buf);
current->event_distributor_in.main_write_buffer.written = 0;
break; break;
} }
} }
@@ -625,11 +765,6 @@ static void disconnect_client(int epollfd, struct remote_desc * const current)
current->fd = -1; current->fd = -1;
remotes.desc_used--; remotes.desc_used--;
} }
if (current->buf_cache != NULL)
{
utarray_clear(current->buf_cache);
}
nDPIsrvd_buffer_free(&current->buf);
} }
static int nDPIsrvd_parse_options(int argc, char ** argv) static int nDPIsrvd_parse_options(int argc, char ** argv)
@@ -684,14 +819,14 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
nDPIsrvd_options.group = strdup(optarg); nDPIsrvd_options.group = strdup(optarg);
break; break;
case 'C': case 'C':
if (str_value_to_ull(optarg, &nDPIsrvd_options.cache_array_length) != CONVERSION_OK) if (str_value_to_ull(optarg, &nDPIsrvd_options.max_write_buffers) != CONVERSION_OK)
{ {
fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg); fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg);
return 1; return 1;
} }
break; break;
case 'D': case 'D':
nDPIsrvd_options.cache_fallback_to_blocking = 0; nDPIsrvd_options.bufferbloat_fallback_to_blocking = 0;
break; break;
case 'v': case 'v':
fprintf(stderr, "%s", get_nDPId_version()); fprintf(stderr, "%s", get_nDPId_version());
@@ -952,21 +1087,27 @@ static int new_connection(int epollfd, int eventfd)
static int handle_collector_protocol(int epollfd, struct remote_desc * const current) static int handle_collector_protocol(int epollfd, struct remote_desc * const current)
{ {
struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current);
char * json_str_start = NULL; char * json_str_start = NULL;
if (current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') if (json_read_buffer == NULL)
{
return 1;
}
if (json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
{ {
logger_nDPIsrvd(current, logger_nDPIsrvd(current,
"BUG: Collector connection", "BUG: Collector connection",
"JSON invalid opening character: '%c'", "JSON invalid opening character: '%c'",
current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]);
disconnect_client(epollfd, current); disconnect_client(epollfd, current);
return 1; return 1;
} }
errno = 0; errno = 0;
current->event_collector_un.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10); current->event_collector_un.json_bytes = strtoull(json_read_buffer->buf.ptr.text, &json_str_start, 10);
current->event_collector_un.json_bytes += json_str_start - current->buf.ptr.text; current->event_collector_un.json_bytes += json_str_start - json_read_buffer->buf.ptr.text;
if (errno == ERANGE) if (errno == ERANGE)
{ {
@@ -975,18 +1116,18 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
return 1; return 1;
} }
if (json_str_start == current->buf.ptr.text) if (json_str_start == json_read_buffer->buf.ptr.text)
{ {
logger_nDPIsrvd(current, logger_nDPIsrvd(current,
"BUG: Collector connection", "BUG: Collector connection",
"missing JSON string length in protocol preamble: \"%.*s\"", "missing JSON string length in protocol preamble: \"%.*s\"",
NETWORK_BUFFER_LENGTH_DIGITS, NETWORK_BUFFER_LENGTH_DIGITS,
current->buf.ptr.text); json_read_buffer->buf.ptr.text);
disconnect_client(epollfd, current); disconnect_client(epollfd, current);
return 1; return 1;
} }
if (json_str_start - current->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS) if (json_str_start - json_read_buffer->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS)
{ {
logger_nDPIsrvd(current, logger_nDPIsrvd(current,
"BUG: Collector connection", "BUG: Collector connection",
@@ -994,33 +1135,33 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
"%ld " "%ld "
"bytes", "bytes",
NETWORK_BUFFER_LENGTH_DIGITS, NETWORK_BUFFER_LENGTH_DIGITS,
(long int)(json_str_start - current->buf.ptr.text)); (long int)(json_str_start - json_read_buffer->buf.ptr.text));
} }
if (current->event_collector_un.json_bytes > current->buf.max) if (current->event_collector_un.json_bytes > json_read_buffer->buf.max)
{ {
logger_nDPIsrvd(current, logger_nDPIsrvd(current,
"BUG: Collector connection", "BUG: Collector connection",
"JSON string too big: %llu > %zu", "JSON string too big: %llu > %zu",
current->event_collector_un.json_bytes, current->event_collector_un.json_bytes,
current->buf.max); json_read_buffer->buf.max);
disconnect_client(epollfd, current); disconnect_client(epollfd, current);
return 1; return 1;
} }
if (current->event_collector_un.json_bytes > current->buf.used) if (current->event_collector_un.json_bytes > json_read_buffer->buf.used)
{ {
return 1; return 1;
} }
if (current->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' || if (json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' ||
current->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n') json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n')
{ {
logger_nDPIsrvd(current, logger_nDPIsrvd(current,
"BUG: Collector connection", "BUG: Collector connection",
"invalid JSON string: %.*s", "invalid JSON string: %.*s",
(int)current->event_collector_un.json_bytes, (int)current->event_collector_un.json_bytes,
current->buf.ptr.text); json_read_buffer->buf.ptr.text);
disconnect_client(epollfd, current); disconnect_client(epollfd, current);
return 1; return 1;
} }
@@ -1030,7 +1171,9 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
static int handle_incoming_data(int epollfd, struct remote_desc * const current) static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{ {
if (current->sock_type != COLLECTOR_UN) struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current);
if (json_read_buffer == NULL)
{ {
unsigned char garbage = 0; unsigned char garbage = 0;
@@ -1047,18 +1190,19 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
} }
/* read JSON strings (or parts) from the UNIX socket (collecting) */ /* read JSON strings (or parts) from the UNIX socket (collecting) */
if (current->buf.used == current->buf.max) if (json_read_buffer->buf.used == json_read_buffer->buf.max)
{ {
logger_nDPIsrvd(current, logger_nDPIsrvd(current,
"Collector connection", "Collector connection",
"read buffer (%zu bytes) full. No more read possible.", "read buffer (%zu bytes) full. No more read possible.",
current->buf.max); json_read_buffer->buf.max);
} }
else else
{ {
errno = 0; errno = 0;
ssize_t bytes_read = ssize_t bytes_read = read(current->fd,
read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used); json_read_buffer->buf.ptr.raw + json_read_buffer->buf.used,
json_read_buffer->buf.max - json_read_buffer->buf.used);
if (bytes_read < 0 || errno != 0) if (bytes_read < 0 || errno != 0)
{ {
logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno)); logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno));
@@ -1071,10 +1215,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
disconnect_client(epollfd, current); disconnect_client(epollfd, current);
return 1; return 1;
} }
current->buf.used += bytes_read; json_read_buffer->buf.used += bytes_read;
} }
while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) while (json_read_buffer->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
{ {
if (handle_collector_protocol(epollfd, current) != 0) if (handle_collector_protocol(epollfd, current) != 0)
{ {
@@ -1083,19 +1227,18 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
for (size_t i = 0; i < remotes.desc_size; ++i) for (size_t i = 0; i < remotes.desc_size; ++i)
{ {
if (remotes.desc[i].fd < 0) struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(&remotes.desc[i]);
{ UT_array * const additional_write_buffers = get_additional_write_buffers(&remotes.desc[i]);
continue;
} if (remotes.desc[i].fd < 0 || write_buffer == NULL || additional_write_buffers == NULL)
if (remotes.desc[i].sock_type != DISTRIBUTOR_UN && remotes.desc[i].sock_type != DISTRIBUTOR_IN)
{ {
continue; continue;
} }
if (current->event_collector_un.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used || if (current->event_collector_un.json_bytes > write_buffer->buf.max - write_buffer->buf.used ||
utarray_len(remotes.desc[i].buf_cache) > 0) utarray_len(additional_write_buffers) > 0)
{ {
if (utarray_len(remotes.desc[i].buf_cache) == 0) if (utarray_len(additional_write_buffers) == 0)
{ {
#if 0 #if 0
logger_nDPIsrvd(&remotes.desc[i], logger_nDPIsrvd(&remotes.desc[i],
@@ -1114,7 +1257,9 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
continue; continue;
} }
} }
if (add_to_cache(&remotes.desc[i], current->buf.ptr.raw, current->event_collector_un.json_bytes) != 0) if (add_to_additional_write_buffers(&remotes.desc[i],
json_read_buffer->buf.ptr.raw,
current->event_collector_un.json_bytes) != 0)
{ {
disconnect_client(epollfd, &remotes.desc[i]); disconnect_client(epollfd, &remotes.desc[i]);
continue; continue;
@@ -1122,10 +1267,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
} }
else else
{ {
memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used, memcpy(write_buffer->buf.ptr.raw + write_buffer->buf.used,
current->buf.ptr.raw, json_read_buffer->buf.ptr.raw,
current->event_collector_un.json_bytes); current->event_collector_un.json_bytes);
remotes.desc[i].buf.used += current->event_collector_un.json_bytes; write_buffer->buf.used += current->event_collector_un.json_bytes;
} }
if (drain_main_buffer(&remotes.desc[i]) != 0) if (drain_main_buffer(&remotes.desc[i]) != 0)
@@ -1134,10 +1279,10 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
} }
} }
memmove(current->buf.ptr.raw, memmove(json_read_buffer->buf.ptr.raw,
current->buf.ptr.raw + current->event_collector_un.json_bytes, json_read_buffer->buf.ptr.raw + current->event_collector_un.json_bytes,
current->buf.used - current->event_collector_un.json_bytes); json_read_buffer->buf.used - current->event_collector_un.json_bytes);
current->buf.used -= current->event_collector_un.json_bytes; json_read_buffer->buf.used -= current->event_collector_un.json_bytes;
current->event_collector_un.json_bytes = 0; current->event_collector_un.json_bytes = 0;
} }
@@ -1230,7 +1375,7 @@ static int mainloop(int epollfd)
switch (current->sock_type) switch (current->sock_type)
{ {
case COLLECTOR_UN: case COLLECTOR_UN:
logger_nDPIsrvd(current, "Collector disconnected", "closed"); logger_nDPIsrvd(current, "Collector connection", "closed");
break; break;
case DISTRIBUTOR_UN: case DISTRIBUTOR_UN:
case DISTRIBUTOR_IN: case DISTRIBUTOR_IN: