kafka核心配置

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
日志服务 SLS,月写入数据量 50GB 1个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: kafka核心配置

一、producer核心配置

1、acks :发送应答(默认值:1)

生产者在考虑完成请求之前要求leader收到的确认的数量。这控制了发送的记录的持久性。允许以下设置:

  1. acks=0:设置为0,则生产者将完全不等待来自服务器的任何确认。记录将立即添加到socket缓冲区,并被认为已发送。在这种情况下,不能保证服务器已经收到记录,重试配置将不会生效(因为客户机通常不会知道任何失败)。每个记录返回的偏移量总是-1。
  2. acks=1:leader会将记录写到本地日志中,但不会等待所有follower的完全确认。在这种情况下,如果leader在记录失败后立即失败,但在追随者复制记录之前失败,那么记录就会丢失。
  3. acks=all / -1:leader将等待完整的同步副本来确认记录。这保证了只要至少有一个同步副本仍然存在,记录就不会丢失。这是最有力的保证。这相当于acks=-1设置。

2、batch.size:批量发送大小(默认:16384,16K)

缓存到本地内存,批量发送大小,意思每次发送16K到broke。当多个记录被发送到同一个分区时,生产者将尝试将记录批处理成更少的请求。这有助于客户机和服务器上的性能。此配置以字节为单位控制默认批处理大小。

3、bootstrap.servers:服务器地址

broke服务器地址,多个用逗号割开。

4、buffer.memory:生产者最大可用缓存 (默认:33554432,32M)

生产者可以用来缓冲等待发送到服务器的记录的总内存字节。如果记录被发送的速度超过了它们可以被发送到服务器的速度,那么生产者将阻塞max.block。然后它会抛出一个异常。

该设置应该大致与生成器将使用的总内存相对应,但不是硬绑定,因为生成器使用的并非所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用了压缩)以及维护飞行中的请求。

生产者产生的消息缓存到本地,每次批量发送batch.size大小到服务器。

5、client.id:生产者ID(默认“”)

请求时传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志中包含逻辑应用程序名称,从而能够跟踪ip/端口之外的请求源。

6、compression.type:压缩类型(默认值:producer)

指定给定主题的最终压缩类型。此配置接受标准压缩编解码器(“gzip”、“snappy”、“lz4”、“zstd”)。它还接受“未压缩”,相当于没有压缩;以及“生产者”,即保留生产者设置的原始压缩编解码器。

“gzip”:压缩效率高,适合高内存、CPU

“snappy”:适合带宽敏感性,压缩力度大

7、retries:失败重试次数(默认:2147483647)

异常是RetriableException类型或者TransactionManager允许重试;

transactionManager.canRetry()后面会分析;先看看哪些异常是RetriableException类型异常。

21.png

允许重试,但不需要设置max.in.flight.requests.per.connection(单个连接上发送的未确认请求的最大数量)。连接到1可能会改变记录的顺序,因为如果将两个批发送到单个分区,第一个批处理失败并重试,但是第二个批处理成功,那么第二个批处理中的记录可能先出现。

通过delivery.timeout.ms也可以控制重试次数,如果重试次数没有用尽,传输超时也会停止。

retry.backoff.ms:重试阻塞时间(默认:100)

这避免了在某些失败场景下以紧密循环的方式重复发送请求。

8、delivery.timeout.ms:传输时间(默认:120000,2分钟)

生产者发送完请求接受服务器ACk的时间,该时间允许重试 ,该配置应该大于request.timeout.ms + linger.ms。

9、connections.max.idle.ms:关闭空闲连接时间(默认:540000)

在此配置指定的毫秒数之后关闭空闲连接。

10、enable.idempotence:开启幂等(默认:false)

当设置为“true”时,生产者将确保在流中准确地写入每个消息的副本。如果“false”,则由于代理失败而导致生产者重试,等等,可能会在流中写入重试消息的副本。请注意,启用幂等需要使用max.in.flight.requests.per.connection,连接小于或等于5,重试大于0且ack必须为“all”。如果用户没有显式地设置这些值,将选择合适的值。如果设置了不兼容的值,就会抛出ConfigException。

