大数据常用调度平台

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据常用调度平台

1. 项目结构

  • 目标
  • 理解项目中为什么需要调度平台
  • 步骤
  1. 项目介绍
  2. 项目结构

1.1. 项目介绍

项目功能

  • 标签创建
  • 标签计算
  • 标签审计


如果要对标签进行计算, 就需要把标签和标签的计算都管理起来


创建标签时, 执行标签计算任务

把计算和标签关联起来, 通过标签能找到任务, 通过任务也知道它计算的是什么标签

标签, 年龄就是一个标签

计算任务, 为了计算年龄标签, 需要去读取用户数据, 计算每一个用户是哪一个标签

用户画像数据, 每一个用户的标签, 就是用户画像数据

修改标签时, 标签计算任务同步修改

删除标签时, 标签计算任务停止

所以, 我们的项目需要和调度平台结合起来, 不仅管理标签本身, 也需要管理标签相关联的任务, 所以需要配合调度平台来管理


1.2. 项目结构

  • 管理者通过画像系统添加, 修改, 删除标签
  • 画像系统将标签抽象为标签和任务, 通过 Oozie 调度到 Yarn 中
  • Yarn 中的 Spark 任务读取 MySQL 中的标签元信息
  • Spark 任务通过 HBase 读取需要计算的数据
  • 计算完成后, 插入 HBase 对应的表中

2. Oozie 介绍

  • 目标
  • 了解 Oozie 的作用
  • 步骤
  1. 需求
  2. 可选的方式
  3. Oozie 和同类的对比

2.1. 需求

  • 现在要计算年龄标签, 有七个阶段, 50后、60后、70后、80后、90后、00后、10后、20后
  • 通过 Spark 写了一个程序, 读取用户年龄, 把每个用户的年龄映射到每个阶段内
  • 因为年龄标签变化的频率不高, 所以决定要一年计算一次
  • 如何完成这个计算任务的调度?

2.2. 可选的方式

2.2.1. Crontab

  • Crontab 是 Linux 的一个调度程序, 允许我们通过一个简洁的语法表示调度周期, 在对应的时间点周期性的执行某个 Shell 脚本或者 Shell 命令
# 分 时 日 月 星期 要运行的命令
# 每1分钟执行一次myCommand
* * * * * myCommand
# 每小时的第3和第15分钟执行
3,15 * * * * myCommand
# 在上午8点到11点的第3和第15分钟执行
3,15 8-11 * * * myCommand
# 每周一上午8点到11点的第3和第15分钟执行
3,15 8-11 * * 1 myCommand

我们可以编写一个 Shell 脚本, 其中写上

spark-submit --master yarn \
--class xx
--executor-memory 500m
--executor-cores 1
--num-executors 1
xx.jar
args1

然后再将如下的 cron 加入 crontab

0 20 * * * tag_age.shell

这样的方式有如下几个问题

  • cron 任务只能通过命令来管理
  • crontab -e
  • 如果换成如下的需求, 这个流程我们该如何控制呢?
  1. 把数据抽取到 HBase 中
  2. 同步执行如下两个任务
  • 合并用户数据和订单数据, 生成宽表
  • 合并用户数据和商品数据, 生成宽表
  1. 合并两个宽表, 计算用户标签
  • 如果执行的程序出错了, 只能登录到对应的主机查看 Log

2.2.2. Oozie

  1. 创建一个 Workflow, 表示一个任务的执行流程
  2. 创建一个 Coordinator, 表示任务的执行周期
  1. 通过 Hue 提交任务
  2. 通过 Hue 查看任务执行的 Log

2.3. Oozie 和竞品的对比

现在市面上比较多见的调度平台分别是 Oozie 和 Azkaban, 还有一个后起之秀 AirFlow

