深度剖析Dinky源码(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 深度剖析Dinky源码(下)
3.2.4 Gateway (作业提交客户端)

所属模块dlink-gateway

所属类#方法com.dlink.gateway.yarn.YarnApplicationGateway#submitJar


通过断点可以得知jar作业,使用的是YarnApplicationGateway的submitJar去提交。

我们主要看看里面的实现(已补充注释):

/**
 * 提交jar作业
 *
 * @return 提交结果
 * @author : YangLinWei
 * @createTime: 2023/7/15 10:59
 */
@Override
public GatewayResult submitJar() {
    // 判断并初始化yarn客户端
    if (Asserts.isNull(yarnClient)) {
        init();
    }
    // 构造提交信息
    YarnResult result = YarnResult.build(getType());
    AppConfig appConfig = config.getAppConfig();
    configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
    String[] userJarParas = appConfig.getUserJarParas();
    if (Asserts.isNull(userJarParas)) {
        userJarParas = new String[0];
    }
    ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas,
            appConfig.getUserJarMainAppClass());
    YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
            configuration, yarnConfiguration, yarnClient,
            YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
    ClusterSpecification.ClusterSpecificationBuilder clusterSpecificationBuilder = new ClusterSpecification.ClusterSpecificationBuilder();
    if (configuration.contains(JobManagerOptions.TOTAL_PROCESS_MEMORY)) {
        clusterSpecificationBuilder
                .setMasterMemoryMB(configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes());
    }
    if (configuration.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
        clusterSpecificationBuilder
                .setTaskManagerMemoryMB(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes());
    }
    if (configuration.contains(TaskManagerOptions.NUM_TASK_SLOTS)) {
        clusterSpecificationBuilder.setSlotsPerTaskManager(configuration.get(TaskManagerOptions.NUM_TASK_SLOTS))
                .createClusterSpecification();
    }
    if (Asserts.isNotNull(config.getJarPaths())) {
        yarnClusterDescriptor
                .addShipFiles(Arrays.stream(config.getJarPaths()).map(FileUtil::file).collect(Collectors.toList()));
    }
    try {
        // 开始提交信息到yarn集群
        ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
                clusterSpecificationBuilder.createClusterSpecification(),
                applicationConfiguration);
        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
        // 封装提交后返回的信息
        Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
        int counts = SystemConfiguration.getInstances().getJobIdWait();
        while (jobStatusMessages.size() == 0 && counts > 0) {
            Thread.sleep(1000);
            counts--;
            jobStatusMessages = clusterClient.listJobs().get();
            if (jobStatusMessages.size() > 0) {
                break;
            }
        }
        if (jobStatusMessages.size() > 0) {
            List<String> jids = new ArrayList<>();
            for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
                jids.add(jobStatusMessage.getJobId().toHexString());
            }
            result.setJids(jids);
        }
        ApplicationId applicationId = clusterClient.getClusterId();
        result.setAppId(applicationId.toString());
        result.setWebURL(clusterClient.getWebInterfaceURL());
        result.success();
    } catch (Exception e) {
        result.fail(LogUtil.getError(e));
    } finally {
        yarnClusterDescriptor.close();
    }
    return result;
}

可以发现,最终还是使用flink的flink-yarn_xxx.jar包里面的YarnClusterDescriptor提交作业到yarn,最终yarn会执行指定提交的jar包。

到这里,flink的jar作业提交模式是结束了,到这里,可以结束本文的阅读了。


读者如果有兴趣的话,可以继续阅读jar提交到yarn之后的流程,我们看看flink sql模式下是怎样实现的?flink sql模式也是一样的,使用了指定的启动jar(也就是dlink-app.jar ),然后传入任务参数,执行一系列的流程。

3.4 yarn端

再次回顾看看官网是怎么描述Yarn Application下如何提交jar的:

注册集群配置 
==> 异步提交 
==> StudioService 
==> JobManager 
==> Executor 
==> TaskId & JDBC 
==> Gateway 
==> YarnApplicationGateway
==> YarnClient 
==> dlink-app.jar 
==> Executor 
==> AppStreamExecutor 
==> CustomTableEnvironmentImpl 
==> LocalEnvironmentFlink Yarn Application Cluster

似乎有点晦涩,这是我整理后对应到实际代码的流程:

前端提交接口:/api/task/submit
==> 【dlink-admin模块】:com.dlink.controller.TaskController#submit
   - 描述:后端controller提交接口
==> 【dlink-admin模块】:com.dlink.service.impl.TaskServiceImpl#submitTask
     - 描述:提交作业服务
==> 【dlink-core模块】:com.dlink.job.JobManager#executeJar
     - 描述:作业管理器提交jar作业
==> 【dlink-gateway模块】:com.dlink.gateway.yarn.YarnApplicationGateway#submitJar
     - 描述:提交客户端提交
==> 【flink-yarn_xxx源码】:org.apache.flink.yarn.YarnClusterDescriptor#deployApplicationCluster
     - 描述:Flink yarn客户端提交源码

