MSW: use asynchronous API

This hugely increases network performance on MSW, which makes it suitable
for low-ratio compression as well as uncompressed video.
This commit is contained in:
Martin Pulec
2015-02-11 11:25:31 +01:00
parent afaedbc25a
commit 33ff4d435f
5 changed files with 222 additions and 41 deletions

View File

@@ -135,8 +135,17 @@ struct _socket_udp {
pthread_mutex_t lock;
pthread_cond_t boss_cv;
pthread_cond_t reader_cv;
#ifdef WIN32
WSAOVERLAPPED *overlapped;
WSAEVENT *overlapped_events;
void **dispose_udata;
bool overlapping_active;
int overlapped_max;
int overlapped_count;
#endif
};
static void udp_clean_async_state(socket_udp *s);
#ifdef WIN32
/* Want to use both Winsock 1 and 2 socket options, but since
@@ -329,7 +338,11 @@ static socket_udp *udp_init4(const char *addr, const char *iface,
} else {
ifindex = 0;
}
#ifdef WIN32
s->fd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
#else
s->fd = socket(AF_INET, SOCK_DGRAM, 0);
#endif
if (s->fd == INVALID_SOCKET) {
socket_error("Unable to initialize socket");
goto error;
@@ -466,7 +479,7 @@ static inline int udp_send4(socket_udp * s, char *buffer, int buflen)
}
#ifdef WIN32
static inline int udp_sendv4(socket_udp * s, LPWSABUF vector, int count)
static inline int udp_sendv4(socket_udp * s, LPWSABUF vector, int count, void *d)
{
struct sockaddr_in s_in;
@@ -477,13 +490,26 @@ static inline int udp_sendv4(socket_udp * s, LPWSABUF vector, int count)
s_in.sin_addr.s_addr = s->addr4.s_addr;
s_in.sin_port = htons(s->tx_port);
assert(!s->overlapping_active || s->overlapped_count < s->overlapped_max);
DWORD bytesSent;
return WSASendTo(s->fd, vector, count, &bytesSent, 0,
int ret = WSASendTo(s->fd, vector, count, &bytesSent, 0,
(struct sockaddr *) &s_in,
sizeof(s_in), NULL, NULL);
sizeof(s_in), s->overlapping_active ? &s->overlapped[s->overlapped_count] : NULL, NULL);
if (s->overlapping_active) {
s->dispose_udata[s->overlapped_count] = d;
} else {
free(d);
}
s->overlapped_count++;
if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
return 0;
else {
return ret;
}
}
#else
static inline int udp_sendv4(socket_udp * s, struct iovec *vector, int count)
static inline int udp_sendv4(socket_udp * s, struct iovec *vector, int count, void *d)
{
struct msghdr msg;
struct sockaddr_in s_in;
@@ -504,7 +530,9 @@ static inline int udp_sendv4(socket_udp * s, struct iovec *vector, int count)
msg.msg_controllen = 0;
msg.msg_flags = 0;
return sendmsg(s->fd, &msg, 0);
int ret = sendmsg(s->fd, &msg, 0);
free(d);
return ret;
}
#endif // WIN32
@@ -639,7 +667,7 @@ static socket_udp *udp_init6(const char *addr, const char *iface,
#ifdef HAVE_IPv6
int reuse = 1;
struct sockaddr_in6 s_in;
socket_udp *s = (socket_udp *) malloc(sizeof(socket_udp));
socket_udp *s = (socket_udp *) calloc(1, sizeof(socket_udp));
s->mode = IPv6;
s->addr = NULL;
s->rx_port = rx_port;
@@ -816,18 +844,31 @@ static int udp_send6(socket_udp * s, char *buffer, int buflen)
}
#ifdef WIN32
static int udp_sendv6(socket_udp * s, LPWSABUF vector, int count)
static int udp_sendv6(socket_udp * s, LPWSABUF vector, int count, void *d)
{
assert(s != NULL);
assert(s->mode == IPv6);
assert(!s->overlapping_active || s->overlapped_count < s->overlapped_max);
DWORD bytesSent;
return WSASendTo(s->fd, vector, count, &bytesSent, 0,
int ret = WSASendTo(s->fd, vector, count, &bytesSent, 0,
(struct sockaddr *) &s->sock6,
sizeof(s->sock6), NULL, NULL);
sizeof(s->sock6), s->overlapping_active ? &s->overlapped[s->overlapped_count] : NULL, NULL);
if (s->overlapping_active) {
s->dispose_udata[s->overlapped_count] = d;
} else {
free(d);
}
s->overlapped_count++;
if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
return 0;
else {
return ret;
}
}
#else
static int udp_sendv6(socket_udp * s, struct iovec *vector, int count)
static int udp_sendv6(socket_udp * s, struct iovec *vector, int count, void *d)
{
#ifdef HAVE_IPv6
struct msghdr msg;
@@ -842,7 +883,9 @@ static int udp_sendv6(socket_udp * s, struct iovec *vector, int count)
msg.msg_control = 0;
msg.msg_controllen = 0;
msg.msg_flags = 0;
return sendmsg(s->fd, &msg, 0);
int ret = sendmsg(s->fd, &msg, 0);
free(d);
return ret;
#else
UNUSED(s);
UNUSED(vector);
@@ -1068,6 +1111,8 @@ void udp_exit(socket_udp * s)
}
}
udp_clean_async_state(s);
switch (s->mode) {
case IPv4:
udp_exit4(s);
@@ -1104,16 +1149,16 @@ int udp_send(socket_udp * s, char *buffer, int buflen)
}
#ifdef WIN32
int udp_sendv(socket_udp * s, LPWSABUF vector, int count)
int udp_sendv(socket_udp * s, LPWSABUF vector, int count, void *d)
#else
int udp_sendv(socket_udp * s, struct iovec *vector, int count)
int udp_sendv(socket_udp * s, struct iovec *vector, int count, void *d)
#endif // WIN32
{
switch (s->mode) {
case IPv4:
return udp_sendv4(s, vector, count);
return udp_sendv4(s, vector, count, d);
case IPv6:
return udp_sendv6(s, vector, count);
return udp_sendv6(s, vector, count, d);
default:
abort(); /* Yuk! */
}
@@ -1414,6 +1459,70 @@ int udp_change_dest(socket_udp *s, const char *addr)
}
}
/**
 * Prepares the socket for a batch of up to nr_packets overlapped sends.
 *
 * On WIN32 the per-packet arrays (WSAOVERLAPPED slots, their events and the
 * dispose pointers) are grown when nr_packets exceeds the current capacity,
 * the slot counter is reset and overlapped mode is switched on.
 *
 * If growing the arrays or creating an event fails, overlapped mode is left
 * switched off and the function returns; overlapped_max then still matches
 * the number of events that really exist.
 *
 * On other platforms this is a no-op.
 *
 * NOTE(review): calling this while a previous batch is still pending would
 * let realloc() move WSAOVERLAPPED structures that may be in use - callers
 * are expected to pair every start with udp_async_wait(); confirm.
 *
 * @param s          socket to prepare
 * @param nr_packets maximal number of packets sent before udp_async_wait()
 */
