开发者学堂课程【SaaS 模式云数据仓库实战:日志数据如何同步到 MaxCompute】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/332/detail/3719
日志数据如何同步到 MaxCompute
内容介绍:
一、实验目的
二、方案介绍
三、方案比较及场景应用
四、操作步骤
一、实验目的
日常工作中,企业需要将通过 ECS、容器、移动端、开源软件、网站服务、JS 等接入的实时日志数据进行日志实时查询与分析、采集与消费、数据清洗与流计算(ETL/Stream Processing)、数据仓库对接(Data Warehouse)等场景的使用。
二、方案介绍
方案一:使用 Tunnel 命令上传日志数据到 MaxCompute。
方案二:通过 DataHub 投递数据到 MaxCompute。DataHub DataConnector 是把 DataHub 服务中的流式数据同步到其他云产品中的功能,目前支持将 Topic 中的数据实时/准实时同步到 MaxCompute、OSS、ElasticSearch、RDS Mysql、ADS、TableStore 中。用户只需要向 DataHub 中写入一次数据,并在 DataHub 服务中配置好同步功能,便可以在各个云产品中使用这份数据。
方案三:通过 SLS 实时采集与消费(LogHub)投递数据到 MaxCompute。
通过 DataWorks 的数据集成(Data Integration)功能投递至 MaxCompute。
方案四:通过 Kafka 订阅实时数据投递至 MaxCompute。
方案二和方案三的差异很小,都是消息队列,对DataHub 建议进行公测或者自研。
三、方案比较及场景应用
Tunnel
Tunnel 用于批量上传数据到离线表里,适用于离线计算的场景。对于特殊格式日志,我们一般建议作为一个字段上传,到 MaxComopute 里再进行拆分。
DataHub
Datahub 用于实时上传数据的场景,主要用于流式计算的场景,数据上传后会保存到实时表里,后续会在几分钟内通过定时住务的形式同步到离线表里,供离线计算使用。
日志服务(SLS)
(实时采集与消费)LogHub 用途:数据清洗(ETL)、流计算(Stream Compute)、监控与报警、机器学习与迭代计算等。
投递数仓(LogShipper):稳定可靠的日志投递。将日志中枢数据投递至存储类服务进行存储。
支持压缩、自定义 Partition、以及行列等各种存储方式。
数据仓库+数据分析、审计、推荐系统与用户画像。
支持通过控制台数据接入向导一站式配置正则模式采集日志与设置索引。
(实时采集与消费)LogHub:实时性强,写入即可消费。Logtail(采集 Agent)实时采集传输,1秒内到服务端(99.9%情况)。写入即可查询分析。
海量数据:对数据量不设上限。
种类丰富:支持行、列、TextFile 等各种存储格式。
配置灵活:支持用户自定义 Partition 等配置。
Kafka
Kafka 是一款分布式发布与订阅的消息中间件,有高性能、高吞量的特点,每秒能处理上百万的消息。Kafka 适用于流式数据处理。
大数据领域:主要应用于用户行为跟踪、日志收集等场景。
数据集成:将消息导入 MaxCompute、OSS、RDS、Hadoop、HBase 等离线数据仓库。
四、操作步骤
1、方案一:通过 Tunnel 上传日志数据到 MaxCompute
环境准备及步骤:
(1)开通 MaxCompute 服务,安装 odpscmd 客户端。
日志数据,通过 Tunnel 上传到 MaxCompute。
(2)准备日志服务数据。
(3)创建 MaxCompute 表,用来储存日志数据。
(4)使用命令:
tunnel u C:\Users\Desktop\weijing_loghub_demo.csv tunnel_log。
下图表明执行后,我们投递的日志数据是成功的,说明已经通过 Tunnel 上传日志数据到 MaxCompute。
(5)查询表数据是否导入成功。
查询后发现表中有数据导入,说明日志数据已经成功导入到 MaxCompute 表中。
注意事项:
(1)使用 Tunnel 命令行工具上传数据当前不支持通配符或正则表达式命令,如果想使用正则表达式上传,可以借助方案三。
(2)对于特殊格式的日志数据,我们一般建议作为一个字段上传,到 MaxComopute 里再进行拆分。
2、方案二:通过 DataHub 投递日志数据到 MaxCompute
环境准备及步骤:
(1)登录阿里云 DataHub 控制台,创建 Project。
(2)进入 Project 列表->Project 查看,创建 Topic,有两种方式,一种是直接创建,一种是导入 MaxComopute 表结构,这里先介绍导入 MaxComopute 表结构。首先输入 MaxComopute 项目,选择项目名称,第二步输入 MaxComopute 表,这个表可以是已经创建的表,也可以是新建的表,新建表名会在 MaxComopute 中自动创建一个表,然后填写 AK 信息,选择自动创建 DataConnector,它会自动创建一个 DataConnector,然后填写 Topic 的名称。
(3)选择导入 MaxCompute 表结构、并勾选自动创建 DataConnector。
注意:准备对应的 MaxCompute 表,该表字段类型、名称、顺序必须与 DataHub Topic 字段完全一致,如果三个条件中的任意一个不满足,则归档 Connector 无法创建。Shard 数量中,默认一个 Shard 对应1000个 KPS,可以根据数据流量进行相应的设置,然后设置生命周期。
(4)创建好的 DataConnector 详细信息如下图所示。
包括同步时间、最新写入数据的时间、MaxCompute Endpoint 以及运行状态、章数据量、当前点位,当前点位是从0开始的,下图显示是2,说明导入了3条数据。现在介绍直接创建 Topic,首先需要输入 MaxCompute 的一个项目,然后同样输入 MaxCompute table,可以是已知表,也可以是新建表,然后输入相应的 AK 消息。
(5)如果已经创建 Topic,只需要在详情页的右上角点击
+DataConnector。
分区范围:SYSTEM_TIME、EVENT_TIME、USER_DE FINE 三种模式,SystemTime 模式会使用写入时间转化为字符串进行分区,EventTime 模式会根据 topic 中固定的 event_time 字段时间进行分区(需要在创建 Topic 时增加一个 TIMESTAMP 类型名称为 event_time 的字段,并且写入数据时向这个字段写入其微秒时间),UserDefine 模式将会直接使用用户自定义的分区字段字符串分区。分区格式定义现阶段仅支持固定的格式。
(6)创建完成后,回到 DataHub 控制台,点击 Topic,点击 DataConnector 可以查看 DataConnector 的详细信息,包括配置的目标服务、目标描述以及最新写入数据的时间。
(7)日志数据抽样
下图是点击数据抽样后,可以看到同步到 DataHub 中的一个日志数据。
查看 DataConnector 的详细信息,可以看到归档信息,包括当前点位、章数据以及运行状态,如果运行状态有失败,需要检查其原因,DataHub 投入数据到 MaxCompute 离线表中时,默认是64兆就会commit一次,合作5分钟进行一次强行写入,可以保证的是至少5分钟会有一次数据同步。
(8)测试日志数据是否投递成功。
进行全段扫描,可以查询下表有数据,说明通过 DataHub 投递日志数据到 MaxCompute 是成功的。
注意事项:
(1)目前所有 DataConnector 均仅支持同一 Region 的云服务之间同步数据,不支持同步数据到跨 Region 的服务。
(2)DataConnector 所配置的目标服务 Endpoint 需要填写相应的内网域名(经典网络),不支持使用公网域名同步。
(3)数据同步目前仅支持 at least once 语义,在网络服务异常等小概率场景下可能会导致目的端的数据产生重复但不会丢失,需要做去重处理。
(4)topic 默认20个,如果需要创建更多,需提交工单申请。
3、方案三:通过 LogHub 投递日志数据到 MaxCompute
有两种方式,一种是直接通过 LogHub 投递到 MaxCompute 表中,另外一种是借助数据集成 DataWorks 通过写脚本的方式投递数据,先介绍第一种方式,直接通过 LogHub 投递到 MaxCompute 表中的方式。
环境准备及步骤:
(1)开通日志服务,登录日志服务控制台,创建新的 Project 或者单击已经创建好的 Project 名称。
(2)创建新的 Logstore 或者单击已经创建好的 Logstore 名称。
(3)单击对应的 Logstore,查询分析导入到 LogHub 的日志数据。
下面几条数据导入到 LogHub 中,需要将这几条数据同步到 MaxCompute 表中。
(4)选择需要投递的日志库名称并依次展开节点,日志库名称->数据处理->导出->MaxCompute。单击开始投递。
(5)单击开启投递以进入 LogHub->数据投递页面。
(6)配置投递规则,在 LogHub->数据投递页面配置字段关联等相关内容。
a)自定义一个投递名称
b)MaxCompute 表名称,请输入自定义的新建的 MaxCompute 表名称或者选择已有的 MaxCompute 表。
c)按序,左边填写与 MaxCompute 表数据列相映射的日志服务字段名称,右边填写或选择 MaxCompute 表的普通字段名称及字段类型。
d)_partition_time_格式:将日志时间作为分区字段,通过日期来筛选数据是 MaxCompute 常见的过滤数据方法。_partition_time_是根据日志_time_值计算得到(不是日志写入服务端时间,也不是日志投递时间),结合分区时间格式,向下取整。
(7)在投递管理页面,单击修改即可针对之前的配置信息进行编辑。其中如果想新增列,可以在大数据计算服务 MaxCompute 修改投递的数据表列信息,则单击修改后会加载最新的数据表信息。
(8)投递任务管理。
在启动投递功能后,日志服务后台会定期启动离线投递任务,用户可以在控制台上看到这些投递任务的状态和错误信息。当投递任务发生错误时,请查看错误信息,问题解决后可以通过云控制台中日志投递任务管理或 SDK 来重试失败任务。
(9)查看日志投递的运行状态。
当日志开始投递时,它的状态是运行中,数据行数为0,当数据投递成功时,数据行数变成11,状态由运行中变为成功,说明通过 LogHub 投递日志数据到 MaxCompute 中成功了。
(10)日志投递 MaxCompute后,检查数据完整性。
a)通过控制台或 API/SDK 判断(推荐)
使用 API、SDK 或者控制台获取指定 Project/Logstore 投递任务列表。控制台会对该返回结果进行可视化展示。
b)通过 MaxCompute 分区粗略估计
比如在 MaxCompute 中以半小时做一次分区,投递任务为每30分钟一次,当表中包含以下分区:
2019_10_25_10_00
2019_10_25_10_30
当发现分区2019_10_25_11_00出现时,说明11:00之前分区数据已经完整。
该方法有一定的缺陷,它不依赖 API,判断方式简单但结果并不精确,仅用作粗略估计。如果要进行检查,建议使用另一种方式,通过控制台或 API/SDK 进行判断。
(11)查询 MaxCompute 表中数据,确认数据是否导入成功。
通过 LogHub 投递日志数据到 MaxCompute的注意事项:
(1)数加控制台创建、修改投递配置必须由主账号完成,不支持子账号操作。
(2)不同 Logstore 的数据请勿导入到同一个 MaxCompute 表中,否则会造成分区冲突、丢失数据等后果。
(3)MaxCompute 表至少包含一个数据列、一个分区列。
(4)MaxCompute 单表有分区数目6万的限制,分区数超出后无法再写入数据,所以日志服务导入 MaxCompute 表至多支持3个分区列。请谨慎选择自定义字段作为分区列,保证其值是可枚举的。
(5)日志服务数据的一个字段最多允许映射到一个 MaxCompute 表的列(数据列或分区列),不支持字段冗余,同一个字段名第二次使用时其投递的值为 null,如果 null 出现在分区列会导致数据无法被投递。
(6)投递 MaxCompute 是批量任务,请谨慎设置分区列及其类型:保证一个同步任务内处理的数据分区数小于512个;用作分区列的字段值不能为空或包括‘/’等 MaxCompute 保留字段。
(7)不支持海外 Region 的 MaxCompute 投递,海外 Region 的 MaxCompute 请使用DataWorks 进行数据同步。
4、方案三:通过 LogHub 借助 DataWorks 投递日志数据到 MaxCompute
环境准备及步骤:
(1)登录阿里云 LogHub 控制台,创建 Project。
(2)登录 DataWorks 控制台,单击对应项目进入数据集成。
(3)进入同步资源管理->数据源页面,单击右上角的新增数据源。
(4)选择数据源类型为 LogHub,填写新增 LogHub 数据源对话框中的配置。需要填写环境、数据源名称、LogHub Endpoint、在 LogHub 中创建的 project,以及 AK 信息。
填写完成后,单击测试连通性。测试连通性通过后,说明数据源可以正确使用,单击确定。
(5)配置同步任务
可选择向导模式,通过简单便捷的可视化页面完成任务配置,注意填写日志开始的时间和日志结束的时间;或者选择脚本模式,深度自定义配置您的同步任务。
新建业务流程->数据集成->新建数据集成节点->数据同步进入数据同步任务配置页面。
日志开始时间:数据消费的开始时间位点,为 yyyyMMddHHmmss 格式的时间字符串(比如20191025103000),精确到微秒,左闭右开。
日志结束时间:数据消费的结束时间位点,为 yyyyMMddHHmmss 格式的时间字符串(比如20191025113000),左闭右开。
批量条数:一次读取的数据条数,默认为256。
a)向导模式配置同步任务。
I)配置数据源及数据去向。数据来源就是 LogHub 填写 Logstore,配置日志开始时间、日志结束时间,数据去向就是选择 MaxCompute 中的数据源、选择要导入的表、填写分区信息、进行字段映射。
II)保存数据同步的配置。提交并运行。
III)查询 MaxCompute 表中数据,确保日志服务数据已经成功同步到 MaxCompute。
b)脚本模式配置同步任务。
I)导入模板,选择数据源和目标数据源。
II)编辑脚本。
"
steps
"
:[
{
"
category
"
:
"
reader
"
,
"
name
"
:
"
Reader
"
,
"
parmeter
"
:{
"
batchSize
"
:
"
256
"
"
beginDateTime
"
:
"
20191105110000
"
,//日志开始时间
"
colum
"
:[
"
id
"
,
"
name
"
,
"
salenum
"
]
"
datasource
"
:
"
weitest
"
,//数据源名称
"
encoding
"
:
"
UTF-8
"
,//编码格式
"
endDateTime
"
:
"
20191105235959
"
,//日志结束时间
"
fieldDellmlter
"
:
"
.
"
,
"
logstore
"
:
"
loghub_ppt
"
//日志库
},
"
stepType
"
:
"
loglub
"
},
{
"
category
"
:
"
writer
"
,
"
name
"
:
"
writer
"
,
"
parmeter
"
:{
"
colum
"
:[
"
id
"
,
"
name
"
,
"
salenum
"
]
"
datasource
"
:
"
odps_first
"
,
"
partition
"
:
"
at_20191105
"
"
table
"
:
"
loghub_odps_copy
"
,
"
truncate
"
:true
},
"
stepType
"
:
"
odps
"
}
],
III)保存脚本数据节点,提交并运行。
IV)查询 MaxCompute 表中数据,确保日志服务数据已经成功同步到 MaxCompute。
5、方案四:通过 Kafka 投递日志数据到 MaxCompute
环境准备及步骤:
(1)搭建 Kafka 集群。
(2)在控制台创建 Topic 和 Consumer Group。
(3)Flume 读取日志文件数据写入到 Kafka。
a)为 flume 构建 agent:先进入 flume 下的配文件夹里面编写构建 agent 的配置文件。
b)启动 flume 的 agent
bin/flume-ng agent-c conf-f 配置文件夹名/配置文件名
-n a1 -Dflume.root.logger=INFO,console
c)启动 kafka 的消费者 bin/kafka-console-consumer.sh
--zookeeper 主机名:2181--topic kafka_odps
这样就开启了日志采集,文件会写入到 kafka 中。
(4)通过数据集成 DataWorks 同步数据到 MaxCompute。
新建业务流程->新建数据同步节点->转换为脚本->配置脚本。
(5) 配置脚本信息,运行脚本进行数据节点的同步。
红色框是 read 端配置,绿色框是 writer 端配置。read 端要注意配置 server,格式是 IP和端口号,框写的列是 Kafka 的属性列,然后再填写相应的 topic,还需要填写 beginOffset 数据消费的开始节点。writer 端需要填写 table 的表名、datasource,这些都是必须进行选择的。
脚本配置完成后,点击提交,然后运行,运行成功后日志数据便会投递到 MaxCompute 表中。
(6)查询 MaxCompute 表中数据,确保日志服务数据已经成功同步到 MaxCompute。下图表明数据投递成功。
注意事项:
(1)数据同步的脚本编写。如果脚本配置有误,数据运行将会失败,要注意关于 Reader、Writer 端的配置。
(2)Flume 采集日志数据中配置文件的配置。配置文件中的 topic 必须与 Kafka 创建的 topic 保持一致。