Kafka数据如何同步至MaxCompute之实践讲解

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 本次分享主要介绍Kafka产品的原理和使用方式,以及同步数据到MaxCompute的参数介绍、独享集成资源组与自定义资源组的使用背景和配置方式、Kafka同步数据到MaxCompute的开发到生产的整体部署操作等内容。

摘要:本次分享主要介绍Kafka产品的原理和使用方式,以及同步数据到MaxCompute的参数介绍、独享集成资源组与自定义资源组的使用背景和配置方式、Kafka同步数据到MaxCompute的开发到生产的整体部署操作等内容。

演讲嘉宾简介:耿江涛,阿里云智能技术支持工程师

以下内容根据演讲视频以及PPT整理而成。
本次分享主要围绕以下两个方面:

一、背景介绍
二、具体操作流程
1.Kafka消息队列使用以及原理
2.资源组介绍以及配置
3.同步过程及其注意事项
4.开发测试以及生产部署

一、背景介绍
1. 实验目的
在日常工作中,很多企业将APP或网站产生的行为日志和业务数据通过Kafka收集之后做两方面的处理。一方面是离线处理,一方面是实时处理。并且一般会投递到MaxCompute中作为模型的构建,进行相关的业务处理,如用户的特征、销售排名、订单地区分布等。这些数据形成之后会在数据报表中作为展示。

2. 方案说明
Kafka数据同步到DataWorks有两条链路。一条链路是业务数据和行为日志通过Kafka,再通过Flume 上传到Datahub,以及Max Compute,最终在QuickBI进行展示。另一条链路是业务数据和行为日志通过Kafka以及DataWorks,MaxCompute,最终在QuickBI当中展示。
本次展示Kafka通过DataWorks上传到MaxCompute的流程。从DataWorks上传到MaxCompute是通过两种方案进行上传数据同步的。方案一是自定义资源组,方案二是独享资源组。自定义资源组一般适用于复杂网络的数据上云场景。独享资源组操作方式主要针对集成资源不足的情况。

image

二、具体操作流程
1.Kafka消息队列使用及其原理
Kafka产品概述:消息队列 for Apache Kafka 是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列for Apache Kafka一般用于日志收集、监控数据聚合、流式数据处理、在线离线分析等大数据领域。消息队列 for Apache Kafka 针对开源的 Apache Kafka 提供全托管服务,彻底解决开源产品长期以来的痛点。云上Kafka具有低成本、更弹性、更可靠的优势,用户只需专注于业务开发,无需部署运维。

image

Kafka架构介绍:如下图所示,一个典型的Kafka集群主要分为四部分。Producer生产数据并通过 push 模式向消息队列 for Apache Kafka 的 Kafka Broker 发送消息。发送的消息可以是网站的页面访问、服务器日志,也可以是 CPU 和内存相关的系统资源信息。Kafka Broker用于存储消息的服务器。Kafka Broker 支持水平扩展。 Kafka Broker 节点的数量越多,Kafka 集群的吞吐率越高。Kafka Broker针对topic会partition一个概念,partition有leader、follower的角色分配。Consumer通过 pull 模式从消息队列 for Apache Kafka Broker 订阅并消费leader的信息数据。其中partition内部有offset作为消息的消费点位。通过ZooKeeper管理集群的配置、选举 leader 分区,并且在Consumer Group 发生变化时,管理partition_leader的负载均衡。

image

Kafka消息队列购买以及部署:如下图,用户首先可以到Kafka消息队列产品页面点击购买,根据个人情况选择对应包年、包月等消费方式、地区、实例类型、磁盘、流量以及消息存放时间。其中较为重要的一点是要选择对应地区,如果用户的MaxCompute在华北,那么尽量选择华北地区。选择开通完成后需要进行部署。点击部署,选择合适的VPC及其交换机进行部署。

image

部署完成后进入Kafka Topic管理页面,点击创建Topic输入自己的Topic。Topic命名下面有三条注意信息,命名尽量跟自己的业务一致,比如是财经业务或者是商务业务,尽量进行区分。第四步进入Consumer Group管理,点击创建Consumer Group创建自己所需要的Consumer Group。Consumer Group的命名也需要规范,如果是财经或商务业务,尽量和自己的Topic相对应。

image

Kafka白名单配置:Kafka安装部署完成之后确认需要访问Kafka的服务器或产品的白名单。下图中的默认接入点即为访问接口。

image

