Flink 任务 Jackson 解析 JSON 使用不当引发的反压问题

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 背景最近业务方反馈线上一个 topic 的数据延迟比较大,然后我查看了这个 topic 的数据是由一个 Flink 任务产生的,于是就找到了这个任务开始排查问题,发现这个任务是一个非常简单的任务,大致的逻辑是 kafka source -> flatmap -> filter -> map -> sink kafka.中间没有复杂的操作,我在本地写了一个简单的程序模拟线上的任务.方便大家理解, 任务的 DAG 如下图所示

背景


最近业务方反馈线上一个 topic 的数据延迟比较大,然后我查看了这个 topic 的数据是由一个 Flink 任务产生的,于是就找到了这个任务开始排查问题,发现这个任务是一个非常简单的任务,大致的逻辑是 kafka source -> flatmap -> filter -> map -> sink kafka.中间没有复杂的操作,我在本地写了一个简单的程序模拟线上的任务.方便大家理解, 任务的 DAG 如下图所示



image-20210827230429171


线上任务 source 的并行度设置的是 80 flatmap filter map 的并行度也是 80 所以和 source 都 chain 在一起,两个 sink 的并行度设置的都是 30,我这里本地就设置成 8 和 3 了.


排查过程

因为反馈说是下游 topic 的数据有延迟,我的第一反应是看任务是否有反压,通过 Flink 的 UI 上的 BackPressure 发现任务并没有反压,然后查看了第一个 operator 输入数据量的 QPS 大概是 50w/s 经过 filter 之后输出数据量的 QPS 是 15w/s 左右,然后查看了 kafka 的监控发现写入的 QPS 也是 15w/s ,这就说明确实不是因为写入 kafka 慢引起的反压,因为你产生了多少我就写入了多少,但是最终的数据有延迟,那就说明任务反压的地方是在第一个 operator 中的某个算子上,但是因为 operator chain 把所有的算子都 chain 在一起,不太方便定位反压的位置.于是我就在 flatmap 后面调用 disableChaining 打断了 operator chain.这个时候任务的 DAG 变成了下面这样.



image-20210827231856263


这个时候在查看 source 的 BackPressure 显示都是 high 状态,说明反压出现在 flatmap 算子,刚开始猜测可能是因为 flatmap 的时候数据量会暴增导致处理不过来了,所以就尝试增大了 flatmap 算子的并行度,但是发现并没有明显的效果,任务反压还是比较严重,哪怕是增大并行度到 500 效果都不是很明显,这就说明跟 flatmap 的数据量大小及并行度没有关系,没办法只能看代码了,大概看了下任务逻辑,代码非常简单,不应该会出现性能问题,唯一不足的就是代码逻辑不够简洁,略显繁琐,且有重复的逻辑,其实 filter 和 map 算子是没有必要的,优化了这些地方之后本以为就能解决反压问题,可是结果还是出现了反压.



image-20210829110127054


这就有点奇怪了,代码中没有复杂的逻辑,也没有和第三方交互,按道理来说不应该会有性能问题,再次查看代码发现代码里面解析 JSON 数据用的是 Jackson 类库,Jackson 的稳定性和性能肯定是没有问题的,在国外应用比较广泛,比如 springboot 包括 Flink 这种大型的框架内默认都使用 Jackson 更加能说明这一点,国内使用 Fastjson 类库会比较多一点.然后仔细看代码发现使用 Jackson 的时候需要实例化 ObjectMapper 对象,但是代码里面是在 flatmap 方法里面实例化 ObjectMapper 对象的,也就是说每来一条数据都需要实例化一次 ObjectMapper 对象,这显然是不合理的.刚开始没有发现是因为解析 json 被封装在一个方法里面,没有进到方法里面来看,那么究竟是不是因为 ObjectMapper 实例化影响到性能呢? 其实非常简单,我们可以在本地测试一下提前实例化好 ObjectMapper 和每条数据都实例化一遍的差别就能得到答案.测试代码如下


public static long a(String json) throws IOException {
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100000; i ++) {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.readValue(json, List.class);
        }
        long end = System.currentTimeMillis();
        return end - start;
    }
    public static long b(String json) throws IOException {
        long start = System.currentTimeMillis();
        ObjectMapper objectMapper = new ObjectMapper();
        for (int i = 0; i < 100000; i ++) {
            objectMapper.readValue(json, List.class);
        }
        long end = System.currentTimeMillis();
        return end - start;
    }
    String json = "[{\"name\":\"JasonLee\",\"age\":18},{\"name\":\"JasonLee1\",\"age\":18}]";
    System.out.println(a(json));
    System.out.println(b(json));
    // 打印的结果是:
    68425
    316


从上面测试的结果可以发现,这两种写法性能相差甚大,耗时完全不在一个量级上,也就是说 jackson 的 ObjectMapper 实例化是一个非常耗时的过程,这就能解释任务为什么会在 flatmap 算子出现反压了.


解决


既然知道了反压的原因,那解决办法就非常的简单了,把 ObjectMapper 对象的实例化放在 open 方法里面即可,类似于创建 JDBC 连接,让每个 task 初始化一次 ObjectMapper 对象,然后在 flatmap 方法里面直接使用.这样就不用每条数据都初始化一次.


SingleOutputStreamOperator<JasonEntity> filter = jasonEntityDataStreamSource.flatMap(new RichFlatMapFunction<JasonEntity, JasonEntity>() {
    private ObjectMapper objectMapper = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        objectMapper = new ObjectMapper();
    }
    @Override
    public void flatMap(JasonEntity jasonEntity, Collector<JasonEntity> collector) throws Exception {
        List<Map<String, String>> list = objectMapper.readValue("[\n" +
                "  {\"name\":\"JasonLee\",\"age\":18},\n" +
                "  {\"name\":\"JasonLee1\",\"age\":18}\n" +
                "]", List.class);
        System.out.println(list.toString());
        collector.collect(jasonEntity);
    }
}).disableChaining();


当然除了改成上面的写法外,还可以直接用 fastjson 来解析 JSON 数据,jackson 和 fastjson 的性能是差不多的,网上也有很多这两者对比的文章,感兴趣的也可以自己测试一下.这里就不做对比测试了.


最后把修改后的代码提交到线上后发现没有出现反压,下游的数据自然也不会有延迟了.其实这是一个非常小的问题,或者说是一个小细节吧,但是却能带来严重的后果.所以我们在写代码的时候还是要多注意细节,尽可能提高程序的性能.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
22天前
|
存储 JSON API
淘系API接口(解析返回的json数据)商品详情数据解析助力开发者
——在成长的路上,我们都是同行者。这篇关于商品详情API接口的文章,希望能帮助到您。期待与您继续分享更多API接口的知识,请记得关注Anzexi58哦! 淘宝API接口(如淘宝开放平台提供的API)允许开发者获取淘宝商品的各种信息,包括商品详情。然而,需要注意的是,直接访问淘宝的商品数据API通常需要商家身份或开发者权限,并且需要遵循淘宝的API使用协议。
淘系API接口(解析返回的json数据)商品详情数据解析助力开发者
|
7天前
|
JSON 前端开发 JavaScript
解析JSON文件
解析JSON文件
28 9
|
12天前
|
存储 JSON API
Python编程:解析HTTP请求返回的JSON数据
使用Python处理HTTP请求和解析JSON数据既直接又高效。`requests`库的简洁性和强大功能使得发送请求、接收和解析响应变得异常简单。以上步骤和示例提供了一个基础的框架,可以根据你的具体需求进行调整和扩展。通过合适的异常处理,你的代码将更加健壮和可靠,为用户提供更加流畅的体验。
39 0
|
22天前
|
资源调度 Java Scala
实时计算 Flink版产品使用问题之如何实现ZooKeeper抖动导致任务失败时,能从最近的检查点重新启动任务
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
25天前
|
JSON API 数据格式
基于服务器响应的实时天气数据进行JSON解析的详细代码及其框架
【8月更文挑战第25天】这段资料介绍了一个使用Python从服务器获取实时天气数据并解析JSON格式数据的基本框架。主要分为三个部分:一是安装必要的`requests`库以发起HTTP请求获取数据,同时利用Python内置的`json`库处理JSON数据;二是提供了具体的代码实现,包括获取天气数据的`get_weather_data`函数和解析数据的`parse_weather_data`函数;三是对代码逻辑进行了详细说明,包括如何通过API获取数据以及如何解析这些数据来获取温度和天气描述等信息。用户需要根据实际使用的天气API调整代码中的API地址、参数和字段名称。
|
27天前
|
JSON 数据格式 索引
【Azure Developer】Azure Logic App 示例: 解析 Request Body 的 JSON 的表达式? triggerBody()?
【Azure Developer】Azure Logic App 示例: 解析 Request Body 的 JSON 的表达式? triggerBody()?
|
2月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
705 7
阿里云实时计算Flink在多行业的应用和实践
|
1月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
26天前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
|
1月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之怎么调整Flink Web U显示的日志行数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章