java 多线程消费kfk队列消息案例

简介: 很久之前老师写的,记录一下,不然找不到了

package org.training.hadoop.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class KafkaConsumerExample
{

//config
public static Properties getConfig()
{
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "testGroup");
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    return props;
}

public void consumeMessage()
{
    // launch 3 threads to consume
    int numConsumers = 3;
    final String topic = "test1";
    final ExecutorService executor = Executors.newFixedThreadPool(numConsumers); //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
    final List<KafkaConsumerRunner> consumers = new ArrayList<KafkaConsumerRunner>();
    for (int i = 0; i < numConsumers; i++) {
        KafkaConsumerRunner consumer = new KafkaConsumerRunner(topic);
        consumers.add(consumer);
        executor.submit(consumer);
    }

    //关闭线程并清理---------------------------
    //当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,清理
    Runtime.getRuntime().addShutdownHook(new Thread()
    {
        @Override
        public void run()
        {
            for (KafkaConsumerRunner consumer : consumers) {
                consumer.shutdown();  //关闭线程
            }
            executor.shutdown();
            try {
                //当前线程阻塞,直到
                //等所有已提交的任务(包括正在跑的和队列中等待的)执行完或者等超时时间到
                //或者线程被中断,抛出InterruptedExcepti
                executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

// Thread to consume kafka data
public static class KafkaConsumerRunner
        implements Runnable
{
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    public KafkaConsumerRunner(String topic)
    {
        Properties props = getConfig();
        consumer = new KafkaConsumer<String, String>(props);
        this.topic = topic;
    }

    public void handleRecord(ConsumerRecord record)
    {
        System.out.println("name: " + Thread.currentThread().getName() + " ; topic: " + record.topic() + " ; offset" + record.offset() + " ; key: " + record.key() + " ; value: " + record.value());
    }

    public void run()
    {
        try {
            // subscribe
            consumer.subscribe(Arrays.asList(topic));
            while (!closed.get()) {
                //read data
                ConsumerRecords<String, String> records = consumer.poll(10000); //poll方法消费数据,心跳机制通知broker是否正常
                // Handle new records
                for (ConsumerRecord<String, String> record : records) {
                    handleRecord(record);  //打印消费数据
                }
            }
        }
        catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) {
                throw e;
            }
        }
        finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown()
    {
        closed.set(true);
        consumer.wakeup();
    }
}

public static void main(String[] args)
{
    KafkaConsumerExample example = new KafkaConsumerExample();
    example.consumeMessage();
}

}

相关文章
|
6天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
6天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
24 3
|
7天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
10天前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
19 2
|
10天前
|
监控 Java 开发者
Java线程管理:守护线程与本地线程的深入剖析
在Java编程语言中,线程是程序执行的最小单元,它们可以并行执行以提高程序的效率和响应性。Java提供了两种特殊的线程类型:守护线程和本地线程。本文将深入探讨这两种线程的区别,并探讨它们在实际开发中的应用。
15 1
|
Java
Java中需要注意的一些案例
Java中需要注意的一些案例
120 0
|
21天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
12天前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
10天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####