DataWorks OpenAPI 实战-数据开发全流程介绍

本文涉及的产品
大数据开发治理平台DataWorks,Serverless资源组抵扣包300CU*H
简介: DataWorks作为飞天大数据平台操作系统,历经11年发展,形成了涵盖数据集成、数据开发、数据治理、数据服务的一站式大数据开发治理平台。很多企业用户在使用产品的过程中希望他们的本地服务能够和阿里云上的DataWorks服务进行交互,从而提升企业大数据处理的效率,减少人工操作和运维工作,降低数据风险和企业成本,现在DataWorks开放OpenAPI能力满足企业的定制化需求。

DataWorks作为飞天大数据平台操作系统,历经11年发展,形成了涵盖数据集成、数据开发、数据治理、数据服务的一站式大数据开发治理平台。很多企业用户在使用产品的过程中希望他们的本地服务能够和阿里云上的DataWorks服务进行交互,从而提升企业大数据处理的效率,减少人工操作和运维工作,降低数据风险和企业成本,现在DataWorks开放OpenAPI能力满足企业的定制化需求。
DataWorks OpenAPI涵盖租户、元数据、数据开发、运维中心、数据质量、数据服务等DataWorks核心能力,企业版和旗舰版分别赠送100万次/月、1000万次/月的免费调用额度。

关于Dataworks OpenAPI开通要求和开放地域可查阅DataWorks OpenAPI概述
限DataWorks企业版及以上使用立即开通
开通7天试用请使用钉钉扫码联系

墨祤二维码

实战简介

我们假设这样一个简单的场景,开发人员想把RDS库里面的数据同步到一张MaxCompute分区表中,然后在自建系统的页面上展示经过数据分析后的报表数据,那么如何通过DataWorks OpenAPI去完成整个链路的实现呢?

实战准备

一、引入DataWorks OpenAPI SDK

这一部分可参考 安装 DataWorks OpenAPI Java SDK,除了java语言,我们还支持Python,PHP,C#,Go 等语言支持。默认情况下我们不需要显式去指定DataWorks OpenAPI的EndPoint,但是如果aliyun-java-sdk-core版本偏低的情况下可能会找不到DataWorks OpenAPI的Endpoint,这时候可在不升级版本的情况下通过使用如下代码进行请求。

    IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
    DefaultProfile.addEndpoint("cn-shanghai","dataworks-public", "dataworks.cn-shanghai.aliyuncs.com");
    IAcsClient client = new DefaultAcsClient(profile);

如上代码是显式地指定了DataWorks OpenAPI的EndPoint,dataworks.${regionId}.aliyuncs.com这样的域名格式在公网环境下可访问,但是有些用户需要在VPC环境下调用OpenAPI,那么则需要把域名dataworks.${regionId}.aliyuncs.com 变更成 dataworks-vpc.${regionId}.aliyuncs.com,这样在VPC网络环境下即使不能访问公网也能请求到DataWorks OpenAPI。
如果您不清楚regionId(地域ID)的概念,可参考地域和可用区

二、了解DataWorks OpenAPI文档

详细阅读DataWorks OpenAPI文档对开发非常有帮助,做API开发时如果对参数的约束不太理解时可参考DataWorks OpenAPI文档,里面对每个出入参、参数示例、错误码描述都有详细的解释。点击查看API参考>>

api文档1.png

实战步骤

步骤一:创建RDS数据源

