Flink 任务 Jackson 解析 JSON 使用不当引发的反压问题

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: 背景最近业务方反馈线上一个 topic 的数据延迟比较大,然后我查看了这个 topic 的数据是由一个 Flink 任务产生的,于是就找到了这个任务开始排查问题,发现这个任务是一个非常简单的任务,大致的逻辑是 kafka source -> flatmap -> filter -> map -> sink kafka.中间没有复杂的操作,我在本地写了一个简单的程序模拟线上的任务.方便大家理解, 任务的 DAG 如下图所示

背景


最近业务方反馈线上一个 topic 的数据延迟比较大,然后我查看了这个 topic 的数据是由一个 Flink 任务产生的,于是就找到了这个任务开始排查问题,发现这个任务是一个非常简单的任务,大致的逻辑是 kafka source -> flatmap -> filter -> map -> sink kafka.中间没有复杂的操作,我在本地写了一个简单的程序模拟线上的任务.方便大家理解, 任务的 DAG 如下图所示



image-20210827230429171


线上任务 source 的并行度设置的是 80 flatmap filter map 的并行度也是 80 所以和 source 都 chain 在一起,两个 sink 的并行度设置的都是 30,我这里本地就设置成 8 和 3 了.


排查过程

因为反馈说是下游 topic 的数据有延迟,我的第一反应是看任务是否有反压,通过 Flink 的 UI 上的 BackPressure 发现任务并没有反压,然后查看了第一个 operator 输入数据量的 QPS 大概是 50w/s 经过 filter 之后输出数据量的 QPS 是 15w/s 左右,然后查看了 kafka 的监控发现写入的 QPS 也是 15w/s ,这就说明确实不是因为写入 kafka 慢引起的反压,因为你产生了多少我就写入了多少,但是最终的数据有延迟,那就说明任务反压的地方是在第一个 operator 中的某个算子上,但是因为 operator chain 把所有的算子都 chain 在一起,不太方便定位反压的位置.于是我就在 flatmap 后面调用 disableChaining 打断了 operator chain.这个时候任务的 DAG 变成了下面这样.



image-20210827231856263


这个时候在查看 source 的 BackPressure 显示都是 high 状态,说明反压出现在 flatmap 算子,刚开始猜测可能是因为 flatmap 的时候数据量会暴增导致处理不过来了,所以就尝试增大了 flatmap 算子的并行度,但是发现并没有明显的效果,任务反压还是比较严重,哪怕是增大并行度到 500 效果都不是很明显,这就说明跟 flatmap 的数据量大小及并行度没有关系,没办法只能看代码了,大概看了下任务逻辑,代码非常简单,不应该会出现性能问题,唯一不足的就是代码逻辑不够简洁,略显繁琐,且有重复的逻辑,其实 filter 和 map 算子是没有必要的,优化了这些地方之后本以为就能解决反压问题,可是结果还是出现了反压.



image-20210829110127054


这就有点奇怪了,代码中没有复杂的逻辑,也没有和第三方交互,按道理来说不应该会有性能问题,再次查看代码发现代码里面解析 JSON 数据用的是 Jackson 类库,Jackson 的稳定性和性能肯定是没有问题的,在国外应用比较广泛,比如 springboot 包括 Flink 这种大型的框架内默认都使用 Jackson 更加能说明这一点,国内使用 Fastjson 类库会比较多一点.然后仔细看代码发现使用 Jackson 的时候需要实例化 ObjectMapper 对象,但是代码里面是在 flatmap 方法里面实例化 ObjectMapper 对象的,也就是说每来一条数据都需要实例化一次 ObjectMapper 对象,这显然是不合理的.刚开始没有发现是因为解析 json 被封装在一个方法里面,没有进到方法里面来看,那么究竟是不是因为 ObjectMapper 实例化影响到性能呢? 其实非常简单,我们可以在本地测试一下提前实例化好 ObjectMapper 和每条数据都实例化一遍的差别就能得到答案.测试代码如下


public static long a(String json) throws IOException {
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100000; i ++) {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.readValue(json, List.class);
        }
        long end = System.currentTimeMillis();
        return end - start;
    }
    public static long b(String json) throws IOException {
        long start = System.currentTimeMillis();
        ObjectMapper objectMapper = new ObjectMapper();
        for (int i = 0; i < 100000; i ++) {
            objectMapper.readValue(json, List.class);
        }
        long end = System.currentTimeMillis();
        return end - start;
    }
    String json = "[{\"name\":\"JasonLee\",\"age\":18},{\"name\":\"JasonLee1\",\"age\":18}]";
    System.out.println(a(json));
    System.out.println(b(json));
    // 打印的结果是:
    68425
    316