void udp_async_start(socket_udp *s, int nr_packets)
{
#ifdef WIN32
        /* stay in synchronous mode until all slots are known to be usable */
        s->overlapped_count = 0;
        s->overlapping_active = false;

        if (nr_packets > s->overlapped_max) {
                void *grown;

                /* never assign realloc() directly - the old block would leak on failure */
                grown = realloc(s->overlapped, (size_t) nr_packets * sizeof(WSAOVERLAPPED));
                if (grown == NULL) {
                        socket_error("udp_async_start: realloc");
                        return;
                }
                s->overlapped = grown;

                grown = realloc(s->overlapped_events, (size_t) nr_packets * sizeof(WSAEVENT));
                if (grown == NULL) {
                        socket_error("udp_async_start: realloc");
                        return;
                }
                s->overlapped_events = grown;

                grown = realloc(s->dispose_udata, (size_t) nr_packets * sizeof(void *));
                if (grown == NULL) {
                        socket_error("udp_async_start: realloc");
                        return;
                }
                s->dispose_udata = grown;

                for (int i = s->overlapped_max; i < nr_packets; ++i) {
                        WSAEVENT ev = WSACreateEvent();
                        if (ev == WSA_INVALID_EVENT) {
                                socket_error("WSACreateEvent");
                                return;
                        }
                        memset(&s->overlapped[i], 0, sizeof(WSAOVERLAPPED));
                        s->overlapped[i].hEvent = s->overlapped_events[i] = ev;
                        /* advance capacity one by one so that cleanup closes exactly the created events */
                        s->overlapped_max = i + 1;
                }
        }

        s->overlapping_active = true;
#else
        UNUSED(nr_packets);
        UNUSED(s);
#endif
}
/**
 * Waits until all overlapped sends issued since udp_async_start() finished.
 *
 * On WIN32 the events of the used slots are awaited in groups of at most
 * WSA_MAXIMUM_WAIT_EVENTS. Afterwards every used event is reset, the
 * buffer registered for that slot is freed and overlapped mode is switched
 * off. Does nothing when overlapped mode is not active.
 *
 * On other platforms this is a no-op.
 *
 * NOTE(review): the wait has no timeout, so a slot whose event is never
 * signalled (e.g. a send that failed right away) would block here forever -
 * verify against the senders.
 *
 * @param s socket whose pending sends should be awaited
 */
