大数据Spark Continuous Processing

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据Spark Continuous Processing

1 连续处理概述

连续处理(Continuous Processing)是Spark 2.3中引入的一种新的实验性流执行模式,可实现低的(~1 ms)端到端延迟,并且至少具有一次容错保证。 将其与默认的微批处理(micro-batchprocessing)引擎相比较,该引擎可以实现一次性保证,但最多可实现~100ms的延迟。


在实时流式应用中,最典型的应用场景:网站UV统计。


业务需求一:实时统计网站UV,比如每日网站UV;

业务需求二:统计最近一段时间(比如一个小时)网站UV,可以设置水位Watermark;

在SparkStreaming或Flink框架中要想实现【网站UV统计】需要借助于外部存储系统,比如Redis内存数据库或者HBase列式存储数据库,存储UserId,利用数据库特性去重,最后进行count。Structured Streaming可以使用deduplication对有无Watermark的流式数据进行去重操作:

第一、无 Watermark:对重复记录到达的时间没有限制。查询会保留所有的过去记录作为状态用于去重;

第二、有 Watermark:对重复记录到达的时间有限制。查询会根据水印删除旧的状态数据;

官方提供示例代码如下:

演示范例:对网站用户日志数据,按照userId和eventType去重统计,网站代码如下。

package cn.oldlu.spark.deduplicate
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}
object StructuredDeduplication {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      // 设置Shuffle分区数目
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    // 导入隐式转换和函数库
    import spark.implicits._
    import org.apache.spark.sql.functions._
    // 1. 从TCP Socket 读取数据
    val inputTable: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1.oldlu.cn")
      .option("port", 9999)
      .load()
    // 2. 数据处理分析
    val resultTable: DataFrame = inputTable
      .as[String]
      .filter(line => null != line && line.trim.length > 0)
      // 样本数据:{“eventTime”: “2016-01-10 10:01:50”,“eventType”: “browse”,“userID”:“1”}
      .select(
        get_json_object($"value", "$.eventTime").as("event_time"), //
        get_json_object($"value", "$.eventType").as("event_type"), //
        get_json_object($"value", "$.userID").as("user_id") //
      )
      // 按照UserId和EventType去重
      .dropDuplicates("user_id", "event_type")
      .groupBy($"user_id", $"event_type")
      .count()
    // 3. 设置Streaming应用输出及启动
    val query: StreamingQuery = resultTable.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination() // 流式查询等待流式应用终止
    // 等待所有任务运行完成才停止运行
    query.stop()
  }
}

测试数据如下:

{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:55","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:55","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:02:00","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"3"}
{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"2"}

运行应用结果如下:

连续处理(Continuous Processing)是“真正”的流处理,之所以说“真正”是因为 continuousmode是传统的流处理模式,通过运行一个long-running的operator用来处理数据。之前SparkStreaming是基于 micro-batch 模式的,就被很多人诟病不是“真正的”流式处理。continuous mode处理模式只要一有数据可用就会进行处理,如下图所示:

544da714853c4ad3ac98dfafc4241d19.png

epoch是input event stream中数据被发送给operator处理的最小单位,在处理过程中,epoch的offset会被记录到WAL中。另外continuous模式下的snapshot存储使用的一致性算法是Chandy-Lamport算法。

与micro-batch模式缺点和优点都很明显,缺点是不容易做扩展,优点是延迟更低。为什么延迟更低,下面两幅图目了然:

  • 微批处理(Micro-batch Processing)
  • 连续处理(Continue Processing)

在一台4核服务器上对Structured Streaming的连续处理模式进行基准测试,该测试展示了延迟

-吞吐量的权衡(因为分区是独立运行的,希望延迟与节点数量保持一致)。

上图展示了一个map任务的结果,这个map任务从Kafka中读取数据,虚线展示了微批模式能达到的最大吞吐量。可以看到,在连续模式下,吞吐量不会大幅下降,但是延迟会更低。(小于10毫秒的延迟,只有微批处理模式最大吞吐量的一半)。它的最大稳定吞吐量也略高,因为微批处理模式由于任务调度而导致延迟。

2 编程实现

要在连续处理模式下运行支持的查询,只需指定一个continuous trigger,并将所需的检查点

