消息队列
简介
消息队列是另外一种任务间通信的机制,它可以用于发送不定长消息的场合。消息队列有缓冲区,可以缓存一定数量的消息。消息队列没满的情况下,可以一直往消息队列里面发送消息,消息队列满了可以选择超时等待;消息队列有消息的情况下,可以从消息队列里面接收消息,如果没有消息,可以选择超时等待。
消息队列发送的消息内容长度可以是任意的(其最大长度可以在消息队列初始化时设置),消息队列在发送时会将整个消息内容复制到消息队列的缓冲区中,接收消息时会把消息队列缓冲区中的消息内容复制到接收端指定的地址。
重要定义及数据结构
结构体
struct os_mq
{
struct os_ipc_object parent; /* inherit from ipc_object */
void *msg_pool; /* message pool */
os_uint16_t msg_size; /* message size of each message */
os_uint16_t max_msgs; /* max number of messages */
os_uint16_t entry; /* index of messages in the queue */
void *msg_queue_head; /* list head */
void *msg_queue_tail; /* list tail */
void *msg_queue_free; /* pointer indicated the free node of queue */
os_list_node_t suspend_sender_task; /* sender task suspended on this messages queue */
};
struct os_mq重要参数 | 说明 |
---|---|
msg_pool | 消息队列缓冲区的起始地址 |
msg_size | 每个消息的最大长度 |
max_msgs | 最大消息个数 |
entry | 消息队列中当前的消息个数 |
msg_queue_head | 消息队列链表头 |
msg_queue_tail | 消息队列链表尾 |
msg_queue_free | 消息队列空闲链表 |
suspend_sender_task | 阻塞在消息队列上的发送任务 |
API列表
接口 | 说明 |
---|---|
os_mq_init | 以静态方式创建消息队列,消息队列对象由使用者提供 |
os_mq_deinit | 消息队列反初始化,与os_mq_init()匹配使用 |
os_mq_create | 以动态方式创建并初始化消息队列,即消息队列对象采用动态申请内存的方式申请 |
os_mq_destroy | 销毁消息队列,与os_mq_create()匹配使用 |
os_mq_send | 发送消息 |
os_mq_send_urgent | 发送紧急消息 |
os_mq_recv | 接收消息 |
os_mq_control | 控制或更改消息队列的行为属性 |
os_mq_init
该函数以静态方式创建消息队列,消息队列对象的内存空间和消息队列缓冲区的内存空间都由使用者提供,函数原型如下:
os_err_t os_mq_init(os_mq_t *mq,
const char *name,
void *msg_pool,
os_size_t msg_pool_size,
os_size_t msg_size,
os_ipc_flag_t flag);
参数 | 说明 |
---|---|
mq | 消息队列句柄 |
name | 消息队列名字 |
msg_pool | 消息队列缓冲区的起始地址 |
msg_pool_size | 消息队列缓冲区的大小,以byte为单位 |
msg_size | 每个消息的最大长度 |
flag | 标志,可以取OS_IPC_FLAG_FIFO和OS_IPC_FLAG_PRIO;当取值为OS_IPC_FLAG_FIFO时,等待的任务将按照先进先出的方式排队,先进入的任务先被唤醒;当取值为OS_IPC_FLAG_PRIO时,等待的任务将按照任务优先级排队,优先级高的任务先被唤醒 |
os_mq_deinit
该函数用于对消息队列反初始化,与os_mq_init()匹配使用,函数原型如下:
os_err_t os_mq_deinit(os_mq_t *mq);
参数 | 说明 |
---|---|
mq | 消息队列句柄 |
返回 | 说明 |
OS_EOK | 反初始化成功 |
os_mq_create
该函数以动态方式创建并初始化消息队列,消息队列对象的内存空间和消息队列缓冲区的内存空间都是通过动态申请内存的方式获得,函数原型如下:
os_mq_t *os_mq_create(const char *name,
os_size_t msg_size,
os_size_t max_msgs,
os_ipc_flag_t flag);
参数 | 说明 |
---|---|
name | 消息队列名字 |
msg_size | 每个消息的最大长度 |
max_msgs | 最大消息个数 |
flag | 标志,可以取OS_IPC_FLAG_FIFO和OS_IPC_FLAG_PRIO;当取值为OS_IPC_FLAG_FIFO时,等待的任务将按照先进先出的方式排队,先进入的任务先被唤醒;当取值为OS_IPC_FLAG_PRIO时,等待的任务将按照任务优先级排队,优先级高的任务先被唤醒 |
返回 | 说明 |
非OS_NULL | 创建成功,返回消息队列句柄 |
OS_NULL | 创建失败 |
os_mq_destroy
该函数用于销毁消息队列,唤醒所有等待任务,释放消息对象的空间和消息缓冲区的空间,与os_mq_create()匹配使用,函数原型如下:
os_err_t os_mq_destroy(os_mq_t *mq);
参数 | 说明 |
---|---|
mq | 消息队列句柄 |
返回 | 说明 |
OS_EOK | 销毁成功 |
os_mq_send
该函数用于发送消息,消息内容会被复制到消息队列缓冲区中,当消息队列已满且需要等待时,会阻塞当前发送任务,函数原型如下:
os_err_t os_mq_send(os_mq_t *mq, void *buffer, os_size_t buff_size, os_tick_t timeout);
参数 | 说明 |
---|---|
mq | 消息队列句柄 |
buffer | 待发送的消息的地址 |
buff_size | 此消息的长度 |
timeout | 消息缓冲区已满时需要等待的超时时间;若为OS_IPC_WAITING_NO,则不等待直接返回OS_EFULL;若为OS_IPC_WAITING_FOREVER,则永久等待直到消息缓冲区有空余可用;若为其它值,则等待timeout时间或者直到消息缓冲区有空余可用 |
返回 | 说明 |
OS_EOK | 消息发送成功 |
OS_EFULL | 消息队列已满且不等待 |
OS_ETIMEOUT | 消息队列已满且等待超时 |
os_mq_send_urgent
该函数用于发送紧急消息,会把当前消息加入到消息队列头以便尽快处理,消息内容被复制到消息队列缓冲区中,当消息队列已满且需要等待时,会阻塞当前发送任务,函数原型如下:
os_err_t os_mq_send_urgent(os_mq_t *mq, void *buffer, os_size_t buff_size, os_tick_t timeout);
参数 | 说明 |
---|---|
mq | 消息队列句柄 |
buffer | 待发送的消息的地址 |
buff_size | 此消息的长度 |
timeout | 消息缓冲区已满时需要等待的超时时间;若为OS_IPC_WAITING_NO,则不等待直接返回OS_EFULL;若为OS_IPC_WAITING_FOREVER,则永久等待直到消息缓冲区有空余可用;若为其它值,则等待timeout时间或者直到消息缓冲区有空余可用 |
返回 | 说明 |
OS_EOK | 消息发送成功 |
OS_EFULL | 消息队列已满且不等待 |
OS_ETIMEOUT | 消息队列已满且等待超时 |
os_mq_recv
该函数用于接收消息,当前消息队列为空且需要等待时,会阻塞当前接收任务,函数原型如下:
os_err_t os_mq_recv(os_mq_t *mq,
void *buffer,
os_size_t buff_size,
os_tick_t timeout,
os_size_t *recv_size);
参数 | 说明 |
---|---|
mq | 消息队列句柄 |
buffer | 保存接收消息的地址 |
buff_size | 保存接收消息的空间大小 |
timeout | 消息缓冲区为空时需要等待的超时时间;若为OS_IPC_WAITING_NO,则不等待直接返回OS_EEMPTY;若为OS_IPC_WAITING_FOREVER,则永久等待直到有消息可接收;若为其它值,则等待timeout时间或者直到有消息可接收 |
recv_size | 接收到的消息的实际长度 |
返回 | 说明 |
OS_EOK | 接收消息成功 |
OS_EEMPTY | 消息队列为空且不等待 |
OS_ETIMEOUT | 消息队列为空且等待超时 |
OS_ERROR | 错误,保存消息的空间大小比待接收的消息长度小 |
os_mq_control
该函数用于控制或者更改消息队列的行为属性,函数原型如下:
os_err_t os_mq_control(os_mq_t *mq, os_ipc_cmd_t cmd, void *arg);
参数 | 说明 |
---|---|
mq | 消息队列句柄 |
cmd | 控制命令,当前仅支持OS_IPC_CMD_RESET,会唤醒所有阻塞在该消息队列上的任务并清除消息队列的信息 |
arg | 参数,暂未使用 |
使用示例
静态消息队列使用示例
本例使用静态方式初始化消息队列,然后创建一个任务发送消息,另外一个任务接收消息
#include <oneos_config.h>
#include <os_dbg.h>
#include <os_errno.h>
#include <os_task.h>
#include <shell.h>
#include <string.h>
#include <os_mq.h>
#define TEST_TAG "TEST"
#define TASK_STACK_SIZE 1024
#define TASK1_PRIORITY 15
#define TASK2_PRIORITY 16
#define TASK_TIMESLICE 10
#define MQ_MAX_MSG 5
#define MSG_SIZE 24
#define MQ_POLL_SIZE (MQ_MAX_MSG * (MSG_SIZE + OS_MQ_MSG_HDR_LEN))
#define STR_NUM 4
static char mq_pool[MQ_POLL_SIZE];
static os_mq_t mq_static;
char *str[STR_NUM] = {
"hello, world",
"it's a new day",
"it's a nice day",
"it's a wonderful day"
};
void task1_entry(void *para)
{
os_uint32_t i = 0;
for (i = 0; i < STR_NUM; i++)
{
LOG_W(TEST_TAG, "task1 send str:%s, len:%d", str[i], strlen(str[i]) + 1);
if(OS_EOK == os_mq_send(&mq_static, str[i], strlen(str[i]) + 1, OS_IPC_WAITING_FOREVER))
{
LOG_W(TEST_TAG, "task1 send OK");
}
else
{
LOG_W(TEST_TAG, "task1 send err");
}
os_task_sleep(100);
}
}
void task2_entry(void *para)
{
os_size_t recv_size = 0;
char recv_str[MSG_SIZE];
while (1)
{
if (OS_EOK == os_mq_recv(&mq_static, &recv_str[0], MSG_SIZE, OS_IPC_WAITING_FOREVER, &recv_size))
{
LOG_W(TEST_TAG, "task2 recv str:%s, len:%d", recv_str, recv_size);
}
}
}
void msgqueue_static_sample(void)
{
os_task_t *task1 = OS_NULL;
os_task_t *task2 = OS_NULL;
if(OS_EOK != os_mq_init(&mq_static, "msgqueue_static", &mq_pool[0], MQ_POLL_SIZE, MSG_SIZE, OS_IPC_FLAG_FIFO))
{
LOG_W(TEST_TAG, "msgqueue_static_sample msgqueue init ERR");
return;
}
task1 = os_task_create("task1",
task1_entry,
OS_NULL,
TASK_STACK_SIZE,
TASK1_PRIORITY,
TASK_TIMESLICE);
if (task1)
{
LOG_W(TEST_TAG, "msgqueue_static_sample startup task1");
os_task_startup(task1);
}
task2 = os_task_create("task2",
task2_entry,
OS_NULL,
TASK_STACK_SIZE,
TASK2_PRIORITY,
TASK_TIMESLICE);
if (task2)
{
LOG_W(TEST_TAG, "msgqueue_static_sample startup task2");
os_task_startup(task2);
}
}
SH_CMD_EXPORT(static_msgqueue, msgqueue_static_sample, "test staitc msgqueue");
运行结果如下:
sh />static_msgqueue
W/TEST: msgqueue_static_sample startup task1
W/TEST: task1 send str:hello, world, len:13
W/TEST: task1 send OK
W/TEST: msgqueue_static_sample startup task2
W/TEST: task2 recv str:hello, world, len:13
W/TEST: task1 send str:it's a new day, len:15
W/TEST: task1 send OK
W/TEST: task2 recv str:it's a new day, len:15
W/TEST: task1 send str:it's a nice day, len:16
W/TEST: task1 send OK
W/TEST: task2 recv str:it's a nice day, len:16
W/TEST: task1 send str:it's a wonderful day, len:21
W/TEST: task1 send OK
W/TEST: task2 recv str:it's a wonderful day, len:21
动态消息队列使用示例
本例使用动态方式创建并初始化了消息队列,然后创建一个任务发送消息,另外一个任务接收消息
#include <oneos_config.h>
#include <os_dbg.h>
#include <os_errno.h>
#include <os_task.h>
#include <shell.h>
#include <string.h>
#include <os_memory.h>
#include <os_mq.h>
#define TEST_TAG "TEST"
#define TASK_STACK_SIZE 1024
#define TASK1_PRIORITY 15
#define TASK2_PRIORITY 16
#define TASK_TIMESLICE 10
#define MQ_MAX_MSG 10
#define TEST_NAME_MAX 16
#define STUDENT_NUM 5
static os_mq_t *mq_dynamic;
struct student_score
{
char name[TEST_NAME_MAX];
os_uint32_t score;
};
void task1_entry(void *para)
{
os_uint32_t i = 0;
struct student_score student_data;
char *name[STUDENT_NUM] = {"xiaoming", "xiaohua", "xiaoqiang", "xiaoli", "xiaofang"};
os_uint32_t score[STUDENT_NUM] = {80, 85, 90, 95, 96};
for (i = 0; i < STUDENT_NUM; i++)
{
memset(student_data.name, 0, TEST_NAME_MAX);
strncpy(student_data.name, name[i], TEST_NAME_MAX);
student_data.score = score[i];
if(OS_EOK == os_mq_send(mq_dynamic, &student_data, sizeof(struct student_score), OS_IPC_WAITING_FOREVER))
{
LOG_W(TEST_TAG, "task1 send -- name:%s score:%d", student_data.name, student_data.score);
}
os_task_sleep(100);
}
}
void task2_entry(void *para)
{
struct student_score student_data;
os_size_t recv_size = 0;
while (1)
{
if (OS_EOK == os_mq_recv(mq_dynamic, &student_data, sizeof(struct student_score), OS_IPC_WAITING_FOREVER, &recv_size))
{
LOG_W(TEST_TAG, "task2 recv -- name:%s score:%d", student_data.name, student_data.score);
}
}
}
void msgqueue_dynamic_sample(void)
{
os_task_t *task1 = OS_NULL;
os_task_t *task2 = OS_NULL;
mq_dynamic = os_mq_create("mailbox_dynamic", sizeof(struct student_score), MQ_MAX_MSG, OS_IPC_FLAG_FIFO);
if (!mq_dynamic)
{
LOG_W(TEST_TAG, "msgqueue_dynamic_sample msgqueue create ERR");
}
task1 = os_task_create("task1",
task1_entry,
OS_NULL,
TASK_STACK_SIZE,
TASK1_PRIORITY,
TASK_TIMESLICE);
if (task1)
{
LOG_W(TEST_TAG, "msgqueue_dynamic_sample startup task1");
os_task_startup(task1);
}
os_task_sleep(200);
task2 = os_task_create("task2",
task2_entry,
OS_NULL,
TASK_STACK_SIZE,
TASK2_PRIORITY,
TASK_TIMESLICE);
if (task2)
{
LOG_W(TEST_TAG, "msgqueue_dynamic_sample startup task2");
os_task_startup(task2);
}
}
SH_CMD_EXPORT(dynamic_msgqueue, msgqueue_dynamic_sample, "test dynamic msgqueue");
运行结果如下:
sh />dynamic_msgqueue
W/TEST: msgqueue_dynamic_sample startup task1
W/TEST: task1 send -- name:xiaoming score:80
W/TEST: task1 send -- name:xiaohua score:85
W/TEST: msgqueue_dynamic_sample startup task2
W/TEST: task1 send -- name:xiaoqiang score:90
W/TEST: task2 recv -- name:xiaoming score:80
W/TEST: task2 recv -- name:xiaohua score:85
W/TEST: task2 recv -- name:xiaoqiang score:90
W/TEST: task1 send -- name:xiaoli score:95
W/TEST: task2 recv -- name:xiaoli score:95
W/TEST: task1 send -- name:xiaofang score:96
W/TEST: task2 recv -- name:xiaofang score:96