Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

简介: Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

20191116123525638.png

官方说明


https://kafka.apache.org/documentation/

选择对应的版本,我这里选的是 2.4.X

https://kafka.apache.org/24/documentation.html

选择


20210223002842611.png


https://kafka.apache.org/24/documentation.html#consumerconfigs


查找 auto.offset.reset



20210223002923314.png


让我们来品一品官方的解读


参数解读


What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)


no initial offset || current offset does not exist


no initial offset


举个例子 在消费组ConsumerGroupA里有个消费者A1, 已经消费到了100条数据, 这个时候你又新起了一个消费者, 但是呢这个新起的消费者的消费组和消费组A的名称不同,我们暂且称之为消费组ConsumerGroupB.

根据kafka的机制, 这个新起的消费组中的消费者再消费分区数据的时候,auto.offset.reset参数就起作用了


current offset does not exist


我们知道kafka提供了API可以按照消费offset记录继续消费,如果指定的offset不存在,那么 这个参数也会生效


earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

none: throw exception to the consumer if no previous offset is found for the consumer’s group

anything else: throw exception to the consumer.



啥意思?。。。。。。。


当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费


latest(默认) :只消费自己启动之后发送到主题的消息

earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)


Code


20210223164651466.png


POM依赖

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 引入 Spring-Kafka 依赖 -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>


配置文件

20210223164836742.png


生产者

 package com.artisan.springkafka.producer;
import com.artisan.springkafka.constants.TOPIC;
import com.artisan.springkafka.domain.MessageMock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.Random;
import java.util.concurrent.ExecutionException;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:25
 * @mark: show me the code , change the world
 */
@Component
public class ArtisanProducerMock {
    @Autowired
    private KafkaTemplate<Object,Object> kafkaTemplate ;
    /**
     * 同步发送
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public SendResult sendMsgSync() throws ExecutionException, InterruptedException {
        // 模拟发送的消息
        Integer id = new Random().nextInt(100);
        MessageMock messageMock = new MessageMock(id,"artisanTestMessage-" + id);
        // 同步等待
       return  kafkaTemplate.send(TOPIC.TOPIC, messageMock).get();
    }
    public ListenableFuture<SendResult<Object, Object>> sendMsgASync() throws ExecutionException, InterruptedException {
        // 模拟发送的消息
        Integer id = new Random().nextInt(100);
        MessageMock messageMock = new MessageMock(id,"messageSendByAsync-" + id);
        // 异步发送消息
        ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPIC.TOPIC, messageMock);
        return result ;
    }
}


消费者

 package com.artisan.springkafka.consumer;
import com.artisan.springkafka.domain.MessageMock;
import com.artisan.springkafka.constants.TOPIC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:33
 * @mark: show me the code , change the world
 */
@Component
public class ArtisanCosumerMock {
    private Logger logger = LoggerFactory.getLogger(getClass());
    private static final String CONSUMER_GROUP_PREFIX = "ConsumerGroup-" ;
    /**
     * 消费者的groupId ,每次启动都通过SPEL 随机一个出来,确保每次都是一个新的消费组 用于测试 auto.offset.reset 参数 设置为 earliest的情况
     * @param messageMock
     */
    @KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC +  "-" + "#{T(java.util.UUID).randomUUID()})")
    public void onMessage(MessageMock messageMock){
        logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);
    }
}


看注释


单元测试

 package com.artisan.springkafka.produceTest;
import com.artisan.springkafka.SpringkafkaApplication;
import com.artisan.springkafka.producer.ArtisanProducerMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.support.SendResult;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:40
 * @mark: show me the code , change the world
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringkafkaApplication.class)
public class ProduceMockTest {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private ArtisanProducerMock artisanProducerMock;
    @Test
    public void testSyncSend() throws ExecutionException, InterruptedException {
        SendResult sendResult = artisanProducerMock.sendMsgSync();
        logger.info("testSyncSend Result =  topic:[{}] , partition:[{}], offset:[{}]",
                sendResult.getRecordMetadata().topic(),
                sendResult.getRecordMetadata().partition(),
                sendResult.getRecordMetadata().offset());
        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }
    @Test
    public void testAsynSend() throws ExecutionException, InterruptedException {
            artisanProducerMock.sendMsgASync().addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    logger.info(" 发送异常{}]]", throwable);
                }
                @Override
                public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
                    logger.info("回调结果 Result =  topic:[{}] , partition:[{}], offset:[{}]",
                            objectObjectSendResult.getRecordMetadata().topic(),
                            objectObjectSendResult.getRecordMetadata().partition(),
                            objectObjectSendResult.getRecordMetadata().offset());
                }
            });
        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }
}


测试

看结果之前,先看看当前topic中的数据


image.png

20210223165332129.png


earliest

earliest 按照预期,新起了一个ConsumerGroup , 肯定会从头消费 ,让我们来验证下

