大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(已更完)

ClickHouse(已更完)

Kudu(正在更新…)

章节内容

上节我们完成了如下的内容:


Kudu Java API

增删改查 编写案例测试

实现思路

将数据从 Flink 下沉到 Kudu 的基本思路如下:


环境准备:确保 Flink 和 Kudu 环境正常运行,并配置好相关依赖。

创建 Kudu 表:在 Kudu 中定义要存储的数据表,包括主键和列类型。

数据流设计:使用 Flink 的 DataStream API 读取输入数据流,进行必要的数据处理和转换。

写入 Kudu:通过 Kudu 的连接器将处理后的数据写入 Kudu 表。需要配置 Kudu 客户端和表的相关信息。

执行作业:启动 Flink 作业,实时将数据流中的数据写入 Kudu,便于后续查询和分析。

添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.11.1</flink.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.17.0</version>
        </dependency>

    </dependencies>
</project>

数据源

new UserInfo("001", "Jack", 18),
new UserInfo("002", "Rose", 20),
new UserInfo("003", "Cris", 22),
new UserInfo("004", "Lily", 19),
new UserInfo("005", "Lucy", 21),
new UserInfo("006", "Json", 24),

自定义下沉器

package icu.wzk.kudu;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.log4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;


public class MyFlinkSinkToKudu extends RichSinkFunction<Map<String, Object>> {

    private final static Logger logger = Logger.getLogger("MyFlinkSinkToKudu");

    private KuduClient kuduClient;
    private KuduTable kuduTable;

    private String kuduMasterAddr;
    private String tableName;
    private Schema schema;
    private KuduSession kuduSession;
    private ByteArrayOutputStream out;
    private ObjectOutputStream os;

    public MyFlinkSinkToKudu(String kuduMasterAddr, String tableName) {
        this.kuduMasterAddr = kuduMasterAddr;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        out = new ByteArrayOutputStream();
        os = new ObjectOutputStream(out);
        kuduClient = new KuduClient.KuduClientBuilder(kuduMasterAddr).build();
        kuduTable = kuduClient.openTable(tableName);
        schema = kuduTable.getSchema();
        kuduSession = kuduClient.newSession();
        kuduSession.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
    }

    @Override
    public void invoke(Map<String, Object> map, Context context) throws Exception {
        if (null == map) {
            return;
        }
        try {
            int columnCount = schema.getColumnCount();
            Insert insert = kuduTable.newInsert();
            PartialRow row = insert.getRow();
            for (int i = 0; i < columnCount; i ++) {
                Object value = map.get(schema.getColumnByIndex(i).getName());
                insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value);
                OperationResponse response = kuduSession.apply(insert);
                if (null != response) {
                    logger.error(response.getRowError().toString());
                }
            }
        } catch (Exception e) {
            logger.error(e);
        }
    }

    @Override
    public void close() throws Exception {
        try {
            kuduSession.close();
            kuduClient.close();
            os.close();
            out.close();
        } catch (Exception e) {
            logger.error(e);
        }
    }

    private void insertData(PartialRow row, Type type, String columnName, Object value) {
        try {
            switch (type) {
                case STRING:
                    row.addString(columnName, value.toString());
                    return;
                case INT32:
                    row.addInt(columnName, Integer.valueOf(value.toString()));
                    return;
                case INT64:
                    row.addLong(columnName, Long.valueOf(value.toString()));
                    return;
                case DOUBLE:
                    row.addDouble(columnName, Double.valueOf(value.toString()));
                    return;
                case BOOL:
                    row.addBoolean(columnName, Boolean.valueOf(value.toString()));
                    return;
                case BINARY:
                    os.writeObject(value);
                    row.addBinary(columnName, out.toByteArray());
                    return;
                case FLOAT:
                    row.addFloat(columnName, Float.valueOf(value.toString()));
                default:
                    throw new UnsupportedOperationException("Unknown Type: " + type);
            }

        } catch (Exception e) {
            logger.error("插入数据异常: " + e);
        }
    }
}

编写实体

package icu.wzk.kudu;

public class UserInfo {

    private String id;

    private String name;

    private Integer age;

