Kafka是什么

Kafka是一个开源的时间流处理平台,由Scala和Java编写,持久化本质是分布式事务日志架构的大规模发布订阅消息队列

为什么要用Kafka

优点:

  • 解耦
  • 异步
  • 流量削峰填谷

缺点:

  • 可用性降低
  • 复杂性提高
  • 一致性问题

Kafka能做什么

  • 消息系统:用户下单后将订单信息发送到Kafka主题中,各处理订单的微服务可以从Kafka中消费订单消息,进行库存检查、支付处理、订单确认等操作
    • 预警通知:当库存到达某个阈值时向Kafka发生预警信息,其他采购系统或管理人员系统订阅预警信息
  • 物流跟踪:将物流运输状态更新发送到Kafka,其他系统订阅状态获取物流信息
  • 日志收集
  • 流式处理

Kafka不能做什么

  • 限时订单

Kafka架构

在这里插入图片描述

  • Producer:消息生产者,向Kafka主题(Topic)发送消息(Message)的客户端应用程序或进程
  • Consumer:消息消费者,从kafka主题(Topic)读取消息的客户端应用程序或进程
  • ConsumerGroup:消费者组,由一个或多个消费者组成,共同消费一个或多个主题的消息,默认从最新偏移量开始消费,将auto.offset.reset设置为earliest从头开始消费
  • Broker:代理,Kafka集群中的一个服务器节点
  • Topic:主题,一类消息的逻辑分类
  • Partition:分区,为实现可扩展性和高吞吐量,一个主题可以被分成多个分区
  • Message:消息,生产者发送到Kafka主题和消费者从主题读取的数据单元
  • Partition:分区,一个Topic默认一个分区
  • Offset:偏移量,每个分区写入事件的序号,从0开始

Topic 主题

主题用于存储事件

  • 事件也称为消息、记录,如支付记录、地理位置、运输订单、遥测数据、日志

AMQP

Docker部署Kafka

以前需要用到zookeeper,现在可以直接用Kraft

一、命令部署Kafka

由于docker部署一般在开发环境,监听了所有端口
ADVERTISED_LISTENERS必须设置外部连接Kafka broker的地址,本机开发设置为localhost,服务器开发设置为宿主机ip地址

  • CONTROLLER:管理和协调集群的各种元数据操作以及分区(Partition)的分配等重要任务
  • 不同的协议监听的端口必须不同

  • LISTENER_SECURITY_PROTOCOL_MAP必须指定协议映射

  • PLAINTEXT:明文协议,使用未加密的TCP进行通信
  • INTERNAL:内部网络,可以使用服务发现机制
  • EXTERNAL:外部网络,指定可以从外部监听的地址
docker run -it -d \
-p 9092:9092 \
--name broker \
--network pub-network \
--restart always \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@broker:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e KAFKA_NUM_PARTITIONS=3 \
apache/kafka:latest

删除

docker rm -f broker

二、docker-compose部署Kafka

docker-compose.yml

