技术干货 | 源码解析 Github 上 14.1k Star 的 RocketMQ

本文涉及的产品
mPaaS订阅基础套餐,标准版 3个月
简介: 站在发送方视角,通过源码,来分析在事务消息发送中 RocketMQ 是如何工作的。

封面图0423头条.jpg

前言

Apache RocketMQ 作为广为人知的开源消息中间件,诞生于阿里巴巴,于 2016 年捐赠给了 Apache。从 RocketMQ 4.0 到如今最新的 v4.7.1,不论是在阿里巴巴内部还是外部社区,都赢得了广泛的关注和好评。

本文将站在发送方视角,通过阅读 RocketMQ Producer 源码,来分析在事务消息发送中 RocketMQ 是如何工作的。

需要说明的是,本文所贴代码,均来自 4.7.1 版本的 RocketMQ 源码。本文中所讨论的发送,仅指从 Producer 发送到 Broker 的过程,并不包含 Broker 将消息投递到 Consumer 的过程。

宏观概览

RocketMQ 事务消息发送流程:

3.png

结合源码来看,RocketMQ 的事务消息 TransactionMQProducer 的 sendMessageInTransaction 方法,实际调用了 DefaultMQProducerImpl 的 sendMessageInTransaction 方法。我们进入 sendMessageInTransaction 方法,整个事务消息的发送流程清晰可见。

首先,做发送前检查,并填入必要参数,包括设 prepare 事务消息。

源码清单-1

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener(); 
        if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

进入发送处理流程:

源码清单-2

    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

根据 broker 返回的处理结果决策本地事务是否执行,半消息发送成功则开始本地事务执行:

源码清单-3

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) { 
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) { 
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg); 
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:  // 当备broker状态不可用时,半消息要回滚,不执行本地事务
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

本地事务执行结束,根据本地事务状态进行二阶段处理:

源码清单-4

    try {
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    // 组装发送结果
    // ...
    return transactionSendResult;
}

接下来,我们深入每个阶段代码分析。

深扒内幕

Ⅰ阶段发送

重点分析 send 方法。进入 send 方法后,我们发现,RocketMQ 的事务消息的一阶段,使用了 SYNC 同步模式:

源码清单-5

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

这一点很容易理解,毕竟事务消息是要根据一阶段发送结果来决定要不要执行本地事务的,所以一定要阻塞等待 broker 的 ack。

我们进入 DefaultMQProducerImpl.java 中去看 sendDefaultImpl 方法的实现,通过读这个方法的代码,来尝试了解在事务消息的一阶段发送过程中 producer 的行为。

值得注意的是,这个方法并非为事务消息定制,甚至不是为 SYNC 同步模式定制的,因此读懂了这段代码,基本可以对 RocketMQ 的消息发送机制有了一个较为全面的认识。

这段代码逻辑非常通畅,不忍切片。为了节省篇幅,将代码中较为繁杂但信息量不大的部分以注释代替,尽可能保留流程的完整性。个人认为较为重要或是容易被忽略的部分,以注释标出,后文还有部分细节的详细解读。

源码清单-6

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    // 一、消息有效性校验。见后文
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;

    // 获取当前topic的发送路由信息,主要是要broker,如果没找到则从namesrv获取
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // 二、发送重试机制。见后文
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // 第一次发送是mq == null, 之后都是有broker信息的
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 三、rocketmq发送消息时如何选择队列?——broker异常规避机制 
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // 发送核心代码
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // rocketmq 选择 broker 时的规避机制,开启 sendLatencyFaultEnable == true 才生效
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

                    switch (communicationMode) {
                    // 四、RocketMQ的三种CommunicationMode。见后文
                        case ASYNC: // 异步模式
                            return null;
                        case ONEWAY: // 单向模式
                            return null;
                        case SYNC: // 同步模式
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // ...
                    // 自动重试
                } catch (MQClientException e) {
                    // ...
                    // 自动重试
                } catch (MQBrokerException e) {
                   // ...
                    // 仅返回码==NOT_IN_CURRENT_UNIT==205 时自动重试
                    // 其他情况不重试,抛异常
                } catch (InterruptedException e) {
                   // ...
                    // 不重试,抛异常
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        // 组装返回的info信息,最后以MQClientException抛出
        // ... ...

        // 超时场景抛RemotingTooMuchRequestException
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        // 填充MQClientException异常信息
        // ...
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

一、消息有效性校验

源码清单-7

 Validators.checkMessage(msg, this.defaultMQProducer);

在此方法中校验消息的有效性,包括对 topic 和消息体的校验。topic 的命名必须符合规范,且避免使用内置的系统消息 TOPIC。消息体长度 > 0 && 消息体长度 <= 102410244 = 4M 。

源码清单-8

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }

    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }

    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

