本文翻译自乐鑫文档,详见链接地址,结合用例自行测试,欢迎交流。源码链接ringbuf.c

项目中用乐鑫的wifi模组,esp8266,esp32中发现ringbuf的身影,效率应该不错,决定研究一下,网上没有发现相关资料,后来发现官方的一篇相关文档,在我研究相关文档和代码后,逐渐了解部分,在其struct Ringbuffer_t结构体中,五个相关指针,分别指向头,尾,读,写,free,开始看到free一脸懵,这个指针干啥用的,为啥要用呢,后来发现 prvReturnItemByteBuf 这个函数中,直接把read指针赋值给free,free之后才可以写入,我也不清楚这样的好处,可能向malloc free看齐吧(捂脸),知道的方便的话可以告知一下。


ringbuf.c中先看了 xRingbufferCreate 这个入口函数,传入需要长度和缓冲区类型,赋值相应的解析函数,可以先看RINGBUF_TYPE_BYTEBUF这个类型,就相当于一个环形队列吧,另外两个每次送入的数据都要加一个头记录数据长度,每次数据都隔离开了,根据write的tail的长度,可以选择允许或则不允许分割,对应的函数也是不同的。
再看一下发送函数 prvCopyItemByteBuf     读取数据函数 prvGetItemByteBuf 了解一下大概
 

struct Ringbuffer_t {
    size_t xSize;                               //Size of the data storage
    UBaseType_t uxRingbufferFlags;              //Flags to indicate the type and status of ring buffer
    size_t xMaxItemSize;                        //Maximum item size

    CheckItemFitsFunction_t xCheckItemFits;     //Function to check if item can currently fit in ring buffer
    CopyItemFunction_t vCopyItem;               //Function to copy item to ring buffer
    GetItemFunction_t pvGetItem;                //Function to get item from ring buffer
    ReturnItemFunction_t vReturnItem;           //Function to return item to ring buffer
    GetCurMaxSizeFunction_t xGetCurMaxSize;     //Function to get current free size

    uint8_t *pucWrite;                          //Write Pointer. Points to where the next item should be written
    uint8_t *pucRead;                           //Read Pointer. Points to where the next item should be read from
    uint8_t *pucFree;                           //Free Pointer. Points to the last item that has yet to be returned to the ring buffer
    uint8_t *pucHead;                           //Pointer to the start of the ring buffer storage area
    uint8_t *pucTail;                           //Pointer to the end of the ring buffer storage area

    BaseType_t xItemsWaiting;                   //Number of items/bytes(for byte buffers) currently in ring buffer that have not yet been read
    SemaphoreHandle_t xFreeSpaceSemaphore;      //Binary semaphore, wakes up writing threads when more free space becomes available or when another thread times out attempting to write
    SemaphoreHandle_t xItemsBufferedSemaphore;  //Binary semaphore, indicates there are new packets in the circular buffer. See remark.
};

Ring Buffers(环形缓冲区)

ESP-IDF FreeRTOS环形缓冲区是严格的FIFO缓冲区,支持任意大小的项目。在项目大小可变的情况下,环形缓冲区是FreeRTOS队列的一种内存效率更高的替代方法。环形缓冲区的容量不是由它可以存储的项目数来衡量的,而是由用于存储项目的内存量来衡量的。您可以在环形缓冲区上申请一块内存来发送项目,或者只是使用API​​复制数据并发送(根据您调用的send API)。出于效率原因, 总是通过引用从环形缓冲区中检索项目。因此,还必须返回所有检索到的项,以便将其从环形缓冲区中完全删除。环形缓冲区分为以下三种类型:

No-Split (无分割): 缓冲区将确保将项目存储在连续内存中,并且在任何情况下都不会尝试分割项目。当项目必须占用连续内存时,请使用不拆分缓冲区。仅此缓冲区类型允许您自己获取数据项地址并写入该项。

Allow-Split(允许分割)缓冲区将允许在包装时拆分项目,如果这样做将允许存储该项目。允许拆分缓冲区比无拆分缓冲区具有更高的内存效率,但是在检索时可以分两部分返回一个项目。

