消息队列一
1. 消息队列简介
消息队列可以在任务与任务、任务与中断之间传递消息,队列可以保存有限个具有确定长度的数据单元。队列可保存的最大单元数目被称为队列的长度,在队列创建时需要指定其长度和每个单元(队列项或消息)的大小。通常情况下,队列被作为FIFO(先进先出)使用,即数据由队尾写入,从队首读出。当然由队列首写入也是可能的
队列有如下特性:
- 数据存储:往队列写入数据是通过字节拷贝把数据复制存储到队列中;从队列读出数据使得把队列中的数据拷贝删除
- 可被多任务存取:队列不是属于某个特别指定的任务的,任何任务都可以向队列中发送消息,或者从队列中提取消息
- 读队列时阻塞:当某个任务试图读一个队列时,其可以指定一个阻塞超时时间。在这段时间中,若队列为空,该任务将保持阻塞状态以等待队列数据有效。当其他任务或中断服务程序往其等待的队列中写入了数据,该任务将自动由阻塞态转移为就绪态。当等待的时间超过了指定的阻塞时间,即使队列中尚无有效数据,任务也会自动由阻塞态转移为就绪态
- 写队列时阻塞:任务也可以在写队列时指定一个阻塞超时时间。这个时间是当被写队列已满时,任务进入阻塞态以等待队列空间有效的最长时间
队列读写过程如下图所示
队列的结构体 Queue_t 如下示,该结构体在queue.c文件中定义:
typedef struct QueueDefinition
{
int8_t *pcHead;
int8_t *pcTail;
int8_t *pcWriteTo;
union
{
int8_t *pcReadFrom;
UBaseType_t uxRecursiveCallCount;
} u;
List_t xTasksWaitingToSend;
List_t xTasksWaitingToReceive;
volatile UBaseType_t uxMessagesWaiting;
UBaseType_t uxLength;
UBaseType_t uxItemSize;
volatile int8_t cRxLock;
volatile int8_t cTxLock;
#if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
uint8_t ucStaticallyAllocated;
#endif
#if ( configUSE_QUEUE_SETS == 1 )
struct QueueDefinition *pxQueueSetContainer;
#endif
#if ( configUSE_TRACE_FACILITY == 1 )
UBaseType_t uxQueueNumber;
uint8_t ucQueueType;
#endif
} xQUEUE;
typedef xQUEUE Queue_t;
|
2. 消息队列的函数应用
2.1 创建消息队列
在使用队列之前必须先创建队列,有动态和静态创建队列两种方法。以下四个函数原型在queue.c文件中定义
QueueHandle_t xQueueCreate(UBaseType_t uxQueueLength,
UBaseType_t uxItemSize);
QueueHandle_t xQueueGenericCreate(const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
const uint8_t ucQueueType);
QueueHandle_t xQueueCreateStatic(UBaseType_t uxQueueLength,
UBaseType_t uxItemSize,
uint8_t *pucQueueStorage,
StaticQueue_t *pxStaticQueue);
QueueHandle_t xQueueGenericCreateStatic(const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
uint8_t *pucQueueStorage,
StaticQueue_t *pxStaticQueue,
const uint8_t ucQueueType);
返回值:创建成功返回队列句柄;失败返回NULL
|
动态创建队列最终会调用xQueueGenericCreate()函数,下面来分析该源码(静态创建类似,不做分析)
QueueHandle_t xQueueGenericCreate(const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
const uint8_t ucQueueType ){
Queue_t *pxNewQueue;
size_t xQueueSizeInBytes;
uint8_t *pucQueueStorage;
if( uxItemSize == ( UBaseType_t ) 0 ){
xQueueSizeInBytes = ( size_t ) 0;
}
else{
xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize );
}
pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes );
if( pxNewQueue != NULL ){
pucQueueStorage = ( ( uint8_t * ) pxNewQueue ) + sizeof( Queue_t );
prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
}
return pxNewQueue;
}
|
队列初始化函数prvInitialiseNewQueue()源码分析
static void prvInitialiseNewQueue(const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
uint8_t *pucQueueStorage,
const uint8_t ucQueueType,
Queue_t *pxNewQueue){
( void ) ucQueueType;
if(uxItemSize == ( UBaseType_t ) 0){
pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;
}
else{
pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;
}
pxNewQueue->uxLength = uxQueueLength;
pxNewQueue->uxItemSize = uxItemSize;
( void ) xQueueGenericReset( pxNewQueue, pdTRUE );
}
|
队列复位函数xQueueGenericReset源码分析
BaseType_t xQueueGenericReset( QueueHandle_t xQueue, BaseType_t xNewQueue){
Queue_t * const pxQueue = ( Queue_t * ) xQueue;
taskENTER_CRITICAL();
{
pxQueue->pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize );
pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;
pxQueue->pcWriteTo = pxQueue->pcHead;
pxQueue->u.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - ( UBaseType_t ) 1U ) * pxQueue->uxItemSize );
pxQueue->cRxLock = queueUNLOCKED;
pxQueue->cTxLock = queueUNLOCKED;
if( xNewQueue == pdFALSE ){
if(listLIST_IS_EMPTY(&(pxQueue->xTasksWaitingToSend))==pdFALSE){
if(xTaskRemoveFromEventList(&(pxQueue->xTasksWaitingToSend))!=pdFALSE){
queueYIELD_IF_USING_PREEMPTION();
}
else{
mtCOVERAGE_TEST_MARKER();
}
}
else{
mtCOVERAGE_TEST_MARKER();
}
}
else{
vListInitialise( &( pxQueue->xTasksWaitingToSend ) );
vListInitialise( &( pxQueue->xTasksWaitingToReceive ) );
}
}
taskEXIT_CRITICAL();
return pdPASS;
}
|
2.2 向队列发送消息
BaseType_t xQueueSend(QueueHandle_t xQueue,
const void *pvItemToQueue,
TickType_t xTicksToWait);
BaseType_t xQueueSendToBack(QueueHandle_t xQueue,
const void *pvItemToQueue,
TickType_t xTicksToWait);
BaseType_t xQueueSendToFront(QueueHandle_t xQueue,
const void *pvItemToQueue,
TickType_t xTicksToWait);
BaseType_t xQueueOverwrite(QueueHandle_t xQueue,
const void *pvItemToQueue);
BaseType_t xQueueGenericSend(QueueHandle_t xQueue,
const void * const pvItemToQueue,
TickType_t xTicksToWait,
const BaseType_t xCopyPosition);
返回值:发送消息成功,返回pdPASS;队列满消息发送失败,返回errQUEUE_FULL
|
任务级入队函数最终都是调用xQueueGenericSend()函数,下面来分析该函数源码
BaseType_t xQueueGenericSend(QueueHandle_t xQueue,
const void * const pvItemToQueue,
TickType_t xTicksToWait,
const BaseType_t xCopyPosition){
BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;
for( ;; ){
taskENTER_CRITICAL();
{
if((pxQueue->uxMessagesWaiting < pxQueue->uxLength)||(xCopyPosition == queueOVERWRITE)) {
xYieldRequired = prvCopyDataToQueue(pxQueue, pvItemToQueue, xCopyPosition);
#if ( configUSE_QUEUE_SETS == 1 )
{ }
#else
{
if(listLIST_IS_EMPTY(&(pxQueue->xTasksWaitingToReceive)) == pdFALSE){
if(xTaskRemoveFromEventList(&(pxQueue->xTasksWaitingToReceive)) != pdFALSE){
queueYIELD_IF_USING_PREEMPTION();
}
else{
mtCOVERAGE_TEST_MARKER();
}
}
else if( xYieldRequired != pdFALSE){
queueYIELD_IF_USING_PREEMPTION();
}
else{
mtCOVERAGE_TEST_MARKER();
}
}
#endif
taskEXIT_CRITICAL();
return pdPASS;
}
else{
if( xTicksToWait == ( TickType_t ) 0 ){
taskEXIT_CRITICAL();
traceQUEUE_SEND_FAILED( pxQueue );
return errQUEUE_FULL;
}
else if( xEntryTimeSet == pdFALSE ){
vTaskSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
else{
mtCOVERAGE_TEST_MARKER();
}
}
}
taskEXIT_CRITICAL();
vTaskSuspendAll();
prvLockQueue( pxQueue );
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE){
if( prvIsQueueFull( pxQueue ) != pdFALSE ){
traceBLOCKING_ON_QUEUE_SEND( pxQueue );
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait );
prvUnlockQueue( pxQueue );
if( xTaskResumeAll() == pdFALSE ){
portYIELD_WITHIN_API();
}
}
else{
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
else{
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
traceQUEUE_SEND_FAILED( pxQueue );
return errQUEUE_FULL;
}
}
}
|
BaseType_t xQueueSendFromISR(QueueHandle_t xQueue,
const void * pvItemToQueue,
BaseType_t * pxHigherPriorityTaskWoken);
BaseType_t xQueueSendToBackFromISR(QueueHandle_t xQueue,
const void * pvItemToQueue,
BaseType_t * pxHigherPriorityTaskWoken);
BaseType_t xQueueSendToFrontFromISR(QueueHandle_t xQueue,
const void * pvItemToQueue,
BaseType_t * pxHigherPriorityTaskWoken);
BaseType_t xQueueOverwriteFromISR(QueueHandle_t xQueue,
const void * pvItemToQueue,
BaseType_t * pxHigherPriorityTaskWoken);
BaseType_t xQueueGenericSendFromISR(QueueHandle_t xQueue,
const void * pvItemToQueue,
BaseType_t * pxHigherPriorityTaskWoken,
BaseType_t xCopyPosition);
返回值:发送消息成功,返回pdPASS;队列满消息发送失败,返回errQUEUE_FULL
|
中断级入队函数最终都是调用xQueueGenericSendFromISR()函数,下面来分析该函数源码
BaseType_t xQueueGenericSendFromISR(QueueHandle_t xQueue,
const void * const pvItemToQueue,
BaseType_t * const pxHigherPriorityTaskWoken,
const BaseType_t xCopyPosition){
BaseType_t xReturn;
UBaseType_t uxSavedInterruptStatus;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;
uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
{
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{
const int8_t cTxLock = pxQueue->cTxLock;
( void ) prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );
if( cTxLock == queueUNLOCKED ){
#if ( configUSE_QUEUE_SETS == 1 )
{}
#else
{
if(listLIST_IS_EMPTY(&(pxQueue->xTasksWaitingToReceive) == pdFALSE){
if(xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive))!= pdFALSE){
if( pxHigherPriorityTaskWoken != NULL ){
*pxHigherPriorityTaskWoken = pdTRUE;
}
else{
mtCOVERAGE_TEST_MARKER();
}
}
else{
mtCOVERAGE_TEST_MARKER();
}
}
else{
mtCOVERAGE_TEST_MARKER();
}
}
#endif
}
else{
pxQueue->cTxLock = ( int8_t ) ( cTxLock + 1 );
}
xReturn = pdPASS;
}
else{
traceQUEUE_SEND_FROM_ISR_FAILED( pxQueue );
xReturn = errQUEUE_FULL;
}
}
portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );
return xReturn;
}
|
2.3 队列上锁和解锁
- 队列上锁 prvLockQueue():本质上是一个宏,定义如下
#define prvLockQueue( pxQueue )
taskENTER_CRITICAL();
{
if( ( pxQueue )->cRxLock == queueUNLOCKED )
{
( pxQueue )->cRxLock = queueLOCKED_UNMODIFIED;
}
if( ( pxQueue )->cTxLock == queueUNLOCKED )
{
( pxQueue )->cTxLock = queueLOCKED_UNMODIFIED;
}
}
taskEXIT_CRITICAL()
|
static void prvUnlockQueue( Queue_t * const pxQueue ){
taskENTER_CRITICAL();
{
int8_t cTxLock = pxQueue->cTxLock;
while( cTxLock > queueLOCKED_UNMODIFIED ){
#if ( configUSE_QUEUE_SETS == 1 )
{}
#else
{
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ){
if(xTaskRemoveFromEventList(&(pxQueue->xTasksWaitingToReceive)) != pdFALSE){
vTaskMissedYield();
}
else{
mtCOVERAGE_TEST_MARKER();
}
}
else{
break;
}
}
#endif
--cTxLock;
}
pxQueue->cTxLock = queueUNLOCKED;
}
taskEXIT_CRITICAL();
taskENTER_CRITICAL();
{
int8_t cRxLock = pxQueue->cRxLock;
while( cRxLock > queueLOCKED_UNMODIFIED ){
if(listLIST_IS_EMPTY(&(pxQueue->xTasksWaitingToSend)) == pdFALSE){
if(xTaskRemoveFromEventList(&(pxQueue->xTasksWaitingToSend)) != pdFALSE){
vTaskMissedYield();
}
else{
mtCOVERAGE_TEST_MARKER();
}
--cRxLock;
}
else{
break;
}
}
pxQueue->cRxLock = queueUNLOCKED;
}
taskEXIT_CRITICAL();
}
|
2.4 从队列读取消息
BaseType_t xQueueReceive(QueueHandle_t xQueue,
void * const pvBuffer,
TickType_t xTicksToWait);
BaseType_t xQueuePeek(QueueHandle_t xQueue,
void * const pvBuffer,
TickType_t xTicksToWait);
BaseType_t xQueueGenericReceive(QueueHandle_t xQueue,
void * const pvBuffer,
TickType_t xTicksToWait,
const BaseType_t xJustPeeking);
返回值:读取数据成功,返回pdTRUE;读取失败,返回pdFALSE
|
任务级出队函数最终都是调用xQueueGenericReceive()函数,出队与入队的源码流程差不多,在此不再赘述
BaseType_t xQueueReceiveFromISR(QueueHandle_t xQueue,
void * const pvBuffer,
BaseType_t * pxTaskWoken);
BaseType_t xQueuePeekFromISR(QueueHandle_t xQueue,
void * const pvBuffer,
返回值:读取数据成功,返回pdTRUE;读取失败,返回pdFALSE
|
|