MaxCompute账号费用及任务耗时TOPN统计

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 很多数据开发者在使用MaxCompute开发过程中需要统计每个账号所属任务的费用使用情况以及每个任务耗时来做任务的合理性规划和调整。但是在使用MaxCompute的时候通常情况下大多数用户通过DataWorks标准模式下使用MaxCompute,这样在MaxCompute提供的元数据视图信息中将记录所有的生产作业执行账号为同一个主账号,只有小部分的开发作业执行账号为个人RAM子账号。那么如何去做到各个账户的费用分摊和任务时间成本的统计是大部分MaxCompute使用者关注的问题。本文主要介绍如何通过MaxCompute元数据统计账号费用及任务耗时,同时定时通过钉钉推送到客户群。

一、需求场景

很多数据开发者在使用MaxCompute开发过程中需要统计每个账号所属任务的费用使用情况以及每个任务耗时来做任务的合理性规划和调整。但是在使用MaxCompute的时候通常情况下大多数用户通过DataWorks标准模式下使用MaxCompute,这样在MaxCompute提供的元数据视图信息中将记录所有的生产作业执行账号为同一个主账号,只有小部分的开发作业执行账号为个人RAM子账号。那么如何去做到各个账户的费用分摊和任务时间成本的统计是大部分MaxCompute使用者所关注的问题。本文主要介绍如何通过MaxCompute元数据统计账号费用及任务耗时,同时定时通过钉钉推送到客户群。

二、需求实现

1、目前任务的费用可以通过账单详情中的用量明细来查询,但是没有归属到对应的子账号。我们需要通过元数据Information_Schema视图中的历史使用信息TASKS_HISTORY来统计。
2、任务耗时需要通过元数据来统计。

三、MaxCompute账号费用及任务耗时TOPN统计

1、元数据介绍

MaxCompute的Information Schema提供了项目元数据及使用历史数据等信息
注意
(1)目前Information Schema提供的是当前项目的元数据视图,不支持跨项目的元数据访问。如果需要对多个项目的元数据进行统一查询、分析,需要分别获取各个项目中的元数据并整合在一起进行跨项目元数据分析。
(2)元数据及作业历史数据保存在Information_Schema空间下,如需对历史数据进行快照备份或获得超过14天的作业历史,您可以定期将Information Schema的数据保存备份到用户指定项目空间。

2、如何根据元数据去实现账号费用TOPN统计

(1)元数据下载

元数据Information_Schema视图中的历史使用信息TASKS_HISTORY记录MaxCompute项目内已完成的作业历史,保留近14天数据。需要通过元数据来做任务费用统计,因此需要定期将Information Schema的数据保存备份到用户指定项目空间。
开始使用前,需要以Project Owner身份安装Information Schema的权限包,获得访问本项目元数据的权限。安装方式有如下两种:
a、在MaxCompute客户端(odpscmd)中执行如下命令。

odps@myproject1>install package information_schema.systables;

b、在DataWorks中的数据开发 > 临时查询中执行如下语句。

install package information_schema.systables;

Information Schema的视图包含了项目级别的所有用户数据,默认Project Owner可以访问查看。如果项目内其他用户或角色访问查看,需要进行授权。
语法如下。

grant actions on package <pkgName> to user <username>;
grant actions on package <pkgName> to role <role_name>;

(2)元数据下载备份

--创建元数据数据备份表information_history
use project1;
CREATE TABLE IF NOT EXISTS project1.information_history
(
    task_catalog STRING
    ,task_schema STRING
    ,task_name STRING
    ,task_type STRING
    ,inst_id STRING
    ,`status` STRING
    ,owner_id STRING
    ,owner_name STRING
    ,result STRING
    ,start_time DATETIME
    ,end_time DATETIME
    ,input_records BIGINT
    ,output_records BIGINT
    ,input_bytes BIGINT
    ,output_bytes BIGINT
    ,input_tables STRING
    ,output_tables STRING
    ,operation_text STRING
    ,signature STRING
    ,complexity DOUBLE
    ,cost_cpu DOUBLE
    ,cost_mem DOUBLE
    ,settings STRING
    ,ds STRING
)
STORED AS ALIORC
;

--定时将数据写入备份表information_history
use project1;
insert into table project1.information_history
select * from information_schema.tasks_history where ds ='${datetime1}';
注意:${datetime1}为DataWorks调度参数,参数配置如下:datetime1=${yyyymmdd}

