From 91c79c85341b68fea064563ea15a883e26a20fc3 Mon Sep 17 00:00:00 2001 From: Martin Piatka Date: Tue, 16 Sep 2025 14:58:20 +0200 Subject: [PATCH] utils/ptp: Separate threads for event and general messages --- src/utils/ptp.cpp | 129 ++++++++++++++++++++++++++++++++++++---------- src/utils/ptp.hpp | 21 +++++++- 2 files changed, 120 insertions(+), 30 deletions(-) diff --git a/src/utils/ptp.cpp b/src/utils/ptp.cpp index 760b950ca..ae8e09faa 100644 --- a/src/utils/ptp.cpp +++ b/src/utils/ptp.cpp @@ -39,12 +39,13 @@ #include #include #include "rtp/net_udp.h" +#include "utils/thread.h" #include "debug.h" #define MOD_NAME "[PTP] " -#define PTP_PORT_CRITICAL 319 +#define PTP_PORT_EVENT 319 #define PTP_PORT_GENERAL 320 -#define MAX_PACKET_LEN 1024 +#define MAX_PACKET_LEN 128 #define PTP_ADDRESS "224.0.1.129" #define PTP_MSG_ANNOUNCE 0xb @@ -55,18 +56,47 @@ using clk = std::chrono::steady_clock; namespace { +struct Timestamped_pkt{ + uint64_t local_ts; + uint8_t buf[MAX_PACKET_LEN]; //Event packets should not be larger than 54B + unsigned buflen = 0; +}; } //anon namespace -void Ptp_clock::processPtpPkt(uint8_t *buf, size_t len){ +void Ptp_clock::update_clock(uint64_t new_local_ts, uint64_t new_ptp_ts){ + if(synth_ptp_ts == 0) synth_ptp_ts = new_ptp_ts; + + auto delta_local = new_local_ts - local_ts; + auto delta_ptp = new_ptp_ts - ptp_ts; + + if(local_ts == 0){ + ptp_ts = new_ptp_ts; + local_ts = new_local_ts; + return; + } + ptp_ts = new_ptp_ts; + local_ts = new_local_ts; + + synth_ptp_ts += delta_local * spa_corr; + + int64_t predicted_delta_spa_ptp = (delta_local * spa_corr) - delta_ptp; + spa_corr = spa_dll_update(&dll, (int64_t) synth_ptp_ts - (int64_t) new_ptp_ts); + + update_count.fetch_add(1, std::memory_order_seq_cst); + local_snapshot.store(new_local_ts, std::memory_order_seq_cst); + ptp_snapshot.store(synth_ptp_ts, std::memory_order_seq_cst); + corr_snapshot.store(spa_corr, std::memory_order_seq_cst); + update_count.fetch_add(1, std::memory_order_seq_cst); +} + +void Ptp_clock::processPtpPkt(uint8_t *buf, size_t len, uint64_t pkt_ts){ assert(len >= 34); int version = buf[1] & 0x0F; if(version != 2) return; - uint64_t pkt_ts = std::chrono::nanoseconds(clk::now().time_since_epoch()).count(); - int msgType = buf[0] & 0x0F; uint16_t flags = (buf[6] << 8) | buf[7]; @@ -74,33 +104,37 @@ void Ptp_clock::processPtpPkt(uint8_t *buf, size_t len){ log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Msg type %x, flags %x, seq = %u\n", msgType, flags, seq); - if(msgType == PTP_MSG_FOLLOWUP || msgType == PTP_MSG_SYNC){ + if(msgType == PTP_MSG_SYNC && (flags & 0x200)){ uint64_t sec = (buf[34] << 40) | (buf[35] << 32) | (buf[36] << 24) | (buf[37] << 16) | (buf[38] << 8) | (buf[39]); uint32_t nsec = (buf[40] << 24) | (buf[41] << 16) | (buf[42] << 8) | (buf[43]); uint64_t new_ptp_ts = sec * 1'000'000'000 + nsec; - uint64_t new_local_ts = pkt_ts; - if(synth_ptp_ts == 0) synth_ptp_ts = new_ptp_ts; - if(local_ts != 0){ - auto delta_local = new_local_ts - local_ts; - auto delta_ptp = new_ptp_ts - ptp_ts; + sync_pkts.push_back({seq, new_ptp_ts, pkt_ts}); + return; + } - synth_ptp_ts += delta_local * spa_corr; + if(msgType == PTP_MSG_FOLLOWUP){ + uint64_t sec = (buf[34] << 40) | (buf[35] << 32) | (buf[36] << 24) | (buf[37] << 16) | (buf[38] << 8) | (buf[39]); + uint32_t nsec = (buf[40] << 24) | (buf[41] << 16) | (buf[42] << 8) | (buf[43]); - int64_t predicted_delta_spa_ptp = (delta_local * spa_corr) - delta_ptp; - spa_corr = spa_dll_update(&dll, (int64_t) synth_ptp_ts - (int64_t) new_ptp_ts); + uint64_t new_ptp_ts = sec * 1'000'000'000 + nsec; - update_count.fetch_add(1, std::memory_order_seq_cst); - local_snapshot.store(new_local_ts, std::memory_order_seq_cst); - ptp_snapshot.store(synth_ptp_ts, std::memory_order_seq_cst); - corr_snapshot.store(spa_corr, std::memory_order_seq_cst); - update_count.fetch_add(1, std::memory_order_seq_cst); + auto it = std::find_if(sync_pkts.begin(), sync_pkts.end(), [=](const detail::Sync_pkt_data& pkt){ return pkt.seq == seq; }); + if(it == sync_pkts.end()){ + log_msg(LOG_LEVEL_WARNING, MOD_NAME "Sync pkt for followup not found\n"); + return; } - ptp_ts = new_ptp_ts; - local_ts = new_local_ts; + auto sf_delta = (int64_t) new_ptp_ts - (int64_t) it->imprecise_ptp_ts; + + update_clock(it->local_ts, new_ptp_ts); + + auto new_end = std::remove_if(sync_pkts.begin(), sync_pkts.end(), [=](const detail::Sync_pkt_data& pkt){ return pkt.seq <= seq; }); + sync_pkts.erase(new_end, sync_pkts.end()); + + return; } } @@ -127,9 +161,10 @@ uint64_t Ptp_clock::get_time(){ return p + delta_local * c; } -void Ptp_clock::ptp_worker_general(){ - log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Init sock %s on %s port %d\n", PTP_ADDRESS, network_interface.c_str(), PTP_PORT_GENERAL); - auto ptp_sock = socket_udp_uniq(udp_init_if(PTP_ADDRESS, network_interface.c_str(), PTP_PORT_CRITICAL, 0, 255, 4, false)); +void Ptp_clock::ptp_worker_event(){ + set_thread_name(__func__); + log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Init sock %s on %s port %d\n", PTP_ADDRESS, network_interface.c_str(), PTP_PORT_EVENT); + auto ptp_sock = socket_udp_uniq(udp_init_if(PTP_ADDRESS, network_interface.c_str(), PTP_PORT_EVENT, 0, 255, 4, false)); if(!ptp_sock){ log_msg(LOG_LEVEL_ERROR, MOD_NAME "Failed to create sock\n"); @@ -145,20 +180,50 @@ void Ptp_clock::ptp_worker_general(){ printf(ret < 0 ? "failed\n" : "success\n"); } + while(should_run){ + Timestamped_pkt pkt; + timeval timeout {1, 0}; + pkt.buflen = udp_recv_timeout(ptp_sock.get(), (char *)pkt.buf, MAX_PACKET_LEN, &timeout); + pkt.local_ts = std::chrono::nanoseconds(clk::now().time_since_epoch()).count(); + if(pkt.buflen == 0) + continue; + + ring_buffer_write(event_pkt_ring.get(), reinterpret_cast(&pkt), sizeof(pkt)); + + } +} + +void Ptp_clock::ptp_worker_general(){ + log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Init sock %s on %s port %d\n", PTP_ADDRESS, network_interface.c_str(), PTP_PORT_GENERAL); + auto ptp_sock = socket_udp_uniq(udp_init_if(PTP_ADDRESS, network_interface.c_str(), PTP_PORT_GENERAL, 0, 255, 4, false)); + + if(!ptp_sock){ + log_msg(LOG_LEVEL_ERROR, MOD_NAME "Failed to create sock\n"); + return; + } + spa_dll_init(&dll); - spa_dll_set_bw(&dll, 0.05, 250'000'000, 1'000'000'000); + spa_dll_set_bw(&dll, 0.05, 250'000'000, 1'000'000'000); //TODO while(should_run){ int buflen = 0; uint8_t buffer[MAX_PACKET_LEN]; - timeval timeout {1, 0}; + timeval timeout {0, 33'000'000}; buflen = udp_recv_timeout(ptp_sock.get(), (char *)buffer, MAX_PACKET_LEN, &timeout); + + auto ring_avail = ring_get_current_size(event_pkt_ring.get()); + if(ring_avail >= sizeof(Timestamped_pkt)){ + Timestamped_pkt pkt; + ring_buffer_read(event_pkt_ring.get(), reinterpret_cast(&pkt), sizeof(pkt)); + processPtpPkt(pkt.buf, pkt.buflen, pkt.local_ts); + } + if(buflen == 0) continue; - log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Got msg len %d\n", buflen); + processPtpPkt(buffer, buflen, 0); - processPtpPkt(buffer, buflen); + log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Got general msg len %d\n", buflen); } } @@ -166,12 +231,20 @@ void Ptp_clock::ptp_worker_general(){ void Ptp_clock::start(std::string_view interface){ network_interface = interface; + constexpr size_t ring_size = sizeof(Timestamped_pkt) * 1000; + event_pkt_ring.reset(ring_buffer_init(ring_size)); + worker_general = std::thread(&Ptp_clock::ptp_worker_general, this); + worker_event = std::thread(&Ptp_clock::ptp_worker_event, this); } void Ptp_clock::stop(){ should_run = false; + if(worker_event.joinable()){ + worker_event.join(); + } + if(worker_general.joinable()){ worker_general.join(); } diff --git a/src/utils/ptp.hpp b/src/utils/ptp.hpp index 46d86ae2f..094c4d94c 100644 --- a/src/utils/ptp.hpp +++ b/src/utils/ptp.hpp @@ -41,7 +41,17 @@ #include #include #include +#include #include "utils/spa_dll.h" +#include "utils/ring_buffer.h" + +namespace detail{ + struct Sync_pkt_data{ + uint16_t seq; + uint64_t imprecise_ptp_ts; + uint64_t local_ts; + }; +} class Ptp_clock{ public: @@ -54,14 +64,17 @@ private: std::string network_interface; std::atomic should_run = true; - std::thread worker_critical; + std::thread worker_event; std::thread worker_general; + ring_buffer_uniq event_pkt_ring; + uint64_t ptp_ts = 0; uint64_t local_ts = 0; uint64_t synth_ptp_ts = 0; double spa_corr = 1.0; spa_dll dll; + std::vector sync_pkts; std::atomic update_count = 0; std::atomic local_snapshot = 0; @@ -70,7 +83,11 @@ private: void ptp_worker_general(); - void processPtpPkt(uint8_t *buf, size_t len); + void ptp_worker_event(); + void processPtpPkt(uint8_t *buf, size_t len, uint64_t pkt_ts); + + void update_clock(uint64_t new_local_ts, uint64_t new_ptp_ts); + };