二、发送重试机制

Producer 在消息发送不成功时,会自动重试,最多发送次数 = retryTimesWhenSendFailed + 1 = 3 次 。

值得注意的是,并非所有异常情况都会重试,从以上源码中可以提取到的信息告诉我们,在以下三种情况下,会自动重试:

  • 发生 RemotingException,MQClientException 两种异常之一时
  • 发生 MQBrokerException 异常,且 ResponseCode 是 NOT_IN_CURRENT_UNIT = 205 时
  • SYNC 模式下,未发生异常且发送结果状态非 SEND_OK

在每次发送消息之前,会先检查是否在前面这两步就已经耗时超长(超时时长默认 3000ms),若是,则不再继续发送并且直接返回超时,不再重试。这里说明了 2 个问题:

  • producer 内部自动重试对业务应用而言是无感知的,应用看到的发送耗时是包含所有重试的耗时在内的;
  • 一旦超时意味着本次消息发送已经以失败告终,原因是超时。这个信息最后会以 RemotingTooMuchRequestException 的形式抛出。

这里需要指出的是,在 RocketMQ 官方文档中指出,发送超时时长是 10s,即 10000ms,网上许多人对 rocketMQ 的超时时间解读也认为是 10s。然而代码中却明明白白写着 3000ms,最终我 debug 之后确认,默认超时时间确实是 3000ms。

三、broker 的异常规避机制

源码清单-9

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);  

这行代码是发送前选择 queue 的过程。

这里涉及 RocketMQ 消息发送高可用的的一个核心机制,latencyFaultTolerance。这个机制是 Producer 负载均衡的一部分,通过 sendLatencyFaultEnable 的值来控制,默认是 false 关闭状态,不启动 broker 故障延迟机制,值为 true 时启用 broker 故障延迟机制,可由 Producer 主动打开。

选择队列时,开启异常规避机制,则根据 broker 的工作状态避免选择当前状态不佳的 broker 代理,不健康的 broker 会在一段时间内被规避,不开启异常规避机制时,则按顺序选取下一个队列,但在重试场景下会尽量选择不同于上次发送 broker 的 queue。每次消息发送都会通过 updateFaultItem 方法来维护 broker 的状态信息。

源码清单-10

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // 计算延迟多久,isolation表示是否需要隔离该broker,若是,则从30s往前找第一个比30s小的延迟值,再按下标判断规避的周期,若30s,则是10min规避;
        // 否则,按上一次发送耗时来决定规避时长;
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}  

深入到 selectOneMessageQueue 方法内部一探究竟:

源码清单-11

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        // 开启异常规避
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // 按顺序取下一个message queue作为发送的queue
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 当前queue所在的broker可用,且与上一个queue的broker相同,
                // 或者第一次发送,则使用这个queue
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }
    // 不开启异常规避,则随机自增选择Queue
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

四、RocketMQ 的三种 CommunicationMode

源码清单-12

 public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

以上三种模式指的都是消息从发送方到达 broker 的阶段,不包含 broker 将消息投递给订阅方的过程。三种模式的发送方式的差异:

单向模式:ONEWAY。消息发送方只管发送,并不关心 broker 处理的结果如何。这种模式下,由于处理流程少,发送耗时非常小,吞吐量大,但不能保证消息可靠不丢,常用于流量巨大但不重要的消息场景,例如心跳发送等。

异步模式:ASYNC。消息发送方发送消息到 broker 后,无需等待 broker 处理,拿到的是 null 的返回值,而由一个异步的线程来做消息处理,处理完成后以回调的形式告诉发送方发送结果。异步处理时如有异常,返回发送方失败结果之前,会经过内部重试(默认 3 次,发送方不感知)。这种模式下,发送方等待时长较小,吞吐量较大,消息可靠,用于流量大但重要的消息场景。