2021-02-23 16:54:38.014  INFO 5684 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[51]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage9'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=68, name='artisanTestMessage-68'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=48, name='artisanTestMessage-48'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=49, name='artisanTestMessage-49'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=81, name='artisanTestMessage-81'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=60, name='artisanTestMessage-60'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=51, name='artisanTestMessage-51'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=51, name='artisanTestMessage-51'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=60, name='artisanTestMessage-60'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=6, name='artisanTestMessage-6'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=23, name='artisanTestMessage-23'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=37, name='messageSendByAsync-37'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=25, name='messageSendByAsync-25'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=16, name='messageSendByAsync-16'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=57, name='messageSendByAsync-57'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='messageSendByAsync-79'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=19, name='messageSendByAsync-19'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=42, name='messageSendByAsync-42'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=70, name='messageSendByAsync-70'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=24, name='messageSendByAsync-24'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=84, name='messageSendByAsync-84'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=56, name='messageSendByAsync-56'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=82, name='messageSendByAsync-82'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=25, name='messageSendByAsync-25'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=6, name='messageSendByAsync-6'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=94, name='messageSendByAsync-94'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=20, name='messageSendByAsync-20'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=52, name='messageSendByAsync-52'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=24, name='messageSendByAsync-24'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=32, name='messageSendByAsync-32'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=13, name='messageSendByAsync-13'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=45, name='messageSendByAsync-45'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=29, name='messageSendByAsync-29'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=18, name='messageSendByAsync-18'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=13, name='messageSendByAsync-13'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=57, name='messageSendByAsync-57'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=53, name='messageSendByAsync-53'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=46, name='messageSendByAsync-46'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=33, name='messageSendByAsync-33'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=49, name='artisanTestMessage-49'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=99, name='artisanTestMessage-99'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=83, name='artisanTestMessage-83'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=61, name='artisanTestMessage-61'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='artisanTestMessage-79'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=99, name='artisanTestMessage-99'}]
2021-02-23 16:54:38.075  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=43, name='artisanTestMessage-43'}]


latest(默认)

2021-02-23 16:55:21.541  INFO 21472 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[52]
2021-02-23 16:55:21.587  INFO 21472 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='artisanTestMessage-79'}]


none

2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-ConsumerGroup-MOCK_TOPIC-d52dc4e5-535f-4151-ad5a-85b7c0488b9d)-1, groupId=ConsumerGroup-MOCK_TOPIC-d52dc4e5-535f-4151-ad5a-85b7c0488b9d)] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on invocation of onPartitionsAssigned for partitions [MOCK_TOPIC-0]
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [MOCK_TOPIC-0]
  at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:671) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2422) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1718) [kafka-clients-2.6.0.jar:na]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seekPartitions(KafkaMessageListenerContainer.java:1031) [spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$3800(KafkaMessageListenerContainer.java:462) [spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:2631) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) [kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [kafka-clients-2.6.0.jar:na]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1269) [spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1160) [spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1073) [spring-kafka-2.6.4.jar:2.6.4]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_261]
  at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]
2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : No offset and no reset policy
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [MOCK_TOPIC-0]
  at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:671) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2422) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1718) ~[kafka-clients-2.6.0.jar:na]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seekPartitions(KafkaMessageListenerContainer.java:1031) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$3800(KafkaMessageListenerContainer.java:462) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:2631) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[kafka-clients-2.6.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1269) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1160) ~[spring-kafka-2.6.4.jar:2.6.4]
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1073) ~[spring-kafka-2.6.4.jar:2.6.4]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_261]
  at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]
2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Fatal consumer exception; stopping container
2021-02-23 16:55:44.730  INFO 19956 --- [ntainer#0-0-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2021-02-23 16:55:44.749  INFO 19956 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[53]

exception


20210223165718384.png


后两种设置,看看就行

懂了么,老兄 ~


源码地址

https://github.com/yangshangwei/boot2/tree/master/springkafka_auto.offset.reset


相关文章
|
8月前
|
消息中间件 监控 Java
Apache Kafka 分布式流处理平台技术详解与实践指南
本文档全面介绍 Apache Kafka 分布式流处理平台的核心概念、架构设计和实践应用。作为高吞吐量、低延迟的分布式消息系统,Kafka 已成为现代数据管道和流处理应用的事实标准。本文将深入探讨其生产者-消费者模型、主题分区机制、副本复制、流处理API等核心机制,帮助开发者构建可靠、可扩展的实时数据流处理系统。
785 4
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
1144 7
|
10月前
|
消息中间件 存储 监控
Apache Kafka 3.0与KRaft模式的革新解读
在该架构中,Kafka集群依旧包含多个broker节点,但已不再依赖ZooKeeper集群。被选中的Kafka集群Controller将从KRaft Quorum中加载其状态,并在必要时通知其他Broker节点关于元数据的变更。这种设计支持更多分区与快速Controller切换,并有效避免了因数据不一致导致的问题。
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
842 5
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
797 1
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
313 0
|
7月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
1341 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
607 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
9月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
1102 9
Apache Flink:从实时数据分析到实时AI

推荐镜像

更多