通过Spark SQL实时归档SLS数据

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 流式计算和SQL 简要介绍Spark SQL流式开发语法 实时归档SLS数据到HDFS

作者:木艮,阿里云E-MapReduce开发工程师

我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。本文主要分成三部分:

  • 流式计算和SQL
  • 简要介绍Spark SQL流式开发语法
  • 实时归档SLS数据到HDFS

1. 流式计算和SQL

数据的价值随着时间逐渐降低。及时尽早的对数据进行处理提升了数据的价值,所以流式计算系统的应用也越来越广泛。目前常用的流式计算框架有Storm,Spark Streaming及Flink等,也有Kafka Streams这类基于Kafka的流式处理类库。各种流式处理框架都有其各自的API,开发者不可避免的需要学习如何使用这些API。如何提供简单而有效的开发工具,从而把更多的精力投放在业务处理中。所以,各个流式处理系统都逐渐支持SQL API作为开发语言,让使用者可以像处理Table一样处理Stream。例如KSQL支持使用SQL进行流式处理Kafka数据。Spark同样提出来Structured Streaming作为最新一代的流式处理系统,底层的处理引擎也是Spark SQL。不过在上层SQL API,缺少Structured Streaming必要的功能,例如window,watermark等。EMR在Spark开源版本上进行了功能扩展,支持使用SQL API在Spark上进行完整的流式查询开发。

2. Spark SQL流式开发入门

这节将简要介绍Spark SQL中关于流式开的概念和语法。

2.1 建表
当我们需要对流式数据源进行读写操作时,需要首先创建一张表来表示这个数据源。定义表的语法如下:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);

以上语法中,针对特殊source,不要求一定指定表的列定义。当不指定列定义时,会自动识别数据源的schema信息。举一个例子:

CREATE TABLE driver_behavior 
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

当数据源是Kafka时,会根据Kafka Topic名去到Kafka Schema Registry中查找schema信息。当然,我们也可以指定列定义,例如:

CREATE TABLE driverbehavior(deviceId string, velocity double)
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

当指定列定义时,要求必须和Source中的字段定义是一致的。当执行完CREATE TABLE操作,表的定义会保存到Hive MetaStore中。

2.2 CTAS
我们可以将创建表和将查询结果写入到表的语句合并到一起,那么就是CREATE TABLE ... AS SELECT ...语法:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*)
AS
queryStatement;

举一个例子(引用自这里: q103):

CREATE TABLE kafka_temp_table
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}") AS
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

当执行完操作,将创建出表并实际生成一个StreamQuery实例,将查询结果写入到结果表中。

2.3 DML
流式查询SQL和离线SQL标准语法大部分是一样,这边主要介绍insert操作。流式查询是不允许单独进行SELECT操作,必须将SELECT的查询结果写入到表中。所以,需要在SELECT操作之前执行INSERT操作。

INSERT INTO tbName[(columnName[,columnName]*)]
queryStatement;

以上语法为一次流式查询:这个语句将实际生成一个StreamQuery实例,将查询结果写入到结果表中。举一个例子:

INSERT INTO kafka_temp_table
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

2.4 window及watermark
限于篇幅,本文暂且不介绍Spark SQL中如何使用window和watermak,有兴趣的可以先看看资料,后续会专门撰文介绍。

2.5 流式作业配置

使用SQL进行流式作业开发时,有些必要的配置无法在Query表达出来,需要单独进行设置。这里我们使用SET操作进行流式作业必要参数配置,当前有两个参数需要设置:

image

每一个流式查询实例前都需要进行配置,也就是说,当使用CTAS或者Insert操作时,必须前置这两个配置。一个SQL文件支持多个流式查询,例如:

-- test.sql

SET streaming.query.name=query1;
SET spark.sql.streaming.checkpointLocation.query1=/tmp/spark/query1
INSERT INTO tbName1 [(columnName[,columnName]*)]
queryStatement1;

SET streaming.query.name=query2;
SET spark.sql.streaming.checkpointLocation.query2=/tmp/spark/query2
INSERT INTO tbName2 [(columnName[,columnName]*)]
queryStatement2;

3. SLS数据实时归档实战

假定一个场景,现在通过SLS收集了业务服务器上的日志,需要归档到HDFS中,便于后续进行离线分析。这里涉及到两个数据源:SLS和HDFS。HDFS是Spark官方支持的数据源,支持流和批的读写。SLS是阿里云的服务,EMR已经支持了流式读写。

  • 环境准备

需要E-MapReduce 3.21.0以上版本集群环境,当前正在发布准备中,很快和大家见面,敬请期待。

  • 命令行
spark-sql --master yarn-client --conf spark.sql.streaming.datasource.provider=loghub --jars emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar

注:emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar将会在EMR SDK 1.7.0版本发布出来。

  • 分别创建两张表:sls_service_log和hdfs_service_log
CREATE DATABASE IF NOT EXISTS default;
USE default;

DROP TABLE IF EXISTS hdfs_service_log;
CREATE TABLE hdfs_service_log (instance_name string, ip string, content string)
USING PARQUET
LOCATION '/tmp/hdfs_service_log';

