RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析

本文涉及的产品
应用实时监控服务-可观测链路OpenTelemetry版,每月50GB免费额度
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: **RocketMQ**的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的,具体来说Broker端是**Netty服务器**用来负责与客户端的连接请求处理,而Producer/Consumer端是**Netty客户端**用来负责与Netty服务器的通信及请求响应处理。

RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析

RocketMQ的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的
,具体来说Broker端是Netty服务器用来负责与客户端的连接请求处理,而Producer/Consumer端是Netty客户端用来负责与Netty服务器的通信及请求响应处理。

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq

1 Netty客户端/服务端运行

20231020-01.png

1.1 Broker端Netty服务器

  • BrokerController引导启动Netty服务器

创建NettyRemotingServer对象并调用start()方法来启动Netty服务器

public class BrokerController {
   
   
    protected void initializeRemotingServer() {
   
   
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        ...
    }
    protected void startBasicService() throws Exception {
   
   
        ...
        if (this.remotingServer != null) {
   
   
            this.remotingServer.start();
        }
        ...
    }
}
  • Netty服务端启动

这里就是我们熟悉的Netty的ServerBootstrap服务端引导类,通过设置EventLoopGroup然后绑定端口接着添加一系列的ChannelHandler启动服务器;
对于RocketMQ来说主要的Handler是NettyServerHandler这个处理类,它主要负责接收客户端请求并进行处理。

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
   
   
    // sharable handlers
    private NettyServerHandler serverHandler;

    @Override
    public void start() {
   
   
        ...
        // 用于处理客户端请求命令
        serverHandler = new NettyServerHandler();
        ...
        serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                ...
                .childHandler(new ChannelInitializer<SocketChannel>() {
   
   
                    @Override
                    public void initChannel(SocketChannel ch) {
   
   
                        ch.pipeline()
                                ...
                                .addLast(defaultEventExecutorGroup, ..., serverHandler);
                    }
                });
        try {
   
   
            ChannelFuture sync = serverBootstrap.bind().sync();
            ...
        } catch (Exception e) {
   
   
            ...
        }
        ...
    }
}

1.2 Producer/Consumer端Netty客户端

Producer和Consumer的启动最终都会调用mQClientFactory.start()来创建Netty客户端

// Producer
public class DefaultMQProducerImpl implements MQProducerInner {
   
   

    private MQClientInstance mQClientFactory;

    public void start(final boolean startFactory) throws MQClientException {
   
   
        switch (this.serviceState) {
   
   
            case CREATE_JUST:
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                ...
                // 这里创建Netty客户端
                mQClientFactory.start();
                ...
            case RUNNING:
            ...
            default:
                break;
        }
        ...
    }
}
// Consumer
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
   
   

    private MQClientInstance mQClientFactory;

    public synchronized void start() throws MQClientException {
   
   
        switch (this.serviceState) {
   
   
            case CREATE_JUST:
                ...
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                ...
                // 这里创建Netty客户端
                mQClientFactory.start();
                ...
            case RUNNING:
            ...
            default:
                break;
        }
        ...
    }
}
  • 创建Netty客户端对象

执行mQClientFactory.start() --> this.mQClientAPIImpl.start() --> this.remotingClient.start(),最终NettyRemotingClient对象调用start()方法来创建Netty客户端;
Netty客户端由Bootstrap引导程序创建,之后请求/响应通过Netty客户端处理。

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
   
   
    @Override
    public void start() {
   
   
        this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
                ...
                .handler(new ChannelInitializer<SocketChannel>() {
   
   
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
   
   
                        ...
                        ch.pipeline().addLast(..., new NettyClientHandler());
                    }
                });
        ...
    }
}

2 消息的生产及保存

20231020-02.png

2.1 Producer生产消息到Broker

我们调用producer.send发送消息时,程序会使用RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE)把消息封装为自定义的通信协议RemotingCommand,
之后NettyRemotingClient会找到Broker地址并建立连接生成Channel对象调用writeAndFlush方法将请求(RemotingCommand)发送到Netty服务器

  • 使用producer.send发送同步消息
public class Producer {
   
   
    public static void main(String[] args) throws MQClientException, InterruptedException {
   
   
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.start();
        Message msg = new Message(TOPIC /* Topic */,
                TAG /* Tag */,
                ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
        SendResult sendResult = producer.send(msg);
    }
}
  • 封装为RemotingCommand消息协议进一步执行发送逻辑
public class MQClientAPIImpl implements NameServerUpdateCallback {
   
   
    public SendResult sendMessage(final Message msg, final SendMessageRequestHeader requestHeader, ...) {
   
   
        // 构建请求命令 RequestCode.SEND_MESSAGE
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        request.setBody(msg.getBody());
        switch (communicationMode) {
   
   
            ...
            case SYNC:
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            ...
        }
        return null;
    }
}
  • 调用与服务端建立的Channel的writeAndFlush方法将请求RemotingCommand发送到Broker
public abstract class NettyRemotingAbstract {
   
   
    public RemotingCommand invokeSyncImpl(Channel channel, RemotingCommand request) {
   
   
        ...
        try {
   
   
            ...
            // 向服务端Broker通道发送消息
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
   
   
                ...
            });
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            ...
            return responseCommand;
        } 
        ...
    }
}

2.2 Broker接收消息并保存

在开始介绍说Broker中NettyServer的启动时会添加NettyServerHandler一个处理器,这个handler负责处理client发过来的请求指令。

上面当客户端Producer发送RemotingCommand(RequestCode.SEND_MESSAGE)这个指令的请求时,Broker收到请求后通过RequestCode
找到对应的SendMessageProcessor处理器执行processRequest方法去处理消息接收后的逻辑。

  • SendMessageProcessor处理器来接收消息调用MessageStore保存消息
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
   
   

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
   
   
        ...
        response = this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
        ...
    }

    public RemotingCommand sendMessage(...final RemotingCommand request, ...) throws RemotingCommandException {
   
   
        ...
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        ...
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        ...
    }
}
  • MessageStore保存消息到CommitLog中
