点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(正在更新…)
章节内容
上节我们完成了如下内容:
物理存储 日志存储概述
LogSegment
日志切分文件
索引切分过程
索引文件等等
索引文件
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系,时间戳索引文件则根据时间戳查找对应的偏移量。
文件:查看一个topic分区目录下的内容,发现有Log,Index和Timeindex三个文件:
log文件名是以文件中第一条message的offset来命名的,实际offset长度是64位,但是这里只使用20位,应付生产是足够的。
一组index+log+timeindex文件的名字是一样的,并且log文件默认写满1G之后,会进行log rolling形成一个新的组合记录消息,这个通过Broker端log.segment.bytes=1073741824指定的。
index和timeindex在刚使用时会分配10M的大小,当进行log rolling后,它会修剪为实际的大小。
index 和 timeindex 内容如下:
创建主题
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_test_demo_05 --partitions 1 --replication-factor 1 --config segment.bytes=104857600
执行结果如下图:
创建消息
for i in `seq 10000000`; do echo "hello kangkang $i" >> test_data.txt; done
生产消息
kafka-console-producer.sh --broker-list h121.wzk.icu:9092 --topic wzk_test_demo_05 < test_data.txt • 1
运行结果如下图:
查看存储
cd /opt/kafka-logs cd wzk_test_demo_05-0 ll
运行结果如下图:
查看详细
如果想查看这些文件,可以使用Kafka提供的Shell来完成,几个关键信息如下:
Offset 是主键增加的整数,每个offset对应一个消息的偏移量
Position:消息批字节数,用于计算物理地址
CreateTime:时间戳
Magic:2代表这个消息类型是V2,如果是0则代表是V0,1代表V1类型。
Compresscodec:None说明没有指定压缩类型,Kafka目前提供了4种可选择:0-Node、1-GZIP、2-Snappy、3-lz4。
- crc:对所有字段进行校验后的crc值
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-lo
执行结果如下图:
消息偏移
消息存储
- 消息内容保存在log日志文件中
- 消息封装为Record,追加到log日志文件末尾,采用的是顺序写模式。
- 一个topic的不同分区,可认为是queue,顺序写入接受到的消息
消费者有Offset,下图中,消费者A消费的Offset是9,消费者B消费的Offset是11,不同的消费者Offset是交给一个内部公共topic来记录的。
时间戳索引文件,它的作用是可以让用户查询某个时间段的内的消息,它一条数据的结构是时间戳(8 byte) + 相对Offset(4 byte)。如果要使用这个索引文件,首先需要通过时间范围,找到相对Offset,然后再去对应的Index文件中找到Position信息,然后才能遍历log文件,它也需要使用上面说的Index文件的。
但是Producer生产消息可以指定消息的时间戳,这可能将导致消息的时间戳不一定有先后顺序,因为尽量不要生产消息时指定时间戳。
偏移量索引
位置索引保存在Inde文件中
log日志默认每写入4K(log.index.interval.bytes设定的),会写入一条索引信息到index文件中,因此索引文件是稀疏索引,它不会为每条日志都建立索引信息。
log文件中的日志,是顺序写入的,由Message+实际Offset + Position组成
索引文件的数据结构则是由相对Offset(4 byte) + Position(4 byte)组成,由于保存的是相对于第一个消息的相对Offset,只需要 4 byte就可以了,可以节省空间,在实际查找后还需要计算回实际的Offset,这对用户是透明的。
稀疏索引的密度不高,但是Offset有序,二分查找的时间复杂度为O(LogN),如果从头遍历时间复杂度是O(N)如下图:
可以通过下面的命令解析 .index 文件:
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --print-data
注意:Offset 与 Position 没有直接关系,因为会删除数据和清理日志
注意:Offset 与 Position 没有直接关系,因为会删除数据和清理日志
注意:Offset 与 Position 没有直接关系,因为会删除数据和清理日志
执行结果如下图所示:
在偏移量索引文件索引中,索引数据都是顺序记录Offset,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在Kafka 0.11.0.0以后,消息元数据中存在若干的时间戳信息。
如果Broker端参数 log.message.timestamp.type 设置为 LogAppendTime,那么时间戳必定能保持单调增长。反之如果是CreateTime则无法保证顺序。
注意:timestamp文件中的Offset与Index文件中的relativeOffset不是一一对应的,因为数据的写入是各自追加的
注意:timestamp文件中的Offset与Index文件中的relativeOffset不是一一对应的,因为数据的写入是各自追加的
注意:timestamp文件中的Offset与Index文件中的relativeOffset不是一一对应的,因为数据的写入是各自追加的
思考: 如何查看偏移量为23的消息?
Kafka中存在一个 ConcurrentSkipListMap来保存在每个日志分段中,通过跳跃表方式,定位到00000000000000000000.index,通过二分法在偏移量索引文件中找到不大于23的最大索引项,即Offset 20那栏,然后从日志分段文件中的物理位置为320开始顺序查找偏移量为23的消息。
时间戳
在偏移量索引文件中,索引数据都是顺序记录Offset,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。
在Kafka 0.11.0.0以后,消息信息中存在若干的时间戳消息。如果Broker端参数log.message.timestamp.type设置为LogAppendTime,那么时间戳必定能保持单调增长。反之如果是CreateTime则无法保证顺序。
通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。
时间戳索引格式:前八个字节表示时间戳,后四个字节表示偏移量。
思考: 查找指定时间戳开始的消息?
假设某个时间戳为A
查找时间戳A应该在哪个日志分段中,将A和每个日志分段中最大时间戳LargestTimestamp逐一对比,直到找到不小于A所对应的日志分段。
日志分段中的LargestTimeStamp的计算是:先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取该值,否则取该日志分段的最近修改时间。
查找该日志分段的偏移量索引文件,查找该偏移量对应的物理地址。
日志文件中从320的物理位置开始查找小于A的数据。