通过Spark SQL实时归档SLS数据

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 流式计算和SQL 简要介绍Spark SQL流式开发语法 实时归档SLS数据到HDFS

作者:木艮,阿里云E-MapReduce开发工程师

我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。本文主要分成三部分:

  • 流式计算和SQL
  • 简要介绍Spark SQL流式开发语法
  • 实时归档SLS数据到HDFS

1. 流式计算和SQL

数据的价值随着时间逐渐降低。及时尽早的对数据进行处理提升了数据的价值,所以流式计算系统的应用也越来越广泛。目前常用的流式计算框架有Storm,Spark Streaming及Flink等,也有Kafka Streams这类基于Kafka的流式处理类库。各种流式处理框架都有其各自的API,开发者不可避免的需要学习如何使用这些API。如何提供简单而有效的开发工具,从而把更多的精力投放在业务处理中。所以,各个流式处理系统都逐渐支持SQL API作为开发语言,让使用者可以像处理Table一样处理Stream。例如KSQL支持使用SQL进行流式处理Kafka数据。Spark同样提出来Structured Streaming作为最新一代的流式处理系统,底层的处理引擎也是Spark SQL。不过在上层SQL API,缺少Structured Streaming必要的功能,例如window,watermark等。EMR在Spark开源版本上进行了功能扩展,支持使用SQL API在Spark上进行完整的流式查询开发。

2. Spark SQL流式开发入门

这节将简要介绍Spark SQL中关于流式开的概念和语法。

2.1 建表
当我们需要对流式数据源进行读写操作时,需要首先创建一张表来表示这个数据源。定义表的语法如下:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);

以上语法中,针对特殊source,不要求一定指定表的列定义。当不指定列定义时,会自动识别数据源的schema信息。举一个例子:

CREATE TABLE driver_behavior 
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

当数据源是Kafka时,会根据Kafka Topic名去到Kafka Schema Registry中查找schema信息。当然,我们也可以指定列定义,例如:

CREATE TABLE driverbehavior(deviceId string, velocity double)
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

当指定列定义时,要求必须和Source中的字段定义是一致的。当执行完CREATE TABLE操作,表的定义会保存到Hive MetaStore中。

2.2 CTAS
我们可以将创建表和将查询结果写入到表的语句合并到一起,那么就是CREATE TABLE ... AS SELECT ...语法:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*)
AS
queryStatement;

举一个例子(引用自这里: q103):

CREATE TABLE kafka_temp_table
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}") AS
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

当执行完操作,将创建出表并实际生成一个StreamQuery实例,将查询结果写入到结果表中。

2.3 DML
流式查询SQL和离线SQL标准语法大部分是一样,这边主要介绍insert操作。流式查询是不允许单独进行SELECT操作,必须将SELECT的查询结果写入到表中。所以,需要在SELECT操作之前执行INSERT操作。

INSERT INTO tbName[(columnName[,columnName]*)]
queryStatement;

以上语法为一次流式查询:这个语句将实际生成一个StreamQuery实例,将查询结果写入到结果表中。举一个例子:

INSERT INTO kafka_temp_table
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

2.4 window及watermark
限于篇幅,本文暂且不介绍Spark SQL中如何使用window和watermak,有兴趣的可以先看看资料,后续会专门撰文介绍。

2.5 流式作业配置

使用SQL进行流式作业开发时,有些必要的配置无法在Query表达出来,需要单独进行设置。这里我们使用SET操作进行流式作业必要参数配置,当前有两个参数需要设置:

image

每一个流式查询实例前都需要进行配置,也就是说,当使用CTAS或者Insert操作时,必须前置这两个配置。一个SQL文件支持多个流式查询,例如:

-- test.sql

SET streaming.query.name=query1;
SET spark.sql.streaming.checkpointLocation.query1=/tmp/spark/query1
INSERT INTO tbName1 [(columnName[,columnName]*)]
queryStatement1;

SET streaming.query.name=query2;
SET spark.sql.streaming.checkpointLocation.query2=/tmp/spark/query2
INSERT INTO tbName2 [(columnName[,columnName]*)]
queryStatement2;

3. SLS数据实时归档实战

假定一个场景,现在通过SLS收集了业务服务器上的日志,需要归档到HDFS中,便于后续进行离线分析。这里涉及到两个数据源:SLS和HDFS。HDFS是Spark官方支持的数据源,支持流和批的读写。SLS是阿里云的服务,EMR已经支持了流式读写。

  • 环境准备

需要E-MapReduce 3.21.0以上版本集群环境,当前正在发布准备中,很快和大家见面,敬请期待。

  • 命令行
spark-sql --master yarn-client --conf spark.sql.streaming.datasource.provider=loghub --jars emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar

注:emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar将会在EMR SDK 1.7.0版本发布出来。

  • 分别创建两张表:sls_service_log和hdfs_service_log
CREATE DATABASE IF NOT EXISTS default;
USE default;

DROP TABLE IF EXISTS hdfs_service_log;
CREATE TABLE hdfs_service_log (instance_name string, ip string, content string)
USING PARQUET
LOCATION '/tmp/hdfs_service_log';

DROP TABLE IF EXISTS sls_service_log;
CREATE TABLE sls_service_log
USING loghub
OPTIONS (
sls.project = "${logProjectName}",
sls.store = "${logStoreName}",
access.key.id = "${accessKeyId}",
access.key.secret = "${accessKeySecret}",
endpoint = "${endpoint}");
- 通过Spark SQL启动一个Stream Query将SLS数据实时同步到HDFS中

set streaming.query.name=sync_sls_to_hdfs;
set spark.sql.streaming.checkpointLocation.sync_sls_to_hdfs=hdfs:///tmp/spark/sync_sls_to_hdfs;

INSERT INTO hdfs_service_log
select
__tag__hostname__ as instance_name,
ip,
content
from sls_service_log;
查看HDFS数据归档情况
image
  • 使用Spark SQL对归档的数据进行离线分析:例如统计一共有多少个IP
select distinct(ip) from hdfs_service_log;

image

4. 结语

以上,我们介绍了Spark SQL在流式处理中的一个非常简单的例子。其实,我们还可以使用Spark SQL进行更加复杂的流式处理任务。后续文章,我将介绍窗口操作,watermark等概念,以及如何在流式数据上进行简单的机器学习运算。

image

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
14天前
|
SQL 存储 缓存
日志服务 SQL 引擎全新升级
SQL 作为 SLS 基础功能,每天承载了用户大量日志数据的分析请求,既有小数据量的快速查询(如告警、即席查询等);也有上万亿数据规模的报表级分析。SLS 作为 Serverless 服务,除了要满足不同用户的各类需求,还要兼顾性能、隔离性、稳定性等要求。过去一年多的时间,SLS SQL 团队做了大量的工作,对 SQL 引擎进行了全新升级,SQL 的执行性能、隔离性等方面都有了大幅的提升。
|
2月前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
163 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
|
1月前
|
SQL 存储 缓存
MySQL进阶突击系列(02)一条更新SQL执行过程 | 讲透undoLog、redoLog、binLog日志三宝
本文详细介绍了MySQL中update SQL执行过程涉及的undoLog、redoLog和binLog三种日志的作用及其工作原理,包括它们如何确保数据的一致性和完整性,以及在事务提交过程中各自的角色。同时,文章还探讨了这些日志在故障恢复中的重要性,强调了合理配置相关参数对于提高系统稳定性的必要性。
|
2月前
|
SQL Oracle 关系型数据库
【赵渝强老师】Oracle的控制文件与归档日志文件
本文介绍了Oracle数据库中的控制文件和归档日志文件。控制文件记录了数据库的物理结构信息,如数据库名、数据文件和联机日志文件的位置等。为了保护数据库,通常会进行控制文件的多路复用。归档日志文件是联机重做日志文件的副本,用于记录数据库的变更历史。文章还提供了相关SQL语句,帮助查看和设置数据库的日志模式。
【赵渝强老师】Oracle的控制文件与归档日志文件
|
2月前
|
SQL 存储 缓存
SQL Server 数据太多如何优化
11种优化方案供你参考,优化 SQL Server 数据库性能得从多个方面着手,包括硬件配置、数据库结构、查询优化、索引管理、分区分表、并行处理等。通过合理的索引、查询优化、数据分区等技术,可以在数据量增大时保持较好的性能。同时,定期进行数据库维护和清理,保证数据库高效运行。
|
3月前
|
SQL 移动开发 Oracle
SQL语句实现查询连续六天数据的方法与技巧
在数据库查询中,有时需要筛选出符合特定时间连续性条件的数据记录
|
2月前
|
SQL Oracle 关系型数据库
【赵渝强老师】Oracle的联机重做日志文件与数据写入过程
在Oracle数据库中,联机重做日志文件记录了数据库的变化,用于实例恢复。每个数据库有多组联机重做日志,每组建议至少有两个成员。通过SQL语句可查看日志文件信息。视频讲解和示意图进一步解释了这一过程。
|
2月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
3月前
|
SQL 数据库
为什么 SQL 日志文件很大,我应该如何处理?
为什么 SQL 日志文件很大,我应该如何处理?
|
3月前
|
SQL 存储 关系型数据库
添加数据到数据库的SQL语句详解与实践技巧
在数据库管理中,添加数据是一个基本操作,它涉及到向表中插入新的记录