大数据Spark Streaming Queries 2

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生网关 MSE Higress,422元/月
简介: 大数据Spark Streaming Queries

6 容错语义

针对任何流式应用处理框架(Storm、SparkStreaming、StructuredStreaming和Flink等)处理数据时,都要考虑语义,任意流式系统处理流式数据三个步骤:

1)、Receiving the data:接收数据源端的数据

采用接收器或其他方式从数据源接收数据(The data is received from sources usingReceivers or otherwise)。

2)、Transforming the data:转换数据,进行处理分析

针对StructuredStreaming来说就是Stream DataFrame(The received data is

transformed using DStream and RDD transformations)。

3)、Pushing out the data:将结果数据输出

最终分析结果数据推送到外部存储系统,比如文件系统HDFS、数据库等(The finaltransformed data is pushed out to external systems like file systems, databases,dashboards, etc)。

在处理数据时,往往需要保证数据处理一致性语义:从数据源端接收数据,经过数据处理分析,到最终数据输出仅被处理一次,是最理想最好的状态。在Streaming数据处理分析中,需要考虑数据是否被处理及被处理次数,称为消费语义,主要有三种:

At most once:最多一次,可能出现不消费,数据丢失;

At least once:至少一次,数据至少消费一次,可能出现多次消费数据;

Exactly once:精确一次,数据当且仅当消费一次,不多不少;

Structured Streaming的核心设计理念和目标之一:支持一次且仅一次Extracly-Once的语义。

7246b4cc9e7e4f52b0b6179f11ca862a.png

为了实现这个目标,Structured Streaming设计source、sink和execution engine来追踪计算处理的进度,这样就可以在任何一个步骤出现失败时自动重试。

1、每个Streaming source都被设计成支持offset,进而可以让Spark来追踪读取的位置;

2、Spark基于checkpoint和wal来持久化保存每个trigger interval内处理的offset的范围;

eb1bf4cb625e43b6a964cca6998324ff.png

3、sink被设计成可以支持在多次计算处理时保持幂等性,就是说,用同样的一批数据,无论多少次去更新sink,都会保持一致和相同的状态。综合利用基于offset的source,基于checkpoint和wal的execution engine,以及基于幂等性的sink,可以支持完整的一次且仅一次的语义。

7 Kafka 数据消费

Apache Kafka 是目前最流行的一个分布式的实时流消息系统,给下游订阅消费系统提供了并行处理和可靠容错机制,现在大公司在流式数据的处理场景,Kafka基本是标配。StructuredStreaming很好的集成Kafka,可以从Kafka拉取消息,然后就可以把流数据看做一个DataFrame, 一张无限增长的大表,在这个大表上做查询,Structured Streaming保证了端到端的 exactly-once,用户只需要关心业务即可,不用费心去关心底层是怎么做的。

StructuredStreaming集成Kafka,官方文档如下:

http://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html

目前仅支持Kafka 0.10.+版本及以上,底层使用Kafka New Consumer API拉取数据,如果公司

Kafka版本为0.8.0版本,StructuredStreaming集成Kafka参考文档:

StructuredStreaming既可以从Kafka读取数据,又可以向Kafka 写入数据,添加Maven依赖:

<!-- Structured Streaming + Kafka 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.5</version>
</dependency>

Maven Project工程中目录结构如下:

Kafka把生产者发送的数据放在不同的分区里面,这样就可以并行进行消费了。每个分区里面的数据都是递增有序的,跟structured commit log类似,生产者和消费者使用Kafka 进行解耦,消费者不管你生产者发送的速率如何,只要按照一定的节奏进行消费就可以了。每条消息在一个分区里面都有一个唯一的序列号offset(偏移量),Kafka 会对内部存储的消息设置一个过期时间,如果过期了,就会标记删除,不管这条消息有没有被消费。Kafka 可以被看成一个无限的流,里面的流数据是短暂存在的,如果不消费,消息就过期滚动没了。涉及一个问题:如果开始消费,就要定一下从什么位置开始。

39cac21d0cdd4bc7b65d252707d195ac.png

第一、earliest:从最起始位置开始消费,当然不一定是从0开始,因为如果数据过期就清掉了,所以可以理解为从现存的数据里最小位置开始消费;

第二、latest:从最末位置开始消费;