11、max.in.flight.requests.per.connection:单个连接上发送的未确认请求的最大数量(默认:5)

阻塞前客户端在单个连接上发送的未确认请求的最大数量。请注意,如果该设置设置为大于1,并且发送失败,则有由于重试(即,如果启用重试)。

12、interceptor.classes:拦截器(默认:无)

用作拦截器的类的列表。实现接口:org.apache.kafka.clients.producer。ProducerInterceptor接口允许将生产者接收到的记录发布到Kafka集群之前拦截它们(可能还会发生突变)。默认情况下,没有拦截器。

13、key.serializer:key序列化器(默认无)

实现org.apache.kafka.common. serialize .Serializer接口的key的序列化器类。String可配置:class org.apache.kafka.common.serialization.StringSerializer。

14、value.serializer:value序列化器(默认无)

序列化器类的值,该值实现org.apache.kafka.common. serialize .Serializer接口。String可配置:class org.apache.kafka.common.serialization.StringSerializer

15、linger.ms:发送延迟时间(默认:0)

为减少负载和客户端的请求数量,生产者不会一条一条发送,而是会逗留一段时间批量发送。batch.size和linger.ms满足任何一个条件都会发送。

16、max.block.ms:阻塞时间(默认:60000,一分钟)

配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()阻塞的时间。由于缓冲区已满或元数据不可用,也会阻塞。用户提供的序列化器或分区程序中的阻塞将不计入此超时。

17、max.request.size:最大请求字节大小(默认:1048576,1M)

请求的最大字节大小。此设置将限制生产者在单个请求中发送记录批的数量,以避免发送大量请求。这也有效地限制了最大记录批大小。注意,服务器对记录批处理大小有自己的上限,这可能与此不同。

18、metric.reporters:自定义指标报告器

用作指标报告器的类的列表。metricsreporter接口实现了org.apache.kafka.common.metrics.MetricsReporter接口,该接口允许插入将在创建新度量时得到通知的类。JmxReporter始终包含在注册JMX统计信息中。

19、partitioner.class:自定义分区策略

实现接口 org.apache.kafka.clients.producer.Partitioner,默认值:org.apache.kafka.clients.producer.internals.DefaultPartitioner

20、request.timeout.ms:请求超时时间(默认:30000)

配置控制客户机等待请求响应的最长时间。如果在超时超时之前没有收到响应,客户端将在需要时重新发送请求,或者在重试耗尽时失败请求。这个应该大于replica.lag.time.max。ms(代理配置),以减少由于不必要的生产者重试而导致消息重复的可能性。

21、receive.buffer.bytes(默认:32768,32K)

读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。如果值是-1,将使用OS默认值。

22、send.buffer.bytes(默认:131072,128K)

发送数据时使用的TCP发送缓冲区(SO_SNDBUF)的大小。如果值是-1,将使用OS默认值。

23、retry.backoff.ms:重试阻塞时间(默认:100)

这避免了在某些失败场景下以紧密循环的方式重复发送请求。

回到顶部

二、consumer核心配置

1、enable.auto.commit:开启自动提交(默认:true)

如果为true,consumer的偏移量将在后台定期提交。

2、auto.commit.interval.ms:自动提交频率(默认:5000)

如果enable.auto.commit设置为true,则使用者偏移量自动提交到Kafka的频率(毫秒)。

3、client.id:客户ID

便于跟踪日志。

4、check.crcs:是否开启数据校验(默认:true)

自动检查消耗的记录的CRC32。这确保不会发生对消息的在线或磁盘损坏。此检查增加了一些开销,因此在寻求极端性能的情况下可能禁用此检查。

5、bootstrap.servers:服务器配置

多个用都好隔开。

6、connections.max.idle.ms:关闭空间连接时间(默认:540000)

在此配置指定的毫秒数之后关闭空闲连接。

7、group.id:群组(默认:“”)

唯一标识用户群组,同一个group每个partition只会分配到一个consumer。

8、max.poll.records:拉起最大记录(默认:500)

单次轮询()调用中返回的记录的最大数量。

