ODPS开发大全:入门篇(3)

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
云解析 DNS,旗舰版 1个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: ODPS开发大全:入门篇

  • 接下来,是一些经验总结

1、“数据来源”的“数据过滤”,不填表示全量同步MySQL的数据;也可以使用类似(gmt_create='${bizdate}')条件来过滤,每次增量同步MySQL的数据

2、“数据去向”的“一键生成目标表”功能,建表DDL语句需要人工检查下:

  • 填写好lifecycle,分区配置
  • 列名不要与odpsSQL关键字冲突
  • 可以自定义修改表名

3、“数据去向”的“分区信息”,当建的是分区表时,会自动出现该处的分区信息配置;若建的是非分区表,则不必配置

4、调度配置中,可以按需选择天、小时或其他时间粒度调度任务

5、非该odps项目空间的表不能在该odps项目空间做同步任务

6、odps个别字段内容太长,超出mysql表的该字段存储限制,也会导致写入idb失败,报脏数据(修改idb表字段类型,可将对应字段类型修改为 longtext)

7、idb表字段设为非null,但odps对应字段存在 null值,会导致写入idb失败,报脏数据(修改idb表定义,将对应字段改默认为null)

8、odps字段和idb字段不必非得一对一保持应,可以手动选择相关字段 连线 ,odps和idb字段可各有未参与同步的字段(注意:idb字段的剩余字段必须是可以自动填充或默认为null类型的)


 ODPS导入Hologres


目前对于需要周期性导入ODPS分区表数据到Hologres, Holo提供了两种导入方式:

方式一:一键可视化导入并且周期性导入,详情见datastudio一键导入

方式二:使用sql导入,详情见hologres sql


  • 一键可视化导入


  1. 新建任务,在数据开发单击一键数据同步,并填写节点信息。

image.png


2. 配置信息,填写同步信息。

image.png


3. 参数说明

参数

配置项

说明

备注

MaxCompute源表选择

目标连接

Hologres的实例名

目标库

Hologres的DB名

外部表来源

  • 已有外部表
  • 新建外部表
  • 已有外部表表示已经提前在Hologres中映射MaxCompute数据的外部表
  • 新建外部表表示无相应的外部表,需要同步时新建

外部表表名字

已有的外部表表名

外部表用于映射MaxCompute数据,需要与同步数据的MaxCompute表对应

目标表设置

目标Schema

当前DB下的schema名

默认为public,也可以选择新建schema并使用

目标表名

要导入数据的表名

需要同步表数据的内表名称,如已有表,执行后原表和数据将被删除重建

目标表描述

为目标表添加comment

同步设置

同步字段

选择需要同步的MaxCompute表字段

可以选择全部字段,也可以选择部分字段

分区配置

选择需要同步的分区字段

当前Hologres仅支持一级分区

索引配置

为目标表构建索引

索引的创建可以参见文档设置表属性

SQL Script

SQL Script

自动解析出当前运行的SQL,方便参照


  • 保持并运行,执行同步任务。任务执行完成之后,可以使用Hologres SQL查看数据。
  • 周期性调度


若是您需要周期性导数据,需要单机右侧调度配置进行任务配置,并且保存作业,然后点击右上角发布,将作业发布至生产环境进行周期性调度。


image.png


4. 总结

方式一优点:可视化操作,简单快捷,方便小白用户使用,能满足一般场景的使用。

方式一缺点:不支持修改SQL逻辑,若是需要修改,需将SQL Copy再新建一个Hologres SQL节点,根据业务逻辑修改SQL即可。


  • 自写sql导入


先介绍一下操作步骤,基本和方式一相同。


  1. 新建业务流程:选择左侧菜单栏数据开发--新建--业务流程,即可创建一个属于自己的业务流程。

image.png

2. 输入业务流程名称

image.png

3. 新建Hologres SQL

image.png