第三、per-partition assignment:对每个分区都指定一个offset,然后从offset位置开始消费;当第一次开始消费一个Kafka 流的时候,上述策略任选其一,如果之前已经消费了,而且做了checkpoint ,比如消费程序升级了,这时候就会从上次结束的位置开始继续消费。目前StructuredStreaming和Flink框架从Kafka消费数据时,采用的就是上述的策略。

8 Kafka 数据源

Structured Streaming消费Kafka数据,采用的是poll方式拉取数据,与Spark Streaming中New

Consumer API集成方式一致。从Kafka Topics中读取消息,需要指定数据源(kafka)、Kafka集群

的连接地址(kafka.bootstrap.servers)、消费的topic(subscribe或subscribePattern), 指定topic的时候,可以使用正则来指定,也可以指定一个 topic 的集合。官方提供三种方式从Kafka topic中消费数据,主要区别在于每次消费Topic名称指定,

  • 方式一:消费一个Topic数据
  • 方式二:消费多个Topic数据
  • 方式三:消费通配符匹配Topic数据

从Kafka 获取数据后Schema字段信息如下,既包含数据信息有包含元数据信息:

在实际开发时,往往需要获取每条数据的消息,存储在value字段中,由于是binary类型,需要

转换为字符串String类型;此外了方便数据操作,通常将获取的key和value的DataFrame转换为

Dataset强类型,伪代码如下:

从Kafka数据源读取数据时,可以设置相关参数,包含必须参数和可选参数:

  • 必须参数:kafka.bootstrap.servers和subscribe,可以指定开始消费偏移量assign。
  • 可选参数:

范例演示:从Kafka消费数据,进行词频统计,Topic为wordsTopic。

  • 第一步、创建Topic,相关命令如下:
# 启动Zookeeper
/export/server/zookeeper/bin/zkServer.sh start
# 启动Kafka Broker
/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
## ================================= wordsTopic =================================
# 查看Topic信息
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1.oldlu.cn:2181/kafka200
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.oldlu.cn:2181/kafka200 --replicatio
n-factor 1 --partitions 3 --topic wordsTopic
# 模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.oldlu.cn:9092 --topic wordsTopic
  • 第二步、编写代码,其中设置每批次消费数据最大量
package cn.oldlu.spark.kafka.source
import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery}
import org.apache.spark.sql.{DataFrame,SparkSession}
/**
 * 使用Structured Streaming从Kafka实时读取数据,进行词频统计,将结果打印到控制台。
 */
        object StructuredKafkaSource{
        def main(args:Array[String]):Unit={
// 构建SparkSession实例对象
        val spark:SparkSession=SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[3]")
// 设置Shuffle分区数目
        .config("spark.sql.shuffle.partitions","3")
        .getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO: 从Kafka读取数据,底层采用New Consumer API
val kafkaStreamDF:DataFrame=spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers","node1.oldlu.cn:9092")
        .option("subscribe","wordsTopic")
// TODO: 设置每批次消费数据最大值
        .option("maxOffsetsPerTrigger","100000")
        .load()
// TODO: 进行词频统计
        val resultStreamDF:DataFrame=kafkaStreamDF
// 获取value字段的值,转换为String类型
        .selectExpr("CAST(value AS STRING)")
// 转换为Dataset类型
        .as[String]
// 过滤数据
        .filter(line=>null!=line&&line.trim.length>0)
// 分割单词
        .flatMap(line=>line.trim.split("\\s+"))
// 按照单词分组,聚合
        .groupBy($"value").count()
// 设置Streaming应用输出及启动
        val query:StreamingQuery=resultStreamDF.writeStream
        .outputMode(OutputMode.Complete())
        .format("console").option("numRows","10").option("truncate","false")
        .start()
        query.awaitTermination() // 查询器等待流式应用终止
        query.stop() // 等待所有任务运行完成才停止运行
        }
    }

9 Kafka 接收器

往Kafka里面写数据类似读取数据,可以在DataFrame上调用writeStream来写入Kafka,设置参

数指定value,其中key是可选的,如果不指定就是null。如果key为null,有时候可能导致分区数据

不均匀。

9.1 配置说明

将DataFrame写入Kafka时,Schema信息中所需的字段:

需要写入哪个topic,可以像上述所示在操作DataFrame 的时候在每条record上加一列topic字段指定,也可以在DataStreamWriter上指定option配置。