void udp_async_wait(socket_udp *s)
{
#ifdef WIN32
        if (!s->overlapping_active)
                return;

        for (int i = 0; i < s->overlapped_count; i += WSA_MAXIMUM_WAIT_EVENTS) {
                int count = s->overlapped_count - i;
                if (count > WSA_MAXIMUM_WAIT_EVENTS)
                        count = WSA_MAXIMUM_WAIT_EVENTS;

                DWORD ret;
                do {
                        /*
                         * The wait is alertable, so it may return early with
                         * WSA_WAIT_IO_COMPLETION while the events are not yet
                         * signalled - wait again in that case, otherwise the
                         * buffers below would be freed while still in use.
                         */
                        ret = WSAWaitForMultipleEvents(count, s->overlapped_events + i, TRUE, INFINITE, TRUE);
                } while (ret == WSA_WAIT_IO_COMPLETION);

                if (ret == WSA_WAIT_FAILED) {
                        socket_error("WSAWaitForMultipleEvents");
                }
        }

        for (int i = 0; i < s->overlapped_count; i++) {
                if (WSAResetEvent(s->overlapped[i].hEvent) == FALSE) {
                        socket_error("WSAResetEvent");
                }
                free(s->dispose_udata[i]);
                s->dispose_udata[i] = NULL; /* do not keep a dangling pointer */
        }

        s->overlapping_active = false;
#else
        UNUSED(s);
#endif
}
/**
 * Releases the resources used for overlapped sending.
 *
 * On WIN32 closes the event of every allocated slot (overlapped_max of
 * them) and frees the three per-packet arrays. No-op elsewhere.
 *
 * @param s socket whose async state is to be released
 */
