基于SLS+Blink的实时计算最佳实践

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
对象存储 OSS,恶意文件检测 1000次 1年
简介: SLS与阿里云实时计算结合,可以把SLS作为数据源(Source),实现日志端到端的实时采集与处理,也可以把SLS作为结果的输出目标(Sink),在SLS对结果实时查询分析,配置可视化报表等。

日志服务简介

阿里云的日志服务(SLS)是针对日志类数据的一站式服务,无需开发就能快捷完成海量日志数据的采集、消费、投递以及查询分析等功能,提升运维、运营效率。在采集端支持30多种写入方式,包括自研的客户端Logtail,开源软件如Logstash、Fluent,Flume,Beats等,各种语言的SDK/Producer,无论是嵌入式设备、网页、服务器、程序等都能轻松接入。在消费端,支持与Storm、Spark Streaming、Flink/Blink等大数据系统无缝对接。
image.png

阿里云实时计算(Blink)

阿里云实时计算是基于Apache Flink构建的一站式、高性能实时大数据处理平台,广泛适用于流式数据处理、离线数据处理等场景。阿里云实时计算提供了如下两种数据处理API:

  • Flink SQL:通过DDL的方式定义Source和Sink,用SQL来实现数据的处理。
  • Flink Datastream: 在程序中使用各个Source和Sink的SDK,通过提交jar的方式运行托管的Flink Job。

通过这两种API,既可以把SLS作为数据源(Source),实现日志端到端的实时采集与处理,也可以把SLS作为结果的输出目标(Sink),在SLS对结果实时查询分析,配置可视化报表等。
image.png

Flink SQL

Flink SQL是阿里云实时计算为了简化计算模型、降低用户使用实时计算门槛而设计的一套符合标准SQL语义的开发语言。

创建SLS源表

SLS源表对应SLS中的Logstore,表中的字段与Logstore中日志字段一一映射,像执行SQL一样流式处理SLS中的数据。除了Logstore所属的Region对应的Endpoint和Project之外,还需要具有读SLS Logstore权限的Access Key以及消费数据起始位置对应的时间点。

create table sls_stream(
  a INT,
  b INT,
  c VARCHAR
) with (
  type ='sls',
  endPoint ='http://cn-hangzhou-share.log.aliyuncs.com',
  accessId ='<yourAccessId>',
  accessKey ='<yourAccessKey>',
  startTime = '2017-07-05 00:00:00',
  project ='<yourProjectName>',
  logStore ='<yourLogStoreName>',
  consumerGroup ='<yourConsumerGroupName>'
);

Checkpoint

SLS的底层存储Loghub,是一个类似Kafka的Append Only的存储系统,覆盖Kafka 100%的功能。与Kafka的partition类型,Logstore中的数据存储在每个Shard中。每个Shard都可以通过cursor或者时间戳,确定日志在Shard中的存储位置(对应Kafka中的Offset)。在消费过程中,为了支持程序重启时尽可能少的数据重复,需要将最新消费位置记录下来,用于进程重启之后继续消费。这个位置就是我们所说的checkpoint。
image.png
目前,Flink SQL任务checkpoint保存在Flink的State中,如果任务Failover 或者暂停之后恢复,会从State中恢复消费位置继续消费。然而如果任务重启或者其他原因造成的State丢失,任务将从启动时指定的时间点开始消费。创建源表时建议指定参数consumerGroup,Blink将自动同步消费位置到SLS服务端,以用于在SLS控制台监控消费进度。注意:任务重启时不会使用SLS服务端的消费位置恢复。

创建SLS结果表

通过定义结果表,通过INSERT语句,把从源表中处理之后的数据写入到SLS的Logstore中。

create table sls_output(
 `name` VARCHAR,
 age BIGINT,
 birthday BIGINT
)with(
 type='sls',
 endPoint='http://cn-hangzhou-corp.sls.aliyuncs.com',
 accessId='<yourAccessId>',
 accessKey='<yourAccessKey>',
 project='<yourProjectName>',
 logstore='<yourLogstoreName>'
);

