[core] Bugfix: Implement ring buffer with xRingbuffer (#7973)

This commit is contained in:
Kevin Ahrendt 2024-12-18 21:07:07 -05:00 committed by GitHub
parent 0aaef9293b
commit 61499dbdd8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 82 additions and 21 deletions

View file

@ -13,8 +13,8 @@ static const char *const TAG = "ring_buffer";
RingBuffer::~RingBuffer() { RingBuffer::~RingBuffer() {
if (this->handle_ != nullptr) { if (this->handle_ != nullptr) {
vStreamBufferDelete(this->handle_); vRingbufferDelete(this->handle_);
ExternalRAMAllocator<uint8_t> allocator(ExternalRAMAllocator<uint8_t>::ALLOW_FAILURE); RAMAllocator<uint8_t> allocator(RAMAllocator<uint8_t>::ALLOW_FAILURE);
allocator.deallocate(this->storage_, this->size_); allocator.deallocate(this->storage_, this->size_);
} }
} }
@ -22,26 +22,49 @@ RingBuffer::~RingBuffer() {
std::unique_ptr<RingBuffer> RingBuffer::create(size_t len) { std::unique_ptr<RingBuffer> RingBuffer::create(size_t len) {
std::unique_ptr<RingBuffer> rb = make_unique<RingBuffer>(); std::unique_ptr<RingBuffer> rb = make_unique<RingBuffer>();
rb->size_ = len + 1; rb->size_ = len;
ExternalRAMAllocator<uint8_t> allocator(ExternalRAMAllocator<uint8_t>::ALLOW_FAILURE); RAMAllocator<uint8_t> allocator(RAMAllocator<uint8_t>::ALLOW_FAILURE);
rb->storage_ = allocator.allocate(rb->size_); rb->storage_ = allocator.allocate(rb->size_);
if (rb->storage_ == nullptr) { if (rb->storage_ == nullptr) {
return nullptr; return nullptr;
} }
rb->handle_ = xStreamBufferCreateStatic(rb->size_, 1, rb->storage_, &rb->structure_); rb->handle_ = xRingbufferCreateStatic(rb->size_, RINGBUF_TYPE_BYTEBUF, rb->storage_, &rb->structure_);
ESP_LOGD(TAG, "Created ring buffer with size %u", len); ESP_LOGD(TAG, "Created ring buffer with size %u", len);
return rb; return rb;
} }
size_t RingBuffer::read(void *data, size_t len, TickType_t ticks_to_wait) { size_t RingBuffer::read(void *data, size_t len, TickType_t ticks_to_wait) {
if (ticks_to_wait > 0) size_t bytes_read = 0;
xStreamBufferSetTriggerLevel(this->handle_, len);
size_t bytes_read = xStreamBufferReceive(this->handle_, data, len, ticks_to_wait); void *buffer_data = xRingbufferReceiveUpTo(this->handle_, &bytes_read, ticks_to_wait, len);
xStreamBufferSetTriggerLevel(this->handle_, 1); if (buffer_data == nullptr) {
return 0;
}
std::memcpy(data, buffer_data, bytes_read);
vRingbufferReturnItem(this->handle_, buffer_data);
if (bytes_read < len) {
// Data may have wrapped around, so read a second time to receive the remainder
size_t follow_up_bytes_read = 0;
size_t bytes_remaining = len - bytes_read;
buffer_data = xRingbufferReceiveUpTo(this->handle_, &follow_up_bytes_read, 0, bytes_remaining);
if (buffer_data == nullptr) {
return bytes_read;
}
std::memcpy((void *) ((uint8_t *) (data) + bytes_read), buffer_data, follow_up_bytes_read);
vRingbufferReturnItem(this->handle_, buffer_data);
bytes_read += follow_up_bytes_read;
}
return bytes_read; return bytes_read;
} }
@ -49,22 +72,55 @@ size_t RingBuffer::read(void *data, size_t len, TickType_t ticks_to_wait) {
size_t RingBuffer::write(const void *data, size_t len) { size_t RingBuffer::write(const void *data, size_t len) {
size_t free = this->free(); size_t free = this->free();
if (free < len) { if (free < len) {
size_t needed = len - free; // Free enough space in the ring buffer to fit the new data
uint8_t discard[needed]; this->discard_bytes_(len - free);
xStreamBufferReceive(this->handle_, discard, needed, 0);
} }
return xStreamBufferSend(this->handle_, data, len, 0); return this->write_without_replacement(data, len, 0);
} }
size_t RingBuffer::write_without_replacement(const void *data, size_t len, TickType_t ticks_to_wait) { size_t RingBuffer::write_without_replacement(const void *data, size_t len, TickType_t ticks_to_wait) {
return xStreamBufferSend(this->handle_, data, len, ticks_to_wait); if (!xRingbufferSend(this->handle_, data, len, ticks_to_wait)) {
// Couldn't fit all the data, so only write what will fit
size_t free = std::min(this->free(), len);
if (xRingbufferSend(this->handle_, data, free, 0)) {
return free;
}
return 0;
}
return len;
} }
size_t RingBuffer::available() const { return xStreamBufferBytesAvailable(this->handle_); } size_t RingBuffer::available() const {
UBaseType_t ux_items_waiting = 0;
vRingbufferGetInfo(this->handle_, nullptr, nullptr, nullptr, nullptr, &ux_items_waiting);
return ux_items_waiting;
}
size_t RingBuffer::free() const { return xStreamBufferSpacesAvailable(this->handle_); } size_t RingBuffer::free() const { return xRingbufferGetCurFreeSize(this->handle_); }
BaseType_t RingBuffer::reset() { return xStreamBufferReset(this->handle_); } BaseType_t RingBuffer::reset() {
// Discards all the available data
return this->discard_bytes_(this->available());
}
bool RingBuffer::discard_bytes_(size_t discard_bytes) {
size_t bytes_read = 0;
void *buffer_data = xRingbufferReceiveUpTo(this->handle_, &bytes_read, 0, discard_bytes);
if (buffer_data != nullptr)
vRingbufferReturnItem(this->handle_, buffer_data);
if (bytes_read < discard_bytes) {
size_t wrapped_bytes_read = 0;
buffer_data = xRingbufferReceiveUpTo(this->handle_, &wrapped_bytes_read, 0, discard_bytes - bytes_read);
if (buffer_data != nullptr) {
vRingbufferReturnItem(this->handle_, buffer_data);
bytes_read += wrapped_bytes_read;
}
}
return (bytes_read == discard_bytes);
}
} // namespace esphome } // namespace esphome

View file

@ -3,7 +3,7 @@
#ifdef USE_ESP32 #ifdef USE_ESP32
#include <freertos/FreeRTOS.h> #include <freertos/FreeRTOS.h>
#include <freertos/stream_buffer.h> #include <freertos/ringbuf.h>
#include <cinttypes> #include <cinttypes>
#include <memory> #include <memory>
@ -82,9 +82,14 @@ class RingBuffer {
static std::unique_ptr<RingBuffer> create(size_t len); static std::unique_ptr<RingBuffer> create(size_t len);
protected: protected:
StreamBufferHandle_t handle_; /// @brief Discards data from the ring buffer.
StaticStreamBuffer_t structure_; /// @param discard_bytes amount of bytes to discard
uint8_t *storage_; /// @return True if all bytes were successfully discarded, false otherwise
bool discard_bytes_(size_t discard_bytes);
RingbufHandle_t handle_{nullptr};
StaticRingbuffer_t structure_;
uint8_t *storage_{nullptr};
size_t size_{0}; size_t size_{0};
}; };