写入数据至Kafka,需要设置Kafka Brokers地址信息及可选配置:

  • 必选参数:kafka.bootstrap.servers,使用逗号隔开【host:port】字符;
  • 可选参数:topic,如果DataFrame中没有topic列,此处指定topic表示写入Kafka Topic。
    官方提供示例代码如下:

9.2 实时数据ETL架构

在实际实时流式项目中,无论使用Storm、SparkStreaming、Flink及Structured Streaming处理流式数据时,往往先从Kafka 消费原始的流式数据,经过ETL后将其存储到Kafka Topic中,以便其他业务相关应用消费数据,实时处理分析,技术架构流程图如下所示:

接下来模拟产生运营商基站数据,实时发送到Kafka 中,使用StructuredStreaming消费,经过ETL(获取通话状态为success数据)后,写入Kafka中,便于其他实时应用消费处理分析。

9.3 模拟基站日志数据

模拟产生运营商基站通话日志数据,封装到样例类中,字段信息如下:

package cn.oldlu.spark.kafka.mock
/**
 * 基站通话日志数据,字段如下:
 *
 * @param stationId 基站标识符ID
 * @param callOut 主叫号码
 * @param callIn 被叫号码
 * @param callStatus 通话状态
 * @param callTime 通话时间
 * @param duration 通话时长
 */
case
class StationLog(
        stationId:String, //
        callOut:String, //
        callIn:String, //
        callStatus:String, //
        callTime:Long, //
        duration:Long //
) {
    override def
    toString:String =s"$stationId,$callOut,$callIn,$callStatus,$callTime,$duration"
}

创建Topic,相关命令如下:

# 启动Zookeeper
/export/server/zookeeper/bin/zkServer.sh start
# 启动Kafka Broker
/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
## ================================= stationTopic =================================
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.oldlu.cn:2181/kafka200 --replication-
factor 1 --partitions 3 --topic stationTopic
# 模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.oldlu.cn:9092 --topic stationTopic
# 模拟消费者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.oldlu.cn:9092 --topic station
Topic --from-beginning
# 删除topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1.oldlu.cn:2181/kafka200 --topic statio
nTopic
## ================================= etlTopic =================================
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.oldlu.cn:2181/kafka200 --replication-
factor 1 --partitions 3 --topic etlTopic
# 模拟生产者
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node1.oldlu.cn:9092 --topic etlTopic
# 模拟消费者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.oldlu.cn:9092 --topic etlTopi
c --from-beginning

编写代码,实时产生日志数据,发送Kafka Topic:

package cn.oldlu.spark.kafka.mock
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer,ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random
/**
 * 模拟产生基站日志数据,实时发送Kafka Topic中,数据字段信息:
 * 基站标识符ID, 主叫号码, 被叫号码, 通话状态, 通话时间,通话时长
 */
object MockStationLog{
        def main(args:Array[String]):Unit={
// 发送Kafka Topic
        val props=new Properties()
        props.put("bootstrap.servers","node1.oldlu.cn:9092")
        props.put("acks","1")
        props.put("retries","3")
        props.put("key.serializer",classOf[StringSerializer].getName)
        props.put("value.serializer",classOf[StringSerializer].getName)
        val producer=new KafkaProducer[String,String](props)
        val random=new Random()
        val allStatus=Array(
        "fail","busy","barring","success","success","success",
        "success","success","success","success","success","success"
        )
        while(true){
        val callOut:String="1860000%04d".format(random.nextInt(10000))
        val callIn:String="1890000%04d".format(random.nextInt(10000))
        val callStatus:String=allStatus(random.nextInt(allStatus.length))
        val callDuration=if("success".equals(callStatus))(1+random.nextInt(10))*1000Lelse 0L
// 随机产生一条基站日志数据
        val stationLog:StationLog=StationLog(
        "station_"+random.nextInt(10), //
        callOut, //
        callIn, //
        callStatus, //
        System.currentTimeMillis(), //
        callDuration //
        )
        println(stationLog.toString)
        Thread.sleep(100+random.nextInt(100))
        val record=new ProducerRecord[String,String]("stationTopic",stationLog.toString)
        producer.send(record)
        }
        producer.close() // 关闭连接
        }
    }

运行程序,基站通话日志数据格式如下:

station_7,18600009710,18900000269,success,1590709965144,4000
station_6,18600003894,18900000028,success,1590709965333,8000
station_7,18600007207,18900001057,busy,1590709965680,0

9.4 实时增量ETL