9、max.poll.interval.ms:拉取记录间隔(默认:300000,5分钟)

使用消费者组管理时轮询()调用之间的最大延迟。这为使用者在获取更多记录之前空闲的时间设置了上限。如果在此超时过期之前没有调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给另一个成员。

10、request.timeout.ms:请求超时时间(默认:30000 ,30S)

配置控制客户机等待请求响应的最长时间。如果在超时超时之前没有收到响应,客户端将在需要时重新发送请求,或者在重试耗尽时失败请求。

11、session.timeout.ms:consumer session超时

用于检测worker程序失败的超时。worker定期发送心跳,以向代理表明其活性。如果在此会话超时过期之前代理没有接收到心跳,则代理将从组中删除。请注意,该值必须位于group.min.session.timeout在broker配置中配置的允许范围内group.max.session.timeout.ms。

12、auto.offset.reset:初始偏移量 (默认:latest)

如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办:

earliest:自动重置偏移到最早的偏移

latest:自动将偏移量重置为最新偏移量

none:如果没有为使用者的组找到以前的偏移量,则向使用者抛出exception

anything else:向使用者抛出异常

13、key.deserializer

用于实现org.apache.kafka.common. serialize .Deserializer接口的key的反序列化类,class org.apache.kafka.common.serialization.StringDeserializer

14、value.deserializer

用于实现org.apache.kafka.common. serialize .Deserializer接口的value的反序列化类,class org.apache.kafka.common.serialization.StringDeserializer

15、max.partition.fetch.bytes

每个分区服务器将返回的最大数据量。记录由consumer成批提取。如果fetch的第一个非空分区中的第一个记录批处理大于这个限制,那么仍然会返回批处理,以确保使用者能够取得进展。broker接受的最大记录批处理大小是通过message.max定义的。字节(broker配置)或max.message。字节(topic配置)。看fetch.max.bytes用于限制consumer请求大小的字节。

16、partition.assignment.strategy:consumer订阅分区策略

(默认:class org.apache.kafka.clients.consumer.RangeAssignor)

当使用组管理时,客户端将使用分区分配策略的类名在使用者实例之间分配分区所有权。

17、fetch.max.bytes:拉取最大字节(默认:52428800,50M)

服务器应该为获取请求返回的最大数据量。记录由使用者成批地获取,并且如果获取的第一个非空分区中的第一个记录批处理大于这个值,仍然会返回记录批处理,以确保使用者能够取得进展。因此,这不是一个绝对最大值。代理接受的最大记录批处理大小是通过message.max定义的。字节(代理配置)或max.message。字节(主题配置)。请注意,使用者并行执行多个获取。

18、heartbeat.interval.ms:心跳时间(默认:3000, 3S)

使用Kafka的组管理工具时,从心跳到消费者协调器的预期时间。心跳被用来确保消费者的会话保持活跃,并在新消费者加入或离开组时促进再平衡。该值必须设置为小于session.timeout.ms的1/3。它可以调整甚至更低,以控制正常再平衡的预期时间。

19、fetch.max.wait.ms:拉取阻塞时间(默认:500)

如果没有足够的数据立即满足fetch.min.bytes提供的要求,服务器在响应fetch请求之前将阻塞的最长时间。

20、fetch.min.bytes:拉取最小字节数(默认:1)

服务器应该为获取请求返回的最小数据量。如果没有足够的数据可用,请求将等待那么多数据累积后再响应请求。默认的1字节设置意味着,只要数据的一个字节可用,或者获取请求超时等待数据到达,就会响应获取请求。将此设置为大于1的值将导致服务器等待更大数量的数据累积,这可以稍微提高服务器吞吐量,但代价是增加一些延迟。

21、exclude.internal.topics:公开内部topic(默认:true)

是否应该将来自内部主题(如偏移量)的记录公开给使用者,consumer共享offset。如果设置为true,从内部主题接收记录的唯一方法是订阅它。

22、isolation.level(隔离级别:默认:read_uncommitted)

