Apache Doris Routine Load快速体验之案例(2)1

简介: Apache Doris Routine Load快速体验之案例(2)1

Apache Doris Routine Load快速体验之案例(2)

环境信息

硬件信息

软件信息

Routine Load介绍

Routine Load案例

创建Doris结果测试表

创建Routine Load任务

查看Routine Load

发送测试Kafka测试数据

查看Doris结果数据

常见问题

Failed to get all partitions of kafka topic

current error rows is more than max error num

环境信息

硬件信息

  1. 1.CPU :4C
  2. 2.CPU型号:ARM64
  3. 3.内存 :10GB
  4. 4.硬盘 :66GB SSD

软件信息

  1. 1.VM镜像版本 :CentOS-7
  2. 2.Apahce Doris版本 :1.2.4.1
  3. 3.Kafka版本:3.2.0

Routine Load介绍

Routine Load适合Kafka直接实时写数据到Doris的场景;它支持用户提交一个常驻的导入任务,通过不断地从指定的数据源中读取数据,将数据导入到 Doris 中。

如上图,Client 向 FE 提交一个Routine Load 作业。

1.FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。

2.在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。

3.FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。

4.整个 Routine Load 作业通过不断的产生新的 Task,来完成数据不间断的导入。

Routine Load案例

创建Doris结果测试表

-- 创建测试库
create database routine_load;
-- 切换为测试库
use routine_load;
-- 创建测试结果表
CREATE TABLE rl_test01 (
  `id` varchar(1000) NULL COMMENT "来源库表键",
  `test01` BIGINT SUM DEFAULT "0" COMMENT "测试"
) ENGINE=OLAP
AGGREGATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
);

创建Routine Load任务

CREATE ROUTINE LOAD routine_load.rl_test01 ON rl_test01
        COLUMNS TERMINATED BY ",",
        COLUMNS(id,test01) -- 字段名和表里对应
        PROPERTIES
        (
            "desired_concurrent_number"="3",
            "max_batch_interval" = "20",
            "max_batch_rows" = "200000",
            "max_batch_size" = "209715200",
            "strict_mode" = "false"
        )
        FROM KAFKA
        (
            "kafka_broker_list" = "192.168.1.61:9092",
            "kafka_topic" = "rl_test01",
            "property.group.id" = "rl_test01_group",
            "property.client.id" = "rl_test01_client",
            "property.kafka_default_offsets" = "OFFSET_BEGINNING"
        );
相关文章
|
9天前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
8天前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
15天前
|
SQL 消息中间件 Java
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
通过兼容 Connector 插件,Apache Doris 能够支持 Trino/Presto 可对接的所有数据源,而无需改动 Doris 的内核代码。
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
|
1天前
|
SQL 存储 缓存
Apache Doris 2.1.6 版本正式发布
2.1.6 版本在 Lakehouse、异步物化视图、半结构化数据管理持续升级改进,同时在查询优化器、执行引擎、存储管理、数据导入与导出以及权限管理等方面完成了若干修复
|
16天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
62 11
|
26天前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
31 1
|
24天前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
90 2
|
26天前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
35 3
|
26天前
|
消息中间件 运维 Kafka
Apache Flink 实践问题之达到网卡的最大速度如何解决
Apache Flink 实践问题之达到网卡的最大速度如何解决
34 2
|
27天前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls

推荐镜像

更多