探究 | kafka-connector 同步 Elasticsearch速度慢根因分析?

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 1、kafka同步Elasticsearch的方式之前博文中也有介绍:方式一:logstash_input_kafka方式二:kafka_connector方式三:spark stream方式四:java程序读写自己实现

image.png

链接

2、kafka-connector同步kafka到ES

image.png

场景一:kafka实时数据流直接通过kafka-connector同步到ES。

场景二:kafka实时数据流需要中间数据处理后再同步到ES。


3、同步慢问题分析?

3.1 针对场景一:

可能的原因:kafka-connector写入ES速度慢?

可能的应对策略核心**:提升ES的写入速度**。

分解策略:


1)ES副本数设置为0

待写入完毕后再改成实际副本值。

2)调整 bulk 线程池和队列

结合物理机的线程大小配置与之匹配的线程池和队列大小。

3)增加refresh间隔

默认的refresh的间隔是1s,用index.refresh.interval可以设置。如果设置为默认值1s,则会强迫每秒将内存中的数据写入磁盘中,创建一个新的segment file。这个1s间隔是导致:写入数据后,需要1s才能看到的原因。

如果该值调大,比如60s,新写入的数据60s才能看到,这样就会获得了较大的写入吞吐量。

因为:60s的间隔都是写入内存的,每隔60s才会创建一个segment file。

调整translog flush 间隔

translog的写入可以设置,默认是request,每次请求都会写入磁盘(fsync),这样就保证所有数据不会丢,但写入性能会受影响。

如果改成async,则按照配置触发trangslog写入磁盘,注意这里说的只是trangslog本身的写盘。

translog什么时候清空?默认是512mb,或30分钟。这个动作就是flush,同时伴随着segment提交(写入磁盘)。flush之后,这段translog的使命就完成了,因为segment已经写入磁盘,就算故障,也可以从segment文件恢复。

index.translog.durability: async

index.translog.sync_interval: 120s

index.translog.flush_threshold_size: 1024mb

index.translog.flush_threshold_period: 120m

1

2

3

4

另外,有一个/_flush/sync命令,在做数据节点维护时很有用。其逻辑就是flush translog并且将sync_id同步到各个分片。可以实现快速恢复。

更多策略参考:


https://www.easyice.cn/archives/207

https://elasticsearch.cn/question/3847

3.2 针对场景二:

结合实际场景,从后往前分析?

思考问题:

(1)kafka-connector之前的实时速度怎么样?

可以在kafka-connector同步之前打印日志,看获取的实时数据实现和当前时刻进行比对。

如果二者差值较大, 则认为数据没有实时。

可能的原因需要进一步分析。

可能问题1:接入的时候中间可能有异常。

进一步排查kafka 接入的时候的问题。

可能问题2:中间处理慢了。


1)排查下,中间有没有调用第三方应用、服务。比如:读写数据库、调用第三方分词等服务。

2)考虑增大并行,提升调用速度。

(2)kafka-connector写入到ES的时刻是不是慢了?

如果是,需要统计一段时间,比如1小时、5小时,统计出每秒的写入速度?

这里的优化:


1)增大并行,kafka-connector写入ES考虑并行。

2)参考场景一中提及的ES方面的优化。

4、小结

问题排查的周期可能会很长,但是要有定力。

从后往前、找到问题的根源,“对症下药”放得持久疗效!

相关文章
|
1天前
|
存储 缓存 自然语言处理
深度解析ElasticSearch:构建高效搜索与分析的基石
【9月更文挑战第8天】在数据爆炸的时代,如何快速、准确地从海量数据中检索出有价值的信息成为了企业面临的重要挑战。ElasticSearch,作为一款基于Lucene的开源分布式搜索和分析引擎,凭借其强大的实时搜索、分析和扩展能力,成为了众多企业的首选。本文将深入解析ElasticSearch的核心原理、架构设计及优化实践,帮助读者全面理解这一强大的工具。
28 7
|
7天前
|
存储 自然语言处理 关系型数据库
ElasticSearch基础3——聚合、补全、集群。黑马旅游检索高亮+自定义分词器+自动补全+前后端消息同步
聚合、补全、RabbitMQ消息同步、集群、脑裂问题、集群分布式存储、黑马旅游实现过滤和搜索补全功能
ElasticSearch基础3——聚合、补全、集群。黑马旅游检索高亮+自定义分词器+自动补全+前后端消息同步
|
17天前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【9月更文挑战第3天】本文介绍如何结合Databricks与Confluent实现高效实时数据处理。Databricks基于Apache Spark提供简便的大数据处理方式,Confluent则以Kafka为核心,助力实时数据传输。文章详细阐述了利用Kafka进行数据采集,通过Delta Lake存储并导入数据,最终在Databricks上完成数据分析的全流程,展示了一套完整的实时数据处理方案。
36 3
|
27天前
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
39 2
|
1月前
|
消息中间件 监控 Kafka
Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
【8月更文挑战第13天】Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
74 3
|
1月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
71 4
|
1月前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【8月更文挑战第9天】利用Databricks与Confluent打造实时数据处理方案。Confluent的Kafka负责数据采集,通过主题接收IoT及应用数据;Databricks运用Structured Streaming处理Kafka数据,并以Delta Lake存储,支持ACID事务。这套组合实现了从数据采集、存储到分析的全流程自动化,满足企业对大数据实时处理的需求。
29 3
|
1月前
|
消息中间件 数据挖掘 Kafka
揭秘数据洪流中的救世主:Confluent与Flink的实时分析奇迹!
【8月更文挑战第9天】在现代数据处理中,实时数据分析至关重要。Confluent Platform与Apache Flink的组合提供了一套高效的数据流处理方案。Confluent Platform基于Apache Kafka构建,负责数据的收集、传输及管理;而Flink则擅长实时处理这些数据流,进行复杂分析。首先需配置Confluent Platform,包括设置Zookeeper、Kafka brokers及相关服务。
43 1
|
26天前
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
|
29天前
|
消息中间件 Java Kafka
Kafka生产者同步和异步的JavaAPI代码演示
Kafka生产者同步和异步的JavaAPI代码演示
25 0