海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用(中)

简介: 海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用

在streampark添加flink conf




构建 flink1.16.0 基础镜像从 dockerhub拉取对应版本的镜像

#拉取镜像
docker pull flink:1.16.0-scala_2.12-java8
#打上 tag
docker tagflink:1.16.0-scala_2.12-java8  registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
#push 到公司仓库
docker pushregistry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8

创建 Dockerfile 文件 & target 目录将 flink-oss-fs-hadoop JAR包放置在该目录 Shaded Hadoop OSS file system jar包下载地址:https://repository.apache.org/snapshots/org/apache/paimon/paimon-oss/

.

├── Dockerfile

└── target

 └── flink-oss-fs-hadoop-1.16.0.jar

touch Dockerfile
mkdir target
#vim Dockerfile
FROM registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
RUN mkdir /opt/flink/plugins/oss-fs-hadoop
COPY target/flink-oss-fs-hadoop-1.16.0.jar  /opt/flink/plugins/oss-fs-hadoop

#build 基础镜像

docker build -t flink-table-store:v1.16.0 .
docker tag flink-table-store:v1.16.0 registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0
docker push registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0

#准备 paimon jar 包

可以在 Apache Repository下载对应版本,需要注意的是要和 flink 大版本保持一致


使用 streampark 平台提交 paimon 任务


前提条件:

Kubernetes 客户端连接配置

Kubernetes RBAC 配置

容器镜像仓库配置 (案例中使用的是阿里云的容器镜像服务个人免费版)

创建挂载 checkpoint/savepoint 的 pvc 资源


Kubernetes 客户端连接配置:

将k8s master节点~/.kube/config配置直接拷贝到streampark服务器的目录,之后在streampark服务器执行以下命令显示k8s集群running代表权限和网络验证成功。

kubectl cluster-info


Kubernetes RBAC 配置

创建streamx命名空间

kubectl create ns streamx


使用default账户创建clusterrolebinding 资源

kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=streamx:default


容器镜像仓库配置


案例中使用阿里云容器镜像服务ACR,也可以使用自建镜像服务harbor代替。

创建命名空间streampark(安全设置需要设置为私有)



在streampark配置镜像仓库,任务构建镜像会推送到镜像仓库


创建k8s secret密钥用来拉取ACR中的镜像 streamparksecret为密钥名称 自定义

kubectl create secret docker-registry streamparksecret --docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com --docker-username=xxxxxx --docker-password=xxxxxx -n streamx


创建挂载 checkpoint/savepoint 的 pvc 资源


基于阿里云的对象存储OSS做K8S的持久化


OSS CSI 插件:

可以使用 OSS CSI 插件来帮助简化存储管理。您可以使用 csi 配置创建 pv,并且 pvc、pod 像往常一样定义,yaml 文件参考:https://bondextest.oss-cn-zhangjiakou.aliyuncs.com/ossyaml.zip


配置要求:

- 创建具有所需 RBAC 权限的服务帐户

参考:https://github.com/kubernetes-sigs/alibaba-cloud-csi-driver/blob/master/docs/oss.md 

kubectl -f rbac.yaml

- 部署OSS CSI 插件

kubectl -f oss-plugin.yaml

- 创建CP&SP的PV


kubectl -f checkpoints_pv.yaml kubectl -f savepoints_pv.yaml

- 创建CP&SP的PVC


kubectl -f checkpoints_pvc.yaml kubectl -f savepoints_pvc.yaml

配置好依赖环境,接下来我们就开始使用paimon进行流式数仓的分层开发。


案例:

统计海运空运实时委托单量


任务提交:

初始化paimon catalog配置


SET 'execution.runtime-mode' = 'streaming';
set 'table.exec.sink.upsert-materialize' = 'none'; 
SET 'sql-client.execution.result-mode' = 'tableau';
-- 创建并使用 FTS Catalog 底层存储方案采用阿里云oss
CREATE CATALOG `table_store` WITH (
'type' = 'paimon',
'warehouse' = 'oss://xxxxx/xxxxx' #自定义oss存储路径
);
USE CATALOG `table_store`;

一个任务同时抽取postgres、mysql、sqlserver三种数据库的表数据写入到paimon




Development Mode:Flink SQL

Execution Mode :kubernetes application

Flink Version :flink-1.16.0-scala-2.12

Kubernetes Namespace :streamx

