大数据中必须要掌握的 Flink SQL 详细剖析 (二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

另外,你需要为 Flink 的 Scala 批处理或流式 API 添加依赖项。对于批量查询,您需要添加:


<dependency>
  <groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>


2. Flink SQL 实战案例



1) 批数据 SQL


用法:


  1. 构建 Table 运行环境
  2. 将 DataSet 注册为一张表
  3. 使用 Table 运行环境的 sqlQuery 方法来执行 SQL 语句


示例:使用 Flink SQL 统计用户消费订单的总金额、最大金额、最小金额、订单总数。


订单 id 用户名 订单日期 消费金额
1 Zhangsan 2018-10-20 15:30 358.5


测试数据(订单 ID、用户名、订单日期、订单金额):


Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
Order(3, "lisi", "2018-10-20 16:30", 127.5),
Order(4, "lisi", "2018-10-20 16:30", 328.5),
Order(5, "lisi", "2018-10-20 16:30", 432.5),
Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)


步骤:


  1. 获取一个批处理运行环境
  2. 获取一个 Table 运行环境
  3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
  4. 基于本地 Order 集合创建一个 DataSet source
  5. 使用 Table 运行环境将 DataSet 注册为一张表
  6. 使用 SQL 语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
  7. 使用 TableEnv.toDataSet 将 Table 转换为 DataSet
  8. 打印测试


示例代码


import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.types.Row
/**
 * 使用Flink SQL统计用户消费订单的总金额、最大金额、最小金额、订单总数。
 */
object BatchFlinkSqlDemo {
  //3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
  case class Order(id:Int, userName:String, createTime:String, money:Double)
  def main(args: Array[String]): Unit = {
    /**
     * 实现思路:
     * 1. 获取一个批处理运行环境
     * 2. 获取一个Table运行环境
     * 3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
     * 4. 基于本地 Order 集合创建一个DataSet source
     * 5. 使用Table运行环境将DataSet注册为一张表
     * 6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
     * 7. 使用TableEnv.toDataSet将Table转换为DataSet
     * 8. 打印测试
     */
    //1. 获取一个批处理运行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //2. 获取一个Table运行环境
    val tabEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
    //4. 基于本地 Order 集合创建一个DataSet source
    val orderDataSet: DataSet[Order] = env.fromElements(
      Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
      Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
      Order(3, "lisi", "2018-10-20 16:30", 127.5),
      Order(4, "lisi", "2018-10-20 16:30", 328.5),
      Order(5, "lisi", "2018-10-20 16:30", 432.5),
      Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
      Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
      Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
      Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
    )
    //5. 使用Table运行环境将DataSet注册为一张表
    tabEnv.registerDataSet("t_order", orderDataSet)
    //6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
    //用户消费订单的总金额、最大金额、最小金额、订单总数。
    val sql =
      """
        | select
        |   userName,
        |   sum(money) totalMoney,
        |   max(money) maxMoney,
        |   min(money) minMoney,
        |   count(1) totalCount
        |  from t_order
        |  group by userName
        |""".stripMargin  //在scala中stripMargin默认是“|”作为多行连接符
    //7. 使用TableEnv.toDataSet将Table转换为DataSet
    val table: Table = tabEnv.sqlQuery(sql)
    table.printSchema()
    tabEnv.toDataSet[Row](table).print()
  }
}


2) 流数据 SQL


流处理中也可以支持 SQL。但是需要注意以下几点:


  1. 要使用流处理的 SQL,必须要添加水印时间
  2. 使用 registerDataStream 注册表的时候,使用 ' 来指定字段
  3. 注册表的时候,必须要指定一个 rowtime,否则无法在 SQL 中使用窗口
  4. 必须要导入 import org.apache.flink.table.api.scala._ 隐式参数
  5. SQL 中使用 trumble(时间列名, interval '时间' sencond) 来进行定义窗口


示例:使用 Flink SQL 来统计 5 秒内 用户的 订单总数、订单的最大金额、订单的最小金额。


步骤


  1. 获取流处理运行环境
  2. 获取 Table 运行环境
  3. 设置处理时间为 EventTime
  4. 创建一个订单样例类 Order ,包含四个字段(订单 ID、用户 ID、订单金额、时间戳)
  5. 创建一个自定义数据源
  • 使用 for 循环生成 1000 个订单
  • 随机生成订单 ID(UUID)
  • 随机生成用户 ID(0-2)
  • 随机生成订单金额(0-100)
  • 时间戳为当前系统时间
  • 每隔 1 秒生成一个订单
  1. 添加水印,允许延迟 2 秒
  2. 导入 import org.apache.flink.table.api.scala._ 隐式参数
  3. 使用 registerDataStream 注册表,并分别指定字段,还要指定 rowtime 字段
  4. 编写 SQL 语句统计用户订单总数、最大金额、最小金额
    分组时要使用 tumble(时间列, interval '窗口时间' second) 来创建窗口
  5. 使用 tableEnv.sqlQuery 执行 sql 语句
  6. 将 SQL 的执行结果转换成 DataStream 再打印出来
  7. 启动流处理程序


示例代码