同步模式:SYNC。消息发送方需等待 broker 处理完成并明确返回成功或失败,在消息发送方拿到消息发送失败的结果之前,也会经历过内部重试(默认 3 次,发送方不感知)这种模式下,发送方会阻塞等待消息处理结果,等待时长较长,消息可靠,用于流量不大但重要的消息场景。需要强调的是,事务消息的一阶段半事务消息的处理是同步模式。

在 sendKernelImpl 方法中也可以看到具体的实现差异。ONEWAY 模式最为简单,不做任何处理。负责发送的 sendMessage 方法参数中,相比同步模式,异步模式多了回调方法、包含 topic 发送路由元信息的 topicPublishInfo、包含发送 broker 信息的 instance、包含发送队列信息的 producer、重试次数。另外,异步模式下,会对有压缩的消息先做 copy。

源码清单-13

    switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            } 

官方文档中有这样一张图,十分清晰的描述了异步通信的详细过程:

4.png

Ⅱ 阶段发送

源码清单-3 体现了本地事务的执行,localTransactionState 将本地事务执行结果与事务消息二阶段的发送关联起来。

值得注意的是,如果一阶段的发送结果是 SLAVENOTAVAILABLE,即便 broker 不可用时,也会将 localTransactionState 置为 Rollback,此时将不会执行本地事务。之后由 endTransaction 方法负责二阶段提交,见源码清单-4。具体到 endTransaction 的实现:

源码清单-14

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 采用oneway的方式发送二阶段消息
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

在二阶段发送时,之所以用 oneway 的方式发送,个人理解这正是因为事务消息有一个特殊的可靠机制——回查。

消息回查

当 Broker 经过了一个特定的时间,发现依然没有得到事务消息的二阶段是否要提交或者回滚的确切信息,Broker 不知道 Producer 发生了什么情况(可能 producer 挂了,也可能 producer 发了 commit 但网络抖动丢了,也可能……于是主动发起回查。

事务消息的回查机制,更多的是在 broker 端的体现。RocketMQ 的 broker 以 Half 消息、Op 消息、真实消息三个不同的 topic 来将不同发送阶段的事务消息进行了隔离,使得 Consumer 只能看到最终确认 commit 需要投递出去的消息。其中详细的实现逻辑在本文中暂不多赘述,后续可另开一篇专门来从 Broker 视角来解读。

回到 Producer 的视角,当收到了 Broker 的回查请求,Producer 将根据消息检查本地事务状态,根据结果决定提交或回滚,这就要求 Producer 必须指定回查实现,以备不时之需。当然,正常情况下,并不推荐主动发送 UNKNOW 状态,这个状态毫无疑问会给 broker 带来额外回查开销,只在出现不可预知的异常情况时才启动回查机制,是一种比较合理的选择。

另外,4.7.1 版本的事务回查并非无限回查,而是最多回查 15 次:

源码清单-15

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 15;

附录

官方给出 Producer 的默认参数如下(其中超时时长的参数,在前文中也已经提到,debug 的结果是默认 3000ms,并非 10000ms):

5.png


作者介绍(源淇).png

动态-logo.gif

底部banner.png

点击“了解更多”,了解「mPaaS」更多资讯。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
98 4
|
2月前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
54 3
|
3月前
|
消息中间件 存储 监控
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
117 0
RocketMQ消息重试机制解析!
|
5月前
|
算法 数据处理 数据安全/隐私保护
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
96 2
|
5月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
84 0
|
6月前
|
机器人 vr&ar 计算机视觉
|
7月前
|
Shell Python
GitHub星标破千Star!Python游戏编程的初学者指南
Python 是一种高级程序设计语言,因其简洁、易读及可扩展性日渐成为程序设计领域备受推崇的语言。 目前的编程书籍大多分为两种类型。第一种,与其说是教编程的书,倒不如说是在教“游戏制作软件”,或教授使用一种呆板的语言,使得编程“简单”到不再是编程。而第二种,它们就像是教数学课一样教编程:所有的原理和概念都以小的应用程序的方式呈现给读者。
|
7月前
|
消息中间件 存储 运维
RocketMQ与Kafka深度对比:特性与适用场景解析
RocketMQ与Kafka深度对比:特性与适用场景解析
|
7月前
|
JavaScript 应用服务中间件 程序员
技术人如何利用 github+Jekyll ,搭建一个独立免费的技术博客
技术人如何利用 github+Jekyll ,搭建一个独立免费的技术博客

推荐镜像

更多