全部文档
OneOS简介 硬件支持 编译构造工具 API参考文档 高级语言 用户编程手册 应用笔记 FAQ

消息队列


简介

消息队列是另外一种任务间通信的机制,它可以用于发送不定长消息的场合。消息队列有缓冲区,可以缓存一定数量的消息。消息队列没满的情况下,可以一直往消息队列里面发送消息,消息队列满了可以选择超时等待;消息队列有消息的情况下,可以从消息队列里面接收消息,如果没有消息,可以选择超时等待。

消息队列发送的消息内容长度可以是任意的(其最大长度可以在消息队列初始化时设置),消息队列在发送时会将整个消息内容复制到消息队列的缓冲区中,接收消息时会把消息队列缓冲区中的消息内容复制到接收端指定的地址。


重要定义及数据结构

结构体

struct os_mq
{
    struct os_ipc_object parent;                        /* inherit from ipc_object */

    void                *msg_pool;                      /* message pool */
    os_uint16_t          msg_size;                      /* message size of each message */
    os_uint16_t          max_msgs;                      /* max number of messages */
    os_uint16_t          entry;                         /* index of messages in the queue */

    void                *msg_queue_head;                /* list head */
    void                *msg_queue_tail;                /* list tail */
    void                *msg_queue_free;                /* pointer indicated the free node of queue */

    os_list_node_t       suspend_sender_task;           /* sender task suspended on this messages queue */
};
struct os_mq重要参数 说明
msg_pool 消息队列缓冲区的起始地址
msg_size 每个消息的最大长度
max_msgs 最大消息个数
entry 消息队列中当前的消息个数
msg_queue_head 消息队列链表头
msg_queue_tail 消息队列链表尾
msg_queue_free 消息队列空闲链表
suspend_sender_task 阻塞在消息队列上的发送任务

API列表

接口 说明
os_mq_init 以静态方式创建消息队列,消息队列对象由使用者提供
os_mq_deinit 消息队列反初始化,与os_mq_init()匹配使用
os_mq_create 以动态方式创建并初始化消息队列,即消息队列对象采用动态申请内存的方式申请
os_mq_destroy 销毁消息队列,与os_mq_create()匹配使用
os_mq_send 发送消息
os_mq_send_urgent 发送紧急消息
os_mq_recv 接收消息
os_mq_control 控制或更改消息队列的行为属性

os_mq_init

该函数以静态方式创建消息队列,消息队列对象的内存空间和消息队列缓冲区的内存空间都由使用者提供,函数原型如下:

os_err_t os_mq_init(os_mq_t         *mq,
                    const char      *name,
                    void            *msg_pool,
                    os_size_t       msg_pool_size,
                    os_size_t       msg_size,
                    os_ipc_flag_t   flag);
参数 说明
mq 消息队列句柄
name 消息队列名字
msg_pool 消息队列缓冲区的起始地址
msg_pool_size 消息队列缓冲区的大小,以byte为单位
msg_size 每个消息的最大长度
flag 标志,可以取OS_IPC_FLAG_FIFO和OS_IPC_FLAG_PRIO;当取值为OS_IPC_FLAG_FIFO时,等待的任务将按照先进先出的方式排队,先进入的任务先被唤醒;当取值为OS_IPC_FLAG_PRIO时,等待的任务将按照任务优先级排队,优先级高的任务先被唤醒

os_mq_deinit

该函数用于对消息队列反初始化,与os_mq_init()匹配使用,函数原型如下:

os_err_t os_mq_deinit(os_mq_t *mq);
参数 说明
mq 消息队列句柄
返回 说明
OS_EOK 反初始化成功

os_mq_create

该函数以动态方式创建并初始化消息队列,消息队列对象的内存空间和消息队列缓冲区的内存空间都是通过动态申请内存的方式获得,函数原型如下:

os_mq_t  *os_mq_create(const char      *name,
                       os_size_t       msg_size,
                       os_size_t       max_msgs,
                       os_ipc_flag_t   flag);
