MaxCompute的任务状态和多任务执行

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 我们在使用MaxCompute的时候,我们其实非常期望知道当前有多少任务在跑,哪些任务耗时长,哪些任务已经完成,并且能通过任务的logview来分析任务耗时长的原因。

本文作者:龙利民
企业介绍:

ofo 小黄车是一个无桩共享单车出行平台,缔造了“无桩单车共享”模式,致力于解决城市出行问题。用户只需在微信服务号或App输入车牌号,即可获得密码解锁用车,随取随用,随时随地,也可以共享自己的单车到 ofo 共享平台,获得所有 ofo 小黄车的终身免费使用权,以1换N。

我们在使用MaxCompute的时候,我们其实非常期望知道当前有多少任务在跑,哪些任务耗时长,哪些任务已经完成,并且能通过任务的logview来分析任务耗时长的原因。

任务状态监控

MaxCompute的任务状态分Running和Terminated, 其中Running是包含:正在运行和等待运行的两种状态,Terminated包含:完成、失败、cancel的任务三个状态。阿里云提供了获取上述2种状态的SDK函数,odps.list_instances(status=Running|Terminated, start_time=开始时间,结束时间)。为了实现秒级别更新任务状态我们可以用以下思路来实现。

1、对于已经running的任务,我们需要快速更新它的状态,有可能已经完成了;

2、不断获取新的任务状态。

我们用Mysql来记录任务的状态表设计如下:


CREATE TABLE maxcompute_task (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
instanceid varchar(255) DEFAULT NULL comment '任务实例ID',
logview varchar(1024) DEFAULT NULL comment 'logview链接,查看问题非常有用',
start_time varchar(64) DEFAULT NULL comment '任务开始时间',
end_time varchar(64) DEFAULT NULL comment '任务结束时间',
cast_time varchar(32) DEFAULT NULL comment '耗时',
project_name varchar(255) DEFAULT NULL comment '项目名',
status varchar(64) DEFAULT NULL comment '任务状态',
PRIMARY KEY (id),
UNIQUE KEY instanceid (instanceid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

下面的页面可以查看当前的任务耗时,开始时间,对超过1小时的任务颜色使用红色标注,并且能查看logview,还能对任务进行取消,非常方便。
maxcompute_running

我们来看看代码的实现:

 

!/usr/bin/env python

-- coding: utf-8 --

author: lemon

import time
import threading
import traceback
import datetime
from odps import ODPS
from dataflow import config
from libs.myconn import Cursor
from config import DBINFO_BI_MASTER
from libs import logger as _logger

g_table_name = "bi_maxcompute_task"

def save_task(instanceid, odps, mysqlconn):

# 保存任务状态到Mysql, 分别传入odps连接器和mysql连接器
instance = odps.get_instance(instanceid)
project_name = odps.project
status = instance.status.value
start_time = instance.start_time
end_time =  instance.end_time

sql = "select logview,status from {0} where instanceid='{1}'".format(g_table_name, instanceid)

sqlret = mysqlconn.fetchone(sql)
if sqlret and sqlret["status"] == "Terminated":
    return
if sqlret and sqlret["logview"] is not None:
    logview = sqlret["logview"]
else:
    logview = instance.get_logview_address()
start_time = start_time + datetime.timedelta(hours=8)
if status == "Running":
    end_time = datetime.datetime.now()
else:
    end_time = end_time + datetime.timedelta(hours=8)
cast_time = end_time - start_time
colname = "instanceid,start_time,end_time,cast_time,project_name,status,logview"
values = ",".join(["'{0}'".format(r) for r in [instanceid, str(start_time),str(end_time), cast_time, project_name, status,logview]])
sql = """replace into {0}({1}) values({2}) """.format(g_table_name, colname, values)
mysqlconn.execute(sql)

class MaxcomputeTask(threading.Thread):

# 获取所有任务

def __init__(self, logger):
    threading.Thread.__init__(self)
    self.logger = logger
    self.hour = 1
    self.status_conf = [("demo", "Running"), ("demo", "Terminated"),
                        ("demo1", "Running"), ("demo1","Terminated")]

def run(self):
    # 建立mysql连接, 根据你的需要来使用
    self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
    while True:
        try:
            self.start_more()
            time.sleep(10)
        except:
            self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
            self.logger.error(traceback.format_exc())

def start_more(self,):
    for params in self.status_conf:
        self.get_task(*params)

def get_task(self, project_name, status):
    odps = ODPS(**config.ODPS_INFO)
    odps.project = project_name
    list = odps.list_instances(status=status, start_time=time.time() - self.hour * 3600)
    self.logger.info("start {0} {1} ".format(project_name, status))
    for row in list:
        save_task(instanceid=str(row), odps=odps, mysqlconn=self.mysqlconn)
    self.logger.info( "end {0} {1}".format(project_name, status))

    

class MaxcomputeTaskRunning(threading.Thread):

# 更新running任务的状态

def __init__(self, logger):
    threading.Thread.__init__(self)
    self.logger = logger

def run(self):
    self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
    while True:
        try:
            self.update_running()
            time.sleep(1)
        except:
            self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
            self.logger.error(traceback.format_exc())

def update_running(self):
    sql = "select instanceid, project_name from {0} where status='Running'".format(g_table_name)
    sqlret = self.mysqlconn.fetchall(sql)
    if not sqlret:
        return

    self.logger.info("{1} running update length:{0}".format(len(sqlret), time.strftime("%Y-%m-%d %H:%M:%S") ))
    for row in sqlret:
        odps = ODPS(**config.ODPS_INFO)
        odps.project = row["project_name"]
        save_task(row["instanceid"], odps, self.mysqlconn)

if name == "__main__":

# logger是自己编写的日志工具类
logger = _logger.Logger("maxcompute_task.log").getLogger()
running = MaxcomputeTaskRunning(logger)
running.setDaemon(True)
running.start()

task = MaxcomputeTask(logger)
task.start()

多任务执行

MaxCompute可以在命令行下运行,也可以用SDK,阿里云的集成环境跑任务等。很多时候我们面临的任务是非常多的,如何做一个多任务的代码执行器,也是经常遇到的问题。任务执行是一个典型的生产者和消费者的关系,生产者获取任务,消费者执行任务。这么做有2个好处。

1)任务执行的数量是需要可控的,如果同时运行的任务不可控势必对服务器资源造成冲击,
2)多机运行服务,避免单点故障,MaxCompute的任务是运行在云端的,可以通过instanceid获取到结果,此结果是保留7天的。