INSERT INTO sls_output SELECT age, birthday FROM source_table;

Flink Datastream

Flink SQL 的不足

  1. Flink SQL已经能够实现许多场景下的数据处理需求,但是受限于SQL的表达能力,对于比较复杂的业务场景,SQL实现起来比较复杂。
  2. 对于DDL 的定义,需要指定固定的字段列表,对于日志场景而言,日志字段不固定的情况非常普遍,这就意味着很难提前定义好全部的字段。
  3. 在日志中,很多时候单个字段是JSON或者其他复杂的形式,如果要在SQL里面解析和处理这类字段,不如在程序中之间处理起来灵活。

对于这些场景,可以考虑使用Data Stream API,通过自定义程序,实现更复杂的业务逻辑。

Maven 依赖

SLS开发了一个与开源Flink集成的SDK,同样适用于Blink。

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>flink-log-connector</artifactId>
    <version>0.1.19.1</version>
</dependency>

Github源码:https://github.com/aliyun/aliyun-log-flink-connector

消费SLS示例

public class ConsumerSample {
    private static final String SLS_ENDPOINT = "";
    private static final String ACCESS_KEY_ID = "";
    private static final String ACCESS_KEY_SECRET = "";
    private static final String SLS_PROJECT = "";
    private static final String SLS_LOGSTORE = "";

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("your checkpoint dir"));
        Properties configProps = new Properties();
        configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
        configProps.put(ConfigConstants.LOG_ACCESSSKEYID, ACCESS_KEY_ID);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET);
        configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
        configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
        configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "23_ots_sla_etl_product1");
        configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
        configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000");

        FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
        DataStream<FastLogGroupList> stream = env.addSource(
                new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));

        stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
            for (FastLogGroup logGroup : value.getLogGroups()) {
                int logCount = logGroup.getLogsCount();
                for (int i = 0; i < logCount; i++) {
                    FastLog log = logGroup.getLogs(i);
                    // processing log
                }
            }
        });
        stream.writeAsText("log-" + System.nanoTime());
        env.execute("Flink consumer");
    }
}

Checkpoint

与SQL类似,SDK也会把checkpoint保存到Flink State中,此外,还支持把Checkpoint同步到SLS服务端,这样当Flink本地的State无法恢复时,还能从服务端获取checkpoint,从而保证即便任务重启甚至重建,只要服务端的消费组没有删除,checkpoint就不会丢失。SDK同步checkpoint支持如下策略:

  • 与Flink snapshotState同步,即在Flink调用snapshotState时,同步到服务端。默认是这种方式。
  • 自动提交checkpoint,即定时提交checkpoint到服务端。好处是当Flink下游任务不支持exactly once时最大程度上避免数据重复。
  • 不同步到SLS服务端。

处理结果写入SLS示例