    public UserInfo(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

执行建表

package icu.wzk.kudu;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

import java.util.ArrayList;
import java.util.List;

public class KuduCreateTable {

    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
        KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddress);
        KuduClient kuduClient = kuduClientBuilder.build();

        String tableName = "user";
        List<ColumnSchema> columnSchemas = new ArrayList<>();
        ColumnSchema id = new ColumnSchema
                .ColumnSchemaBuilder("id", Type.INT32)
                .key(true)
                .build();
        columnSchemas.add(id);
        ColumnSchema name = new ColumnSchema
                .ColumnSchemaBuilder("name", Type.STRING)
                .key(false)
                .build();
        columnSchemas.add(name);
        ColumnSchema age = new ColumnSchema
                .ColumnSchemaBuilder("age", Type.INT32)
                .key(false)
                .build();
        columnSchemas.add(age);

        Schema schema = new Schema(columnSchemas);
        CreateTableOptions options = new CreateTableOptions();
        // 副本数量为1
        options.setNumReplicas(1);
        List<String> colrule = new ArrayList<>();
        colrule.add("id");
        options.addHashPartitions(colrule, 3);

        kuduClient.createTable(tableName, schema, options);
        kuduClient.close();
    }

}

主逻辑代码

package icu.wzk.kudu;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

public class SinkToKuduTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<UserInfo> dataSource = env.fromElements(
                new UserInfo("001", "Jack", 18),
                new UserInfo("002", "Rose", 20),
                new UserInfo("003", "Cris", 22),
                new UserInfo("004", "Lily", 19),
                new UserInfo("005", "Lucy", 21),
                new UserInfo("006", "Json", 24)
        );
        SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource
                .map(new MapFunction<UserInfo, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> map(UserInfo value) throws Exception {
                        Map<String, Object> map = new HashMap<>();
                        map.put("id", value.getId());
                        map.put("name", value.getName());
                        map.put("age", value.getAge());
                        return map;
                    }
                });

        String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251";
        String tableInfo = "user";
        mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));

        env.execute("SinkToKuduTest");
    }

}

解释分析

环境设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();:初始化 Flink 的执行环境,这是 Flink 应用的入口。


数据源创建

DataStreamSource dataSource = env.fromElements(…):创建了一个包含多个 UserInfo 对象的数据源,模拟了一个输入流。


数据转换

SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(…):使用 map 函数将 UserInfo 对象转换为 Map<String, Object>,便于后续处理和写入 Kudu。每个 UserInfo 的属性都被放入一个 HashMap 中。


Kudu 配置信息

String kuduMasterAddr = “localhost:7051,localhost:7151,localhost:7251”; 和 String tableInfo = “user”;:定义 Kudu 的主节点地址和目标表的信息。


数据下沉

mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));:将转换后的数据流添加到 Kudu 的自定义 Sink 中。MyFlinkSinkToKudu 类应该实现了将数据写入 Kudu 的逻辑。


执行作业

env.execute(“SinkToKuduTest”);:启动 Flink 作业,执行整个数据流处理流程。


测试运行

先运行建表

再运行主逻辑

我们建表之后,确认user表存在。然后我们运行Flink程序,将数据写入Kudu。

确认有表后,执行 Flink 程序:

注意事项

并发性:根据 Kudu 集群的规模和配置,可以调整 Flink 作业的并发性,以提高写入性能。

批量写入:Kudu 支持批量插入,可以通过适当配置 Flink 的 sink 来提高性能。

故障处理:确保在作业中处理异常和重试逻辑,以确保数据不会丢失。

监控与调试:使用 Flink 的监控工具和 Kudu 的工具(如 Kudu UI)来监控数据流和性能。


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
3天前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
3天前
|
消息中间件 分布式计算 大数据
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
18 5
|
3天前
|
存储 SQL 分布式计算
大数据-162 Apache Kylin 全量增量Cube的构建 Segment 超详细记录 多图
大数据-162 Apache Kylin 全量增量Cube的构建 Segment 超详细记录 多图
12 3
|
3天前
|
Java 大数据 数据库连接
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
9 2
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
|
3天前
|
SQL 分布式计算 NoSQL
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
7 1
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
|
3天前
|
SQL 分布式计算 大数据
大数据-160 Apache Kylin 构建Cube 按照日期构建Cube 详细记录
大数据-160 Apache Kylin 构建Cube 按照日期构建Cube 详细记录
13 2
|
3天前
|
存储 大数据 分布式数据库
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
9 1
|
3天前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
12 1
|
2天前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
17 0
|
3天前
|
SQL 存储 监控
大数据-161 Apache Kylin 构建Cube 按照日期、区域、产品、渠道 与 Cube 优化
大数据-161 Apache Kylin 构建Cube 按照日期、区域、产品、渠道 与 Cube 优化
7 0

推荐镜像

更多