集群订阅即某个消费者集群只消费指定的 Topic,而不是消费所有 Topic。
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Runtime.InteropServices;
- using ons;
- namespace ons
- {
- //pushConsumer拉取到消息后,会主动调用该实例的consumer函数
- public class MyMsgListener : MessageListener
- {
- public MyMsgListener()
- {
- }
- ~MyMsgListener()
- {
- }
- public override Action consume(Message value, ConsumeContext context)
- {
- /*
- 所有中文编码相关问题都在SDK压缩包包含的文档里做了说明,请仔细阅读
- */
- return ons.Action.CommitMessage;
- }
- }
- class onscsharp
- {
- static void Main(string[] args)
- {
- //pushConsumer创建和工作需要的参数,必须输入
- ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
- factoryInfo.setFactoryProperty(factoryInfo.ConsumerId, "XXX");//您在MQ控制台申请的Consumer ID
- factoryInfo.setFactoryProperty(factoryInfo.PublishTopics, "XXX");//您在MQ控制台申请的Topic
- factoryInfo.setFactoryProperty(factoryInfo.AccessKey,"xx");//AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
- factoryInfo.setFactoryProperty(factoryInfo.SecretKey, "xxxx");//SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
- //create consumer
- ONSFactory onsfactory = new ONSFactory();
- PushConsumer pConsumer = onsfactory.getInstance().createPushConsumer(factoryInfo);
- //register msg listener and subscribe msg topic
- MessageListener msgListener = new MyMsgListener();
- pConsumer.subscribe(factoryInfo.getPublishTopics(), "*", msgListener);
- //start consumer
- pConsumer.start();
- //consumer启动后,会自动拉取消息,拉取到消息后,会自动调用MyMsgListener实例的consume函数;
- //确定消费完成后,调用shutdown函数;在应用退出前,必须销毁Consumer 对象,否则会导致内存泄露等问题
- pConsumer.shutdown();
- }
- }
- }