Flink / Scala - DataSource 之 DataSet 获取数据总结

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 数据源创建初始数据集,这里主要以 DataSet 数据源为例,例如从文件或者从 collection 中创建,后续介绍 DataStreaming 的数据源获取方法。

一.引言

image.gif编辑

数据源创建初始数据集,这里主要以 DataSet 数据源为例,例如从文件或者从 collection 中创建,后续介绍 DataStreaming 的数据源获取方法。创建数据集的机制一般抽象在 InputFormat 后面,这里有点类似 spark 的 sparkContext,Flink 的 ExecutionEnvironment 也提供了很多快捷的方法。主要分为下面几大类,基于文件的和基于集合的 :

File-Based 基于文件        
readTextFile(path) TextInputFormat -读取文件行并返回字符串。
readTextFileWithValue(path)  TextValueInputFormat -读取文件行并返回StringValues。StringValues是可变字符串。
readCsvFile(path) CsvInputFormat -解析逗号(或其他char)分隔字段的文件。返回由元组或pojo组成的数据集。支持基本java类型及其对应值作为字段类型。
readFileOfPrimitives(path, Class) 

PrimitiveInputFormat—解析以新行(或其他字符序列)分隔的原始数据类型的文件,如String或Integer。

readFileOfPrimitives(path, delimiter, Class) PrimitiveInputFormat—使用给定的分隔符,解析以新行(或其他字符序列)分隔的原始数据类型(如String或Integer)的文件。
Collection-Based 基于集合
fromCollection(Collection) 从Java.util.Collection创建一个数据集。集合中的所有元素必须具有相同的类型,当然也可以是 scala 的。
fromCollection(Iterator, Class) 从迭代器创建一个数据集。该类指定迭代器返回的元素的数据类型。
fromElements(T…)

根据给定的对象序列创建一个数据集。所有对象必须是相同的类型。

fromParallelCollection(SplittableIterator, Class) 

从一个迭代器中并行创建一个数据集。该类指定迭代器返回的元素的数据类型。

generateSequence(from, to) 在给定的间隔内并行生成数字序列。
Generic 泛型

readFile(inputFormat, path) 

FileInputFormat - 接受文件输入格式。

createInput(inputFormat) / inputFormat  接受通用的输入格式。

Tips:

介绍前这里先初始化好执行的 ExecutionEnvironment ,后面的示例都将基于改 env 实现。注意最下面 import 的隐式转换,Flink 基于 Scala 时很多方法都需要隐式转换,否则 api 执行会报错。

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._

image.gif

二.FileBased 基于文件

1.readTextFile

Api readTextFile 和 spark 的 textFile 很像,它可以读取本地的文件,也可以读取 HDFS 或者集群上的数据,并支持自动识别一些压缩格式文件,就像 textFile 可以直接读取 gz 一样。

val textLinesFromLocalFile: DataSet[String] = env.readTextFile("./myfile")
    val textLinesFromHdfs: DataSet[String] = env.raedTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
    //触发程序执行
    textLines.print()

image.gif

对于本地和集群的文件都可以直接调用执行,截止到 Flink v1.14.3 该接口支持以下压缩格式:

Compressed Method 压缩方法 File Extensions 文件扩展名 Parallelizable 可压缩
DEFLATE .deflate no
GZip .gz, .gzip no
Bzip2 .bz2 no
XZ .xz no
ZStandart .zst no

2.readTextFileWithValue

从文本文件中读取数据并以StringValue类型返回,StringValue类型为可变字符串。此方法和readTextFile方法类似,只不过是制定了数据类型,返回的不是 DataSet[String] 而是 DataSet[StringValue]

val textLines = env.readTextFileWithValue("./yourfile")
    //触发程序执行
    textLines.print()

image.gif

3.readCsvFile

csv 文件内容如下:

1,2,3,4,5
2,3,4,5,6
3,4,5,6,7
4,5,6,7,8
5,6,7,8,9

image.gif

A.基础读法

[(隐式转换)] 中指定了 csv 文件中各个元素的数据类型

val csvInput = env.readCsvFile[(String,String,String,String,String)]("./info.csv")
    csvInput.print()

image.gif

B.读取指定行

includedFields 使用数组,其中数字代表的含义为要保留的数据列对应的列数,这里还支持 ignoreFirstLine = true 参数可以去除带表头的 csv 文件。

val csvInput2 = env.readCsvFile[(String, Double)]("./info.csv", 
        includedFields = Array(0, 3))
    csvInput2.print()

image.gif

C.读取生成指定类

scala 支持 caseClass 快速定义数据类,这里 [] 内代表返回的数据类型,pojoFields 指定对应列的 colName, 由于只给出了三列而原始数据有五列,所以只返回对应三列的数据

case class Person(a: String, b: String, c: String)
    val csvInput3 = env.readCsvFile[Person]("./info.csv",
          pojoFields = Array("name", "age", "gender"))
    csvInput3.print()

image.gif

Person(4,5,6)
Person(1,2,3)
Person(5,6,7)
Person(2,3,4)
Person(3,4,5)

image.gif

4.readFileOfPrimitives

读取一个原始数据类型(如String,Integer)的文件,返回一个对应的原始类型的DataSet集合。这里第一个参数为对应文件 path,第二个参数为分割符,以上面的 csv 文件数据为例,读取文件时会自动分割原始数据,得到类似的 DateSet[1,2,3,4,5,6,......],原始方法中还有一个 class 参数指定输出数据类型,这里隐式方法 env.readFileOfPrimitives[String] 的 [Class] 已经实现了该功能,所以 readFileOfPrimitives(path, delimiter, Class) 可以看作是 readFileOfPrimitives(path, Class) 的一个扩展。