Kubernetes ClusterId :(任务名自定义即可)

Flink Base Docker Image : registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0    #上传到阿里云镜像仓库的基础镜像

Rest-Service Exposed Type:NodePort

paimon基础依赖包:

paimon-flink-1.16-0.4-20230424.001927-40.jar

flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

flinkcdc依赖包下载地址:

https://github.com/ververica/flink-cdc-connectors/releases/tag/release-2.2.0


pod template

apiVersion: v1
kind: Pod
metadata:
name: pod-template
spec:
containers:
  - name: flink-main-container
    volumeMounts:
      - name: flink-checkpoints-csi-pvc
        mountPath: /opt/flink/checkpoints
      - name: flink-savepoints-csi-pvc
        mountPath: /opt/flink/savepoints
volumes:
  - name: flink-checkpoints-csi-pvc
    persistentVolumeClaim:
      claimName: flink-checkpoints-csi-pvc
  - name: flink-savepoints-csi-pvc
    persistentVolumeClaim:
      claimName: flink-savepoints-csi-pvc
imagePullSecrets:      
- name: streamparksecret

flink sql

1. 构建源表和paimon中ods表的关系,这里就是源表和目标表一对一映射

-- postgre数据库 示例
CREATE TEMPORARY TABLE `shy_doc_hdworkdochd` (
`doccode` varchar(50) not null COMMENT '主键',
`businessmodel` varchar(450) COMMENT '业务模式',
`businesstype` varchar(450)  COMMENT '业务性质',
`transporttype` varchar(50) COMMENT '运输类型',
......
`bookingguid` varchar(50) COMMENT '操作编号',
PRIMARY KEY (`doccode`) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '数据库服务器IP地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dev',
'decoding.plugin.name' = 'wal2json',,
'table-name' = 'doc_hdworkdochd',
'debezium.slot.name' = 'hdworkdochdslotname03'
);
CREATE TEMPORARY TABLE `shy_base_enterprise` (
`entguid` varchar(50) not null COMMENT '主键',
`entorgcode` varchar(450) COMMENT '客户编号',
`entnature` varchar(450)  COMMENT '客户类型',
`entfullname` varchar(50) COMMENT '客户名称',
PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '数据库服务器IP地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dev',
'decoding.plugin.name' = 'wal2json',
'table-name' = 'base_enterprise',
'debezium.snapshot.mode'='never', -- 增量同步(全量+增量忽略该属性)
'debezium.slot.name' = 'base_enterprise_slotname03'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_shy_jh_doc_hdworkdochd` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `doccode`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `shy_doc_hdworkdochd` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
CREATE TABLE IF NOT EXISTS ods.`ods_shy_base_enterprise` (
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED
)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `shy_base_enterprise` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_doc_hdworkdochd';
INSERT INTO
ods.`ods_shy_jh_doc_hdworkdochd`
SELECT
*
,YEAR(`docdate`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
`shy_doc_hdworkdochd` where `docdate` is not null and `docdate` > '2023-01-01';
SET 'pipeline.name' = 'ods_shy_base_enterprise';
INSERT INTO
ods.`ods_shy_base_enterprise`
SELECT
*
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
`shy_base_enterprise` where entorgcode is not null and entorgcode <> '';
-- mysql数据库 示例
CREATE TEMPORARY TABLE `doc_order` (
`id` BIGINT NOT NULL COMMENT '主键',
`order_no` varchar(50) NOT NULL COMMENT '订单号',
`business_no` varchar(50) COMMENT 'OMS服务号',
......
`is_deleted` int COMMENT '是否作废',
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '数据库服务器地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '库名',
'table-name' = 'doc_order'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_bondexsea_doc_order` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `id`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `doc_order` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_bondexsea_doc_order';
INSERT INTO
ods.`ods_bondexsea_doc_order`
SELECT
*
,YEAR(`gmt_create`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `doc_order` where gmt_create > '2023-01-01';
-- sqlserver数据库 示例
CREATE TEMPORARY TABLE `OrderHAWB` (
`HBLIndex` varchar(50) NOT NULL COMMENT '主键',
`CustomerNo` varchar(50) COMMENT '客户编号',
......
`CreateOPDate` timestamp COMMENT '制单日期',
PRIMARY KEY (`HBLIndex`) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '数据库服务器地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dbo',
-- 'debezium.snapshot.mode' = 'initial' -- 全量增量都抽取
'scan.startup.mode' = 'latest-offset',-- 只抽取增量数据
'table-name' = 'OrderHAWB'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_airsea_airfreight_orderhawb` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `HBLIndex`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `OrderHAWB` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_airsea_airfreight_orderhawb';
INSERT INTO
ods.`ods_airsea_airfreight_orderhawb`
SELECT
RTRIM(`HBLIndex`) as `HBLIndex`
......
,`CreateOPDate`
,YEAR(`CreateOPDate`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `OrderHAWB` where CreateOPDate > '2023-01-01';

业务表数据实时写入paimon ods表效果如下:

2. 将ods层表的数据打宽写入dwd层中,这里其实就是将ods层相关业务表合并写入dwd中,这里主要是处理count_order字段的值,因为源表中的数据存在逻辑删除和物理删除所以通过count函数统计会有问题,所以我们这里采用sum聚合来计算单量,每个reference_no对应的count_order是1,如果逻辑作废通过sql将它处理成0,物理删除paimon会自动处理。


dim维表我们这边直接拿的doris里面处理好的维表来使用,维表更新频率低,所以没有在paimon里面进行二次开发。


相关实践学习
Docker镜像管理快速入门
本教程将介绍如何使用Docker构建镜像,并通过阿里云镜像服务分发到ECS服务器,运行该镜像。
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
SQL 关系型数据库 MySQL
335 0
|
4月前
|
存储 分布式计算 Apache
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
小米通过将 Apache Doris(数据库)与 Apache Paimon(数据湖)深度融合,不仅解决了数据湖分析的性能瓶颈,更实现了 “1+1>2” 的协同效应。在这些实践下,小米在湖仓数据分析场景下获得了可观的业务收益。
872 9
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
|
6月前
|
人工智能 运维 监控
Aipy实战:分析apache2日志中的网站攻击痕迹
Apache2日志系统灵活且信息全面,但安全分析、实时分析和合规性审计存在较高技术门槛。为降低难度,可借助AI工具如aipy高效分析日志,快速发现攻击痕迹并提供反制措施。通过结合AI与学习技术知识,新手运维人员能更轻松掌握复杂日志分析任务,提升工作效率与技能水平。
|
9月前
|
Java 网络安全 Apache
SshClient应用指南:使用org.apache.sshd库在服务器中执行命令。
总结起来,Apache SSHD库是一个强大的工具,甚至可以用于创建你自己的SSH Server。当你需要在服务器中执行命令时,这无疑是非常有用的。希望这个指南能对你有所帮助,并祝你在使用Apache SSHD库中有一个愉快的旅程!
582 29
|
9月前
|
SQL 分布式计算 流计算
官宣|Apache Paimon 1.0 发布公告
官宣|Apache Paimon 1.0 发布公告
661 8
|
9月前
|
存储 分布式数据库 Apache
小米基于 Apache Paimon 的流式湖仓实践
小米基于 Apache Paimon 的流式湖仓实践
252 0
小米基于 Apache Paimon 的流式湖仓实践
|
10月前
|
存储 分布式数据库 Apache
小米基于 Apache Paimon 的流式湖仓实践
本文整理自Flink Forward Asia 2024流式湖仓专场分享,由计算平台软件研发工程师钟宇江主讲。内容涵盖三部分:1)背景介绍,分析当前实时湖仓架构(如Flink + Talos + Iceberg)的痛点,包括高成本、复杂性和存储冗余;2)基于Paimon构建近实时数据湖仓,介绍其LSM存储结构及应用场景,如Partial-Update和Streaming Upsert,显著降低计算和存储成本,简化架构;3)未来展望,探讨Paimon在流计算中的进一步应用及自动化维护服务的建设。
587 0
小米基于 Apache Paimon 的流式湖仓实践
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
487 1
|
Java 应用服务中间件 Apache
Apache 与tomcat实现分布式应用部署
一:原理 tomcat是一个web应用服务器,能够解析静态文件和动态文件(如:html、jsp、servlet等);apache是一个web server,能够解析静态文件。Tomcat作为一个独立的web服务器是可以使用的,但是它对静态文件的解析能力不如apache,所以就产生现在的web应用的分布式部署,apache+tomcat。 两者之间的通信通过workers配置(由tomc
2254 0
|
2月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
423 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来

热门文章

最新文章

推荐镜像

更多