参数 说明
name 消息队列名字
msg_size 每个消息的最大长度
max_msgs 最大消息个数
flag 标志,可以取OS_IPC_FLAG_FIFO和OS_IPC_FLAG_PRIO;当取值为OS_IPC_FLAG_FIFO时,等待的任务将按照先进先出的方式排队,先进入的任务先被唤醒;当取值为OS_IPC_FLAG_PRIO时,等待的任务将按照任务优先级排队,优先级高的任务先被唤醒
返回 说明
非OS_NULL 创建成功,返回消息队列句柄
OS_NULL 创建失败

os_mq_destroy

该函数用于销毁消息队列,唤醒所有等待任务,释放消息对象的空间和消息缓冲区的空间,与os_mq_create()匹配使用,函数原型如下:

os_err_t os_mq_destroy(os_mq_t *mq);
参数 说明
mq 消息队列句柄
返回 说明
OS_EOK 销毁成功

os_mq_send

该函数用于发送消息,消息内容会被复制到消息队列缓冲区中,当消息队列已满且需要等待时,会阻塞当前发送任务,函数原型如下:

os_err_t os_mq_send(os_mq_t *mq, void *buffer, os_size_t buff_size, os_tick_t timeout);
参数 说明
mq 消息队列句柄
buffer 待发送的消息的地址
buff_size 此消息的长度
timeout 消息缓冲区已满时需要等待的超时时间;若为OS_IPC_WAITING_NO,则不等待直接返回OS_EFULL;若为OS_IPC_WAITING_FOREVER,则永久等待直到消息缓冲区有空余可用;若为其它值,则等待timeout时间或者直到消息缓冲区有空余可用
返回 说明
OS_EOK 消息发送成功
OS_EFULL 消息队列已满且不等待
OS_ETIMEOUT 消息队列已满且等待超时

os_mq_send_urgent

该函数用于发送紧急消息,会把当前消息加入到消息队列头以便尽快处理,消息内容被复制到消息队列缓冲区中,当消息队列已满且需要等待时,会阻塞当前发送任务,函数原型如下:

os_err_t os_mq_send_urgent(os_mq_t *mq, void *buffer, os_size_t buff_size, os_tick_t timeout);
参数 说明
mq 消息队列句柄
buffer 待发送的消息的地址
buff_size 此消息的长度
timeout 消息缓冲区已满时需要等待的超时时间;若为OS_IPC_WAITING_NO,则不等待直接返回OS_EFULL;若为OS_IPC_WAITING_FOREVER,则永久等待直到消息缓冲区有空余可用;若为其它值,则等待timeout时间或者直到消息缓冲区有空余可用
返回 说明
OS_EOK 消息发送成功
OS_EFULL 消息队列已满且不等待
OS_ETIMEOUT 消息队列已满且等待超时

os_mq_recv

该函数用于接收消息,当前消息队列为空且需要等待时,会阻塞当前接收任务,函数原型如下:

os_err_t os_mq_recv(os_mq_t     *mq,
                    void        *buffer,
                    os_size_t   buff_size,
                    os_tick_t   timeout,
                    os_size_t   *recv_size);
参数 说明
mq 消息队列句柄
buffer 保存接收消息的地址
buff_size 保存接收消息的空间大小
timeout 消息缓冲区为空时需要等待的超时时间;若为OS_IPC_WAITING_NO,则不等待直接返回OS_EEMPTY;若为OS_IPC_WAITING_FOREVER,则永久等待直到有消息可接收;若为其它值,则等待timeout时间或者直到有消息可接收
recv_size 接收到的消息的实际长度
返回 说明
OS_EOK 接收消息成功
OS_EEMPTY 消息队列为空且不等待
OS_ETIMEOUT 消息队列为空且等待超时
OS_ERROR 错误,保存消息的空间大小比待接收的消息长度小

os_mq_control

该函数用于控制或者更改消息队列的行为属性,函数原型如下:

os_err_t os_mq_control(os_mq_t *mq, os_ipc_cmd_t cmd, void *arg);
参数 说明
mq 消息队列句柄
cmd 控制命令,当前仅支持OS_IPC_CMD_RESET,会唤醒所有阻塞在该消息队列上的任务并清除消息队列的信息
arg 参数,暂未使用

使用示例

静态消息队列使用示例

本例使用静态方式初始化消息队列,然后创建一个任务发送消息,另外一个任务接收消息