import java.util.UUID
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.types.Row
import scala.util.Random
/**
 * 需求:
 *  使用Flink SQL来统计5秒内 用户的 订单总数、订单的最大金额、订单的最小金额
 *
 *  timestamp是关键字不能作为字段的名字(关键字不能作为字段名字)
 */
object StreamFlinkSqlDemo {
    /**
     *  1. 获取流处理运行环境
     * 2. 获取Table运行环境
     * 3. 设置处理时间为 EventTime
     * 4. 创建一个订单样例类 Order ,包含四个字段(订单ID、用户ID、订单金额、时间戳)
     * 5. 创建一个自定义数据源
     *    使用for循环生成1000个订单
     *    随机生成订单ID(UUID)
     *    随机生成用户ID(0-2)
     *    随机生成订单金额(0-100)
     *    时间戳为当前系统时间
     *    每隔1秒生成一个订单
     * 6. 添加水印,允许延迟2秒
     * 7. 导入 import org.apache.flink.table.api.scala._ 隐式参数
     * 8. 使用 registerDataStream 注册表,并分别指定字段,还要指定rowtime字段
     * 9. 编写SQL语句统计用户订单总数、最大金额、最小金额
     * 分组时要使用 tumble(时间列, interval '窗口时间' second) 来创建窗口
     * 10. 使用 tableEnv.sqlQuery 执行sql语句
     * 11. 将SQL的执行结果转换成DataStream再打印出来
     * 12. 启动流处理程序
     */
    // 3. 创建一个订单样例类`Order`,包含四个字段(订单ID、用户ID、订单金额、时间戳)
    case class Order(orderId:String, userId:Int, money:Long, createTime:Long)
    def main(args: Array[String]): Unit = {
      // 1. 创建流处理运行环境
      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      // 2. 设置处理时间为`EventTime`
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      //获取table的运行环境
      val tableEnv = TableEnvironment.getTableEnvironment(env)
      // 4. 创建一个自定义数据源
      val orderDataStream = env.addSource(new RichSourceFunction[Order] {
        var isRunning = true
        override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
          // - 随机生成订单ID(UUID)
          // - 随机生成用户ID(0-2)
          // - 随机生成订单金额(0-100)
          // - 时间戳为当前系统时间
          // - 每隔1秒生成一个订单
          for (i <- 0 until 1000 if isRunning) {
            val order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101),
              System.currentTimeMillis())
            TimeUnit.SECONDS.sleep(1)
            ctx.collect(order)
          }
        }
        override def cancel(): Unit = { isRunning = false }
      })
      // 5. 添加水印,允许延迟2秒
      val watermarkDataStream = orderDataStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(2)) {
          override def extractTimestamp(element: Order): Long = {
            val eventTime = element.createTime
            eventTime
          }
        }
      )
      // 6. 导入`import org.apache.flink.table.api.scala._`隐式参数
      // 7. 使用`registerDataStream`注册表,并分别指定字段,还要指定rowtime字段
      import org.apache.flink.table.api.scala._
      tableEnv.registerDataStream("t_order", watermarkDataStream, 'orderId, 'userId, 'money,'createTime.rowtime)
      // 8. 编写SQL语句统计用户订单总数、最大金额、最小金额
      // - 分组时要使用`tumble(时间列, interval '窗口时间' second)`来创建窗口
      val sql =
      """
        |select
        | userId,
        | count(1) as totalCount,
        | max(money) as maxMoney,
        | min(money) as minMoney
        | from
        | t_order
        | group by
        | tumble(createTime, interval '5' second),
        | userId
      """.stripMargin
      // 9. 使用`tableEnv.sqlQuery`执行sql语句
      val table: Table = tableEnv.sqlQuery(sql)
      // 10. 将SQL的执行结果转换成DataStream再打印出来
      table.toRetractStream[Row].print()
      env.execute("StreamSQLApp")
    }
}
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
105 2
ClickHouse与大数据生态集成:Spark & Flink 实战
zdl
|
23天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
139 56
|
27天前
|
SQL 存储 算法
比 SQL 快出数量级的大数据计算技术
SQL 是大数据计算中最常用的工具,但在实际应用中,SQL 经常跑得很慢,浪费大量硬件资源。例如,某银行的反洗钱计算在 11 节点的 Vertica 集群上跑了 1.5 小时,而用 SPL 重写后,单机只需 26 秒。类似地,电商漏斗运算和时空碰撞任务在使用 SPL 后,性能也大幅提升。这是因为 SQL 无法写出低复杂度的算法,而 SPL 提供了更强大的数据类型和基础运算,能够实现高效计算。
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
62 1
|
2月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
103 0
|
2月前
|
SQL 运维 大数据
大数据实时计算产品的对比测评
在使用多种Flink实时计算产品后,我发现Flink凭借其流批一体的优势,在实时数据处理领域表现出色。它不仅支持复杂的窗口机制与事件时间处理,还具备高效的数据吞吐能力和精准的状态管理,确保数据处理既快又准。此外,Flink提供了多样化的编程接口和运维工具,简化了开发流程,但在界面友好度上还有提升空间。针对企业级应用,Flink展现了高可用性和安全性,不过价格因素可能影响小型企业的采纳决策。未来可进一步优化文档和自动化调优工具,以提升用户体验。
131 0
|
3月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
5月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
126 13
|
5月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。