Airflow Azkaban Oozie
机构 Airbnb LinkedIn Apache
社区 Very Active Somewhat active Active
历史 4年 7 年 8 年
目的 General Purpose Batch Processing Hadoop 作业调度 Hadoop 作业调度
流程定义 Python Custom DSL XML
单节点 支持 Yes Yes
快速开始 支持 支持 不支持
高可用 Yes Yes Yes
单点故障 不, 使用 Yarn
运行模式 Push Push Poll
Rest API 触发 Yes Yes Yes
外部事件触发 Yes No Yes
Web 权限 LDAP/Password XML Password Kerberos
监控 Yes Limited Yes
可扩展性 Depending on executor setup Good Very Good
  • 在对工具的支持上, Airflow 和 Oozie 都很好, 但是 Airflow 要简便一些
  • 功能上, Oozie 最强大
  • 稳定性上, Oozie 最稳定
  • 上手速度, Azkaban 最快
  • 带有个人情绪的简评, Oozie 最稳定, 也最难以驾驭, 坑多

3. Oozie 组件

  • 目标
  • 理解 Oozie 各个组件
  • 步骤
  1. Workflow
  2. Coordinator

3.1. Workflow


Workflow 就是一个工作流, 从 start 开始, 到 end 结束

Workflow 中提供了 Fork 和 Join 机制

Fork 可以使得多个任务并行

Join 可以让并行的任务汇总

Workflow 中提供了 Decision 机制

类似于 if 一样, 可以提供分支选择功能

  • 在任务执行完成后, Workflow 可以以发送邮件, 短信等方式通知
  • Workflow 提供了一种类似 DAG 的方式来执行一组任务
  • Oozie 支持 MR, Hive, Spark, Shell 等的支持

3.2. Coordinator

  • Workflow 表示了任务的实现和关系
  • Oozie 提供了 Coodinator, 用来调度 Workflow
  • Coordinator 在 Hue 中叫做 Scheduler

4. 调度实现

  • 目标
  • 实现工程中的调度功能
  • 步骤
  1. 执行流程
  2. Workflow
  3. Coordinator
  4. Java 代码
  5. 调度 Workflow

4.1. 执行流程

  1. 用户在网页中添加四级标签, 因为四级标签代表标签, 五级标签代表标签值域
  1. 上传 Jar 包
  2. 提交标签
  1. 后端收到添加标签的请求, 存储标签信息并将 Jar 包上传到 HDFS
  • cn.oldlu.tag.web.basictag.controller.BasicTagController#modelUpload
  1. 用户在网页中开启标签计算
  2. 后端收到请求, 联系 Oozie 开始调度

添加标签的执行流程如下

  1. 调用了 BasicTagController
  2. 在 BasicTagController 中调用了 BasicTagService
  3. 保存 Tag 信息, 指的就是具体的标签数据, 比如说标签叫什么, 谁创建的, 什么时候创建的, 级别…
  4. 保存 Model 信息, 标签是需要计算的, 我们使用 Spark Job 计算, Main方法在哪个类, Jar 包的位置等…
  • 在保存之前, 把 Model 对应 Jar 包上传到 HDFS 中
  • 路径 /app/models/Tag_10/xx.jar

开启任务的流程如下

  1. 调用 TaskProcessing 中的方法
  1. TaskProcessing 会调用 EngineService 中的 start 方法
  • 传进来了一个 标签 ID

4.2. Workflow

  • Workflow 使用 XML 编写, 是一种通过配置表示过程的方式
  • Workflow 中的配置无需太过认真的了解, 大致知道流程即可
  • 因为一般情况下, 我们使用 Hue 或者 Ambari 的图形化方式去编辑 Workflow , 手写的可能性比较低
  • Workflow 中可以使用 EL 表达式填充信息
<workflow-app name="Extra goods from hive to hbase fully" xmlns="uri:oozie:workflow:0.5">
    <start to="tag-model-job"/>
    <action name="tag-model-job">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>${sparkMaster}</master>
            <mode>${sparkDeployMode}</mode>
            <name>${oozieWFName}</name>
            <class>${sparkJobMain}</class>
            <jar>${sparkJobJar}</jar>
            <spark-opts>${sparkJobOpts}</spark-opts>
            <arg>${sparkMainOpts}</arg>
            <file>${sparkContainerCacheFiles}</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="End"/>
</workflow-app>

