package org.training.hadoop.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Example of a multi-threaded Kafka consumer group: launches a fixed pool of
 * threads, each owning its own {@link KafkaConsumer} (KafkaConsumer is not
 * thread-safe, so one instance per thread), and installs a JVM shutdown hook
 * that stops them gracefully via {@code wakeup()}.
 */
public class KafkaConsumerExample
{
    /**
     * Builds the consumer configuration: local broker, group "testGroup",
     * auto-commit enabled, String deserializers for key and value.
     *
     * @return a fresh {@link Properties} instance for constructing a consumer
     */
    public static Properties getConfig()
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "testGroup");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /**
     * Starts the consumer threads and registers a shutdown hook that signals
     * every runner to stop, then waits briefly for in-flight work to drain.
     */
    public void consumeMessage()
    {
        // Launch 3 consumer threads; a fixed-size pool caps concurrency and
        // queues any excess tasks.
        int numConsumers = 3;
        final String topic = "test1";
        final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
        final List<KafkaConsumerRunner> consumers = new ArrayList<KafkaConsumerRunner>();
        for (int i = 0; i < numConsumers; i++) {
            KafkaConsumerRunner consumer = new KafkaConsumerRunner(topic);
            consumers.add(consumer);
            executor.submit(consumer);
        }

        // Shutdown hook: runs when the JVM terminates and cleans up all
        // consumer threads registered above.
        Runtime.getRuntime().addShutdownHook(new Thread()
        {
            @Override
            public void run()
            {
                for (KafkaConsumerRunner consumer : consumers) {
                    consumer.shutdown(); // set the closed flag and wake up any blocking poll()
                }
                executor.shutdown();
                try {
                    // Block until all submitted tasks (running or queued) have
                    // finished, the timeout expires, or this thread is
                    // interrupted.
                    if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        System.err.println("Consumer pool did not terminate within 5000 ms");
                    }
                }
                catch (InterruptedException e) {
                    // Restore the interrupt status instead of swallowing it so
                    // callers up the stack can observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /**
     * Runnable that owns a single {@link KafkaConsumer} and polls it in a loop
     * until {@link #shutdown()} is called from another thread.
     */
    public static class KafkaConsumerRunner
            implements Runnable
    {
        // Volatile-style stop flag shared between the polling thread and the
        // thread invoking shutdown().
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;
        private final String topic;

        public KafkaConsumerRunner(String topic)
        {
            Properties props = getConfig();
            consumer = new KafkaConsumer<String, String>(props);
            this.topic = topic;
        }

        /**
         * Prints a single consumed record (thread name, topic, offset, key,
         * value). Parameterized type replaces the raw {@code ConsumerRecord}
         * of the original.
         *
         * @param record the record to print
         */
        public void handleRecord(ConsumerRecord<String, String> record)
        {
            // "offset: " separator added for consistency with the other fields.
            System.out.println("name: " + Thread.currentThread().getName()
                    + " ; topic: " + record.topic()
                    + " ; offset: " + record.offset()
                    + " ; key: " + record.key()
                    + " ; value: " + record.value());
        }

        public void run()
        {
            try {
                consumer.subscribe(Arrays.asList(topic));
                while (!closed.get()) {
                    // poll() fetches records and also drives the heartbeat that
                    // tells the broker this consumer is alive. (Newer clients
                    // prefer poll(Duration); the long overload is kept here for
                    // compatibility with older client versions.)
                    ConsumerRecords<String, String> records = consumer.poll(10000);
                    for (ConsumerRecord<String, String> record : records) {
                        handleRecord(record);
                    }
                }
            }
            catch (WakeupException e) {
                // wakeup() is the supported way to abort a blocking poll();
                // only rethrow if we were NOT asked to shut down.
                if (!closed.get()) {
                    throw e;
                }
            }
            finally {
                consumer.close(); // always release sockets and buffers
            }
        }

        /**
         * Requests shutdown. Safe to call from a separate thread: sets the
         * stop flag, then interrupts a blocking {@code poll()} via
         * {@code wakeup()}, which raises {@link WakeupException} in run().
         */
        public void shutdown()
        {
            closed.set(true);
            consumer.wakeup();
        }
    }

    public static void main(String[] args)
    {
        KafkaConsumerExample example = new KafkaConsumerExample();
        example.consumeMessage();
    }
}