Flink运行时之网络通信NetworkEnvironment分析

简介: 网络环境(NetworkEnvironment)是TaskManager进行网络通信的主对象,主要用于跟踪中间结果并负责所有的数据交换。每个TaskManager的实例都包含一个网络环境对象,在TaskManager启动时创建。

网络环境(NetworkEnvironment)是TaskManager进行网络通信的主对象,主要用于跟踪中间结果并负责所有的数据交换。每个TaskManager的实例都包含一个网络环境对象,在TaskManager启动时创建。NetworkEnvironment管理着多个协助通信的关键部件,它们是:

  • NetworkBufferPool:网络缓冲池,负责申请一个TaskManager的所有的内存段用作缓冲池;
  • ConnectionManager:连接管理器,用于管理本地(远程)通信连接;
  • ResultPartitionManager:结果分区管理器,用于跟踪一个TaskManager上所有生产/消费相关的ResultPartition;
  • TaskEventDispatcher:任务事件分发器,从消费者任务分发事件给生产者任务;
  • ResultPartitionConsumableNotifier:结果分区可消费通知器,用于通知消费者生产者生产的结果分区可消费;
  • PartitionStateChecker:分区状态检查器,用于检查分区状态;

当NetworkEnvironment被初始化时,它首先根据配置创建网络缓冲池(NetworkBufferPool)。创建NetworkBufferPool时需要指定Buffer数目、单个Buffer的大小以及Buffer所基于的内存类型,这些信息都是可配置的并封装在配置对象NetworkEnvironmentConfiguration中。

NetworkEnvironment对象包含了上面列举的网络I/O相关的各种部件,这些对象并不随着NetworkEnvironment对象实例化而被立即实例化,它们的实例化会被延后到NetworkEnvironment对象跟TaskManager以及JobManager**关联**(associate)上之后。TaskManager在启动后会向JobManager注册,随后NetworkEnvironment的associateWithTaskManagerAndJobManager方法会得到调用,在其中所有的辅助部件都会得到实例化:

this.partitionManager = new ResultPartitionManager();
this.taskEventDispatcher = new TaskEventDispatcher();
this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
    executionContext,jobManagerGateway,taskManagerGateway,jobManagerTimeout);

this.partitionStateChecker = new JobManagerPartitionStateChecker(jobManagerGateway, taskManagerGateway);

final Option<NettyConfig> nettyConfig = configuration.nettyConfig();
connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get())
    : new LocalConnectionManager();

try {
    //启动网络连接管理器
    connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
} catch (Throwable t) {
    throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
}

当然在TaskManager触发stop动作之后,在其postStop逻辑中,也会跟JobManager进行解关联操作。从而触发NetworkEnvironment的disassociate方法。在disassociate方法中,上述所有的辅助通信部件也将会被释放或回收资源。

在任务执行的核心逻辑中,有一个步骤是需要将自身(Task)注册到网络栈(也就是这里的NetworkEnvironment)。该步骤会调用NetworkEnvironment的实例方法registerTask进行注册,注册之后NetworkEnvironment会对任务的通信进行管理:

public void registerTask(Task task) throws IOException {
    //获得当前任务对象所生产的结果分区集合
    final ResultPartition[] producedPartitions = task.getProducedPartitions();
    //同时获得所有的结果分区写入器
    final ResultPartitionWriter[] writers = task.getAllWriters();

    //正常情况下结果分区数与写入器的数目应该是相等的
    if (writers.length != producedPartitions.length) {
        throw new IllegalStateException("Unequal number of writers and partitions.");
    }

    ResultPartitionConsumableNotifier jobManagerNotifier;

    synchronized (lock) {
        if (isShutdown) {
            throw new IllegalStateException("NetworkEnvironment is shut down");
        }

        //如果当前网络环境对象还没有跟TaskManager进行关联,那么说明调用的时机出现问题,直接抛出异常
        if (!isAssociated()) {
            throw new IllegalStateException("NetworkEnvironment is not associated with a TaskManager");
        }

        //遍历任务的每个结果分区,依次进行初始化
        for (int i = 0; i < producedPartitions.length; i++) {
            final ResultPartition partition = producedPartitions[i];
            final ResultPartitionWriter writer = writers[i];

            BufferPool bufferPool = null;

            try {
                //用网络缓冲池创建本地缓冲池,该缓冲池是非固定大小的且请求的缓冲个数是结果分区的子分区个数
                bufferPool = networkBufferPool.createBufferPool(
                    partition.getNumberOfSubpartitions(), false);
                //将本地缓冲池注册到结果分区
                partition.registerBufferPool(bufferPool);
                //结果分区会被注册到结果分区管理器
                partitionManager.registerResultPartition(partition);
            } catch (Throwable t) {
                if (bufferPool != null) {
                    bufferPool.lazyDestroy();
                }

                if (t instanceof IOException) {
                    throw (IOException) t;
                } else {
                    throw new IOException(t.getMessage(), t);
                }
            }

            //向任务事件分发器注册结果分区写入器
            taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
        }

        //获得任务的所有输入闸门
        final SingleInputGate[] inputGates = task.getAllInputGates();

        //遍历输入闸门,为它们设置缓冲池
        for (SingleInputGate gate : inputGates) {
            BufferPool bufferPool = null;

            try {
                //为每个输入闸门设置本地缓冲池,这里创建的本地缓冲池也非固定大小的,且初始化的缓冲数为其包含的输入信道数
                bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
                gate.setBufferPool(bufferPool);
            }
            catch (Throwable t) {
                if (bufferPool != null) {
                    bufferPool.lazyDestroy();
                }

                if (t instanceof IOException) {
                    throw (IOException) t;
                } else {
                    throw new IOException(t.getMessage(), t);
                }
            }
        }

        jobManagerNotifier = partitionConsumableNotifier;
    }

    //遍历所有的结果分区
    for (ResultPartition partition : producedPartitions) {
        //如果某个结果分区的消费者是主动部署的
        if (partition.getEagerlyDeployConsumers()) {
            //则直接通知JobManager,让其告知消费者任务,当前结果分区可被消费
            jobManagerNotifier.notifyPartitionConsumable(
                        partition.getJobId(), partition.getPartitionId());
        }
    }
}