public class DefaultMessageStore implements MessageStore {
   
   

    @Override
    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
   
   
        return waitForPutResult(asyncPutMessages(messageExtBatch));
    }

    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
   
   
        ...
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
        ...
    }
}
public class CommitLog implements Swappable {
   
   
    public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
   
   
        ...
        result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
        ...
    }
}

3 消息的拉取及消费

20231020-03.pngRocketMQ的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的
,具体来说Broker端是Netty服务器用来负责与客户端的连接请求处理,而Producer/Consumer端是Netty客户端用来负责与Netty服务器的通信及请求响应处理。

3.1 Consumer向Broker发送拉取请求

创建Consumer实例,订阅Topic并注册MessageListener后调用start方法启动程序;

接着开启一个PullMessageService任务去向Broker发送消息拉取请求,通过DefaultMQPushConsumerImpl.pullMessage方法设置请求回调逻辑(如:获取到消息则使用MessageListener去消费),
接着继续执行MQClientAPIImpl().pullMessage将请求信息封装为RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE),最终由NettyRemotingClient发送请求到Broker。

  • 创建DefaultMQPushConsumer调用start()启动消费者程序,PullMessageService任务开启
public class Consumer {
   
   

    public static void main(String[] args) throws MQClientException {
   
   
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.subscribe(TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
   
   
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

public class MQClientInstance {
   
   

    public void start() throws MQClientException {
   
   
        ...
        // Start pull service
        this.pullMessageService.start();
        ...
    }
}
  • 这里会设置回调逻辑等并继续调用底层实现发起请求,开始注册的MessageListener会在这里调用的
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
   
   

    public void pullMessage(final PullRequest pullRequest) {
   
   
        ...
        PullCallback pullCallback = new PullCallback() {
   
   
            @Override
            public void onSuccess(PullResult pullResult) {
   
   
                if (pullResult != null) {
   
   
                    switch (pullResult.getPullStatus()) {
   
   
                        // 获取到消息
                        case FOUND:
                            ...
                            // 这个里面会执行注册的MessageListener
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), ...);
                            ...
                            // 继续拉取消息
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            ...
                        ...
                    }
                }
            }
            ...
        };
        ...
        try {
   
   
            this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), ..., pullRequest.getNextOffset(), ..., pullCallback);
        } catch (Exception e) {
   
   
            ...
        }
    }
}

上面执行到pullAPIWrapper.pullKernelImpl会调用MQClientAPIImpl().pullMessage来封装请求报文最终交由NettyRemotingClient.invokeSyncImpl真正发出请求。

  • 封装请求报文RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE)
public class MQClientAPIImpl implements NameServerUpdateCallback {
   
   
    public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, ..., final PullCallback pullCallback) {
   
   
        ...
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        ...
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        ...
    }
}
  • RemotingClient调用channel.writeAndFlush(request)发出拉取请求
public abstract class NettyRemotingAbstract {
   
   
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
   
   
        ...
        try {
   
   
            ...
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
   
   
                ...
            });
            ...
        } 
        ...
    }
}

3.2 Broker接收拉取请求匹配返回消息

同样Broker中NettyServerHandler收到RemotingCommand(RequestCode.PULL_MESSAGE)这个指令找到PullMessageProcessor调用processRequest方法去处理。

  • PullMessageProcessor.processRequest处理时,调用messageStore.getMessageAsync去队列里查找消息,之后写回客户端
public class PullMessageProcessor implements NettyRequestProcessor {
   
   
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, ...) {
   
   
        ...
        messageStore.getMessageAsync(group, topic, queueId, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter)
                .thenApply(result -> {
   
   
                    ...
                })
                // 写回客户端
                .thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
        ...
    }
}
  • PullMessageProcessor.processRequest处理时,调用messageStore.getMessageAsync去队列里查找消息,之后写回客户端
public class DefaultMessageStore implements MessageStore {
   
   
    @Override
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, ...) {
   
   
        ...
        GetMessageResult getResult = new GetMessageResult();
        ...
        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
        ...
        getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
        ...
        return getResult;
    }
}
  • 从commitLog中获取消息
public class CommitLog implements Swappable {
   
   
    public SelectMappedBufferResult getMessage(final long offset, final int size) {
   
   
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
   
   
            int pos = (int) (offset % mappedFileSize);
            SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(pos, size);
            if (null != selectMappedBufferResult) {
   
   
                selectMappedBufferResult.setInCache(coldDataCheckService.isDataInPageCache(offset));
                return selectMappedBufferResult;
            }
        }
        return null;
    }
}

最后

至此我们把RocketMQ中Broker与生产者/消费者基于Netty简单的通信调用链路讲完了,大家有什么问题可以下面留言哦,一起学习进步啊。

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
87 12
|
2月前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
48 2
|
7月前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
7月前
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
57 0
|
7月前
|
消息中间件 运维 监控
ApsaraMQ Copilot for RocketMQ:消息数据集成链路的健康管家
阿里云消息队列 ApsaraMQ 始终围绕“高弹性低成本、更稳定更安全、智能化免运维”三大核心方向进行演进和拓展。在智能化免运维方面,通过 ApsaraMQ Copilot,为企业提供消息数据集成链路的健康管家,让消息服务走进智能化免运维的新时代。
71871 79
|
4月前
|
消息中间件 Kafka 数据安全/隐私保护
RabbitMQ异步通信详解
RabbitMQ异步通信详解
131 16
|
8月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
3月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
63 0

相关产品

  • 云消息队列 MQ