DROP TABLE IF EXISTS sls_service_log;
CREATE TABLE sls_service_log
USING loghub
OPTIONS (
sls.project = "${logProjectName}",
sls.store = "${logStoreName}",
access.key.id = "${accessKeyId}",
access.key.secret = "${accessKeySecret}",
endpoint = "${endpoint}");
- 通过Spark SQL启动一个Stream Query将SLS数据实时同步到HDFS中

set streaming.query.name=sync_sls_to_hdfs;
set spark.sql.streaming.checkpointLocation.sync_sls_to_hdfs=hdfs:///tmp/spark/sync_sls_to_hdfs;

INSERT INTO hdfs_service_log
select
__tag__hostname__ as instance_name,
ip,
content
from sls_service_log;
查看HDFS数据归档情况
image
  • 使用Spark SQL对归档的数据进行离线分析:例如统计一共有多少个IP
select distinct(ip) from hdfs_service_log;

image

4. 结语

以上,我们介绍了Spark SQL在流式处理中的一个非常简单的例子。其实,我们还可以使用Spark SQL进行更加复杂的流式处理任务。后续文章,我将介绍窗口操作,watermark等概念,以及如何在流式数据上进行简单的机器学习运算。

image

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
13天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
37 2
|
19天前
|
Java 网络架构 数据格式
Struts 2 携手 RESTful:颠覆传统,重塑Web服务新纪元的史诗级组合!
【8月更文挑战第31天】《Struts 2 与 RESTful 设计:构建现代 Web 服务》介绍如何结合 Struts 2 框架与 RESTful 设计理念,构建高效、可扩展的 Web 服务。Struts 2 的 REST 插件提供简洁的 API 和约定,使开发者能快速创建符合 REST 规范的服务接口。通过在 `struts.xml` 中配置 `&lt;rest&gt;` 命名空间并使用注解如 `@Action`、`@GET` 等,可轻松定义服务路径及 HTTP 方法。
30 0
|
19天前
|
测试技术 Java
全面保障Struts 2应用质量:掌握单元测试与集成测试的关键策略
【8月更文挑战第31天】Struts 2 的测试策略结合了单元测试与集成测试。单元测试聚焦于单个组件(如 Action 类)的功能验证,常用 Mockito 模拟依赖项;集成测试则关注组件间的交互,利用 Cactus 等框架确保框架拦截器和 Action 映射等按预期工作。通过确保高测试覆盖率并定期更新测试用例,可以提升应用的整体稳定性和质量。
39 0
|
19天前
|
数据库 Java 监控
Struts 2 日志管理化身神秘魔法师,洞察应用运行乾坤,演绎奇幻篇章!
【8月更文挑战第31天】在软件开发中,了解应用运行状况至关重要。日志管理作为 Struts 2 应用的关键组件,记录着每个动作和决策,如同监控摄像头,帮助我们迅速定位问题、分析性能和使用情况,为优化提供依据。Struts 2 支持多种日志框架(如 Log4j、Logback),便于配置日志级别、格式和输出位置。通过在 Action 类中添加日志记录,我们能在开发过程中获取详细信息,及时发现并解决问题。合理配置日志不仅有助于调试,还能分析用户行为,提升应用性能和稳定性。
36 0
|
19天前
|
Java 测试技术 容器
从零到英雄:Struts 2 最佳实践——你的Web应用开发超级变身指南!
【8月更文挑战第31天】《Struts 2 最佳实践:从设计到部署的全流程指南》深入介绍如何利用 Struts 2 框架从项目设计到部署的全流程。从初始化配置到采用 MVC 设计模式,再到性能优化与测试,本书详细讲解了如何构建高效、稳定的 Web 应用。通过最佳实践和代码示例,帮助读者掌握 Struts 2 的核心功能,并确保应用的安全性和可维护性。无论是在项目初期还是后期运维,本书都是不可或缺的参考指南。
29 0
|
19天前
|
测试技术 Java
揭秘Struts 2测试的秘密:如何打造无懈可击的Web应用?
【8月更文挑战第31天】在软件开发中,确保代码质量的关键在于全面测试。对于基于Struts 2框架的应用,结合单元测试与集成测试是一种有效的策略。单元测试聚焦于独立组件的功能验证,如Action类的执行逻辑;而集成测试则关注组件间的交互,确保框架各部分协同工作。使用JUnit进行单元测试,可通过简单示例验证Action类的返回值;利用Struts 2 Testing插件进行集成测试,则可模拟HTTP请求,确保Action方法正确处理请求并返回预期结果。这种结合测试的方法不仅提高了代码质量和可靠性,还保证了系统各部分按需协作。
9 0
|
26天前
|
Kubernetes Ubuntu Windows
【Azure K8S | AKS】分享从AKS集群的Node中查看日志的方法(/var/log)
【Azure K8S | AKS】分享从AKS集群的Node中查看日志的方法(/var/log)
|
7天前
|
Java
日志框架log4j打印异常堆栈信息携带traceId,方便接口异常排查
日常项目运行日志,异常栈打印是不带traceId,导致排查问题查找异常栈很麻烦。
|
18天前
|
存储 监控 数据可视化
SLS 虽然不是直接使用 OSS 作为底层存储,但它凭借自身独特的存储架构和功能,为用户提供了一种专业、高效的日志服务解决方案。
【9月更文挑战第2天】SLS 虽然不是直接使用 OSS 作为底层存储,但它凭借自身独特的存储架构和功能,为用户提供了一种专业、高效的日志服务解决方案。
50 9