4.3. Coordinator

  • 此处的 Coordinator 只是为了周期性调度 Workflow
  • 在项目中提供的调度周期有: 日, 月, 年
<?xml version="1.0" encoding="UTF-8"?>
<coordinator-app name="cron-coordinator" frequency="${coord:days(1)}"
                 start="${start}" end="${end}" timezone="Asia/Shanghai" xmlns="uri:oozie:coordinator:0.5">
    <action>
        <workflow>
            <app-path>${oozieWorkflowPath}</app-path>
        </workflow>
    </action>
</coordinator-app>

4.4. Java 代码

private boolean startEngineActually(EngineBean engineBean) {
        ModelBean modelBean = modelMapper.get(new ModelBean(engineBean.getTagId()));
        MetaDataBean metaDataBean = metaDataMapper.get(new MetaDataBean(engineBean.getTagId()));
        // 1. 判断模型和元数据是否存在, 如果不存在则启动失败
        HdfsUtil hdfsUtil = HdfsUtil.getInstance();
        if (!hdfsUtil.exist(modelBean.getModelPath())) {
            logger.error("模型包不存在, 启动失败");
            return false;
        }
        if (StringUtils.isBlank(metaDataBean.getInType())) {
            logger.error("模型元信息不存在或者缺失 InType, 启动失败");
            return false;
        }
        // 2. 上传 Oozie 配置到 HDFS
        String localOozieConfigPath = engineBean.getRemark() + oozieConfigDirName + "/";
        String tagModelPath = hdfsUtil.getPath(new File(modelBean.getModelPath()).getParent());
        hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + workflowFileName, tagModelPath);
        hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + coordinatorFileName, tagModelPath);
        // 3. 构建 Oozie job 参数
        OozieUtil oozie = oozieUtil.build();
        Properties oozieConf = oozie.getConf();
        // 3.1. Workflow 部分
        String separator = "/";
        oozieConf.setProperty("nameNode", nameNode);
        oozieConf.setProperty("sparkJobMain", modelBean.getModelMain());
        if (StringUtils.isNotBlank(modelBean.getArgs())) {
            oozieConf.setProperty("sparkJobOpts", modelBean.getArgs());
        }
        oozieConf.setProperty("sparkJobJar", "${nameNode}" + modelBean.getModelPath());
        oozieConf.setProperty("sparkContainerCacheFiles", "${nameNode}" + modelBean.getModelPath());
        // 3.2. Coordinator 部分, Coordinator 中的时间格式是 UTC, 如 2018-06-019T20:01Z
        oozieConf.setProperty("oozieWorkflowPath", "${nameNode}" + tagModelPath + separator);
        oozieConf.setProperty("oozie.coord.application.path", "${nameNode}" + tagModelPath + separator + coordinatorFileName);
        // ScheTime 的格式为 每天#2018-06-01 20::01#2018-06-01 20::01
        String[] scheduleArr = modelBean.getScheTime().split("#");
        String freq = scheduleArr[0];
        String startDateTime = scheduleArr[1];
        String endDateTime = scheduleArr[2];
        // 设置频率
        String freqStr = "";
        switch (freq) {
            case "每天":
                freqStr = "day";
                break;
            case "每周":
                freqStr = "week";
                break;
            case "每月":
                freqStr = "month";
                break;
            case "每年":
                freqStr = "year";
                break;
        }
        oozieConf.setProperty("freq", freqStr);
        // 设置开始时间
        String[] startDateTimeArr = startDateTime.split(" ");
        String startDate = startDateTimeArr[0];
        String startTime = startDateTimeArr[1];
        oozieConf.setProperty("start", startDate + "T" + startTime + "Z");
        // 设置结束时间
        String[] endDateTimeArr = endDateTime.split(" ");
        String endDate = endDateTimeArr[0];
        String endTime = endDateTimeArr[1];
        oozieConf.setProperty("end", endDate + "T" + endTime + "Z");
        logger.info(oozieConf.toString());
        // 4. 提交 Oozie 任务
        String jobId = oozie.start(oozieConf);
        // 5. 设置监控
        EngineBean mEngineBean = new EngineBean();
        mEngineBean.setJobid(jobId);
        mEngineBean.setTagId(engineBean.getTagId());
        mEngineBean.setStatus("3");
        int state = engineMapper.addMonitorInfo(mEngineBean);
        return state > 0;
    }