集成租户API可创建引擎、创建数据源、查看项目空间等信息。在我们这个业务场景中,MaxCompute分区表存在于MaxCompute引擎中,我们在DataWorks管控台创建完MaxCompute工作空间后会自动创建好MaxCompute引擎的数据源,所以我们只需要使用【CreateConnection】创建好RDS数据源即可:

        CreateConnectionRequest createRequest = new CreateConnectionRequest();

        createRequest.setProjectId(-1L);
        createRequest.setName("TEST_CONNECTION");
        createRequest.setConnectionType("MYSQL");
        createRequest.setEnvType(1);
        createRequest.setContent("{\"password\":\"12345\"}");
        Long connectionId;

        try {
            CreateConnectionResponse createResponse = client.getAcsResponse(createRequest);
            Assert.assertNotNull(createResponse.getData());
            connectionId = createResponse.getData();

            UpdateConnectionRequest updateRequest = new UpdateConnectionRequest();
            updateRequest.setConnectionId(connectionId);
            updateRequest.setDescription("1");
            UpdateConnectionResponse acsResponse = client.getAcsResponse(updateRequest);
            Assert.assertTrue(acsResponse.getData());

            DeleteConnectionRequest deleteRequest = new DeleteConnectionRequest();
            deleteRequest.setConnectionId(connectionId);
            DeleteConnectionResponse deleteResponse = client.getAcsResponse(deleteRequest);
            Assert.assertTrue(deleteResponse.getData());
        } catch (ClientException e) {
            e.printStackTrace();
            Assert.fail();
        }

UpdateConnection和DeleteConnection可分别修改和删除数据源信息。另外对项目空间的成员进行管理的API集是CreateProjectMember、DeleteProjectMember、RemoveProjectMemberFromRole、ListProjectMembers。

步骤二:表的创建

集成DataWorks元数据OpenAPI我们能管理引擎侧的表信息,通过DataWorks管控台和租户API我们完成了MaxCompute引擎和RDS数据源的创建工作,下一步需要完成表的创建,可通过元数据的【CreateTable】完成:

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
        IAcsClient client = new DefaultAcsClient(profile);
        CreateTableRequest request = new CreateTableRequest();
        request.setTableName("table_test");
        request.setColumnss(new ArrayList<>());
        request.setEndpoint("endpoint");
        CreateTableResponse response = client.getAcsResponse(request);
        String nextTaskId = response.getTaskInfo().getNextTaskId();
        System.out.println(nextTaskId);

关于表管理的API集是CreateTable、UpdateTable、DeleteTable、GetMetaDBTableList、CheckMetaTable等,除了可对表进行管理,元数据API还能对表元数据、表主题进行管理,更多详情可参考DataWorks OpenAPI文档。

步骤三:任务开发和发布调度

集成数据开发API可管理文件,并对文件进行提交和发布后生成周期任务,周期任务会定时调度运行,创建不同类型的文件是根据FileType这个字段决定的,目前我们已支持非常多的FileType,通过运维中心的API【ListProgramTypeCount】可获取所有已支持的系统节点以及自定义节点。

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
        IAcsClient client = new DefaultAcsClient(profile);
        CreateFileRequest createFileRequest = new CreateFileRequest();
        createFileRequest.setFileType(DefaultNodeType.ODPS_SQL.getCode());
        createFileRequest.setInputList(projectIdentifier+"_root");
        createFileRequest.setContent(content);
        createFileRequest.setFileName("create_file_" + caseId);
        createFileRequest.setFileFolderPath("业务流程/POP接口测试/MaxCompute/test_folder_3");
        createFileRequest.setFileDescription("create file " + caseId);
        createFileRequest.setRerunMode("ALL_ALLOWED");
        CreateFileResponse createFileResponse = getAcsResponse(createFileRequest);

