在 Kafka 的生产者设置压缩协议时,消费者不需要显式地设置压缩协议。消费者会自动解压生产者发送的压缩消息。因此,无论生产者使用的是 gzip、snappy、lz4 还是 zstd 压缩,消费者都会正确解压并处理消息。
为了进一步澄清这个问题,这里是一个完整的消费者示例,它能够正确处理任何压缩类型的消息,而无需额外配置压缩协议:
package main import ( "github.com/Shopify/sarama" "go.uber.org/zap" ) func main() { brokers := []string{"broker1:9092", "broker2:9092"} topic := "your_topic" // 创建 Sarama 配置 config := sarama.NewConfig() config.Consumer.Return.Errors = true config.Version = sarama.V2_1_0_0 // 使用 Kafka 版本 // 创建消费者 client, err := sarama.NewConsumer(brokers, config) if err != nil { zap.S().Fatal(err) } defer func() { if err := client.Close(); err != nil { zap.S().Errorf("client close error: %v", err) } }() // 获取分区列表 partitions, err := client.Partitions(topic) if err != nil { zap.S().Fatal(err) } // 选择偏移量 offset := sarama.OffsetOldest zap.S().Infof("start one offset: %d", offset) // 消费第一个分区 claim, err := client.ConsumePartition(topic, partitions[0], offset) if err != nil { zap.S().Fatal(err) } defer func() { zap.S().Infof("kafka is closing...") if err := claim.Close(); err != nil { zap.S().Errorf("partition consumer close error: %v", err) } }() zap.S().Infof("start received message...") // 消费消息 for message := range claim.Messages() { zap.S().Infof("Message received: %s", string(message.Value)) } }
在上述示例中:
配置和创建消费者:配置 Kafka 版本和返回错误信息。
获取分区并选择偏移量:消费指定分区的消息,从 OffsetOldest 开始。
处理消息:消费者会自动解压缩消息,无需额外的配置。
因此,无需在消费者代码中指定压缩协议。Kafka 的消费者会自动处理从生产者发送的任何压缩格式的消息。