我大致贴一些我们在实际场景种的一些代码,生产者和消费者的代码:


class Consumer(threading.Thread):
def __init__(self, queue, lock):
    threading.Thread.__init__(self)
    self.queue = queue
    self.lock = lock
    self.timeout = 1800
def run(self):
    self.execute = Execute()
    logger.info("consumer %s start" % threading.current_thread().name)
    while G_RUN_FLAG:
        try:
            task = self.queue.get()
            self.execute.start(task)
        except:
            logger.error(traceback.format_exc())

class Producter(threading.Thread):

def __init__(self, queue, lock):
    threading.Thread.__init__(self)
    self.queue = queue
    self.lock = lock
    self.sleep_time = 30
    self.step_sleep_time = 5

def run(self):
    self.mysqlconn_bi_master = Cursor.new(**config.DBINFO_BI_MASTER)
    logger.info("producter %s start" % threading.current_thread().name)
    while G_RUN_FLAG:
        
        if self.queue.qsize() >= QUEUE_SIZE:
            time.sleep(self.sleep_time)
            continue

        # TODO
        self.queue.put(task)
        time.sleep(self.step_sleep_time)

def main():

queue = Queue.LifoQueue(QUEUE_SIZE)
lock = threading.RLock()

for _ in xrange(MAX_PROCESS_NUM):
    consumer = Consumer(queue, lock)
    consumer.setDaemon(True)
    consumer.start()

producter = Producter(queue, lock)
producter.start()
producter.join()

def signal_runflag(sig, frame):

global G_RUN_FLAG
if sig == signal.SIGHUP:
    logger.info("receive HUP signal ")
    G_RUN_FLAG = False

if name == "__main__":

logger.info("execute run")
if platform.system() == "Linux":
    signal.signal(signal.SIGHUP, signal_runflag)
main()
logger.info("execute exit.")

Maxcompute实际执行时的代码:


def _max_compute_run(self, taskid, sql):
    # 异步的方式执行
    hints = {
        'odps.sql.planner.mode': 'lot',
        'odps.sql.ddl.odps2': 'true',
        'odps.sql.preparse.odps2': 'lot',
        'odps.service.mode': 'off',
        'odps.task.major.version': '2dot0_demo_flighting',
        'odps.sql.hive.compatible': 'true'
    }
    new_sql = "{0}".format(sql)
    instance = self.odps.run_sql(new_sql, hints=hints)
    #instance = self.odps.run_sql(sql)

    # 异步的方式执行
    # instance = self.odps.run_sql(sql)
    self._save_task_instance_id(taskid, instance.id)
    # 阻塞直到完成
    instance.wait_for_success()
    return instance.id

获取结果时的代码:


def instance_result(odps, instance_id):
# 通过instance_id 获取结果
instance = odps.get_instance(instance_id)
response = []
with instance.open_reader() as reader:
    raw_response = [r.values for r in reader]
    column_names = reader._schema.names
    for  line in raw_response:
        tmp = {}
        for i in range(len(line)):
            tmp[column_names[i]] = line[i]
        response.append(tmp)
return response

总结:

阿里云的MaxCompute是非常好用的云计算服务,它的更新和迭代速度都非常快,使用阿里云解放工程师的搭建基础服务的时间,让我们更多的专注业务,站在巨人的肩膀上聪明的干活。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
3月前
|
SQL 分布式计算 运维
如何对付一个耗时6h+的ODPS任务:慢节点优化实践
本文描述了大数据处理任务(特别是涉及大量JOIN操作的任务)中遇到的性能瓶颈问题及其优化过程。
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
23天前
|
存储 分布式计算 监控
大数据增加分区减少单个任务的负担
大数据增加分区减少单个任务的负担
28 1
|
2月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
108 0
|
5月前
|
SQL 缓存 分布式计算
DataWorks操作报错合集之执行DDL任务时遇到错误代码为152,报错:"ODPS-0110061",该如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
分布式计算 资源调度 DataWorks
MaxCompute产品使用合集之如何增加MC中Fuxi任务的实例数
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之如何解决datax同步任务时报错ODPS-0410042:Invalid signature value
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之运行DDL任务时出现异常,具体错误是ODPS-0110061,该如何处理
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
110 3
|
5月前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用合集之如何在MR任务中使用DECIMAL字段
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之当ODPS任务出错并重新运行时,数据的值可能会翻倍的原因是什么
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。