Byte buffers(字节缓冲区)不将数据存储为单独的项目。所有数据均按字节顺序存储,任意数量的字节都可以每次发送或检索。当不需要维护单独的项(例如字节流)时,请使用字节缓冲区。注解

注:
No-split/allow-split分割缓冲区将始终将项目存储在32位对齐的地址处。因此,在检索项目时,保证项目指针是32位对齐的。这在需要向DMA发送一些数据时特别有用。

存储在无拆分/允许拆分缓冲区中的每个项目都需要为标头额外增加8个字节。项目大小也将四舍五入为32位对齐的大小(4字节的倍数),但是真实的项目大小记录在标头中。创建时不拆分/允许拆分的缓冲区的大小也会四舍五入。

用法

下面的示例演示的用法xRingbufferCreate() 和xRingbufferSend()创建环形缓冲区,然后向其发送项目的过程。

#include "freertos/ringbuf.h"
static char tx_item[] = "test_item";

...

    //Create ring buffer
    RingbufHandle_t buf_handle;
    buf_handle = xRingbufferCreate(1028, RINGBUF_TYPE_NOSPLIT);
    if (buf_handle == NULL) {
        printf("Failed to create ring buffer\n");
    }

    //Send an item
    UBaseType_t res =  xRingbufferSend(buf_handle, tx_item, sizeof(tx_item), pdMS_TO_TICKS(1000));
    if (res != pdTRUE) {
        printf("Failed to send item\n");
    }

 

下面的例子说明使用的xRingbufferSendAcquire()和 xRingbufferSendComplete(),而不是xRingbufferSend()申请的(类型环形缓冲存储器 RINGBUF_TYPE_NOSPLIT ),然后发送项目给它。这种方法增加了一个步骤,但允许获取要写入的内存地址,以及自己写入内存。

#include "freertos/ringbuf.h"
#include "soc/lldesc.h"

typedef struct {
    lldesc_t dma_desc;
    uint8_t buf[1];
} dma_item_t;

#define DMA_ITEM_SIZE(N) (sizeof(lldesc_t)+(((N)+3)&(~3)))

...

    //Retrieve space for DMA descriptor and corresponding data buffer
    //This has to be done with SendAcquire, or the address may be different when copy
    dma_item_t item;
    UBaseType_t res =  xRingbufferSendAcquire(buf_handle,
                        &item, DMA_ITEM_SIZE(buffer_size), pdMS_TO_TICKS(1000));
    if (res != pdTRUE) {
        printf("Failed to acquire memory for item\n");
    }
    item->dma_desc = (lldesc_t) {
        .size = buffer_size,
        .length = buffer_size,
        .eof = 0,
        .owner = 1,
        .buf = &item->buf,
    };
    //Actually send to the ring buffer for consumer to use
    res = xRingbufferSendComplete(buf_handle, &item);
    if (res != pdTRUE) {
        printf("Failed to send item\n");
    }

 

以下示例演示了 使用和no-split ring buffer检索和返回项目。xRingbufferReceive()vRingbufferReturnItem()

...

    //Receive an item from no-split ring buffer
    size_t item_size;
    char *item = (char *)xRingbufferReceive(buf_handle, &item_size, pdMS_TO_TICKS(1000));

    //Check received item
    if (item != NULL) {
        //Print item
        for (int i = 0; i < item_size; i++) {
            printf("%c", item[i]);
        }
        printf("\n");
        //Return Item
        vRingbufferReturnItem(buf_handle, (void *)item);
    } else {
        //Failed to receive item
        printf("Failed to receive item\n");
    }

 

下面的示例演示了 使用和从allow-split ring buffer检索和返回项目。xRingbufferReceiveSplit()vRingbufferReturnItem()

