Skip to content

Commit

Permalink
Media player task blocking and memory tweaks (#72)
Browse files Browse the repository at this point in the history
* use psram for wifi and bluetooth buffers

* reduce pipeline task priorities

* don't block for typical running messages

* speaker task blocks on read from ring buffer

* fix mixing without clipping bug

* mixer blocks on ring buffer writes

* reader blocks on ring buffer writes

* avoid compilation warning about missing fields

* resampler blocks on ring buffer reads and writes

* decoder blocks on ring buffer reads and writes

* remove fixed delays for pipeline tasks

* reduce decoder and resampler task stacks

* move pipeline task stacks into internal memory

* move mixer task stack to internal memory

* log pipeline errors

* update TODO
  • Loading branch information
kahrendt authored Sep 3, 2024
1 parent 1554ef7 commit 8acbcb3
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 252 deletions.
44 changes: 17 additions & 27 deletions esphome/components/nabu/audio_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
namespace esphome {
namespace nabu {

static const size_t READ_WRITE_TIMEOUT_MS = 20;

AudioDecoder::AudioDecoder(RingBuffer *input_ring_buffer, RingBuffer *output_ring_buffer, size_t internal_buffer_size) {
this->input_ring_buffer_ = input_ring_buffer;
this->output_ring_buffer_ = output_ring_buffer;
Expand Down Expand Up @@ -89,38 +91,32 @@ AudioDecoderState AudioDecoder::decode(bool stop_gracefully) {
}
}

if (this->potentially_failed_count_ > 5) {
if (this->potentially_failed_count_ > 10) {
return AudioDecoderState::FAILED;
}

FileDecoderState state = FileDecoderState::MORE_TO_PROCESS;

while (state == FileDecoderState::MORE_TO_PROCESS) {
if (this->output_buffer_length_ > 0) {
// Have decoded data, feed into output ring buffer
size_t bytes_free = this->output_ring_buffer_->free();
size_t bytes_to_write = std::min(this->output_buffer_length_, bytes_free);
// Have decoded data, write it to the output ring buffer

size_t bytes_to_write = this->output_buffer_length_;

if (bytes_to_write > 0) {
size_t bytes_written = this->output_ring_buffer_->write((void *) this->output_buffer_current_, bytes_to_write);
size_t bytes_written = this->output_ring_buffer_->write_without_replacement(
(void *) this->output_buffer_current_, bytes_to_write, pdMS_TO_TICKS(READ_WRITE_TIMEOUT_MS));

this->output_buffer_length_ -= bytes_written;
this->output_buffer_current_ += bytes_written;
}

if (this->output_buffer_length_ > 0) {
// Output ring buffer is full, so we can't do any more processing
// Output buffer still has decoded audio to write
return AudioDecoderState::DECODING;
}
} else {
// Try to decode more data
size_t bytes_available = this->input_ring_buffer_->available();
size_t bytes_to_read = std::min(bytes_available, this->internal_buffer_size_ - this->input_buffer_length_);

if ((this->potentially_failed_count_ > 0) && (bytes_to_read == 0)) {
// We didn't have enough data last time, and we have no new data, so just return
return AudioDecoderState::DECODING;
}
// Decode more data

// Shift unread data in input buffer to start
if (this->input_buffer_length_ > 0) {
Expand All @@ -131,15 +127,17 @@ AudioDecoderState AudioDecoder::decode(bool stop_gracefully) {
// read in new ring buffer data to fill the remaining input buffer
size_t bytes_read = 0;

size_t bytes_to_read = this->internal_buffer_size_ - this->input_buffer_length_;

if (bytes_to_read > 0) {
uint8_t *new_audio_data = this->input_buffer_ + this->input_buffer_length_;
bytes_read = this->input_ring_buffer_->read((void *) new_audio_data, bytes_to_read);
bytes_read = this->input_ring_buffer_->read((void *) new_audio_data, bytes_to_read, pdMS_TO_TICKS(READ_WRITE_TIMEOUT_MS));

this->input_buffer_length_ += bytes_read;
}

if (this->input_buffer_length_ == 0) {
// No input data available, so we can't do any more processing
if ((this->input_buffer_length_ == 0) || ((this->potentially_failed_count_ > 0) && (bytes_read == 0))) {
// No input data available or no new data has been read, so we can't do any more processing
state = FileDecoderState::IDLE;
} else {
switch (this->media_file_type_) {
Expand Down Expand Up @@ -167,14 +165,6 @@ AudioDecoderState AudioDecoder::decode(bool stop_gracefully) {
} else {
this->potentially_failed_count_ = 0;
}
if (this->get_stream_info().has_value()) {
size_t monotone_samples =
(this->output_ring_buffer_->available() / sizeof(int16_t)) / this->get_stream_info().value().channels;
if (monotone_samples > this->get_stream_info().value().sample_rate/100) {
// We have more than 10 milliseconds of samples ready to output, we can break
break;
}
}
}
return AudioDecoderState::DECODING;
}
Expand Down Expand Up @@ -253,7 +243,7 @@ FileDecoderState AudioDecoder::decode_flac_() {
return FileDecoderState::END_OF_FILE;
}

return FileDecoderState::MORE_TO_PROCESS;
return FileDecoderState::IDLE;
}

FileDecoderState AudioDecoder::decode_mp3_() {
Expand Down Expand Up @@ -296,7 +286,7 @@ FileDecoderState AudioDecoder::decode_mp3_() {
this->stream_info_ = stream_info;
}
}
// }

return FileDecoderState::MORE_TO_PROCESS;
}

Expand Down
200 changes: 104 additions & 96 deletions esphome/components/nabu/audio_mixer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ static const size_t OUTPUT_BUFFER_SAMPLES = 8192; // Audio samples - keep
static const size_t QUEUE_COUNT = 20;

static const uint32_t TASK_STACK_SIZE = 3072;
static const size_t TASK_DELAY_MS = 20;
static const size_t TASK_DELAY_MS = 25;

static const int16_t MAX_AUDIO_SAMPLE_VALUE = INT16_MAX;
static const int16_t MIN_AUDIO_SAMPLE_VALUE = INT16_MIN;
Expand Down Expand Up @@ -48,12 +48,7 @@ void AudioMixer::stop() {
}

size_t AudioMixer::read(uint8_t *buffer, size_t length, TickType_t ticks_to_wait) {
size_t available_bytes = this->available();
size_t bytes_to_read = std::min(length, available_bytes);
if (bytes_to_read > 0) {
return this->output_ring_buffer_->read((void *) buffer, bytes_to_read, ticks_to_wait);
}
return 0;
return this->output_ring_buffer_->read((void *) buffer, length, ticks_to_wait);
}

void AudioMixer::audio_mixer_task_(void *params) {
Expand All @@ -67,6 +62,9 @@ void AudioMixer::audio_mixer_task_(void *params) {
int16_t *announcement_buffer = allocator.allocate(OUTPUT_BUFFER_SAMPLES);
int16_t *combination_buffer = allocator.allocate(OUTPUT_BUFFER_SAMPLES);

int16_t *combination_buffer_current = combination_buffer;
size_t combination_buffer_length = 0;

if ((media_buffer == nullptr) || (announcement_buffer == nullptr)) {
event.type = EventType::WARNING;
event.err = ESP_ERR_NO_MEM;
Expand Down Expand Up @@ -102,7 +100,7 @@ void AudioMixer::audio_mixer_task_(void *params) {
xQueueSend(this_mixer->event_queue_, &event, portMAX_DELAY);

while (true) {
if (xQueueReceive(this_mixer->command_queue_, &command_event, pdMS_TO_TICKS(TASK_DELAY_MS)) == pdTRUE) {
if (xQueueReceive(this_mixer->command_queue_, &command_event, 0) == pdTRUE) {
if (command_event.command == CommandEventType::STOP) {
break;
} else if (command_event.command == CommandEventType::DUCK) {
Expand All @@ -113,7 +111,7 @@ void AudioMixer::audio_mixer_task_(void *params) {

uint8_t total_ducking_steps = 0;
if (target_ducking_db_reduction > current_ducking_db_reduction) {
// The dB reduction level is increasing (which results in quiter audio)
// The dB reduction level is increasing (which results in quieter audio)
total_ducking_steps = target_ducking_db_reduction - current_ducking_db_reduction - 1;
db_change_per_ducking_step = 1;
} else {
Expand All @@ -140,117 +138,129 @@ void AudioMixer::audio_mixer_task_(void *params) {
}
}

size_t media_available = this_mixer->media_ring_buffer_->available();
size_t announcement_available = this_mixer->announcement_ring_buffer_->available();
size_t output_free = this_mixer->output_ring_buffer_->free();

if ((output_free > 0) && (media_available * transfer_media + announcement_available > 0)) {
size_t bytes_to_read = output_free;

if (media_available * transfer_media > 0) {
bytes_to_read = std::min(bytes_to_read, media_available);
if (combination_buffer_length > 0) {
size_t output_bytes_written = this_mixer->output_ring_buffer_->write_without_replacement(
(void *) combination_buffer, combination_buffer_length, pdMS_TO_TICKS(TASK_DELAY_MS));
combination_buffer_length -= output_bytes_written;
if ((combination_buffer_length > 0) && (output_bytes_written > 0)) {
memmove(combination_buffer, combination_buffer + output_bytes_written / sizeof(int16_t),
combination_buffer_length);
}
} else {
size_t media_available = this_mixer->media_ring_buffer_->available();
size_t announcement_available = this_mixer->announcement_ring_buffer_->available();

if (announcement_available > 0) {
bytes_to_read = std::min(bytes_to_read, announcement_available);
}
if (media_available * transfer_media + announcement_available > 0) {
size_t bytes_to_read = OUTPUT_BUFFER_SAMPLES * sizeof(int16_t);

if (bytes_to_read > 0) {
size_t media_bytes_read = 0;
if (media_available * transfer_media > 0) {
media_bytes_read = this_mixer->media_ring_buffer_->read((void *) media_buffer, bytes_to_read, 0);
if (media_bytes_read > 0) {
size_t samples_read = media_bytes_read / sizeof(int16_t);
if (ducking_transition_samples_remaining > 0) {
// Ducking level is still transitioning
bytes_to_read = std::min(bytes_to_read, media_available);
}

size_t samples_left = ducking_transition_samples_remaining;
if (announcement_available > 0) {
bytes_to_read = std::min(bytes_to_read, announcement_available);
}

// There may be more than one step worth of samples to duck in the buffers, so manage positions
int16_t *current_media_buffer = media_buffer;
if (bytes_to_read > 0) {
size_t media_bytes_read = 0;
if (media_available * transfer_media > 0) {
media_bytes_read = this_mixer->media_ring_buffer_->read((void *) media_buffer, bytes_to_read, 0);
if (media_bytes_read > 0) {
size_t samples_read = media_bytes_read / sizeof(int16_t);
if (ducking_transition_samples_remaining > 0) {
// Ducking level is still transitioning

size_t samples_left_in_step = samples_left % samples_per_ducking_step;
if (samples_left_in_step == 0) {
// Start of a new ducking step
size_t samples_left = ducking_transition_samples_remaining;

current_ducking_db_reduction += db_change_per_ducking_step;
samples_left_in_step = samples_per_ducking_step;
}
size_t samples_left_to_duck = std::min(samples_left_in_step, samples_read);
// There may be more than one step worth of samples to duck in the buffers, so manage positions
int16_t *current_media_buffer = media_buffer;

size_t total_samples_ducked = 0;
size_t samples_left_in_step = samples_left % samples_per_ducking_step;
if (samples_left_in_step == 0) {
// Start of a new ducking step

while (samples_left_to_duck > 0) {
// Ensure we only point to valid index in the Q15 scaling factor table
uint8_t safe_db_reduction_index =
clamp<uint8_t>(current_ducking_db_reduction, 0, decibel_reduction_table.size() - 1);
current_ducking_db_reduction += db_change_per_ducking_step;
samples_left_in_step = samples_per_ducking_step;
}
size_t samples_left_to_duck = std::min(samples_left_in_step, samples_read);

int16_t q15_scale_factor = decibel_reduction_table[safe_db_reduction_index];
this_mixer->scale_audio_samples_(current_media_buffer, current_media_buffer, q15_scale_factor,
samples_left_to_duck);
size_t total_samples_ducked = 0;

current_media_buffer += samples_left_to_duck;
while (samples_left_to_duck > 0) {
// Ensure we only point to valid index in the Q15 scaling factor table
uint8_t safe_db_reduction_index =
clamp<uint8_t>(current_ducking_db_reduction, 0, decibel_reduction_table.size() - 1);

samples_read -= samples_left_to_duck;
samples_left -= samples_left_to_duck;
int16_t q15_scale_factor = decibel_reduction_table[safe_db_reduction_index];
this_mixer->scale_audio_samples_(current_media_buffer, current_media_buffer, q15_scale_factor,
samples_left_to_duck);

total_samples_ducked += samples_left_to_duck;
current_media_buffer += samples_left_to_duck;

samples_left_in_step = samples_left % samples_per_ducking_step;
if (samples_left_in_step == 0) {
// Start of a new step
samples_read -= samples_left_to_duck;
samples_left -= samples_left_to_duck;

current_ducking_db_reduction += db_change_per_ducking_step;
samples_left_in_step = samples_per_ducking_step;
total_samples_ducked += samples_left_to_duck;

samples_left_in_step = samples_left % samples_per_ducking_step;
if (samples_left_in_step == 0) {
// Start of a new step

current_ducking_db_reduction += db_change_per_ducking_step;
samples_left_in_step = samples_per_ducking_step;
}
samples_left_to_duck = std::min(samples_left_in_step, samples_read);
}
samples_left_to_duck = std::min(samples_left_in_step, samples_read);
}
} else if (target_ducking_db_reduction > 0) {
// We still need to apply a ducking scaling, but we are done transitioning
} else if (target_ducking_db_reduction > 0) {
// We still need to apply a ducking scaling, but we are done transitioning

uint8_t safe_db_reduction_index =
clamp<uint8_t>(target_ducking_db_reduction, 0, decibel_reduction_table.size() - 1);
uint8_t safe_db_reduction_index =
clamp<uint8_t>(target_ducking_db_reduction, 0, decibel_reduction_table.size() - 1);

int16_t q15_scale_factor = decibel_reduction_table[safe_db_reduction_index];
this_mixer->scale_audio_samples_(media_buffer, media_buffer, q15_scale_factor, samples_read);
int16_t q15_scale_factor = decibel_reduction_table[safe_db_reduction_index];
this_mixer->scale_audio_samples_(media_buffer, media_buffer, q15_scale_factor, samples_read);
}
}
}
}

size_t announcement_bytes_read = 0;
if (announcement_available > 0) {
announcement_bytes_read =
this_mixer->announcement_ring_buffer_->read((void *) announcement_buffer, bytes_to_read, 0);
}
size_t announcement_bytes_read = 0;
if (announcement_available > 0) {
announcement_bytes_read =
this_mixer->announcement_ring_buffer_->read((void *) announcement_buffer, bytes_to_read, 0);
}

size_t output_bytes_written = 0;
if ((media_bytes_read > 0) && (announcement_bytes_read > 0)) {
// We have both a media and an announcement stream, so mix them together
if ((media_bytes_read > 0) && (announcement_bytes_read > 0)) {
// We have both a media and an announcement stream, so mix them together

if (media_bytes_read != announcement_bytes_read) {
printf("somehow media and announcement bytes read are different\n");
}
size_t samples_read = bytes_to_read / sizeof(int16_t);
size_t samples_read = bytes_to_read / sizeof(int16_t);

this_mixer->mix_audio_samples_without_clipping_(media_buffer, announcement_buffer, combination_buffer,
samples_read);
this_mixer->mix_audio_samples_without_clipping_(media_buffer, announcement_buffer, combination_buffer,
samples_read);

output_bytes_written = this_mixer->output_ring_buffer_->write((void *) combination_buffer, bytes_to_read);
if (output_bytes_written != bytes_to_read) {
printf("couldn't copy all the mixed samples into the output ring buffer\n");
}
} else if (media_bytes_read > 0) {
output_bytes_written = this_mixer->output_ring_buffer_->write((void *) media_buffer, media_bytes_read);
combination_buffer_length = samples_read * sizeof(int16_t);
// output_bytes_written = this_mixer->output_ring_buffer_->write((void *) combination_buffer,
// bytes_to_read);
} else if (media_bytes_read > 0) {
memcpy(combination_buffer, media_buffer, media_bytes_read);
combination_buffer_length = media_bytes_read;
// output_bytes_written = this_mixer->output_ring_buffer_->write((void *) media_buffer, media_bytes_read);

} else if (announcement_bytes_read > 0) {
output_bytes_written =
this_mixer->output_ring_buffer_->write((void *) announcement_buffer, announcement_bytes_read);
}
} else if (announcement_bytes_read > 0) {
memcpy(combination_buffer, announcement_buffer, announcement_bytes_read);
combination_buffer_length = announcement_bytes_read;
// output_bytes_written =
// this_mixer->output_ring_buffer_->write((void *) announcement_buffer, announcement_bytes_read);
}

size_t samples_written = output_bytes_written / sizeof(int16_t);
if (ducking_transition_samples_remaining > 0) {
ducking_transition_samples_remaining -= std::min(samples_written, ducking_transition_samples_remaining);
size_t samples_written = combination_buffer_length / sizeof(int16_t);
if (ducking_transition_samples_remaining > 0) {
ducking_transition_samples_remaining -= std::min(samples_written, ducking_transition_samples_remaining);
}
}
} else {
// No audio data available in either buffer

delay(TASK_DELAY_MS);
}
}
}
Expand Down Expand Up @@ -286,10 +296,8 @@ esp_err_t AudioMixer::allocate_buffers_() {
return ESP_ERR_NO_MEM;
}

ExternalRAMAllocator<StackType_t> stack_allocator(ExternalRAMAllocator<StackType_t>::ALLOW_FAILURE);

if (this->stack_buffer_ == nullptr)
this->stack_buffer_ = stack_allocator.allocate(TASK_STACK_SIZE);
this->stack_buffer_ = (StackType_t *) malloc(TASK_STACK_SIZE);

if (this->stack_buffer_ == nullptr) {
return ESP_ERR_NO_MEM;
Expand Down Expand Up @@ -333,10 +341,10 @@ void AudioMixer::mix_audio_samples_without_clipping_(int16_t *media_buffer, int1
if ((added_sample > MAX_AUDIO_SAMPLE_VALUE) || (added_sample < MIN_AUDIO_SAMPLE_VALUE)) {
// The largest magnitude the media sample can be to avoid clipping (converted to Q30 fixed point)
int32_t q30_media_sample_safe_max =
static_cast<int32_t>(MAX_AUDIO_SAMPLE_VALUE - std::abs(announcement_buffer[i])) << 15;
static_cast<int32_t>(std::abs(MIN_AUDIO_SAMPLE_VALUE) - std::abs(announcement_buffer[i])) << 15;

// Actual media sample value (Q15 number stored in an int32 for future division)
int32_t media_sample_value = media_buffer[i];
int32_t media_sample_value = abs(media_buffer[i]);

// Calculation to perform the Q15 division for media_sample_safe_max/media_sample_value
// Reference: https://sestevenson.wordpress.com/2010/09/20/fixed-point-division-2/ (accessed August 15,
Expand Down
Loading

0 comments on commit 8acbcb3

Please sign in to comment.