开发者社区 > 云原生 > 云消息队列 > 正文

System V消息队列与posix消息队列测试

我已经测试了两种情况:

System V 消息队列:使用 msgsnd() 发送 1,000,000 条消息“hello”,并在另一个进程中使用 msgrcv() 接收它们。POSIX 消息队列:使用 mq_send() 发送 1,000,000 条消息“hello”,并使用 mq_receive() 接收它们。在这两种情况下,我都测量了将消息发送到队列所消耗的 CPU 时间。

System V mq CPU推送时间:0.8(秒)

posix mq CPU推送时间:1.4(秒)(!)

System V 的代码:

#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <time.h>

/* Number of timed rounds run by the sender; each round is printed and then averaged. */
#define EXPERIMENT_COUNT 100

/* clock() readings taken immediately before and after each timed send loop. */
clock_t start, end;
/* Running sum of the per-round CPU times; divided by EXPERIMENT_COUNT when the average is printed. */
double cpu_time_useda_avg = 0;
/* CPU time, in seconds, consumed by the most recent round. */
double cpu_time_used;

struct mesg_buffer { 
    long mesg_type; 
    char mesg_text[100]; 
} message; 

/*
 * Benchmark sender for the System V message queue.
 *
 * Opens (creating it if necessary) the queue keyed by
 * ftok("/var/tmp/progfile", 65), then runs EXPERIMENT_COUNT rounds of
 * 1,000,000 msgsnd() calls, each carrying the payload "hello" with message
 * type 1. The CPU time of every round, measured with clock(), is printed,
 * followed by the average over all rounds.
 *
 * Returns 0 on success, or -1 if the key cannot be generated, the queue
 * cannot be opened, or a send fails (the failing call is reported with
 * perror()).
 */
int run_sending_app()
{
    key_t key;
    int msgid;
    int e, i;

    /* ftok() fails when the path does not exist; the resulting key would be unusable. */
    key = ftok("/var/tmp/progfile", 65);
    if (key == (key_t)-1) {
        perror("ftok");
        return -1;
    }

    msgid = msgget(key, 0666 | IPC_CREAT);
    if (msgid == -1) {
        perror("msgget");
        return -1;
    }

    message.mesg_type = 1;
    strcpy(message.mesg_text, "hello");

    for (e = 0; e < EXPERIMENT_COUNT; e++) {
        start = clock();

        for (i = 0; i != 1000000; i++) {
            /*
             * The size argument of msgsnd() counts only the payload, not the
             * leading long type field. Passing the size of the whole struct
             * would make the kernel read past the end of `message`.
             */
            if (msgsnd(msgid, &message, sizeof(message.mesg_text), 0) == -1) {
                perror("msgsnd");
                return -1;
            }
        }

        end = clock();
        cpu_time_used = ((double) (end - start)) / CLOCKS_PER_SEC;

        printf("cpu_time_used = %f\n", cpu_time_used);

        cpu_time_useda_avg += cpu_time_used;
    }

    printf("cpu_time_useda_avg = %f\n", cpu_time_useda_avg/EXPERIMENT_COUNT);

    return 0;
}

/*
 * Benchmark receiver for the System V message queue.
 *
 * Opens (creating it if necessary) the queue keyed by
 * ftok("/var/tmp/progfile", 65) and drains it forever, discarding every
 * message. The loop ends only when msgrcv() fails; the queue is then
 * removed and -1 is returned. Also returns -1 if the key cannot be
 * generated or the queue cannot be opened. Failures are reported with
 * perror().
 */
int run_receiving_app()
{
    key_t key;
    int msgid;

    /* ftok() fails when the path does not exist; the resulting key would be unusable. */
    key = ftok("/var/tmp/progfile", 65);
    if (key == (key_t)-1) {
        perror("ftok");
        return -1;
    }

    msgid = msgget(key, 0666 | IPC_CREAT);
    if (msgid == -1) {
        perror("msgget");
        return -1;
    }

    for (;;) {
        /*
         * The size argument of msgrcv() is the capacity of the payload
         * buffer, not of the whole struct; a larger value would let the
         * kernel write past the end of mesg_text. MSG_NOERROR truncates
         * any message whose payload is longer than that capacity instead
         * of failing with E2BIG, so oversized messages are still drained.
         */
        if (msgrcv(msgid, &message, sizeof(message.mesg_text), 0, MSG_NOERROR) == -1) {
            perror("msgrcv");
            break;
        }
    }

    /* Destroy the message queue once receiving is no longer possible. */
    if (msgctl(msgid, IPC_RMID, NULL) == -1) {
        perror("msgctl");
    }

    return -1;
}