4.5. 执行流程

  1. 上传 Jar 包
  2. 创建四级标签
  3. 开启标签任务
  4. Hue 中查看
  • 注意把 Filter 清空

4.5. 调度 Workflow

因为在测试阶段, 我们希望每次提交任务都立刻执行, 所以可以注释掉提交 Oozie 任务的代码, 改为直接提交 Workflow 任务

  • 去掉 Coordinator 的位置, 改为 APP_PATH, 则会直接找到 Workflow 提交
    private boolean startEngineActually(EngineBean engineBean) {
        ModelBean modelBean = modelMapper.get(new ModelBean(engineBean.getTagId()));
        MetaDataBean metaDataBean = metaDataMapper.get(new MetaDataBean(engineBean.getTagId()));
        // 1. 判断模型和元数据是否存在, 如果不存在则启动失败
        HdfsUtil hdfsUtil = HdfsUtil.getInstance();
        if (!hdfsUtil.exist(modelBean.getModelPath())) {
            logger.error("模型包不存在, 启动失败");
            return false;
        }
        if (StringUtils.isBlank(metaDataBean.getInType())) {
            logger.error("模型元信息不存在或者缺失 InType, 启动失败");
            return false;
        }
        // 2. 上传 Oozie 配置到 HDFS
        String localOozieConfigPath = engineBean.getRemark() + oozieConfigDirName + "/";
        String tagModelPath = hdfsUtil.getPath(new File(modelBean.getModelPath()).getParent());
        hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + workflowFileName, tagModelPath);
        hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + coordinatorFileName, tagModelPath);
        // 3. 构建 Oozie job 参数
        OozieUtil oozie = oozieUtil.build();
        Properties oozieConf = oozie.getConf();
        // 3.1. Workflow 部分
        String separator = "/";
        oozieConf.setProperty("nameNode", nameNode);
        oozieConf.setProperty("sparkJobMain", modelBean.getModelMain());
        if (StringUtils.isNotBlank(modelBean.getArgs())) {
            oozieConf.setProperty("sparkJobOpts", modelBean.getArgs());
        }
        oozieConf.setProperty("sparkJobJar", "${nameNode}" + modelBean.getModelPath());
        oozieConf.setProperty("sparkContainerCacheFiles", "${nameNode}" + modelBean.getModelPath());
        oozieConf.setProperty(OozieClient.APP_PATH, "${nameNode}" + tagModelPath + separator + workflowFileName);
        // 3.2. Coordinator 部分, Coordinator 中的时间格式是 UTC, 如 2018-06-019T20:01Z
