单线程消费数据适合在本地跑。
参考文档: http://kafka.apache.org/documentation.html
对于一个topic,可以发送给若干个partitions. partition在创建topic的时候就指定分区的数目。
分区、Offset、消费线程、group.id的关系
1)一组(类)消息通常由某个topic来归类,我们可以把这组消息“分发”给若干个分区(partition),每个分区的消息各不相同;
2)每个分区都维护着他自己的偏移量(Offset),记录着该分区的消息此时被消费的位置,由consumer自己上报到zk;
3)一个消费线程可以对应若干个分区,但一个分区只能被具体某一个消费线程消费;
4)group.id用于标记某一个消费组,每一个消费组都会被记录他在某一个分区的Offset,即不同consumer group针对同一个分区,都有“各自”的偏移量。
必须要注意的一点是,必须确认kafka的server.properties里面的一个属性num.partitions必须被设置成大于1的值,否则消费端再怎么折腾,也用不了多线程哦。测试环境设置为5个partition. 因为只有一个分区是不存在多线程同时消费的情况。
看一下消费者的代码便于理解
//通过实现runnable接口实现多线程
public class KafkaConsumer implements Runnable { String ; KafkaStream<[], []> ; KafkaConsumer(String title, KafkaStream<[], []> stream) { .= title; .= stream; } run() { System..println(+ ); ConsumerIterator<[], []> it = .iterator(); (it.hasNext()) { MessageAndMetadata<[], []> data = it.next(); String topic = data.topic(); partition = data.partition(); offset = data.offset(); String msg = String(data.message()); System..println(String.( , , topic, partition, offset, msg)); } System..println(String.(, )); } main(String[] args) { Properties props = Properties(); props.put(, ); props.put(, ); props.put(, );props.put(, ); ConsumerConfig config = ConsumerConfig(props); String topic1 = ; Map<String, Integer> topicCountMap = HashMap<String, Integer>(); topicCountMap.put(topic1, ); ConsumerConnector consumerConn = Consumer.(config); Map<String, List<KafkaStream<[], []>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap); List<KafkaStream<[], []>> streams = topicStreamsMap.get(topic1); ExecutorService executor = Executors.(); (i = ; i < streams.size(); i++) executor.execute(KafkaConsumer(+ (i + ), streams.get(i))); }
}
// producer 端的代码 public class KafkaConsumer implements Runnable { public String title; public KafkaStream<byte[], byte[]> stream; public KafkaConsumer(String title, KafkaStream<byte[], byte[]> stream) { this.title = title; this.stream = stream; } @Override public void run() { System.out.println("开始运行 " + title); ConsumerIterator<byte[], byte[]> it = stream.iterator(); /** * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞 * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false * */ while (it.hasNext()) { MessageAndMetadata<byte[], byte[]> data = it.next(); String topic = data.topic(); //主题 int partition = data.partition(); //分区 long offset = data.offset(); //偏移量 String msg = new String(data.message()); //数据 // System.out.println("Consumer:["+title+"], Topic: ["+); System.out.println(String.format( "Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]", title, topic, partition, offset, msg)); //String keyWord=null; // try { // keyWord = URLDecoder.decode(msg, "UTF-8"); // System.out.println("keyWord:"+keyWord); // } catch (UnsupportedEncodingException e) { // e.printStackTrace(); // } // System.out.println(String.format( // "Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], keyWord: [%s]", // title, topic, partition, offset, keyWord)); } System.out.println(String.format("Consumer: [%s] exiting ...", title)); } public static void main(String[] args) { Properties props = new Properties(); props.put("group.id", "test"); props.put("zookeeper.connect", "xxx:2181,xxx:2181"); props.put("auto.offset.reset", "smallest");//从最后开始消费数据 props.put("auto.commit.interval.ms", "1000"); //创建一个消费者配置文件 ConsumerConfig config = new ConsumerConfig(props); String topic1 = "test_kafka"; //String topic2 = "newTopic01"; //定义一个map集合,里面封装了需要消费的主题的分区数 Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic1, 4); // topicCountMap.put(topic2, 5); //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出 ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config); //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流 Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap); //取出 `test_kafka` 对应的 streams List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1); //创建一个容量为4的线程池 ExecutorService executor = Executors.newFixedThreadPool(4); //创建20个consumer threadsfor (int i = 0; i < streams.size(); i++)
executor.execute(new KafkaConsumer("消费者" + (i + 1), streams.get(i))); }
//运行结果: 证明了一个分区只能被一个线程消费,但一个消费线程可以消费多个分区的数据!虽然我指定了线程池的线程数为5,但并不是所有的线程都去消费了,这当然跟线程池的调度有关系了。并不是一个消费线程对应地去消费一个分区的数据。线程由zookeeper来声明它拥有1个或多个分区;真正有数据存在的分区是由生产发送端来决定,即使你的kafka设置了10个分区,消费端在消费的时候,消费线程虽然会根据zookeeper的某种机制来声明它所消费的分区,但实际消费过程中,还是会消费真正存在数据的分区;
建议设置:实际发送分区数(一般就等于设置的分区数)= topicCountMap的value = 线程池大小 否则极易出现reblance的异常;
本文转自 ChinaUnicom110 51CTO博客,原文链接:http://blog.51cto.com/xingyue2011/1941009