间隔作为参数,示例代码:

// 从Kafka消费数据
val streamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
// 数据ETL后保存Kafka
streamDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second")) // only change in query
  .start()

检查点间隔为1秒意味着连续处理引擎将每秒记录查询的进度,生成的检查点采用与微批处理

引擎兼容的格式,因此可以使用任何触发器重新启动任何查询。 例如以微批处理模式启动的支持

查询可以以连续模式(continuous mode)重新启动,反之亦然。

范例演示:从Kafka实时消费数据,经过ETL处理后,将数据发送至Kafka Topic。

import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * 从Spark 2.3版本开始,StructuredStreaming结构化流中添加新流式数据处理方式:Continuous processing
 * 持续流数据处理:当数据一产生就立即处理,类似Storm、Flink框架,延迟性达到100ms以下,目前属于实验开发阶段
 */
object StructuredContinuous {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    import spark.implicits._
    // 1. 从KAFKA读取数据
    val kafkaStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1.oldlu.cn:9092")
      .option("subscribe", "stationTopic")
      .load()
    // 2. 对基站日志数据进行ETL操作
    // station_0,18600004405,18900009049,success,1589711564033,9000
    val etlStreamDF: Dataset[String] = kafkaStreamDF
      // 获取value字段的值,转换为String类型
      .selectExpr("CAST(value AS STRING)")
      // 转换为Dataset类型
      .as[String]
      // 过滤数据:通话状态为success
      .filter { log =>
        null != log && log.trim.split(",").length == 6 && "success".equals(log.trim.split(",")(3))
      }
    // 3. 针对流式应用来说,输出的是流
    val query: StreamingQuery = etlStreamDF.writeStream
      .outputMode(OutputMode.Append())
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1.oldlu.cn:9092")
      .option("topic", "etlTopic")
      // 设置检查点目录
      .option("checkpointLocation", s"datas/structured/etl-100002")
      // TODO: 设置持续流处理 Continuous Processing, 指定CKPT时间间隔
      /*
      the continuous processing engine will records the progress of the query every second
      持续流处理引擎,将每1秒中记录当前查询Query进度状态
      */
      .trigger(Trigger.Continuous("1 second"))
      .start() // 流式应用,需要启动start
    // 查询器等待流式应用终止
    query.awaitTermination()
    query.stop() // 等待所有任务运行完成才停止运行
  }
}

运行应用程序,观察数据生成与数据处理发送Kafka时间,实时性很快,延迟性在毫秒级别。

3 支持查询

从Spark 2.3开始,连续处理模式才出现,目前仅支持以下类型的查询:


第一、数据源Sources:Kafka source(支持所有选项)及Rate source(仅仅适合测试)。

第二、接收器Sinks:Kafka sink(支持所有选项)、Console sink(适合调试)。

第三、DataFrame Operations操作:在连续模式下仅支持类似 map 的 Dataset/DataFrame 操

作,即仅投影(select,map,flatMap,mapPartitions等)和选择(where,filter等)。

连续处理引擎启动多个长时间运行的任务,这些任务不断从源中读取数据,处理数据并连续写

入接收器。 查询所需的任务数取决于查询可以并行从源读取的分区数。 因此,在开始连续处理查

询之前,必须确保群集中有足够的核数并行执行所有任务。例如,如果正在读取具有10个分区的

Kafka主题,则群集必须至少具有10个核数才能使查询取得进展。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
20天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
31 3
|
24天前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
42 3
|
29天前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
28天前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
|
20天前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
39 0
|
30天前
|
大数据 RDMA
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
31 0
|
1月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
37 0
|
16天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
62 11
|
21天前
|
存储 分布式计算 大数据
MaxCompute 数据分区与生命周期管理
【8月更文第31天】随着大数据分析需求的增长,如何高效地管理和组织数据变得至关重要。阿里云的 MaxCompute(原名 ODPS)是一个专为海量数据设计的计算服务,它提供了丰富的功能来帮助用户管理和优化数据。本文将重点讨论 MaxCompute 中的数据分区策略和生命周期管理方法,并通过具体的代码示例来展示如何实施这些策略。
51 1