读Flink源码谈设计:Metric

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 前阵子笔者涉及了些许监控相关的开发工作,在开发过程中也碰到过些许问题,便翻读了Flink相关部分的代码,在读代码的过程中发现了一些好的设计,因此也是写成文章整理上来。

theme: awesome-green

版本 日期 备注
1.0 2021.10.8 文章首发
1.1 2022.3.9 fix typo
1.2 2022.7.3 fix typo

0. 前言

前阵子笔者涉及了些许监控相关的开发工作,在开发过程中也碰到过些许问题,便翻读了Flink相关部分的代码,在读代码的过程中发现了一些好的设计,因此也是写成文章整理上来。

本文的源码基于Flink1.13.2

1. 扩展插件化

在官网中,Flink社区自己提供了一些已接入的Reporter,如果我们有自己定制的Reporter,也可以根据它的规范去实现自己的Reporter。

在Flink的代码中,提供了反射机制实例化MetricReporter:要求MetricReporter的实现类必须是public的访问修饰符,不能是抽象类,必须有一个无参构造函数。

核心代码为ReporterSetup#getAllReporterFactories

    private static Iterator<MetricReporterFactory> getAllReporterFactories(
            @Nullable PluginManager pluginManager) {
   
   
        final Iterator<MetricReporterFactory> factoryIteratorSPI =
                ServiceLoader.load(MetricReporterFactory.class).iterator();
        final Iterator<MetricReporterFactory> factoryIteratorPlugins =
                pluginManager != null
                        ? pluginManager.load(MetricReporterFactory.class)
                        : Collections.emptyIterator();

        return Iterators.concat(factoryIteratorPlugins, factoryIteratorSPI);
    }

该代码会通过Java的SPI机制来获取MetricReporter的相关实现类,本质上是通过ClassLoder来获取。

|-- ReporterSetup
     \-- fromConfiguration //当集群启动时,会从配置读取监控并初始化相关类
         \-- loadAvailableReporterFactories // 加载有效的Reporter们
             \-- getAllReporterFactories //  核心代码,通过SPI以及ClassLoader机制获取Reporter们

2. 内置松耦合

上文提到了社区会提供常见的一些监控Reporter。在代码中,本质是工厂模式的实现。

/**
 * {@link MetricReporter} factory.
 *
 * <p>Reporters that can be instantiated with a factory automatically qualify for being loaded as a
 * plugin, so long as the reporter jar is self-contained (excluding Flink dependencies) and contains
 * a {@code META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory} file
 * containing the qualified class name of the factory.
 *
 * <p>Reporters that previously relied on reflection for instantiation can use the {@link
 * InstantiateViaFactory} annotation to redirect reflection-base instantiation attempts to the
 * factory instead.
 */
public interface MetricReporterFactory {
   
   

    /**
     * Creates a new metric reporter.
     *
     * @param properties configured properties for the reporter
     * @return created metric reporter
     */
    MetricReporter createMetricReporter(final Properties properties);
}

每接入一个监控,只要实现相应的工厂方法即可。目前实现的有:

  • org.apache.flink.metrics.graphite.GraphiteReporterFactory
  • org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
  • org.apache.flink.metrics.prometheus.PrometheusReporter
  • org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
  • org.apache.flink.metrics.statsd.StatsDReporterFactory
  • org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
  • org.apache.flink.metrics.slf4j.Slf4jReporterFactory

每当社区需要接入新的Reporter时,仅仅需要实现MetricReporterFactory即可,而上层能感知到的也仅仅是MetricReporter,和任何具体实现无关,这也是典型的一种防腐设计。

FLink-Metric.jpg

3. Fail safe

在流计算业务中,如果监控这种旁路逻辑发生问题,是否应该影响到主干逻辑呢?答案是不应该的。

MetricRegistryImpl中(顾名思义,它会将所有的Reporter注册进这个类),构造函数会将相关的MetricReporter放到线程池中,定期的让它们上报数据。

|-- MetricRegistryImpl
  \-- constructor

WebMonitorEndpoint中,也有线程池的身影。这个类提供了RestAPI来便于查询Metric。对于其他组件的请求通过Akka来异步发送,并通过线程池来处理这些回调的回复。

|-- WebMonitorEndpoint
  \-- start
    \-- initializeHandlers
      \--   new JobConfigHandler
|-- AbstractExecutionGraphHandler
  \-- handleRequest

这是典型Fail-safe的设计。

4. 不仅只支持Push

在Flink中,监控数据不仅支持Push,同时还实现了Pull,而实现也非常的简单。

MetricQueryService实现了MetricQueryServiceGateway,这意味着它可以被远程调用。

其监控数据来源代码追踪:

|-- AbstractMetricGroup
  \-- counter
    |-- MetricRegistryImpl
      \-- register
        |-- MetricQueryService
          \-- addMetric

上面提到的WebMonitorEndpoint也是一样,不过是基于RestAPI的实现,同样提供了Pull的策略。

5. 参考资料

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
7月前
|
SQL 算法 API
读Flink源码谈设计:图的抽象与分层
前阵子组里的小伙伴问我“为什么Flink从我们的代码到真正可执行的状态,要经过这么多个graph转换?这样做有什么好处嘛?”我早期看到这里的设计时的确有过相同的疑惑,当时由于手里还在看别的东西,查阅过一些资料后就翻页了。如今又碰到了这样的问题,不妨就在这篇文章中好好搞清楚。
559 0
读Flink源码谈设计:图的抽象与分层
|
7月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
584 1
|
7月前
|
流计算
Flink源码解析
Flink源码解析
104 0
|
资源调度 流计算
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(下)
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(下)
158 1
|
4月前
|
SQL 消息中间件 监控
实时计算 Flink版产品使用问题之怎么使用Metric Reporters监控作业
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 Kubernetes 监控
实时计算 Flink版操作报错合集之在编译源码时遇到报错:无法访问,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
Oracle 关系型数据库 Java
实时计算 Flink版产品使用问题之源码 deploy,生成带有时间戳的jar包,如何修改配置信息
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
监控 流计算
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(上)
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(上)
96 1
|
7月前
|
存储 SQL API
读Flink源码谈设计:流批一体的实现与现状
在Dataflow相关的论文发表前,大家都往往认为需要两套API来实现流计算和批计算,典型的实现便是Lambda架构。
628 0
|
7月前
|
存储 算法 Java
读Flink源码谈设计:Exactly Once
将Flink应用至生产已有一段时间,刚上生产的时候有幸排查过因数据倾斜引起的Checkpoint超时问题——当时简单的了解了相关机制,最近正好在读Flink源码,不如趁这个机会搞清楚。 在这里,我们首先要搞清楚两种Exactly-Once的区别: - Exactly Once:在计算引擎内部,数据不丢失不重复。本质是通过Flink开启检查点进行Barrier对齐,即可做到。 - End to End Exactly Once:这意味着从数据读取、引擎处理到写入外部存储的整个过程中,数据都是不丢失不重复的。这要求数据源可重放,写入端支持事务的恢复和回滚或幂等。
575 0

相关产品

  • 实时计算 Flink版