2.资源组介绍及其配置
自定义资源组的使用背景:自定义资源组一般针对IDC之间的网络问题。本地网络和云上网络存在差异,如DataWorks可以通过免费传输能力(默认任务资源组)进行海量数据上云,但默认资源组无法实现传输速度存在较高要求或复杂环境中的数据源同步上云的需求。此时用户可以使用自定义资源组可实现复杂环境同步上云的需求,解决DataWorks默 认资源组与您的数据源不通的问题,或实现更高速度的传输能力。然而,自定义资源组主要解决的还是复杂网络环境上云同步问题,打通任意网络环境之间的数据传输同步。

image

自定义资源组的配置:自定义资源组的配置需要六步操作,首先点击进入DataWorks控制台,点开工作空间的列表,选择用户需要的项目空间,点击进入数据集成,即确认自己的数据集成是要在哪个空间项目下进行添加。之后,点击进入数据源界面,点击新增自定义资源组。要注意页面右上角的新增自定义资源组是只有项目管理员有权限添加。

image

第三步是确认Kafka与需要添加的自定义资源组属于同一个VPC下。本次实验是ECS向Kafka发送消息,二者的VPC应该一致。第四步登录ECS,即个人的自定义资源组。执行命令dmidecode|grep UUID得到ECS的UUID。

image

第五步是将添加服务器UUID以及自定义资源组的IP或机器CPU和内存填写进来。最后是在ECS上执行相关命令,Agent安装共5步,做一一确认,在第4小步完成后点击刷新查看服务是否为可用状态。添加完成后进行检查连通测试,检查是否添加成功。

image

独享资源组的使用背景:一些客户反映在Kafka同步到MaxCompute时会报资源不足的问题,可以通过新增独享资源组的方式进行数据同步。独享资源模式下,机器的物理资源(网络、磁盘、CPU和内存等)完全独享。不仅可以隔离用户间的资源使用,也可以隔离不同工作空间任务的资源使用。此外,独享资源也支持灵活的扩容、缩容功能,可以满足资源独
享、灵活配置等需求。独享资源组可以访问在同一地域下的VPC数据源,同时也可以访问跨地域的公网RDS地址。

image

独享资源组的配置:独享资源组的配置主要需要两步操作,首先进入DataWorks控制台的资源列表,点击新增独享资源组,包括独享集成资源组和独享调度资源组。此处选择新增独享集成资源组,点击购买时仍要注意选择对应的购买方式、区域、资源、内存、时间期限、数量等。

image

购买完成后需要把独享集成资源组绑定到与Kafka对应的VPC,点击专有网络绑定,选择与Kafka对应的交换机(最明显的是可用区的区别)、安全组。

image

3.同步过程及其注意事项
Kafka同步到MaxCompute的需要进行相关参数配置同时需要注意以下几个事项。
DataWorks数据集成操作:进入DataWorks操作界面,点击创建业务流程,在新建的业务流程添加数据同步节点,再进行命名。

image

如下图所示,进入数据同步节点,包括Reader端和Writer端,点击Reader端数据源为Kafka,Writer端数据源为ODPS。点击转化为脚本模式。下图右上角是帮助文档,Reader或Writer端的一些同步参数可以在此处就近点击,方便阅读、操作和理解。

image

Kafka Reader的主要参数:Kafka Reader的主要参数首先server,上文所述Kafka的默认接入点就是其中一个server,ip:port。注意此处server是必填参数。topic,表示在Kafka部署完成之后,Kafka处理数据源的topic,此处也是必填参数。下一个参数是针对列column,column支持常量列、数据列、属性列。常量列和数据列不太重要。同步的完整消息一般存放在属性列 value 中,如果需要其它信息,如partition、offset、timestamp,也可以在属性列中筛选。column是必填参数。

image

keyType、valueType各有6种类型,根据用户同步的数据,选择相应的信息,同步一个类型。需要注意同步方式是按消息时间同步,还是按消费点位置同步的。按数据消费点位置同步有四个场景,beginDateTime,endDateTime,beginOffset,endOffset。 beginDateTime 和beginOffset 二选其一,作为数据消费起点。endDateTime 和endOffset 二选其一。需要注意beginDateTime、endDateTime 中需要Kafka0.10.2版本以上才支持按数据消费点位置同步功能。另外需要注意beginOffset有三个比较特殊的形式:seekToBeginning,表示从开始点位消费数据;seekToLast,表示从上次消费的偏移位置消费数据,按照beginOffset从上次偏移位置只能一次消费,如果使用beginDateTime则可以多次消费,这取决于消息存放时间;seekToEnd,表示从最后点位消费数据,会读取到空数据。

image

skipExceeedRecord没有太大作用,是不必填项。partition对topic所有分区共同读消费的,所以无需自定义一个分区,是非必填项。kafkaConfig,如果有其它相关配置参数可以扩展配置在kafkaConfig,kafkaConfig也是非必填项。

image