...

    //Receive an item from allow-split ring buffer
    size_t item_size1, item_size2;
    char *item1, *item2;
    BaseType_t ret = xRingbufferReceiveSplit(buf_handle, (void **)&item1, (void **)&item2, &item_size1, &item_size2, pdMS_TO_TICKS(1000));

    //Check received item
    if (ret == pdTRUE && item1 != NULL) {
        for (int i = 0; i < item_size1; i++) {
            printf("%c", item1[i]);
        }
        vRingbufferReturnItem(buf_handle, (void *)item1);
        //Check if item was split
        if (item2 != NULL) {
            for (int i = 0; i < item_size2; i++) {
                printf("%c", item2[i]);
            }
            vRingbufferReturnItem(buf_handle, (void *)item2);
        }
        printf("\n");
    } else {
        //Failed to receive item
        printf("Failed to receive item\n");
    }

 

以下示例演示了 使用和从 byte buffer检索和返回项目xRingbufferReceiveUpTo()vRingbufferReturnItem()

...

    //Receive data from byte buffer
    size_t item_size;
    char *item = (char *)xRingbufferReceiveUpTo(buf_handle, &item_size, pdMS_TO_TICKS(1000), sizeof(tx_item));

    //Check received data
    if (item != NULL) {
        //Print item
        for (int i = 0; i < item_size; i++) {
            printf("%c", item[i]);
        }
        printf("\n");
        //Return Item
        vRingbufferReturnItem(buf_handle, (void *)item);
    } else {
        //Failed to receive item
        printf("Failed to receive item\n");
    }

 

对于上面使用的功能ISR安全版本,通话xRingbufferSendFromISR()xRingbufferReceiveFromISR(), xRingbufferReceiveSplitFromISR()xRingbufferReceiveUpToFromISR(),和vRingbufferReturnItemFromISR()

 

Sending to Ring Buffer

下图说明了在发送项目/数据方面 no-split/allow-split (无拆分/允许拆分)缓冲区和字节缓冲区之间的区别。这些图假定大小分别为18、3和27字节的三个项目分别发送到128个字节缓冲区中

 

                                                将项目发送到 no-split/allow-split ring buffers

对于不拆分/允许拆分的缓冲区,每个数据项之前都有一个8字节的标头。此外,每个项目占用的空间都将四舍五入到最接近的32位对齐大小,以保持整体32位对齐。但是,项目的真实大小会记录在标头中,当检索到项目时将返回标头。

参考上图,将18、3和27个字节的项分别舍入20、4和28个字节 。然后在每个项目的前面添加一个8字节的标头。

                                                                   将项目发送到  byte buffers

字节缓冲区将数据视为字节序列,并且不会产生任何开销(没有标头)。所有发送到字节缓冲区的数据都合并为一个项目。

参考上图,将18、3和27个字节项依次写入字节缓冲区,并合并为48个字节的单个项

 

SendAcquire和SendComplete

不拆分缓冲区中的项目按严格的FIFO顺序获取(通过SendAcquire),并且必须由SendComplete发送到缓冲区中,以便消费者可以访问数据。可以在不调用SendComplete的情况下发送或获取多个项目,并且不一定需要按照获取顺序完成这些项目。但是,数据项的接收必须以FIFO顺序进行,因此,不调用最早获取的项来调用SendComplete将阻止后续项的接收。

下图说明了SendAcquire / SendComplete不按相同顺序发生时将发生的情况。在开始时,已经有16字节的数据项发送到环形缓冲区。然后调用SendAcquire以在环形缓冲区上获取20、8、24字节的空间。

                                            无拆分环形缓冲区中的SendAcquire / SendComplete项目

之后,我们填充(使用)缓冲区,并通过SendComplete以8、24、20的顺序将它们发送到环形缓冲区。当发送8字节和24字节数据时,使用者仍然只能获得16字节数据项目。由于使用情况,如果20个字节的项目不完整,则该项目不可用,以下数据项目也不可用。

当最后完成20个字节的项目后,就可以立即接收20个,8个,24个字节的所有3个数据项,紧接在开始时存在于缓冲区中的16个字节的项目之后。

允许分割/字节缓冲区不允许使用SendAcquire / SendComplete,因为需要完整(不包装)获取的缓冲区。

 

Wrap around (环绕)

下图说明了当发送的项目需要跨过尾部和头部组成环形时, no-split, allow-split, byte buffers之间的区别。这些图假定有一个128字节的缓冲区, 其中有56字节的可用空间可以环绕,发送的项目为28字节

                                              Wrap around in no-split buffers(在无拆分缓冲区中环绕)