static void udp_clean_async_state(socket_udp *s)
{
#ifdef WIN32
        int slot = 0;

        while (slot < s->overlapped_max) {
                WSACloseEvent(s->overlapped[slot].hEvent);
                slot += 1;
        }

        free(s->overlapped);
        free(s->overlapped_events);
        free(s->dispose_udata);
#else
        UNUSED(s);
#endif
}
bool udp_is_ipv6(socket_udp *s)
{
return s->mode == IPv6;

View File

@@ -61,10 +61,12 @@ int udp_recv(socket_udp *s, char *buffer, int buflen);
int udp_send(socket_udp *s, char *buffer, int buflen);
int udp_recvv(socket_udp *s, struct msghdr *m);
void udp_async_start(socket_udp *s, int nr_packets);
void udp_async_wait(socket_udp *s);
#ifdef WIN32
int udp_sendv(socket_udp *s, LPWSABUF vector, int count);
int udp_sendv(socket_udp *s, LPWSABUF vector, int count, void *d);
#else
int udp_sendv(socket_udp *s, struct iovec *vector, int count);
int udp_sendv(socket_udp *s, struct iovec *vector, int count, void *d);
#endif
char *udp_host_addr(socket_udp *s);

View File

@@ -2612,12 +2612,14 @@ rtp_send_data_hdr(struct rtp *session,
rtp_packet *packet = NULL;
uint8_t initVec[8] = { 0, 0, 0, 0, 0, 0, 0, 0 };
#ifdef WIN32
WSABUF send_vector[3];
WSABUF *send_vector;
#else
struct iovec send_vector[3];
#endif
int send_vector_len;
void *d; // to be freed after packet is sent
check_database(session);
assert((data == NULL && data_len == 0)
@@ -2662,9 +2664,16 @@ rtp_send_data_hdr(struct rtp *session,
if (buffer == NULL) {
assert(buffer_len < RTP_MAX_PACKET_LEN);
/* we dont always need 20 (12|16) but this seems to work. LG */
buffer = (uint8_t *) malloc(20 + RTP_PACKET_HEADER_SIZE);
#ifdef WIN32
d = buffer = (uint8_t *) malloc(3 * sizeof(WSABUF) + 20 + RTP_PACKET_HEADER_SIZE);
send_vector = d;
buffer = (uint8_t *) d + 3 * sizeof(WSABUF);
#else
d = buffer = (uint8_t *) malloc(20 + RTP_PACKET_HEADER_SIZE);
#endif
packet = (rtp_packet *) buffer;
}
#ifdef WIN32
send_vector[0].buf = buffer + RTP_PACKET_HEADER_SIZE;
send_vector[0].len = buffer_len;
@@ -2760,13 +2769,11 @@ rtp_send_data_hdr(struct rtp *session,
buffer_len, initVec);
}
rc = udp_sendv(session->rtp_socket, send_vector, send_vector_len);
rc = udp_sendv(session->rtp_socket, send_vector, send_vector_len, d);
if (rc == -1) {
perror("sending RTP packet");
}
free(buffer);
/* Update the RTCP statistics... */
session->we_sent = TRUE;
session->rtp_pcount += 1;
@@ -3938,3 +3945,13 @@ bool rtp_is_ipv6(struct rtp *session)
return udp_is_ipv6(session->rtp_socket);
}
/**
 * Starts an asynchronous sending batch on the session's RTP socket.
 *
 * Thin wrapper that forwards to udp_async_start().
 *
 * @param session    RTP session whose rtp_socket is used
 * @param nr_packets passed unchanged to udp_async_start()
 */
void rtp_async_start(struct rtp *session, int nr_packets)
{
udp_async_start(session->rtp_socket, nr_packets);
}
/**
 * Finishes an asynchronous sending batch on the session's RTP socket.
 *
 * Thin wrapper that forwards to udp_async_wait().
 *
 * @param session RTP session whose rtp_socket is used
 */
void rtp_async_wait(struct rtp *session)
{
udp_async_wait(session->rtp_socket);
}

View File

@@ -293,6 +293,19 @@ uint64_t rtp_get_bytes_sent(struct rtp *session);
int rtp_compute_fract_lost(struct rtp *session, uint32_t ssrc);
bool rtp_is_ipv6(struct rtp *session);
/*
* Async API - MSW specific
*
* Using the async API hugely improves performance.
* Usage is simple - prior to sending a batch of packets (e.g. a video frame), rtp_async_start()
* is called. Then, all packets are sent as usual, except that neither data nor headers may
* be altered until the rtp_async_wait() call, which waits for completion of the async operations
* started after rtp_async_start(). The caller is responsible for ensuring that rtp_send_data_hdr()
* is not called more than nr_packets times.
*/
void rtp_async_start(struct rtp *session, int nr_packets);
void rtp_async_wait(struct rtp *session);
#ifdef __cplusplus
}
#endif

View File

@@ -425,6 +425,27 @@ static uint32_t format_interl_fps_hdr_row(enum interlacing_t interlacing, double
tmp |= fi << 13;
return htonl(tmp);
}
/**
 * Computes the payload length of the next packet.
 *
 * The space available for payload is mtu - hdrs_len.
 *  - Without FEC it is rounded down to a multiple of 48.
 *  - With FEC and a symbol that fits into the available space, it is
 *    rounded down to a multiple of fec_symbol_size.
 *  - With FEC and a symbol bigger than the available space, the symbol is
 *    split: either the rest of the current symbol is returned (and
 *    *fec_symbol_offset is reset to 0), or the whole available space is
 *    returned and *fec_symbol_offset is advanced by it.
 *
 * @param with_fec           whether FEC is used
 * @param mtu                packet size limit
 * @param hdrs_len           length of headers subtracted from mtu
 * @param fec_symbol_size    FEC symbol size (read only if with_fec)
 * @param fec_symbol_offset  in/out position inside the current symbol
 *                           (accessed only if with_fec)
 * @return payload length for the packet
 */
static inline int get_data_len(bool with_fec, int mtu, int hdrs_len,
                int fec_symbol_size, int *fec_symbol_offset)
{
        const int payload_room = mtu - hdrs_len;

        if (!with_fec) {
                return payload_room - payload_room % 48;
        }

        if (fec_symbol_size <= payload_room) {
                /* one or more whole symbols fit into the packet */
                return payload_room - payload_room % fec_symbol_size;
        }

        /* symbol spans several packets */
        const int symbol_rest = fec_symbol_size - *fec_symbol_offset;
        if (symbol_rest <= payload_room) {
                *fec_symbol_offset = 0;
                return symbol_rest;
        }

        *fec_symbol_offset += payload_room;
        return payload_room;
}
static void
tx_send_base(struct tx *tx, struct video_frame *frame, struct rtp *rtp_session,
@@ -555,6 +576,33 @@ tx_send_base(struct tx *tx, struct video_frame *frame, struct rtp *rtp_session,
packet_rate = compute_packet_rate(req_bitrate, tx->mtu);
}
// calculate number of packets
int packet_count = 0;
do {
pos += get_data_len(frame->fec_params.type != FEC_NONE, tx->mtu, hdrs_len,
fec_symbol_size, &fec_symbol_offset);
packet_count += 1;
} while (pos < (unsigned int) tile->data_len);
if(tx->fec_scheme == FEC_MULT) {
packet_count *= tx->mult_count;
}
pos = 0;
fec_symbol_offset = 0;
// initialize header array with values (except the offset, which differs
// among packets)
void *rtp_headers = malloc(packet_count * rtp_hdr_len);
uint32_t *rtp_hdr_packet = (uint32_t *) rtp_headers;
for (int i = 0; i < packet_count; ++i) {
memcpy(rtp_hdr_packet, rtp_hdr, rtp_hdr_len);
rtp_hdr_packet += rtp_hdr_len / sizeof(uint32_t);
}
rtp_hdr_packet = (uint32_t *) rtp_headers;
if (!tx->encryption) {
rtp_async_start(rtp_session, packet_count);
}
do {
if(tx->fec_scheme == FEC_MULT) {
pos = mult_pos[mult_index];
@@ -562,24 +610,11 @@ tx_send_base(struct tx *tx, struct video_frame *frame, struct rtp *rtp_session,
int offset = pos + fragment_offset;
rtp_hdr[1] = htonl(offset);
rtp_hdr_packet[1] = htonl(offset);
data = tile->data + pos;
data_len = tx->mtu - hdrs_len;
if (frame->fec_params.type != FEC_NONE) {
if (fec_symbol_size <= tx->mtu - hdrs_len) {
data_len = data_len / fec_symbol_size * fec_symbol_size;
} else {
if (fec_symbol_size - fec_symbol_offset <= tx->mtu - hdrs_len) {
data_len = fec_symbol_size - fec_symbol_offset;
fec_symbol_offset = 0;
} else {
fec_symbol_offset += data_len;
}
}
} else {
data_len = (data_len / 48) * 48;
}
data_len = get_data_len(frame->fec_params.type != FEC_NONE, tx->mtu, hdrs_len,
fec_symbol_size, &fec_symbol_offset);
if (pos + data_len >= (unsigned int) tile->data_len) {
if (send_m) {
m = 1;
@@ -594,7 +629,7 @@ tx_send_base(struct tx *tx, struct video_frame *frame, struct rtp *rtp_session,
if (tx->encryption) {
data_len = tx->enc_funcs->encrypt(tx->encryption,
data, data_len,
(char *) rtp_hdr,
(char *) rtp_hdr_packet,
frame->fec_params.type != FEC_NONE ? sizeof(fec_video_payload_hdr_t) :
sizeof(video_payload_hdr_t),
encrypted_data);
@@ -602,7 +637,7 @@ tx_send_base(struct tx *tx, struct video_frame *frame, struct rtp *rtp_session,
}
rtp_send_data_hdr(rtp_session, ts, pt, m, 0, 0,
(char *) rtp_hdr, rtp_hdr_len,
(char *) rtp_hdr_packet, rtp_hdr_len,
data, data_len, 0, 0, 0);
}
@@ -624,8 +659,13 @@ tx_send_base(struct tx *tx, struct video_frame *frame, struct rtp *rtp_session,
if(tx->fec_scheme == FEC_MULT) {
pos = mult_pos[tx->mult_count - 1];
}
rtp_hdr_packet += rtp_hdr_len / sizeof(uint32_t);
} while (pos < (unsigned int) tile->data_len);
if (!tx->encryption) {
rtp_async_wait(rtp_session);
}
free(rtp_headers);
}
/*