4. Hologres开发:打开新建的Hologres SQL选择对应Hologres实例,既可使用标准的Postgresql语言开发。

image.png


步骤1:准备MaxCompute表数据


--MaxCompute分区表DDLCREATE TABLE IF NOT EXISTS public_data.dwd_product_movie_basic_info(  movie_name STRING COMMENT '电影名称',  dirctor STRING COMMENT '导演',  scriptwriter STRING COMMENT '编剧',  area STRING COMMENT '制片地区/国家',  actors STRING COMMENT '主演',  `type` STRING COMMENT '类型',  movie_length STRING COMMENT '电影长度',  movie_date STRING COMMENT '上映日期',  movie_language STRING COMMENT '语言',  imdb_url STRING COMMENT 'imdb号') PARTITIONED BY (ds STRING) STORED AS ALIORC;
--查看分区表的某个分区数据SELECT * FROM public_data.xxxx_movie_basic_info WHERE ds = '20170112';


步骤2:Hologres新建外部表
移步HoloStudio,在SQL Console中新建一张外部表,用于映射MaxCompute源头表数据。外表的字段顺序和字段类型需要和MaxCompute一一对应。示例使用import foreign schema语法新建外部表SQL如下:



import foreign schema public_data limit to (dwd_product_movie_basic_info) from server odps_server into public options(if_table_exist 'update');


步骤3:Hologres新建真实存储表
在Hologres中新建一张真实的存储表,用于接收并存储数据。因为本次示例是将MaxCompute分区表导入Hologres分区表,因此需要在Hologres中创建一张分区表。



BEGIN;CREATE TABLE "public"."holo_dwd_product_movie_basic_info" (  "movie_name" text,  "dirctor" text,  "scriptwriter" text,  "area" text,  "actors" text,  "type" text,  "movie_length" text,  "movie_date" text,  "movie_language" text,  "imdb_url" text,  "ds" text)PARTITION BY LIST (ds);CALL SET_TABLE_PROPERTY('"public"."holo_dwd_product_movie_basic_info"', 'orientation', 'column');CALL SET_TABLE_PROPERTY('"public"."holo_dwd_product_movie_basic_info"', 'bitmap_columns', '"movie_name","dirctor","scriptwriter","area","actors","type","movie_length","movie_date","movie_language","imdb_url","ds"');CALL SET_TABLE_PROPERTY('"public"."holo_dwd_product_movie_basic_info"', 'dictionary_encoding_columns', '"movie_name:auto","dirctor:auto","scriptwriter:auto","area:auto","actors:auto","type:auto","movie_length:auto","movie_date:auto","movie_language:auto","imdb_url:auto","ds:auto"');CALL SET_TABLE_PROPERTY('"public"."holo_dwd_product_movie_basic_info"', 'time_to_live_in_seconds', '3153600000');comment on column "public"."holo_dwd_product_movie_basic_info"."movie_name" is '电影名称';comment on column "public"."holo_dwd_product_movie_basic_info"."dirctor" is '导演';comment on column "public"."holo_dwd_product_movie_basic_info"."scriptwriter" is '编剧';comment on column "public"."holo_dwd_product_movie_basic_info"."area" is '制片地区/国家';comment on column "public"."holo_dwd_product_movie_basic_info"."actors" is '主演';comment on column "public"."holo_dwd_product_movie_basic_info"."type" is '类型';comment on column "public"."holo_dwd_product_movie_basic_info"."movie_length" is '电影长度';comment on column "public"."holo_dwd_product_movie_basic_info"."movie_date" is '上映日期';comment on column "public"."holo_dwd_product_movie_basic_info"."movie_language" is '语言';comment on column "public"."holo_dwd_product_movie_basic_info"."imdb_url" is 'imdb号';COMMIT;


步骤4:新建分区子表数据开发


在hologres sql中另开一个作业,用于分区表跑调度。