content字段存储SQL脚本、Shell脚本、数据集成的脚本代码,数据集成的脚本格式可参考通过脚本模式配置任务
使用【CreateFile】创建完脚本后,如需修改可使用UpdateFile、DeleteFile进行管理。和页面上的操作流程一致的是完成文件开发后得提交和发布文件才会生成周期实例,这里要注意的是需要轮询SubmitFile返回的 DeploymentId,只有当GetDeployment返回的状态是完成时(status.finished())才表示部署成功。

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");
        IAcsClient client = new DefaultAcsClient(profile);
        SubmitFileRequest request = new SubmitFileRequest();
        request.setFileId(fileId);
        request.setComment("submit file");
        SubmitFileResponse submitFileResponse = getAcsResponse(submitFileRequest);

        //检查提交结果
        DeploymentStatus status = null;
        GetDeploymentResponse.Data.Deployment deployment = null;
        int retryTimes = 0;
        while (retryTimes < 6) {
            GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(submitFileResponse.getData());
            GetDeploymentResponse getDeploymentResponse = getAcsResponse(getDeploymentRequest);
            LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
            Assert.assertNotNull(getDeploymentResponse.getData());
            deployment = getDeploymentResponse.getData().getDeployment();
            Assert.assertNotNull(deployment);
            Assert.assertTrue(deployment.getName().equalsIgnoreCase(baseId));
            Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
            Assert.assertTrue(deployment.getHandlerId().equalsIgnoreCase(baseId));
            Assert.assertEquals((int) deployment.getFromEnvironment(), DatastudioEnvironment.LOCAL.value());
            Assert.assertEquals((int) deployment.getToEnvironment(), DatastudioEnvironment.DEV.value());
            Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
            status = Enums.find(DeploymentStatus.class, deployment.getStatus());
            Assert.assertNotNull(status);
            if (status.finished()) {
                LOGGER.info("Deployment finished - FinalStatus[{}]", status);
                break;
            }
            LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
            retryTimes++;
            SleepUtils.seconds(10L);
        }

如果是在标准模式的项目下开发,提交完成后,还需要发布文件才能最终提交到调度成为周期任务。发布文件使用DeployFile,和提交文件一样,也需要使用GetDeployment轮询部署状态。

    DeployFileRequest request = new DeployFileRequest();
    request.setFileId(fileId);
    request.setComment("deploy file");
    DeployFileResponse deployFileResponse = getAcsResponse(deployFileRequest);
    //检查发布部署结果
    DeploymentStatus status = null;
    GetDeploymentResponse.Data.Deployment deployment = null;
    int retryTimes = 0;
    while (retryTimes < 6) {
            GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(deploymentId);
            GetDeploymentResponse getDeploymentResponse = getAcsResponse(getDeploymentRequest);
            LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
            Assert.assertNotNull(getDeploymentResponse.getData());
            deployment = getDeploymentResponse.getData().getDeployment();
            Assert.assertNotNull(deployment);
            LOGGER.info("Deployment information got - DeploymentId[{}] - DeploymentDetail[{}]",
                    deploymentId, new Gson().toJson(deployment));
            Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
            Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
            status = Enums.find(DeploymentStatus.class, deployment.getStatus());
            Assert.assertNotNull(status);
            if (status.finished()) {
                LOGGER.info("Deployment finished - FinalStatus[{}]", status);
                break;
            }
            LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
            retryTimes++;
            SleepUtils.seconds(10L);
    }

数据开发API除了可对文件管理外,还能管理文件夹、资源、函数,更多详情可参考DataWorks OpenAPI文档。

步骤四:配置运维监控

通过API完成周期任务的生产之后,会在DataWorks平台每天生成调度实例被定时调度运行,使用运维中心API可对周期任务和周期实例进行运维操作,可通过GetNode、GetInstance、ListInstances等API查看周期任务和周期实例,监控实例运行情况。

        GetInstanceRequest request = new GetInstanceRequest();
        request.setInstanceId(INSTANCE_ID);
        request.setProjectEnv(PROJECT_ENV);
        try {
            GetInstanceResponse response = client.getAcsResponse(request);
            Object data = ReturnModelParser.parse("getInstanceSuccess", gson.toJson(response));
            BizInstanceDto bizInstanceDto = GsonUtils.jsonToBean(data.toString(), BizInstanceDto.class);
            Assert.assertEquals("NOT_RUN", bizInstanceDto.getStatus().toString());
            Assert.assertEquals(1590416703313L, bizInstanceDto.getModifyTime().getTime());
            Assert.assertEquals(INSTANCE_ID, bizInstanceDto.getInstanceId());
            Assert.assertEquals("DAILY", bizInstanceDto.getDagType().toString());
            Assert.assertEquals("kzh", bizInstanceDto.getNodeName());
            Assert.assertEquals("", bizInstanceDto.getParamValues());
            Assert.assertEquals(1590416703313L, bizInstanceDto.getCreateTime().getTime());
            Assert.assertEquals(1590422400000L, bizInstanceDto.getCycTime().getTime());
            Assert.assertEquals(338450167L, bizInstanceDto.getDagId().longValue());
            Assert.assertEquals(1590336000000L, bizInstanceDto.getBizdate().getTime());
            Assert.assertEquals(33115L, bizInstanceDto.getNodeId().longValue());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail();
        }

