Kafka JAVA客户端代码示例--高级应用

简介:

什么时间使用高级应用?

  1. 针对一个消息读取多次
  2. 在一个process中,仅仅处理一个topic中的一组partitions
  3. 使用事务,确保每个消息只被处理一次

使用高级应用(调用较底层函数)的缺点?

    SimpleConsumer需要做很多额外的工作(在以groups方式进行消息处理时不需要)

  1. 在应用程序中跟踪上次消息处理的offset
  2. 确定一个topic partition的lead broker
  3. 手工处理broker leander的改变

使用底层函数(SimpleConsumer)开发的步骤

  1.  通过active broker,确定topic partition的lead broker
  2. 确定topic partition的replicat brokers
  3. 根据需要,创建数据请求
  4. 抓取数据
  5. 识别lead brokder改变并进行恢复

代码示例

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 * @author Fung
 *
 */
public class ConsumerSimpleExample {
	public static void main(String arg[]) {
		String[] args={"20","page_visits","2","172.168.63.233","9092"};
		ConsumerSimpleExample example = new ConsumerSimpleExample();
		long maxReads = Long.parseLong(args[0]);
		String topic = args[1];
		int partition = Integer.parseInt(args[2]);
		List<String> seeds = new ArrayList<String>();
		seeds.add(args[3]);
		int port = Integer.parseInt(args[4]);
		try {
			example.run(maxReads, topic, partition, seeds, port);
		} catch (Exception e) {
			System.out.println("Oops:" + e);
			e.printStackTrace();
		}
	}

	private List<String> m_replicaBrokers = new ArrayList<String>();

	public ConsumerSimpleExample() {
		m_replicaBrokers = new ArrayList<String>();
	}

	public void run(long a_maxReads, String a_topic, int a_partition,
			List<String> a_seedBrokers, int a_port) throws Exception {
		// find the meta data about the topic and partition we are interested in
		//
		PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
				a_partition);
		if (metadata == null) {
			System.out
					.println("Can't find metadata for Topic and Partition. Exiting");
			return;
		}
		if (metadata.leader() == null) {
			System.out
					.println("Can't find Leader for Topic and Partition. Exiting");
			return;
		}
		String leadBroker = metadata.leader().host();
		String clientName = "Client_" + a_topic + "_" + a_partition;

		SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
				100000, 64 * 1024, clientName);
		long readOffset = getLastOffset(consumer, a_topic, a_partition,
				kafka.api.OffsetRequest.LatestTime(), clientName);

		int numErrors = 0;
		while (a_maxReads > 0) {
			if (consumer == null) {
				consumer = new SimpleConsumer(leadBroker, a_port, 100000,
						64 * 1024, clientName);
			}
			// Note: this fetchSize of 100000 might need to be increased if
			// large batches are written to Kafka
			FetchRequest req = new FetchRequestBuilder().clientId(clientName)
					.addFetch(a_topic, a_partition, readOffset, 100000).build();
			FetchResponse fetchResponse = consumer.fetch(req);

			if (fetchResponse.hasError()) {
				numErrors++;
				// Something went wrong!
				short code = fetchResponse.errorCode(a_topic, a_partition);
				System.out.println("Error fetching data from the Broker:"
						+ leadBroker + " Reason: " + code);
				if (numErrors > 5)
					break;
				if (code == ErrorMapping.OffsetOutOfRangeCode()) {
					// We asked for an invalid offset. For simple case ask for
					// the last element to reset
					readOffset = getLastOffset(consumer, a_topic, a_partition,
							kafka.api.OffsetRequest.LatestTime(), clientName);
					continue;
				}
				consumer.close();
				consumer = null;
				leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
						a_port);
				continue;
			}
			numErrors = 0;

			long numRead = 0;
			for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
					a_topic, a_partition)) {
				long currentOffset = messageAndOffset.offset();
				if (currentOffset < readOffset) {
					System.out.println("Found an old offset: " + currentOffset
							+ " Expecting: " + readOffset);
					continue;
				}
				readOffset = messageAndOffset.nextOffset();
				ByteBuffer payload = messageAndOffset.message().payload();
				byte[] bytes = new byte[payload.limit()];
				payload.get(bytes);
				System.out.println(String.valueOf(messageAndOffset.offset())
						+ ": " + new String(bytes, "UTF-8"));
				numRead++;
				a_maxReads--;
			}

			if (numRead == 0) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		if (consumer != null)
			consumer.close();
	}

	public static long getLastOffset(SimpleConsumer consumer, String topic,
			int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
				partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
				whichTime, 1));
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
				requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
				clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out
					.println("Error fetching data Offset Data the Broker. Reason: "
							+ response.errorCode(topic, partition));
			return 0;
		}
		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}

	private String findNewLeader(String a_oldLeader, String a_topic,
			int a_partition, int a_port) throws Exception {
		for (int i = 0; i < 3; i++) {
			boolean goToSleep = false;
			PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
					a_topic, a_partition);
			if (metadata == null) {
				goToSleep = true;
			} else if (metadata.leader() == null) {
				goToSleep = true;
			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
					&& i == 0) {
				// first time through if the leader hasn't changed give
				// ZooKeeper a second to recover
				// second time, assume the broker did recover before failover,
				// or it was a non-Broker issue
				//
				goToSleep = true;
			} else {
				return metadata.leader().host();
			}
			if (goToSleep) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		System.out
				.println("Unable to find new leader after Broker failure. Exiting");
		throw new Exception(
				"Unable to find new leader after Broker failure. Exiting");
	}

	private PartitionMetadata findLeader(List<String> a_seedBrokers,
			int a_port, String a_topic, int a_partition) {
		PartitionMetadata returnMetaData = null;
		loop: for (String seed : a_seedBrokers) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
						"leaderLookup");
				List<String> topics = Collections.singletonList(a_topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						if (part.partitionId() == a_partition) {
							returnMetaData = part;
							break loop;
						}
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with Broker [" + seed
						+ "] to find Leader for [" + a_topic + ", "
						+ a_partition + "] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}
		if (returnMetaData != null) {
			m_replicaBrokers.clear();
			for (Broker replica : returnMetaData.replicas()) {
				m_replicaBrokers.add(replica.host());
			}
		}
		return returnMetaData;
	}
}

参考

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

目录
相关文章
|
2月前
|
Java 开发工具
【Azure Storage Account】Java Code访问Storage Account File Share的上传和下载代码示例
本文介绍如何使用Java通过azure-storage-file-share SDK实现Azure文件共享的上传下载。包含依赖引入、客户端创建及完整示例代码,助你快速集成Azure File Share功能。
377 5
|
2月前
|
Java 数据处理 API
为什么你的Java代码应该多用Stream?从循环到声明式的思维转变
为什么你的Java代码应该多用Stream?从循环到声明式的思维转变
265 115
|
2月前
|
安全 Java 编译器
为什么你的Java代码需要泛型?类型安全的艺术
为什么你的Java代码需要泛型?类型安全的艺术
189 98
|
2月前
|
Java 编译器 API
java最新版和java8的区别,用代码展示
java最新版和java8的区别,用代码展示
293 43
|
2月前
|
安全 Java 容器
告别空指针噩梦:Optional让Java代码更优雅
告别空指针噩梦:Optional让Java代码更优雅
391 94
|
2月前
|
安全 Java 容器
告别繁琐判空:Optional让你的Java代码更优雅
告别繁琐判空:Optional让你的Java代码更优雅
|
安全 JavaScript 前端开发
java基础示例
这篇文章将为大家详细讲解有关java基础之方法的示例分析
|
2月前
|
JSON 网络协议 安全
【Java】(10)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
196 1
|
2月前
|
JSON 网络协议 安全
【Java基础】(1)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
216 1
|
3月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案