aplay/aes67: Restart RTP when PTP clock loses lock

This commit is contained in:
Martin Piatka
2025-10-15 13:26:37 +02:00
parent d9f8792b38
commit 18ceb4cb98

View File

@@ -42,6 +42,7 @@
#include <cstdlib> // for calloc, free
#include <cstring> // for strcpy, memcpy
#include <memory>
#include <optional>
#include <chrono>
#include <vector>
#include <thread>
@@ -165,11 +166,11 @@ struct state_aes67_play{
int32_t frame_ts_offset = 0;
Ptp_clock ptpclk;
std::optional<Ptp_clock> ptpclk;
};
uint32_t get_rtp_timestamp(state_aes67_play *s){
uint32_t timestamp = nanoseconds_to_media_ts(s->ptpclk.get_time(), s->sap_sess.stream.sample_rate);
uint32_t timestamp = nanoseconds_to_media_ts(s->ptpclk->get_time(), s->sap_sess.stream.sample_rate);
timestamp += s->frame_ts_offset;
/* To prevent packets arriving with a timestamp higher than current
@@ -226,7 +227,7 @@ static void create_sap_sess(state_aes67_play *s){
addr.s_addr = (239 << 0) | (69 << 8) | (ug_rand() << 16);
sess.stream.address = inet_ntoa(addr);
sess.stream.port = 5004;
sess.ptp_id = s->ptpclk.get_clock_id_str();
sess.ptp_id = s->ptpclk->get_clock_id_str();
sess.name = s->session_name;
sess.description = s->session_description;
@@ -234,28 +235,6 @@ static void create_sap_sess(state_aes67_play *s){
s->sap_sess = std::move(sess);
}
static void sdp_worker(state_aes67_play *s){
auto sap_packet = get_sap_pkt(s->sap_sess);
using clk = std::chrono::steady_clock;
auto announce_period = std::chrono::seconds(10);
auto next_announce = clk::now();
do{
if(next_announce < clk::now()){
udp_send(s->sdp_sock.get(), reinterpret_cast<char*>(sap_packet.data()), sap_packet.size());
next_announce += announce_period;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
} while(s->sdp_should_run);
//TODO Make this nice
sap_packet[0] |= (1 << 2);
udp_send(s->sdp_sock.get(), reinterpret_cast<char*>(sap_packet.data()), sap_packet.size());
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "SDP worker stopping\n");
}
static void rtp_worker(state_aes67_play *s){
//TODO
auto& stream = s->sap_sess.stream;
@@ -297,10 +276,10 @@ static void rtp_worker(state_aes67_play *s){
uint16_t seq = 0;
uint32_t rtp_timestamp = get_rtp_timestamp(s);
auto next_pkt_time = clk::now();
auto next_pkt_ptp_time = s->ptpclk.get_time();
auto next_pkt_ptp_time = s->ptpclk->get_time();
do{
std::this_thread::sleep_until(next_pkt_time);
auto now_ptp_ts = s->ptpclk.get_time();
auto now_ptp_ts = s->ptpclk->get_time();
next_pkt_ptp_time += get_pkt_time_ns(frames_per_packet, s->sap_sess.stream.sample_rate);
auto now_next_diff = std::min(next_pkt_ptp_time - now_ptp_ts, now_ptp_ts - next_pkt_ptp_time);
@@ -308,7 +287,7 @@ static void rtp_worker(state_aes67_play *s){
log_msg(LOG_LEVEL_WARNING, MOD_NAME "Packet timing off by more than 1s\n");
rtp_timestamp = get_rtp_timestamp(s);
next_pkt_time = clk::now();
next_pkt_ptp_time = s->ptpclk.get_time();
next_pkt_ptp_time = s->ptpclk->get_time();
}
if(next_pkt_ptp_time > now_ptp_ts){
@@ -354,6 +333,61 @@ static void rtp_worker(state_aes67_play *s){
} while(s->rtp_should_run);
}
static void start_rtp_worker(state_aes67_play *s){
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Starting RTP thread\n");
s->rtp_should_run = true;
s->rtp_thread = std::thread(rtp_worker, s);
}
static void stop_rtp_worker(state_aes67_play *s){
s->rtp_should_run = false;
if(s->rtp_thread.joinable()){
s->rtp_thread.join();
}
}
static void sdp_worker(state_aes67_play *s){
auto sap_packet = get_sap_pkt(s->sap_sess);
using clk = std::chrono::steady_clock;
auto announce_period = std::chrono::seconds(10);
auto next_announce = clk::now();
do{
if(!s->ptpclk->is_locked()){
log_msg(LOG_LEVEL_ERROR, MOD_NAME "PTP clock lost sync, restarting...\n");
stop_rtp_worker(s);
s->ptpclk->stop();
//TODO Make this nice
sap_packet[0] |= (1 << 2);
udp_send(s->sdp_sock.get(), reinterpret_cast<char*>(sap_packet.data()), sap_packet.size());
s->ptpclk.emplace();
s->ptpclk->start(s->network_interface_name);
s->ptpclk->wait_for_lock();
create_sap_sess(s);
sap_packet = get_sap_pkt(s->sap_sess);
start_rtp_worker(s);
}
if(next_announce < clk::now()){
udp_send(s->sdp_sock.get(), reinterpret_cast<char*>(sap_packet.data()), sap_packet.size());
next_announce += announce_period;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
} while(s->sdp_should_run);
//TODO Make this nice
sap_packet[0] |= (1 << 2);
udp_send(s->sdp_sock.get(), reinterpret_cast<char*>(sap_packet.data()), sap_packet.size());
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "SDP worker stopping\n");
}
static void audio_play_aes67_probe(struct device_info **available_devices, int *count, void (**deleter)(void *))
{
*deleter = free;
@@ -409,9 +443,10 @@ static void * audio_play_aes67_init(const struct audio_playback_opts *opts){
s->sdp_sock.reset(udp_init_if(s->sap_address.c_str(), s->network_interface_name.c_str(), s->sap_port, 0, 255, 4, false));
s->ptpclk.start(s->network_interface_name);
s->ptpclk.emplace();
s->ptpclk->start(s->network_interface_name);
log_msg(LOG_LEVEL_INFO, MOD_NAME "Waiting for PTP lock\n");
s->ptpclk.wait_for_lock();
s->ptpclk->wait_for_lock();
return s.release();
}
@@ -454,11 +489,8 @@ static bool audio_play_aes67_ctl(void *state, int request, void *data, size_t *l
static void stop_session(state_aes67_play *s){
s->sdp_should_run = false;
s->rtp_should_run = false;
stop_rtp_worker(s);
if(s->rtp_thread.joinable()){
s->rtp_thread.join();
}
if(s->sdp_thread.joinable()){
s->sdp_thread.join();
}
@@ -479,9 +511,7 @@ static bool audio_play_aes67_reconfigure(void *state, struct audio_desc desc){
s->sdp_should_run = true;
s->sdp_thread = std::thread(sdp_worker, s);
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Starting RTP thread\n");
s->rtp_should_run = true;
s->rtp_thread = std::thread(rtp_worker, s);
start_rtp_worker(s);
return true;
}
@@ -489,7 +519,7 @@ static bool audio_play_aes67_reconfigure(void *state, struct audio_desc desc){
static void audio_play_aes67_done(void *state){
auto s = static_cast<state_aes67_play *>(state);
s->ptpclk.stop();
s->ptpclk->stop();
stop_session(s);
delete s;