speex resampler: run in parallel

This commit is contained in:
Martin Pulec
2022-11-07 13:12:55 +01:00
parent a082cdaf12
commit bfcf910403
3 changed files with 37 additions and 16 deletions

View File

@@ -50,6 +50,7 @@
#include "ug_runtime_error.hpp"
#include "utils/macros.h"
#include "utils/misc.h"
#include "utils/worker.h"
#ifdef HAVE_SPEEXDSP
#include <speex/speex_resampler.h>
@@ -279,6 +280,25 @@ bool speex_resampler::check_reconfigure(unsigned original_sample_rate, unsigned
return true;
}
/**
 * Work item describing the resampling of one audio channel; consumed by
 * speex_process_channel(), which forwards the members to
 * speex_resampler_process_int().
 *
 * All members carry default initializers so that an instance that is not
 * value-initialized never hands indeterminate pointers/counters to the worker.
 */
struct speex_process_channel_data {
        SpeexResamplerState *state = nullptr; ///< resampler state passed to speex_resampler_process_int()
        int channel_idx = 0;                  ///< channel index passed to the resampler
        const spx_int16_t *in = nullptr;      ///< input samples of the channel
        spx_int16_t *out = nullptr;           ///< output buffer of the channel
        uint32_t in_frames = 0;               ///< in/out: passed by address to the resampler; the caller compares it with in_frames_orig afterwards
        uint32_t in_frames_orig = 0;          ///< copy of in_frames taken before the call (used to detect unconsumed input)
        uint32_t write_frames = 0;            ///< in/out: passed by address to the resampler; the caller derives the output length from it afterwards
};
static void *speex_process_channel(void *arg) {
auto *d = (speex_process_channel_data *) arg;
speex_resampler_process_int(d->state,
d->channel_idx,
d->in, &d->in_frames,
d->out, &d->write_frames);
return NULL;
}
/// @todo
/// speex supports also floats so there could be possibility also to add support for more bps
tuple<bool, audio_frame2> speex_resampler::resample(audio_frame2 &a, vector<audio_frame2::channel> &new_channels, int new_sample_rate_num, int new_sample_rate_den) {
@@ -290,23 +310,23 @@ tuple<bool, audio_frame2> speex_resampler::resample(audio_frame2 &a, vector<audi
audio_frame2 remainder;
remainder.init(new_channels.size(), AC_PCM, prop.bps, prop.rate_from);
/// @todo
/// Consider doing this in parallel - complex resampling requires some milliseconds.
/// Parallel resampling would reduce latency (and improve performance if there is not
/// enough single-core power).
vector <speex_process_channel_data> speex_worker_data(new_channels.size());
for (size_t i = 0; i < new_channels.size(); i++) {
uint32_t in_frames = a.get_data_len(i) / sizeof(int16_t);
uint32_t in_frames_orig = in_frames;
uint32_t write_frames = new_channels[i].len / sizeof(int16_t);
speex_resampler_process_int(state,
i,
(const spx_int16_t *)(const void *) a.get_data(i), &in_frames,
(spx_int16_t *)(void *) new_channels[i].data.get(), &write_frames);
if (in_frames != in_frames_orig) {
remainder.append(i, a.get_data(i) + in_frames * sizeof(int16_t), in_frames_orig - in_frames);
speex_worker_data.at(i).state = state;
speex_worker_data.at(i).channel_idx = i;
speex_worker_data.at(i).in = (const spx_int16_t *)(const void *) a.get_data(i);
speex_worker_data.at(i).out = (spx_int16_t *)(void *) new_channels[i].data.get();
speex_worker_data.at(i).in_frames_orig =
speex_worker_data.at(i).in_frames = a.get_data_len(i) / sizeof(int16_t);
speex_worker_data.at(i).write_frames = new_channels[i].len / sizeof(int16_t);
}
task_run_parallel(speex_process_channel, new_channels.size(), speex_worker_data.data(), sizeof speex_worker_data[0], NULL);
for (size_t i = 0; i < new_channels.size(); i++) {
if (speex_worker_data.at(i).in_frames != speex_worker_data.at(i).in_frames_orig) {
remainder.append(i, a.get_data(i) + speex_worker_data.at(i).in_frames * sizeof(int16_t),
speex_worker_data.at(i).in_frames_orig - speex_worker_data.at(i).in_frames);
}
new_channels[i].len = write_frames * sizeof(int16_t);
new_channels[i].len = speex_worker_data.at(i).write_frames * sizeof(int16_t);
}
if (remainder.get_data_len() == 0) {

View File

@@ -63,6 +63,7 @@ static void *parallel_pix_conv_task(void *arg) {
return NULL;
}
/// @todo utilize respawn_parallel
void parallel_pix_conv(int height, char *out, int out_linesize, const char *in, int in_linesize, decoder_t decode, int threads)
{
struct parallel_pix_conv_data data[threads];

View File

@@ -58,7 +58,7 @@ void *wait_task(task_result_handle_t handle);
/**
 * Runs @p task in @p worker_count workers.
 *
 * NOTE(review): judging from the callers, @p data is an array of worker_count
 * elements of data_size bytes each and every worker presumably receives a
 * pointer to its own element; @p res may be NULL when results are not needed -
 * confirm against the implementation.
 */
void task_run_parallel(runnable_t task, int worker_count, void *data, size_t data_size, void **res);
/**
* @param data_len in/out processed block length in bytes
* @param data_len in/out processed block length in bytes (multiple of respawn_parallel's size param)
*/
typedef void (*respawn_parallel_callback_t)(void *in, void *out, size_t data_len, void *udata);
void respawn_parallel(void *in, void *out, size_t nmemb, size_t size, respawn_parallel_callback_t c, void *udata);