aplay/aes67: Initial

This commit is contained in:
Martin Piatka
2026-02-11 13:48:28 +01:00
parent 16de3329ac
commit 4a014f7963

View File

@@ -0,0 +1,409 @@
/**
* @file audio/playback/aes67.cpp
* @author Martin Piatka <piatka@cesnet.cz>
*/
/*
* Copyright (c) 2025 CESNET z.s.p.o.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, is permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of CESNET nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
* EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <algorithm> // for min
#include <cassert> // for assert
#include <cstddef> // for byte, size_t
#include <cstdint> // for INT32_MAX, uint32_t
#include <cstdlib> // for calloc, free
#include <cstring> // for strcpy, memcpy
#include <memory>
#include <chrono>
#include <vector>
#include <thread>
#include <string> // for char_traits, basic_string
#include <string_view> // for operator==, basic_string_view
#include "audio/audio_playback.h"
#include "audio/types.h"
#include "rtp/net_udp.h"
#include "debug.h"
#include "lib_common.h"
#include "utils/color_out.h"
#include "utils/ring_buffer.h"
#include "utils/string_view_utils.hpp"
#define MOD_NAME "[aes67 aplay] "
namespace {
struct Rtp_stream{
std::string name;
std::string description;
std::map<uint8_t, audio_desc> fmts;
std::string address;
int port;
};
using sess_id_t = uint32_t;
struct Sap_session{
sess_id_t unique_identifier; //Hash computed from sdp username, session id and unicast address
uint64_t sess_id; //Numeric session-id from RFC
uint64_t sess_ver;
uint16_t sap_hash; //Hash of sap packet that contained sdp for this version of session
std::string origin_address;
std::string name;
std::string description;
std::vector<Rtp_stream> streams;
};
Rtp_stream audio_desc_to_rtp_stream(audio_desc& desc){
Rtp_stream ret{};
//TODO
return ret;
}
std::string get_sdp(Sap_session& sap){
std::string sdp;
auto& stream = sap.streams[0];
sdp += "v=0\r\n";
sdp += "o=- " + std::to_string(sap.sess_id) + " " + std::to_string(sap.sess_ver) + " IN IP4 " + sap.origin_address + "\r\n";
sdp += "s=Ultragrid AES67\r\n";
sdp += "i=placeholder info\r\n"; //TODO
sdp += "c=IN IP4 " + stream.address + "/127\r\n"; //TODO
sdp += "t=0 0\r\n";
sdp += "a=recvonly\r\n";
sdp += "m=audio 5004 RTP/AVP " + std::to_string(stream.fmts.begin()->first) + "\r\n"; //TODO list all
sdp += "a=rtpmap:" + std::to_string(stream.fmts.begin()->first) + " L24/48000/1\r\n"; //TODO actual format
sdp += "a=ptime:1\r\n"; //TODO support changing packet time
sdp += "a=ts-refclk:ptp=IEEE1588-2008:00-1D-C1-FF-FE-A1-B8-BC:0\r\n"; //TODO fill real PTP clock
sdp += "a=mediaclk:direct=0\r\n";
return sdp;
}
} //anon namespace
struct state_aes67_play{
std::string network_interface_name;
std::string sap_address;
int sap_port;
std::atomic<bool> sdp_should_run = true;
std::thread sdp_thread;
socket_udp_uniq sdp_sock;
std::atomic<bool> rtp_should_run = true;
std::thread rtp_thread;
audio_desc desc;
Sap_session sap_sess;
ring_buffer_uniq ring_buf;
unsigned buf_len_ms = 100;
};
static std::vector<unsigned char> get_sap_pkt(Sap_session& sess){
std::string sdp = get_sdp(sess);
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "SDP\n%s\n", sdp.c_str());
uint16_t sap_hash = time(nullptr);
uint32_t ip_addr = inet_addr(sess.origin_address.c_str());
std::vector<unsigned char> sap_packet;
sap_packet.push_back(0x20); //Version and flags
sap_packet.push_back(0x00); //Authentication headers length
sap_packet.push_back(sap_hash >> 8);
sap_packet.push_back(sap_hash & 0xFF);
sap_packet.push_back((ip_addr >> 0) & 0xFF);
sap_packet.push_back((ip_addr >> 8) & 0xFF);
sap_packet.push_back((ip_addr >> 16) & 0xFF);
sap_packet.push_back((ip_addr >> 24) & 0xFF);
const char *pt = "application/sdp";
for(const char *i = pt; *i; i++){
sap_packet.push_back(*i);
}
sap_packet.push_back(0x00);
sap_packet.insert(sap_packet.end(), sdp.begin(), sdp.end());
return sap_packet;
}
static void create_sap_sess(state_aes67_play *s){
Sap_session sess{};
sess.sess_id = time(nullptr);
sess.sess_ver = sess.sess_id;
sess.origin_address = udp_host_addr(s->sdp_sock.get());
Rtp_stream stream{};
stream.fmts[96] = s->desc;
in_addr addr;
addr.s_addr = (239 << 0) | (69 << 8) | (rand() << 16);
stream.address = inet_ntoa(addr);
stream.port = 5004;
sess.streams.push_back(std::move(stream));
s->sap_sess = std::move(sess);
}
static void sdp_worker(state_aes67_play *s){
auto sap_packet = get_sap_pkt(s->sap_sess);
using clk = std::chrono::steady_clock;
auto announce_period = std::chrono::seconds(10);
auto next_announce = clk::now();
do{
if(next_announce < clk::now()){
udp_send(s->sdp_sock.get(), reinterpret_cast<char*>(sap_packet.data()), sap_packet.size());
next_announce += announce_period;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
} while(s->sdp_should_run);
//TODO Make this nice
sap_packet[0] |= (1 << 2);
udp_send(s->sdp_sock.get(), reinterpret_cast<char*>(sap_packet.data()), sap_packet.size());
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "SDP worker stopping\n");
}
static void rtp_worker(state_aes67_play *s){
//TODO
auto& stream = s->sap_sess.streams[0];
auto rtp_sock = socket_udp_uniq(udp_init_if(stream.address.c_str(), s->network_interface_name.c_str(), stream.port, 0, 255, 4, false));
using clk = std::chrono::steady_clock;
std::vector<unsigned char> rtp_pkt;
rtp_pkt.push_back(0x80); //Version, no padding, no extension, no cssr
rtp_pkt.push_back(96); //TODO payload type
//Seq number
rtp_pkt.push_back(0x00);
rtp_pkt.push_back(0x00);
//timestamp
rtp_pkt.push_back(0x00);
rtp_pkt.push_back(0x00);
rtp_pkt.push_back(0x00);
rtp_pkt.push_back(0x00);
//SSRC
uint32_t ssrc = rand();
rtp_pkt.push_back(ssrc >> 24);
rtp_pkt.push_back(ssrc >> 16);
rtp_pkt.push_back(ssrc >> 8);
rtp_pkt.push_back(ssrc >> 0);
auto hdr_size = rtp_pkt.size();
const unsigned frames_per_packet = 48; //TODO
const unsigned frame_size = s->desc.ch_count * s->desc.bps;
int payload_size = frame_size * frames_per_packet;
rtp_pkt.resize(hdr_size + payload_size);
uint16_t seq = 0;
uint32_t timestamp = 0;
auto next_pkt_time = clk::now();
do{
std::this_thread::sleep_until(next_pkt_time);
next_pkt_time += std::chrono::milliseconds(1);
rtp_pkt[2] = seq >> 8;
rtp_pkt[3] = seq;
rtp_pkt[4] = timestamp >> 24;
rtp_pkt[5] = timestamp >> 16;
rtp_pkt[6] = timestamp >> 8;
rtp_pkt[7] = timestamp;
char *dst = reinterpret_cast<char *>(rtp_pkt.data() + hdr_size);
unsigned avail_frames = ring_get_current_size(s->ring_buf.get()) / frame_size;
unsigned frames_to_write = std::min(frames_per_packet, avail_frames);
ring_buffer_read(s->ring_buf.get(), dst, frames_to_write * frame_size);
const int bps = s->desc.bps;
const int swap_count = bps / 2;
for(int i = 0; i < frames_to_write * s->desc.ch_count; i++){
for(int j = 0; j < swap_count; j++){
unsigned char tmp = dst[i * bps + j];
dst[i * bps + j] = dst[i * bps + bps - j - 1];
dst[i * bps + bps - j - 1] = tmp;;
}
}
udp_send(rtp_sock.get(), reinterpret_cast<char*>(rtp_pkt.data()), rtp_pkt.size());
seq += 1;
timestamp += frames_per_packet;
} while(s->rtp_should_run);
}
static void audio_play_aes67_probe(struct device_info **available_devices, int *count, void (**deleter)(void *))
{
*deleter = free;
*available_devices = static_cast<device_info *>(calloc(1, sizeof(device_info)));
strcpy((*available_devices)[0].dev, "");
strcpy((*available_devices)[0].name, "Default aes67 output");
*count = 1;
}
static void audio_play_aes67_help(){
color_printf("AES67 audio output.\n");
color_printf("Usage\n");
color_printf(TERM_BOLD TERM_FG_RED "\t-r aes67" TERM_FG_RESET "[TODO]\n" TERM_RESET);
color_printf("\n");
}
static void * audio_play_aes67_init(const struct audio_playback_opts *opts){
auto s = std::make_unique<state_aes67_play>();
s->sap_address = "239.255.255.255";
s->sap_port = 9875;
std::string_view cfg_sv(opts->cfg);
while(!cfg_sv.empty()){
auto tok = tokenize(cfg_sv, ':', '"');
auto key = tokenize(tok, '=');
auto val = tokenize(tok, '=');
if(key == "help"){
audio_play_aes67_help();
return INIT_NOERR;
} else if(key == "if"){
s->network_interface_name = val;
} else if (key == "sap_ip"){
s->sap_address = val;
} else if (key == "sap_port"){
if(!parse_num(val, s->sap_port)){
log_msg(LOG_LEVEL_ERROR, MOD_NAME "Failed to parse value for option %s\n", std::string(key).c_str());
return {};
}
} else if(key == "help"){
audio_play_aes67_help();
return INIT_NOERR;
}
}
s->sdp_sock.reset(udp_init_if(s->sap_address.c_str(), s->network_interface_name.c_str(), s->sap_port, 0, 255, 4, false));
return s.release();
}
static void audio_play_aes67_put_frame(void *state, const struct audio_frame *frame){
auto s = static_cast<state_aes67_play *>(state);
auto avail = ring_get_available_write_size(s->ring_buf.get());
auto to_write = frame->data_len;
if(to_write > avail){
log_msg(LOG_LEVEL_WARNING, MOD_NAME "Got frame of len %d, but ring has only %d free\n", frame->data_len, ring_get_available_write_size(s->ring_buf.get()));
to_write = avail;
}
ring_buffer_write(s->ring_buf.get(), frame->data, to_write);
}
static bool is_format_supported(void *data, size_t *len){
struct audio_desc desc;
if (*len < sizeof(desc)) {
return false;
} else {
memcpy(&desc, data, sizeof(desc));
}
return desc.codec == AC_PCM && desc.bps >= 2 && desc.bps <= 3;
}
static bool audio_play_aes67_ctl(void *state, int request, void *data, size_t *len){
UNUSED(state);
switch (request) {
case AUDIO_PLAYBACK_CTL_QUERY_FORMAT:
return is_format_supported(data, len);
default:
return false;
}
}
static void stop_session(state_aes67_play *s){
s->sdp_should_run = false;
s->rtp_should_run = false;
if(s->rtp_thread.joinable()){
s->rtp_thread.join();
}
if(s->sdp_thread.joinable()){
s->sdp_thread.join();
}
}
static bool audio_play_aes67_reconfigure(void *state, struct audio_desc desc){
auto s = static_cast<state_aes67_play *>(state);
stop_session(s);
s->desc = desc;
create_sap_sess(s);
unsigned ring_size = (s->buf_len_ms * desc.sample_rate / 1000) * desc.ch_count * desc.bps * 2;
s->ring_buf.reset(ring_buffer_init(ring_size));
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Starting SDP thread\n");
s->sdp_should_run = true;
s->sdp_thread = std::thread(sdp_worker, s);
log_msg(LOG_LEVEL_NOTICE, MOD_NAME "Starting RTP thread\n");
s->rtp_should_run = true;
s->rtp_thread = std::thread(rtp_worker, s);
return true;
}
static void audio_play_aes67_done(void *state){
auto s = static_cast<state_aes67_play *>(state);
stop_session(s);
delete s;
}
static const struct audio_playback_info aplay_aes67_info = {
audio_play_aes67_probe,
audio_play_aes67_init,
audio_play_aes67_put_frame,
audio_play_aes67_ctl,
audio_play_aes67_reconfigure,
audio_play_aes67_done
};
REGISTER_MODULE(aes67, &aplay_aes67_info, LIBRARY_CLASS_AUDIO_PLAYBACK, AUDIO_PLAYBACK_ABI_VERSION);