diff --git a/src/audio/playback/aes67.cpp b/src/audio/playback/aes67.cpp index 41a9bda80..9565c69b3 100644 --- a/src/audio/playback/aes67.cpp +++ b/src/audio/playback/aes67.cpp @@ -42,6 +42,7 @@ #include // for calloc, free #include // for strcpy, memcpy #include +#include #include #include #include @@ -165,11 +166,11 @@ struct state_aes67_play{ int32_t frame_ts_offset = 0; - Ptp_clock ptpclk; + std::optional ptpclk; }; uint32_t get_rtp_timestamp(state_aes67_play *s){ - uint32_t timestamp = nanoseconds_to_media_ts(s->ptpclk.get_time(), s->sap_sess.stream.sample_rate); + uint32_t timestamp = nanoseconds_to_media_ts(s->ptpclk->get_time(), s->sap_sess.stream.sample_rate); timestamp += s->frame_ts_offset; /* To prevent packets arriving with a timestamp higher than current @@ -226,7 +227,7 @@ static void create_sap_sess(state_aes67_play *s){ addr.s_addr = (239 << 0) | (69 << 8) | (ug_rand() << 16); sess.stream.address = inet_ntoa(addr); sess.stream.port = 5004; - sess.ptp_id = s->ptpclk.get_clock_id_str(); + sess.ptp_id = s->ptpclk->get_clock_id_str(); sess.name = s->session_name; sess.description = s->session_description; @@ -234,28 +235,6 @@ static void create_sap_sess(state_aes67_play *s){ s->sap_sess = std::move(sess); } -static void sdp_worker(state_aes67_play *s){ - auto sap_packet = get_sap_pkt(s->sap_sess); - - using clk = std::chrono::steady_clock; - auto announce_period = std::chrono::seconds(10); - auto next_announce = clk::now(); - - do{ - if(next_announce < clk::now()){ - udp_send(s->sdp_sock.get(), reinterpret_cast(sap_packet.data()), sap_packet.size()); - next_announce += announce_period; - } - std::this_thread::sleep_for(std::chrono::seconds(1)); - } while(s->sdp_should_run); - - //TODO Make this nice - sap_packet[0] |= (1 << 2); - udp_send(s->sdp_sock.get(), reinterpret_cast(sap_packet.data()), sap_packet.size()); - - log_msg(LOG_LEVEL_NOTICE, MOD_NAME "SDP worker stopping\n"); -} - static void rtp_worker(state_aes67_play *s){ //TODO auto& stream = s->sap_sess.stream; @@ -297,10 +276,10 @@ static void rtp_worker(state_aes67_play *s){ uint16_t seq = 0; uint32_t rtp_timestamp = get_rtp_timestamp(s); auto next_pkt_time = clk::now(); - auto next_pkt_ptp_time = s->ptpclk.get_time(); + auto next_pkt_ptp_time = s->ptpclk->get_time(); do{ std::this_thread::sleep_until(next_pkt_time); - auto now_ptp_ts = s->ptpclk.get_time(); + auto now_ptp_ts = s->ptpclk->get_time(); next_pkt_ptp_time += get_pkt_time_ns(frames_per_packet, s->sap_sess.stream.sample_rate); auto now_next_diff = std::min(next_pkt_ptp_time - now_ptp_ts, now_ptp_ts - next_pkt_ptp_time); @@ -308,7 +287,7 @@ static void rtp_worker(state_aes67_play *s){ log_msg(LOG_LEVEL_WARNING, MOD_NAME "Packet timing off by more than 1s\n"); rtp_timestamp = get_rtp_timestamp(s); next_pkt_time = clk::now(); - next_pkt_ptp_time = s->ptpclk.get_time(); + next_pkt_ptp_time = s->ptpclk->get_time(); } if(next_pkt_ptp_time > now_ptp_ts){ @@ -354,6 +333,61 @@ static void rtp_worker(state_aes67_play *s){ } while(s->rtp_should_run); } +static void start_rtp_worker(state_aes67_play *s){ + log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Starting RTP thread\n"); + s->rtp_should_run = true; + s->rtp_thread = std::thread(rtp_worker, s); +} + +static void stop_rtp_worker(state_aes67_play *s){ + s->rtp_should_run = false; + + if(s->rtp_thread.joinable()){ + s->rtp_thread.join(); + } +} + +static void sdp_worker(state_aes67_play *s){ + auto sap_packet = get_sap_pkt(s->sap_sess); + + using clk = std::chrono::steady_clock; + auto announce_period = std::chrono::seconds(10); + auto next_announce = clk::now(); + + do{ + if(!s->ptpclk->is_locked()){ + log_msg(LOG_LEVEL_ERROR, MOD_NAME "PTP clock lost sync, restarting...\n"); + stop_rtp_worker(s); + s->ptpclk->stop(); + //TODO Make this nice + sap_packet[0] |= (1 << 2); + udp_send(s->sdp_sock.get(), reinterpret_cast(sap_packet.data()), sap_packet.size()); + + s->ptpclk.emplace(); + s->ptpclk->start(s->network_interface_name); + s->ptpclk->wait_for_lock(); + create_sap_sess(s); + sap_packet = get_sap_pkt(s->sap_sess); + start_rtp_worker(s); + } + + if(next_announce < clk::now()){ + udp_send(s->sdp_sock.get(), reinterpret_cast(sap_packet.data()), sap_packet.size()); + next_announce += announce_period; + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + } while(s->sdp_should_run); + + //TODO Make this nice + sap_packet[0] |= (1 << 2); + udp_send(s->sdp_sock.get(), reinterpret_cast(sap_packet.data()), sap_packet.size()); + + log_msg(LOG_LEVEL_NOTICE, MOD_NAME "SDP worker stopping\n"); +} + + + static void audio_play_aes67_probe(struct device_info **available_devices, int *count, void (**deleter)(void *)) { *deleter = free; @@ -409,9 +443,10 @@ static void * audio_play_aes67_init(const struct audio_playback_opts *opts){ s->sdp_sock.reset(udp_init_if(s->sap_address.c_str(), s->network_interface_name.c_str(), s->sap_port, 0, 255, 4, false)); - s->ptpclk.start(s->network_interface_name); + s->ptpclk.emplace(); + s->ptpclk->start(s->network_interface_name); log_msg(LOG_LEVEL_INFO, MOD_NAME "Waiting for PTP lock\n"); - s->ptpclk.wait_for_lock(); + s->ptpclk->wait_for_lock(); return s.release(); } @@ -454,11 +489,8 @@ static bool audio_play_aes67_ctl(void *state, int request, void *data, size_t *l static void stop_session(state_aes67_play *s){ s->sdp_should_run = false; - s->rtp_should_run = false; + stop_rtp_worker(s); - if(s->rtp_thread.joinable()){ - s->rtp_thread.join(); - } if(s->sdp_thread.joinable()){ s->sdp_thread.join(); } @@ -479,9 +511,7 @@ static bool audio_play_aes67_reconfigure(void *state, struct audio_desc desc){ s->sdp_should_run = true; s->sdp_thread = std::thread(sdp_worker, s); - log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Starting RTP thread\n"); - s->rtp_should_run = true; - s->rtp_thread = std::thread(rtp_worker, s); + start_rtp_worker(s); return true; } @@ -489,7 +519,7 @@ static bool audio_play_aes67_reconfigure(void *state, struct audio_desc desc){ static void audio_play_aes67_done(void *state){ auto s = static_cast(state); - s->ptpclk.stop(); + s->ptpclk->stop(); stop_session(s); delete s;