3秒学不会Palo Doris的数据导入你打我!(二)

简介: 3秒学不会Palo Doris的数据导入你打我!

准备工作

开通百度消息服务

百度消息服务(BMS)基于 Kafka 在百度智能云提供托管服务,请先按照以下流程开通服务。

  1. 请根据 (BMS快速入门https://cloud.baidu.com/doc/Kafka/s/9jwvygf3k 文档开通消息服务
  2. 下载证书压缩包 kafka-key.zip 并解压,解压后将得到以下文件

3.上传证书文件到 HTTP 服务器。

因为后续 Doris 需要从某个 HTTP 服务器上下载这些整数以供访问 Kafka。因此我们需要先将这些证书上传到 HTTP 服务器。这个 HTTP 服务器必须要能够被 Doris 的 Leader Node 节点所访问。

如果您没有合适的 HTTP 服务器,可以参照以下方式借助百度对象存储(BOS)来完成:

   1. 根据 https://cloud.baidu.com/doc/BOS/s/Jk4xttg03开始使用),https://cloud.baidu.com/doc/BOS/s/Fk4xtwbze(创建Bucket) 文档开通BOS服务并创建一个 Bucket。注意,Bucket所在地域必须和 Doris 集群所在地域相同

           2. 将以下三个文件上传到 Bucket

  • ca.pem
  • client.key
  • client.pem
  1.  3. 在 BOS Bucket 文件列表页面,点击文件右侧的 文件信息,可以获取 HTTP 访问连接。请将 连接有效时间 设为 -1,即永久。

注:请不要使用带有 cdn 加速的 http 下载地址。这个地址某些情况无法被 Doris 访问。


自建 Kafka 服务

如果使用自建 Kafka 服务,请确保 Kafka 服务和 Doris 集群在同一个 VPC 内,并且相互之间的网络能够互通。


订阅 Kafka 消息

订阅 Kafka 消息使用了 Doris 中的例行导入(Routine Load)功能。

用户首先需要创建一个例行导入作业。作业会通过例行调度,不断地发送一系列的任务,每个任务会消费一定数量 Kafka 中的消息。

请注意以下使用限制:

  1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
  2. 支持的消息格式如下:
  1. 仅支持 Kafka 0.10.0.0(含) 以上版本。

访问 SSL 认证的 Kafka 集群

例行导入功能支持无认证的 Kafka 集群,以及通过 SSL 认证的 Kafka 集群。

访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(ca.pem)。如果 Kafka 集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过 CREAE FILE 命令上传到 Plao 中,并且 catalog 名称为 kafka

这里给出示例:

上传文件

CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");

上传完成后,可以通过 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-FILE/     (SHOW FLIES) 命令查看已上传的文件。


创建例行导入作业

访问无认证的 Kafka 集群

CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl
COLUMNS TERMINATED BY ","
PROPERTIES
(
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.group.id" = "xxx",
    "property.client.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
  • max_batch_interval/max_batch_rows/max_batch_size 用于控制一个子任务的运行周期。一个子任务的运行周期由最长运行时间、最多消费行数和最大消费数据量共同决定。

  1. 2. 访问 SSL 认证的 Kafka 集群
CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl
COLUMNS TERMINATED BY ",",
PROPERTIES
(
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
)
FROM KAFKA
(
   "kafka_broker_list"= "broker1:9091,broker2:9091",
   "kafka_topic" = "my_topic",
   "property.security.protocol" = "ssl",
   "property.ssl.ca.location" = "FILE:ca.pem",
   "property.ssl.certificate.location" = "FILE:client.pem",
   "property.ssl.key.location" = "FILE:client.key",
   "property.ssl.key.password" = "abcdefg"
);
  • 对于百度消息服务,property.ssl.key.password 属性可以在 client.properties 文件中获取。

查看导入作业状态

查看作业状态的具体命令和示例请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-ROUTINE-LOAD/   (SHOW ROUTINE LOAD) 命令文档。

查看某个作业的任务运行状态的具体命令和示例请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-ROUTINE-LOAD-TASK/    (SHOW ROUTINE LOAD TASK) 命令文档。

只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。

修改作业属性

用户可以修改已经创建的作业的部分属性。具体说明请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E8%BE%85%E5%8A%A9%E5%91%BD%E4%BB%A4/ALTER-ROUTINE-LOAD/   (ALTER ROUTINE LOAD) 命令手册。


作业控制

用户可以通过 STOP/PAUSE/RESUME 三个命令来控制作业的停止,暂停和重启。



使用JDBC同步数据


用户可以通过 JDBC 协议,使用 INSERT 语句进行数据导入。

INSERT 语句的使用方式和 MySQL 等数据库中 INSERT 语句的使用方式类似。INSERT 语句支持以下两种语法:

* INSERT INTO table SELECT ...
* INSERT INTO table VALUES(...)

单次写入

单次写入是指用户直接执行一个 INSERT 命令。示例如下:

INSERT INTO example_tbl (col1, col2, col3) VALUES (1000, "baidu", 3.25);

对于 Doris 来说,一个 INSERT 命令就是一个完整的导入事务。

因此不论是导入一条数据,还是多条数据,我们都不建议在生产环境使用这种方式进行数据导入。高频词的 INSERT 操作会导致在存储层产生大量的小文件,会严重影响系统性能。

该方式仅用于线下简单测试或低频少量的操作。

或者可以使用以下方式进行批量的插入操作:

INSERT INTO example_tbl VALUES
(1000, "baidu1", 3.25)
(2000, "baidu2", 4.25)
(3000, "baidu3", 5.25);

我们建议一批次插入条数在尽量大,比如几千甚至一万条一次。或者可以通过下面的程序的方式,使用 PreparedStatement 来进行批量插入。

JDBC 示例

这里我们给出一个简单的 JDBC 批量 INSERT 代码示例:

package demo.doris;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class DorisJDBCDemo {
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String DB_URL_PATTERN = "jdbc:mysql://%s:%d/%s?rewriteBatchedStatements=true";
    private static final String HOST = "127.0.0.1"; // Leader Node host
    private static final int PORT = 8030;   // http port of Leader Node
    private static final String DB = "example_db";
    private static final String TBL = "example_tbl";
    private static final String USER = "admin";
    private static final String PASSWD = "my_pass";
    private static final int INSERT_BATCH_SIZE = 10000;
    public static void main(String[] args) {
        insert();
    }
    private static void insert() {
        // 注意末尾不要加 分号 ";"
        String query = "insert into " + TBL + " values(?, ?)";
        // 设置 Label 以做到幂等。
        // String query = "insert into " + TBL + " WITH LABEL my_label values(?, ?)";
        Connection conn = null;
        PreparedStatement stmt = null;
        String dbUrl = String.format(DB_URL_PATTERN, HOST, PORT, DB);
        try {
            Class.forName(JDBC_DRIVER);
            conn = DriverManager.getConnection(dbUrl, USER, PASSWD);
            stmt = conn.prepareStatement(query);
            for (int i =0; i < INSERT_BATCH_SIZE; i++) {
                stmt.setInt(1, i);
                stmt.setInt(2, i * 100);
                stmt.addBatch();
            }
            int[] res = stmt.executeBatch();
            System.out.println(res);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            } catch (SQLException se2) {
                se2.printStackTrace();
            }
            try {
                if (conn != null) conn.close();
            } catch (SQLException se) {
                se.printStackTrace();
            }
        }
    }
}

请注意以下几点:

1. JDBC 连接串需添加 rewriteBatchedStatements=true 参数,并使用 PreparedSta tement 方式。

目前 Doris 暂不支持服务器端的 PrepareStatemnt,所以 JDBC Driver 会在客户端进行批量 Prepare。

rewriteBatchedStatements=true 会确保 Driver 执行批处理。并最终形成如下形式的 INSERT 语句发往 Doris:

INSERT INTO example_tbl VALUES
(1000, "baidu1", 3.25)
(2000, "baidu2", 4.25)
(3000, "baidu3", 5.25);

2. 批次大小

   因为是在客户端进行批量处理,因此一批次如果过大的话,话占用客户端的内存资源,需关注。

Doris 后续会支持服务端的 PrepareStatemnt,敬请期待。

3. 导入原子性

和其他到导入方式一样,INSERT 操作本身也支持原子性。每一个 INSERT 操作都是一个导入事务,能够保证一个 INSERT 中的所有数据原子性的写入。

前面提到,我们建议在使用 INSERT 导入数据时,采用 ”批“ 的方式进行导入,而不是单条插入。

同时,我们可以为每次 INSERT 操作设置一个 Label。通过 Label 机制 可以保证操作的幂等性和原子性,最终做到数据的不丢不重。



通过外部表同步数据


Doris 可以创建通过 ODBC 协议访问的外部表。创建完成后,可以通过 SELECT 语句直接查询外部表的数据,也可以通过 INSERT INTO SELECT 的方式导入外部表的数据。

本文档主要介绍如何创建通过 ODBC 协议访问的外部表,以及如何导入这些外部表的数据。目前支持的数据源包括:

  • MySQL
  • Oracle
  • PostgreSQL

创建外部表

这里仅通过示例说明使用方式。

  1. 创建 ODBC Resource
    ODBC Resource 的目的是用于统一管理外部表的连接信息。
CREATE EXTERNAL RESOURCE `oracle_odbc`
PROPERTIES (
    "type" = "odbc_catalog",
    "host" = "192.168.0.1",
    "port" = "8086",
    "user" = "test",
    "password" = "test",
    "database" = "test",
    "odbc_type" = "oracle",
    "driver" = "Oracle"
);

这里我们创建了一个名为 oracle_odbc 的 Resource,其类型为 odbc_catalog,表示这是一个用于存储 ODBC 信息的 Resource。odbc_typeoracle,表示这个 OBDC Resource 是用于连接 Oracle 数据库的。


2. 创建外部表

CREATE EXTERNAL TABLE `ext_oracle_tbl` (
  `k1` decimal(9, 3) NOT NULL COMMENT "",
  `k2` char(10) NOT NULL COMMENT "",
  `k3` datetime NOT NULL COMMENT "",
  `k5` varchar(20) NOT NULL COMMENT "",
  `k6` double NOT NULL COMMENT ""
) ENGINE=ODBC
COMMENT "ODBC"
PROPERTIES (
    "odbc_catalog_resource" = "oracle_odbc",
    "database" = "test",
    "table" = "baseall"
);

这里我们创建一个 ext_oracle_tbl 外部表,并引用了之前创建的 oracle_odbc Resource。


连接百度云数据库 RDS

1. 创建 RDS

注意:创建 RDS 实例时,网络类型 -> 选择网络 处,需要选择和 Doris 集群相同的网络(VPC)。可用区可以不同。


2. 创建资源

CREATE EXTERNAL RESOURCE `rds_odbc`
PROPERTIES (
    "type" = "odbc_catalog",
    "host" = "mysql56.rdsxxxxx.rds.gz.baidubce.com",
    "port" = "3306",
    "user" = "rdsroot",
    "password" = "12345",
    "odbc_type" = "mysql",
    "driver" = "MySQL"
);

需修改其中 hostportuserpassword 对应的参数。host port 可以在 RDS 实例信息也查看。user 和 password 需要在 RDS 控制台创建账户后获取。


3. 创建外部表

CREATE EXTERNAL TABLE `mysql_table` (
   k1 int,
   k2 int
) ENGINE=ODBC
PROPERTIES (
    "odbc_catalog_resource" = "rds_odbc",
    "database" = "mysql_db",
    "table" = "mysql_tbl"
);

创建之后,就可以进行查询等操作了。


导入数据


1. 创建 Doris 表

这里我们创建一张 Doris 的表,列信息和上一步创建的外部表 ext_oracle_tbl 一样:

CREATE EXTERNAL TABLE `doris_tbl` (
  `k1` decimal(9, 3) NOT NULL COMMENT "",
  `k2` char(10) NOT NULL COMMENT "",
  `k3` datetime NOT NULL COMMENT "",
  `k5` varchar(20) NOT NULL COMMENT "",
  `k6` double NOT NULL COMMENT ""
)
COMMENT "Doris Table"
DISTRIBUTED BY HASH(k1) BUCKETS 2;
PROPERTIES (
    "replication_num" = "1"
);

2. 导入数据 (从 ext_oracle_tbl表 导入到 doris_tbl 表)

INSERT INTO doris_tbl SELECT k1,k2,k3 FROM ext_oracle_tbl limit 100;

INSERT 命令是同步命令,返回成功,即表示导入成功。


注意事项

  • 必须保证外部数据源与 Doris 集群在同一个VPC内,并且 Compute Node 可以和外部数据源的网络是互通的。
  • ODBC 外部表本质上是通过单一 ODBC 客户端访问数据源,因此并不合适一次性导入大量的数据,建议分批多次导入。
相关文章
|
缓存 JavaScript 前端开发
【JavaScript 技术专栏】DOM 操作全攻略:从基础到进阶
【4月更文挑战第30天】本文深入讲解JavaScript与DOM交互,涵盖DOM基础、获取/修改元素、创建/删除元素、事件处理结合及性能优化。通过学习,开发者能掌握动态改变网页内容、结构和样式的技能,实现更丰富的交互体验。文中还讨论了DOM操作在实际案例、与其他前端技术结合的应用,助你提升前端开发能力。
914 0
|
编解码 定位技术 计算机视觉
多模态LLM视觉推理能力堪忧,浙大领衔用GPT-4合成数据构建多模态基准
【9月更文挑战第2天】浙江大学领衔的研究团队针对多模态大型模型(MLLM)在抽象图像理解和视觉推理上的不足,提出了一种利用GPT-4合成数据构建多模态基准的方法。该研究通过合成数据提高了MLLM处理图表、文档等复杂图像的能力,并构建了一个包含11,193条指令的基准,涵盖8种视觉场景。实验表明,这种方法能显著提升模型性能,但依赖闭源模型和高计算成本是其局限。论文详细内容见:https://arxiv.org/pdf/2407.07053
360 10
|
机器学习/深度学习 编解码 TensorFlow
MobileNetV3架构解析与代码复现
MobileNet模型基于深度可分离卷积,这是一种分解卷积的形式,将标准卷积分解为深度卷积和`1*1`的点卷积。对于MobileNet,深度卷积将单个滤波器应用于每个输入通道,然后,逐点卷积应用`1*1`卷积将输出与深度卷积相结合。
2943 0
MobileNetV3架构解析与代码复现
|
Web App开发 定位技术 Windows
【Windows】 谷歌翻译停服后,chrome无法自动翻译?解决办法来了~
【Windows】 谷歌翻译停服后,chrome无法自动翻译?解决办法来了~
558 3
|
开发工具 git
git如何创建新分支,GitHub默认分支是main怎么连上
git如何创建新分支,GitHub默认分支是main怎么连上
319 0
|
移动开发 小程序 API
【产品上新】openURL接口开放,实现在小程序与H5之间“反复横跳”
【产品上新】openURL接口开放,实现在小程序与H5之间“反复横跳”
323 1
|
人工智能 Ubuntu Linux
linux配置魔搭社区modelscope时的whl下载中断问题和解决方案
本文提供了断点续传和手动安装两个方案。
1165 3
|
应用服务中间件 nginx
nginx502错误和nginx服务器返回空响应体(err_empty_response)
问题:502错误有很多种情况,我这里只记录本次碰到的情况,日志如下: upstream sent duplicate header line: &quot;Transfer-Encoding: chunked&quot;, previous value: &quot;Transfer-Encoding: chunked&quot; while reading response header from upstream
2420 1
|
Prometheus Cloud Native Linux
Linux下安装prometheus & grafana
Linux下安装prometheus & grafana
977 0
|
传感器 前端开发
BJT差分式放大电路的介绍
一、基本原理 BJT差分式放大电路的基本原理是利用两个晶体管的共射极配置来放大差分输入信号。其中一个晶体管作为输入管,另一个晶体管作为负载管。当差分输入信号的差值发生变化时,输入管的基极电压也会发生变化,导致输入管的电流变化。负载管通过负载电阻来接收输入管的电流变化,并产生相应的输出电压。因此,BJT差分式放大电路可以将差分输入信号放大为差分输出信号。 二、电路结构 BJT差分式放大电路通常由两个晶体管、两个输入电阻、一个负载电阻和一个电源组成。其中,两个晶体管的发射极通过两个输入电阻与差分输入信号相连,基极通过电源与地相连。负载电阻通过两个晶体管的集电极与电源相连,输出信号通过负载电阻与地相
818 0