如果要统计多个项目空间的元数据需要分别去各个项目空间安装元数据package。之后在其他工作空间执行相同的操作。把各个工作空间的元数据备份数据插入到同一个表中做集中统计分析。
本文将所有project元数据维护在project1.information_history表中。

use project2;
insert into table project1.information_history
select * from information_schema.tasks_history where ds ='${datetime1}';

备注:${datetime1}为DataWorks调度参数,参数配置如下:datetime1=${yyyymmdd}之后遇到的所有参数都是如此后续不在重复描述。
image.png

(3)通过元数据计算账号所属任务费用

通过DataWorks标准模式下使用MaxCompute,这样在MaxCompute提供的元数据视图信息中将记录所有的生产作业执行账号为同一个主账号,只有小部分的开发作业执行账号为个人RAM子账号。元数据视图TASKS_HISTORY中的字段settings记录上层调度或用户传入的信息,以JSON格式存储。包含字段:useragent、bizid、skynet_id和skynet_nodename。通过该字段可以具体到创建任务的子账号信息。

a、维护一张子账号明细表user_ram,记录需要统计的账号及账号ID

CREATE TABLE IF NOT EXISTS project1.user_ram
(
    user_id STRING
    ,user_name STRING
)
STORED AS ALIORC
;

b、账号所属任务消费(按量付费)TOPN统计

CREATE TABLE IF NOT EXISTS project1.cost_topn
(
    cost_sum DECIMAL(38,5)
    ,task_owner STRING
)
PARTITIONED BY 
(
    ds STRING
)
STORED AS ALIORC
;

---元数据TOPN按量付费消费统计
set odps.sql.decimal.odps2=true;
insert into table project1.cost_topn PARTITION (ds = '${datetime1}')
SELECT  
nvl(cost_sum,0) cost_sum
        ,CASE    WHEN a.task_owner='13************' OR a.task_owner='23************' OR a.task_owner='21************' THEN b.user_name 
                 ELSE a.task_owner 
         END task_owner
---注释部分为账号ID
FROM    (
            SELECT  inst_id
                    ,owner_name
                    ,task_type
                    ,a.input_bytes
                    ,a.cost_cpu
                    ,a.STATUS
                    ,CASE    WHEN a.task_type = 'SQL' THEN CAST(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 AS DECIMAL(18,5) )
                             WHEN a.task_type = 'SQLRT' THEN CAST(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 AS DECIMAL(18,5) )
                             WHEN a.task_type = 'CUPID' AND a.STATUS='Terminated'THEN CAST(a.cost_cpu/100/3600 * 0.66 AS DECIMAL(18,5) ) 
                             ELSE 0 
                     END cost_sum
                    ,a.settings
                    ,GET_JSON_OBJECT(settings, "$.SKYNET_ONDUTY") OWNER
                    ,CASE    WHEN GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY") IS NULL THEN owner_name 
                             ELSE GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY") 
                     END task_owner
            FROM    project1.information_history a
            WHERE   ds = '${datetime1}'
        ) a
LEFT JOIN project1.user_ram b
ON      a.task_owner = b.user_id
;

备注:
task_type = 'SQL' 为SQL任务、task_type = 'SQLRT' 为查询加速(MCQA)任务,task_type = 'CUPID' 为Spark任务,其他计费任务如MapReduce、Lightning(交互式分析)、Mras计算公式如下,详细介绍请参考计算费用(按量计费)
MapReduce作业的计费公式为:

MapReduce作业当日计算费用=当日总计算时×单价(0.46元/计算时)
一个执行成功的MapReduce作业计算时=作业运行时间(小时)×作业调用的Core数量。

Lightning查询作业的计费公式为:

一次Lightning查询作业费用=查询输入数据量×单价(0.03元/GB)

Mars作业的计费公式为:

Mars作业当日计算费用=当日总计算时×单价(0.66元/计算时)

3、如何根据元数据去实现任务耗时TOPN统计


---创建任务耗时TOPN表time_topn
CREATE TABLE IF NOT EXISTS project1.time_topn
(
    inst_id STRING
    ,cost_time BIGINT
    ,task_owner STRING
)
PARTITIONED BY 
(
    ds STRING
)
STORED AS ALIORC
;

