点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(正在更新…)
章节内容
上节我们完成了如下内容:
分区分配策略
Range、RoundRobin、Sticky
自定义分区策略实现
日志存储概述
- Kafka消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。
- 每个主题又可以分为一个或多个分区
- 每个分区各自存在一个记录消息数据的日志文件
- 我们需要到 Kafka 的 Logs 目录下进行查看:
cd /opt/kafka-logs pwd ls
我这里的情况是:
有一些没展示全的,比如倒数的那几个,是Kafka中现在有的Topic:
假设我们创建了一个 demo_01 的主题,其存在 个Partition,每个Partition下存在一个[Topic-Partition]命名的消息日志文件。
在分区的日志文件中,可以查看到很多了类型的文件:比如 .index .timestamp .log .snapshot等等
其中文件名一致的合集就叫做:LogSement
假设我们创建了一个 demo_01 的主题,其存在 个Partition,每个Partition下存在一个[Topic-Partition]命名的消息日志文件。
在分区的日志文件中,可以查看到很多了类型的文件:比如 .index .timestamp .log .snapshot等等
LogSegment
分区日志文件中包含很多的LogSegment
Kafka日志追加是顺序写入的
LogSegment可以减小日志文件的大小
进行日志删除的时候和数据查找的时候可以快速定位
ActiveLogSegment是活跃的日志分段,读写权限,其余的LogSegment只有只读的权限
每个LogSegment都有一个基准偏移量,表示当前LogSegment中第一条消息的Offset。
偏移量是一个64位的长整型数,固定是20位数字,长度未达到,用0进行填补。
可见如下图:
我服务器上Kafka我的目录情况如下:
日志与索引
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系
时间戳索引文件则根据时间戳查找对应的偏移量。
Kafka中的索引文件是以稀疏索引的方式构造消息的索引,并不保证每一个消息在索引文件中都有对应的索引项。
每当写入一定量的消息,偏移量索引文件和时间戳索引分别增加一个偏移量索引项和时间索引项。
通过修改 log.index.interval.bytes 的值,改变索引项的密度。
切分文件
当满足如下几个条件之一,就会触发切分:
当前日志分段文件的大小超过了Broker端的参数 log.segment.bytes 配置的值,默认是1GB
当前日志分段中消息的最大时间戳与当前系统的时间戳相差大于 log.roll.ms 或 log.roll.hour,ms的优先级高于hour,默认是hour,值为168 = 7天
偏移量索引文件或者时间戳索引文件的大小达到Broker参数log.index.size.max.bytes配置的值,默认是10MB。
追加的消息的偏移量与当前日志分段的偏移量之间的差值大于Integer.MAX_VALUE。即要追加的消息的偏移不能转变为相对偏移量。
为什么是 Integer.MAX_VALUE
1024 * 1024 * 1024 = 1073741824
在偏移量索引文件中,每个索引项占用8个字节,并分为两部分。
相对偏移量和物理地址
相对偏移量:表示消息相对于基准偏移量的偏移量,占4个字节。
物理地址:消息在日志分段文件中对应的物理位置,占4个字节。
4个字节刚好对应:Integer.MAX_VALUE,如果大于 Integer.MAX_VALUE,则不能用4个字节进行表示了。
索引切分过程
索引文件会根据 log.index.size.max.bytes 值进行预分配空间,即文件创建的时候就是最大值,当真正的索引文件进行切分的时候,才会将其裁剪到实际数据大小的文件。
这一点是根日志文件有所区别的地方,其意义降低了代码逻辑的复杂性。
索引文件
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系,时间戳索引文件则根据时间戳查找对应的偏移量。
文件:查看一个topic分区目录下的内容,发现有Log,Index和Timeindex三个文件:
log文件名是以文件中第一条message的offset来命名的,实际offset长度是64位,但是这里只使用20位,应付生产是足够的。
一组index+log+timeindex文件的名字是一样的,并且log文件默认写满1G之后,会进行log rolling形成一个新的组合记录消息,这个通过Broker端log.segment.bytes=1073741824指定的。
index和timeindex在刚使用时会分配10M的大小,当进行log rolling后,它会修剪为实际的大小。
具体的列表如下: