我已经测试了两种情况:
系统V消息队列:发送1,000,000条消息“ hello”,msgsend()并从另一个进程获取它们msgget() posix消息队列:使用发送1,000,000条消息“ hello”,mq_send()并使用mq_receive() 然后在两种情况下,我都已计算出将消息发送到队列的CPU处理时间。
系统V mq CPU推送时间:0.8(秒)
posix mq CPU推送时间:1.4(秒)(!)
系统V的代码:
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <time.h>
#define EXPERIMENT_COUNT 100
clock_t start, end;
double cpu_time_useda_avg = 0;
double cpu_time_used;
struct mesg_buffer {
long mesg_type;
char mesg_text[100];
} message;
int run_sending_app()
{
key_t key;
int msgid;
// ftok to generate unique key
key = ftok("/var/tmp/progfile", 65);
// msgget creates a message queue
// and returns identifier
msgid = msgget(key, 0666 | IPC_CREAT);
message.mesg_type = 1;
// printf("Write Data : ");
// fgets(message.mesg_text, sizeof(message.mesg_text), stdin);
strcpy(message.mesg_text, "hello");
int e,i;
for( e = 0; e < EXPERIMENT_COUNT; e++ )
{
start = clock();
for(i = 0; i != 1000000; i++)
msgsnd(msgid, &message, sizeof(message), 0);
end = clock();
cpu_time_used = ((double) (end - start)) / CLOCKS_PER_SEC;
printf("cpu_time_used = %f\n", cpu_time_used);
cpu_time_useda_avg += cpu_time_used;
}
printf("cpu_time_useda_avg = %f\n", cpu_time_useda_avg/EXPERIMENT_COUNT);
// // display the message
// printf("Data send is : %s \n", message.mesg_text);
return 0;
}
int run_receiving_app()
{
key_t key;
int msgid;
// ftok to generate unique key
key = ftok("/var/tmp/progfile", 65);
// msgget creates a message queue
// and returns identifier
msgid = msgget(key, 0666 | IPC_CREAT);
while(1)
{
// msgrcv to receive message
msgrcv(msgid, &message, sizeof(message), 0, 0);
// // display the message
// printf("Data Received is : %s \n",
// message.mesg_text);
}
// to destroy the message queue
msgctl(msgid, IPC_RMID, NULL);
return 0;
}
int main(int argc, char const *argv[])
{
if( argc > 1 )
if( !strcasecmp(argv[1], "cli") )
run_sending_app();
else if( !strcasecmp(argv[1], "serv") )
run_receiving_app();
return 0;
}
Posix的代码:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <errno.h>
#include <mqueue.h>
#include <time.h>
#define EXPERIMENT_COUNT 100
clock_t start, end;
double cpu_time_useda_avg = 0;
double cpu_time_used;
#define QUEUE_NAME "/test_queue"
#define MAX_SIZE 1024
#define MSG_STOP "exit"
#define CHECK(x) \
do { \
if (!(x)) { \
fprintf(stderr, "%s:%d: ", __func__, __LINE__); \
perror(#x); \
exit(-1); \
} \
} while (0) \
int mq_run_server()
{
mqd_t mq;
struct mq_attr attr;
char buffer[MAX_SIZE + 1];
int must_stop = 0;
/* initialize the queue attributes */
attr.mq_flags = 0;
attr.mq_maxmsg = 1000000;
attr.mq_msgsize = MAX_SIZE;
attr.mq_curmsgs = 0;
/* create the message queue */
// mq_unlink(QUEUE_NAME); exit(0);
mq = mq_open(QUEUE_NAME, O_CREAT | O_RDONLY, 0644, &attr);
CHECK((mqd_t)-1 != mq);
do {
ssize_t bytes_read;
/* receive the message */
bytes_read = mq_receive(mq, buffer, MAX_SIZE, NULL);
CHECK(bytes_read >= 0);
buffer[bytes_read] = '\0';
if (! strncmp(buffer, MSG_STOP, strlen(MSG_STOP)))
{
must_stop = 1;
}
else
{
// printf("Received: %s\n", buffer);
}
} while (!must_stop);
/* cleanup */
CHECK((mqd_t)-1 != mq_close(mq));
CHECK((mqd_t)-1 != mq_unlink(QUEUE_NAME));
return 0;
}
int mq_run_client()
{
mqd_t mq;
char buffer[MAX_SIZE];
/* open the mail queue */
mq = mq_open(QUEUE_NAME, O_WRONLY);
CHECK((mqd_t)-1 != mq);
strcpy(buffer, "hello");
int e,i;
for( e = 0; e < EXPERIMENT_COUNT; e++ )
{
start = clock();
for(i = 0; i != 1000000; i++)
mq_send(mq, buffer, MAX_SIZE, 0);
end = clock();
cpu_time_used = ((double) (end - start)) / CLOCKS_PER_SEC;
printf("cpu_time_used = %f\n", cpu_time_used);
cpu_time_useda_avg += cpu_time_used;
}
printf("cpu_time_useda_avg = %f\n", cpu_time_useda_avg/EXPERIMENT_COUNT);
// printf("Send to server (enter \"exit\" to stop it):\n");
//
// do {
// printf("> ");
// fflush(stdout);
// memset(buffer, 0, MAX_SIZE);
// fgets(buffer, MAX_SIZE, stdin);
// /* send the message */
// CHECK(0 <= mq_send(mq, buffer, MAX_SIZE, 0));
// } while (strncmp(buffer, MSG_STOP, strlen(MSG_STOP)));
/* cleanup */
CHECK((mqd_t)-1 != mq_close(mq));
return 0;
}
int main(int argc, char const *argv[])
{
if( argc > 1 )
if( !strcasecmp(argv[1], "serv") )
mq_run_server();
else if( !strcasecmp(argv[1], "cli") )
mq_run_client();
return 0;
}
在这两种情况下,如果我运行./a.out serv,服务器端都会运行,如果我运行./a.out cli,则客户端会运行。
我的问题
为什么与System V MQ相比,Posix MQ性能如此低,而http://man7.org/linux/man-pages/man7/mq_overview.7.html说Posix MQ 与System V MQ 非常相似?
尝试更改:
mq_send(mq, buffer, MAX_SIZE, 0);
至
mq_send(mq, buffer, 104, 0);
要么
#define MAX_SIZE 104
这样您就可以比较相同的数据量。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/