---任务耗时TOPN统计
INSERT INTO TABLE project1.time_topn PARTITION(ds = '${datetime1}')
SELECT  inst_id
        ,cost_time
        ,CASE    WHEN a.task_owner='13**********' OR a.task_owner='23**********' OR a.task_owner='21**********' THEN b.user_name 
                 ELSE a.task_owner 
         END task_owner
FROM    (
            SELECT  inst_id
                    -- ,task_type
                    -- ,status
                    ,datediff(a.end_time, a.start_time, 'ss') AS cost_time
                    ,CASE    WHEN GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY") IS NULL THEN owner_name 
                             ELSE GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY") 
                     END task_owner
            FROM    project1.information_history a
            WHERE   ds = '${datetime1}'
        ) a
LEFT JOIN project1.user_ram b
ON      a.task_owner = b.user_id
;

4、通过钉钉机器人推送到钉群

(1)群机器人开发API

a、获取自定义机器人webhook

打开机器人管理页面。以PC端为例,打开PC端钉钉,点击头像,选择“机器人管理”。
image.png
在机器人管理页面选择“自定义”机器人,输入机器人名字并选择要发送消息的群,同时可以为机器人设置机器人头像。
image.png
完成必要的安全设置(至少选择一种),勾选 我已阅读并同意《自定义机器人服务及免责条款》,点击“完成”。安全设置目前有3种方式,设置说明参考 安全设置
image.png
完成安全设置后,复制出机器人的Webhook地址,可用于向这个群发送消息,格式如下:

https://oapi.dingtalk.com/robot/send?access_token=XXXXXX

注意:请保管好此Webhook 地址,不要公布在外部网站上,泄露后有安全风险。

(2)钉群消息推送demo

a、代码实现

package com.alibaba.sgri.message;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ResultSet;
import com.aliyun.odps.task.SQLTask;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest;
import com.dingtalk.api.response.OapiRobotSendResponse;
import com.taobao.api.ApiException;

/**
 * @class: OdpsMessageSendNew
 * @description:
 * @author: Liujianwei
 * @date: 2020-10-16 18:26:12
 **/

public class test {

    public static void main(String[] args) throws ApiException {
        if (args.length < 1) {
            System.out.println("请输入日期参数");
            System.exit(0);
        }
        System.out.println("开始读取数据");
        DingTalkClient client = new DefaultDingTalkClient(
                "https://oapi.dingtalk"
                        + ".com/robot/send?access_token=Webhook地址\n");
        OapiRobotSendRequest request = new OapiRobotSendRequest();
        request.setMsgtype("markdown");
        OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
        //这里的日期作为参数
        markdown.setText(getContent(args[0]));
        markdown.setTitle("任务消费TOPN");
        request.setMarkdown(markdown);
        OapiRobotSendResponse response = client.execute(request);
        System.out.println("消息发送成功");
    }

    /**
     * 读取ODPS,获取要发送的数据
     *
     * @return
     */

    public static String getContent(String day) {
        Odps odps = createOdps();
        StringBuilder sb = new StringBuilder();
        try {
            //==================这是任务消费=====================
            String costTopnSql = "select sum(cost_sum)cost_sum,task_owner from cost_topn where ds='" + day + "' " + "group by task_owner order by cost_sum desc limit 5;";
            Instance costInstance = SQLTask.run(odps, costTopnSql);
            costInstance.waitForSuccess();
            ResultSet costTopnRecords = SQLTask.getResultSet(costInstance);
            sb.append("<font color=#FF0000 size=4>").append("任务消费TOPN(").append(day).append(
                    ")[按照阿里云按量付费计算]").append("</font>").append("\n\n");
            AtomicInteger costIndex = new AtomicInteger(1);
            costTopnRecords.forEach(item -> {
                sb.append(costIndex.getAndIncrement()).append(".").append("账号:");
                sb.append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>");
                sb.append("  ").append(" ").append("消费:").append("<font color=#2E64FE>").append(item.get("cost_sum"))
                        .append("元").append(
                        "</font>").append("\n\n")
                        .append("</font>");
            });
            //==================这是任务耗时=====================
            String timeTopnSql = "select * from time_topn where ds='" + day + "' ORDER BY cost_time DESC limit 5;";
            Instance timeInstance = SQLTask.run(odps, timeTopnSql);
            timeInstance.waitForSuccess();
            ResultSet timeTopnRecords = SQLTask.getResultSet(timeInstance);
            sb.append("<font color=#FF8C00 size=4>").append("任务耗时TOPN(").append(day).append(")")
                    .append("\n\n").append("</font>");
            AtomicInteger timeIndex = new AtomicInteger(1);
            timeTopnRecords.forEach(item -> {
                sb.append(timeIndex.getAndIncrement()).append(".").append("任务:");
                sb.append("<font color=#2E64FE>").append(item.getString("inst_id")).append("\n\n").append("</font>");
                sb.append("   ").append("账号:").append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>");
                sb.append("   ").append("耗时:").append("<font color=#2E64FE>").append(item.get("cost_time"))
                        .append("秒").append(
                        "</font>").append("\n\n");
            });
        } catch (OdpsException | IOException e) {
            e.printStackTrace();
        }
        return sb.toString();
    }