#include <oneos_config.h>
#include <os_dbg.h>
#include <os_errno.h>
#include <os_task.h>
#include <shell.h>
#include <string.h>
#include <os_mq.h>

#define TEST_TAG        "TEST"
#define TASK_STACK_SIZE 1024
#define TASK1_PRIORITY  15
#define TASK2_PRIORITY  16
#define TASK_TIMESLICE  10

#define MQ_MAX_MSG      5
#define MSG_SIZE        24
#define MQ_POLL_SIZE    (MQ_MAX_MSG * (MSG_SIZE + OS_MQ_MSG_HDR_LEN))

#define STR_NUM         4

static char mq_pool[MQ_POLL_SIZE];
static os_mq_t mq_static;

char *str[STR_NUM] = {
    "hello, world",
    "it's a new day",
    "it's a nice day",
    "it's a wonderful day"
    };

void task1_entry(void *para)
{
    os_uint32_t i = 0;

    for (i = 0; i < STR_NUM; i++)
    {
        LOG_W(TEST_TAG, "task1 send str:%s, len:%d", str[i], strlen(str[i]) + 1);
        if(OS_EOK == os_mq_send(&mq_static, str[i], strlen(str[i]) + 1, OS_IPC_WAITING_FOREVER))
        {
            LOG_W(TEST_TAG, "task1 send OK");
        }
        else
        {
            LOG_W(TEST_TAG, "task1 send err");
        }
        os_task_sleep(100);
    }
}

void task2_entry(void *para)
{
    os_size_t recv_size = 0;
    char recv_str[MSG_SIZE];

    while (1)
    {        
        if (OS_EOK == os_mq_recv(&mq_static, &recv_str[0], MSG_SIZE, OS_IPC_WAITING_FOREVER, &recv_size))
        {
            LOG_W(TEST_TAG, "task2 recv str:%s, len:%d", recv_str, recv_size);
        }
    }
}

void msgqueue_static_sample(void)
{
    os_task_t *task1 = OS_NULL;
    os_task_t *task2 = OS_NULL;

    if(OS_EOK != os_mq_init(&mq_static, "msgqueue_static", &mq_pool[0], MQ_POLL_SIZE, MSG_SIZE, OS_IPC_FLAG_FIFO))
    {
        LOG_W(TEST_TAG, "msgqueue_static_sample msgqueue init ERR");
        return;
    }

    task1 = os_task_create("task1", 
                           task1_entry, 
                           OS_NULL, 
                           TASK_STACK_SIZE, 
                           TASK1_PRIORITY, 
                           TASK_TIMESLICE);
    if (task1)
    {        
        LOG_W(TEST_TAG, "msgqueue_static_sample startup task1");
        os_task_startup(task1);
    }

    task2 = os_task_create("task2", 
                           task2_entry, 
                           OS_NULL, 
                           TASK_STACK_SIZE, 
                           TASK2_PRIORITY, 
                           TASK_TIMESLICE);
    if (task2)
    {        
        LOG_W(TEST_TAG, "msgqueue_static_sample startup task2");
        os_task_startup(task2);
    }
}

SH_CMD_EXPORT(static_msgqueue, msgqueue_static_sample, "test staitc msgqueue");

运行结果如下:

sh />static_msgqueue
W/TEST: msgqueue_static_sample startup task1
W/TEST: task1 send str:hello, world, len:13
W/TEST: task1 send OK
W/TEST: msgqueue_static_sample startup task2
W/TEST: task2 recv str:hello, world, len:13
W/TEST: task1 send str:it's a new day, len:15
W/TEST: task1 send OK
W/TEST: task2 recv str:it's a new day, len:15
W/TEST: task1 send str:it's a nice day, len:16
W/TEST: task1 send OK
W/TEST: task2 recv str:it's a nice day, len:16
W/TEST: task1 send str:it's a wonderful day, len:21
W/TEST: task1 send OK
W/TEST: task2 recv str:it's a wonderful day, len:21

动态消息队列使用示例

本例使用动态方式创建并初始化了消息队列,然后创建一个任务发送消息,另外一个任务接收消息

