1 Predefined Sources
1.1 Collection-based Sources
⚫ API
Generally used to fabricate test data when learning and experimenting.
1.env.fromElements(varargs);
2.env.fromCollection(any collection);
3.env.generateSequence(from, to); // deprecated since Flink 1.12
4.env.fromSequence(from, to);     // the replacement for generateSequence
⚫ Code demo:
package cn.oldlu.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Author oldlu
 * Desc
 * Turn an ordinary local Java/Scala collection into a distributed Flink DataStream!
 * Generally used to fabricate test data when learning and experimenting.
 * 1. env.fromElements(varargs);
 * 2. env.fromCollection(any collection);
 * 3. env.generateSequence(from, to);
 * 4. env.fromSequence(from, to);
 */
public class SourceDemo01 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source
        // 1. env.fromElements(varargs);
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        // 2. env.fromCollection(any collection);
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
        // 3. env.generateSequence(from, to); -- deprecated, prefer fromSequence
        DataStream<Long> ds3 = env.generateSequence(1, 10);
        // 4. env.fromSequence(from, to);
        DataStream<Long> ds4 = env.fromSequence(1, 10);

        // 3. Transformation
        // 4. sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // 5. execute
        env.execute();
    }
}
1.2 File-based Sources
⚫ API
Generally used for learning and testing.
env.readTextFile(local/HDFS file or directory); // compressed files also work
⚫ Code demo:
package cn.oldlu.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author oldlu
 * Desc
 * 1. env.readTextFile(local/HDFS file or directory); // compressed files also work
 */
public class SourceDemo02 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source
        // env.readTextFile(local file/HDFS file); // compressed files also work
        DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
        DataStream<String> ds2 = env.readTextFile("data/input/dir");
        DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020//wordcount/input/words.txt");
        DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");

        // 3. Transformation
        // 4. sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // 5. execute
        env.execute();
    }
}
1.3 Socket-based Sources
Generally used for learning and testing.
⚫ Requirements:
1. On node1, run nc -lk 9999 to send data to the given port. nc is short for netcat; it was originally a tool for configuring routers, and here we use it to send data to a port. If the command is missing, install it with:
yum install -y nc
2. Write a Flink stream-processing application that counts words in real time.
⚫ Code implementation:
package cn.oldlu.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author oldlu
 * Desc
 * SocketSource
 */
public class SourceDemo03 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        // 3. transformation
        // 3.1 Split each line on spaces into individual words
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // value is one line of input
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word); // emit each word downstream
                }
            }
        });

        // 3.2 Map each word to a (word, 1) pair
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                // value is a single word
                return Tuple2.of(value, 1);
            }
        });

        // 3.3 Group by the word (the key)
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);

        // 3.4 Within each group, sum the counts (the value)
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

        // 4. sink
        result.print();

        // 5. execute
        env.execute();
    }
}
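As a side note, the anonymous classes above can be replaced with lambdas. Because Java erases a lambda's generic types, Flink then needs an explicit type hint via returns(). A minimal sketch of the same pipeline written that way (the class name SourceDemo03Lambda is made up for illustration):

package cn.oldlu.source;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SourceDemo03Lambda {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("node1", 9999)
                // split each line into words; the output type must be hinted
                .flatMap((String line, Collector<String> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(word);
                    }
                }).returns(Types.STRING)
                // map each word to (word, 1); tuple types are erased, so hint them too
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();
        env.execute();
    }
}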
2 Custom Sources
2.1 Randomly Generated Data
⚫ API
Generally used for learning and testing, to simulate data. Flink also provides source interfaces; by implementing one of them you can build a custom data source. The interfaces differ in capability,
classified as follows (a minimal sketch of the simplest one follows the list):
SourceFunction: non-parallel source (parallelism must be 1)
RichSourceFunction: non-parallel source with rich functionality (parallelism must be 1)
ParallelSourceFunction: parallel source (parallelism may be >= 1)
RichParallelSourceFunction: parallel source with rich functionality (parallelism may be >= 1) -- this is the interface the Kafka source, covered later, is built on
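To make the classification concrete, here is a minimal sketch of the simplest variant, a non-parallel SourceFunction (the class name MySimpleSource and the emitted counter values are made up for illustration):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySimpleSource implements SourceFunction<Long> {
    // volatile, because cancel() is called from a different thread than run()
    private volatile boolean running = true;
    private long counter = 0;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            ctx.collect(counter++); // emit one element per second
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Because this is a non-parallel source, env.addSource(new MySimpleSource()).setParallelism(2) would be rejected at job submission; the parallel variants are what allow setParallelism(2) in the demo below.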
⚫ Requirements
Every second, randomly generate one order record (order ID, user ID, order amount, timestamp).
Requirements:
randomly generate the order ID (a UUID)
randomly generate the user ID (0-2)
randomly generate the order amount (0-100)
set the timestamp to the current system time
⚫ Code implementation
package cn.oldlu.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * Author oldlu
 * Desc
 * Requirements:
 * Every second, randomly generate one order record (order ID, user ID, order amount, timestamp).
 * - randomly generate the order ID (a UUID)
 * - randomly generate the user ID (0-2)
 * - randomly generate the order amount (0-100)
 * - set the timestamp to the current system time
 *
 * API
 * Generally used for learning and testing, to simulate data.
 * Flink provides source interfaces; implementing one of them yields a custom data source:
 * SourceFunction: non-parallel source (parallelism must be 1)
 * RichSourceFunction: non-parallel source with rich functionality (parallelism must be 1)
 * ParallelSourceFunction: parallel source (parallelism may be >= 1)
 * RichParallelSourceFunction: parallel source with rich functionality (parallelism may be >= 1) -- the Kafka source, covered later, is built on this interface
 */
public class SourceDemo04_Customer {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. Source
        DataStream<Order> orderDS = env
                .addSource(new MyOrderSource())
                .setParallelism(2); // allowed because this is a parallel source

        // 3. Transformation
        // 4. Sink
        orderDS.print();

        // 5. execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private String id;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }

    public static class MyOrderSource extends RichParallelSourceFunction<Order> {
        // volatile, because cancel() is called from a different thread than run()
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                Thread.sleep(1000);
                String id = UUID.randomUUID().toString();
                int userId = random.nextInt(3);   // 0-2
                int money = random.nextInt(101);  // 0-100
                long createTime = System.currentTimeMillis();
                ctx.collect(new Order(id, userId, money, createTime));
            }
        }

        // Invoked when the job is canceled / the cancel command is executed
        @Override
        public void cancel() {
            flag = false;
        }
    }
}
2.2 MySQL
⚫ Requirements:
In real development you often receive data in real time and need to match it against rules stored in MySQL; in that case a Flink custom source can read the data from MySQL. For now, a simpler requirement: load data from MySQL continuously,
so that changes to the MySQL data are also picked up in (near) real time.
⚫ Prepare the data
CREATE TABLE `t_student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

INSERT INTO `t_student` VALUES ('1', 'jack', '18');
INSERT INTO `t_student` VALUES ('2', 'tom', '19');
INSERT INTO `t_student` VALUES ('3', 'rose', '20');
INSERT INTO `t_student` VALUES ('4', 'tom', '19');
INSERT INTO `t_student` VALUES ('5', 'jack', '18');
INSERT INTO `t_student` VALUES ('6', 'rose', '20');
⚫ Code implementation:
package cn.oldlu.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * Author oldlu
 * Desc
 * Requirements:
 * In real development you often receive data in real time and need to match it against
 * rules stored in MySQL; a Flink custom source can read that data from MySQL.
 * For now, a simpler requirement: load data from MySQL continuously,
 * so that changes to the MySQL data are also picked up.
 */
public class SourceDemo05_Customer_MySQL {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Source
        // parallelism 1: with higher parallelism, each subtask would read the full table and emit duplicates
        DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);

        // 3. Transformation
        // 4. Sink
        studentDS.print();

        // 5. execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSource extends RichParallelSourceFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;
        // volatile, because cancel() is called from a different thread than run()
        private volatile boolean flag = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Load the driver and open the connection
            //Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "select id,name,age from t_student";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            while (flag) {
                // Re-run the query every 5 seconds so changed data is picked up
                ResultSet rs = ps.executeQuery();
                while (rs.next()) {
                    int id = rs.getInt("id");
                    String name = rs.getString("name");
                    int age = rs.getInt("age");
                    ctx.collect(new Student(id, name, age));
                }
                rs.close();
                TimeUnit.SECONDS.sleep(5);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        @Override
        public void close() throws Exception {
            // Close the statement before the connection
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
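A note on the design: this source simply re-runs the full SELECT every 5 seconds, so every poll re-emits all rows. That satisfies the "pick up changes" requirement in a crude, duplicate-heavy way, which is fine for a demo; for genuine change capture from MySQL in production, a CDC connector such as Flink CDC is the more common choice.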