val textLinesOfPrimitives = env.readFileOfPrimitives[String]("./info.csv",
     delimiter = ",")
    textLinesOfPrimitives.print()

image.gif

Line: 1,2,3,4,5
DataSet.print():
1
2
3
4
5

image.gif

三. Collection-Based 基于集合

1.fromCollection

该方法有两个参数类型,一种是直接从 collection 中初始化,还有一种是从 iterator 中初始化,两者基本类似。这里如果是 java 则对应 java.util.Collection ,scala 则对应 scala.collection

//1.用Array创建DataSet
    val dataSet1: DataSet[String] = env.fromCollection(Array("spark", "flink"))
    dataSet1.print()
    //2.用Iterable创建DataSet
    val dataSet2: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
    dataSet2.print()

image.gif

由于 collection 中包含多种数据结构,写法相同,下面给出一些可以用于初始化的常见数据结构 :

Array,ArrayBuffer,List,ListBuffer,Vector,mutable.Queue,mutable.Stack,Stream,Seq,Set,Iteratable, Iterator,mutable.ArraySeq,mutable.ArrayStack,Map,Range。

还有一个特殊的 generateSequence 可以生成 DataSet :

val numbers = env.generateSequence(1, 10000000)

image.gif

2.fromElements

根据给定的对象序列创建数据集。所有对象必须是相同的类型。这个就比较好理解了,直接给出相同类型的元素即可,fromCollection 和 fromElements 身上都可以看到一丝 spark.parallelize 序列化函数的影子

//1.用element创建DataSet(fromElements)
    val dataSet1: DataSet[String] = env.fromElements("spark", "flink")
    dataSet1.print()
    //2.用Tuple创建DataSet(fromElements)
    val dataSet2: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    dataSet2.print()

image.gif

3. fromParallelCollection

package org.apache.flink.util;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.annotation.Public;
@Public
public abstract class SplittableIterator<T> implements Iterator<T>, Serializable {
    private static final long serialVersionUID = 200377674313072307L;
    public SplittableIterator() {
    }
    public abstract Iterator<T>[] split(int var1);
    public Iterator<T> getSplit(int num, int numPartitions) {
        if (numPartitions >= 1 && num >= 0 && num < numPartitions) {
            return this.split(numPartitions)[num];
        } else {
            throw new IllegalArgumentException();
        }
    }
    public abstract int getMaximumNumberOfSplits();
}

image.gif

fromParallelCollection 的参数为 SplittableIterator, SplittableIterator是个抽象类,它定义了抽象方法 split 以及 getMaximumNumberOfSplits;它有两个实现类,分别是LongValueSequenceIterator以及NumberSequenceIterator。两个实现类实现了常用 number 的迭代器实现和 Long 的迭代器实现,有兴趣的小伙伴可以去看下 SplittableIterator 和各自实现类的源码,没兴趣的话你就只需要知道该方法可以并行读取迭代器并返回指定元素的数据类型。

val start = System.currentTimeMillis()
    val it = (0 to 100).iterator
    val dataSetSingle: DataSet[Int] = env.fromCollection(it)
    dataSetSingle.print()
    println("Single thread Cost: ", (System.currentTimeMillis() - start))
    val start1 = System.currentTimeMillis()
    val itSequence = new NumberSequenceIterator(0, 100)
    val dataSetParellel = env.fromParallelCollection(itSequence)
    dataSetParellel.print()
    println("Parallel thread Cost: ", (System.currentTimeMillis() - start1))

image.gif

二者主要体现在并行的效率上 :

(Single thread Cost: 3886)
(Parallel thread Cost: 939)

image.gif

四.Generic 泛型

1.readFile

A.ExecutionEnvironment

该方法接受文件输入格式,指定 inputFormat 和 path 即可输出文件内容

val data = env.readFile(new TextInputFormat(null), "./info.csv")
    data.print()

image.gif

1,2,3,4,5
3,4,5,6,7
5,6,7,8,9
4,5,6,7,8
2,3,4,5,6

image.gif

B.StreamExecutionEnvironment

上述 env 采用 ExecutionEnvironment.getExecutionEnvironment,可以看作是 sparkContext 处理离线任务,还有一种 StreamExecutionEnvironment 可以看作是 StreamingContext 处理流式任务,该 env 也拥有 readFile api :

val envStreaming = StreamExecutionEnvironment.getExecutionEnvironment
    val dataSource = envStreaming.readFile(new TextInputFormat(null), "./info.csv", 
      FileProcessingMode.PROCESS_CONTINUOUSLY,
      5000L)
    dataSource.print()
    envStreaming.execute()

image.gif

ExecutionEnvironment 执行时只读取文件一次,StreamingExecutionEnvironment 在 PROCESS_CONTINUOUSLY 模式下会根据 interval = 5000L ms 持续扫描文件,如果文件发生修改则重新读取文件内容,这里 interval 可以自定义。如果选择 PROCESS_ONE 模式,则会退化为 ExecutionEnvironment 的 readFIle 即只读一次。

2.createInput

该方法下接受通用输入格式。该方法和 spark.HadoopRDD 接口比较类似了,自定义的部分比较大。

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.createInput(HadoopInputs.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file"))

image.gif

五.总结

ExecutionEnvironment 模型下主要以静态 DateSet 为 DataSource 并进行后续处理,很多接口的含义和执行与 spark 很类似,其主要思想为批处理,后续介绍 DataSet 常用的 transform 函数与批处理方法。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
64 3
|
3月前
|
SQL 大数据 API
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
88 0
|
2月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
222 61
|
3月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
3月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
106 1
|
3月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
69 0
|
3月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
68 0
|
3月前
|
大数据 流计算
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
63 0
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
2月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1544 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