SpringBoot:整合Kafka
虽然Kafka可作为消息组件使用,但它并不是单纯的消息组件,它被定位成“开源的分布式事件流平台(open-source distributed event strea ming platform)”,因此它和JMS实现或AMQP实现存在较大的差异。由于目前Kafka还依赖于ZooKeeper,因此在安装Kafka之前需要先安装,运行 ZooKeeper。
1,基本概念
1.1,Kafka和RabbitMQ
Kafka是一个高吞吐量、分布式、可持久化的消息传递系统,主要用于处理大量的实时数据流。它的设计目标是提供一种高效的方式来处理和传递数据,支持多个消费者同时订阅同一主题,并且能够保证数据的持久性和可靠性。Kafka使用基于磁盘的存储,可以在集群中快速地处理大量数据,并且支持水平扩展。
RabbitMQ是一个开源的消息传递系统,它实现了AMQP(高级消息队列协议)标准,支持多种编程语言和操作系统。RabbitMQ的设计目标是提供一种可靠的、可扩展的、灵活的消息传递机制,支持多个消费者同时订阅同一主题,并且能够保证消息的可靠性和顺序性。RabbitMQ使用基于内存的存储,可以在集群中快速地处理大量数据,并且支持垂直和水平扩展。
总的来说,Kafka适用于需要处理大量实时数据的场景,而RabbitMQ适用于需要可靠消息传递的场景。选择哪个系统取决于具体的业务需求和技术架构。
对比项 kafka rabbitmq 开发语言 scala,Java erlang 是否支持多租户 2.x.x支持多租户 支持多租户 是否支持topic优先级 不支持 支持 是否支持消息全局有序 不支持 支持 是否支持消息分区有序 支持 支持 是否内置监控 无内置监控 内置监控 是否支持多个生产者 一个topic支持多个生产者 是否支持多个消费者 一个topic支持多个消费者(一个消费者可消费多个分区,一个分区可被多个消费组消费,但同一消费组内仅能有一个消费者同时消费1个分区) 否支持一个分区多个消费者 不支持 不支持 是否支持JMX 支持 不支持(非java语言编写) 是否支持加密 支持 支持 消息队列协议支持 仅支持自定义协议 支持AMQP、MQTT、STOMP协议 客户端语言支持 支持多语言客户端 支持多语言客户端 是否支持消息追踪 不支持消息追踪 支持消息追踪 是否支持消费者推模式 不支持消费者推模式 支持消费者推模式 是否支持消费者拉模式 支持消费者拉模式 支持消费者拉模式 是否支持广播消息 支持广播消息 支持广播消息 是否支持消息回溯 支持消息回溯,因为消息持久化,消息被消费后会记录offset和timstamp 不支持,消息确认被消费后,会被删除 是否支持消息数据持久化 支持消息数据持久 支持消息数据持久 是否支持消息堆积 支持消息堆积,并批量持久化到磁盘 支持阈值内的消息对接,无法支持较大的消息堆积 是否支持流量控制 支持控制用户和客户端流量 支持生产者的流量控制 是否支持事务性消息 支持 不支持 元数据管理 通过zookeeper进行管理 支持消息数据持久 默认服务端口 9092 5672 默认监控端口 kafka web console 9000;kafka manager 9000; 15672 网络开销 相对较小 相对较大 内存消耗 相对较小 相对较大 cpu消耗 相对较大 相对较小
1.2,ZooKeeper下载安装
虽然Kafka可作为消息组件使用,但它并不是单纯的消息组件,它被定位成“开源的分布式事件流平台(open-source distributed event strea ming platform)”,因此它和JMS实现或AMQP实现存在较大的差异。由于目前Kafka还依赖于ZooKeeper,因此在安装Kafka之前需要先安装,运行 ZooKeeper。ZooKeeper的本质就是用一个类似于“文件系统”的属性结构来管理节点,并提供了对节点的监控、通知机制,因此它常用于管理分布式应用的多个节点。
(1)登录官网(地址)下载ZooKeeper最新发行版压缩包。
(2)将 apache-zookeeper-3.8.0-bin.tar 解压到硬盘
这个压缩包是一个Linux文件系统的压缩包,有些解压缩软件会解压缩失败。
(3)将 ZooKeeper目录下的conf目录下的zoo_sample.cfg文件复制一份,重名为zoo.cfg。打开zoo.cfg文件,找到其中的如下配置行:
dataDir=/tmp/zookeeper
上面配置指定了ZooKeeper的数据存储目录,这是一个Linux风格的路径,但由于该目录只是一个临时目录,因此即使在Linux平台上也建议修改该目录。在Windows平台上自然也应该修改该目录,将上面这行配置改为如下形式:
dataDir=../zookeeper-data
上面配置意味着使用 ZooKeeper 解压缩路径下的 zookeeper-data 子 目录作为数据存储目录,ZooKeeper启动时会自动创建zookeeper-data子目录。
admin.serverPort=9999
上面配置表示ZooKeeper admin启动默认端口是8080,需要修改为其他端口。
(4)设置环境变量
- JAVA_HOME:ZooKeeper需要Java环境,因此使用该环境变量 指定JDK的安装路径。
- PATH:E:\apache\apache-zookeeper-3.8.0-bin\bin
(5)启动ZooKeeper(cmd管理员运行),然后可以使用 zkCli.cmd命令查看节点状态。
zkServer.cmd
zkCli.cmd [zk: localhost:2181(CONNECTED) 0]
zkCli.cmd就是ZooKeeper提供的命令行客户端工具,可用于查看,添加,删除,修改节点等,可通过help子命令来获取该工具的帮助信息。
1.3,Kafka下载安装
(1)登录官网(地址),下载Kafka二进制下载包(3.0+版本在Win下可能有BUG,推荐2.0系列)。
由于Kafka是用Scala(一种JVM语言,编译结果依然运行在Java平台上)开发的,因此Kafka提供了使用不同版本的Scala编译器所编译得到的二进制包,但对于Java用户来说,它们都差不多,无论下载哪个都行。
(2)将压缩包压缩到硬盘
(3)配置环境变量
- JAVA_HOME:Kafka需要Java环境,因此通过该环境变量指定JDK的安装路径。
- PATH:E:\apache\kafka_2.13-2.8.2\bin\windows
(4)修改配置文件 E:\apache\kafka_2.13-2.8.2\config\server.properties
log.dirs=/tmp/kafka-logs 修改为 log.dirs=E:/apache/kafka_2.13-2.8.2/kafka-data
(5)启动Kafka服务器(先启动ZooKeeper服务器)
kafka-server-start.bat E:\apache\kafka_2.13-2.8.2\config\server.properties
该命令的参数需要指定启动Kafka服务器所使用的配置文件。
(6)如果要配置Kafka集群,也就是启动多个Kafka节点:则应该将 config 子目录下的 server.properties 文件复制两份,重命名为 server-1.properties 和server-2.properties,并将它们分别修改如下。
config/server-1.properties #指定该节点的唯一标识 broker.id=1 #指定该节点的监听端口 listeners=PLAINTEXT://:9093 #指定该节点的数据存储目录 log.dir=E:/apache/kafka_2.13-2.8.2/kafka-data-1 config/server-2.properties broker.id=2 listeners=PLAINTEXT://:9094 log.dir=E:/apache/kafka_2.13-2.8.2/kafka-data-2
kafka-server-start.bat E:\apache\kafka_2.13-2.8.2\config\server-1.properties kafka-server-start.bat E:\apache\kafka_2.13-2.8.2\config\server-2.properties
由于本例是在同一台主机上运行多个Kafka节点,因此将3个节点的监听端口分别设为9092 (默认端口),9093(第2个节点)和9094(第3 个节点)。如果在不同主机上分别启动不同的Kafka节点,则可以让它们都使用9092默认端口。
如果 Kafka 节点与 ZooKeeper 不在同一台主机上,则需要修改 Kafka 的 config 目录下的server.properties文件中的如下一行:
zookeeper.connect=localhost:2181
1.4,CMAK安装
Kafka 本身并没有提供Web 管理工具,而是推荐使用bin目录下的各种工具命令来管理Kafka,这些工具 命令其实用起来也不错,只不过对初学者不太友好,而且不够直观。 因此这里安装Kafka的Web管理工具:CMAK。
(1)登录官网(地址)下载CMAK,不要下载源代码压缩包。
(2)解压文件夹并重命名cmak(不重命名会报错,并且不能在深层次目录)
(3)修改配置文件 E:\apache\cmak\conf\application.conf
cmak.zkhosts="kafka-manager-zookeeper:2181" 修改为: cmak.zkhosts="localhost:2181"
(4)设置环境变量
- JAVA_HOME
- PATH:E:\cmak\bin
1.5,添加Cluster
运行cmak命令访问 http://localhost:9000
只需设置这3项,其他项保持默认即可,然后将该页面拖到最下方,单击“Save”按钮完成添加。返回CMAK管理页面,可以看到集群列表。
单击“MyKafka”(刚刚添加的集群)链接,CMAK显示:
点击Brokers:
单击任一Broker的id链接,即可查看该Broker的详细信息,
当前Broker上有1个主题,50个作为“领导者副本(Leader Replica)”的分区。页面拖到下方,可以看到:
- 主题:Kafka默认就有一个名为“__consumer_offsets”的主题,该主题是Kafka自动创建的内部主题:位移主题,用于保存Kafka内部的位移信息。一般不建议手动管理它,让系统自己管理它是最好的。位移主题的消息格式也是Kafka自定义的,用户不能修改。 换言之,开发者不应该向这个主题写入消息;否则,一旦写入的消息格式不满足Kafka的定义,就会导致Broker崩溃。
- 复制因子:它控制主题内各分区的副本数量,复制因子为 1,表明每个分区的副本数量为1—也就是每个分区都仅有一个副本。 复制因子必须大于或等于1,这意味着每个分区至少要有一个副本。Kafka的副本机制(也可称为备份机制)仅提供数据冗余功能,比 如将某个主题的复制因子设为3,这意味着该主题内每个分区都有3个副本,这3个副本中有一个是“领导者副本”,该领导者副本负责与客户端交互(接受客户端读/写操作),另外两个是“追随者副本(Follower Replica)”,追随者副本完全不能与客户端交互,追随者副本仅仅是作为领导者副本的后备(自动与领导者副本的数据保持同步,但会有一 定的滞后性):当领导者副本挂掉之后,Kafka会从追随者副本中重新选一个作为新的领导者副本。可见,Kafka的副本机制不具备性能提升功能,仅仅提供了数据备份功能。
- Broker分区:当前Broker上的50个分区的编号,这说明Kafka会将主题内的分区“分摊”到不同节点上,从而让不同节点能并行地向客户端提供服务,因此增加节点可以明显地提高Kafka的响应速度。
- 领导者分区:当前Broker上包含50个领导者分区,这说明该Broker上的所有分区都是领导者分区,这是理所当然的事情。由于该主题的复制因子为1,这意味着每个分区仅有一个副本,因此该副本只能是领导者分区,只不过它没有追随者分区。如果该Broker挂掉,这50个分区将全部不能对外提供服务,因为它们都没有后备(追随者分区)。
点击 Topics:
可以看到每个主题所包含的分区分布在几个Broker(节点)上分布率达到多少。从提升性能的角度来看,肯定是分布率越高越好,分布率越高就意味着有更多的节点可以一起为该主题对外提供服务。
如果想对主题所包含的分区进行重新分布,可通页面上的“Generate Partition Assignments”和“Run Partition Assignments”按钮进行操作,通过“Add Partitions”按钮为指定主题添加分区。
1.6,主题和分区
Kafka的主题虽然也叫Topic,但它和Pub-Sub消息模型中的Topic不同,和AMQP的Topic也不同(AMQP的Topic只是Exchange的类型)。
Kafka 的主题只是盛装消息的逻辑容器(注意是逻辑容器),主题之下会分为若干个分区,分区才是盛装消息的物理容器。简而言之, 消息存在于分区中,一个或多个分区组成主题。因此Kafka的消息组织方式实际上是三级结构:主题 → 分区 → 消息。
- 主题只是消息的逻辑分类,它是发布消息的类别或消费源的名称。
- 分区才是真正存储消息的地方,分区在物理存储层面就是一个个日志文件。分区文件的结构有点类似于不可变的 List 集合,只不过 List 集合存储在内存中,而分区文件则持久化地存储在磁盘上。
Kafka提供了kafka-topics.bat工具命令来操作主题,包括创建主题,删除主题,列出主题,查看主题等。
【创建主题】
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test1
- --create:指定创建主题。
- --bootstrap-server:指定Kafka Broker的地址。
- --replication-factor:指定复制因子。分区数目为:
- --partitions:指定该主题包含的分区数量。此处指定的分区数量为3,表明该主题由3个分区组成。
- --topic:指定主题名。
复制因子的值必须小于或等于Kafka Broker(节点)的数量。因此,如果只启动了一个Kafka Broker,那么就没法创建复制因子为2的主题了。
当然,也可在CMAK管理界面中通过“Topic”下拉菜单中的“Create” 菜单项来创建主题:
【查看主题】
kafka-topics.bat --list --bootstrap-server localhost:9092
如果希望查看指定主题的具体信息,则可使用 kafka-topics.bat 的 --describe 选项。
kafka-topics.bat --describe --topic test1 --bootstrap-server localhost:9092
Topic: test1 TopicId: 2Qoj7ud8RX28JF0H2ENJ6w PartitionCount: 3 ReplicationFactor: 1 Configs: Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: test1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
- 第1行是该主题的概要信息,包括分区数量,复制因子及其他额外的配置信息。
- 第2行显示第1个分区的信息,其中“Leader:0”说明该分区的领导者副本位于id为0的Broker上,“Replicas:0”说明该分区的1个分别存储在id为0的两个Broker上, “Isr:0”说明该分区的ISR副本也存储在id为0的两个Broker上。
ISR副本:Kafka认为与领导者副本的数据同步的副本。根据该定义可以看出,领导者副本天然就是 ISR 副本,毕竟它自己与自己肯定是同步的。甚至在某些情况下,ISR中只有领导者一个副本。
此处领导者副本中已有10条消息,而追随者副本A中当前只有4条消息,追随者副本B中当前有6条消息。很明显这两个追随者副本都有一定的滞后性,那这两个副本哪个更符合ISR标准 呢?这两个副本都有可能符合ISR标准,也都有可能不符合ISR标准,甚至有可能副本A符合ISR标准,而副本B不符合ISR标准。判断一个副本是否符合ISR标准,取决于server.properties文件中的replica.lag.time.max.ms配置参数,该参数的默认值为30000(即30s),Kafka建议将该参数配置为10~30s。
replica.lag.time.max.ms参数的含义是允许追随者副本滞后于领导者副本的最长时间,比如将该参数设置为 15s,则意味着只要一个追随者副本滞后于领导者副本的时间不连续超过 15s,Kafka就认为该追随者副本符合 ISR 标准,即使该追随者副本中保存的消息明显少于领导者副本中的消息也没关系。
由于追随者副本总在不断地尽力从领导者副本中拉取消息,然后写入自己的日志中,如果这个同步速度持续慢于领导者副本的消息写入速度,那么在达到 replica.lag.time.max.ms 时间后,该追随者副本就会被移出ISR集合;如果该追随者副本后来又慢慢地追上了领导者副 本的进度,那么它是能够被重新加入ISR集合的,因此ISR副本也是一 个动态调整的集合。 Kafka将所有不符合ISR标准的副本称为非同步副本。通常而言,非同步副本滞后于领导者副本太多,当领导者副本挂掉时,非同步副本不适合被选举为领导者副本,否则会造成数据丢失,这也是Kafka的默认设置。 但是,如果剥夺了非同步副本被选举为领导者副本的资格,则势必会造成可用性降低。比如将复制因子设为4,这意味着一个分区有1个领导者副本和3个追随者副本,当领导者副本挂掉时,有可能这 3 个追随者副本都不符合 ISR 标准,那么就没法选出新的领导者副本,这个分区也就不可用了。 因此,在允许一定数据丢失的场景中,也可开启“Unclean 领导者选举”,也就是允许选举非同步副本作为领导者副本—只要将server.pro perties文件中的unclean.leader.election.enable参数设为true即可。开启“Unclean领导者选举”可以提高Kafka的可用性,但可能会造成数据丢失。
1.7,其他操作
当发送消息的主题不存在且希望Kafka能自动创建主题时,可以在config/server.properties文件中增加如下配置:
#允许自动创建主题 auto.create.topics.enable=true #默认复制因子1 default.replication.factor=1 #设置各主题的默认分区数量 num.partitions=2
如果要修改主题的分区数量,复制因子等,则可使用kafka-topics.bat 命令的--alter选项。
kafka-topics.bat --alter --bootstrap-server localhost:9092 --partitions 4 --topic test1
如果要删除主题,则可使用kafka-topics.bat命令的--delete选项。删除该主题后,该主题所包含的全部分区被删除,该主题下的所有消息也被删除了。
kafka-topics.bat --delete --bootstrap-server localhost:9092 --topic test1
Windows环境下kafka删除主题需要同时删除Zookeeper下的Topic
- 查找Topic列表:ls /brokers/topics
- 删除topic:deleteall /brokers/topics/[topic name]命令进行删除。
- 重启zookeeper、重启kafka。
2,消费者与生产者
2.1,消息生产者
消息就是Kafka所记录的数据节点,消息在Kafka中又被称为记录(record)或事件(event),从存储上看,消息就是存储在分区文件(有点类似于 List 集合)中的一个数据项,消息具有key,value,时间戳和可选的元数据头。
key:"name". value:"燕双嘤". timestamp:"Feb.15,2022 at 2:06 p.m.".
消息生产者向消息主题发送消息,这些消息将会被分发到该主题下的分区(此处说的是领导者分区)中保存,主题下的每条消息只会被保存在一个领导者分区中,而不会在多个领导者分区中保存多份。
分区的主要目的就是实现负载均衡,可以将同一个主题的不同分 区放在不同的节点上,因此对消息的读/写操作也都是针对分区这个粒度进行的。所以,每个节点都能独立地处理各自分区的读/写请求,通过添加新节点即可很方便地提高Kafka的吞吐量。
当消息生产者发送一条消息时,它会按如下方式来决定该消息被分发到哪个分区:
- 如果在发送消息时指定了分区,则消息被分发到指定的分区。
- 如果在发送消息时没有指定分区,但消息的key不为空,则基于key的hashCode来选择一个分区。
- 如果既没有指定分区,且消息的key也为空,则用round-robin(轮询)策略来选择一个分区。
【发送消息】Kafka提供了kafka-console-producer.bat(.sh)命令来发送消息
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic Test --property parse.key=true
该命令指定向 Test 主题发送消息,并通过“parse.key=true”指定在发送消息时会解析消息的key,默认的解析规则为:制表符(Tab键)之前的是key,制表符(Tab键)之后的是value。如果不指定“parse.key=true”属性,则默认不解析消息的key,也就是发送不带key的消息。 运行该命令之后,将会打开一个以大于符号(>)开头的交互界面,通过该界面可不断地向Test主题发送消息。
>a 1 >b 2 >c 3
注意上面 3 条消息之间都要用制表符分隔,制表符前面的是消息 key,制表符后面的是消息value。
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic Test
>4 >5 >6
上面命令向Test主题发送了6条消息,其中前3条消息具有相同的key,因此它们会进入同一个分区中;后3条消息则没有key,Kafka将使用轮询机制将它们“分摊”到该主题下的所有分区中。
2.2,消息消费者
消费者用于从消息主题读取消息。我们先看一下Kafka提供的kafkaconsole-consumer.bat命令,该命令可作为命令行的消费者从指定主题甚 至指定分区读取消息。
- --bootstrap-server:指定要连接的Kafka主机和端口。
- --from-beginning:指定从开始处读取消息。
- --group:指定组ID。
- --offset﹤String:consume offset﹥:指定从特定下标开始读取消息,比如将该选项设为1,表明从第2条消息开始读取。该选项还支持earliest和latest两个字符串值,其中earliest表示从最开始处读取(类似 于--from-beginning 选项的作用),latest 表示从最新处开始读取,即不读取之前的消息。latest是默认值。
- --partition﹤Integer:partition﹥:用于指定分区。“--partition 1”指定读取1分区中的消息。
- --property:用于指定一些额外属性,比如print.timestamp=true表 示输出时间戳,print.key=true表示输出消息key,print.offset=true表示打印消息的下标,print.partition=true表示打印分区信息。
- --topic:用于指定主题。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic Test --from-beginning --property print.key=true --property print.offset=true --property print.timestamp=true --property print.partition=true
Kafka的消息主题与JMS,AMQP 的消息队列是不同的—JMS,AMQP消息队列中的消息只能被消费一 次,当消息被消费时,这条消息就会被移出队列;但Kafka主题中的消息完全可以被多次重复消费,甚至可以从指定下标处开始读取消息。从某种角度来看,Kafka主题中的消息就像数据表中的记录,它会在一段时间内被持久化保存,客户端(消费者)可根据需要反复地读取它们,这正是一开始就介绍过的:Kafka并不是单纯的消息组件,而是被定位成“开源的分布式事件流平台(open-source distributed e vent streaming platform)”。
Kafka主题中的消息默认保存时间为7天,这个默认保存时间可通过 server.properties文件中的如下配置进行修改。
log.retention.hours=168
当消息过期之后,Kafka可以对消息进行两种处理:delete或compact,其中delete表示直接删除过期消息,compact则表示对消息进行压缩整理。通过server.properties文件中的如下配置来设置对过期消息的处理策略。
log.cleanup.policy=delete
如果仅想修改某个主题下消息的保存时间,则可专门配置该主题的retention.ms属性。修改指定主题的额外属性,推荐使用kafka-configs.bat(.sh)命令。
- --alter:修改。
- --describe:显示。该选项与--alter选项只能选择其中之一。
- --add-config:指定要添加的配置属性,该选项的值应该符合“k1= v1,k2=[v1,v2,v2],k3=v3”的形式。
- --delete-config:指定要删除的配置属性,该选项的值应该符合“k 1,k2”的形式。 ➢--bootstrap-server:指定要连接的服务器。
- --entity-type:指定要配置的实体类型,该选项可支持 topics(主 题),clients(客户端),users(用户),brokers(代理)和broker-loggers(代理日志)这些值。
- --entity-name:指定要配置的实体名称,该选项与--entity-type结合使用,用于指定主题名,客户端ID,用户名,Broker ID。
#将Test主题的retention.ms属性设为10小时 kafka-configs.bat --alter --bootstrap-server localhost:9092 --entity-type topics --entity-name Test --add-config retention.ms=3600000
显示Test主题的配置信息:
kafka-configs.bat --describe --bootstrap-server localhost:9092 --entity-type topics --entity-name Test
强制删除指定主题下的所有消息:将Test主题的retention.ms属性设为3秒,这意味着该主题下的消息只能保存3秒。
kafka-configs.bat --alter --bootstrap-server localhost:9092 --entity-type topics --entity-name Test --add-config retention.ms=3000
需要说明的是,将retention.ms属性设为3秒,并不意味着该主题下的消息会在3秒之后被删除,这是因为Kafka采用轮询机制来检测消息是否过期,这意味着即使某些消息已经过期,但只要轮询机制还没有处理到这些过期消息,它们就会依然保留在该主题下。
设置Kafka轮询检查时间:置指定了对过期消息进行轮询检查的间隔时间为5分钟,这意味着每隔5分钟就会检查一次消息是否过期。
log.retention.check.interval.ms=300000
如果希望指定主题的retention.ms属性配置依然使用默认值,那么只要使用 kafka-configs.bat命令的--delete-config选项删除该配置即可。
kafka-configs.bat --alter --bootstrap-server localhost:9092 --entity-type topics --entity-name Test --delete-config retention.ms
2.3,消息消费者组
如果消费者要以组的名义来订阅主题,则可使用kafka-console-consumer.bat命令的--group选项来指定组ID。 一个主题包含多个分区,一个消费者组也包含多个消费者实例。 同一个消费者组内的所有消费者共享一个公共的ID,这个ID被称为组ID。一个消费者组内的多个消费者实例一起协调消费主题所包含的全部分区。记住:每个分区只能由同一个消费者组内的一个消费者实例来消费,但一个消费者实例可负责消费多个分区。
- 在同一个消费者组内,每个分区只能由一个消费者实例来负责消费,这意味着同一个消费者组内的多个消费者实例不可能消费相同的消息,这就是典型的P2P消息模型。
- 由于消费者组之间彼此独立,互不影响,因此它们能订阅相同的主题而互不干涉。如果消费者实例属于不同的消费者组,这就是典型的Pub-Sub消息模型。
在理想情况下,消费者组中的消费者实例数恰好等于该组所订阅主题的分区总数,这样每个消费者实例就恰好负责消费一个分区;否则,可能出现如下两种情况:
- 消费者实例数大于所订阅主题的分区总数:此时一个消费者实例要负责消费一个分区,但会有消费者实例处于空闲状态。造成了消费者实例的浪费。
- 消费者实例数小于所订阅主题的分区总数:此时一个消费者实例要负责消费多个分区。此时就需要处理为消费者实例分配分区的问题。
Kafka为消费者实例提供了3种分配分区的策略:
(1)range策略
- 将每个主题的分区按数字顺序进行排列,消费者实例则按消费者名称的字典顺序进行排列。
- 用主题的分区总数除消费者实例总数,如果恰好能除尽,则每个消费者实例都分得相同数量(就是除得的商)的分区;如果除不尽,则排在前面的几个消费者实例将会分得额外的分区。
(2)round-robin策略:round-robin策略会把所有订阅主题的所有分区按顺序排列,然后采用轮询方式依次分给各消费者实例。由此可见,round-robin策略与range策略最大的不同就是它不再局限于某个主题。
一般来说,如果消费者组内所有消费者实例所订阅的主题是相同的,那么使用 round-robin策略能带来更公平的分配方案,否则使用range策略的效果更好。
(3)sticky策略:sticky策略主要用于处理重平衡(Rebalance)需求,重平衡就是指重新为消费者实例分配分区的过程。比如以下3种情况就会触发重平衡:
- 消费者组中的消费者实例数发生变化。比如有新的消费者实例 加入消费者组,或者有消费者实例退出消费者组。
- 订阅的主题数发生改变。当消费者组以正则表达式的方式订阅主题时,符合正则表达式的主题可能会动态地变化。
- 订阅主题的分区数发生改变。当主题的分区数增加时,必须为之分配消费者实例来处理它。
当触发重平衡处理时,如果使用range策略或round-robin策略,Kafka会彻底抛弃原有的分配方案,对变化后的消费者实例,分区进行彻底的重新分配。而 sticky 策略则有效地避免了上述两种策略的缺点,sticky 策略会 尽力维持之前的分配方案,只对改动部分进行最小的再分配,因此通常认为sticky策略在处理重平衡时具有最佳的性能。
3,Kafka 使用
3.1,Kafka核心API
- Producer API(生产者API):应用程序通过该API向主题发布消息。
- Consumer API(消费者API):应用程序通过该API订阅一个或多个主题,并从所订阅的主题中拉取消息(记录)。
- Streams Processor API(流API):应用程序可通过该API来实现流处理器,将一个主题的消息“导流”到另一个主题,并能对消息进行任意自定义的转换。
- Connector API(连接器API):应用程序可通过该API来实现连接器,这些连接器不断地从源系统或应用程序将数据导入 Kafka,也将 Kafka 消息不断地导入某个接收系统或应用程序。
- Admin API(管理API):应用程序可通过该API管理和检查主题,Broker以及其他Kafka实体。
生产者API的核心类是KafkaProducer,它提供了一个send() 方法来发送消息,该send() 方法需要传入一个ProducerRecord<K, V>对象,该对象代表一条消息(记录),该对象定义了如下构造器。
- ProducerRecord(String topic,Integer partition,K key,V value):创建一条发送到指定主题和指定分区的消息。
- ProducerRecord(String topic,Integer partition,K key,V value,Iterable<Header> headers):创建一条发送到指定主题和指定分区 的消息,且包含多个消息头。
- ProducerRecord(String topic,Integer partition,Long timestamp, K key,V value):创建一条发送到指定主题和指定分区的消息,且使 用给定的时间戳。
- ProducerRecord(String topic,Integer partition,Long timestamp, K key,V value,Iterable<Header> headers):创建一条发送到指定主 题和指定分区的消息,使用给定的时间戳,且包含多个消息头。
- ProducerRecord(String topic,K key,V value):创建一条发送到指定主题的消息。
- ProducerRecord(String topic,V value):创建一条发送到指定主题,不带key的消息。
使用生产者API发送消息很简单,基本只要两步:
- 创建KafkaProducer对象,在创建该对象时要传入Properties对象,用于对该生产者进行配置。
- 调用KafkaProducer对象的send()方法发送消息,调用ProducerRecord的构造器即可创建不同的消息。
(1)添加依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
(2)Producer
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer { public final static String TOPIC = "Test"; public static void main(String[] args) { var props = new Properties(); // 指定Kafka的节点地址 props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094"); // 指定确认机制,默认值是0。 props.put("acks", "all"); // 指定发送失败后的重试次数 props.put("retries", 0); // 当多条消息要发送到同一分区时,生产者将尝试对多条消息进行批处理, // 从而减少网络请求数,这有助于提高客户机和服务器的性能。 // 该参数控制默认的批处理的数据大小 props.put("batch.size", 16384); // 指定消息key的序列化器 props.put("key.serializer", StringSerializer.class.getName()); // 指定消息value的序列化器 props.put("value.serializer", StringSerializer.class.getName()); try ( // 创建消息生产者 var producer = new KafkaProducer<String, String>(props)) { for (var messageNo = 1; messageNo < 101; messageNo++) { var msg = "你好,这是第" + messageNo + "条消息"; if (messageNo < 51) { // 发送带消息 producer.send(new ProducerRecord<>(TOPIC, "king", msg)); } else { // 发送不带key的消息 producer.send(new ProducerRecord<>(TOPIC, msg)); } // 每生产了20条消息输出一次 if (messageNo % 20 == 0) { System.out.println("发送的信息:" + msg); } } } } }
其中ack包括三种属性值:
- acks=0:表示生产者不会等待Kafka的确认响应。
- acks=1:表示只要领导者分区已将消息写入本地日志文件,Kafka 就会向生产者发送确认响应,无须等待集群中其他机器的确认。
- acks=all:表示领导者分区会等待所有追随者分区都同步完成后才发送确认响应。这种确认机制可确保消息不会丢失,这是最强的可用性保证。
消费者API的核心类是KafkaConsumer,它提供了如下常用方法:
- subscribe(Collection<String> topics):订阅主题。
- subscribe(Pattern pattern):订阅符合给定的正则表达式的所有主题。
- subscription():返回该消费者所订阅的主题集合。
- unsubscribe():取消订阅。
- close():关闭消费者。
- poll(Duration timeout):拉取消息。
- assign(Collection<TopicPartition> partitions):手动为该消费者分配分区。
- assignment():返回分配给该消费者的分区集合。
- commitAsync():异步提交offset。
- commitSync():同步提交offset。
如果开启了自动提交offset,则无须调用commitAsync() 或commitSync() 方法进行手动提交。自动提交offset比较方便,但手动提交offset则更精确。
- enforceRebalance():强制执行重平衡。
- seek(TopicPartition partition,long offset):跳到指定的offset处,即下一条消息从offset处开始拉取。
- seekToBeginning(Collection<TopicPartition> partitions):跳到指定分区的开始处。
- seekToEnd(Collection<TopicPartition> partitions):跳到指定分区的结尾处。
- position(TopicPartition partition):返回指定分区当前的offset。
使用消费者API发送消息很简单,基本只要三步:
- 创建KafkaConsumer对象,在创建该对象时要传入Properties对 象,用于对该消费者进行配置。
- 调用KafkaConsumer对象的poll()方法拉取消息,该方法返回 ConsumerRecords。
- 对ConsumerRecords执行迭代,即可获取拉取到的每条消息。
package com.example.springboot; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import java.util.Scanner; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; public class ConsumerA { // 定义消费的主题 public final static String TOPIC = "Test"; // 定义该消费者实例所属的组ID private static final String GROUPID = "groupA"; private static KafkaConsumer<String, String> consumer; public static void main(String[] args) throws InterruptedException { // 启动一条新线程来处理程序退出 new Thread(() -> { var scanner = new Scanner(System.in); if (scanner.nextLine().equals(":exit")) { if (consumer != null) { // 取消订阅 consumer.unsubscribe(); // 关闭消费者 consumer.close(); } System.exit(0); } }).start(); var props = new Properties(); // 指定Kafka的节点地址 props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094"); // 指定消费者组ID props.put("group.id", GROUPID); // 设置是否自动提交offset props.put("enable.auto.commit", "true"); // 设置自动提交offset的时间间隔 props.put("auto.commit.interval.ms", "1000"); // session超时时长 props.put("session.timeout.ms", "30000"); // 程序读取消息的初始offset props.put("auto.offset.reset", "latest"); // 指定消息key的反序列化器 props.put("key.deserializer", StringDeserializer.class.getName()); // 指定消息value的反序列化器 props.put("value.deserializer", StringDeserializer.class.getName()); consumer = new KafkaConsumer<>(props); // 订阅主题 consumer.subscribe(Arrays.asList(TOPIC)); System.out.println("---------开始消费---------"); while (true) { // 拉取消息 ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofMillis(100)); if (null != msgList && msgList.count() > 0) { // 遍历取得的消息 for (ConsumerRecord<String, String> record : msgList) { System.out.println("收到消息: key = " + record.key() + ", value = " + record.value() + " offset = " + record.offset()); } } else { Thread.sleep(1000); } } } }
package com.example.springboot; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import java.util.Scanner; public class ConsumerB { // 定义消费的主题 public final static String TOPIC = "Test"; // 定义该消费者实例所属的组ID private static final String GROUPID = "groupA"; private static KafkaConsumer<String, String> consumer; public static void main(String[] args) throws InterruptedException { // 启动一条新线程来处理程序退出 new Thread(() -> { var scanner = new Scanner(System.in); if (scanner.nextLine().equals(":exit")) { if (consumer != null) { // 取消订阅 consumer.unsubscribe(); // 关闭消费者 consumer.close(); } System.exit(0); } }).start(); var props = new Properties(); // 指定Kafka的节点地址 props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094"); // 指定消费者组ID props.put("group.id", GROUPID); // 设置是否自动提交offset props.put("enable.auto.commit", "true"); // 设置自动提交offset的时间间隔 props.put("auto.commit.interval.ms", "1000"); // session超时时长 props.put("session.timeout.ms", "30000"); // 程序读取消息的初始offset props.put("auto.offset.reset", "latest"); // 指定消息key的反序列化器 props.put("key.deserializer", StringDeserializer.class.getName()); // 指定消息value的反序列化器 props.put("value.deserializer", StringDeserializer.class.getName()); consumer = new KafkaConsumer<>(props); // 订阅主题 consumer.subscribe(Arrays.asList(TOPIC)); System.out.println("---------开始消费---------"); while (true) { // 拉取消息 ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofMillis(100)); if (null != msgList && msgList.count() > 0) { // 遍历取得的消息 for (ConsumerRecord<String, String> record : msgList) { System.out.println("收到消息: key = " + record.key() + ", value = " + record.value() + " offset = " + record.offset()); } } else { Thread.sleep(1000); } } } }
由于程序为消费者配置了 auto.offset.reset=latest,这意味着这两个消费者都不会拉取队列中原有的消息(因为初始offset被设置到了结尾处),只能拉取它们在运行期间收到的消息。
- 让ConsumerA和ConsumerB中的GROUPID使用相同的字符串,ConsumerA和ConsumerB模拟的是 P2P 消息模型,先运行 ConsumerA 和 ConsumerB 两个程序,然后再运行前面的 Producer程序,此时将看到所有key为king的消息只会由一个消费者处理,因为所有key为king的消息都在同一个分区中;而没有key的消息则会分别由两个消费者处理,因为没有key的消息会被“轮询”分配到不同的分区中。
- 让ConsumerA和ConsumerB中的GROUPID使用不同的字符串,ConsumerA和ConsumerB模拟的是Pub-Sub消息模型,先运行ConsumerA和ConsumerB两个程序,然后再运行前面的Producer程序,此时将看到ConsumerA和ConsumerB都可拉取到完全相同的100条消息,它们之间互不干扰。
3.2,使用Kafka流API
流 API 的作用是创建多个主题之间的消息流,从而允许将消息从一个主题“导流”到另一个主题,在消息“导流”的过程中,客户端程序可以对消息进行任意自定义的转换。
流API包括如下几个核心API。:
- StreamsBuilder:从名称就可知道,它的作用是创建Stream。但是它不直接创建KafkaStream,而是创建KStream。
- KStream:KStream 代表 key-value 数据流,它的主要功能就是定义流的拓扑(Topology)结构。通俗地说,就是设置source主题(源主题),sink主题(目标主题)等。
- Topology:代表流的拓扑结构,它也提供了大量重载的addSource(),addSink()方法来添加source主题和sink主题。
- KafkaStreams:代表程序要用到的数据流,调用它的start()方法开始导流,调用它的close()方法可关闭流。
使用流API的关键就是通过KStream,Topology来设置source主题和sink主题,然后使用Topology创建KafkaStreams。因此使用流API编程的大致步骤如下:
- 使用StreamsBuilder创建KStream,在创建KStream时已经指定 了source主题。
- 通过KStream设置sink主题和流要做到转换处理。KStream提供了大量重载的flatMap(),map(),filter()等方法对流进行转换,在调用这些处理方法时,通常都需要传入自定义的处理器,常使用Lambda表达式来定义这些处理器。
- 调用StreamsBuilder的build()方法创建代表流关系的Topology 对象,该对象已经封装了通过KStream所设置的source主题,sink主题等信息。如果需要对流关系进行修改,则可调用Topology对象的addSource(),addSink()方法来添加source主题和sink主题。
- 以Topology为参数,创建KafkaStreams对象,在创建该对象时,还需要传入一个Properties对象对该流进行配置。
- 调用KafkaStreams对象的start()方法开始导流,导流结束后 调用close()方法关闭流。
(1)添加依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.7.0</version> </dependency>
(2)Pipe:程序的作用是将“replic”主题的消息导流到“Test”主题,因此在运行该程序之前应先创建一个“replic”主题。
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.ValueMapper; import java.util.Arrays; import java.util.Properties; import java.util.Scanner; public class Pipe{ public final static String SOURCE_TOPIC = "replic"; public final static String SINK_TOPIC = "Test"; private static KafkaStreams streams; public static void main(String[] args){ // 启动一条新线程来处理程序退出 new Thread(() -> { var scanner = new Scanner(System.in); if (scanner.nextLine().equals(":exit")){ if (streams != null){ // 关闭流 streams.close(); } System.exit(0); } }).start(); var props = new Properties(); // 程序的唯一标识符,用于区别于其他应用程序与同一Kafka集群通信 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); // 指定Kafka的节点地址 props.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094"); // 指定消息key默认的序列化和反序列化器 props.put("default.key.serde", Serdes.String().getClass()); // 指定消息value默认的序列化和反序列化器 props.put("default.value.serde", Serdes.String().getClass()); // 创建StreamsBuilder final var builder = new StreamsBuilder(); // stream()方法指定源主题 builder.<String, String>stream(SOURCE_TOPIC) // 设置对消息进行处理的处理器(可换成Lambda表达式) .mapValues(new ValueMapper<String, String>(){ @Override public String apply(String value) { return "疯狂Java:" + value; } }) // 设置对消息进行处理的处理器(可换成Lambda表达式) .flatMapValues(new ValueMapper<String, Iterable<String>>(){ @Override public Iterable<String> apply(String value){ return Arrays.asList(value.split("\\W+")); } }) // to()方法指定目标主题 .to(SINK_TOPIC); // 创建Topology对象 final Topology topology = builder.build(); // 输出Topology对象代表的流关系 System.out.println(topology.describe()); // 创建KafkaStreams实例 streams = new KafkaStreams(topology, props); // 开始执行“导流” streams.start(); } }
builder.<String, String>stream(SOURCE_TOPIC):使用 StreamsBuilder 创建了KStream,在创建KStream时调用的是stream()方法,这说明该KStream已经指定了source主题。
mapValues(new ValueMapper<String, String>():在调用该方法时 传入一个ValueMapper对象,该对象负责对消息value进行处理,在消息value前附加一个字符串前缀。
final Topology topology = builder.build():通过StreamsBuilder创建了代表流关系的Topology 对象;
streams = new KafkaStreams(topology, props):使用Topology对象创建了KafkaStreams对象。接下来就是调用start()方法开始导流,导流结束后调用close()方法关闭流。
4,SpringBoot整合Kafka
4.1,配置内容
【pom.xml】Sping Boot并没有为Kafka提供Starter,而是通过spring-kafka项目的自动配置来提供支持的,只要 Spring Boot 检测到类加载路径下包含了 spring-kafka 依赖库,Spring Boot 就会自动配置KafkaAdmin和KafkaTemplate,其中KafkaAdmin封装了Kafka的管理API,KafkaTemplate则提供了 大量重载的send()方法用于发送消息。
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
【application.properties】
# 配置Kafka默认的节点地址 spring.kafka.bootstrap-servers=localhost:9092 # 指定生产者的确认机制 spring.kafka.producer.acks=all # 指定生产者发送失败后的重试次数 spring.kafka.producer.retries=0 # 指定生产者批处理的数据大小 spring.kafka.producer.batch-size=16384 # 指定生产者的消息key的序列化器 spring.kafka.producer.key-serializer=\ org.apache.kafka.common.serialization.StringSerializer # 指定生产者的消息value的序列化器 spring.kafka.producer.value-serializer=\ org.apache.kafka.common.serialization.StringSerializer server.port=8080 # 指定默认的消费者组ID spring.kafka.consumer.group-id=defaultGroup # 设置消费者是否自动提交offset spring.kafka.consumer.enable-auto-commit=true # 设置消费者自动提交offset的时间间隔 spring.kafka.consumer.auto-commit-interval=1000 # 程序读取消息的初始offset spring.kafka.consumer.auto-offset-reset=latest # 指定消息key的反序列化器 spring.kafka.consumer.key-deserializer=\ org.apache.kafka.common.serialization.StringDeserializer # 指定消息value的反序列化器 spring.kafka.consumer.value-deserializer=\ org.apache.kafka.common.serialization.StringDeserializer # session超时时长 spring.kafka.consumer.properties[session.timeout.ms]=30000
4.2,发送消息
【Service】发送消息很简单,Spring Boot可以将自动配置的KafkaTemplate注入任意组件,接下来该组件调用KafkaTemplate的send()方法即可发送消息。
package com.example.springboot.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import java.util.Objects; @Service public class MessageService { public static final String TOPIC = "Test"; private final KafkaTemplate<String, String> kafkaTemplate; @Autowired public MessageService(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void produce(String key, String message) { if (Objects.nonNull(key)) { // 发送消息 this.kafkaTemplate.send(TOPIC, key, message); } else { // 发送不带key的消息 this.kafkaTemplate.send(TOPIC, message); } } }
如果在 Spring 容器中配置了 RecordMessageConverter,该 Bean 将会自动作为 KafkaTemplate的消息转换器。
【Controller】
package com.example.springboot.controller; import com.example.springboot.service.MessageService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { private final MessageService messService; public HelloController(MessageService messService) { this.messService = messService; } @GetMapping("/produce/{key}/{message}") public String produce(@PathVariable String message, @PathVariable(required = false) String key) { messService.produce(key, message); return "发送消息"; } @GetMapping("/produce/{message}") public String produce(@PathVariable String message) { messService.produce(null, message); return "发送消息"; } }
4.3,接收消息
Spring Boot会自动将@KafkaListener注解修饰的方法注册为消息监听器。如果没有显式地通过containerFactory属性指定监听器容器工厂(KafkaListenerContainerFactory),Spring Boot会在容器中自动配置一 个ConcurrentKafkaListenerContainerFactory Bean作为监听器容器工厂。如果要对ConcurrentKafkaListenerContainerFactory进行设置,则可在 application.properties配置文件中增加以“spring.kafka.listener”开头的配置属性。
# 设置监听器的确认模式 spring.kafka.listener.ack-mode=batch
如果在容器中配置了Kafka Transaction Manager,它将会被自动关联到监听器容器工厂。此外,如果在容器中配置了Record Filter Strategy Error Handler After Rollback Processor或Consumer Aware RebalanceListener,它们也会被自动关联到监听器容器工厂。
设置如下属性可将监听器配置成处理单条消息的监听器:
spring.kafka.listener.type=single
Spring Boot会自动将容器中的RecordMessageConverter Bean关联到默认的监听器容器工厂。设置如下属性可将监听器配置成批处理的监听器:
spring.kafka.listener.type=batch
【Listener】
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class TopicListener1 { @KafkaListener(topics = "test1", groupId = "groupA") public void processMessage(ConsumerRecord<String, String> message) { System.out.println("从Test收到消息,其key为:" + message.key() + ",其value为:" + message.value()); } }
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class TopicListener2 { @KafkaListener(topics = "Test", groupId = "groupB") public void processMessage(ConsumerRecord<String, String> message) { System.out.println("从Test收到消息,其key为:" + message.key() + ",其value为:" + message.value()); } }
4.4,Spring Boot整合Kafka流API
Spring Boot为Kafka流API并未提供太多额外的支持,它只提供了一 个@EnableKafkaStreams注解,通过该注解能让Spring Boot 自动配置StreamsBuilder,当然也能将StreamsBuilder注入任意的其他组件,剩下的事情Spring Boot就不再参与了。
【pom.xml】
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency>
【application.properties】
# 指定Streams API的应用ID spring.kafka.streams.application-id=spring-pipe
如果没有配置spring.kafka.streams.application-id属性,Spring Boot默认使用spring.application.name属性值作为应用ID。
# 指定应用启动时自动创建流 spring.kafka.streams.auto-startup=true
如果将spring.kafka.streams.auto-startup配置为false,则意味着开发者需要自行创建流。
# 指定消息key默认的序列化和反序列化器 spring.kafka.streams.properties[default.key.serde]=\ org.apache.kafka.common.serialization.Serdes$StringSerde # 指定消息value默认的序列化和反序列化器 spring.kafka.streams.properties[default.value.serde]=\ org.apache.kafka.common.serialization.Serdes$StringSerde
为流的key,value指定了默认的序列化器和反序列化器。
【KafkaConfig】
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.ValueMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; import java.util.Arrays; import java.util.Properties; @Configuration(proxyBeanMethods = false) public class KafkaConfig{ @Configuration(proxyBeanMethods = false) // 启用Kafka流 @EnableKafkaStreams public static class KafkaStreamsConfiguration{ public final static String SOURCE_TOPIC = "replic"; public final static String SINK_TOPIC = "Test"; @Bean // 通过自动注入的StreamBuilder来创建KStream public KStream<String, String> kStream(StreamsBuilder builder){ KStream<String, String> stream = builder .stream(SOURCE_TOPIC); // 设置对消息进行处理的处理器 stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> Arrays.asList(value.split("\\W+"))) // to()方法指定目标主题 .to(SINK_TOPIC); // 创建Topology对象(其实已不需要创建该对象,此处只是为了方便查看流的拓扑关系) System.out.println(builder.build().describe()); // 直接返回KStream就行了 return stream; } } }
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)