/*
 * Entry point: picks the role from the first command-line argument,
 * compared case-insensitively. "cli" runs the sender and "serv" runs the
 * receiver; with any other argument, or with none, nothing is run.
 * Always returns 0.
 *
 * NOTE(review): strcasecmp() is declared in <strings.h>, which is not among
 * the headers included at the top of this program - confirm and add it.
 */
int main(int argc, char const *argv[])
{
    if (argc <= 1) {
        return 0;
    }

    const char *role = argv[1];

    if (strcasecmp(role, "cli") == 0) {
        run_sending_app();
    } else if (strcasecmp(role, "serv") == 0) {
        run_receiving_app();
    }

    return 0;
}

Posix的代码:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <errno.h>
#include <mqueue.h>
#include <time.h>

/* Number of timed rounds run by the client; each round is printed and then averaged. */
#define EXPERIMENT_COUNT 100

/* clock() readings taken immediately before and after each timed send loop. */
clock_t start, end;
/* Running sum of the per-round CPU times; divided by EXPERIMENT_COUNT when the average is printed. */
double cpu_time_useda_avg = 0;
/* CPU time, in seconds, consumed by the most recent round. */
double cpu_time_used;

/* Name under which the server creates and unlinks the queue and the client opens it. */
#define QUEUE_NAME  "/test_queue"
/* Message size requested for the queue; also the receive length and the length of every message the client sends. */
#define MAX_SIZE    1024
/* A received message that begins with this text makes the server leave its receive loop. */
#define MSG_STOP    "exit"

/*
 * CHECK(x): terminate the program unless the expression x is true.
 *
 * On failure it prints "<function>:<line>: " to stderr, calls perror() with
 * the source text of x as the prefix, and then calls exit(-1). x is
 * evaluated exactly once. The do { } while (0) wrapper makes the macro act
 * as a single statement, so it is safe in unbraced if/else bodies.
 *
 * The last line deliberately carries no trailing backslash: a continuation
 * there would splice whatever line follows into the macro definition.
 */
