Unified IO buffer mgmt.

* c-collectd gives the user control over collectd-exec instance name
 * added missing collectd type `flow_l4_icmp_count`

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2021-06-07 14:42:40 +02:00
parent 382706cd20
commit 54e0601fec
7 changed files with 137 additions and 93 deletions

View File

@@ -1,7 +1,6 @@
# TODOs
1. unify `struct io_buffer` from nDPIsrvd.c and `struct nDPIsrvd_buffer` from nDPIsrvd.h
2. improve nDPIsrvd buffer bloat handling (Do not fall back to blocking mode!)
3. improve UDP/TCP timeout handling by reading netfilter conntrack timeouts from /proc (or just read the conntrack table directly)
4. detect interface / timeout changes and apply them to nDPId
5. implement AEAD crypto via libsodium (at least for TCP communication)
1. improve nDPIsrvd buffer bloat handling (Do not fall back to blocking mode!)
2. improve UDP/TCP timeout handling by reading netfilter conntrack timeouts from /proc (or just read conntrack table entries)
3. detect interface / timeout changes and apply them to nDPId
4. implement AEAD crypto via libsodium (at least for TCP communication)

View File

@@ -134,8 +134,12 @@ struct nDPIsrvd_address
struct nDPIsrvd_buffer
{
char raw[NETWORK_BUFFER_MAX_SIZE];
union {
char * text;
uint8_t * raw;
} ptr;
size_t used;
size_t max;
char * json_string;
size_t json_string_start;
nDPIsrvd_ull json_string_length;
@@ -288,6 +292,33 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value)
return enum_str[enum_value - FIRST_ENUM_VALUE];
}
static inline int nDPIsrvd_buffer_init(struct nDPIsrvd_buffer * const buffer, size_t buffer_size)
{
if (buffer->ptr.raw != NULL && buffer->max != buffer_size)
{
return 1; /* Do not fail and realloc()? */
}
buffer->ptr.raw = (uint8_t *)malloc(buffer_size);
if (buffer->ptr.raw == NULL)
{
return 1;
}
buffer->json_string_start = 0;
buffer->json_string_length = 0ull;
buffer->json_string = NULL;
buffer->used = 0;
buffer->max = buffer_size;
return 0;
}
static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer)
{
free(buffer->ptr.raw);
}
static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_size,
size_t flow_user_data_size,
json_callback json_cb,
@@ -306,6 +337,10 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_siz
memset(sock, 0, sizeof(*sock));
sock->fd = -1;
if (nDPIsrvd_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0)
{
goto error;
}
sock->address.raw.sa_family = -1;
sock->flow_user_data_size = flow_user_data_size;
@@ -324,6 +359,7 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_siz
return sock;
error:
nDPIsrvd_buffer_free(&sock->buffer);
nDPIsrvd_free(&sock);
return NULL;
}
@@ -368,6 +404,7 @@ static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock)
(*sock)->flow_table = NULL;
}
nDPIsrvd_buffer_free(&(*sock)->buffer);
free(*sock);
*sock = NULL;
@@ -464,7 +501,7 @@ static inline enum nDPIsrvd_connect_return nDPIsrvd_connect(struct nDPIsrvd_sock
static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * const sock)
{
ssize_t bytes_read =
read(sock->fd, sock->buffer.raw + sock->buffer.used, sizeof(sock->buffer.raw) - sock->buffer.used);
read(sock->fd, sock->buffer.ptr.raw + sock->buffer.used, sock->buffer.max - sock->buffer.used);
if (bytes_read == 0)
{
@@ -664,25 +701,25 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buf
{
return PARSE_NEED_MORE_DATA;
}
if (buffer->raw[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
if (buffer->ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
{
return PARSE_INVALID_OPENING_CHAR;
}
errno = 0;
buffer->json_string_length = strtoull((const char *)buffer->raw, &buffer->json_string, 10);
buffer->json_string_length += buffer->json_string - buffer->raw;
buffer->json_string_start = buffer->json_string - buffer->raw;
buffer->json_string_length = strtoull((const char *)buffer->ptr.text, &buffer->json_string, 10);
buffer->json_string_length += buffer->json_string - buffer->ptr.text;
buffer->json_string_start = buffer->json_string - buffer->ptr.text;
if (errno == ERANGE)
{
return PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT;
}
if (buffer->json_string == buffer->raw)
if (buffer->json_string == buffer->ptr.text)
{
return PARSE_SIZE_MISSING;
}
if (buffer->json_string_length > sizeof(buffer->raw))
if (buffer->json_string_length > buffer->max)
{
return PARSE_STRING_TOO_BIG;
}
@@ -690,14 +727,14 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buf
{
return PARSE_NEED_MORE_DATA;
}
if (buffer->raw[buffer->json_string_length - 2] != '}' || buffer->raw[buffer->json_string_length - 1] != '\n')
if (buffer->ptr.text[buffer->json_string_length - 2] != '}' || buffer->ptr.text[buffer->json_string_length - 1] != '\n')
{
return PARSE_INVALID_CLOSING_CHAR;
}
jsmn_init(&jsmn->parser);
jsmn->tokens_found = jsmn_parse(&jsmn->parser,
(char *)(buffer->raw + buffer->json_string_start),
buffer->ptr.text + buffer->json_string_start,
buffer->json_string_length - buffer->json_string_start,
jsmn->tokens,
nDPIsrvd_MAX_JSON_TOKENS);
@@ -711,7 +748,7 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buf
static void nDPIsrvd_drain_buffer(struct nDPIsrvd_buffer * const buffer)
{
memmove(buffer->raw, buffer->raw + buffer->json_string_length, buffer->used - buffer->json_string_length);
memmove(buffer->ptr.raw, buffer->ptr.raw + buffer->json_string_length, buffer->used - buffer->json_string_length);
buffer->used -= buffer->json_string_length;
buffer->json_string_length = 0;
buffer->json_string_start = 0;

View File

@@ -10,6 +10,8 @@
#include "nDPIsrvd.h"
#define DEFAULT_COLLECTD_EXEC_INST "exec-nDPIsrvd"
#define LOG(flags, format, ...) \
if (quiet == 0) \
{ \
@@ -28,6 +30,7 @@ static int collectd_timerfd = -1;
static char * serv_optarg = NULL;
static char * collectd_hostname = NULL;
static char * collectd_interval = NULL;
static char * instance_name = NULL;
static nDPIsrvd_ull collectd_interval_ull = 0uL;
static int quiet = 0;
@@ -134,16 +137,19 @@ static int parse_options(int argc, char ** argv)
static char const usage[] =
"Usage: %s "
"[-s host] [-c hostname] [-i interval] [-q]\n\n"
"[-s host] [-c hostname] [-n collectd-instance-name] [-i interval] [-q]\n\n"
"\t-s\tDestination where nDPIsrvd is listening on.\n"
"\t-c\tCollectd hostname.\n"
"\t \tThis value defaults to the environment variable COLLECTD_HOSTNAME.\n"
"\t-n\tName of the collectd(-exec) instance.\n"
"\t \tDefaults to: " DEFAULT_COLLECTD_EXEC_INST
"\n"
"\t-i\tInterval between print statistics to stdout.\n"
"\t \tThis value defaults to the environment variable COLLECTD_INTERVAL.\n"
"\t-q\tDo not print anything except collectd statistics.\n"
"\t \tAutomatically enabled if environment variables mentioned above are set.\n";
while ((opt = getopt(argc, argv, "hs:c:i:q")) != -1)
while ((opt = getopt(argc, argv, "hs:c:n:i:q")) != -1)
{
switch (opt)
{
@@ -155,6 +161,10 @@ static int parse_options(int argc, char ** argv)
free(collectd_hostname);
collectd_hostname = strdup(optarg);
break;
case 'n':
free(instance_name);
instance_name = strdup(optarg);
break;
case 'i':
free(collectd_interval);
collectd_interval = strdup(optarg);
@@ -182,6 +192,11 @@ static int parse_options(int argc, char ** argv)
}
}
if (instance_name == NULL)
{
instance_name = strdup(DEFAULT_COLLECTD_EXEC_INST);
}
if (collectd_interval == NULL)
{
collectd_interval = getenv("COLLECTD_INTERVAL");
@@ -217,9 +232,9 @@ static int parse_options(int argc, char ** argv)
return 0;
}
#define COLLECTD_PUTVAL_N_FORMAT(name) "PUTVAL %s/nDPId/" #name " interval=%llu %llu:%llu\n"
#define COLLECTD_PUTVAL_N_FORMAT(name) "PUTVAL %s/%s/" #name " interval=%llu %llu:%llu\n"
#define COLLECTD_PUTVAL_N(value) \
collectd_hostname, collectd_interval_ull, (unsigned long long int)now, \
collectd_hostname, instance_name, collectd_interval_ull, (unsigned long long int)now, \
(unsigned long long int)collectd_statistics.value
static void print_collectd_exec_output(void)
{

View File

@@ -60,6 +60,7 @@ flow_category_unknown_count value:GAUGE:0:U
flow_l3_ip4_count value:GAUGE:0:U
flow_l3_ip6_count value:GAUGE:0:U
flow_l3_other_count value:GAUGE:0:U
flow_l4_icmp_count value:GAUGE:0:U
flow_l4_tcp_count value:GAUGE:0:U
flow_l4_udp_count value:GAUGE:0:U
flow_l4_other_count value:GAUGE:0:U

View File

@@ -135,47 +135,38 @@ error:
return NULL;
}
static enum nDPIsrvd_parse_return parse_json_lines(struct io_buffer * const buffer)
static enum nDPIsrvd_parse_return parse_json_lines(struct nDPIsrvd_buffer * const buffer)
{
struct nDPIsrvd_buffer buf = {};
struct nDPIsrvd_jsmn jsmn = {};
size_t const n = (buffer->used > sizeof(buf.raw) ? sizeof(buf.raw) : buffer->used);
size_t const n = (buffer->used > buffer->max ? buffer->max : buffer->used);
if (n > NETWORK_BUFFER_MAX_SIZE)
{
return PARSE_STRING_TOO_BIG;
}
memcpy(buf.raw, buffer->ptr, n);
buf.used = buffer->used;
enum nDPIsrvd_parse_return ret;
while ((ret = nDPIsrvd_parse_line(&buf, &jsmn)) == PARSE_OK)
while ((ret = nDPIsrvd_parse_line(buffer, &jsmn)) == PARSE_OK)
{
if (jsmn.tokens_found == 0)
{
return PARSE_JSMN_ERROR;
}
nDPIsrvd_drain_buffer(&buf);
nDPIsrvd_drain_buffer(buffer);
}
memcpy(buffer->ptr, buf.raw, buf.used);
buffer->used = buf.used;
return ret;
}
static void * distributor_client_mainloop_thread(void * const arg)
{
struct io_buffer client_buffer = {.ptr = (uint8_t *)malloc(NETWORK_BUFFER_MAX_SIZE),
.max = NETWORK_BUFFER_MAX_SIZE,
.used = 0};
struct nDPIsrvd_buffer client_buffer = {};
int dis_epollfd = create_evq();
int signalfd = setup_signalfd(dis_epollfd);
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
if (client_buffer.ptr == NULL || dis_epollfd < 0 || signalfd < 0)
if (nDPIsrvd_buffer_init(&client_buffer, NETWORK_BUFFER_MAX_SIZE) != 0 || dis_epollfd < 0 || signalfd < 0)
{
THREAD_ERROR_GOTO(arg);
}
@@ -198,7 +189,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
if (events[i].data.fd == mock_servfds[PIPE_READ])
{
ssize_t bytes_read = read(mock_servfds[PIPE_READ],
client_buffer.ptr + client_buffer.used,
client_buffer.ptr.raw + client_buffer.used,
client_buffer.max - client_buffer.used);
if (bytes_read == 0)
{
@@ -208,7 +199,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
{
THREAD_ERROR_GOTO(arg);
}
printf("%.*s", (int)bytes_read, client_buffer.ptr + client_buffer.used);
printf("%.*s", (int)bytes_read, client_buffer.ptr.text + client_buffer.used);
client_buffer.used += bytes_read;
enum nDPIsrvd_parse_return parse_ret = parse_json_lines(&client_buffer);
@@ -247,7 +238,7 @@ error:
del_event(dis_epollfd, mock_servfds[PIPE_READ]);
close(dis_epollfd);
close(signalfd);
free(client_buffer.ptr);
nDPIsrvd_buffer_free(&client_buffer);
return NULL;
}

50
nDPId.c
View File

@@ -1309,7 +1309,9 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu
"reader-thread-count",
nDPId_options.reader_thread_count);
ndpi_serialize_string_int64(&workflow->ndpi_serializer, "idle-scan-period", nDPId_options.idle_scan_period);
ndpi_serialize_string_int64(&workflow->ndpi_serializer, "generic-max-idle-time", nDPId_options.generic_max_idle_time);
ndpi_serialize_string_int64(&workflow->ndpi_serializer,
"generic-max-idle-time",
nDPId_options.generic_max_idle_time);
ndpi_serialize_string_int64(&workflow->ndpi_serializer, "icmp-max-idle-time", nDPId_options.icmp_max_idle_time);
ndpi_serialize_string_int64(&workflow->ndpi_serializer, "udp-max-idle-time", nDPId_options.udp_max_idle_time);
ndpi_serialize_string_int64(&workflow->ndpi_serializer, "tcp-max-idle-time", nDPId_options.tcp_max_idle_time);
@@ -1337,7 +1339,9 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_tot_l4_payload_len", flow_ext->total_l4_payload_len);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
"flow_avg_l4_payload_len",
(flow_ext->packets_processed > 0 ? flow_ext->total_l4_payload_len / flow_ext->packets_processed : 0));
(flow_ext->packets_processed > 0
? flow_ext->total_l4_payload_len / flow_ext->packets_processed
: 0));
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow_ext->flow_basic.tcp_is_midstream_flow);
}
@@ -1962,14 +1966,14 @@ static uint32_t calculate_ndpi_flow_struct_hash(struct ndpi_flow_struct const *
return hash;
}
#define SNAP 0xaa
#define SNAP 0xaa
/* mask for FCF */
#define WIFI_DATA 0x2 /* 0000 0010 */
#define FCF_TYPE(fc) (((fc) >> 2) & 0x3) /* 0000 0011 = 0x3 */
#define FCF_TO_DS(fc) ((fc) & 0x0100)
#define FCF_FROM_DS(fc) ((fc) & 0x0200)
#define WIFI_DATA 0x2 /* 0000 0010 */
#define FCF_TYPE(fc) (((fc) >> 2) & 0x3) /* 0000 0011 = 0x3 */
#define FCF_TO_DS(fc) ((fc)&0x0100)
#define FCF_FROM_DS(fc) ((fc)&0x0200)
/* mask for Bad FCF presence */
#define BAD_FCS 0x50 /* 0101 0000 */
#define BAD_FCS 0x50 /* 0101 0000 */
static int process_datalink_layer(struct nDPId_reader_thread * const reader_thread,
struct pcap_pkthdr const * const header,
uint8_t const * const packet,
@@ -2018,7 +2022,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
return 1;
}
struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset];
struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset];
*ip_offset = sizeof(struct ndpi_chdlc);
*layer3_type = ntohs(chdlc->proto_code);
break;
@@ -2034,12 +2038,14 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
if (packet[0] == 0x0f || packet[0] == 0x8f)
{
struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset];
struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset];
*ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */
*layer3_type = ntohs(chdlc->proto_code);
} else {
}
else
{
*ip_offset = 2;
*layer3_type = ntohs(*((u_int16_t*)&packet[eth_offset]));
*layer3_type = ntohs(*((u_int16_t *)&packet[eth_offset]));
}
break;
case DLT_LINUX_SLL:
@@ -2050,7 +2056,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
return 1;
}
*layer3_type = (packet[eth_offset+14] << 8) + packet[eth_offset+15];
*layer3_type = (packet[eth_offset + 14] << 8) + packet[eth_offset + 15];
*ip_offset = 16 + eth_offset;
break;
case DLT_IEEE802_11_RADIO:
@@ -2062,7 +2068,8 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
return 1;
}
struct ndpi_radiotap_header const * const radiotap = (struct ndpi_radiotap_header const * const)&packet[eth_offset];
struct ndpi_radiotap_header const * const radiotap =
(struct ndpi_radiotap_header const * const) & packet[eth_offset];
uint16_t radio_len = radiotap->len;
/* Check Bad FCS presence */
@@ -2081,19 +2088,21 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
}
/* Calculate 802.11 header length (variable) */
struct ndpi_wifi_header const * const wifi = (struct ndpi_wifi_header const * const)(packet + eth_offset + radio_len);
struct ndpi_wifi_header const * const wifi =
(struct ndpi_wifi_header const * const)(packet + eth_offset + radio_len);
uint16_t fc = wifi->fc;
int wifi_len = 0;
/* check wifi data presence */
if (FCF_TYPE(fc) == WIFI_DATA)
{
if ((FCF_TO_DS(fc) && FCF_FROM_DS(fc) == 0x0) ||
(FCF_TO_DS(fc) == 0x0 && FCF_FROM_DS(fc)))
if ((FCF_TO_DS(fc) && FCF_FROM_DS(fc) == 0x0) || (FCF_TO_DS(fc) == 0x0 && FCF_FROM_DS(fc)))
{
wifi_len = 26; /* + 4 byte fcs */
}
} else {
}
else
{
/* no data frames */
break;
}
@@ -2104,7 +2113,8 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
return 1;
}
struct ndpi_llc_header_snap const * const llc = (struct ndpi_llc_header_snap const * const)(packet + eth_offset + wifi_len + radio_len);
struct ndpi_llc_header_snap const * const llc =
(struct ndpi_llc_header_snap const * const)(packet + eth_offset + wifi_len + radio_len);
if (llc->dsap == SNAP)
{
*layer3_type = ntohs(llc->snap.proto_ID);
@@ -2397,7 +2407,7 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
tcp = (struct ndpi_tcphdr *)l4_ptr;
l4_payload_len = ndpi_max(0, l4_len-4*tcp->doff);
l4_payload_len = ndpi_max(0, l4_len - 4 * tcp->doff);
flow_basic.tcp_fin_rst_seen = (tcp->fin == 1 || tcp->rst == 1 ? 1 : 0);
flow_basic.tcp_is_midstream_flow = (tcp->syn == 0 ? 1 : 0);
flow_basic.src_port = ntohs(tcp->source);

View File

@@ -18,13 +18,6 @@
#include "nDPIsrvd.h"
#include "utils.h"
struct io_buffer
{
uint8_t * ptr;
size_t used;
size_t max;
};
enum sock_type
{
JSON_SOCK,
@@ -35,7 +28,7 @@ struct remote_desc
{
enum sock_type sock_type;
int fd;
struct io_buffer buf;
struct nDPIsrvd_buffer buf;
union {
struct
{
@@ -52,7 +45,7 @@ struct remote_desc
};
};
static struct remotes
static struct
{
struct remote_desc * desc;
size_t desc_size;
@@ -181,9 +174,10 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in
if (remotes.desc[i].fd == -1)
{
remotes.desc_used++;
remotes.desc[i].buf.ptr = (uint8_t *)malloc(NETWORK_BUFFER_MAX_SIZE);
remotes.desc[i].buf.max = NETWORK_BUFFER_MAX_SIZE;
remotes.desc[i].buf.used = 0;
if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, NETWORK_BUFFER_MAX_SIZE) != 0)
{
return NULL;
}
remotes.desc[i].sock_type = type;
remotes.desc[i].fd = remote_fd;
return &remotes.desc[i];
@@ -226,8 +220,7 @@ static void disconnect_client(int epollfd, struct remote_desc * const current)
current->fd = -1;
remotes.desc_used--;
}
free(current->buf.ptr);
current->buf.ptr = NULL;
nDPIsrvd_buffer_free(&current->buf);
}
static int nDPIsrvd_parse_options(int argc, char ** argv)
@@ -437,18 +430,18 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
{
char * json_str_start = NULL;
if (current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
if (current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
{
syslog(LOG_DAEMON | LOG_ERR,
"BUG: JSON invalid opening character: '%c'",
current->buf.ptr[NETWORK_BUFFER_LENGTH_DIGITS]);
current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]);
disconnect_client(epollfd, current);
return 1;
}
errno = 0;
current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10);
current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr;
current->event_json.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10);
current->event_json.json_bytes += json_str_start - current->buf.ptr.text;
if (errno == ERANGE)
{
@@ -457,12 +450,12 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
return 1;
}
if ((uint8_t *)json_str_start == current->buf.ptr)
if (json_str_start == current->buf.ptr.text)
{
syslog(LOG_DAEMON | LOG_ERR,
"BUG: Missing size before JSON string: \"%.*s\"",
NETWORK_BUFFER_LENGTH_DIGITS,
current->buf.ptr);
current->buf.ptr.text);
disconnect_client(epollfd, current);
return 1;
}
@@ -482,13 +475,13 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
return 1;
}
if (current->buf.ptr[current->event_json.json_bytes - 2] != '}' ||
current->buf.ptr[current->event_json.json_bytes - 1] != '\n')
if (current->buf.ptr.text[current->event_json.json_bytes - 2] != '}' ||
current->buf.ptr.text[current->event_json.json_bytes - 1] != '\n')
{
syslog(LOG_DAEMON | LOG_ERR,
"BUG: Invalid JSON string: %.*s",
(int)current->event_json.json_bytes,
current->buf.ptr);
current->buf.ptr.text);
disconnect_client(epollfd, current);
return 1;
}
@@ -512,7 +505,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
errno = 0;
ssize_t bytes_read =
read(current->fd, current->buf.ptr + current->buf.used, current->buf.max - current->buf.used);
read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used);
if (bytes_read < 0 || errno != 0)
{
disconnect_client(epollfd, current);
@@ -564,7 +557,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) !=
if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) !=
(ssize_t)remotes.desc[i].buf.used)
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -582,13 +575,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
}
}
memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,
current->buf.ptr,
memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used,
current->buf.ptr.raw,
current->event_json.json_bytes);
remotes.desc[i].buf.used += current->event_json.json_bytes;
errno = 0;
ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used);
ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used);
if (errno == EAGAIN)
{
continue;
@@ -630,8 +623,8 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
ntohs(remotes.desc[i].event_serv.peer.sin_port),
bytes_written,
remotes.desc[i].buf.used);
memmove(remotes.desc[i].buf.ptr,
remotes.desc[i].buf.ptr + bytes_written,
memmove(remotes.desc[i].buf.ptr.raw,
remotes.desc[i].buf.ptr.raw + bytes_written,
remotes.desc[i].buf.used - bytes_written);
remotes.desc[i].buf.used -= bytes_written;
continue;
@@ -640,8 +633,8 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
remotes.desc[i].buf.used = 0;
}
memmove(current->buf.ptr,
current->buf.ptr + current->event_json.json_bytes,
memmove(current->buf.ptr.raw,
current->buf.ptr.raw + current->event_json.json_bytes,
current->buf.used - current->event_json.json_bytes);
current->buf.used -= current->event_json.json_bytes;
current->event_json.json_bytes = 0;
@@ -819,7 +812,7 @@ static int setup_remote_descriptors(size_t max_descriptors)
{
remotes.desc_used = 0;
remotes.desc_size = max_descriptors;
remotes.desc = (struct remote_desc *)malloc(remotes.desc_size * sizeof(*remotes.desc));
remotes.desc = (struct remote_desc *)calloc(remotes.desc_size, sizeof(*remotes.desc));
if (remotes.desc == NULL)
{
return -1;
@@ -827,8 +820,6 @@ static int setup_remote_descriptors(size_t max_descriptors)
for (size_t i = 0; i < remotes.desc_size; ++i)
{
remotes.desc[i].fd = -1;
remotes.desc[i].buf.ptr = NULL;
remotes.desc[i].buf.max = 0;
}
return 0;