diff --git a/src/rtp/net_udp.c b/src/rtp/net_udp.c index 85befce36..924d4ce76 100644 --- a/src/rtp/net_udp.c +++ b/src/rtp/net_udp.c @@ -135,8 +135,17 @@ struct _socket_udp { pthread_mutex_t lock; pthread_cond_t boss_cv; pthread_cond_t reader_cv; +#ifdef WIN32 + WSAOVERLAPPED *overlapped; + WSAEVENT *overlapped_events; + void **dispose_udata; + bool overlapping_active; + int overlapped_max; + int overlapped_count; +#endif }; +static void udp_clean_async_state(socket_udp *s); #ifdef WIN32 /* Want to use both Winsock 1 and 2 socket options, but since @@ -329,7 +338,11 @@ static socket_udp *udp_init4(const char *addr, const char *iface, } else { ifindex = 0; } +#ifdef WIN32 + s->fd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED); +#else s->fd = socket(AF_INET, SOCK_DGRAM, 0); +#endif if (s->fd == INVALID_SOCKET) { socket_error("Unable to initialize socket"); goto error; @@ -466,7 +479,7 @@ static inline int udp_send4(socket_udp * s, char *buffer, int buflen) } #ifdef WIN32 -static inline int udp_sendv4(socket_udp * s, LPWSABUF vector, int count) +static inline int udp_sendv4(socket_udp * s, LPWSABUF vector, int count, void *d) { struct sockaddr_in s_in; @@ -477,13 +490,26 @@ static inline int udp_sendv4(socket_udp * s, LPWSABUF vector, int count) s_in.sin_addr.s_addr = s->addr4.s_addr; s_in.sin_port = htons(s->tx_port); + assert(!s->overlapping_active || s->overlapped_count < s->overlapped_max); + DWORD bytesSent; - return WSASendTo(s->fd, vector, count, &bytesSent, 0, + int ret = WSASendTo(s->fd, vector, count, &bytesSent, 0, (struct sockaddr *) &s_in, - sizeof(s_in), NULL, NULL); + sizeof(s_in), s->overlapping_active ? 
&s->overlapped[s->overlapped_count] : NULL, NULL); + if (s->overlapping_active) { + s->dispose_udata[s->overlapped_count] = d; + } else { + free(d); + } + s->overlapped_count++; + if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING) + return 0; + else { + return ret; + } } #else -static inline int udp_sendv4(socket_udp * s, struct iovec *vector, int count) +static inline int udp_sendv4(socket_udp * s, struct iovec *vector, int count, void *d) { struct msghdr msg; struct sockaddr_in s_in; @@ -504,7 +530,9 @@ static inline int udp_sendv4(socket_udp * s, struct iovec *vector, int count) msg.msg_controllen = 0; msg.msg_flags = 0; - return sendmsg(s->fd, &msg, 0); + int ret = sendmsg(s->fd, &msg, 0); + free(d); + return ret; } #endif // WIN32 @@ -639,7 +667,7 @@ static socket_udp *udp_init6(const char *addr, const char *iface, #ifdef HAVE_IPv6 int reuse = 1; struct sockaddr_in6 s_in; - socket_udp *s = (socket_udp *) malloc(sizeof(socket_udp)); + socket_udp *s = (socket_udp *) calloc(1, sizeof(socket_udp)); s->mode = IPv6; s->addr = NULL; s->rx_port = rx_port; @@ -816,18 +844,31 @@ static int udp_send6(socket_udp * s, char *buffer, int buflen) } #ifdef WIN32 -static int udp_sendv6(socket_udp * s, LPWSABUF vector, int count) +static int udp_sendv6(socket_udp * s, LPWSABUF vector, int count, void *d) { assert(s != NULL); assert(s->mode == IPv6); + assert(!s->overlapping_active || s->overlapped_count < s->overlapped_max); + DWORD bytesSent; - return WSASendTo(s->fd, vector, count, &bytesSent, 0, + int ret = WSASendTo(s->fd, vector, count, &bytesSent, 0, (struct sockaddr *) &s->sock6, - sizeof(s->sock6), NULL, NULL); + sizeof(s->sock6), s->overlapping_active ? 
&s->overlapped[s->overlapped_count] : NULL, NULL); + if (s->overlapping_active) { + s->dispose_udata[s->overlapped_count] = d; + } else { + free(d); + } + s->overlapped_count++; + if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING) + return 0; + else { + return ret; + } } #else -static int udp_sendv6(socket_udp * s, struct iovec *vector, int count) +static int udp_sendv6(socket_udp * s, struct iovec *vector, int count, void *d) { #ifdef HAVE_IPv6 struct msghdr msg; @@ -842,7 +883,9 @@ static int udp_sendv6(socket_udp * s, struct iovec *vector, int count) msg.msg_control = 0; msg.msg_controllen = 0; msg.msg_flags = 0; - return sendmsg(s->fd, &msg, 0); + int ret = sendmsg(s->fd, &msg, 0); + free(d); + return ret; #else UNUSED(s); UNUSED(vector); @@ -1068,6 +1111,8 @@ void udp_exit(socket_udp * s) } } + udp_clean_async_state(s); + switch (s->mode) { case IPv4: udp_exit4(s); @@ -1104,16 +1149,16 @@ int udp_send(socket_udp * s, char *buffer, int buflen) } #ifdef WIN32 -int udp_sendv(socket_udp * s, LPWSABUF vector, int count) +int udp_sendv(socket_udp * s, LPWSABUF vector, int count, void *d) #else -int udp_sendv(socket_udp * s, struct iovec *vector, int count) +int udp_sendv(socket_udp * s, struct iovec *vector, int count, void *d) #endif // WIN32 { switch (s->mode) { case IPv4: - return udp_sendv4(s, vector, count); + return udp_sendv4(s, vector, count, d); case IPv6: - return udp_sendv6(s, vector, count); + return udp_sendv6(s, vector, count, d); default: abort(); /* Yuk! 
*/ } @@ -1414,6 +1459,70 @@ int udp_change_dest(socket_udp *s, const char *addr) } } +void udp_async_start(socket_udp *s, int nr_packets) +{ +#ifdef WIN32 + if (nr_packets > s->overlapped_max) { + s->overlapped = realloc(s->overlapped, nr_packets * sizeof(WSAOVERLAPPED)); + s->overlapped_events = realloc(s->overlapped_events, nr_packets * sizeof(WSAEVENT)); + s->dispose_udata = realloc(s->dispose_udata, nr_packets * sizeof(void *)); + for (int i = s->overlapped_max; i < nr_packets; ++i) { + memset(&s->overlapped[i], 0, sizeof(WSAOVERLAPPED)); + s->overlapped[i].hEvent = s->overlapped_events[i] = WSACreateEvent(); + assert(s->overlapped[i].hEvent != WSA_INVALID_EVENT); + } + s->overlapped_max = nr_packets; + } + + s->overlapped_count = 0; + s->overlapping_active = true; +#else + UNUSED(nr_packets); + UNUSED(s); +#endif +} + +void udp_async_wait(socket_udp *s) +{ +#ifdef WIN32 + if (!s->overlapping_active) + return; + for(int i = 0; i < s->overlapped_count; i += WSA_MAXIMUM_WAIT_EVENTS) + { + int count = WSA_MAXIMUM_WAIT_EVENTS; + if (s->overlapped_count - i < WSA_MAXIMUM_WAIT_EVENTS) + count = s->overlapped_count - i; + DWORD ret = WSAWaitForMultipleEvents(count, s->overlapped_events + i, TRUE, INFINITE, TRUE); + if (ret == WSA_WAIT_FAILED) { + socket_error("WSAWaitForMultipleEvents"); + } + } + for (int i = 0; i < s->overlapped_count; i++) { + if (WSAResetEvent(s->overlapped[i].hEvent) == FALSE) { + socket_error("WSAResetEvent"); + } + free(s->dispose_udata[i]); + } + s->overlapping_active = false; +#else + UNUSED(s); +#endif +} + +static void udp_clean_async_state(socket_udp *s) +{ +#ifdef WIN32 + for (int i = 0; i < s->overlapped_max; i++) { + WSACloseEvent(s->overlapped[i].hEvent); + } + free(s->overlapped); + free(s->overlapped_events); + free(s->dispose_udata); +#else + UNUSED(s); +#endif +} + bool udp_is_ipv6(socket_udp *s) { return s->mode == IPv6; diff --git a/src/rtp/net_udp.h b/src/rtp/net_udp.h index 7585220fa..ec20e86da 100644 --- a/src/rtp/net_udp.h 
+++ b/src/rtp/net_udp.h @@ -61,10 +61,12 @@ int udp_recv(socket_udp *s, char *buffer, int buflen); int udp_send(socket_udp *s, char *buffer, int buflen); int udp_recvv(socket_udp *s, struct msghdr *m); +void udp_async_start(socket_udp *s, int nr_packets); +void udp_async_wait(socket_udp *s); #ifdef WIN32 -int udp_sendv(socket_udp *s, LPWSABUF vector, int count); +int udp_sendv(socket_udp *s, LPWSABUF vector, int count, void *d); #else -int udp_sendv(socket_udp *s, struct iovec *vector, int count); +int udp_sendv(socket_udp *s, struct iovec *vector, int count, void *d); #endif char *udp_host_addr(socket_udp *s); diff --git a/src/rtp/rtp.c b/src/rtp/rtp.c index e0763ecd6..ce26b5a15 100644 --- a/src/rtp/rtp.c +++ b/src/rtp/rtp.c @@ -2612,12 +2612,14 @@ rtp_send_data_hdr(struct rtp *session, rtp_packet *packet = NULL; uint8_t initVec[8] = { 0, 0, 0, 0, 0, 0, 0, 0 }; #ifdef WIN32 - WSABUF send_vector[3]; + WSABUF *send_vector; #else struct iovec send_vector[3]; #endif int send_vector_len; + void *d; // to be freed after packet is sent + check_database(session); assert((data == NULL && data_len == 0) @@ -2662,9 +2664,16 @@ rtp_send_data_hdr(struct rtp *session, if (buffer == NULL) { assert(buffer_len < RTP_MAX_PACKET_LEN); /* we dont always need 20 (12|16) but this seems to work. 
LG */ - buffer = (uint8_t *) malloc(20 + RTP_PACKET_HEADER_SIZE); +#ifdef WIN32 + d = buffer = (uint8_t *) malloc(3 * sizeof(WSABUF) + 20 + RTP_PACKET_HEADER_SIZE); + send_vector = d; + buffer = (uint8_t *) d + 3 * sizeof(WSABUF); +#else + d = buffer = (uint8_t *) malloc(20 + RTP_PACKET_HEADER_SIZE); +#endif packet = (rtp_packet *) buffer; } + #ifdef WIN32 send_vector[0].buf = buffer + RTP_PACKET_HEADER_SIZE; send_vector[0].len = buffer_len; @@ -2760,13 +2769,11 @@ rtp_send_data_hdr(struct rtp *session, buffer_len, initVec); } - rc = udp_sendv(session->rtp_socket, send_vector, send_vector_len); + rc = udp_sendv(session->rtp_socket, send_vector, send_vector_len, d); if (rc == -1) { perror("sending RTP packet"); } - free(buffer); - /* Update the RTCP statistics... */ session->we_sent = TRUE; session->rtp_pcount += 1; @@ -3938,3 +3945,13 @@ bool rtp_is_ipv6(struct rtp *session) return udp_is_ipv6(session->rtp_socket); } +void rtp_async_start(struct rtp *session, int nr_packets) +{ + udp_async_start(session->rtp_socket, nr_packets); +} + +void rtp_async_wait(struct rtp *session) +{ + udp_async_wait(session->rtp_socket); +} + diff --git a/src/rtp/rtp.h b/src/rtp/rtp.h index a543d36f6..1a0e46f72 100644 --- a/src/rtp/rtp.h +++ b/src/rtp/rtp.h @@ -293,6 +293,19 @@ uint64_t rtp_get_bytes_sent(struct rtp *session); int rtp_compute_fract_lost(struct rtp *session, uint32_t ssrc); bool rtp_is_ipv6(struct rtp *session); +/* + * Async API - MSW specific + * + * Using async API hugely improves performance. + * Usage is simple - prior to sending a bulk of packets (eg. video frame), rtp_async_start() + * is started. Then, all packets are sent as usual, exept that neither data nor headers should + * be altered up to rtp_async_wait() call, which waits upon completition of async operations + * started after rtp_async_start(). Caller is responsible that rtp_send_data_hdr() is not called + * more than nr_packet times. 
/**
 * Computes the payload length of the next packet.
 *
 * @param with_fec          whether the frame carries FEC
 * @param mtu               maximal packet size
 * @param hdrs_len          total length of all headers preceding the payload
 * @param fec_symbol_size   FEC symbol size (used only if with_fec is true)
 * @param fec_symbol_offset in/out: number of bytes of the current FEC symbol
 *                          already emitted; touched only when one symbol does
 *                          not fit into a single packet
 * @return number of payload bytes to put into the packet
 */
static inline int get_data_len(bool with_fec, int mtu, int hdrs_len,
                int fec_symbol_size, int *fec_symbol_offset)
{
        const int payload_room = mtu - hdrs_len;

        /* without FEC the payload is rounded down to a multiple of 48 bytes */
        if (!with_fec) {
                return (payload_room / 48) * 48;
        }

        /* whole symbols fit: send as many complete symbols as possible */
        if (fec_symbol_size <= payload_room) {
                return payload_room / fec_symbol_size * fec_symbol_size;
        }

        /* symbol is larger than a packet and must be split */
        const int symbol_rest = fec_symbol_size - *fec_symbol_offset;
        if (symbol_rest <= payload_room) {
                /* the remainder of the symbol fits - finish it */
                *fec_symbol_offset = 0;
                return symbol_rest;
        }

        /* fill the packet completely and remember how far we got */
        *fec_symbol_offset += payload_room;
        return payload_room;
}
(!tx->encryption) { + rtp_async_start(rtp_session, packet_count); + } + do { if(tx->fec_scheme == FEC_MULT) { pos = mult_pos[mult_index]; @@ -562,24 +610,11 @@ tx_send_base(struct tx *tx, struct video_frame *frame, struct rtp *rtp_session, int offset = pos + fragment_offset; - rtp_hdr[1] = htonl(offset); + rtp_hdr_packet[1] = htonl(offset); data = tile->data + pos; - data_len = tx->mtu - hdrs_len; - if (frame->fec_params.type != FEC_NONE) { - if (fec_symbol_size <= tx->mtu - hdrs_len) { - data_len = data_len / fec_symbol_size * fec_symbol_size; - } else { - if (fec_symbol_size - fec_symbol_offset <= tx->mtu - hdrs_len) { - data_len = fec_symbol_size - fec_symbol_offset; - fec_symbol_offset = 0; - } else { - fec_symbol_offset += data_len; - } - } - } else { - data_len = (data_len / 48) * 48; - } + data_len = get_data_len(frame->fec_params.type != FEC_NONE, tx->mtu, hdrs_len, + fec_symbol_size, &fec_symbol_offset); if (pos + data_len >= (unsigned int) tile->data_len) { if (send_m) { m = 1; @@ -594,7 +629,7 @@ tx_send_base(struct tx *tx, struct video_frame *frame, struct rtp *rtp_session, if (tx->encryption) { data_len = tx->enc_funcs->encrypt(tx->encryption, data, data_len, - (char *) rtp_hdr, + (char *) rtp_hdr_packet, frame->fec_params.type != FEC_NONE ? sizeof(fec_video_payload_hdr_t) : sizeof(video_payload_hdr_t), encrypted_data); @@ -602,7 +637,7 @@ tx_send_base(struct tx *tx, struct video_frame *frame, struct rtp *rtp_session, } rtp_send_data_hdr(rtp_session, ts, pt, m, 0, 0, - (char *) rtp_hdr, rtp_hdr_len, + (char *) rtp_hdr_packet, rtp_hdr_len, data, data_len, 0, 0, 0); } @@ -624,8 +659,13 @@ tx_send_base(struct tx *tx, struct video_frame *frame, struct rtp *rtp_session, if(tx->fec_scheme == FEC_MULT) { pos = mult_pos[tx->mult_count - 1]; } - + rtp_hdr_packet += rtp_hdr_len / sizeof(uint32_t); } while (pos < (unsigned int) tile->data_len); + + if (!tx->encryption) { + rtp_async_wait(rtp_session); + } + free(rtp_headers); } /*