【Flink-API】Table API & SQL 以及自定义UDF函数

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink-API】Table API & SQL 以及自定义UDF函数

一、 Flink Table API & SQL简介


1.1 Table API & SQL的背景


Flink虽然已经拥有了强大的DataStream/DataSet API,而且非常的灵活,但是需要熟练使用Eva或Scala的编程Flink编程API编写程序,为了满足流计算和批计算中的各种场景需求,同时降低用户使用门槛,Flink供- -种关系型的API来实现流与批的统一,那么这就是Flink的Table & SQL API。

自2015年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于Flink打造新一代计算引擎,针对Flink存在的不足进行优化和改进,并且在2019年初将最终代码开源,也就是我们熟知的Blink。Blink 在原来的Flink基础_上最显著的一个贡献就是Flink SQL的实现。


1.2 Table API & SQL的特点


Table & SQL API是-种关系型API,用户可以像操作mysql数据库表一样的操作数据, 而不需要写java代码完成Flink Function,更不需要手工的优化java代码调优。另外,SQL 作为一个非程序员可操作的语言,学习成本很低,如果一个系统提供SQL支持,将很容易被用户接受。

●Table API & SQL是关系型声明式的,是处理关系型结构化数据的

●Table API & SQL批流统一 ,支持stream流计算和batch离线计算

●Table API & SQL查询能够被有效的优化,查询可以高效的执行

●Table API & SQL编程比较容易,但是灵活度没有DataStream/DataSet API和底层Low-leve |API强


20200924172416804.png


二、离线计算TableAPI & SQL


2.1 ●BatchSQLEnvironmept (离线批处理Table API)

public class BachWordCountSQL {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
        DataSet<WordCount> input = env.fromElements(
                new WordCount("storm", 1L),
                new WordCount("flink", 1L),
                new WordCount("hadoop", 1L),
                new WordCount("flink", 1L),
                new WordCount("storm", 1L),
                new WordCount("storm", 1L)
        );
        tEnv.registerDataSet("wordcount",input,"word,counts");
        String sql = "select word,sum(counts) as counts from wordcount group by word" +
                "having sum(counts) >=2 order by counts desc";
        Table table = tEnv.sqlQuery(sql);
        DataSet<WordCount> result = tEnv.toDataSet(table, WordCount.class);
        result.print();
    }
}

2.2 ●BatchTableEnvironmept (离线批处理Table API)

public class BachWordCountTable {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
        DataSet<WordCount> input = env.fromElements(
                new WordCount("storm", 1L),
                new WordCount("flink", 1L),
                new WordCount("hadoop", 1L),
                new WordCount("flink", 1L),
                new WordCount("storm", 1L),
                new WordCount("storm", 1L)
        );
        Table table = tEnv.fromDataSet(input);
        Table filtered = table.groupBy("word")
                .select("word,counts.sum as counts")
                .filter("counts>=2")
                .orderBy("counts.desc");
        DataSet<WordCount> wordCountDataSet = tEnv.toDataSet(filtered, WordCount.class);
        wordCountDataSet.print();
    }
}

执行结果:


20200924192835516.png

三、实时计算TableAPI & SQL


3.1 ●StreamSQLEnvironment (实时流处理Table API)

public class StreamSqlWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.实时的table的上下文
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // socket 数据源[hadoop spark flink]
        DataStreamSource<String> lines = env.socketTextStream("192.168.52.200", 8888);
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                Arrays.stream(line.split(" ")).forEach(out::collect);
            }
        });
        //2.注册成为表
        tableEnv.registerDataStream("t_wordcount",words,"word");
        //3.SQL
        Table table = tableEnv.sqlQuery("SELECT word,COUNT(1) counts FROM t_wordcount GROUP BY word");
        //4.结果
        DataStream<Tuple2<Boolean, WordCount>> dataStream = tableEnv.toRetractStream(table, WordCount.class);
        dataStream.print();
        env.execute();
    }
}

运行结果如下:

20200924190153331.png


3.2 ●StreamTableEnvironment (实时流处理Table API)

    //2.注册成为表
        Table table = tableEnv.fromDataStream(words, "word");
        Table table2 = table.groupBy("word").select("word,count(1) as counts");
        DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table2, Row.class);
        dataStream.print();
        env.execute();


四、Window窗口和TableAPI & SQL


4.1 Thumb滚动窗口


实现滚动不同窗口内相同用户的金额计算,将窗口的起始结束时间,金额相加。


数据如下:

1000,user01,p1,5

2000,user01,p1,5

2000,user02,p1,3

3000,user01,p1,5

9999,user02,p1,3

19999,user01,p1,5

程序如下:

public class TumblingEventTimeWindowTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> socketDataStream = env.socketTextStream("192.168.52.200", 8888);
        SingleOutputStreamOperator<Row> rowDataStream = socketDataStream.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String line) throws Exception {
                String[] fields = line.split(",");
                Long time = Long.parseLong(fields[0]);
                String uid = fields[1];
                String pid = fields[2];
                Double money = Double.parseDouble(fields[3]);
                return Row.of(time, uid, pid, money);
            }
        }).returns(Types.ROW(Types.LONG, Types.STRING, Types.STRING, Types.DOUBLE));
        SingleOutputStreamOperator<Row> waterMarkRow = rowDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Row row) {
                        return (long) row.getField(0);
                    }
                }
        );
        tableEnv.registerDataStream("t_orders",waterMarkRow,"atime,uid,pid,money,rowtime.rowtime");
        Table table = tableEnv.scan("t_orders")
                .window(Tumble.over("10.seconds").on("rowtime").as("win"))
                .groupBy("uid,win")
                .select("uid,win.start,win.end,win.rowtime,money.sum as total");
        tableEnv.toAppendStream(table,Row.class).print();
        env.execute();
    }
}

