1 实时流式计算
1.1 概念
一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。
1.2 应用场景
日志分析
网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
大屏看板统计
可以实时的查看网站注册数量,订单数量,购买数量,金额等。
公交实时数据
可以随时更新公交车方位,计算多久到达站牌等
实时文章分值计算
头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。
1.3 技术方案选型
Hadoop
Apche Storm
Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。
Kafka Stream
可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。
2 Kafka Stream
2.1 概述
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点如下:
Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
除了Kafka外,无任何外部依赖
充分利用Kafka分区机制实现水平扩展和顺序性保证
通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
支持正好一次处理语义
提供记录级的处理能力,从而实现毫秒级的低延迟
支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)
2.2 Kafka Streams的关键概念
(1)Stream处理拓扑
流是Kafka Stream提出的最重要的抽象概念:它表示一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。
通过Kafka Streams编写一个或多个的计算逻辑的处理器拓扑。其中处理器拓扑是一个由流(边缘)连接的流处理(节点)的图。
流处理器是处理器拓扑中的一个节点;它表示一个处理的步骤,用来转换流中的数据(从拓扑中的上游处理器一次接受一个输入消息,并且随后产生一个或多个输出消息到其下游处理器中)。
(2)在拓扑中有两个特别的处理器:
源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。
2.3 KStream&KTable
(1)数据结构类似于map,如下图,key-value键值对
(2)KStream
KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。
KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。
为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:
(“ alice”,1)->("alice“,3)
如果您的流处理应用是要总结每个用户的价值,它将返回4了alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据
(3)KTable
KTable传统数据库,包含了各种存储了大量状态(state)的表格。KTable负责抽象的,就是表状数据。每一次操作,都是更新插入(update)
为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:
(“ alice”,1)->(“” alice“,3)
如果您的流处理应用是要总结每个用户的价值,它将返回3了alice。为什么?因为第二条数据记录将被视为先前记录的更新。
KStream - 每个新数据都包含了部分信息。
KTable - 每次更新都合并到原记录上。
2.4 Kafka Stream入门案例编写
(1)引入依赖
在之前的kafka-demo工程的pom文件中引入
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>${kafka.client.version}</version> <exclusions> <exclusion> <artifactId>connect-json</artifactId> <groupId>org.apache.kafka</groupId> </exclusion> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency>
(2)创建类
package com.oldlu.kafka.simple; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.*; import java.util.Arrays; import java.util.Properties; /** * 需求: * 接收kafka消息内容并计算消息内单词的个数 * 如: * hello kafka stareams * hello oldlu kafka * hello beijing oldlu kafka * * 结果: * hello 3 * kafka 3 * streams 1 * oldlu 2 * beijing 1 */ public class KafkaStreamFastStart { public static void main(String[] args) { //kafka配置信息 Properties prop = new Properties(); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092"); prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-faststart"); //stream构建器 StreamsBuilder builder = new StreamsBuilder(); //流式计算 group(builder); //创建kafkaStream KafkaStreams kafkaStreams = new KafkaStreams(builder.build(),prop); //开启kafka流计算 kafkaStreams.start(); } /** * 实时流式计算 * @param builder */ private static void group(StreamsBuilder builder) { //接收上游处理器的消息 KStream<String, String> stream = builder.stream("input_topic"); KStream<String, String> map = stream.flatMapValues(new ValueMapper<String, Iterable<String>>() { /** * 把消息中的词组,转换为一个一个的单词放到集合中 * @param value * @return */ @Override public Iterable<String> apply(String value) { return Arrays.asList(value.split(" ")); } }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { /** * 把消息的key,重新赋值,目前消息的key,就是一个个的单词,把单词作为key进行聚合 * @param key * @param value * @return */ @Override public KeyValue<String, String> apply(String key, String value) { return new KeyValue<>(value, value); } }) //根据key进行分组 目前的key 就是value,就是一个个的单词 .groupByKey() //聚合的时间窗口 多久聚合一次 .windowedBy(TimeWindows.of(10000)) //聚合 求单词的个数,调用count后,消息的vlaue是聚合单词后的统计数值 是一个long类型 //Materialized.as("count-article-num-001") 是当前消息的状态值,不重复即可 .count(Materialized.as("count-article-num-001")) //转换成 Kstream .toStream() //把处理后的key和value转成string .map((key, value) -> { return new KeyValue<>(key.key().toString(), value.toString()); }); //处理后的结果,发送给下游处理器 map.to("out_topic"); } }
(3)测试
准备
使用生产者在topic为:input_topic中发送多条消息
使用消费者接收topic为:out_topic
①生产者代码修改ProducerFastStart
package com.oldlu.kafka.simple; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.protocol.types.Field; import java.util.Properties; /** * 消息生产者 */ public class ProducerFastStart { private static final String INPUT_TOPIC="input_topic"; public static void main(String[] args) { //添加kafka的配置信息 Properties properties = new Properties(); //配置broker信息 properties.put("bootstrap.servers","192.168.200.130:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.RETRIES_CONFIG,10); //生产者对象 KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties); try { //封装消息 for (int i = 0; i < 10; i++) { if (i % 2 == 0) { ProducerRecord<String,String> record = new ProducerRecord<String, String>(INPUT_TOPIC,i+"","hello shanghai kafka stream hello"); //发送消息 producer.send(record); System.out.println("发送消息:"+record); }else { ProducerRecord<String,String> record = new ProducerRecord<String, String>(INPUT_TOPIC,i+"","helloworld kafka stream"); //发送消息 producer.send(record); System.out.println("发送消息:"+record); } } }catch (Exception e){ e.printStackTrace(); } //关系消息通道 producer.close(); } }
②消费者ConsumerFastStart
package com.oldlu.kafka.simple; import com.sun.scenario.effect.Offset; import jdk.nashorn.internal.runtime.logging.Logger; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.Properties; /** * 消息消费者 */ public class ConsumerFastStart { private static final String OUT_TOPIC="out_topic"; public static void main(String[] args) { //添加配置信息 Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //设置分组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //创建消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); //订阅主题 consumer.subscribe(Collections.singletonList(OUT_TOPIC)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.key()+":"+record.value()); /*try { //手动提交偏移量 consumer.commitSync(); }catch (CommitFailedException e){ e.printStackTrace(); System.out.println("记录错误信息为:"+e); }*/ } consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if(e!=null){ System.out.println("记录当前错误信息时提交的偏移量"+map+",异常信息为:"+e); } } }); } } }
结果:
通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出
2.5 SpringBoot集成Kafka Stream
从资料文件夹中把提供好的4个类拷贝到项目的config目录下
当前kafka-demo项目需要添加lombok的依赖包
<properties> <lombok.version>1.18.8</lombok.version> </properties> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <scope>provided</scope> </dependency>
(1)自定配置参数
/** * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数 */ @Setter @Getter @Configuration @EnableKafkaStreams @ConfigurationProperties(prefix="kafka") public class KafkaStreamConfig { private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024; private String hosts; private String group; /** * 重新定义默认的KafkaStreams配置属性,包括: * 1、服务器地址 * 2、应用ID * 3、流消息的副本数等配置 * @return */ @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration defaultKafkaStreamsConfig() { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid"); props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid"); props.put(StreamsConfig.RETRIES_CONFIG, 10); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // 消息副本数量 props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1); props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000); props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase()); return new KafkaStreamsConfiguration(props); } }
修改application.yml文件,在最下方添加自定义配置
kafka: hosts: 192.168.200.130:9092 group: ${spring.application.name}
(2)定义监听接口
/** * 流数据的监听消费者实现的接口类,系统自动会通过 * KafkaStreamListenerFactory类扫描项目中实现该接口的类, * 并注册为流数据的消费端。 * * 其中泛型可是KStream或KTable * @param <T> */ public interface KafkaStreamListener<T> { // 监听的类型 String listenerTopic(); // 处理结果发送的类 String sendTopic(); // 对象处理逻辑 T getService(T stream); }
(3)KafkaStream自动处理包装类
/** * KafkaStream自动处理包装类 */ public class KafkaStreamProcessor { // 流构建器 StreamsBuilder streamsBuilder; private String type; KafkaStreamListener listener; public KafkaStreamProcessor(StreamsBuilder streamsBuilder,KafkaStreamListener kafkaStreamListener){ this.streamsBuilder = streamsBuilder; this.listener = kafkaStreamListener; this.parseType(); Assert.notNull(this.type,"Kafka Stream 监听器只支持kstream、ktable,当前类型是"+this.type); } /** * 通过泛型类型自动注册对应类型的流处理器对象 * 支持KStream、KTable * @return */ public Object doAction(){ if("kstream".equals(this.type)) { KStream<?, ?> stream = streamsBuilder.stream(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST)); stream=(KStream)listener.getService(stream); stream.to(listener.sendTopic()); return stream; }else{ KTable<?, ?> table = streamsBuilder.table(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST)); table = (KTable)listener.getService(table); table.toStream().to(listener.sendTopic()); return table; } } /** * 解析传入listener类的泛型类 */ private void parseType(){ Type[] types = listener.getClass().getGenericInterfaces(); if(types!=null){ for (int i = 0; i < types.length; i++) { if( types[i] instanceof ParameterizedType){ ParameterizedType t = (ParameterizedType)types[i]; String name = t.getActualTypeArguments()[0].getTypeName().toLowerCase(); if(name.contains("org.apache.kafka.streams.kstream.kstream")||name.contains("org.apache.kafka.streams.kstream.ktable")){ this.type = name.substring(0,name.indexOf('<')).replace("org.apache.kafka.streams.kstream.","").trim(); break; } } } } } }
(4)KafkaStreamListener扫描和实例化成KafkaStreamProcessor.doAction的返回类,完成监听器实际注册的过程
@Component public class KafkaStreamListenerFactory implements InitializingBean { Logger logger = LoggerFactory.getLogger(KafkaStreamListenerFactory.class); @Autowired DefaultListableBeanFactory defaultListableBeanFactory; /** * 初始化完成后自动调用 */ @Override public void afterPropertiesSet() { Map<String, KafkaStreamListener> map = defaultListableBeanFactory.getBeansOfType(KafkaStreamListener.class); for (String key : map.keySet()) { KafkaStreamListener k = map.get(key); KafkaStreamProcessor processor = new KafkaStreamProcessor(defaultListableBeanFactory.getBean(StreamsBuilder.class),k); String beanName = k.getClass().getSimpleName()+"AutoProcessor" ; //注册baen,并且执行doAction方法 defaultListableBeanFactory.registerSingleton(beanName,processor.doAction()); logger.info("add kafka stream auto listener [{}]",beanName); } } }
(5)手动创建监听器
1,该类需要实现KafkaStreamListener接口
2,listenerTopic方法返回需要监听的topic
3,sendTopic方法返回需要处理完后发送的topic
4,getService方法,主要处理流数据
package com.oldlu.kafka.stream.listerer; import com.oldlu.kafka.config.KafkaStreamListener; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.*; import org.springframework.stereotype.Component; import java.util.Arrays; /** * KafkaStreamListener的泛型是固定的,有两种泛型可以选择 * KTable * KStream */ @Component public class StreamHandlerListener implements KafkaStreamListener<KStream<String,String>> { /** * 在哪里接收消息 源处理器 * @return */ @Override public String listenerTopic() { return "input_topic"; } /** * 计算完成后的结果发送到什么位置 下游处理器 * @return */ @Override public String sendTopic() { return "out_topic"; } @Override public KStream<String, String> getService(KStream<String, String> stream) { //计算 return stream.flatMapValues(new ValueMapper<String, Iterable<String>>() { /** * 把消息中的词组,转换为一个一个的单词放到集合中 * @param value * @return */ @Override public Iterable<String> apply(String value) { System.out.println("消息的value:"+value);//hello kafka stareams String[] strings = value.split(" "); return Arrays.asList(strings); } }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { /** * 把消息的key,重新赋值,目前消息的key就是单词 * @param key * @param value * @return */ @Override public KeyValue<String, String> apply(String key, String value) { return new KeyValue<>(value,value); } }).groupByKey() //时间聚合窗口 .windowedBy(TimeWindows.of(5000)) //消息的value就是聚合单词后的统计数值,long类型 .count(Materialized.as("count-word-num-0001")) //转换为Kstream .toStream() //把处理后的key和value转换String .map((key,value)->{ return new KeyValue<>(key.key().toString(),value.toString()); }); } }
测试:
启动微服务,正常发送消息,可以正常接收到消息