编写应用实时从Kafka的【stationTopic】消费数据,经过处理分析后,存储至Kafka的

【etlTopic】,其中需要设置检查点目录,保证应用一次且仅一次的语义。

package cn.oldlu.spark.kafka.sink
import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery}
import org.apache.spark.sql.{DataFrame,Dataset,SparkSession}
/**
 * 实时从Kafka Topic消费基站日志数据,过滤获取通话转态为success数据,再存储至Kafka Topic中
 * 1、从KafkaTopic中获取基站日志数据(模拟数据,JSON格式数据)
 * 2、ETL:只获取通话状态为success日志数据
 * 3、最终将ETL的数据存储到Kafka Topic中
 */
        object StructuredEtlSink{
        def main(args:Array[String]):Unit={
// 构建SparkSession实例对象
        val spark:SparkSession=SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[3]")
// 设置Shuffle分区数目
        .config("spark.sql.shuffle.partitions","3")
        .getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// 1. 从KAFKA读取数据
val kafkaStreamDF:DataFrame=spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers","node1.oldlu.cn:9092")
        .option("subscribe","stationTopic")
        .load()
// 2. 对基站日志数据进行ETL操作
// station_0,18600004405,18900009049,success,1589711564033,9000
        val etlStreamDF:Dataset[String]=kafkaStreamDF
// 获取value字段的值,转换为String类型
        .selectExpr("CAST(value AS STRING)")
// 转换为Dataset类型
        .as[String]
// 过滤数据:通话状态为success
        .filter{log=>
        null!=log&&log.trim.split(",").length==6&&"success".equals(log.trim.split(",")(3))
        }
        etlStreamDF.printSchema()
// 3. 针对流式应用来说,输出的是流
        val query:StreamingQuery=etlStreamDF.writeStream
// 对流式应用输出来说,设置输出模式
        .outputMode(OutputMode.Append())
        .format("kafka")
        .option("kafka.bootstrap.servers","node1.oldlu.cn:9092")
        .option("topic","etlTopic")
// 设置检查点目录
        .option("checkpointLocation",s"datas/structured/etl-100001")
// 流式应用,需要启动start
        .start()
// 查询器等待流式应用终止
        query.awaitTermination()
        query.stop() // 等待所有任务运行完成才停止运行
        }
    }

10 Kafka 特定配置从Kafka消费数据时,相关配置属性可以通过带有kafka.prefix的DataStreamReader.option进行设置,例如前面设置Kafka Brokers地址属性:stream.option(“kafka.bootstrap.servers”, “host:port”),更多关于Kafka 生产者Producer Config配置属和消费者Consumer Config配置属性,参考文档:


生产者配置(Producer Configs):

http://kafka.apache.org/20/documentation.html#producerconfigs

消费者配置(New Consumer Configs):

http://kafka.apache.org/20/documentation.html#newconsumerconfigs

注意以下Kafka参数属性可以不设置,如果设置的话,Kafka source或者sink可能会抛出错误:

1)、group.id:Kafka source将会自动为每次查询创建唯一的分组ID;

2)、auto.offset.reset:在将source选项startingOffsets设置为指定从哪里开始。结构化流管理

内部消费的偏移量,而不是依赖Kafka消费者来完成。这将确保在topic/partitons动态订阅时不

会遗漏任何数据。注意,只有在启动新的流式查询时才会应用startingOffsets,并且恢复操作

始终会从查询停止的位置启动;

3)、key.deserializer/value.deserializer:Keys/Values总是被反序列化为ByteArrayDeserializer

的字节数组,使用DataFrame操作显式反序列化keys/values;

4)、key.serializer/value.serializer:keys/values总是使用ByteArraySerializer或StringSerializer

进行序列化,使用DataFrame操作将keysvalues/显示序列化为字符串或字节数组;

5)、enable.auto.commit:Kafka source不提交任何offset;

6)、interceptor.classes:Kafka source总是以字节数组的形式读取key和value。使用

ConsumerInterceptor是不安全的,因为它可能会打断查询;

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
8天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
36 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
8天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
41 2
|
9天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
34 1
|
9天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
9天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
40 1
|
30天前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
8天前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
42 1
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
47 3
|
3天前
|
存储 大数据 定位技术
大数据 数据索引技术
【10月更文挑战第26天】
11 3
|
3天前
|
存储 大数据 OLAP
大数据数据分区技术
【10月更文挑战第26天】
15 2