#include <oneos_config.h>
#include <os_dbg.h>
#include <os_errno.h>
#include <os_task.h>
#include <shell.h>
#include <string.h>
#include <os_memory.h>
#include <os_mq.h>

#define TEST_TAG        "TEST"
#define TASK_STACK_SIZE 1024
#define TASK1_PRIORITY  15
#define TASK2_PRIORITY  16
#define TASK_TIMESLICE  10

#define MQ_MAX_MSG      10
#define TEST_NAME_MAX   16
#define STUDENT_NUM     5

static os_mq_t *mq_dynamic;

struct student_score
{
    char                name[TEST_NAME_MAX];
    os_uint32_t         score;
};

void task1_entry(void *para)
{
    os_uint32_t i = 0;
    struct student_score student_data;
    char *name[STUDENT_NUM] = {"xiaoming", "xiaohua", "xiaoqiang", "xiaoli", "xiaofang"};
    os_uint32_t score[STUDENT_NUM] = {80, 85, 90, 95, 96};

    for (i = 0; i < STUDENT_NUM; i++)
    {
        memset(student_data.name, 0, TEST_NAME_MAX);
        strncpy(student_data.name, name[i], TEST_NAME_MAX);
        student_data.score = score[i];
        if(OS_EOK == os_mq_send(mq_dynamic, &student_data, sizeof(struct student_score), OS_IPC_WAITING_FOREVER))
        {
            LOG_W(TEST_TAG, "task1 send -- name:%s score:%d", student_data.name, student_data.score);
        }

        os_task_sleep(100);
    }
}

void task2_entry(void *para)
{
    struct student_score student_data;
    os_size_t recv_size = 0;

    while (1)
    {
        if (OS_EOK == os_mq_recv(mq_dynamic, &student_data, sizeof(struct student_score), OS_IPC_WAITING_FOREVER, &recv_size))
        {
            LOG_W(TEST_TAG, "task2 recv -- name:%s score:%d", student_data.name, student_data.score);
        }
    }
}

void msgqueue_dynamic_sample(void)
{
    os_task_t *task1 = OS_NULL;
    os_task_t *task2 = OS_NULL;

    mq_dynamic = os_mq_create("mailbox_dynamic", sizeof(struct student_score), MQ_MAX_MSG, OS_IPC_FLAG_FIFO);
    if (!mq_dynamic)
    {
        LOG_W(TEST_TAG, "msgqueue_dynamic_sample msgqueue create ERR");
    }

    task1 = os_task_create("task1", 
                           task1_entry, 
                           OS_NULL, 
                           TASK_STACK_SIZE, 
                           TASK1_PRIORITY, 
                           TASK_TIMESLICE);
    if (task1)
    {        
        LOG_W(TEST_TAG, "msgqueue_dynamic_sample startup task1");
        os_task_startup(task1);
    }

    os_task_sleep(200);
    task2 = os_task_create("task2", 
                           task2_entry, 
                           OS_NULL, 
                           TASK_STACK_SIZE, 
                           TASK2_PRIORITY, 
                           TASK_TIMESLICE);
    if (task2)
    {        
        LOG_W(TEST_TAG, "msgqueue_dynamic_sample startup task2");
        os_task_startup(task2);
    }
}

SH_CMD_EXPORT(dynamic_msgqueue, msgqueue_dynamic_sample, "test dynamic msgqueue");

运行结果如下:

sh />dynamic_msgqueue
W/TEST: msgqueue_dynamic_sample startup task1
W/TEST: task1 send -- name:xiaoming score:80
W/TEST: task1 send -- name:xiaohua score:85
W/TEST: msgqueue_dynamic_sample startup task2
W/TEST: task1 send -- name:xiaoqiang score:90
W/TEST: task2 recv -- name:xiaoming score:80
W/TEST: task2 recv -- name:xiaohua score:85
W/TEST: task2 recv -- name:xiaoqiang score:90
W/TEST: task1 send -- name:xiaoli score:95
W/TEST: task2 recv -- name:xiaoli score:95
W/TEST: task1 send -- name:xiaofang score:96
W/TEST: task2 recv -- name:xiaofang score:96

results matching ""

    No results matching ""

    返回顶部