--创建临时分区子表BEGIN;CREATE TABLE IF NOT EXISTS "public".tmp_holo_dwd_product_movie_basic_info_${bizdate}  (  "movie_name" text,  "dirctor" text,  "scriptwriter" text,  "area" text,  "actors" text,  "type" text,  "movie_length" text,  "movie_date" text,  "movie_language" text,  "imdb_url" text,  "ds" text);COMMIT;
--更新外表数据import foreign schema public_data limit to (dwd_product_movie_basic_info) from server odps_server into public options(if_table_exist 'update');
--等待30s再导入Hologres,以防Hologres meta信息更新缓存慢导致的数据不一致而同步不成功select pg_sleep(30); 
--将Maxcompute数据导入临时分区子表INSERT INTO "public".tmp_holo_dwd_product_movie_basic_info_${bizdate} SELECT     "movie_name",    "dirctor",    "scriptwriter",    "area",    "actors",    "type",    "movie_length",    "movie_date",    "movie_language",    "imdb_url",    "ds"FROM "public".dwd_product_movie_basic_infoWHERE ds='${bizdate}';
--导入的场景逻辑比较多,下面有两个场景供参考,可以根据业务逻辑二选一即可
--场景1:导入新的分区数据可以参考以下逻辑,BEGIN;
ALTER TABLE "public".tmp_holo_dwd_product_movie_basic_info_${bizdate} RENAME TO holo_dwd_product_movie_basic_info_${bizdate};
--将临时分区子表绑定在分区父表上ALTER TABLE "public".holo_dwd_product_movie_basic_info ATTACH PARTITION "public".holo_dwd_product_movie_basic_info_${bizdate} FOR VALUES in ('${bizdate}');
COMMIT;
--场景2:重新对历史分区数据刷新可以参考该逻辑BEGIN;
ALTER TABLE IF EXISTS "public".holo_dwd_product_movie_basic_info DETACH PARTITION "public".holo_dwd_product_movie_basic_info_${bizdate};
DROP TABLE IF EXISTS "public".holo_dwd_product_movie_basic_info_${bizdate};
ALTER TABLE "public".tmp_holo_dwd_product_movie_basic_info_${bizdate} RENAME TO holo_dwd_product_movie_basic_info_${bizdate};
--将分区子表绑定在分区父表上ALTER TABLE "public".holo_dwd_product_movie_basic_info ATTACH PARTITION "public".holo_dwd_product_movie_basic_info_${bizdate} FOR VALUES in ('${bizdate}');
COMMIT;


步骤5:调度配置
1)基础属性配置将基础属性--参数赋值为时间节点,如示例所示:

image.png


2)时间属性设置

主要设置时间的重跑属性,其余参数可以根据业务情况自行设置。

image.png


3)调度依赖设置

调度依赖为root节点即可(也可以根据业务逻辑选择已有的父节点)请先单击自动解析为,然后单击使用工作空间根节点,会自动解析出root节点,然后将自动解析设置为

image.png


步骤6发布调度
调度参数配置完成之后,单击保存--提交,提交成功后单击运维前往运维中心。

步骤7:运维中心发布
在跳转出来的运维中心,选择已提交成功成功的节点,右键单击节点,选择补数据--当前节点。并根据业务情况设置节点配置。

image.png


补完数据之后,在补数据实例可以看到正在运行的任务,以及任务运行状态。

image.png
步骤8Hologres查看数据
任务执行成功之后,将会在Hologres中自动创建对应分区数据的分区子表,可以返回datastudio,新开一个hologres sql节点,执行语句查询数据是否写入成功。






--查看分区子表数据select * from holo_dwd_product_movie_basic_info_20170112;
--查看分区父表总数据select count (*) from holo_dwd_product_movie_basic_info;


总结:

方式二优点:可按业务需求进行定制,对sql进行修改,满足复杂特定场景的需求,包括历史数据格式转换、数据清理等;通过SQL导入性能更优。