MaxCompute Writer的主要参数:dataSource是数据源名称,添加ODPS数据源。tables,表示所创建的数据表的表名称,Kafka的数据要同步到哪张表中,相应的字段也可以建立。
partition,如果表为分区表,则必须配置到最后一级分区,确定同步位置。若为非分区表,则不必填。column,尽量与Kafka column中的相关字段做一一对应的操作。同步的字段对应,信息同步才能确认成功。truncate,写入时同步的数据是选择以追加模式写还是以覆盖模式写,尽量避免多个DDL同时操作一个分区,或者在多个并发作业启动前提前创建分区。

image

Kafka同步数据到MaxCompute:将下图拆分为三部分。Kafka的Reader端,MaxCompute的Writer端以及限制参数。Reader包含server、endOffset、kafkaConfig、group.id、valueType、ByteArray、column字段、topic、beginOffset、seekToLast等。MaxCompute的Writer端包含覆盖、追加、压缩、查看源码、同步到的表、字段要和Kafka的Reader端做一一对应,最重要的是value数据同步。限制参数,主要有errorlimit,数据超过几个错误后会进行报错;speed,可以限制流速、并发度等。

image

参考Kafka生产者SDK编写代码:最终生产出的数据要发送到Kafka中,通过相关代码可以查看用户的生产数据。下图一段代码表示配置信息的读取,协议、序列化方式以及请求的等待时间,需要发送哪一个topic,发送什么样的消息。发送完成后回传一个信息。详细代码可以参考配置文件、消息来源、生产者消费者的代码模板:
https://help.aliyun.com/document_detail/99957.html?spm=a2c4g.11186623.6.566.45fc54eayX69b0

image

代码打包运行在ECS上(与Kafka同一个可用区):如下图所示,执行crontab-e命令,每到17:00执行一次。下图为发送日志完成后的消息记录。

image

在MaxCompute上创建表:进入DataWorks业务流程页面,创建目标表,使用一个DDL语句创建同步的表,或根据用户个人业务相应创建不同的表的字段。

image

4.开发测试以及生产部署
选择自定义资源组(或独享集成资源组)进行同步操作:下图所示,选择右上角“配置任务资源组”,根据用户个人需求选择资源组,点击执行。执行完成后,会出现标识显示成功,同步数据记录以及结果是否成功。同步过程基本结束。

image

查询同步的数据结果:在DataWorks临界面查看同步结果,在临时节点点击查询命令,select * from testkafka3(表),查看数据同步结果。数据已经同步过来,证明测试成功。

image

设置调度参数:业务流程开发数据同步之后,会对相关模型进行一些业务处理,最后设计一些SQL节点、同步节点,进行部署。如下图所示,在右侧点击调度配置,输入调度时间。具体操作可参考DataWorks官方文档完善业务处理流程。

image

提交业务流程节点,并打包发布:点击业务流程,选择所需要提交的节点并提交。一些业务流程提交之后不需要放到生产环境当中。然后进入任务发布界面,将节点添加到待发布进行任务部署。

image

确认业务流程发布成功:最后在运维中心页面,确认发布是否在生产环境中存在。至此Kafka同步数据到MaxCompute过程结束。到了对应的调度时间,在各个节点或者右上角会有节点的日志展示,可以查看日志运行情况是否正常,或是否需要进行后续操作,部署数据或是相关命令。

image

欢迎加入“MaxCompute开发者社区2群”,点击链接申请加入或扫描二维码
https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
image

目录
相关文章
|
1月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
79 4
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
175 0
|
14天前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
1月前
|
边缘计算 人工智能 搜索推荐
大数据与零售业:精准营销的实践
【10月更文挑战第31天】在信息化社会,大数据技术正成为推动零售业革新的重要驱动力。本文探讨了大数据在零售业中的应用,包括客户细分、个性化推荐、动态定价、营销自动化、预测性分析、忠诚度管理和社交网络洞察等方面,通过实际案例展示了大数据如何帮助商家洞悉消费者行为,优化决策,实现精准营销。同时,文章也讨论了大数据面临的挑战和未来展望。
|
2月前
|
算法 大数据 数据库
云计算与大数据平台的数据库迁移与同步
本文详细介绍了云计算与大数据平台的数据库迁移与同步的核心概念、算法原理、具体操作步骤、数学模型公式、代码实例及未来发展趋势与挑战。涵盖全量与增量迁移、一致性与异步复制等内容,旨在帮助读者全面了解并应对相关技术挑战。
48 3
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
44 3
|
2月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
35 2
|
2月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
56 1
|
2月前
|
SQL 消息中间件 分布式计算
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(一)
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(一)
89 0

热门文章

最新文章