从上面测试的结果可以发现,这两种写法性能相差甚大,耗时完全不在一个量级上,也就是说 jackson 的 ObjectMapper 实例化是一个非常耗时的过程,这就能解释任务为什么会在 flatmap 算子出现反压了.


解决


既然知道了反压的原因,那解决办法就非常的简单了,把 ObjectMapper 对象的实例化放在 open 方法里面即可,类似于创建 JDBC 连接,让每个 task 初始化一次 ObjectMapper 对象,然后在 flatmap 方法里面直接使用.这样就不用每条数据都初始化一次.


SingleOutputStreamOperator<JasonEntity> filter = jasonEntityDataStreamSource.flatMap(new RichFlatMapFunction<JasonEntity, JasonEntity>() {
    private ObjectMapper objectMapper = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        objectMapper = new ObjectMapper();
    }
    @Override
    public void flatMap(JasonEntity jasonEntity, Collector<JasonEntity> collector) throws Exception {
        List<Map<String, String>> list = objectMapper.readValue("[\n" +
                "  {\"name\":\"JasonLee\",\"age\":18},\n" +
                "  {\"name\":\"JasonLee1\",\"age\":18}\n" +
                "]", List.class);
        System.out.println(list.toString());
        collector.collect(jasonEntity);
    }
}).disableChaining();


当然除了改成上面的写法外,还可以直接用 fastjson 来解析 JSON 数据,jackson 和 fastjson 的性能是差不多的,网上也有很多这两者对比的文章,感兴趣的也可以自己测试一下.这里就不做对比测试了.


最后把修改后的代码提交到线上后发现没有出现反压,下游的数据自然也不会有延迟了.其实这是一个非常小的问题,或者说是一个小细节吧,但是却能带来严重的后果.所以我们在写代码的时候还是要多注意细节,尽可能提高程序的性能.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
160 3
|
20天前
|
JSON 前端开发 搜索推荐
关于商品详情 API 接口 JSON 格式返回数据解析的示例
本文介绍商品详情API接口返回的JSON数据解析。最外层为`product`对象,包含商品基本信息(如id、name、price)、分类信息(category)、图片(images)、属性(attributes)、用户评价(reviews)、库存(stock)和卖家信息(seller)。每个字段详细描述了商品的不同方面,帮助开发者准确提取和展示数据。具体结构和字段含义需结合实际业务需求和API文档理解。
|
13天前
|
JSON 小程序 UED
微信小程序 app.json 配置文件解析与应用
本文介绍了微信小程序中 `app.json` 配置文件的详细
74 12
|
13天前
|
JSON 缓存 API
解析电商商品详情API接口系列,json数据示例参考
电商商品详情API接口是电商平台的重要组成部分,提供了商品的详细信息,支持用户进行商品浏览和购买决策。通过合理的API设计和优化,可以提升系统性能和用户体验。希望本文的解析和示例能够为开发者提供参考,帮助构建高效、可靠的电商系统。
32 12
|
2月前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
103 16
|
2月前
|
JSON JavaScript 前端开发
一次采集JSON解析错误的修复
两段采集来的JSON格式数据存在格式问题,直接使用PHP的`json_decode`会报错。解决思路包括:1) 手动格式化并逐行排查错误;2) 使用PHP-V8JS扩展在JavaScript环境中解析。具体方案一是通过正则表达式和字符串替换修复格式,方案二是利用V8Js引擎执行JS代码并返回JSON字符串,最终实现正确解析。 简介: 两段采集的JSON数据因掺杂JavaScript代码导致PHP解析失败。解决方案包括手动格式化修复和使用PHP-V8JS扩展在JavaScript环境中解析,确保JSON数据能被正确处理。
|
4月前
|
JSON JavaScript API
商品详情数据接口解析返回的JSON数据(API接口整套流程)
商品详情数据接口解析返回的JSON数据是API接口使用中的一个重要环节,它涉及从发送请求到接收并处理响应的整个流程。以下是一个完整的API接口使用流程,包括如何解析返回的JSON数据:
|
3月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
121 2
|
2月前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2月前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析

热门文章

最新文章

推荐镜像

更多