从任务被注册到NetworkEnvironment对象的代码段中,我们能够得到一些信息。NetworkEnvironment对象会为当前任务生产端的每个ResultPartition都创建本地缓冲池,缓冲池中的Buffer数为结果分区的子分区数,同时为当前任务消费端的InputGate创建本地缓冲池,缓冲池的Buffer数为InputGate所包含的输入信道数。这些缓冲池都是非固定大小的,也就是说他们会按照网络缓冲池内存段的使用情况进行重平衡。



原文发布时间为:2016-12-14


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
11月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
664 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
11月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
312 11
|
6月前
|
数据采集 监控 网络安全
VMware Cloud Foundation Operations for Networks 9.0.1.0 发布 - 云网络监控与分析
VMware Cloud Foundation Operations for Networks 9.0.1.0 发布 - 云网络监控与分析
446 3
VMware Cloud Foundation Operations for Networks 9.0.1.0 发布 - 云网络监控与分析
|
6月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的青少年网络使用情况分析及预测系统
本研究基于Python大数据技术,构建青少年网络行为分析系统,旨在破解现有防沉迷模式下用户画像模糊、预警滞后等难题。通过整合多平台亿级数据,运用机器学习实现精准行为预测与实时干预,推动数字治理向“数据驱动”转型,为家庭、学校及政府提供科学决策支持,助力青少年健康上网。
|
8月前
|
数据采集 存储 数据可视化
Python网络爬虫在环境保护中的应用:污染源监测数据抓取与分析
在环保领域,数据是决策基础,但分散在多个平台,获取困难。Python网络爬虫技术灵活高效,可自动化抓取空气质量、水质、污染源等数据,实现多平台整合、实时更新、结构化存储与异常预警。本文详解爬虫实战应用,涵盖技术选型、代码实现、反爬策略与数据分析,助力环保数据高效利用。
427 0
|
人工智能 边缘计算 物联网
蜂窝网络未来发展趋势的分析
蜂窝网络未来发展趋势的分析
611 2
|
数据采集 缓存 定位技术
网络延迟对Python爬虫速度的影响分析
网络延迟对Python爬虫速度的影响分析
|
11月前
|
监控 安全 Linux
Arista CloudVision 2025.1 - 多云和数据中心网络自动化、监控和分析
Arista CloudVision 2025.1 - 多云和数据中心网络自动化、监控和分析
447 2
Arista CloudVision 2025.1 - 多云和数据中心网络自动化、监控和分析
|
12月前
|
运维 监控 安全
如何高效进行网络质量劣化分析与流量回溯分析?-AnaTraf
在数字化时代,网络质量分析与流量回溯对保障业务运行至关重要。网络拥塞、丢包等问题可能导致业务中断、安全隐患及成本上升。传统工具常缺乏细粒度数据,难以溯源问题。流量回溯分析可还原现场,助力精准排障。AnaTraf网络流量分析仪作为专业工具,能高效定位问题,提升团队响应力,降低运营风险。
如何高效进行网络质量劣化分析与流量回溯分析?-AnaTraf
|
12月前
|
大数据
“你朋友圈的真面目,大数据都知道!”——用社交网络分析看透人情世故
“你朋友圈的真面目,大数据都知道!”——用社交网络分析看透人情世故
453 16

热门文章

最新文章