01 引言
在前面的博客,我们已经对Flink
的程序模型里的Transformation
使用有了一定的了解了,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
- 《Flink教程(04)- Flink入门案例》
- 《Flink教程(05)- Flink原理简单分析》
- 《Flink教程(06)- Flink批流一体API(Source示例)》
- 《Flink教程(07)- Flink批流一体API(Transformation示例)》
本文开始继续讲解Flink
程序模型对里面的Sink
。
02 Sink
贴上一张官方对于sink
的描述:
可以看到sink
分为如下几类:
- writeAsText():基于文件的sink
- print()/printToErr():基于控制台的sink
- addSink:自定义的sink
2.1 基于控制台和文件的Sink
2.1.1 API
API如下:
ds.print
:直接输出到控制台ds.printToErr()
: 直接输出到控制台,用红色ds.writeAsText
(“本地/HDFS的path”,WriteMode.OVERWRITE).setParallelism(1)
注意:在输出到path
的时候,可以在前面设置并行度,如果
- 并行度>1,则path为目录
- 并行度=1,则path为文件名
2.1.2 示例代码
/** * @author : YangLinWei * @createTime: 2022/3/7 4:29 下午 * <p> * 1.ds.print 直接输出到控制台 * 2.ds.printToErr() 直接输出到控制台,用红色 * 3.ds.collect 将分布式数据收集为本地集合 * 4.ds.setParallelism(1).writeAsText("本地/HDFS的path",WriteMode.OVERWRITE) */ public class SinkDemo01 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.source //DataStream<String> ds = env.fromElements("hadoop", "flink"); DataStream<String> ds = env.readTextFile("data/input/words.txt"); //3.transformation //4.sink ds.print(); ds.printToErr(); ds.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(2); //注意: //Parallelism=1为文件 //Parallelism>1为文件夹 //5.execute env.execute(); } }
可以看到生成了两个文件:
2.2 自定义Sink
需求:将Flink
集合中的数据通过自定义Sink
保存到MySQL
;
示例代码如下:
/** * sink-custom * * @author : YangLinWei * @createTime: 2022/3/7 4:34 下午 */ public class SinkDemo02 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source DataStream<Student> studentDS = env.fromElements(new Student(null, "jim", 18)); //3.Transformation //4.Sink studentDS.addSink(new MySQLSink()); //5.execute env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class Student { private Integer id; private String name; private Integer age; } public static class MySQLSink extends RichSinkFunction<Student> { private Connection conn = null; private PreparedStatement ps = null; @Override public void open(Configuration parameters) throws Exception { //加载驱动,开启连接 //Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/big_data", "root", "123456"); String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)"; ps = conn.prepareStatement(sql); } @Override public void invoke(Student value, Context context) throws Exception { //给ps中的?设置具体值 ps.setString(1, value.getName()); ps.setInt(2, value.getAge()); //执行sql ps.executeUpdate(); } @Override public void close() throws Exception { if (conn != null) conn.close(); if (ps != null) ps.close(); } } }
查看数据库,可以看到添加了一条数据:
04 文末
本文主要讲解Flink
批流一体API
中的Sink
用法,谢谢大家的阅读,本文完!