对比官方的流程,似乎到了==> dlink-app.jar 这一步骤就停止了,其实,dlink-app.jar就是flink作业的执行jar(entrypoint jar),需要手动上传到hdfs,具体得配置在界面:

提交作业到yarn后,创建容器时,会自动从hdfs下载 dlink-app.jar,然后启动jar,也就是==> dlink-app.jar 后面的逻辑,接下来讲讲其实现逻辑。

3.4.1 main(作业执行入口)

所属模块dlink-app

所属类#方法com.dlink.app.MainApp#main


/**
 * 作业执行入口
 *
 * @author : YangLinWei
 * @createTime: 2023/7/15 11:37
 */
public static void main(String[] args) throws IOException {
    Map<String, String> params = FlinkBaseUtil.getParamsFromArgs(args);
    String id = params.get(FlinkParamConstant.ID);
    Asserts.checkNullString(id, "请配置入参 id ");
    // 初始化数据库配置
    DBConfig dbConfig = DBConfig.build(params);
    // 提交
    Submiter.submit(Integer.valueOf(id), dbConfig, params.get(FlinkParamConstant.DINKY_ADDR));
}

从描述,可以看出,最终是使用了Submiter去提交作业。

3.4.2 submit(作业执行入口)

所属模块dlink-app-base

所属类#方法com.dlink.app.flinksql.Submiter#submit


/**
 * 提交任务
 *
 * @param id        任务ID
 * @param dbConfig  数据库连接配置
 * @param dinkyAddr 第三方jar下载,对应对象存储服务器的域名
 * @author : YangLinWei
 * @createTime: 2023/7/15 11:55
 */
public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
    logger.info(LocalDateTime.now() + "开始提交作业 -- " + id);
    if (NULL.equals(dinkyAddr)) {
        dinkyAddr = "";
    }
    StringBuilder sb = new StringBuilder();
    // 根据任务ID获取任务配置
    Map<String, String> taskConfig = Submiter.getTaskConfig(id, dbConfig);
    if (Asserts.isNotNull(taskConfig.get("envId"))) {
        String envId = getFlinkSQLStatement(Integer.valueOf(taskConfig.get("envId")), dbConfig);
        if (Asserts.isNotNullString(envId)) {
            sb.append(envId);
        }
        sb.append("\n");
    }
    // 添加数据源全局变量
    sb.append(getDbSourceSqlStatements(dbConfig, id));
    // 添加自定义全局变量信息
    sb.append(getFlinkSQLStatement(id, dbConfig));
    // 拆分SQL字符串为sql集
    List<String> statements = Submiter.getStatements(sb.toString());
    ExecutorSetting executorSetting = ExecutorSetting.build(taskConfig);
    // 加载第三方jar
    loadDep(taskConfig.get("type"), id, dinkyAddr, executorSetting);
    String uuid = UUID.randomUUID().toString().replace("-", "");
    if (executorSetting.getConfig().containsKey(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key())) {
        executorSetting.getConfig().put(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
                executorSetting.getConfig().get(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()) + "/" + uuid);
    }
    if (executorSetting.getConfig().containsKey(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())) {
        executorSetting.getConfig().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
                executorSetting.getConfig().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()) + "/" + uuid);
    }
    logger.info("作业配置如下: {}", executorSetting);
    // 根据配置,初始化Executor
    Executor executor = Executor.buildAppStreamExecutor(executorSetting);
    List<StatementParam> ddl = new ArrayList<>();
    List<StatementParam> trans = new ArrayList<>();
    List<StatementParam> execute = new ArrayList<>();
    // 遍历执行flink sql
    for (String item : statements) {
        String statement = FlinkInterceptor.pretreatStatement(executor, item);
        if (statement.isEmpty()) {
            continue;
        }
        SqlType operationType = Operations.getOperationType(statement);
        if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
            trans.add(new StatementParam(statement, operationType));
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        } else if (operationType.equals(SqlType.EXECUTE)) {
            execute.add(new StatementParam(statement, operationType));
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        } else {
            ddl.add(new StatementParam(statement, operationType));
        }
    }
    // 执行器依次执行flink sql
    for (StatementParam item : ddl) {
        logger.info("正在执行 FlinkSQL: " + item.getValue());
        executor.submitSql(item.getValue());
        logger.info("执行成功");
    }
    if (trans.size() > 0) {
        if (executorSetting.isUseStatementSet()) {
            List<String> inserts = new ArrayList<>();
            for (StatementParam item : trans) {
                if (item.getType().equals(SqlType.INSERT)) {
                    inserts.add(item.getValue());
                }
            }
            logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, inserts));
            executor.submitStatementSet(inserts);
            logger.info("执行成功");
        } else {
            for (StatementParam item : trans) {
                logger.info("正在执行 FlinkSQL: " + item.getValue());
                executor.submitSql(item.getValue());
                logger.info("执行成功");
                break;
            }
        }
    }
    if (execute.size() > 0) {
        List<String> executes = new ArrayList<>();
        for (StatementParam item : execute) {
            executes.add(item.getValue());
            executor.executeSql(item.getValue());
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        }
        logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, executes));
        try {
            executor.execute(executorSetting.getJobName());
            logger.info("执行成功");
        } catch (Exception e) {
            logger.error("执行失败, {}", e.getMessage(), e);
        }
    }
    logger.info("{}任务提交成功", LocalDateTime.now());
}

