From 0c408ced9a30af3a82a99034fbf55b44ccfbd5fb Mon Sep 17 00:00:00 2001 From: Martin Pulec Date: Mon, 18 Apr 2016 16:18:24 +0200 Subject: [PATCH] Control socket: all stats cumulative --- src/rtp/video_decoders.cpp | 138 +++++++++++++++---------------- src/video_rxtx/ultragrid_rtp.cpp | 13 ++- src/video_rxtx/ultragrid_rtp.h | 4 + 3 files changed, 76 insertions(+), 79 deletions(-) diff --git a/src/rtp/video_decoders.cpp b/src/rtp/video_decoders.cpp index af468db79..9009633be 100644 --- a/src/rtp/video_decoders.cpp +++ b/src/rtp/video_decoders.cpp @@ -172,34 +172,66 @@ struct line_decoder { unsigned int src_linesize; ///< source linesize }; -struct main_msg; +struct reported_statistics_cumul { + mutex lock; + long long int received_bytes_total = 0; + long long int expected_bytes_total = 0; + unsigned long int displayed = 0, dropped = 0, corrupted = 0, missing = 0; + unsigned long int fec_ok = 0, fec_nok = 0; + long long int nano_per_frame_decompress = 0; + long long int nano_per_frame_expected = 0; + long long int reported_frames = 0; + void print() { + char buff[256]; + int bytes = sprintf(buff, "Video dec stats: %lu total / %lu disp / %lu " + "drop / %lu corr / %lu missing.", + displayed + dropped + missing, + displayed, dropped, corrupted, + missing); + if (fec_ok + fec_nok > 0) + sprintf(buff + bytes, " FEC OK/NOK: %ld/%ld\n", fec_ok, fec_nok); + else + sprintf(buff + bytes, "\n"); + log_msg(LOG_LEVEL_INFO, buff); + } +}; // message definitions struct frame_msg { - inline frame_msg(struct control_state *c, atomic &rbt, atomic &ebt, unsigned long int disp, unsigned long int corr) : control(c), recv_frame(nullptr), + inline frame_msg(struct control_state *c, struct reported_statistics_cumul &sr) : control(c), recv_frame(nullptr), nofec_frame(nullptr), received_pkts_cum(0), expected_pkts_cum(0), - received_bytes_total(rbt), - expected_bytes_total(ebt), - displayed(disp), corrupted(corr) + stats(sr) {} inline ~frame_msg() { - if (control && recv_frame) { + if (recv_frame) { + lock_guard lk(stats.lock); + if (recv_frame->fec_params.type != FEC_NONE) { + if (is_corrupted) { + stats.fec_nok += 1; + } else { + stats.fec_ok += 1; + } + } int received_bytes = sum_map(pckt_list[0]); - received_bytes_total += received_bytes; - expected_bytes_total += recv_frame->tiles[0].data_len; ostringstream oss; oss << "bufferId " << buffer_num[0] << " expectedPackets " << expected_pkts_cum << " receivedPackets " << received_pkts_cum << // droppedPackets - " expectedBytes " << expected_bytes_total << - " receivedBytes " << received_bytes_total << - " isCorrupted " << corrupted << - " isDisplayed " << displayed << + " expectedBytes " << (stats.expected_bytes_total += recv_frame->tiles[0].data_len) << + " receivedBytes " << (stats.received_bytes_total += received_bytes) << + " isCorrupted " << (stats.corrupted += (is_corrupted ? 1 : 0)) << + " isDisplayed " << (stats.displayed += (is_displayed ? 1 : 0)) << " timestamp " << time_since_epoch_in_ms() << - " nanoPerFrameDecompress " << nanoPerFrameDecompress << - " nanoPerFrameExpected " << nanoPerFrameExpected; - control_report_stats(control, oss.str()); + " nanoPerFrameDecompress " << (stats.nano_per_frame_decompress += nanoPerFrameDecompress) << + " nanoPerFrameExpected " << (stats.nano_per_frame_expected += nanoPerFrameExpected) << + " reportedFrames " << (stats.reported_frames += 1); + if ((stats.displayed + stats.dropped + stats.missing) % 600 == 599) { + stats.print(); + } + if (control) { + control_report_stats(control, oss.str()); + } } vf_free(recv_frame); vf_free(nofec_frame); @@ -210,11 +242,11 @@ struct frame_msg { struct video_frame *nofec_frame; ///< frame without FEC unique_ptr[]> pckt_list; long long int received_pkts_cum, expected_pkts_cum; - atomic &received_bytes_total; - atomic &expected_bytes_total; - unsigned long int displayed, corrupted; + struct reported_statistics_cumul &stats; long int nanoPerFrameDecompress = 0; long int nanoPerFrameExpected = 0; + bool is_displayed = false; + bool is_corrupted = false; }; struct main_msg_reconfigure { @@ -284,12 +316,7 @@ struct state_video_decoder enum video_mode video_mode; ///< video mode set for this decoder unsigned merged_fb:1; ///< flag if the display device driver requires tiled video or not - // for statistics - /// @{ - volatile unsigned long int displayed, dropped, corrupted, missing; - volatile unsigned long int fec_ok, fec_nok; - long int last_buffer_number; - /// @} + long int last_buffer_number; ///< last received buffer ID timed_message slow_msg; ///< shows warning ony in certain interval synchronized_queue msg_queue; @@ -300,8 +327,7 @@ struct state_video_decoder std::future reconfiguration_future; bool reconfiguration_in_progress; - atomic received_bytes_total; - atomic expected_bytes_total; + struct reported_statistics_cumul stats; ///< stats to be reported through control socket }; /** @@ -318,7 +344,6 @@ static void wait_for_framebuffer_swap(struct state_video_decoder *decoder) { "no decryption key entered!\n" #define NOT_ENCRYPTED_ERR "Receiving unencrypted video data " \ "while expecting encrypted.\n" -#define ERROR_GOTO_CLEANUP ret = FALSE; goto cleanup; static void *fec_thread(void *args) { struct state_video_decoder *decoder = @@ -337,7 +362,6 @@ static void *fec_thread(void *args) { struct video_frame *frame = decoder->frame; struct tile *tile = NULL; - int ret = TRUE; if (data->recv_frame->fec_params.type != FEC_NONE) { if(!fec_state || desc.k != data->recv_frame->fec_params.k || @@ -378,12 +402,10 @@ static void *fec_thread(void *args) { } if(fec_out_len == 0) { - ret = FALSE; verbose_msg("[decoder] FEC: unable to reconstruct data.\n"); - decoder->fec_nok += 1; + data->is_corrupted = true; goto cleanup; } - decoder->fec_ok += 1; video_payload_hdr_t video_hdr; memcpy(&video_hdr, fec_out_buffer, @@ -396,11 +418,11 @@ static void *fec_thread(void *args) { if (!video_desc_eq_excl_param(decoder->received_vid_desc, network_desc, PARAM_TILE_COUNT)) { decoder->msg_queue.push(new main_msg_reconfigure(network_desc, move(data))); - ERROR_GOTO_CLEANUP + goto cleanup; } if(!frame) { - ERROR_GOTO_CLEANUP + goto cleanup; } if(decoder->decoder_type == EXTERNAL_DECODER) { @@ -449,7 +471,6 @@ static void *fec_thread(void *args) { } } } else { /* PT_VIDEO */ - bool corrupted_frame_counted = false; for(int i = 0; i < (int) decoder->max_substreams; ++i) { data->nofec_frame->tiles[i].data_len = data->recv_frame->tiles[i].data_len; data->nofec_frame->tiles[i].data = data->recv_frame->tiles[i].data; @@ -460,13 +481,9 @@ static void *fec_thread(void *args) { data->recv_frame->tiles[i].data_len, (unsigned int) sum_map(data->pckt_list[i]), decoder->decoder_type == EXTERNAL_DECODER && !decoder->accepts_corrupted_frame ? " dropped.\n" : ""); + data->is_corrupted = true; if(decoder->decoder_type == EXTERNAL_DECODER && !decoder->accepts_corrupted_frame) { - ret = FALSE; goto cleanup; - } else if (!corrupted_frame_counted) { - // count it here because decoder accepts corrupted frames - corrupted_frame_counted = true; - decoder->corrupted++; } } } @@ -474,10 +491,7 @@ static void *fec_thread(void *args) { decoder->decompress_queue.push(move(data)); cleanup: - if(ret == FALSE) { - decoder->corrupted++; - decoder->dropped++; - } + ; } delete fec_state; @@ -542,7 +556,7 @@ static void *decompress_thread(void *args) { msg->nanoPerFrameDecompress = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - t0).count(); - msg->nanoPerFrameExpected = 1000000000 / msg->nofec_frame->fps; + msg->nanoPerFrameExpected = 1000000000 / output->fps; if(decoder->postprocess) { bool pp_ret; @@ -605,9 +619,7 @@ static void *decompress_thread(void *args) { int ret = display_put_frame(decoder->display, decoder->frame, putf_flags); if (ret == 0) { - decoder->displayed++; - } else { - decoder->dropped++; + msg->is_displayed = true; } decoder->frame = display_get_frame(decoder->display); } @@ -662,8 +674,6 @@ struct state_video_decoder *video_decoder_init(struct module *parent, s->disp_supported_il = NULL; s->change_il = NULL; - s->displayed = s->dropped = s->corrupted = 0ul; - s->buffer_swapped = true; s->last_buffer_number = -1; @@ -805,7 +815,7 @@ bool video_decoder_register_display(struct state_video_decoder *decoder, struct void video_decoder_remove_display(struct state_video_decoder *decoder) { if(decoder->display) { - unique_ptr msg(new frame_msg(decoder->control, decoder->received_bytes_total, decoder->expected_bytes_total, decoder->displayed, decoder->corrupted)); + unique_ptr msg(new frame_msg(decoder->control, decoder->stats)); decoder->fec_queue.push(move(msg)); decoder->fec_thread_id.join(); @@ -849,19 +859,6 @@ static void cleanup(struct state_video_decoder *decoder) } -#define PRINT_STATISTICS {\ - char buff[256];\ - int bytes = sprintf(buff, "Video dec stats: %lu total: %lu disp / %lu "\ - "drop / %lu corr / %lu missing.",\ - decoder->displayed + decoder->dropped + decoder->missing, \ - decoder->displayed, decoder->dropped, decoder->corrupted,\ - decoder->missing);\ - if (decoder->fec_ok + decoder->fec_nok > 0) sprintf(buff + bytes, " FEC OK/NOK: %ld/%ld\n", \ - decoder->fec_ok, decoder->fec_nok);\ - else sprintf(buff + bytes, "\n");\ - log_msg(LOG_LEVEL_INFO, buff);\ -} - /** * @brief Destroys decoder created with decoder_init() * @param decoder decoder to be destroyed. If NULL, no action is performed. @@ -881,7 +878,7 @@ void video_decoder_destroy(struct state_video_decoder *decoder) free(decoder->disp_supported_il); - PRINT_STATISTICS + decoder->stats.print(); module_done(&decoder->mod); @@ -1732,7 +1729,7 @@ next_packet: // format message { - unique_ptr fec_msg (new frame_msg(decoder->control, decoder->received_bytes_total, decoder->expected_bytes_total, decoder->displayed, decoder->corrupted)); + unique_ptr fec_msg (new frame_msg(decoder->control, decoder->stats)); fec_msg->buffer_num = std::move(buffer_num); fec_msg->recv_frame = frame; frame = NULL; @@ -1746,7 +1743,7 @@ next_packet: decoder->fec_queue.push(move(fec_msg)); auto t1 = std::chrono::high_resolution_clock::now(); double tpf = 1.0 / decoder->display_desc.fps; - if (std::chrono::duration_cast>(t1 - t0).count() > tpf && decoder->displayed > 20) { + if (std::chrono::duration_cast>(t1 - t0).count() > tpf && decoder->stats.displayed > 20) { decoder->slow_msg.print("Your computer may be too SLOW to play this !!!\n"); } @@ -1765,18 +1762,15 @@ cleanup: long int missing = buffer_number - ((decoder->last_buffer_number + 1) & 0x3fffff); missing = (missing + 0x3fffff) % 0x3fffff; + lock_guard lk(decoder->stats.lock); if (missing < 0x3fffff / 2) { - decoder->missing += missing; + decoder->stats.missing += missing; } else { // frames may have been reordered, add arbitrary 1 - decoder->missing += 1; + decoder->stats.missing += 1; } } decoder->last_buffer_number = buffer_number; - if ((decoder->displayed + decoder->dropped + decoder->missing) % 600 == 599) { - PRINT_STATISTICS - } - return ret; } diff --git a/src/video_rxtx/ultragrid_rtp.cpp b/src/video_rxtx/ultragrid_rtp.cpp index b45493eb3..0a6c91095 100644 --- a/src/video_rxtx/ultragrid_rtp.cpp +++ b/src/video_rxtx/ultragrid_rtp.cpp @@ -200,12 +200,12 @@ after_send: std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now(); - int dropped_frames = 0; + int dropped_frames = 0; /// @todo auto nano_actual = std::chrono::duration_cast(t1 - t0).count(); long long int nano_expected = 1000l * 1000 * 1000 / tx_frame->fps; int send_bytes = tx_frame->tiles[0].data_len; - m_send_bytes_total += send_bytes; auto now = time_since_epoch_in_ms(); + auto compress_millis = tx_frame->compress_end - tx_frame->compress_start; ostringstream oss; if (m_port_id != -1) { @@ -213,12 +213,11 @@ after_send: } oss << "bufferId " << buffer_id << " droppedFrames " << dropped_frames << - " nanoPerFrameActual " << nano_actual << - " nanoPerFrameExpected " << nano_expected << - " sendBytes " << send_bytes << - " sendBytesTotal " << m_send_bytes_total << + " nanoPerFrameActual " << (m_nano_per_frame_actual_cumul += nano_actual) << + " nanoPerFrameExpected " << (m_nano_per_frame_expected_cumul += nano_expected) << + " sendBytesTotal " << (m_send_bytes_total += send_bytes) << " timestamp " << now << - " compressMillis " << tx_frame->compress_end - tx_frame->compress_start; + " compressMillis " << (m_compress_millis_cumul += compress_millis); control_report_stats(m_control, oss.str()); } diff --git a/src/video_rxtx/ultragrid_rtp.h b/src/video_rxtx/ultragrid_rtp.h index 554f6ced4..e2a473b5b 100644 --- a/src/video_rxtx/ultragrid_rtp.h +++ b/src/video_rxtx/ultragrid_rtp.h @@ -90,6 +90,10 @@ private: long long int m_send_bytes_total; struct control_state *m_control; + + long long int m_nano_per_frame_actual_cumul = 0; + long long int m_nano_per_frame_expected_cumul = 0; + long long int m_compress_millis_cumul = 0; }; #endif // VIDEO_RXTX_ULTRAGRID_RTP_H_