1. 项目结构
- 目标
- 理解项目中为什么需要调度平台
- 步骤
- 项目介绍
- 项目结构
1.1. 项目介绍
项目功能
- 标签创建
- 标签计算
- 标签审计
- …
如果要对标签进行计算, 就需要把标签和标签的计算都管理起来
创建标签时, 执行标签计算任务
把计算和标签关联起来, 通过标签能找到任务, 通过任务也知道它计算的是什么标签
标签, 年龄就是一个标签
计算任务, 为了计算年龄标签, 需要去读取用户数据, 计算每一个用户是哪一个标签
用户画像数据, 每一个用户的标签, 就是用户画像数据
修改标签时, 标签计算任务同步修改
删除标签时, 标签计算任务停止
所以, 我们的项目需要和调度平台结合起来, 不仅管理标签本身, 也需要管理标签相关联的任务, 所以需要配合调度平台来管理
1.2. 项目结构
- 管理者通过画像系统添加, 修改, 删除标签
- 画像系统将标签抽象为标签和任务, 通过 Oozie 调度到 Yarn 中
- Yarn 中的 Spark 任务读取 MySQL 中的标签元信息
- Spark 任务通过 HBase 读取需要计算的数据
- 计算完成后, 插入 HBase 对应的表中
2. Oozie 介绍
- 目标
- 了解 Oozie 的作用
- 步骤
- 需求
- 可选的方式
- Oozie 和同类的对比
2.1. 需求
- 现在要计算年龄标签, 有七个阶段, 50后、60后、70后、80后、90后、00后、10后、20后
- 通过 Spark 写了一个程序, 读取用户年龄, 把每个用户的年龄映射到每个阶段内
- 因为年龄标签变化的频率不高, 所以决定要一年计算一次
- 如何完成这个计算任务的调度?
2.2. 可选的方式
2.2.1. Crontab
- Crontab 是 Linux 的一个调度程序, 允许我们通过一个简洁的语法表示调度周期, 在对应的时间点周期性的执行某个 Shell 脚本或者 Shell 命令
# 分 时 日 月 星期 要运行的命令 # 每1分钟执行一次myCommand * * * * * myCommand # 每小时的第3和第15分钟执行 3,15 * * * * myCommand # 在上午8点到11点的第3和第15分钟执行 3,15 8-11 * * * myCommand # 每周一上午8点到11点的第3和第15分钟执行 3,15 8-11 * * 1 myCommand
我们可以编写一个 Shell 脚本, 其中写上
spark-submit --master yarn \ --class xx --executor-memory 500m --executor-cores 1 --num-executors 1 xx.jar args1
然后再将如下的 cron 加入 crontab
0 20 * * * tag_age.shell
这样的方式有如下几个问题
- cron 任务只能通过命令来管理
crontab -e
- 如果换成如下的需求, 这个流程我们该如何控制呢?
- 把数据抽取到 HBase 中
- 同步执行如下两个任务
- 合并用户数据和订单数据, 生成宽表
- 合并用户数据和商品数据, 生成宽表
- 合并两个宽表, 计算用户标签
- 如果执行的程序出错了, 只能登录到对应的主机查看 Log
- …
2.2.2. Oozie
- 创建一个 Workflow, 表示一个任务的执行流程
- 创建一个 Coordinator, 表示任务的执行周期
- 通过 Hue 提交任务
- 通过 Hue 查看任务执行的 Log
2.3. Oozie 和竞品的对比
现在市面上比较多见的调度平台分别是 Oozie 和 Azkaban, 还有一个后起之秀 AirFlow
Airflow | Azkaban | Oozie |
机构 | Airbnb | Apache | |
社区 | Very Active | Somewhat active | Active |
历史 | 4年 | 7 年 | 8 年 |
目的 | General Purpose Batch Processing | Hadoop 作业调度 | Hadoop 作业调度 |
流程定义 | Python | Custom DSL | XML |
单节点 | 支持 | Yes | Yes |
快速开始 | 支持 | 支持 | 不支持 |
高可用 | Yes | Yes | Yes |
单点故障 | 是 | 是 | 不, 使用 Yarn |
运行模式 | Push | Push | Poll |
Rest API 触发 | Yes | Yes | Yes |
外部事件触发 | Yes | No | Yes |
Web 权限 | LDAP/Password | XML Password | Kerberos |
监控 | Yes | Limited | Yes |
可扩展性 | Depending on executor setup | Good | Very Good |
- 在对工具的支持上, Airflow 和 Oozie 都很好, 但是 Airflow 要简便一些
- 功能上, Oozie 最强大
- 稳定性上, Oozie 最稳定
- 上手速度, Azkaban 最快
- 带有个人情绪的简评, Oozie 最稳定, 也最难以驾驭, 坑多
3. Oozie 组件
- 目标
- 理解 Oozie 各个组件
- 步骤
- Workflow
- Coordinator
3.1. Workflow
Workflow 就是一个工作流, 从 start 开始, 到 end 结束
Workflow 中提供了 Fork 和 Join 机制
Fork 可以使得多个任务并行
Join 可以让并行的任务汇总
Workflow 中提供了 Decision 机制
类似于 if 一样, 可以提供分支选择功能
- 在任务执行完成后, Workflow 可以以发送邮件, 短信等方式通知
- Workflow 提供了一种类似 DAG 的方式来执行一组任务
- Oozie 支持 MR, Hive, Spark, Shell 等的支持
3.2. Coordinator
- Workflow 表示了任务的实现和关系
- Oozie 提供了 Coodinator, 用来调度 Workflow
- Coordinator 在 Hue 中叫做 Scheduler
4. 调度实现
- 目标
- 实现工程中的调度功能
- 步骤
- 执行流程
- Workflow
- Coordinator
- Java 代码
- 调度 Workflow
4.1. 执行流程
- 用户在网页中添加四级标签, 因为四级标签代表标签, 五级标签代表标签值域
- 上传 Jar 包
- 提交标签
- 后端收到添加标签的请求, 存储标签信息并将 Jar 包上传到 HDFS
cn.oldlu.tag.web.basictag.controller.BasicTagController#modelUpload
- 用户在网页中开启标签计算
- 后端收到请求, 联系 Oozie 开始调度
添加标签的执行流程如下
- 调用了 BasicTagController
- 在 BasicTagController 中调用了 BasicTagService
- 保存 Tag 信息, 指的就是具体的标签数据, 比如说标签叫什么, 谁创建的, 什么时候创建的, 级别…
- 保存 Model 信息, 标签是需要计算的, 我们使用 Spark Job 计算, Main方法在哪个类, Jar 包的位置等…
- 在保存之前, 把 Model 对应 Jar 包上传到 HDFS 中
- 路径
/app/models/Tag_10/xx.jar
开启任务的流程如下
- 调用 TaskProcessing 中的方法
- TaskProcessing 会调用 EngineService 中的 start 方法
- 传进来了一个 标签 ID
4.2. Workflow
- Workflow 使用 XML 编写, 是一种通过配置表示过程的方式
- Workflow 中的配置无需太过认真的了解, 大致知道流程即可
- 因为一般情况下, 我们使用 Hue 或者 Ambari 的图形化方式去编辑 Workflow , 手写的可能性比较低
- Workflow 中可以使用 EL 表达式填充信息
<workflow-app name="Extra goods from hive to hbase fully" xmlns="uri:oozie:workflow:0.5"> <start to="tag-model-job"/> <action name="tag-model-job"> <spark xmlns="uri:oozie:spark-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <master>${sparkMaster}</master> <mode>${sparkDeployMode}</mode> <name>${oozieWFName}</name> <class>${sparkJobMain}</class> <jar>${sparkJobJar}</jar> <spark-opts>${sparkJobOpts}</spark-opts> <arg>${sparkMainOpts}</arg> <file>${sparkContainerCacheFiles}</file> </spark> <ok to="End"/> <error to="Kill"/> </action> <kill name="Kill"> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name="End"/> </workflow-app>
4.3. Coordinator
- 此处的 Coordinator 只是为了周期性调度 Workflow
- 在项目中提供的调度周期有: 日, 月, 年
<?xml version="1.0" encoding="UTF-8"?> <coordinator-app name="cron-coordinator" frequency="${coord:days(1)}" start="${start}" end="${end}" timezone="Asia/Shanghai" xmlns="uri:oozie:coordinator:0.5"> <action> <workflow> <app-path>${oozieWorkflowPath}</app-path> </workflow> </action> </coordinator-app>
4.4. Java 代码
private boolean startEngineActually(EngineBean engineBean) { ModelBean modelBean = modelMapper.get(new ModelBean(engineBean.getTagId())); MetaDataBean metaDataBean = metaDataMapper.get(new MetaDataBean(engineBean.getTagId())); // 1. 判断模型和元数据是否存在, 如果不存在则启动失败 HdfsUtil hdfsUtil = HdfsUtil.getInstance(); if (!hdfsUtil.exist(modelBean.getModelPath())) { logger.error("模型包不存在, 启动失败"); return false; } if (StringUtils.isBlank(metaDataBean.getInType())) { logger.error("模型元信息不存在或者缺失 InType, 启动失败"); return false; } // 2. 上传 Oozie 配置到 HDFS String localOozieConfigPath = engineBean.getRemark() + oozieConfigDirName + "/"; String tagModelPath = hdfsUtil.getPath(new File(modelBean.getModelPath()).getParent()); hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + workflowFileName, tagModelPath); hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + coordinatorFileName, tagModelPath); // 3. 构建 Oozie job 参数 OozieUtil oozie = oozieUtil.build(); Properties oozieConf = oozie.getConf(); // 3.1. Workflow 部分 String separator = "/"; oozieConf.setProperty("nameNode", nameNode); oozieConf.setProperty("sparkJobMain", modelBean.getModelMain()); if (StringUtils.isNotBlank(modelBean.getArgs())) { oozieConf.setProperty("sparkJobOpts", modelBean.getArgs()); } oozieConf.setProperty("sparkJobJar", "${nameNode}" + modelBean.getModelPath()); oozieConf.setProperty("sparkContainerCacheFiles", "${nameNode}" + modelBean.getModelPath()); // 3.2. Coordinator 部分, Coordinator 中的时间格式是 UTC, 如 2018-06-019T20:01Z oozieConf.setProperty("oozieWorkflowPath", "${nameNode}" + tagModelPath + separator); oozieConf.setProperty("oozie.coord.application.path", "${nameNode}" + tagModelPath + separator + coordinatorFileName); // ScheTime 的格式为 每天#2018-06-01 20::01#2018-06-01 20::01 String[] scheduleArr = modelBean.getScheTime().split("#"); String freq = scheduleArr[0]; String startDateTime = scheduleArr[1]; String endDateTime = scheduleArr[2]; // 设置频率 String freqStr = ""; switch (freq) { case "每天": freqStr = "day"; break; case "每周": freqStr = "week"; break; case "每月": freqStr = "month"; break; case "每年": freqStr = "year"; break; } oozieConf.setProperty("freq", freqStr); // 设置开始时间 String[] startDateTimeArr = startDateTime.split(" "); String startDate = startDateTimeArr[0]; String startTime = startDateTimeArr[1]; oozieConf.setProperty("start", startDate + "T" + startTime + "Z"); // 设置结束时间 String[] endDateTimeArr = endDateTime.split(" "); String endDate = endDateTimeArr[0]; String endTime = endDateTimeArr[1]; oozieConf.setProperty("end", endDate + "T" + endTime + "Z"); logger.info(oozieConf.toString()); // 4. 提交 Oozie 任务 String jobId = oozie.start(oozieConf); // 5. 设置监控 EngineBean mEngineBean = new EngineBean(); mEngineBean.setJobid(jobId); mEngineBean.setTagId(engineBean.getTagId()); mEngineBean.setStatus("3"); int state = engineMapper.addMonitorInfo(mEngineBean); return state > 0; }
4.5. 执行流程
- 上传 Jar 包
- 创建四级标签
- 开启标签任务
- Hue 中查看
- 注意把 Filter 清空
4.5. 调度 Workflow
因为在测试阶段, 我们希望每次提交任务都立刻执行, 所以可以注释掉提交 Oozie 任务的代码, 改为直接提交 Workflow 任务
- 去掉 Coordinator 的位置, 改为 APP_PATH, 则会直接找到 Workflow 提交
private boolean startEngineActually(EngineBean engineBean) { ModelBean modelBean = modelMapper.get(new ModelBean(engineBean.getTagId())); MetaDataBean metaDataBean = metaDataMapper.get(new MetaDataBean(engineBean.getTagId())); // 1. 判断模型和元数据是否存在, 如果不存在则启动失败 HdfsUtil hdfsUtil = HdfsUtil.getInstance(); if (!hdfsUtil.exist(modelBean.getModelPath())) { logger.error("模型包不存在, 启动失败"); return false; } if (StringUtils.isBlank(metaDataBean.getInType())) { logger.error("模型元信息不存在或者缺失 InType, 启动失败"); return false; } // 2. 上传 Oozie 配置到 HDFS String localOozieConfigPath = engineBean.getRemark() + oozieConfigDirName + "/"; String tagModelPath = hdfsUtil.getPath(new File(modelBean.getModelPath()).getParent()); hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + workflowFileName, tagModelPath); hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + coordinatorFileName, tagModelPath); // 3. 构建 Oozie job 参数 OozieUtil oozie = oozieUtil.build(); Properties oozieConf = oozie.getConf(); // 3.1. Workflow 部分 String separator = "/"; oozieConf.setProperty("nameNode", nameNode); oozieConf.setProperty("sparkJobMain", modelBean.getModelMain()); if (StringUtils.isNotBlank(modelBean.getArgs())) { oozieConf.setProperty("sparkJobOpts", modelBean.getArgs()); } oozieConf.setProperty("sparkJobJar", "${nameNode}" + modelBean.getModelPath()); oozieConf.setProperty("sparkContainerCacheFiles", "${nameNode}" + modelBean.getModelPath()); oozieConf.setProperty(OozieClient.APP_PATH, "${nameNode}" + tagModelPath + separator + workflowFileName); // 3.2. Coordinator 部分, Coordinator 中的时间格式是 UTC, 如 2018-06-019T20:01Z // oozieConf.setProperty("oozieWorkflowPath", "${nameNode}" + tagModelPath + separator); // oozieConf.setProperty("oozie.coord.application.path", "${nameNode}" + tagModelPath + separator + coordinatorFileName); // // // ScheTime 的格式为 每天#2018-06-01 20::01#2018-06-01 20::01 // String[] scheduleArr = modelBean.getScheTime().split("#"); // String freq = scheduleArr[0]; // String startDateTime = scheduleArr[1]; // String endDateTime = scheduleArr[2]; // // // 设置频率 // String freqStr = ""; // switch (freq) { // case "每天": // freqStr = "day"; // break; // case "每周": // freqStr = "week"; // break; // case "每月": // freqStr = "month"; // break; // case "每年": // freqStr = "year"; // break; // } // oozieConf.setProperty("freq", freqStr); // // // 设置开始时间 // String[] startDateTimeArr = startDateTime.split(" "); // String startDate = startDateTimeArr[0]; // String startTime = startDateTimeArr[1]; // oozieConf.setProperty("start", startDate + "T" + startTime + "Z"); // // // 设置结束时间 // String[] endDateTimeArr = endDateTime.split(" "); // String endDate = endDateTimeArr[0]; // String endTime = endDateTimeArr[1]; // oozieConf.setProperty("end", endDate + "T" + endTime + "Z"); // // logger.info(oozieConf.toString()); // 4. 提交 Oozie 任务 String jobId = oozie.start(oozieConf); // 5. 设置监控 EngineBean mEngineBean = new EngineBean(); mEngineBean.setJobid(jobId); mEngineBean.setTagId(engineBean.getTagId()); mEngineBean.setStatus("3"); int state = engineMapper.addMonitorInfo(mEngineBean); return state > 0; }