utils/ptp: Separate threads for event and general messages

This commit is contained in:
Martin Piatka
2025-09-16 14:58:20 +02:00
parent 9fe620369f
commit 91c79c8534
2 changed files with 120 additions and 30 deletions

View File

@@ -39,12 +39,13 @@
#include <chrono>
#include <algorithm>
#include "rtp/net_udp.h"
#include "utils/thread.h"
#include "debug.h"
#define MOD_NAME "[PTP] "
#define PTP_PORT_CRITICAL 319
#define PTP_PORT_EVENT 319
#define PTP_PORT_GENERAL 320
#define MAX_PACKET_LEN 1024
#define MAX_PACKET_LEN 128
#define PTP_ADDRESS "224.0.1.129"
#define PTP_MSG_ANNOUNCE 0xb
@@ -55,18 +56,47 @@ using clk = std::chrono::steady_clock;
namespace {
struct Timestamped_pkt{
uint64_t local_ts;
uint8_t buf[MAX_PACKET_LEN]; //Event packets should not be larger than 54B
unsigned buflen = 0;
};
} //anon namespace
void Ptp_clock::processPtpPkt(uint8_t *buf, size_t len){
void Ptp_clock::update_clock(uint64_t new_local_ts, uint64_t new_ptp_ts){
if(synth_ptp_ts == 0) synth_ptp_ts = new_ptp_ts;
auto delta_local = new_local_ts - local_ts;
auto delta_ptp = new_ptp_ts - ptp_ts;
if(local_ts == 0){
ptp_ts = new_ptp_ts;
local_ts = new_local_ts;
return;
}
ptp_ts = new_ptp_ts;
local_ts = new_local_ts;
synth_ptp_ts += delta_local * spa_corr;
int64_t predicted_delta_spa_ptp = (delta_local * spa_corr) - delta_ptp;
spa_corr = spa_dll_update(&dll, (int64_t) synth_ptp_ts - (int64_t) new_ptp_ts);
update_count.fetch_add(1, std::memory_order_seq_cst);
local_snapshot.store(new_local_ts, std::memory_order_seq_cst);
ptp_snapshot.store(synth_ptp_ts, std::memory_order_seq_cst);
corr_snapshot.store(spa_corr, std::memory_order_seq_cst);
update_count.fetch_add(1, std::memory_order_seq_cst);
}
void Ptp_clock::processPtpPkt(uint8_t *buf, size_t len, uint64_t pkt_ts){
assert(len >= 34);
int version = buf[1] & 0x0F;
if(version != 2)
return;
uint64_t pkt_ts = std::chrono::nanoseconds(clk::now().time_since_epoch()).count();
int msgType = buf[0] & 0x0F;
uint16_t flags = (buf[6] << 8) | buf[7];
@@ -74,33 +104,37 @@ void Ptp_clock::processPtpPkt(uint8_t *buf, size_t len){
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Msg type %x, flags %x, seq = %u\n", msgType, flags, seq);
if(msgType == PTP_MSG_FOLLOWUP || msgType == PTP_MSG_SYNC){
if(msgType == PTP_MSG_SYNC && (flags & 0x200)){
uint64_t sec = (buf[34] << 40) | (buf[35] << 32) | (buf[36] << 24) | (buf[37] << 16) | (buf[38] << 8) | (buf[39]);
uint32_t nsec = (buf[40] << 24) | (buf[41] << 16) | (buf[42] << 8) | (buf[43]);
uint64_t new_ptp_ts = sec * 1'000'000'000 + nsec;
uint64_t new_local_ts = pkt_ts;
if(synth_ptp_ts == 0) synth_ptp_ts = new_ptp_ts;
if(local_ts != 0){
auto delta_local = new_local_ts - local_ts;
auto delta_ptp = new_ptp_ts - ptp_ts;
sync_pkts.push_back({seq, new_ptp_ts, pkt_ts});
return;
}
synth_ptp_ts += delta_local * spa_corr;
if(msgType == PTP_MSG_FOLLOWUP){
uint64_t sec = (buf[34] << 40) | (buf[35] << 32) | (buf[36] << 24) | (buf[37] << 16) | (buf[38] << 8) | (buf[39]);
uint32_t nsec = (buf[40] << 24) | (buf[41] << 16) | (buf[42] << 8) | (buf[43]);
int64_t predicted_delta_spa_ptp = (delta_local * spa_corr) - delta_ptp;
spa_corr = spa_dll_update(&dll, (int64_t) synth_ptp_ts - (int64_t) new_ptp_ts);
uint64_t new_ptp_ts = sec * 1'000'000'000 + nsec;
update_count.fetch_add(1, std::memory_order_seq_cst);
local_snapshot.store(new_local_ts, std::memory_order_seq_cst);
ptp_snapshot.store(synth_ptp_ts, std::memory_order_seq_cst);
corr_snapshot.store(spa_corr, std::memory_order_seq_cst);
update_count.fetch_add(1, std::memory_order_seq_cst);
auto it = std::find_if(sync_pkts.begin(), sync_pkts.end(), [=](const detail::Sync_pkt_data& pkt){ return pkt.seq == seq; });
if(it == sync_pkts.end()){
log_msg(LOG_LEVEL_WARNING, MOD_NAME "Sync pkt for followup not found\n");
return;
}
ptp_ts = new_ptp_ts;
local_ts = new_local_ts;
auto sf_delta = (int64_t) new_ptp_ts - (int64_t) it->imprecise_ptp_ts;
update_clock(it->local_ts, new_ptp_ts);
auto new_end = std::remove_if(sync_pkts.begin(), sync_pkts.end(), [=](const detail::Sync_pkt_data& pkt){ return pkt.seq <= seq; });
sync_pkts.erase(new_end, sync_pkts.end());
return;
}
}
@@ -127,9 +161,10 @@ uint64_t Ptp_clock::get_time(){
return p + delta_local * c;
}
void Ptp_clock::ptp_worker_general(){
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Init sock %s on %s port %d\n", PTP_ADDRESS, network_interface.c_str(), PTP_PORT_GENERAL);
auto ptp_sock = socket_udp_uniq(udp_init_if(PTP_ADDRESS, network_interface.c_str(), PTP_PORT_CRITICAL, 0, 255, 4, false));
void Ptp_clock::ptp_worker_event(){
set_thread_name(__func__);
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Init sock %s on %s port %d\n", PTP_ADDRESS, network_interface.c_str(), PTP_PORT_EVENT);
auto ptp_sock = socket_udp_uniq(udp_init_if(PTP_ADDRESS, network_interface.c_str(), PTP_PORT_EVENT, 0, 255, 4, false));
if(!ptp_sock){
log_msg(LOG_LEVEL_ERROR, MOD_NAME "Failed to create sock\n");
@@ -145,20 +180,50 @@ void Ptp_clock::ptp_worker_general(){
printf(ret < 0 ? "failed\n" : "success\n");
}
while(should_run){
Timestamped_pkt pkt;
timeval timeout {1, 0};
pkt.buflen = udp_recv_timeout(ptp_sock.get(), (char *)pkt.buf, MAX_PACKET_LEN, &timeout);
pkt.local_ts = std::chrono::nanoseconds(clk::now().time_since_epoch()).count();
if(pkt.buflen == 0)
continue;
ring_buffer_write(event_pkt_ring.get(), reinterpret_cast<const char*>(&pkt), sizeof(pkt));
}
}
void Ptp_clock::ptp_worker_general(){
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Init sock %s on %s port %d\n", PTP_ADDRESS, network_interface.c_str(), PTP_PORT_GENERAL);
auto ptp_sock = socket_udp_uniq(udp_init_if(PTP_ADDRESS, network_interface.c_str(), PTP_PORT_GENERAL, 0, 255, 4, false));
if(!ptp_sock){
log_msg(LOG_LEVEL_ERROR, MOD_NAME "Failed to create sock\n");
return;
}
spa_dll_init(&dll);
spa_dll_set_bw(&dll, 0.05, 250'000'000, 1'000'000'000);
spa_dll_set_bw(&dll, 0.05, 250'000'000, 1'000'000'000); //TODO
while(should_run){
int buflen = 0;
uint8_t buffer[MAX_PACKET_LEN];
timeval timeout {1, 0};
timeval timeout {0, 33'000'000};
buflen = udp_recv_timeout(ptp_sock.get(), (char *)buffer, MAX_PACKET_LEN, &timeout);
auto ring_avail = ring_get_current_size(event_pkt_ring.get());
if(ring_avail >= sizeof(Timestamped_pkt)){
Timestamped_pkt pkt;
ring_buffer_read(event_pkt_ring.get(), reinterpret_cast<char *>(&pkt), sizeof(pkt));
processPtpPkt(pkt.buf, pkt.buflen, pkt.local_ts);
}
if(buflen == 0)
continue;
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Got msg len %d\n", buflen);
processPtpPkt(buffer, buflen, 0);
processPtpPkt(buffer, buflen);
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Got general msg len %d\n", buflen);
}
}
@@ -166,12 +231,20 @@ void Ptp_clock::ptp_worker_general(){
void Ptp_clock::start(std::string_view interface){
network_interface = interface;
constexpr size_t ring_size = sizeof(Timestamped_pkt) * 1000;
event_pkt_ring.reset(ring_buffer_init(ring_size));
worker_general = std::thread(&Ptp_clock::ptp_worker_general, this);
worker_event = std::thread(&Ptp_clock::ptp_worker_event, this);
}
void Ptp_clock::stop(){
should_run = false;
if(worker_event.joinable()){
worker_event.join();
}
if(worker_general.joinable()){
worker_general.join();
}

View File

@@ -41,7 +41,17 @@
#include <atomic>
#include <thread>
#include <string>
#include <vector>
#include "utils/spa_dll.h"
#include "utils/ring_buffer.h"
namespace detail{
struct Sync_pkt_data{
uint16_t seq;
uint64_t imprecise_ptp_ts;
uint64_t local_ts;
};
}
class Ptp_clock{
public:
@@ -54,14 +64,17 @@ private:
std::string network_interface;
std::atomic<bool> should_run = true;
std::thread worker_critical;
std::thread worker_event;
std::thread worker_general;
ring_buffer_uniq event_pkt_ring;
uint64_t ptp_ts = 0;
uint64_t local_ts = 0;
uint64_t synth_ptp_ts = 0;
double spa_corr = 1.0;
spa_dll dll;
std::vector<detail::Sync_pkt_data> sync_pkts;
std::atomic<uint32_t> update_count = 0;
std::atomic<uint64_t> local_snapshot = 0;
@@ -70,7 +83,11 @@ private:
void ptp_worker_general();
void processPtpPkt(uint8_t *buf, size_t len);
void ptp_worker_event();
void processPtpPkt(uint8_t *buf, size_t len, uint64_t pkt_ts);
void update_clock(uint64_t new_local_ts, uint64_t new_ptp_ts);
};