方式二缺点:有一定学习成本,初学者不太适合,可先通过方式一了解其数据同步的流程和原理,再切换到方式二。


  • 流程梳理


-- 1.创建外表CREATE FOREIGN TABLE IF NOT EXISTS ${odps_table_name} (  "user_id" bigint,  "user_name" text,  "ds" text)SERVER odps_serverOPTIONS (project_name 'onetag', table_name '${odps_table_name}');COMMIT;
-- 2.刷新外表的SchemaIMPORT FOREIGN SCHEMA ${odps_project} LIMIT to(  ${odps_table_name}) FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update',if_unsupported_type 'error');
-- 3.清理潜在的临时表BEGIN ;DROP TABLE IF EXISTS ${holo_table_name}_tmp_${bizhour};COMMIT ;
-- 4.创建临时表BEGIN ;CREATE TABLE IF NOT EXISTS "public".${holo_table_name}_tmp_${bizhour} (  "user_id" bigint,  "user_name" text,  "ds" text,  PRIMARY KEY (user_id,ds));  COMMIT;
-- 5.通过查询外表,向临时表插入数据INSERT INTO ${holo_table_name}_tmp_${bizhour}SELECT *FROM public.${odps_table_name}WHERE ds='${bizhour}';
-- 6.替换子表BEGIN ;
-- 6.1删除已经存在的子表DROP TABLE IF EXISTS ${holo_table_name}_${bizhour};
-- 6.2将临时表改名ALTER TABLE ${holo_table_name}_tmp_${bizhour} RENAME TO ${holo_table_name}_${bizhour};
-- 6.3将临时表绑定至指定分区表ALTER TABLE ${holo_table_name} ATTACH PARTITION ${holo_table_name}_${bizhour}FOR VALUES IN ('${bizhour}');COMMIT ;
-- 7. 大量数据导入后执行ANALYZE分区表父表操作ANALYZE ${holo_table_name};


注意点:

  • 使用临时表的原因是为了保证原子性,只有在导入完成后才绑定至分区表,为了避免导入任务失败时还需要重新删除表等操作。
  • 对于更新子表分区数据场景,需要删除子表和重新绑定临时表放入一个事务过程中,保证该过程的事务性。
  • MaxCompute的表数据更新之后,在Hologres存在缓存延迟(一般为10分钟内),建议在导入数据前使用IMPORT FOREIGN SCHEMA语法更新外部表以获取最新数据。
  • 导入MaxCompute数据至Hologres时,建议使用SQL导入,不建议使用数据集成导入,因为使用SQL导入性能表现更优。



注意事项


 调度配置


这里强调一下调度参数:

调度参数通常会被用于指代某些动态时间的场景,此场景下,可基于业务日期和定时时间进行调度参数的取值设置。配置调度参数前,您可先了解这两个时间概念,便于后续设置调度参数取值。


取值方式

参数格式

参数示例

相关参考

基于业务日期获取时间数据

通常,使用大括号${...},结合yyyy、yy、mm及dd自定义组合生成时间参数,获取业务日期前后多少年、月、天。

可通过${yyyymmdd}、${yyyy-mm-dd}等${...}自定义时间格式获取,例如:

  • ${yyyymmdd±N}
  • ${yyyymmdd±7*N}
  • ${yy±N}
  • ${mm}
  • ${yyyy-mm-dd±N}

更多赋值示例,请参见自定义参数${...}

基于定时时间获取时间数据

通常,使用中括号$[...],结合yyyy、yy、mm、dd、hh24、mi及ss自定义组合生成时间参数,获取定时时间前后多少年、月、天、小时、分钟、秒。

可通过$[yyyymmddhh24miss]等$[...]自定义时间格式获取。例如,取前一天的前一小时,参数表达式为$[yyyymmdd-1-1/24]。

  • 更多赋值示例,请参见自定义参数$[...]
  • 取多少小时、分钟,可能存在跨天问题,跨天时间的参数处理


内置参数


