当前kafka的版本为2.8.11,Spring Boot的版本为2.7.6,在pom.xml中引入下述依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>

提前说明:当前Kafka的使用是与Spring Boot做了整合,不是使用原生的Kafka,因此Kafka的某些功能Spring Boot是做了二次封装,使其更加符合于实际情况。 

1、Kafka客户端自动提交offset

Windosw环境下面使用下述两个命令重装Zookeeper和Kafka:

docker run -d --name zookeeper -p 2181:2181 -t zookeeper:latest
docker run  -d --name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.15:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.15:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e TZ="Asia/Shanghai" wurstmeister/kafka:latest

已经提前规划好了当前要测试的消费者组为ONE,其消费的主题为topic0,使用下述命令来查看消费者组ONE的消费情况:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group ONE

由于还没有创建该消费者组,所以在执行上述命令时会报错:

Error: Consumer group 'ONE' does not exist.

在yml配置文件进行如下配置:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: true
      auto-commit-interval: 6000ms
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

在项目中创建一个生产者用于往主题topic0中投递消息,如下所示:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {

    // 自定义的主题名称
    public static final String TOPIC_NAME="topic0";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(@RequestParam("msg")String msg) {
        log.info("准备发送消息为:{}",msg);
        // 1.发送消息
        ListenableFuture<SendResult<String,String>> future=kafkaTemplate.send(TOPIC_NAME,msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // 2.发送失败的处理
                log.error("生产者 发送消息失败:"+throwable.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, String> stringObjectSendResult) {
                // 3.发送成功的处理
                log.info("生产者 发送消息成功:"+stringObjectSendResult.toString());
            }
        });
        return "接口调用成功";
    }
}

接着再在项目中创建一个消费者用于消费主题topic0中的消息,如下所示:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Slf4j
@Component
public class KafkaConsumer {

    // 自定义主题名称,这里要注意的是主题名称中不能包含特殊符号:“.”、“_”
    public static final String TOPIC_NAME = "topic0";

    @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
    public void topic_one(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("消费者组One消费了消息:Topic:" + topic + ",Record:" + record + ",Message:" + msg);
        }
    }
}

启动整个项目,这时控制台中会打印下述信息:

ConsumerConfig values:
auto.commit.interval.ms = 6000
auto.offset.reset = latest
bootstrap.servers = [127.0.0.1:9092]
client.id = consumer-ONE-1
enable.auto.commit = true
group.id = ONE
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

这个时候,我们使用下述命令来查看消费者组ONE的消费情况:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group ONE

可以看到初始时,消费者组ONE对于主题topic0的消息消费偏移量为0

调用 /kafka/send?msg=1 接口往主题topic0中生产1条消息,可以看到在控制台中该消息已经被消费了,如下所示:

消费者组One消费了消息:Topic:topic0,Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1702558156663, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1),Message:1

再次使用上述命令来查看消费者组ONE的消费情况,可以看到消费者组ONE对于主题topic0的消息消费偏移量为1,如下图所示:

2、设置自动提交时间间隔 

将yml文件中auto-commit-interval属性的值修改为60000ms,如下所示:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: true
      auto-commit-interval: 60000ms
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

重启整个项目,然后调用 /kafka/send?msg=2 接口往主题topic0中生产1条消息。可以看到在控制台中日志打印消费者组ONE消费了消息以后,接着我们立马使用上述命令查看消费者组ONE的消费情况,我们会发现消费者组ONECURRENT-OFFSET(已经提交的消费位移偏移量)属性值并不会立刻更新。

大概在一分钟左右以后,我们才能看到该属性值发生了改变,如下图所示:

3、Spring Boot自动提交offset

在yml文件中关于enable-auto-commit和auto-commit-interval的配置全部移除,重启整个项目,这时控制台中会打印下述信息:

ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [127.0.0.1:9092]
client.id = consumer-ONE-1
enable.auto.commit = false
group.id = ONE
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

调用/kafka/send?msg=3接口往主题topic0中生产1条消息,当看到在控制台中日志打印消费者组ONE消费了消息以后,接着使用上述命令查看消费者组ONE的消费情况,我们会发现消费者组ONECURRENT-OFFSET属性值在几秒以内(已经提交的消费位移偏移量)就更新了。

虽然在项目启动过程中看到消费者配置信息中的enable.auto.commitfalse,即没有使用自动提交偏移量,但是Spring在当一个消息被某一个消费者消费了以后,它会自动帮我们进行人工提交,提交已消费消息的偏移量。

如果enable.auto.commit的设置为false,但是项目中最终真的没有进行人工提交offset,那么这个可能是一个隐藏的bug,会导致每次重启服务时,之前已经消费过的消息会被重复进行消费。

Spring的开发者早已料到这一情况,因此做了自动提交offset的封装处理,从而防止新手忘记手动提交offset,从另外一个方面来看这个处理也减轻了开发的复杂度。

4、Spring Boot中实现手动提交偏移量

为了实现在Spring Boot中手动提交偏移量,首先配置文件修改为如下所示:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      ack-mode: manual

上述yml文件中需要重点关注的是 spring.kafka.consumer.enable-auto-commit 和 spring.kafka.listener.ack-mode 这两个配置。

重启整个项目,然后调用 /kafka/send?msg=4 接口往主题topic0中生产1条消息,使用上述命令查看消费者组ONE的消费情况,我们会发现消费者组ONECURRENT-OFFSET(已经提交的消费位移偏移量)属性值不会更新,会一直是旧的偏移量。

再次重启项目以后后,在消费者连接到Kafka服务端以后,会出现前面已经消费过的消息会被重复消费。出现这个问题的原因是在消息被消费了以后,kafka客户端没有自动提交其消息消费的偏移量,由于yml文件中的配置Spring这个时候也没有自动帮我们提交。

那么这个时候就需要在消息被消费以后在代码中手动提交偏移量,如下所示:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Slf4j
@Component
public class KafkaConsumer {

    // 自定义主题名称,这里要注意的是主题名称中不能包含特殊符号:“.”、“_”
    public static final String TOPIC_NAME = "topic0";

    @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
    public void topic_one(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Acknowledgment acknowledgment) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("消费者组One消费了消息:Topic:" + topic + ",Record:" + record + ",Message:" + msg);
            // 手动提交offset(偏移量)
            acknowledgment.acknowledge();
        }
    }
}

重点在于 acknowledgment.acknowledge(); 这行代码,有了它我们就可以在代码中实现手动提交偏移量。重启服务后,当消息者再次消费了消息以后,再次查看消费者组ONE的消费情况,可以看到CURRENT-OFFSET(已经提交的消费位移偏移量)属性值已经更新为最新值。

5、总结

  • enable-auto-commit的值为true时,是采用kafka的默认提交模式,表示消费者组对于某个主题消息的消费偏移量将在后台交由Kafka客户端定期进行提交。只有设置了enable-auto-commit为true时,auto-commit-interval才会生效,它表示消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
  • Kafka与Spring Boot整合以后,非特殊情况下没必要设置enable-auto-commit和auto-commit-interval这两个属性,自动提交偏移交给Spring管理就行。
  • Spring Boot中开启kafka的手动提交消费偏移量,这个操作不是很建议,一旦你忘记在业务代码手动提交偏移量,那么将是一个新的故事。
  • 原生的kafka使用,细节点很多,其使用细节和上面这些案例可能有点不一样,需要特别注意。
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