控制如何以事务方式读取写入的消息。如果设置为read_committed, consumer.poll()将只返回已提交的事务消息。如果设置为read_uncommitted'(默认),consumer.poll()将返回所有消息,甚至是已经中止的事务消息。在任何一种模式下,非事务性消息都将无条件返回。

回到顶部

三、broke配置

1、zookeeper.connect:zk地址

多个用逗号隔开。

2、advertised.host.name(默认:null)

不赞成使用:

在server.properties 里还有另一个参数是解决这个问题的, advertised.host.name参数用来配置返回的host.name值,把这个参数配置为外网IP地址即可。

这个参数默认没有启用,默认是返回的java.net.InetAddress.getCanonicalHostName的值,在我的mac上这个值并不等于hostname的值而是返回IP,但在linux上这个值就是hostname的值。

3、advertised.listeners

hostname和端口注册到zk给生产者和消费者使用的,如果没有设置,将会使用listeners的配置,如果listeners也没有配置,将使用java.net.InetAddress.getCanonicalHostName()来获取这个hostname和port,对于ipv4,基本就是localhost了。

4、auto.create.topics.enable(自动创建topic,默认:true)

第一次发动消息时,自动创建topic。

5、auto.leader.rebalance.enable:自动rebalance(默认:true)

支持自动领导平衡。如果需要,后台线程定期检查并触发leader balance。

6、background.threads:处理线程(默认:10)

用于各种后台处理任务的线程数。

7、broker.id 默认:-1

此服务器的broke id。如果未设置,将生成唯一的代理id。为了避免zookeeper生成的broke id和用户配置的broke id之间的冲突,生成的代理id从reserve .broker.max开始id + 1。

8、compression.type:压缩类型,默认:producer

指定给定主题的最终压缩类型。此配置接受标准压缩编解码器(“gzip”、“snappy”、“lz4”、“zstd”)。它还接受“未压缩”,相当于没有压缩;以及“producer”,即保留producer设置的原始压缩编解码器。

9、delete.topic.enable 删除topic(默认:true)

允许删除主题。如果关闭此配置,则通过管理工具删除主题将无效。

10、leader.imbalance.check.interval.seconds(rebalance检测频率,默认:300)

控制器触发分区rebalance检查的频率。

11、leader.imbalance.per.broker.percentage(触发rebalance比率,默认:10,10%)

每个broke允许的lead不平衡比率。如果控制器超过每个broke的这个值,控制器将触发一个leader balance。该值以百分比指定。

12、log.dir(日志目录,默认:/tmp/kafka-logs)

保存日志数据的目录。

13、log.dirs

保存日志数据的目录。如果未设置,则为日志中的值。使用dir。

14、log.flush.interval.messages(默认:9223372036854775807)

在将消息刷新到磁盘之前,日志分区上累积的消息数量

15、log.flush.interval.ms(默认:null)

任何主题中的消息在刷新到磁盘之前保存在内存中的最长时间。如果没有设置,则使用log.flush.scheduler.interval.ms中的值。

16、log.flush.offset.checkpoint.interval.ms(默认:60000)

作为日志恢复点的上次刷新的持久记录的更新频率。

17、log.retention.bytes 保存日志文件的最大值(默认:-1)

删除前日志的最大大小。

18、log.retention.hours日志文件最大保存时间(小时)默认:168,一周

日志文件最大保存时间。

19、log.retention.minutes日志文件最大保存时间(分钟)默认:null

20、log.retention.ms日志文件最大保存时间(毫秒)默认:null

21、log.roll.hours:新segment产生时间,默认:168,一周

即使文件没有到达log.segment.bytes,只要文件创建时间到达此属性,就会创建新文件。

22、log.roll.ms :新segment产生时间

滚出新日志段之前的最大时间(以毫秒为单位)。如果未设置,则为log.roll中的值使用。

23、log.segment.bytes:segment文件最大值,默认:1073741824(1G)

24、log.segment.delete.delay.ms:segment删除等待时间, 默认:60000

从文件系统中删除文件之前等待的时间量。

25、message.max.bytes 最大batch size 默认:1000012,0.9M

