开发者社区 > 云原生 > 云消息队列 > 正文

"rocketmq 发送异步消息时出错。但同步消息正常 是怎么回事?

"rocketmq 发送异步消息时出错。但同步消息正常 是怎么回事?
35a921e2cc2778ca6499bf6cbd374b50.jpg
public static void main(String[] args) throws RemotingException, InterruptedException {

// 创建一个默认的生产者实例
DefaultMQProducer producer = new DefaultMQProducer(""broker-a"",
        new AclClientRPCHook(new SessionCredentials(""xxxx"",""xxxx"")));

// 设置NameServer地址
producer.setNamesrvAddr(""xxxx:xxxx"");
producer.setSendMsgTimeout(30000);

// 获取已设置的NameServer地址
String configuredNamesrvAddr = producer.getNamesrvAddr();
System.out.println(""Configured NameServer address: "" + configuredNamesrvAddr);

try {
    // 启动生产者
    producer.start();

    // 创建消息实例,指定主题、标签和消息内容
    Message message = new Message(""test-topic"", ""test"", (""消息发布测试"" + System.currentTimeMillis())
            .getBytes(RemotingHelper.DEFAULT_CHARSET));

    producer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult.getSendStatus());
            if (!SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                System.out.println(""消息发布失败"");
            }
        }

        @Override
        public void onException(Throwable e) {
            e.printStackTrace();
            System.out.println(""消息发布异常"");
        }
    });
} catch (MQClientException e) {
    e.printStackTrace();
} catch (UnsupportedEncodingException e) {
    throw new RuntimeException(e);
} finally {
    // 关闭生产者实例
    producer.shutdown();
}

}
public static void main(String[] args) throws RemotingException, InterruptedException {

// 创建一个默认的生产者实例
DefaultMQProducer producer = new DefaultMQProducer(""broker-a"",
        new AclClientRPCHook(new SessionCredentials(""xxxx"",""xxxx"")));

// 设置NameServer地址
producer.setNamesrvAddr(""xxxx:xxxx"");
producer.setSendMsgTimeout(30000);

// 获取已设置的NameServer地址
String configuredNamesrvAddr = producer.getNamesrvAddr();
System.out.println(""Configured NameServer address: "" + configuredNamesrvAddr);

try {
    // 启动生产者
    producer.start();

    // 创建消息实例,指定主题、标签和消息内容
    Message message = new Message(""test-topic"", ""test"", (""消息发布测试"" + System.currentTimeMillis())
            .getBytes(RemotingHelper.DEFAULT_CHARSET));

    producer.send(message);
} catch (MQClientException e) {
    e.printStackTrace();
} catch (UnsupportedEncodingException e) {
    throw new RuntimeException(e);
} catch (MQBrokerException e) {
    throw new RuntimeException(e);
} finally {
    // 关闭生产者实例
    producer.shutdown();
}

}"

展开
收起
十一0204 2023-07-19 19:50:53 318 0
1 条回答
写回答
取消 提交回答
  • 当RocketMQ发送异步消息出错,而同步消息正常时,可能是由于以下原因之一:

    1. 异步发送的消息未正确处理回调:在使用异步发送消息时,需要提供一个回调函数(SendCallback),用于处理发送结果。请确保您的回调函数正确处理了发送成功和发送失败的情况。如果回调函数中没有正确处理发送失败的逻辑,可能导致错误的消息处理结果。

      以下是一个示例代码片段,展示了如何使用异步发送消息并处理回调:

      producer.send(message, new SendCallback() {
          @Override
          public void onSuccess(SendResult sendResult) {
              // 发送成功处理逻辑
              System.out.println("Message sent successfully: " + sendResult.getMsgId());
          }
      
          @Override
          public void onException(Throwable e) {
              // 发送失败处理逻辑
              System.err.println("Failed to send message: " + e.getMessage());
          }
      });
      

      确保在onException方法中正确处理发送失败的情况,并记录或处理相应的异常信息。

    2. 异步发送消息的超时设置过小:异步发送消息时,可以为发送操作设置一个超时时间。如果设置的超时时间过小,可能导致发送操作无法完成,从而产生错误。请确保为异步发送设置合理的超时时间,以允许足够的时间来完成消息的发送。

    3. 服务端或网络问题:异步消息与同步消息使用不同的发送方式和网络通信机制。如果出现服务端问题或网络故障,可能会导致异步消息发送失败。请确保RocketMQ服务端正常运行,并检查网络连接是否稳定。

    2023-07-28 13:42:13
    赞同 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 MQ
  • 相关电子书

    更多
    RocketMQ Client-GO 介绍 立即下载
    RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
    基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载