version: "3"
services:
  broker:
    image: apache/kafka:latest
    container_name: broker
    ports:
      - "9092:9092"
    networks:
      - pub-network
    restart: always
    environment:
      # 设置 Kafka 节点的 ID 为 1。在 Kafka 集群中,每个节点都有一个唯一的 ID
      KAFKA_NODE_ID: 1
      # 指定这个 Kafka 节点的角色为 broker(消息代理)和 controller(控制器)。Controller 负责管理集群中的分区分配等任务
      KAFKA_PROCESS_ROLES: broker,controller
      # 设置 Kafka 的监听器。这里配置了两种监听器,一种是普通的 PLAINTEXT 监听器在 9092 端口,用于接收客户端的连接;另一种是 CONTROLLER 监听器在 9093 端口,用于控制器的通信
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      # 设置对外公布的监听器地址,这里表示客户端可以通过 “[localhost:9092](https://localhost:9092/)” 连接到这个 Kafka 节点,如果不配置外部无法连接到Kafka,服务器请设置为外网ip
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # 指定控制器使用的监听器名称为 “CONTROLLER”
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      # 定义监听器的安全协议映射。这里表示 CONTROLLER 监听器使用 PLAINTEXT 协议,普通的 PLAINTEXT 监听器也使用 PLAINTEXT 协议
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 设置控制器的法定人数投票者,这里表示只有一个节点(ID 为 1)在 “[localhost:9093](https://localhost:9093/)” 作为投票者
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      # 设置 Kafka 的Topic偏移量(用于记录消费者的消费位置)的复制因子为 1,即只有一个副本,最小1,最大不能超过节点数
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # 设置 Kafka 的事务状态日志的复制因子为 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      # 设置事务状态日志的最小同步副本数为 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      # 设置消费者组的初始重新平衡延迟为 0 毫秒
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      # 设置默认的主题分区数量为 3
      KAFKA_NUM_PARTITIONS: 3

networks:
  pub-network:
    driver: bridge

部署

docker-compose -f docker-compose.yml up

删除

docker compose down

三、配置文件部署Kafka

先启动一个容器

docker run --name broker -p 9092:9092 apache/kafka:latest

创建配置文件文件夹

mkdir -p /opt/kafka/docker

把配置文件复制到宿主机

docker cp 容器id:/etc/kafka/docker/server.properties /opt/kafka/docker

编辑配置文件,详见https://github.com/apache/kafka/blob/trunk/config/server.properties,注意取消以下注释,否者外部无法访问

#listeners=PLAINTEXT://:9092
# 允许所有ip访问
#advertised.listeners=PLAINTEXT://0.0.0.0:9092

使用命令或者docker compose指定挂载卷
-v etc/kafka/docker/server.properties

docker run -d \
-p 9092:9092 \
--name broker \
--network pub-network \
--restart always \
-v /opt/kafka/docker:mnt/shared/config \
apache/kafka:latest

K8s部署Kafka集群

k8s是有状态应用,需要Statefulset资源、

命令行使用Kafka

在代理中启动一个shell

docker exec --workdir /opt/kafka/bin/ -it broker sh

执行脚本创建一个测试主题test-topic

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic

执行脚本向test-topic写入事件

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic

输入事件内容,按Ctrl+C停止脚本结束生产

>hello
>world

执行脚本消费事件

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

可以看到输出内容为

hello
world

Ctrl+C停止脚本结束消费

exit退出容器

topic 脚本使用

在代理中启动一个shell

docker exec --workdir /opt/kafka/bin/ -it broker sh

1、不指定参数获取帮助信息

./kafka-topics.sh

2、创建topic

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic

3、列出所有Topic

./kafka-topics.sh --bootstrap-server localhost:9092 --list

4、显示Topic详细信息

./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topic

5、修改Topic分区数

./kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test-topic --partitions 5

5、删除主题

./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test-topic

producer 脚本使用

1、不指定参数获取帮助信息

./kafka-console-producer.sh

2、写入事件

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic

consumer 脚本使用

1、不指定参数获取帮助信息

./kafka-console-consumer.sh

2、从Topic中消费历史事件(不加–from-beginning读取最新的事件)

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

IDEA 使用Kafka

安装插件Kafka

在这里插入图片描述

Vscode 使用Kafka

安装插件Tools for Apache Kafka®

在这里插入图片描述

配置

偏移量

  • earliest:自动将偏移量重置为最早偏移量
  • latest:自动将偏移量重置为最新偏移量
  • none:如果没有为消费者组找到以前的偏量,则向消费者抛出异常
  • exception:向消费者抛出异常

Replica 副本

为实现备份功能,保证某个节点发生故障时,该节点上的分区(Partition)数据不会丢失,且Kafka集群仍然能够继续工作

  • Leader:每个分区多个副本中的主副本,生产者发送数据和消费者消费数据都来自于Leader副本
  • Follower:每个分区多个副本中的从副本,实时从Leader副本中同步数据,保持和Leader副本数据的同步

Leader副本发生故障时,某个Follower副本会成为新的Leader副本,
设置副本数量不能为0,也不能大于节点个数,否则不能创建Topic

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-Topic --partitions 3 --replication-factor 1

Partion 分区策略

  • BuiltlnPartitioner:默认分配策略
    • 有key:
    • 无key:使用随机数
  • RoundRobinPartitionrt:轮询分配策略
  • 自定义分配策略:

生产者发送事件流程

  • 1、Producer:生产者
  • 2、ProducerInterceptors:拦截器
  • 3、Serializer:序列化器
  • 4、Partitioner:分区器

消费分区策略

  • RangeAssignor:默认,将所有分区按数字序列排列,将消费者按字典顺序排列,分区范围=分区数量/消费者数量
    • 优点:消费者处理能力均衡且分区数量稳定时,可以均匀分配分区
    • 缺点:消费者数量变化时可能导致分区重新分配不均衡
  • RoundRobinAssignor:轮询策略,依次遍历消费者,为每个消费者分配一个分区,直到分区被分配完
    • 优点:分配均衡
    • 缺点:消费者消费能力差异大时,消费能力差的消费者分配过多分区,从而影响整体性能
  • StickyAssignor:粘性策略,尽量保持分区分配的稳定性,尽可能均匀地分配分区,新消费者加入时尽量只将新地分区分配给它,保持现有分区分配不变
    • 优点:消费者数量变化时可以保持分区分配的稳定性,减少重新分配的开销
    • 缺点:实现复杂、消费者处理能力差时可能无法完全实现负载均衡
  • CooperativeStickyAssignor:合作粘性策略,在分配分区时不仅考虑StickyAssignor策略分区的稳定性,还考虑消费者之间的合作和负载均衡
    • 优点:适合消费者数量众多、处理能力差异大且动态变化的环境
    • 缺点:实现复杂,对于简单消费场景会带来不必要开销

事件存储

所有事件默认存储在/tmp/kafka-logs目录下(这里没有挂载需要进入容器查看),可通过log.dirs=/tmp/kafka-logs配置

默认有50个Topic分区:__consumer_offsets,保存了每个consumer group某一时刻提交的offset记录

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