Flink Table Api & SQL 初体验,Blink的使用

简介: 介绍Flink Table Api & SQL和实现了两表连接的示例

概述

Flink具有Table API和SQL-用于统一流和批处理。

Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接)的查询。

Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批处理输入(DataSet)还是流输入(DataStream),在两个接口中指定的查询都具有相同的语义并指定相同的结果。

Table API和SQL尚未完成所有功能,正在积极开发中,支持程度需查看 官方文档

使用

多表连接案例

pom依赖

flink 版本为:1.9.3


    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

模拟一个实时流

import lombok.Data;
@Data
public class Product {
    public Integer id;
    public String seasonType;
}

自定义Source

import common.Product;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Random;

public class ProductStremingSource implements SourceFunction<Product> {
    private boolean isRunning = true;

    @Override
    public void run(SourceContext<Product> ctx) throws Exception {
        while (isRunning){
            // 每一秒钟产生一条数据
            Product product = generateProduct();
            ctx.collect(product);
            Thread.sleep(1000);
        }
    }

    private Product generateProduct(){
        int i = new Random().nextInt(100);
        ArrayList<String> list = new ArrayList();
        list.add("spring");
        list.add("summer");
        list.add("autumn");
        list.add("winter");
        Product product = new Product();
        product.setSeasonType(list.get(new Random().nextInt(4)));
        product.setId(i);
        return product;
    }
    @Override
    public void cancel() {

    }
}

主程序

public class TableStremingDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // 使用Blink
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        SingleOutputStreamOperator<Item> source = bsEnv.addSource(new MyStremingSource())
                .map(new MapFunction<Item, Item>() {
                    @Override
                    public Item map(Item value) throws Exception {
                        return value;
                    }
                });
        // 分割流
        final OutputTag<Item> even = new OutputTag<Item>("even") {
        };
        final OutputTag<Item> old = new OutputTag<Item>("old") {
        };

        SingleOutputStreamOperator<Item> sideOutputData = source.process(new ProcessFunction<Item, Item>() {
            @Override
            public void processElement(Item value, Context ctx, Collector<Item> out) throws Exception {
                if (value.getId() % 2 == 0) {
                    ctx.output(even,value);
                }else{
                    ctx.output(old,value);
                }
            }
        });

        DataStream<Item> evenStream = sideOutputData.getSideOutput(even);
        DataStream<Item> oldStream = sideOutputData.getSideOutput(old);
        // 注册两个 表 : evenTable,oddTable
        bsTableEnv.registerDataStream("evenTable",evenStream , "name,id");
        bsTableEnv.registerDataStream("oddTable", oldStream, "name,id");

        // 执行sql 输出Table
        Table queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");
        queryTable.printSchema();;
        // 获取流
        DataStream<Tuple2<Boolean, Tuple4<Integer, String, Integer, String>>> dataStream = bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint<Tuple4<Integer,String,Integer,String>>(){}));
        dataStream.print();

        bsEnv.execute("demo");
    }
}

结果打印


输出name相同的元素。

总结

简单的介绍了Flink Table Api & SQL和实现了两表连接的示例。

更多文章:www.ipooli.com
扫码关注公众号《ipoo》
ipoo

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
7月前
|
SQL 存储 大数据
Dataphin V5.0:支持创建异步调用API,实现慢 SQL 复杂计算的直连消费
本文介绍了数据服务产品中异步调用的应用场景与优势,包括大数据引擎查询、复杂SQL及大规模数据下载等场景,解决了同步调用可能导致的资源浪费和性能问题。通过创建异步API、测试发布以及权限申请等功能,实现高效稳定的服务提供。以电商订单查询为例,展示了如何利用异步调用提升系统性能与用户体验。
312 9
|
10月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
744 12
Flink CDC YAML:面向数据集成的 API 设计
|
10月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
本文整理自阿里云智能集团 Apache Flink Committer 刘大龙老师在2024FFA流批一体论坛的分享,涵盖三部分内容:数据工程师用户故事、Materialized Table 构建流批一体 ETL 及 Demo。文章通过案例分析传统 Lambda 架构的挑战,介绍了 Materialized Table 如何简化流批处理,提供统一 API 和声明式 ETL,实现高效的数据处理和维护。最后展示了基于 Flink 和 Paimon 的实际演示,帮助用户更好地理解和应用这一技术。
841 7
Flink Materialized Table:构建流批一体 ETL
|
9月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
394 5
|
9月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
Flink Materialized Table:构建流批一体 ETL
182 3
|
SQL Oracle 关系型数据库
CREATE TABLE 时的 SQL FOREIGN KEY 约束
【7月更文挑战第19天】CREATE TABLE 时的 SQL FOREIGN KEY 约束
159 8
|
SQL Oracle 关系型数据库
CREATE TABLE 时的 SQL FOREIGN KEY 约束
【7月更文挑战第24天】CREATE TABLE 时的 SQL FOREIGN KEY 约束。
132 5
|
SQL Oracle 关系型数据库
ALTER TABLE 时的 SQL PRIMARY KEY 约束
【7月更文挑战第24天】ALTER TABLE 时的 SQL PRIMARY KEY 约束。
316 3
|
SQL Oracle 关系型数据库
CREATE TABLE 时的 SQL PRIMARY KEY 约束
【7月更文挑战第24天】CREATE TABLE 时的 SQL PRIMARY KEY 约束。
211 2
|
SQL Oracle 关系型数据库
ALTER TABLE 时的 SQL PRIMARY KEY 约束
【7月更文挑战第19天】ALTER TABLE 时的 SQL PRIMARY KEY 约束。
175 3