如果实例运行异常可通过RestartInstance、SetSuccessInstance、SuspendInstance、ResumeInstance处理。
使用CreateRemind、UpdateRemind等API可创建自定义报警规则,确保每天基线顺利产出,一旦异常可告警通知到人工,然后介入。

        CreateRemindRequest createRemindRequest = new CreateRemindRequest();
        createRemindRequest.setRemindName("REMIND_CREATE_TEST");
        createRemindRequest.setRemindUnit(PopRemindUnit.NODE.name());
        createRemindRequest.setRemindType(RemindType.ERROR.name());
        createRemindRequest.setAlertUnit(PopAlertUnit.OTHER.name());
        createRemindRequest.setDndEnd("08:00");
        createRemindRequest.setNodeIds("-1");
        createRemindRequest.setMaxAlertTimes(1);
        createRemindRequest.setAlertInterval(1800);
        createRemindRequest.setAlertMethods(PopAlertMethod.MAIL.name());
        createRemindRequest.setAlertTargets(MosadConstants.POP_UID);
        try { 
            CreateRemindResponse createResponse = client.getAcsResponse(createRemindRequest);
            MosadReturnModelParser.parse("createRemindTest", gson.toJson(createResponse));
            Assert.assertTrue(createResponse.getData() > 0);
        } catch (Exception ex) {
            ex.printStackTrace();
            return;
        }

运维中心主要提供周期任务、手动业务流程、基线查询、告警配置和查询等相关API,可参考DataWorks OpenAPI文档。

步骤五:配置数据质量监控

在这个业务场景中,我们通过前面介绍的API已经可以每天定时把数据从RDS同步到MaxCompute的表中了。如果我们担心产生脏数据或者数据缺失影响到线上业务,那么可通过数据质量API来集成DataWorks数据质量监控能力,当表数据产出异常时,可以立刻触发给规则订阅人。

        CreateQualityRuleRequest request = new CreateQualityRuleRequest();
        request.setBlockType(0);
        request.setComment("test-createTemplateRuleSuccess");
        request.setCriticalThreshold("50");
        request.setEntityId(entityId);
        request.setOperator("abs");
        request.setPredictType(0);
        request.setProjectName(PROJECT_NAME);
        request.setProperty("table_count");
        request.setPropertyType("table");
        request.setRuleName("createTemplateRuleSuccess");
        request.setRuleType(0);
        request.setTemplateId(7);
        request.setWarningThreshold("10");
        try {
            CreateQualityRuleResponse response = client.getAcsResponse(request);
            Object data = ReturnModelParser.parse("createTemplateRuleSuccess", gson.toJson(response));
            Long templateRuleId = Long.parseLong(data.toString());
            Assert.assertTrue(templateRuleId > 0);
            return templateRuleId;
        } catch (Exception e) {
            e.printStackTrace();
            Assert.assertFalse(true);
            return null;
        }

CreateQualityRule、GetQualityFollower、CreateQualityRelativeNode等数据质量API集可管理数据质量规则,更多数据质量API可参考DataWorks OpenAPI文档。

步骤六:生成数据服务API