内置参数

定义

$bizdate

业务日期,格式为yyyymmdd,与自定义参数${yyyymmdd}取值一致。

该参数的应用较为广泛,日常调度中默认任务预期运行时间的前一天为业务日期。

$cyctime

任务的定时时间,格式为yyyymmddhh24miss,与自定义参数$[yyyymmddhh24miss]取值一致。

$gmtdate

当前日期,格式为yyyymmdd。

该参数默认取当天日期,执行补数据操作时输入的日期为业务日期+1。

$bizmonth

业务月份,格式为yyyymm。

  • 如果业务日期的月份与当前月份一致,则$bizmonth=业务日期月份-1。
  • 如果业务日期的月份与当前月份不一致,则$bizmonth=业务日期月份。

$jobid

任务所属的业务流程ID。

$nodeid

节点ID。

$taskid

节点产生的实例ID。


调度配置的各板块:

配置基础属性

名称,节点ID,节点类型,责任人,描述

配置调度参数

调度参数支持的格式

配置并使用调度参数

配置时间属性

时间属性配置说明

实例生成方式:发布后即时生成实例

调度周期:分钟/小时/天/月调度

配置资源属性

默认配置为公共调度资源组。

配置调度依赖

调度依赖配置

配置同周期调度依赖

配置依赖上一周期(跨周期依赖)

复杂依赖场景调度配置原则

配置节点上下文

在节点上下文配置本节点输入参数和本节点输出参数

输出参数的取值分为常量和变量两种类型

配置输入参数,在调度依赖中添加依赖的上游节点


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
分布式计算 资源调度 Hadoop
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
Hadoop入门基础(五):Hadoop 常用 Shell 命令一网打尽,提升你的大数据技能!
|
3月前
|
SQL 分布式计算 大数据
代码编码原则和规范大数据开发
此文档详细规定了SQL代码的编写规范,包括代码的清晰度,执行效率,以及注释的必要性。它强调所有SQL关键字需统一使用大写或小写,并禁止使用select *操作。此外,还规定了代码头部的信息模板,字段排列方式,INSERT, SELECT子句的格式,运算符的使用,CASE语句编写规则,查询嵌套规范,表别名定义,以及SQL注释的添加方法。这些规则有助于提升代码的可读性和可维护性。
56 0
|
3月前
|
SQL 分布式计算 大数据
大数据开发SQL代码编码原则和规范
这段SQL编码原则强调代码的功能完整性、清晰度、执行效率及可读性,通过统一关键词大小写、缩进量以及禁止使用模糊操作如select *等手段提升代码质量。此外,SQL编码规范还详细规定了代码头部信息、字段与子句排列、运算符前后间隔、CASE语句编写、查询嵌套、表别名定义以及SQL注释的具体要求,确保代码的一致性和维护性。
101 0
|
4月前
|
SQL 存储 分布式计算
MaxCompute 入门:大数据处理的第一步
【8月更文第31天】在当今数字化转型的时代,企业和组织每天都在产生大量的数据。有效地管理和分析这些数据变得至关重要。阿里云的 MaxCompute(原名 ODPS)是一个用于处理海量数据的大规模分布式计算服务。它提供了强大的存储能力以及丰富的数据处理功能,让开发者能够快速构建数据仓库、实时报表系统、数据挖掘等应用。本文将介绍 MaxCompute 的基本概念、架构,并演示如何开始使用这一大数据处理平台。
600 0
|
4月前
|
数据可视化
Echarts数据可视化开发| 智慧数据平台
Echarts数据可视化开发| 智慧数据平台
|
4月前
|
数据可视化
Echarts数据可视化大屏开发| 大数据分析平台
Echarts数据可视化大屏开发| 大数据分析平台
|
4月前
|
分布式计算 大数据 Java
Scala 入门指南:从零开始的大数据开发
Scala 入门指南:从零开始的大数据开发
|
2月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
16天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
128 7