Kafka允许的最大记录batch size。如果增加了这个值,并且存在大于0.10.2的使用者,那么还必须增加consumer的fetch大小,以便他们能够获取这么大的记录批。在最新的消息格式版本中,记录总是按批进行分组,以提高效率。在以前的消息格式版本中,未压缩记录没有分组成批,这种限制只适用于单个记录。可以使用主题级别max.message设置每个主题。字节的配置。

26、min.insync.replicas(insync中最小副本值)

当producer将acks设置为“all”(或“-1”)时,min.insync。副本指定必须确认写操作成功的最小副本数量。如果不能满足这个最小值,则生产者将引发一个异常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

当一起使用时,min.insync.replicas和ack允许您执行更大的持久性保证。一个典型的场景是创建一个复制因子为3的主题,设置min.insync复制到2个,用“all”配置发送。将确保如果大多数副本没有收到写操作,则生产者将引发异常。

27、num.io.threads,默认:8

服务器用于处理请求的线程数,其中可能包括磁盘I/O。

28、num.network.threads,默认:3

服务器用于接收来自网络的请求和向网络发送响应的线程数。

29、num.recovery.threads.per.data.dir 默认:1

每个数据目录在启动时用于日志恢复和在关闭时用于刷新的线程数。

30、num.replica.alter.log.dirs.threads 默认:null

可以在日志目录(可能包括磁盘I/O)之间移动副本的线程数。

31、num.replica.fetchers

从leader复制数据到follower的线程数。

32、offset.metadata.max.bytes 默认:4096

与offset提交关联的metadata的最大大小。

33、offsets.commit.timeout.ms 默认:5000

偏移量提交将被延迟,直到偏移量主题的所有副本收到提交或达到此超时。这类似于生产者请求超时。

34、offsets.topic.num.partitions 默认:50

偏移量提交主题的分区数量(部署后不应更改)。

35、offsets.topic.replication.factor 副本大小,默认:3

低于上述值,主题将创建失败。

36、offsets.topic.segment.bytes 默认104857600 ,100M

segment映射文件(index)文件大小,以便加快日志压缩和缓存负载。

37、queued.max.requests 默认:500

阻塞网络线程之前允许排队的请求数。

38、replica.fetch.min.bytes默认:1

每个fetch响应所需的最小字节。如果字节不够,则等待replicaMaxWaitTimeMs。

39、replica.lag.time.max.ms 默认:10000

如果follower 没有发送任何获取请求,或者至少在这段时间没有消耗到leader日志的结束偏移量,那么leader将从isr中删除follower 。

40、transaction.max.timeout.ms 默认:900000

事务执行最长时间,超时则抛出异常。

41、unclean.leader.election.enable 默认:false

是否选举ISR以外的副本作为leader,会导致数据丢失。

42、zookeeper.connection.timeout.ms

客户端等待与zookeeper建立连接的最长时间。如果未设置,则用zookeeper.session.timeout中的值。

43、zookeeper.max.in.flight.requests

阻塞之前consumer将发送给Zookeeper的未确认请求的最大数量。

44、group.max.session.timeout.ms

注册使用者允许的最大会话超时。超时时间越长,消费者在心跳之间处理消息的时间就越多,而检测故障的时间就越长。

45、group.min.session.timeout.ms

注册使用者允许的最小会话超时。更短的超时导致更快的故障检测,但代价是更频繁的用户心跳,这可能会压倒broke资源。

46、num.partitions 默认:1

每个主题的默认日志分区数量。

目录
相关文章
|
8月前
|
消息中间件 缓存 Kafka
Kafka ProducerConfig和ConsumerConfig配置
Kafka ProducerConfig和ConsumerConfig配置
259 1
|
3月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
121 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
2月前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
8月前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
241 2
|
3月前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
49 2
|
3月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
87 2
|
3月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
250 0
|
5月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
125 7
|
6月前
|
消息中间件 Kafka
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
79 1
|
6月前
|
消息中间件 NoSQL Redis
实时计算 Flink版产品使用问题之配置了最大连续失败数不为1,在Kafka的精准一次sink中,如果ck失败了,这批数据是否会丢失
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章