从上述代码,可以得知核心的代码是初始化ExecutorAppStreamExecutor实现)之后,然后依次执行拆分后的flink sql。

3.4.2 submitSql(sql执行入口)

所属模块dlink-executor

所属类#方法com.dlink.executor.AppStreamExecutor


/**
 * Streaming执行器
 *
 * @author : YangLinWei
 * @createTime: 2023/7/15 12:02
 */
public class AppStreamExecutor extends Executor {
    /**
     * 构造函数,初始化flink默认的TableEnvironment
     *
     * @param executorSetting 执行器配置
     */
    public AppStreamExecutor(ExecutorSetting executorSetting) {
        this.executorSetting = executorSetting;
        if (Asserts.isNotNull(executorSetting.getConfig())) {
            Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
            this.environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        } else {
            this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
        }
        init();
    }
    /**
     * 公共的逻辑都在Executor ,不同的Executor区别在于这里的TableEnvironment
     *
     * @return 自定义的TableEnvironment
     */
    @Override
    CustomTableEnvironment createCustomTableEnvironment() {
        return CustomTableEnvironmentImpl.create(environment);
    }
}

执行逻辑在基类Executor执行,就是取Executor实现类(AppStreamExecutor)里面的自定义TableEnvironment执行:

继续看看CustomTableEnvironment是如何实现的?

3.4.3 executeSql(执行flink sql)

所属模块dlink-client

所属类#方法com.dlink.executor.CustomTableEnvironmentImpl


这里应该到了flink底层之上的最底层了,有兴趣的同学可以自行阅读,篇幅有限本文不再分析了,总之按flink的标准来实现就好了。

04 总结

最终,整理后的流程如下:

4.1 前端

step1:【dlink-web模块】:dinky-web/src/components/Studio/StudioMenu/index.tsx#submit

  • 描述:提交接口 ,/api/task/submit

4.2 管理端

step1: 【dlink-admin模块】:com.dlink.controller.TaskController#submit

  • 描述:后端controller提交接口

step2: 【dlink-admin模块】:com.dlink.service.impl.TaskServiceImpl#submitTask

  • 描述:提交作业服务

step3: 【dlink-core模块】:com.dlink.job.JobManager#executeJar

  • 描述:作业管理器提交jar作业

step4: 【dlink-gateway模块】:com.dlink.gateway.yarn.YarnApplicationGateway#submitJar

  • 描述:提交客户端提交

step5: 【flink-yarn_xxx源码】:org.apache.flink.yarn.YarnClusterDescriptor#deployApplicationCluster

  • 描述:Flink yarn客户端提交源码

4.3 yarn端

step1: 【dlink-app模块】:com.dlink.app.MainApp#main

  • 描述:执行jar包入口,所有的flink sql作业都在这里开始

step2: 【dlink-app-base模块】:com.dlink.app.flinksql.Submiter#submit

  • 描述:作业提交器,Executor的初始化,并执行

step3: 【dlink-executor模块】:com.dlink.executor.AppStreamExecutor

  • 描述:作业执行器,初始化TableEnvironment,并执行

step4: 【dlink-client模块】:com.dlink.executor.CustomTableEnvironmentImpl

  • 描述:自定义TableEnvironment,实际执行flink sql的逻辑,再进一步就是flink的底层了。

05 文末

本文主要讲解了Dinky的一些概念,以及剖析了它的源码,希望能帮助到大家,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
7月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
247 3
|
7月前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
483 0
|
7月前
|
API Nacos
【想进大厂还不会阅读源码】ShenYu源码-重构同步数据服务
ShenYu源码阅读📚。我们看下PR的标题和Concersation的头一句,大概意思就是重构注册中心数据同步到ShenYu网关的方式。大家看看重构了有没好处呢?不仅获得了知识,还获得了一次开源贡献,何乐而不为呢
|
资源调度 前端开发 Java
深度剖析Dinky源码(上)
深度剖析Dinky源码
425 0
|
SQL Kubernetes Java
深度剖析FlinkX(纯钧)源码
深度剖析FlinkX(纯钧)源码
159 0
|
SQL 分布式计算 DataX
HIVE3 深度剖析 (下篇)
HIVE3 深度剖析 (下篇)
|
Java Apache 开发工具
Flink 源码阅读环境搭建
阅读优秀的源码是提升我们代码技能最重要的手段之一,工欲善其事必先利其器,所以,搭建好源码阅读环境是我们阅读的第一步。