点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(已更完)
Flink(正在更新!)
章节内容
上节我们完成了如下的内容:
Sink 的基本概念等内容
Sink的相关信息 配置与使用
Sink案例写入Redis
JDBC Sink
在 Apache Flink 中,通过 JDBC Sink,可以将处理后的数据写入到 MySQL 数据库中。这对于将实时处理的数据持久化或与其他系统进行集成非常有用。
Flink JDBC Sink 简介
Flink 提供了 JdbcSink,它是基于 JDBC 协议的 Sink,可以将数据写入各种关系型数据库,包括 MySQL。在使用 JDBC Sink 时,需要提供数据库连接信息和 SQL 语句,通过这些信息,Flink 将数据流中的记录插入或更新到 MySQL 表中。
Flink 到 MySQL 的基本步骤
将数据流写入 MySQL 的步骤主要包括以下几点:
依赖库配置:确保在项目中引入了 Flink 和 MySQL 相关的依赖库,通常需要配置 Maven 或 Gradle。
定义数据源和数据流:创建并处理数据流。
配置 JDBC Sink:提供数据库的连接信息和插入 SQL 语句。
启动任务:将数据流写入 MySQL。
优化建议
在实际项目中,向 MySQL 插入大量数据时,应考虑以下优化策略:
批量插入:通过 JdbcExecutionOptions 配置批量插入,可以大幅提升写入性能。
连接池:对于高并发的写入操作,建议使用连接池来减少数据库连接开销。
索引优化:为插入的表配置合适的索引,可以提高查询性能,但在大量写入时,索引可能会降低- 插入速度,因此需要权衡。
数据分片:对于非常大规模的数据,可以考虑将数据分片并行写入不同的 MySQL 实例或分区表中。
案例:流数据下沉到MySQL
添加依赖
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.28</version> </dependency>
编写代码
一个Person的类,对应MySQL中的一张表的字段。
模拟几条数据流,写入到 MySQL中。
package icu.wzk; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class SinkSqlTest { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Person> data = env.getJavaEnv().fromElements( new Person("wzk", 18, 1), new Person("icu", 20, 1), new Person("wzkicu", 13, 2) ); data.addSink(new MySqlSinkFunction()); env.execute(); } public static class MySqlSinkFunction extends RichSinkFunction<Person> { private PreparedStatement preparedStatement = null; private Connection connection = null; @Override public void open(Configuration parameters) throws Exception { String url = "jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"; String username = "hive"; String password = "hive@wzk.icu"; connection = DriverManager.getConnection(url, username, password); String sql = "INSERT INTI PERSON(name, age, sex) VALUES(?, ?, ?)"; preparedStatement = connection.prepareStatement(sql); } @Override public void invoke(Person value, Context context) throws Exception { preparedStatement.setString(1, value.getName()); preparedStatement.setInt(2, value.getAge()); preparedStatement.setInt(3, value.getSex()); preparedStatement.executeUpdate(); } @Override public void close() throws Exception { if (null != connection) { connection.close(); } if (null != preparedStatement) { preparedStatement.close(); } } } public static class Person { private String name; private Integer age; private Integer sex; public Person() { } public Person(String name, Integer age, Integer sex) { this.name = name; this.age = age; this.sex = sex; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public Integer getSex() { return sex; } public void setSex(Integer sex) { this.sex = sex; } } }
数据库配置
我们新建一张表出来,person表,里边有我们需要的字段。
运行代码
我们运行代码,等待运行结束。
查看结果
查看数据库中的数据,我们可以看到刚才模拟的数据已经成功写入了。
案例:写入到Kafka
编写代码
package icu.wzk; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.scala.DataStream; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; public class SinkKafkaTest { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> data = env.socketTextStream("localhost", 9999, '\n', 0); String brokerList = "h121.wzk.icu:9092"; String topic = "flink_test"; FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema()); data.addSink(producer); env.execute("SinkKafkaTest"); } }
运行代码
启动一个 nc
nc -lk 9999 • 1
我们通过回车的方式,可以发送数据。
Java 程序中等待
查看结果
我们登录到服务器查看信息
./kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test -
可以看到刚才的数据已经写入了: