Code repository
The code is synced to GitHub:
https://github.com/turbo-duck/flink-demo
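Overview
In the previous section we consumed data from Kafka, ran the computation, and printed the results to the console, as shown in the screenshots below: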
Kafka In Docker
TestKafkaProducer
Result of writing data to Kafka
FlinkConsumer
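The result of Flink consuming from Kafka is shown in the screenshot below; the data has been computed as required.
This section
This section still uses Flink to consume from Kafka. The difference from the previous section, which printed the results to the console, is that the results computed by Flink are now written to Redis for storage. Other storage systems would work as well; Redis is simply the example used here.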
pom.xml
Pay particular attention to the flink-connector-redis_2.11 artifact, which provides the Redis connector.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-demo-01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Use the same Scala suffix as the other Flink artifacts -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- Redis connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>
TestKafkaProducer.java
Generates test data and writes it to Kafka.
package icu.wzk.demo05;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestKafkaProducer {

    public static void main(String[] args) throws InterruptedException {
        // Broker address and String serializers for both key and value
        Properties props = new Properties();
        props.put("bootstrap.servers", "0.0.0.0:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // Send 500 records to the "test" topic, one every 200 ms
        for (int i = 0; i < 500; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("test", key, value);
            producer.send(record);
            System.out.println("send: " + key);
            Thread.sleep(200);
        }

        // close() flushes any records still buffered before returning
        producer.close();
    }
}
StartApp
Flink consumes from Kafka, maps each message to a (key, value) pair, and writes the result to Redis.
FlinkJedisPoolConfig
Configuration of the Redis connection pool (host and port).
MyRedisMapper
A custom mapper, which must implement the RedisMapper interface.
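To make the mapper's role concrete, here is a minimal sketch that runs without Flink or Redis. `KeyValueMapper` and `FakeRedis` are hypothetical stand-ins invented for this illustration (they are not connector classes); they only mimic the idea that the sink asks the mapper for a key and a value for each record and then issues the configured command, which is LPUSH in this job.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LpushMapperSketch {

    // Hypothetical stand-in for the mapper contract: extract key and value from a record.
    interface KeyValueMapper<T> {
        String getKeyFromData(T data);
        String getValueFromData(T data);
    }

    // In-memory stand-in for Redis lists; this is NOT a Redis client.
    static final class FakeRedis {
        private final Map<String, Deque<String>> lists = new HashMap<>();

        // Like LPUSH: insert the value at the head of the list stored at key.
        void lpush(String key, String value) {
            lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(value);
        }

        // Like LRANGE key 0 -1: return the whole list from head to tail.
        List<String> lrangeAll(String key) {
            Deque<String> list = lists.get(key);
            return list == null ? new ArrayList<String>() : new ArrayList<String>(list);
        }
    }

    // Mirrors the job: pair each message with "l_words", then push it via the mapper.
    static List<String> sinkAll(List<String> messages) {
        KeyValueMapper<Map.Entry<String, String>> mapper =
                new KeyValueMapper<Map.Entry<String, String>>() {
                    @Override
                    public String getKeyFromData(Map.Entry<String, String> data) {
                        return data.getKey();
                    }

                    @Override
                    public String getValueFromData(Map.Entry<String, String> data) {
                        return data.getValue();
                    }
                };
        FakeRedis redis = new FakeRedis();
        for (String message : messages) {
            Map.Entry<String, String> record = new SimpleEntry<>("l_words", message);
            redis.lpush(mapper.getKeyFromData(record), mapper.getValueFromData(record));
        }
        return redis.lrangeAll("l_words");
    }

    public static void main(String[] args) {
        // prints [value-2, value-1, value-0]
        System.out.println(sinkAll(Arrays.asList("value-0", "value-1", "value-2")));
    }
}
```

Because LPUSH inserts at the head of the list, the newest message comes first. Against a real Redis instance, running `LRANGE l_words 0 -1` in redis-cli should show the same newest-first ordering.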
Full code
package icu.wzk.demo05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.Properties;

public class StartApp {

    // Host only: the port is appended when bootstrap.servers is built below.
    // (Keeping ":9092" here as well would produce "0.0.0.0:9092:9092".)
    private static final String KAFKA_SERVER = "0.0.0.0";
    private static final Integer KAFKA_PORT = 9092;
    private static final String KAFKA_TOPIC = "test";
    private static final String REDIS_SERVER = "0.0.0.0";
    private static final Integer REDIS_PORT = 6379;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source: read the topic as plain strings
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", String.format("%s:%d", KAFKA_SERVER, KAFKA_PORT));
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(KAFKA_TOPIC, new SimpleStringSchema(), properties);
        DataStreamSource<String> data = env.addSource(consumer);

        // Pair every message with the fixed Redis key "l_words"
        SingleOutputStreamOperator<Tuple2<String, String>> wordData =
                data.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        return new Tuple2<>("l_words", value);
                    }
                });

        // Redis connection pool configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(REDIS_SERVER)
                .setPort(REDIS_PORT)
                .build();

        // Redis sink: each tuple is written using the command described by MyRedisMapper
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        wordData.addSink(redisSink);

        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {

        // LPUSH: push each value onto the head of the list stored at the key
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        // f0 holds the Redis key ("l_words")
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        // f1 holds the Kafka message value
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
}
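The job builds `bootstrap.servers` by joining KAFKA_SERVER and KAFKA_PORT with `String.format("%s:%d", ...)`. The stand-alone sketch below illustrates that step. The class `BootstrapAddress` and its `join` helper are hypothetical names introduced only for this example, not part of Flink or Kafka. It shows why the host constant should hold the host alone: if it already ends in `:9092`, the formatted address would carry the port twice.

```java
class BootstrapAddress {

    // Hypothetical helper: joins a host and a port into "host:port".
    // Simplification: any ':' in the host is rejected, so IPv6 literals are not handled.
    static String join(String host, int port) {
        if (host.contains(":")) {
            throw new IllegalArgumentException("host must not include a port: " + host);
        }
        return String.format("%s:%d", host, port);
    }

    public static void main(String[] args) {
        // prints 0.0.0.0:9092
        System.out.println(join("0.0.0.0", 9092));
    }
}
```

A guard like this is optional; the same effect is achieved by keeping the host and port constants strictly separate.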