我们通过元数据API完成了表创建,通过数据开发API完成文件和周期任务创建,通过数据质量和运维中心API配置好了监控规则,MaxCompute分区表数据亦可顺利产生,这时候我们还需要最后一个步骤把MaxCompute分区表的数据通过数据服务OpenAPI生成一个数据服务API向系统提供数据服务。

        CreateDataServiceApiRequest createRequest = new CreateDataServiceApiRequest();
        createRequest.setTenantId(tenantId);
        createRequest.setProjectId(projectId);
        createRequest.setApiMode(apiMode);
        createRequest.setApiName(apiName);
        createRequest.setApiPath(apiPath);
        createRequest.setApiDescription("test");
        createRequest.setGroupId(groupId);
        createRequest.setVisibleRange(visibleRange);
        createRequest.setTimeout(10000);
        createRequest.setProtocols(protocols);
        createRequest.setRequestMethod(requestMethod);
        createRequest.setResponseContentType(responseType);

        CreateDataServiceApiResponse createResponse = client.getAcsResponse(createRequest);
        Long apiId = createResponse.getData();
        Assert.assertNotNull(apiId);

        GetDataServiceApiRequest getRequest = new GetDataServiceApiRequest();
        getRequest.setTenantId(tenantId);
        getRequest.setProjectId(projectId);
        getRequest.setApiId(apiId);
        GetDataServiceApiResponse getResponse = client.getAcsResponse(getRequest);
        GetDataServiceApiResponse.Data data = getResponse.getData();
        Assert.assertEquals(apiId, data.getApiId());
        Assert.assertEquals(0L, data.getFolderId().longValue());

使用CreateDataServiceApi、PublishDataServiceApi可把表数据转换成数据服务API,那么整个数据生产链路就完成了,集成以上的DataWorks OpenAPI即完成了本地系统和云上系统的无缝对接。

API调试小工具

DataWorks发布的所有API全部可在线调试,并以可见即所得的方式产生源码,这样可大大提高OpenAPI的开发效率,强烈推荐使用。DataWorks OpenAPI调试入口>>

总结

工欲善其数,必先利其器!DataWorks OpenAPI是2020年正式发布的企业数据开发提效神器。通过OpenAPI的方式,能够极大地提高企业使用DataWorks产品能力的灵活性。目前已发布150+个OpenAPI,并且还在持续增加中。本期实战旨在帮助企业用户了解如何快速上手DataWorks OpenAPI的实践应用,通过场景化的实战演练体验DataWorks OpenAPI的强大能力。实战系列内容持续更新中,感谢大家的关注!

关于Dataworks OpenAPI开通要求和开放地域可查阅DataWorks OpenAPI概述
限DataWorks企业版及以上使用立即开通

开通7天试用与折扣请使用钉钉扫码联系

墨祤二维码

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标 &nbsp;通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群 &nbsp;企业数据仓库开发人员 &nbsp;大数据平台开发人员 &nbsp;数据分析师 &nbsp;大数据运维人员 &nbsp;对于大数据平台、数据中台产品感兴趣的开发者
相关文章
|
10天前
|
数据采集 机器学习/深度学习 DataWorks
DataWorks产品评测:大数据开发治理的深度体验
DataWorks产品评测:大数据开发治理的深度体验
60 1
|
5月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
111 2
|
1月前
|
数据采集 DataWorks 搜索推荐
阿里云DataWorks深度评测:实战视角下的全方位解析
在数字化转型的大潮中,高效的数据处理与分析成为企业竞争的关键。本文深入评测阿里云DataWorks,从用户画像分析最佳实践、产品体验、与竞品对比及Data Studio公测体验等多角度,全面解析其功能优势与优化空间,为企业提供宝贵参考。
128 13
|
5月前
|
缓存 DataWorks 安全
DataWorks产品使用合集之如何进行触发式任务执行流程
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6月前
|
存储 数据采集 DataWorks
DataWorks产品使用合集之在开发页面中看不到holiday_date资源,是什么导致的
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6月前
|
存储 DataWorks Java
DataWorks产品使用合集之开发离线数仓时,需要多个工作空间的情况有哪些
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
SQL JSON 分布式计算
DataWorks操作报错合集之如何解决在创建Hologres开发节点时报错
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
分布式计算 DataWorks 安全
DataWorks产品使用合集之如何通过API终止运行的流程
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
数据采集 运维 DataWorks
DataWorks产品使用合集之怎么区分是生产还是开发
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6月前
|
分布式计算 运维 DataWorks
MaxCompute操作报错合集之用户已在DataWorks项目中,并有项目的开发和运维权限,下载数据时遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。

热门文章

最新文章

相关产品

  • 大数据开发治理平台 DataWorks