//        oozieConf.setProperty("oozieWorkflowPath", "${nameNode}" + tagModelPath + separator);
//        oozieConf.setProperty("oozie.coord.application.path", "${nameNode}" + tagModelPath + separator + coordinatorFileName);
//
//        // ScheTime 的格式为 每天#2018-06-01 20::01#2018-06-01 20::01
//        String[] scheduleArr = modelBean.getScheTime().split("#");
//        String freq = scheduleArr[0];
//        String startDateTime = scheduleArr[1];
//        String endDateTime = scheduleArr[2];
//
//        // 设置频率
//        String freqStr = "";
//        switch (freq) {
//            case "每天":
//                freqStr = "day";
//                break;
//            case "每周":
//                freqStr = "week";
//                break;
//            case "每月":
//                freqStr = "month";
//                break;
//            case "每年":
//                freqStr = "year";
//                break;
//        }
//        oozieConf.setProperty("freq", freqStr);
//
//        // 设置开始时间
//        String[] startDateTimeArr = startDateTime.split(" ");
//        String startDate = startDateTimeArr[0];
//        String startTime = startDateTimeArr[1];
//        oozieConf.setProperty("start", startDate + "T" + startTime + "Z");
//
//        // 设置结束时间
//        String[] endDateTimeArr = endDateTime.split(" ");
//        String endDate = endDateTimeArr[0];
//        String endTime = endDateTimeArr[1];
//        oozieConf.setProperty("end", endDate + "T" + endTime + "Z");
//
//        logger.info(oozieConf.toString());
        // 4. 提交 Oozie 任务
        String jobId = oozie.start(oozieConf);
        // 5. 设置监控
        EngineBean mEngineBean = new EngineBean();
        mEngineBean.setJobid(jobId);
        mEngineBean.setTagId(engineBean.getTagId());
        mEngineBean.setStatus("3");
        int state = engineMapper.addMonitorInfo(mEngineBean);
        return state > 0;
    }


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
SQL 存储 分布式计算
ODPS技术架构深度剖析与实战指南——从零开始掌握阿里巴巴大数据处理平台的核心要义与应用技巧
【10月更文挑战第9天】ODPS是阿里巴巴推出的大数据处理平台,支持海量数据的存储与计算,适用于数据仓库、数据挖掘等场景。其核心组件涵盖数据存储、计算引擎、任务调度、资源管理和用户界面,确保数据处理的稳定、安全与高效。通过创建项目、上传数据、编写SQL或MapReduce程序,用户可轻松完成复杂的数据处理任务。示例展示了如何使用ODPS SQL查询每个用户的最早登录时间。
123 1
|
29天前
|
SQL 数据采集 分布式计算
【赵渝强老师】基于大数据组件的平台架构
本文介绍了大数据平台的总体架构及各层的功能。大数据平台架构分为五层:数据源层、数据采集层、大数据平台层、数据仓库层和应用层。其中,大数据平台层为核心,负责数据的存储和计算,支持离线和实时数据处理。数据仓库层则基于大数据平台构建数据模型,应用层则利用这些模型实现具体的应用场景。文中还提供了Lambda和Kappa架构的视频讲解。
113 3
【赵渝强老师】基于大数据组件的平台架构
|
2月前
|
机器学习/深度学习 监控 搜索推荐
电商平台如何精准抓住你的心?揭秘大数据背后的神秘推荐系统!
【10月更文挑战第12天】在信息爆炸时代,数据驱动决策成为企业优化决策的关键方法。本文以某大型电商平台的商品推荐系统为例,介绍其通过收集用户行为数据,经过预处理、特征工程、模型选择与训练、评估优化及部署监控等步骤,实现个性化商品推荐,提升用户体验和销售额的过程。
88 1
|
4月前
|
搜索推荐 OLAP 流计算
OneSQL OLAP实践问题之基于 Flink 打造流批一体的数据计算平台如何解决
OneSQL OLAP实践问题之基于 Flink 打造流批一体的数据计算平台如何解决
60 1
|
4月前
|
数据可视化
Echarts数据可视化大屏开发| 大数据分析平台
Echarts数据可视化大屏开发| 大数据分析平台
|
6月前
|
人工智能 分布式计算 DataWorks
首批!阿里云 MaxCompute 完成中国信通院数据智能平台专项测试
2024年5月31日,在中国信通院组织的首批数据智能平台专项测试中,阿里云数据智能平台解决方案(MaxCompute、DataWorks、PAI)顺利完成测试。
336 5
首批!阿里云 MaxCompute 完成中国信通院数据智能平台专项测试
|
5月前
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。
|
6月前
|
机器学习/深度学习 人工智能 分布式计算
人工智能平台PAI产品使用合集之如何在odps上启动独立的任务
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
人工智能平台PAI产品使用合集之如何在odps上启动独立的任务
|
6月前
|
机器学习/深度学习 人工智能 分布式计算
人工智能平台PAI产品使用合集之在maxcompute上跑模型,如何在本地进行推理
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
人工智能平台PAI产品使用合集之在maxcompute上跑模型,如何在本地进行推理
|
5月前
|
机器学习/深度学习 人工智能 分布式计算
人工智能平台PAI使用问题之如何在MaxCompute上使用Protobuf处理数据
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。