运行结果如下:

20200924200216704.png


五、Kafka数据源—>Table API & SQL


5.1 KafkaToSQL

public class KafkaWordCountToSql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.connect(new Kafka()
                .version("universal")
                .topic("json-input")
                .startFromEarliest()
                .property("bootstrap.servers","hadoop1:9092")
        ).withFormat(new Json().deriveSchema()).withSchema(new Schema()
                .field("name", TypeInformation.of(String.class))
                .field("gender",TypeInformation.of(String.class))
        ).inAppendMode().registerTableSource("kafkaSource");
        Table select = tableEnv.scan("kafkaSource").groupBy("gender")
                .select("gender,count(1) as counts");
        tableEnv.toRetractStream(select, Row.class).print();
        env.execute();
    }
}


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
5月前
|
SQL Java 测试技术
3、Mybatis-Plus 自定义sql语句
这篇文章介绍了如何在Mybatis-Plus框架中使用自定义SQL语句进行数据库操作。内容包括文档结构、编写mapper文件、mapper.xml文件的解释说明、在mapper接口中定义方法、在mapper.xml文件中实现接口方法的SQL语句,以及如何在单元测试中测试自定义的SQL语句,并展示了测试结果。
3、Mybatis-Plus 自定义sql语句
|
2月前
|
SQL 缓存 Java
【详细实用のMyBatis教程】获取参数值和结果的各种情况、自定义映射、动态SQL、多级缓存、逆向工程、分页插件
本文详细介绍了MyBatis的各种常见用法MyBatis多级缓存、逆向工程、分页插件 包括获取参数值和结果的各种情况、自定义映射resultMap、动态SQL
【详细实用のMyBatis教程】获取参数值和结果的各种情况、自定义映射、动态SQL、多级缓存、逆向工程、分页插件
|
3月前
|
SQL 数据库 开发者
功能发布-自定义SQL查询
本期主要为大家介绍ClkLog九月上线的新功能-自定义SQL查询。
|
2月前
|
Web App开发 人工智能 自然语言处理
WebChat:开源的网页内容增强问答 AI 助手,基于 Chrome 扩展的最佳实践开发,支持自定义 API 和本地大模型
WebChat 是一个基于 Chrome 扩展开发的 AI 助手,能够帮助用户理解和分析当前网页的内容,支持自定义 API 和本地大模型。
173 0
|
5月前
|
域名解析 网络协议 API
【API管理 APIM】APIM集成内部VNet时,常遇见的关于自定义DNS服务问题。
【API管理 APIM】APIM集成内部VNet时,常遇见的关于自定义DNS服务问题。
|
5月前
|
API Java 数据库连接
从平凡到卓越:Hibernate Criteria API 让你的数据库查询瞬间高大上,彻底告别复杂SQL!
【8月更文挑战第31天】构建复杂查询是数据库应用开发中的常见需求。Hibernate 的 Criteria API 以其强大和灵活的特点,允许开发者以面向对象的方式构建查询逻辑,同时具备 SQL 的表达力。本文将介绍 Criteria API 的基本用法并通过示例展示其实际应用。此 API 通过 API 构建查询条件而非直接编写查询语句,提高了代码的可读性和安全性。无论是简单的条件过滤还是复杂的分页和连接查询,Criteria API 均能胜任,有助于提升开发效率和应用的健壮性。
181 0
|
5月前
|
前端开发 开发者
Vaadin Grid的秘密武器:打造超凡脱俗的数据展示体验!
【8月更文挑战第31天】赵萌是一位热爱UI设计的前端开发工程师。在公司内部项目中,她面临大量用户数据展示的挑战,并选择了功能强大的Vaadin Grid来解决。她在技术博客上分享了这一过程,介绍了Vaadin Grid的基本概念及其丰富的内置功能。通过自定义列和模板,赵萌展示了如何实现复杂的数据展示。
55 0
|
5月前
|
SQL 开发框架 .NET
深入解析Entity Framework Core中的自定义SQL查询与Raw SQL技巧:从基础到高级应用的全面指南,附带示例代码与最佳实践建议
【8月更文挑战第31天】本文详细介绍了如何在 Entity Framework Core (EF Core) 中使用自定义 SQL 查询与 Raw SQL。首先,通过创建基于 EF Core 的项目并配置数据库上下文,定义领域模型。然后,使用 `FromSqlRaw` 和 `FromSqlInterpolated` 方法执行自定义 SQL 查询。此外,还展示了如何使用 Raw SQL 进行数据更新和删除操作。最后,通过结合 LINQ 和 Raw SQL 构建动态 SQL 语句,处理复杂查询场景。本文提供了具体代码示例,帮助读者理解和应用这些技术,提升数据访问层的效率和灵活性。
318 0
|
5月前
|
SQL Shell API
python Django教程 之 模型(数据库)、自定义Field、数据表更改、QuerySet API
python Django教程 之 模型(数据库)、自定义Field、数据表更改、QuerySet API
|
6月前
|
SQL Java 数据库连接
idea中配置mybatis 映射文件模版及 mybatis plus 自定义sql
idea中配置mybatis 映射文件模版及 mybatis plus 自定义sql
150 3