mirror of
https://github.com/outbackdingo/nDPId.git
synced 2026-01-28 02:19:37 +00:00
forwarding data from collector(client,source,UNIX-sock) to distributor(client,sink,TCP-sock)
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
38
nDPIsrvd.c
38
nDPIsrvd.c
@@ -18,8 +18,6 @@ enum ev_type { JSON_SOCK, SERV_SOCK };
|
||||
struct remote_desc {
|
||||
enum ev_type type;
|
||||
int fd;
|
||||
uint8_t buf[BUFSIZ];
|
||||
size_t buf_used;
|
||||
union {
|
||||
struct {
|
||||
int json_sockfd;
|
||||
@@ -125,8 +123,6 @@ static struct remote_desc * get_unused_remote_descriptor(void)
|
||||
for (size_t i = 0; i < remotes.desc_size; ++i) {
|
||||
if (remotes.desc[i].fd == -1) {
|
||||
remotes.desc_used++;
|
||||
remotes.desc[i].buf[0] = '\0';
|
||||
remotes.desc[i].buf_used = 0;
|
||||
return &remotes.desc[i];
|
||||
}
|
||||
}
|
||||
@@ -250,6 +246,11 @@ int main(void)
|
||||
continue;
|
||||
}
|
||||
|
||||
/* shutdown writing end for collector clients */
|
||||
if (current->type == JSON_SOCK) {
|
||||
shutdown(current->fd, SHUT_WR); // collector
|
||||
}
|
||||
|
||||
/* setup epoll event */
|
||||
struct epoll_event accept_event = {};
|
||||
accept_event.data.ptr = current;
|
||||
@@ -275,9 +276,8 @@ int main(void)
|
||||
}
|
||||
if (events[i].events & EPOLLIN) {
|
||||
errno = 0;
|
||||
ssize_t bytes_read = read(current->fd,
|
||||
current->buf + current->buf_used,
|
||||
sizeof(current->buf) - current->buf_used);
|
||||
char buf[BUFSIZ];
|
||||
ssize_t bytes_read = read(current->fd, buf, sizeof(buf));
|
||||
if (bytes_read < 0 || errno != 0) {
|
||||
disconnect_client(epollfd, current);
|
||||
continue;
|
||||
@@ -289,7 +289,29 @@ int main(void)
|
||||
disconnect_client(epollfd, current);
|
||||
continue;
|
||||
}
|
||||
current->buf_used += bytes_read;
|
||||
|
||||
/* broadcast data coming from the json-collector socket to all tcp clients */
|
||||
if (current->type == JSON_SOCK) {
|
||||
for (size_t i = 0; i < remotes.desc_size; ++i) {
|
||||
if (remotes.desc[i].fd < 0) {
|
||||
continue;
|
||||
}
|
||||
if (remotes.desc[i].type == SERV_SOCK) {
|
||||
ssize_t bytes_written = write(remotes.desc[i].fd, buf, bytes_read);
|
||||
if (bytes_written < 0 || errno != 0) {
|
||||
disconnect_client(epollfd, current);
|
||||
continue;
|
||||
}
|
||||
if (bytes_written == 0) {
|
||||
syslog(LOG_DAEMON, "%s connection closed during write", (current->type == JSON_SOCK
|
||||
? "collector"
|
||||
: "distributor"));
|
||||
disconnect_client(epollfd, current);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user