消息队列
简介
消息队列是另外一种任务间通信的机制,它可以用于发送不定长消息的场合。消息队列也有缓冲区(消息队列资源池),可以缓存一定数量的消息。消息队列没满的情况下,可以一直往消息队列里面发送消息,消息队列满了可以选择超时等待;消息队列有消息的情况下,可以从消息队列里面接收消息,如果没有消息,可以选择超时等待。消息队列发送的消息内容长度可以是任意的(其最大长度可以在消息队列初始化时设置),消息队列在发送时会将整个消息内容复制到消息队列的缓冲区中,接收消息时会把消息队列缓冲区中的消息内容复制到接收端指定的地址。消息队列和邮箱类似,但是也有不同,消息队列可以发送不定长的数据,但是邮箱发送的数据长度固定且长度较小,邮箱效率更高。
消息队列实现原理
消息队列有两个任务阻塞队列,因为消息发送和接收都有可能导致阻塞。当没有消息时,就会导致接收消息任务阻塞,任务被放到阻塞队列,等待另一个任务发送消息,阻塞任务被唤醒,并放到就绪队列;当消息队列满了,就会导致发送消息任务阻塞,后续的处理过程和接收消息类似。另外,消息队列还有两个资源池相关的队列,一个是缓存消息队列,这个队列用于管理缓存的消息块(这些消息块包含的消息还没有被读取),有效消息头指针和尾指针记录第一个缓存消息和最后一个缓存消息,便于找到读取和写入消息的位置,一个是空闲消息队列,这个队列用于管理空闲的消息块,空闲消息块指针记录第一个空闲消息块,便于找到保存新消息的位置。
下图描述了任务接收消息被阻塞,然后等待另一个任务发送消息的处理过程。
图中(1),任务1先运行
图中(2),任务1获取消息,由于此时没有消息,获取失败
图中(3),任务1被放到阻塞队列
图中(4),任务2运行
图中(5),任务2发送消息
图中(6),任务2发送的消息,通过空闲消息块指针和消息尾指针找到位置保存
图中(7),任务1被放到就绪队列
图中(8),任务1运行,通过消息头指针读取消息
消息队列实现中断和任务的通信
消息队列也可以用于中断和任务间的通信,和邮箱类似,此处不详述。
重要定义及数据结构
消息队列宏定义
宏定义了阻塞任务的唤醒顺序。
#define OS_MQ_WAKE_TYPE_PRIO 0x55
#define OS_MQ_WAKE_TYPE_FIFO 0xAA
消息队列宏 | 说明 |
---|---|
OS_MQ_WAKE_TYPE_PRIO | 按优先级唤醒 |
OS_MQ_WAKE_TYPE_FIFO | 按FIFO唤醒 |
消息队列控制块结构体
struct os_mq
{
void *msg_pool; /* Point to message pool. */
os_mq_msg_t *msg_queue_head; /* Point to first entry of message queue. */
os_mq_msg_t *msg_queue_tail; /* Point to last entry of message queue. */
os_mq_msg_t *msg_queue_free; /* Point to first entry of free message resource. */
os_list_node_t send_task_list_head; /* Sender tasks blocked on this messages queue. */
os_list_node_t recv_task_list_head; /* Receiver tasks blocked on this messages queue. */
os_list_node_t resource_node; /* Node in resource list */
os_size_t max_msg_size; /* Max message size of each message. */
os_uint16_t queue_depth; /* Maximum number of messages that can be put into message queue. */
os_uint16_t entry_count; /* Number of messages queued. */
os_uint8_t object_inited; /* Indicates whether object is inited or deinited, value is
OS_KOBJ_INITED or OS_KOBJ_DEINITED */
os_uint8_t object_alloc_type; /* Indicates whether object is allocated dynamically or statically,
value is OS_KOBJ_ALLOC_TYPE_STATIC or OS_KOBJ_ALLOC_TYPE_DYNAMIC */
os_uint8_t wake_type; /* The type to wake up blocking tasks, value is OS_MQ_WAKE_TYPE_PRIO
or OS_MQ_WAKE_TYPE_FIFO */
char name[OS_NAME_MAX + 1]; /* Message queue name. */
};
消息队列控制块成员变量 | 说明 |
---|---|
msg_pool | 消息队列资源池指针,指向消息队列资源池起始地址 |
msg_queue_head | 消息头指针,指向消息队列第一条消息 |
msg_queue_tail | 消息尾指针,指向消息队列最后一条消息 |
msg_queue_free | 空闲消息块指针,指向第一条空闲消息块 |
send_task_list_head | 消息发送任务阻塞队列头,发送消息时消息队列没有空闲消息块时将发送任务阻塞在该队列上 |
recv_task_list_head | 消息接收任务阻塞队列头,接收消息时消息队列没有消息时将接收任务阻塞在该队列上 |
resource_node | 资源管理节点,通过该节点将创建的消息队列挂载到gs_os_mq_resource_list_head上 |
max_msg_size | 最大消息大小 |
queue_depth | 消息队列深度 |
entry_count | 消息队列中消息数 |
object_inited | 初始化状态,0x55表示已经初始化,0xAA表示已经去初始化,其他值为未初始化 |
object_alloc_type | 消息队列类型,0为静态消息队列,1为动态消息队列 |
wake_type | 阻塞任务唤醒方式,0x55表示按优先级唤醒,0xAA表示按FIFO唤醒。可以通过属性设置接口进行设置 |
name | 消息队列名字,名字长度不能大于OS_NAME_MAX |
API列表
接口 | 说明 |
---|---|
os_mq_init | 以静态方式创建消息队列,消息队列对象的内存空间和消息队列缓冲区的内存空间都由使用者提供 |
os_mq_deinit | 对消息队列去初始化,与os_mq_init()匹配使用 |
os_mq_create | 以动态方式创建并初始化消息队列,消息队列对象的内存空间和消息队列缓冲区的内存空间都是通过动态申请内存的方式获得 |
os_mq_destroy | 销毁消息队列,唤醒所有等待任务,释放消息对象的空间和消息缓冲区的空间,与os_mq_create()匹配使用 |
os_mq_send | 发送消息,消息内容会被复制到消息队列缓冲区中,当消息队列已满且需要等待时,会阻塞当前发送任务 |
os_mq_send_urgent | 发送紧急消息,会把当前消息加入到消息队列头以便尽快处理,消息内容被复制到消息队列缓冲区中,当消息队列已满且需要等待时,会阻塞当前发送任务 |
os_mq_recv | 接收消息,当前消息队列为空且需要等待时,会阻塞当前接收任务 |
os_mq_set_wake_type | 对于阻塞在消息队列下的任务,设置唤醒阻塞任务的类型 |
os_mq_reset | 复位消息队列,使消息队列达到初始状态 |
os_mq_is_empty | 查询消息队列是否为空 |
os_mq_is_full | 查询消息队列是否为满 |
os_mq_get_queue_depth | 获取消息队列深度 |
os_mq_get_used_entry_count | 获取消息队列中消息数量 |
os_mq_get_unused_entry_count | 获取消息队列中空闲资源数量 |
os_mq_init
该函数以静态方式创建消息队列,消息队列对象的内存空间和消息队列缓冲区的内存空间都由使用者提供,函数原型如下:
os_err_t os_mq_init(os_mq_t *mq,
const char *name,
void *msg_pool,
os_size_t msg_pool_size,
os_size_t msg_size);
参数 | 说明 |
---|---|
mq | 消息队列控制块,由用户提供,并指向对应的消息队列控制块内存地址 |
name | 消息队列名字,其最大长度由OS_NAME_MAX 宏指定,多余部分会被自动截掉 |
msg_pool | 消息队列缓冲区的起始地址 |
msg_pool_size | 消息队列缓冲区的大小,以byte为单位 |
msg_size | 每个消息的最大长度 |
os_mq_deinit
该函数用于对消息队列去初始化,与os_mq_init()匹配使用,函数原型如下:
os_err_t os_mq_deinit(os_mq_t *mq);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
返回 | 说明 |
OS_EOK | 去初始化消息队列成功 |
os_mq_create
该函数以动态方式创建并初始化消息队列,消息队列对象的内存空间和消息队列缓冲区的内存空间都是通过动态申请内存的方式获得,函数原型如下:
os_mq_t *os_mq_create(const char *name, os_size_t msg_size, os_size_t max_msgs);
参数 | 说明 |
---|---|
name | 消息队列名字,其最大长度由OS_NAME_MAX 宏指定,多余部分会被自动截掉 |
msg_size | 每个消息的最大长度 |
max_msgs | 最大消息个数 |
返回 | 说明 |
非OS_NULL | 消息队列创建成功 |
OS_NULL | 消息队列创建失败 |
os_mq_destroy
该函数用于销毁消息队列,唤醒所有等待任务,释放消息对象的空间和消息缓冲区的空间,与os_mq_create()匹配使用,函数原型如下:
os_err_t os_mq_destroy(os_mq_t *mq);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
返回 | 说明 |
OS_EOK | 销毁消息队列成功 |
os_mq_send
该函数用于发送消息,消息内容会被复制到消息队列缓冲区中,当消息队列已满且需要等待时,会阻塞当前发送任务,函数原型如下:
os_err_t os_mq_send(os_mq_t *mq, void *buffer, os_size_t buff_size, os_tick_t timeout);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
buffer | 待发送的消息的地址 |
buff_size | 此消息的长度 |
timeout | 消息暂时不能发送的等待超时时间。若为OS_NO_WAIT,则等待时间为0;若为OS_WAIT_FOREVER,则永久等待直到消息发送;若为其它值,则等待timeout时间或者消息发送为止,并且其他值时timeout必须小于OS_TICK_MAX / 2 |
返回 | 说明 |
OS_EOK | 消息发送成功 |
OS_EFULL | 不等待且消息未发送 |
OS_ETIMEOUT | 等待超时且消息未发送 |
OS_ERROR | 其他错误 |
os_mq_send_urgent
该函数用于发送紧急消息,会把当前消息加入到消息队列头以便尽快处理,消息内容被复制到消息队列缓冲区中,当消息队列已满且需要等待时,会阻塞当前发送任务,函数原型如下:
os_err_t os_mq_send_urgent(os_mq_t *mq, void *buffer, os_size_t buff_size, os_tick_t timeout);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
buffer | 待发送的消息的地址 |
buff_size | 此消息的长度 |
timeout | 消息暂时不能发送的等待超时时间。若为OS_NO_WAIT,则等待时间为0;若为OS_WAIT_FOREVER,则永久等待直到消息发送;若为其它值,则等待timeout时间或者消息发送为止,并且其他值时timeout必须小于OS_TICK_MAX / 2 |
返回 | 说明 |
OS_EOK | 消息发送成功 |
OS_EFULL | 不等待且消息未发送 |
OS_ETIMEOUT | 等待超时且消息未发送 |
OS_ERROR | 其他错误 |
os_mq_recv
该函数用于接收消息,当前消息队列为空且需要等待时,会阻塞当前接收任务,函数原型如下:
os_err_t os_mq_recv(os_mq_t *mq,
void *buffer,
os_size_t buff_size,
os_tick_t timeout,
os_size_t *recv_msg_size);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
buffer | 保存接收消息的地址 |
buff_size | 保存接收消息的空间大小 |
timeout | 消息暂时接收不到时的等待超时时间。若为OS_NO_WAIT,则等待时间为0;若为OS_WAIT_FOREVER,则永久等待直到接收到消息;若为其它值,则等待timeout时间或者接收到消息为止,并且其他值时timeout必须小于OS_TICK_MAX / 2 |
recved_set | 接收到的消息的实际长度 |
返回 | 说明 |
OS_EOK | 接收消息成功 |
OS_EEMPTY | 不等待且未接收到消息 |
OS_ETIMEOUT | 等待超时未接收到消息 |
OS_ERROR | 其他错误 |
os_mq_set_wake_type
该函数对于阻塞在消息队列下的任务,设置唤醒阻塞任务的类型,函数原型如下:
os_err_t os_mq_set_wake_type(os_mq_t *mq, os_uint8_t wake_type);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
wake_type | OS_MQ_WAKE_TYPE_PRIO 设置唤醒阻塞任务的类型为按优先级唤醒(消息队列创建后默认为使用此方式),OS_MQ_WAKE_TYPE_FIFO 设置唤醒阻塞任务的类型为先进先出唤醒 |
返回 | 说明 |
OS_EOK | 设置唤醒阻塞任务类型成功 |
OS_EBUSY | 设置唤醒阻塞任务类型失败 |
os_mq_reset
复位消息队列,使消息队列达到初始状态,函数原型如下:
void os_mq_reset(os_mq_t *mq);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
返回 | 说明 |
无 | 无 |
os_mq_is_empty
该函数用于查询消息队列是否为空,函数原型如下:
os_bool_t os_mq_is_empty(os_mq_t *mq);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
返回 | 说明 |
OS_TRUE | 消息队列为空 |
OS_FALSE | 消息队列不为空 |
os_mq_is_full
该函数用于查询消息队列是否为满,函数原型如下:
os_bool_t os_mq_is_full(os_mq_t *mq);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
返回 | 说明 |
OS_TRUE | 消息队列为满 |
OS_FALSE | 消息队列不为满 |
os_mq_get_queue_depth
该函数用于获取消息队列深度,函数原型如下:
os_uint16_t os_mq_get_queue_depth(os_mq_t *mq);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
返回 | 说明 |
os_uint16_t | 消息队列深度 |
os_mq_get_used_entry_count
该函数用于获取消息队列中消息数量,函数原型如下:
os_uint16_t os_mq_get_used_entry_count(os_mq_t *mq);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
返回 | 说明 |
os_uint16_t | 消息队列中消息数量 |
os_mq_get_unused_entry_count
该函数用于获取消息队列中空闲资源数量,函数原型如下:
os_uint16_t os_mq_get_unused_entry_count(os_mq_t *mq);
参数 | 说明 |
---|---|
mq | 消息队列控制块 |
返回 | 说明 |
os_uint16_t | 消息队列中空闲资源数量 |
配置选项
OneOS在使用消息队列时提供了功能裁剪的配置,具体配置如下图所示:
配置项 | 说明 |
---|---|
Enable message queue | 使能消息队列功能,如果不使能该功能,消息队列相关的源代码就不会编译,默认使能 |
使用示例
静态消息队列使用示例
本例使用静态方式初始化消息队列,然后创建一个任务发送消息,另外一个任务接收消息
#include <oneos_config.h>
#include <dlog.h>
#include <os_errno.h>
#include <os_task.h>
#include <shell.h>
#include <string.h>
#include <os_mq.h>
#define TEST_TAG "TEST"
#define TASK_STACK_SIZE 1024
#define TASK1_PRIORITY 15
#define TASK2_PRIORITY 16
#define MQ_MAX_MSG 5
#define MSG_SIZE 24
#define MQ_POLL_SIZE (MQ_MAX_MSG * (MSG_SIZE + sizeof(os_mq_msg_hdr_t)))
#define STR_NUM 4
static char mq_pool[MQ_POLL_SIZE];
static os_mq_t mq_static;
char *str[STR_NUM] = {
"hello, world",
"it's a new day",
"it's a nice day",
"it's a wonderful day"
};
void task1_entry(void *para)
{
os_uint32_t i = 0;
for (i = 0; i < STR_NUM; i++)
{
LOG_W(TEST_TAG, "task1 send str:%s, len:%d", str[i], strlen(str[i]) + 1);
if(OS_EOK == os_mq_send(&mq_static, str[i], strlen(str[i]) + 1, OS_WAIT_FOREVER))
{
LOG_W(TEST_TAG, "task1 send OK");
}
else
{
LOG_W(TEST_TAG, "task1 send err");
}
os_task_msleep(100);
}
}
void task2_entry(void *para)
{
os_size_t recv_size = 0;
char recv_str[MSG_SIZE];
while (1)
{
if (OS_EOK == os_mq_recv(&mq_static, &recv_str[0], MSG_SIZE, OS_WAIT_FOREVER, &recv_size))
{
LOG_W(TEST_TAG, "task2 recv str:%s, len:%d", recv_str, recv_size);
}
}
}
void msgqueue_static_sample(void)
{
os_task_t *task1 = OS_NULL;
os_task_t *task2 = OS_NULL;
if(OS_EOK != os_mq_init(&mq_static, "msgqueue_static", &mq_pool[0], MQ_POLL_SIZE, MSG_SIZE))
{
LOG_W(TEST_TAG, "msgqueue_static_sample msgqueue init ERR");
return;
}
task1 = os_task_create("task1",
task1_entry,
OS_NULL,
TASK_STACK_SIZE,
TASK1_PRIORITY);
if (task1)
{
LOG_W(TEST_TAG, "msgqueue_static_sample startup task1");
os_task_startup(task1);
}
task2 = os_task_create("task2",
task2_entry,
OS_NULL,
TASK_STACK_SIZE,
TASK2_PRIORITY);
if (task2)
{
LOG_W(TEST_TAG, "msgqueue_static_sample startup task2");
os_task_startup(task2);
}
}
SH_CMD_EXPORT(static_msgqueue, msgqueue_static_sample, "test staitc msgqueue");
运行结果如下:
sh>static_msgqueue
W/TEST: msgqueue_static_sample startup task1
W/TEST: task1 send str:hello, world, len:13
W/TEST: task1 send OK
W/TEST: msgqueue_static_sample startup task2
W/TEST: task2 recv str:hello, world, len:13
sh>W/TEST: task1 send str:it's a new day, len:15
W/TEST: task1 send OK
W/TEST: task2 recv str:it's a new day, len:15
W/TEST: task1 send str:it's a nice day, len:16
W/TEST: task1 send OK
W/TEST: task2 recv str:it's a nice day, len:16
W/TEST: task1 send str:it's a wonderful day, len:21
W/TEST: task1 send OK
W/TEST: task2 recv str:it's a wonderful day, len:21
动态消息队列使用示例
本例使用动态方式创建并初始化了消息队列,然后创建一个任务发送消息,另外一个任务接收消息
#include <oneos_config.h>
#include <dlog.h>
#include <os_errno.h>
#include <os_task.h>
#include <shell.h>
#include <string.h>
#include <os_memory.h>
#include <os_mq.h>
#define TEST_TAG "TEST"
#define TASK_STACK_SIZE 1024
#define TASK1_PRIORITY 15
#define TASK2_PRIORITY 16
#define MQ_MAX_MSG 10
#define TEST_NAME_MAX 16
#define STUDENT_NUM 5
static os_mq_t *mq_dynamic;
struct student_score
{
char name[TEST_NAME_MAX];
os_uint32_t score;
};
void task1_entry(void *para)
{
os_uint32_t i = 0;
struct student_score student_data;
char *name[STUDENT_NUM] = {"xiaoming", "xiaohua", "xiaoqiang", "xiaoli", "xiaofang"};
os_uint32_t score[STUDENT_NUM] = {80, 85, 90, 95, 96};
for (i = 0; i < STUDENT_NUM; i++)
{
memset(student_data.name, 0, TEST_NAME_MAX);
strncpy(student_data.name, name[i], TEST_NAME_MAX);
student_data.score = score[i];
if(OS_EOK == os_mq_send(mq_dynamic, &student_data, sizeof(struct student_score), OS_WAIT_FOREVER))
{
LOG_W(TEST_TAG, "task1 send -- name:%s score:%d", student_data.name, student_data.score);
}
os_task_msleep(100);
}
}
void task2_entry(void *para)
{
struct student_score student_data;
os_size_t recv_size = 0;
while (1)
{
if (OS_EOK == os_mq_recv(mq_dynamic, &student_data, sizeof(struct student_score), OS_WAIT_FOREVER, &recv_size))
{
LOG_W(TEST_TAG, "task2 recv -- name:%s score:%d", student_data.name, student_data.score);
}
}
}
void msgqueue_dynamic_sample(void)
{
os_task_t *task1 = OS_NULL;
os_task_t *task2 = OS_NULL;
mq_dynamic = os_mq_create("mailbox_dynamic", sizeof(struct student_score), MQ_MAX_MSG);
if (!mq_dynamic)
{
LOG_W(TEST_TAG, "msgqueue_dynamic_sample msgqueue create ERR");
}
task1 = os_task_create("task1",
task1_entry,
OS_NULL,
TASK_STACK_SIZE,
TASK1_PRIORITY);
if (task1)
{
LOG_W(TEST_TAG, "msgqueue_dynamic_sample startup task1");
os_task_startup(task1);
}
os_task_msleep(200);
task2 = os_task_create("task2",
task2_entry,
OS_NULL,
TASK_STACK_SIZE,
TASK2_PRIORITY);
if (task2)
{
LOG_W(TEST_TAG, "msgqueue_dynamic_sample startup task2");
os_task_startup(task2);
}
}
SH_CMD_EXPORT(dynamic_msgqueue, msgqueue_dynamic_sample, "test dynamic msgqueue");
运行结果如下:
sh>dynamic_msgqueue
W/TEST: msgqueue_dynamic_sample startup task1
W/TEST: task1 send -- name:xiaoming score:80
W/TEST: task1 send -- name:xiaohua score:85
W/TEST: task1 send -- name:xiaoqiang score:90
W/TEST: msgqueue_dynamic_sample startup task2
W/TEST: task2 recv -- name:xiaoming score:80
W/TEST: task2 recv -- name:xiaohua score:85
W/TEST: task2 recv -- name:xiaoqiang score:90
sh>W/TEST: task1 send -- name:xiaoli score:95
W/TEST: task2 recv -- name:xiaoli score:95
W/TEST: task1 send -- name:xiaofang score:96
W/TEST: task2 recv -- name:xiaofang score:96