public class ProducerSample {
    private static final String SLS_ENDPOINT = "cn-hangzhou.log.aliyuncs.com";
    private static final String ACCESS_KEY_ID = "";
    private static final String ACCESS_KEY = "";
    private static final String SLS_PROJECT = "";
    private static final String SLS_LOGSTORE = "test-flink-producer";

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(3);
        DataStream<String> stream = env.addSource(new EventsGenerator());
        Properties configProps = new Properties();
        configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
        configProps.put(ConfigConstants.LOG_ACCESSSKEYID, ACCESS_KEY_ID);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY);
        configProps.put(ConfigConstants.LOG_PROJECT, SLS_PROJECT);
        configProps.put(ConfigConstants.LOG_LOGSTORE, SLS_LOGSTORE);

        FlinkLogProducer<String> logProducer = new FlinkLogProducer<>(new SimpleLogSerializer(), configProps);
        logProducer.setCustomPartitioner(new LogPartitioner<String>() {
            @Override
            public String getHashKey(String element) {
                String hash = "";
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    hash = new BigInteger(1, md.digest()).toString(16);
                } catch (NoSuchAlgorithmException ignore) {
                }
                StringBuilder builder = new StringBuilder();
                while (builder.length() < 32 - hash.length()) {
                    builder.append("0");
                }
                builder.append(hash);
                return builder.toString();
            }
        });
        stream.addSink(logProducer);
        env.execute("Flink producer");
    }

    public static class EventsGenerator implements SourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long seq = 0;
            while (running) {
                Thread.sleep(10);
                ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

任务监控

Metric监控

对于Flink SQL任务,Blink控制台提供了非常完备的监控报表,可以通过这些报表观察任务运行的状态,如延迟,内存状态,Failover等。
image.png
Metric解释可以参考Blink作业运维相关文档:https://help.aliyun.com/document_detail/62482.html
注意:SLS Datastream SDK暂时还没有实现Metric上报。

消费进度与监控报警

在SLS查看消费组对应的消费进度。
image.png
SLS对于每个消费组会定时输出延迟日志,根据这个延迟日志结合SLS的告警,可以用于监控消费延迟。参考如何开通消费组日志:https://help.aliyun.com/document_detail/85663.html
image.png

任务日志采集

Blink支持把作业日志存储到SLS中,在作业编辑页面右侧的作业参数页面,配置Log4j appender:
image.png

log4j.logger.org.apache.hadoop=OFF
log4j.appender.loghub = com.alibaba.blink.log.loghub.BlinkLogHubAppender
log4j.appender.loghub.Threshold = ERROR
log4j.appender.loghub.projectName = <your SLS Project>
log4j.appender.loghub.logstore = <your SLS Logstore>
log4j.appender.loghub.endpoint = <your SLS Endpoint>
log4j.appender.loghub.accessKeyId = <your Access Key ID>
log4j.appender.loghub.accessKey = <your Access Key Secret>

常见问题

部分task没有读到数据

从SLS消费数据的本质就是从每个shard的指定位置开始消费,直到最新位置。然后单个shard不支持并发消费,也就是说一个shard最多被一个task消费到。而不管是SQL还是Data Stream,Shard分配的方式都是对shard本身的信息做hash,然后对task总数取模来分配的。假设某个shard hashcode是x,task 个数为y,当前task的id为z,那么仅当 x%y=z 时,当前task会消费这个shard。因此可能存在某个task没有分配到任何shard的情况。

消费太慢导致数据堆积

数据堆积的根本原因是写入速度超过了消费速度,而如何提高消费速度取决于具体的场景。常见的原因有:

  • 下游处理节点慢导致source节点反压。这种情况可以通过观察Blink的反压状态是否是HIGH来确认,通过优化下游任务节点速度来解决。
  • Shard个数太少。当shard数量少于task个数时,无疑会造成部分task空跑的现象,此时增加task个数已经对总体的并发没有任何影响,此时可以尝试在SLS侧分裂shard个数解决。
消费组在多个作业之间没有产生分配shard的效果

Blink消费SLS的数据并没有使用消费组来实现均衡消费的效果,提供的消费组名称仅仅是用于在SLS服务端保存消费位点。如果需要类似Kafka 消费组的功能,应使用SLS的Consumer Library。参考文档 https://help.aliyun.com/document_detail/28998.html

Flink Datastream任务没有同步checkpoint到SLS服务端

检查Blink任务作业参数中的 blink.checkpoint.interval.ms 是否设置过大。

更多资料

Blink开发Datastream作业:https://help.aliyun.com/document_detail/111876.html
定义SLS源表:https://help.aliyun.com/knowledge_detail/62521.html
定义SLS结果表:https://help.aliyun.com/knowledge_detail/62529.html




目录
相关文章
|
3天前
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
241 22
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
|
3月前
|
运维 监控 安全
选择主题1:实时计算Flink版最佳实践测评
本文介绍了使用实时计算Flink版进行用户行为分析的实践,涵盖用户行为趋势、留存分析、用户画像构建及异常检测等方面。与自建Flink集群相比,实时计算Flink版在稳定性、性能、开发运维和安全能力上表现更优,且显著降低了企业的IT支出和运维成本,提升了业务决策效率和系统可靠性,是企业级应用的理想选择。
92 32
|
3月前
|
XML JSON 监控
告别简陋:Java日志系统的最佳实践
【10月更文挑战第19天】 在Java开发中,`System.out.println()` 是最基本的输出方法,但它在实际项目中往往被认为是不专业和不足够的。本文将探讨为什么在现代Java应用中应该避免使用 `System.out.println()`,并介绍几种更先进的日志解决方案。
89 1
|
3月前
|
运维 监控 安全
实时计算 Flink 版最佳实践测评
本文介绍了结合电商平台用户行为数据的实时计算Flink版实践,涵盖用户行为分析、标签画像构建、业务指标监控和数据分析预测等场景。文章还对比了实时计算Flink版与其他引擎及自建Flink集群在稳定性、性能、开发运维和安全能力方面的差异,分析了其成本与收益。最后,文章评估了实时计算Flink版的产品内引导、文档帮助、功能满足情况,并提出了针对不同业务场景的改进建议和与其他产品的联动可能性。
77 2
|
4月前
|
SQL 运维 监控
实时计算Flink版最佳实践测评报告
本报告旨在评估阿里云实时计算Flink版在实际应用中的表现,通过一系列的测试和分析来探讨其在稳定性、性能、开发运维及安全性方面的优势。同时,我们将结合具体的业务场景,如用户行为分析、标签画像构建等,来说明其实时数据处理能力,并对比自建Flink集群以及其他实时计算引擎。最后,从成本效益的角度出发,讨论采用全托管服务对企业运营的影响。
86 13
|
3月前
|
消息中间件 运维 分布式计算
实时计算Flink版最佳实践测评
本文介绍了使用阿里云实时计算Flink版进行用户行为分析的实践,详细探讨了其在性能、稳定性和成本方面的优势,以及与自建Flink集群的对比。通过实时计算,能够快速发现用户行为模式,优化产品功能,提升用户体验和市场竞争力。文章还提到了产品的易用性、功能满足度及改进建议,并与其他Flink实时计算产品进行了对比,强调了Flink在实时处理方面的优势。
|
4月前
|
存储 运维 监控
实时计算Flink版最佳实践测评
实时计算Flink版最佳实践测评
124 1
|
4月前
|
设计模式 SQL 安全
PHP中的设计模式:单例模式的深入探索与实践在PHP的编程实践中,设计模式是解决常见软件设计问题的最佳实践。单例模式作为设计模式中的一种,确保一个类只有一个实例,并提供全局访问点,广泛应用于配置管理、日志记录和测试框架等场景。本文将深入探讨单例模式的原理、实现方式及其在PHP中的应用,帮助开发者更好地理解和运用这一设计模式。
在PHP开发中,单例模式通过确保类仅有一个实例并提供一个全局访问点,有效管理和访问共享资源。本文详细介绍了单例模式的概念、PHP实现方式及应用场景,并通过具体代码示例展示如何在PHP中实现单例模式以及如何在实际项目中正确使用它来优化代码结构和性能。
62 2
|
4月前
|
开发者 Python
基于Python的日志管理与最佳实践
日志是开发和调试过程中的重要工具,然而,如何高效地管理和利用日志常常被忽略。本文通过Python中的logging模块,探讨如何使用日志来进行调试、分析与问题排查,并提出了一些实际应用中的优化建议和最佳实践。
|
5月前
|
JSON Java fastjson
Java日志通关(五) - 最佳实践
作者日常在与其他同学合作时,经常发现不合理的日志配置以及五花八门的日志记录方式,后续作者打算在团队内做一次Java日志的分享,本文是整理出的系列文章第五篇。

相关产品

  • 日志服务