#define CHECK(x) \
    do { \
        if (!(x)) { \
            fprintf(stderr, "%s:%d: ", __func__, __LINE__); \
            perror(#x); \
            exit(-1); \
        } \
    } while (0)

/*
 * Benchmark receiver for the POSIX message queue.
 *
 * Creates (or opens) QUEUE_NAME read-only and receives messages until one
 * begins with MSG_STOP; the content of every other message is discarded.
 * Afterwards the descriptor is closed and the queue is unlinked.
 *
 * Any failing call terminates the process through CHECK(). Returns 0 once
 * the stop message has been received and cleanup has succeeded.
 */
int mq_run_server()
{
    mqd_t mq;
    struct mq_attr attr;
    char buffer[MAX_SIZE + 1];   /* one extra byte for the terminator added below */
    int must_stop = 0;

    /*
     * Zero the whole structure before filling it in, so that any member
     * other than the four assigned below is not handed to mq_open()
     * uninitialised.
     */
    memset(&attr, 0, sizeof attr);
    attr.mq_flags = 0;
    /*
     * NOTE(review): 1000000 is far above the default Linux msg_max limit;
     * creating the queue is expected to fail with EINVAL unless the system
     * limits allow it - confirm on the target machine.
     */
    attr.mq_maxmsg = 1000000;
    attr.mq_msgsize = MAX_SIZE;
    attr.mq_curmsgs = 0;

    mq = mq_open(QUEUE_NAME, O_CREAT | O_RDONLY, 0644, &attr);
    CHECK((mqd_t)-1 != mq);

    do {
        ssize_t bytes_read = mq_receive(mq, buffer, MAX_SIZE, NULL);
        CHECK(bytes_read >= 0);

        /* Terminate the payload so it can be compared as a C string. */
        buffer[bytes_read] = '\0';

        /* Prefix match: any message that starts with MSG_STOP ends the loop. */
        if (strncmp(buffer, MSG_STOP, strlen(MSG_STOP)) == 0) {
            must_stop = 1;
        }
    } while (!must_stop);

    /* mq_close() and mq_unlink() return an int (0 or -1), not an mqd_t. */
    CHECK(0 == mq_close(mq));
    CHECK(0 == mq_unlink(QUEUE_NAME));

    return 0;
}

/*
 * Benchmark sender for the POSIX message queue.
 *
 * Opens the existing queue QUEUE_NAME write-only, then runs
 * EXPERIMENT_COUNT rounds of 1,000,000 mq_send() calls. Every message is
 * MAX_SIZE bytes long: the string "hello" followed by zero bytes. The CPU
 * time of each round, measured with clock(), is printed, followed by the
 * average over all rounds.
 *
 * This function never sends MSG_STOP, so it does not ask the receiving side
 * to shut down. Any failing call terminates the process through CHECK().
 * Returns 0 on success.
 */
int mq_run_client()
{
    mqd_t mq;
    /*
     * Zero-filled on purpose: all MAX_SIZE bytes are transmitted, so no part
     * of the buffer may be left uninitialised.
     */
    char buffer[MAX_SIZE] = {0};
    int e, i;

    mq = mq_open(QUEUE_NAME, O_WRONLY);
    CHECK((mqd_t)-1 != mq);

    strcpy(buffer, "hello");

    for (e = 0; e < EXPERIMENT_COUNT; e++) {
        start = clock();

        for (i = 0; i != 1000000; i++) {
            /* Stop at the first failed send rather than timing a loop of errors. */
            CHECK(0 == mq_send(mq, buffer, MAX_SIZE, 0));
        }

        end = clock();
        cpu_time_used = ((double) (end - start)) / CLOCKS_PER_SEC;

        printf("cpu_time_used = %f\n", cpu_time_used);

        cpu_time_useda_avg += cpu_time_used;
    }

    printf("cpu_time_useda_avg = %f\n", cpu_time_useda_avg/EXPERIMENT_COUNT);

    /* mq_close() returns an int (0 or -1), not an mqd_t. */
    CHECK(0 == mq_close(mq));

    return 0;
}

/*
 * Entry point: picks the role from the first command-line argument,
 * compared case-insensitively. "serv" runs the queue server and "cli" runs
 * the benchmark client; with any other argument, or with none, nothing is
 * run. Always returns 0.
 *
 * NOTE(review): strcasecmp() is declared in <strings.h>, which is not among
 * the headers included at the top of this program - confirm and add it.
 */
int main(int argc, char const *argv[])
{
    if (argc <= 1) {
        return 0;
    }

    const char *role = argv[1];

    if (strcasecmp(role, "serv") == 0) {
        mq_run_server();
    } else if (strcasecmp(role, "cli") == 0) {
        mq_run_client();
    }

    return 0;
}

在这两种情况下,如果我运行./a.out serv,服务器端都会运行,如果我运行./a.out cli,则客户端会运行。

我的问题

为什么与System V MQ相比,Posix MQ性能如此低,而http://man7.org/linux/man-pages/man7/mq_overview.7.html说Posix MQ 与System V MQ 非常相似?

展开
收起
几许相思几点泪 2019-12-29 20:40:36 3252 0
1 条回答
写回答
取消 提交回答
  • 尝试将:

    mq_send(mq, buffer, MAX_SIZE, 0);

    改为:

    mq_send(mq, buffer, 104, 0);

    或者将宏定义改为:

    #define MAX_SIZE 104
    
    

    这样您就可以比较相同的数据量。

    2019-12-29 20:40:56
    赞同 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关电子书

更多
企业互联网架构之消息队列 立即下载
基于消息队列RocketMQ的大型分布式应用上云最佳实践 立即下载
云原生消息队列Apache RocketMQ 立即下载