开发者社区> 问答> 正文

目前的实时处理模式我符合我们的业务形态,请问消息队列支持rocketmq的拉取模式吗?

已解决

展开
收起
游客xdzxzp3u5ysso 2018-03-04 04:36:43 1491 0
1 条回答
写回答
取消 提交回答
  • 采纳回答

    详细解答可以参考官方帮助文档

    本文介绍如何通过 MQ SDK 使用.NET 语言进行消息订阅。

    说明:

    订阅方式

    MQ 支持以下两种订阅方式:

    • 集群订阅:同一个 Consumer ID 所标识的所有 Consumer 平均分摊消费消息。 例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。

      1. // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
      2. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
    • 广播订阅:同一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。 例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。

      1. // 广播订阅方式设置
      2. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);

    示例代码

    1. using System;
    2. using System.Collections.Generic;
    3. using System.Linq;
    4. using System.Text;
    5. using System.Runtime.InteropServices;
    6. using ons;
    7. namespace ons
    8. {
    9. //pushConsumer 拉取到消息后,会主动调用该实例的 consumer 函数
    10. public class MyMsgListener : MessageListener
    11. {
    12. public MyMsgListener()
    13. {
    14. }
    15. ~MyMsgListener()
    16. {
    17. }
    18. public override Action consume(Message value, ConsumeContext context)
    19. {
    20. // Message 包含了消费到的消息,通过 getBody 接口可以拿到消息体
    21. Console.WriteLine(value.getBody());
    22. /*
    23. 所有中文编码相关问题都在 SDK 压缩包包含的文档里做了说明,请仔细阅读
    24. */
    25. return ons.Action.CommitMessage;
    26. }
    27. }
    28. class onscsharp
    29. {
    30. static void Main(string[] args)
    31. {
    32. //pushConsumer 创建和工作需要的参数,必须输入
    33. ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
    34. factoryInfo.setFactoryProperty(factoryInfo.ConsumerId, "XXX");//您在 MQ 控制台创建的 Consumer ID
    35. factoryInfo.setFactoryProperty(factoryInfo.PublishTopics, "XXX");//您在 MQ 控制台创建的 Topic
    36. factoryInfo.setFactoryProperty(factoryInfo.AccessKey,"xx");//AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    37. factoryInfo.setFactoryProperty(factoryInfo.SecretKey, "xxxx");//SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    38. // 集群订阅方式 (默认)
    39. // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
    40. // 广播订阅方式
    41. // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
    42. //创建 consumer 用于消费消息
    43. ONSFactory onsfactory = new ONSFactory();
    44. PushConsumer pConsumer = onsfactory.getInstance().createPushConsumer(factoryInfo);
    45. //给 consumer 注册消费回调,一旦有消息就会触发回调函数
    46. MessageListener msgListener = new MyMsgListener();
    47. pConsumer.subscribe(factoryInfo.getPublishTopics(), "*", msgListener);
    48. //启动 consumer
    49. pConsumer.start();
    50. //consumer 启动后,会异步拉取消息。拉取到消息后,回调 MyMsgListener 实例的 consumer 函数,将消息体通过参数传递给 consumer
    51. // 这里可以继续做业务相关的逻辑处理,确定消费完成后,调用 shutdown 函数,释放资源。 应用退出的时候也需要调用 shutdown 函数。
    52. pConsumer.shutdown();
    53. }
    54. }
    55. }
    2018-03-05 03:17:43
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
企业互联网架构之消息队列 立即下载
基于消息队列RocketMQ的大型分布式应用上云最佳实践 立即下载
云原生消息队列Apache RocketMQ 立即下载