无拆分缓冲区将仅将项目存储在连续的可用空间中,并且在任何情况下都不会拆分项目。当缓冲区尾部的可用空间不足以完全存储项目及其标头时,尾部的可用空间将被标记为虚拟数据。然后,缓冲区将包裹起来并将项目存储在缓冲区顶部的可用空间中。

参考上图,缓冲区尾部的16个字节的可用空间不足以存储28个字节的项。因此,将这16个字节标记为虚拟数据,并将该项目写入缓冲区开头的可用空间。

                                                               在允许分割的缓冲区中

当缓冲区尾部的可用空间不足以存储项目数据及其标题时,允许拆分缓冲区将尝试将项目分为两部分。拆分项目的两个部分都有自己的标头(因此会产生额外的8个字节的开销)。

参考上图,缓冲区尾部的16个字节的可用空间不足以存储28个字节的项。因此,该项目分为两部分(8和20字节),并分为两部分写入缓冲区。

注解

Allow-split buffers(允许拆分缓冲区)将拆分项的两个部分都视为两个单独的项,因此调用 xRingbufferReceiveSplit()而不xRingbufferReceive()是以线程安全的方式接收拆分项的两个部分。

 

                                                      在字节缓冲区中

字节缓冲区会将尽可能多的数据存储到缓冲区尾部的可用空间中。然后,剩余的数据将存储在缓冲区开头的可用空间中。在字节缓冲区中回绕时不会产生开销。

参考上图,缓冲区尾部的16个字节的可用空间不足以完全存储28个字节的数据。因此,16个字节的可用空间被数据填充,其余12个字节被写入缓冲区开头的可用空间。现在,缓冲区包含两个独立的连续部分中的数据,并且字节缓冲区会将连续的每个部分视为一个单独的项目。

Retrieving/Returning(检索/返回)

下图说明了在获取和返回数据时无拆分/允许拆分和字节缓冲区之间的区别。

 

                                                 在不拆分/允许拆分的环形缓冲区中检索/返回项目

以严格的FIFO顺序检索无拆分/允许拆分缓冲区中的项目,必须将其返回 以释放占用的空间。可以在返回之前检索多个项目,并且不一定需要按照检索顺序返回这些项目。但是,释放空间必须按FIFO顺序进行,因此不返回最早检索到的项目将阻止释放后续项目的空间。

参考上图,以FIFO顺序检索16、20和8字节的项目。但是,在检索项目时不会返回它们(20、8、16)。这样,直到返回第一项(16字节)后,空间才会释放。

                                                              在字节缓冲区中检索/返回数据

字节缓冲区不允许在返回之前进行多次检索(每次检索之后都必须先返回,然后才允许再次进行检索)。使用xRingbufferReceive()或时 xRingbufferReceiveFromISR(),将检索所有连续存储的数据。xRingbufferReceiveUpTo() 或xRingbufferReceiveUpToFromISR()可用于限制检索到的最大字节数。由于每次检索后都必须有返回值,因此一旦返回数据,空间将被释放。

参照上图,检索,返回和释放缓冲区尾部的38个字节的连续存储数据。下一次调用xRingbufferReceive()xRingbufferReceiveFromISR() 环绕,然后对缓冲区开头的30个字节的连续存储数据进行相同的处理。

具有队列集的环形缓冲区

可以使用以下方式将环形缓冲区添加到FreeRTOS队列集中xRingbufferAddToQueueSetRead(),使得每次环形缓冲区接收到项或数据时,都会通知队列集。一旦添加到队列集中,每次从环形缓冲区中检索项目的尝试都应先调用xQueueSelectFromSet()。要检查所选队列集成员是否为环形缓冲区,请调用xRingbufferCanRead()

下面的示例演示了环形缓冲区与队列集的用法。

#include "freertos/queue.h"
#include "freertos/ringbuf.h"

