开发者学堂课程【RocketMQ知识精讲与项目实战(第一阶段):批量消息发送】学习笔记,与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/702/detail/12385
批量消息发送
内容介绍:
一、批量消息概念
二、代码实现
一、批量消息概念
批量消息发送能显著提高传递消息的性能。限制是这些批量消息应该有相同的
topic,相同的 waitstoremsgok,而且不能是延时消息,此外,这一批消息的总大
小不应超过4mb。
之前的课程发送的消息是一条一条发送的,批量则是一次性同时发的,也就是在
send 处用一个集合。
二、代码实现
创建消费者和生产者,在 producer 中构建一个集合,再创建多个消息对象。
将 producer 和 consume r的 topic 都设置为 batchtopic,启动消费者。可以看到消费者一次性发送了三条,
在发送批量消息时,有一个注意事项就是每次发送不能超过4mb,则很容易使用批
量处理,
样例如下:
string topic="BatchTest‘’:
List<message> messages=new ArrayList<>():
messages.add(new Message(topic,"TagA"."orderid001","He1lo world 0" .getytes()));
messages.add(newMessage(topic,“Taga"。"orderID002", "Helloworld 1’’.getsytes()));
messages.add(newMessage(topic,’’tagA”,"order1D003","He11o world 2.getBytesO)));
try{
producer.send(messages);
}catch(Exception e){
e.printstackTrace():
//处理 error
}
如果超过4mb,需要把消息进行分割,创建一个迭代器,把集合传递到迭代器中
去,通过 lisitem,从集合当中取出分割的消息,
public class Listsplitter implements Iterator> {
private final int SIZE_LIMIT = 1024 * 1024* 4;
private final List messages;
private int currindex:
pub1ic Listsplitter(List messages){
this.messages =messages;
}
override
public boolean hasnext(){
return currIndex():
}
override
pub1ic Listnext(){
int nextindex=currIndex;
int totalsize = 0;
for (: nextIndex< messages.size(); nextIndex++){
Message message=messages.get(nextIndex);
int tmpsize = message.gettopic(). length()+ message. Getbody(). length;
Map propertin message.getproperties();
for (Map.Entry entry : properties.entryset()) (
tmpsize += entry.getkey().length() + entry.getvalue().length):
}
tmpsize = tmpsize + 20; // 增加日志的开20字节
消息的长度取决于 gettopiclength、getbodylength,得到一个消息长度后,做出
判断,如果大于4mb了,就是有问题了,就直接跳过看下一条消息。
所以在发送的过程中,通过迭代器的运算可以截取消息。