    /**
     * 创建ODPS
     *
     * @return
     */
    public static Odps createOdps() {
        String project = "******";
        String access_id = ""******";";
        String access_key = ""******";";
        String endPoint = "http://service.odps.aliyun.com/api";
        Account account = new AliyunAccount(access_id, access_key);
        Odps odps = new Odps(account);
        odps.setEndpoint(endPoint);
        odps.setDefaultProject(project);
        return odps;
    }
}

备注:自定义钉钉群机器人开发API参考:钉钉开发平台

b、pom文件参考

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>DingTalk_Information</groupId>
    <artifactId>DingTalk_Information</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>odps-sdk-core</artifactId>
            <version>0.35.5-public</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>alibaba-dingtalk-service-sdk</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>odps-jdbc</artifactId>
            <version>3.0.1</version>
            <classifier>jar-with-dependencies</classifier>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <!-- get all project dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <!-- MainClass in mainfest make a executable jar -->
                    <archive>
                        <manifest>
                            <mainClass>com.alibaba.sgri.message.test</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <!-- bind to the packaging phase -->
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

c、定时调度执行

程序调度可以打包提交服务器去设置定时调度。本文打包程序提交至DataWorks执行定时调度,同时元数据获取也是每日调度执行。
上传jar包为MaxCompute资源,然后引用资源执行jar包且配置定时调度。
上传MaxCompute资源及引用参考MaxCompute资源,这里不做详细介绍
image.png
各项目元数据采集、任务消费和耗时TOPN计算及钉群机器人推送上下游节点调度配置如下:
image.png
DataWorks节点上下游配置参考节点上下文

四、每日钉群推送任务消费及耗时TOPN效果展示

image.png

五、相关费用统计参考文档

1、MaxCompute账单分析最佳实践:MaxCompute账单分析最佳实践
2、查看账单详情:查看账单详情
3、在DataWorks标准模式下统计个人账号使用资源情况:在DataWorks标准模式下统计个人账号使用资源情况
4、利用InformationSchema与阿里云交易和账单管理API实现MaxCompute费用对账分摊统计:利用InformationSchema与阿里云交易和账单管理API实现MaxCompute费用对账分摊统计

六、MaxCompute开发者社区交流群

欢迎加入“MaxCompute开发者社区2群”,点击链接MaxCompute开发者社区2群申请申请加入或扫描以下二维码加入。
image.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
4月前
|
SQL 分布式计算 运维
如何对付一个耗时6h+的ODPS任务:慢节点优化实践
本文描述了大数据处理任务(特别是涉及大量JOIN操作的任务)中遇到的性能瓶颈问题及其优化过程。
|
5月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
存储 分布式计算 监控
大数据增加分区减少单个任务的负担
大数据增加分区减少单个任务的负担
34 1
|
3月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
57 3
|
3月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
131 0
|
6月前
|
SQL 缓存 分布式计算
DataWorks操作报错合集之执行DDL任务时遇到错误代码为152,报错:"ODPS-0110061",该如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
分布式计算 资源调度 DataWorks
MaxCompute产品使用合集之如何增加MC中Fuxi任务的实例数
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6月前
|
分布式计算 大数据 MaxCompute
MaxCompute产品使用合集之如何实现根据商品维度统计每件商品的断货时长的功能
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之如何解决datax同步任务时报错ODPS-0410042:Invalid signature value
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之运行DDL任务时出现异常,具体错误是ODPS-0110061,该如何处理
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
122 3

相关产品

  • 云原生大数据计算服务 MaxCompute