...

    //Create ring buffer and queue set
    RingbufHandle_t buf_handle = xRingbufferCreate(1028, RINGBUF_TYPE_NOSPLIT);
    QueueSetHandle_t queue_set = xQueueCreateSet(3);

    //Add ring buffer to queue set
    if (xRingbufferAddToQueueSetRead(buf_handle, queue_set) != pdTRUE) {
        printf("Failed to add to queue set\n");
    }

...

    //Block on queue set
    xQueueSetMemberHandle member = xQueueSelectFromSet(queue_set, pdMS_TO_TICKS(1000));

    //Check if member is ring buffer
    if (member != NULL && xRingbufferCanRead(buf_handle, member) == pdTRUE) {
        //Member is ring buffer, receive item from ring buffer
        size_t item_size;
        char *item = (char *)xRingbufferReceive(buf_handle, &item_size, 0);

        //Handle item
        ...

    } else {
        ...
    }

具有静态分配的环形缓冲区

所述xRingbufferCreateStatic()可用于产生具有特定的内存要求(如环形缓冲器在外部RAM中被分配)环缓冲器。环形缓冲区使用的所有内存块必须事先手动分配,然后传递xRingbufferCreateStatic()给进行初始化,以作为环形缓冲区。这些块包括:

  • 环形缓冲区的数据结构类型 StaticRingbuffer_t
  • 环形缓冲区的存储区域为size xBufferSize。请注意,xBufferSize对于非拆分/允许拆分的缓冲区,必须为32位对齐。

分配这些块的方式将取决于用户要求(例如,静态声明所有块,或使用特定功能(例如外部RAM)动态分配所有块)。

注解

该CONFIG_FREERTOS_SUPPORT_STATIC_ALLOCATION选项必须启用menuconfig的对静态分配的缓冲区环是可用的。

删除通过创建的环形缓冲区时xRingbufferCreateStatic(),该函数vRingbufferDelete()将不会释放任何存储块。vRingbufferDelete()调用后必须由用户手动完成。

 

下面的代码片段演示了环形缓冲区完全在外部RAM中分配。

#include "freertos/ringbuf.h"
#include "freertos/semphr.h"
#include "esp_heap_caps.h"

#define BUFFER_SIZE     400      //32-bit aligned size
#define BUFFER_TYPE     RINGBUF_TYPE_NOSPLIT
...

//Allocate ring buffer data structure and storage area into external RAM
StaticRingbuffer_t *buffer_struct = (StaticRingbuffer_t *)heap_caps_malloc(sizeof(StaticRingbuffer_t), MALLOC_CAP_SPIRAM);
uint8_t *buffer_storage = (uint8_t *)heap_caps_malloc(sizeof(uint8_t)*BUFFER_SIZE, MALLOC_CAP_SPIRAM);

//Create a ring buffer with manually allocated memory
RingbufHandle_t handle = xRingbufferCreateStatic(BUFFER_SIZE, BUFFER_TYPE, buffer_storage, buffer_struct);

...

//Delete the ring buffer after used
vRingbufferDelete(handle);

//Manually free all blocks of memory
free(buffer_struct);
free(buffer_storage);

 

环形缓冲区API参考

注解

理想情况下,环形缓冲区可以以SMP方式与多个任务一起使用,其中始终优先为优先级最高的任务提供服务。但是,由于在环形缓冲区的基础实现中使用了二进制信号量,因此在非常特殊的情况下可能发生优先级倒置。

环形缓冲区控制着二进制信号量的发送,二进制信号量是在环形缓冲区上的空间释放时给出的。等待发送的最高优先级任务将反复获取信号量,直到有足够的可用空间可用或超时为止。理想情况下,这应避免为任何优先级较低的任务提供服务,因为信号量应始终分配给优先级最高的任务。

但是,在获取信号量的两次迭代之间,关键部分中存在间隙,这可能允许另一个任务(在另一个内核上或具有更高的优先级)释放环形缓冲区上的某些空间,从而提供信号量。因此,将在优先级最高的任务可以重新获取信号之前给出信号。这将导致信号量被等待发送的第二高优先级任务获取,从而导致优先级倒置。

如果同时使用环形缓冲区的任务数量很少,并且环形缓冲区未在最大容量附近运行,则这种副作用不会严重影响环形缓冲区的性能。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