Kafka——Kafka Connect详解
Kafka Connect是一个高伸缩性、高可靠性的数据集成工具,用于在Apache Kafka与其他系统间进行数据搬运以及执行ETL操作,比如Kafka Connect能够将文件系统中某些文件的内容全部灌入Kafka topic中或者是把Kafka topic中的消息导出到外部的数据库系统,如图所示。如图所示,Kafka Connect主要由source connector和sink conne
目录
Kafka Connect
1、概要介绍
Kafka Connect是一个高伸缩性、高可靠性的数据集成工具,用于在Apache Kafka与其他系统间进行数据搬运以及执行ETL操作,比如Kafka Connect能够将文件系统中某些文件的内容全部灌入Kafka topic中或者是把Kafka topic中的消息导出到外部的数据库系统,如图所示。
如图所示,Kafka Connect主要由source connector和sink connector组成。事实上,几乎大部分的ETL框架都是由这两大类逻辑组件组成的,如Apache Flume、Kettle等。source connector负责把输入数据从外部系统中导入到Kafka中,而sink connector则负责把输出数据
导出到其他外部系统。
根据Kafka Connect官网的介绍,目前其主要的设计特点如下。
- 通用性:依托底层的Kafka核心系统封装了connector接口,方便开发、部署和管理。
- 兼具分布式(distributed)和单体式(standalone)两种模式:既可以以standalone单进程的方式运行,也可以扩展到多台机器成为分布式ETL系统。
- REST接口:提供常见的REST API方便管理和操作,只适用于分布式模式。
- 自动位移管理:connector自动管理位移,无须开发人员干预,降低开发成本。
- 集成性:方便与流/批处理系统对接。
显然,一个ETL框架或connector系统是否好用的主要标志之一就是,看source connector和sink connector的种类是否丰富。默认提供的connector越多,我们就能集成越多的外部系统,免去了用户自行开发的成本。
2、standalone Connect
在standalone模式下所有的操作都是在一个进程中完成的。这种模式非常适合运行在测试或功能验证环境,抑或是必须是单线程才能完成的场景(比如收集日志文件)。由于是单进程,standalone模式无法充分利用Kafka天然提供的负载均衡和高容错等特性。
2.1、数据抽取与加载示例
下面我们在一个单节点的Kafka集群上运行standalone模式的Kafka Connect,把输入文件foo.txt中的数据通过Kafka传输到输出文件bar.txt中。首先我们制作配置文件。Kafka Connectstandalone模式下通常有3类配置文件:connect配置文件,若干source connector配置文件和若干sink connector配置文件。由于本例分别启动一个source connector读取foo.txt和一个sink connector写入bar.txt,故source和sink配置文件都只有一个,所以总共有如下3个配置文件。
connect-standalone.properties
:connect standalone模式下的配置文件。connect--file-source.properties
:file source connector配置文件。connect-file-sink.properties
:file sink connector配置文件。
首先来编辑connect-standalone.properties文件。实际上,Kafka已经在config目录下为我们提供了一个该文件的模板。我们直接使用该模板并修改对应的字段即可,如下:
# connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
bootstrap.servers
:指定Connect要连接的Kafka集群主机名和端口号。本例使用localhost::9092。key/value.converter
:设置Kafka消息key/value的格式转化类,本例使用JsonConverter,即把每条Kafka消息转化成一个JSON格式。key/value.converter.schemas.enable
:设置是否需要把数据看成纯JSON字符串或者JSON格式的对象。本例设置为tue,即把数据转换成JSON对象。offset.storage.file.filename
:connector会定期地将状态写入底层存储中。该参数设定了状态要被写入的底层存储文件的路径。本例使用/tmp/connect.offsets保存connector的
状态。
下面编辑connect-file-source.properties,它在Kafka的config目录下也有一份模板,本例直接在该模板的基础上进行修改:
# connect-file-source.properties
name=test-file-source
connector.class=FileStreamSource
tasks.max=1
file=foo.txt
topic=connect-file-test
name
:设置该file source connector的名称。connector.class
:设置source connector类的全限定名。有时候设置为类名也是可以的,Kafka Connect可以在classpath中自动搜寻该类并加载。tasks.max
:每个connector下会创建若干个任务(task)执行connector逻辑以期望增加并行度,但对于从单个文件读/写数据这样的操作,任意时刻只能有一个ask访问文件,故这里设置最大任务数为1。file
:输入文件全路径名。本例为foo.txt,即表示该文件位于Kafka目录下。实际使用时最好使用绝对路径。topic
:设置source connector把数据导入到Kafka的哪个topic,若该topic之前不存在,则source connector会自动创建。最好提前手工创建出该topic。.本例使用connect-file-test.
最后,我们编辑connect-file-sink.properties。同理,直接修改位于config目录下的connect-file-sink.properties模板文件:
# connect-file-sink.properties
name=test-file-sink
connector.class=FileStreamSink
tasks.max=1
file=bar.txt
topics=connect-file-test
name
:设置该sink connector名称。connector.class
:设置sink connector类的全限定名。有时候设置为类名也是可以的,Kafka Connect可以在classpath中自动搜寻该类并加载。tasks.max
:依然设置为l,原理与source connector中配置设置相同。file
:输出文件全路径名。本例为bar.txt,即表示该文件位于Kafka目录下。实际使用时最好使用绝对路径。topic
:设置sink connector导出Kafka中的哪个topic的数据。
启动Kafka Connect的standalone模式:
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties
启动之后,应该可以看到控制台不断地打印“Couldn’t find file foo.txt for FileStreamSourceTask,sleeping to wait for it to be created”之类的日志。这是正常的,因为我们尚未创建输入文件foo.txt。
下面我们在Kafka的目录下创建foo.txt并写入一些文本行:
echo 'hello' >> ./foo.txt
echo 'kafka connect test exaple' >> ./foo.txt
echo 'this is a file connector test.' >> ./foo.txt
如果一切正常,可以看到在当前目录下生成一个名为bar.txt的文件:
hello
kafka connect test exaple
this is a file connector test.
可见,foo.txt文件的内容已经成功地被file connector通过Kafka搬运到bar.txt文件中了。
为了验证数据的确是通过Kafka topic进行转移的,我们读取一下topic(connect-file-test)的数据,如:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-file-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"kafka connect test exaple"}
{"schema":{"type":"string","optional":false},"payload":"this is a file connector test."}
2.2、数据抽取、转换与加载示例
上面的例子只涉及ETL中的E和L,即数据抽取(extract.)与加载(load)。作为一个ETL框架,Kafka Connect也支持相当程度的数据转换操作。下面演示在将文件数据导出到目
标文件之前为每条消息增加一个IP字段。如果要插入P静态字段,我们必须修改source connector的配置文件,增加以下这些行:
transforms=WrapMap,InsertHost
transforms.WrapMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.WrapMap.field=line
transforms.InsertHost.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertHost.static.field=ip
transforms.InsertHost.static.value=com.connector.machinel
然后重启kafka Connect,然后写入foo.txt文件:
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties
echo 'this is a transformation test' >> ./foo.txt
然后查看bar.txt:
hello
kafka connect test exaple
this is a file connector test.
Struct{line=this is a transformation test,ip=com.connector.machinel}
显然,新增的数据被封装成一个结构体(Struct),并增加了ip字段。这就是上面WrapMap和InsertHost的作用。
3、distributed Connect
和standalone模式不同,distributed Connect天然地结合了Kafka提供的负载均衡和故障转移功能,能够自动地在多节点机器上平衡负载。用户可以增减机器来实现整体系统的高伸缩性。用户需要执行下列命令来启动distributed模式的Connect,假设我们依然使用Kafka config目录下的配置文件模板:
bin/connect-distributed.sh config/connect-distributed.properties
和standalone模式不同的是,在distributed模式中我们不需要指定source和sink的配置文件。distributed模式中的connector只能通过REST API来创建和管理。
3.1、示例
依然以FileStreamSourceConnector/FileStreamSinkConnector为例来演示如何在distributed模式下运行Kafka Connect。上述命令启动成功后,我们可以执行以下命令来获取当
前所有connector:
curl http://localhost:8083/connectors
[]
值得注意的是,distributed模式下默认的REST端口是8083,用户可以修改connect-distributed.properties文件中的rest.port属性来变更这一端口。如上可见,当前集群中没有创建任何的connector。
下面分别创建file source connector和file sink connector,命令如下:
curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" -d '{"name":"test-file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","topic":"connect-file-test","file":"foo.txt"}}' http://localhost:8083/connectors
curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" -d '{"name":"test-file-sink","config":{"connector.class":"FileStreamSink","tasks.max":"1","topics":"connect-file-test","file":"bar.txt"}}' http://localhost:8083/connectors
本例中使用curl工具给Kafka Connect发送POST请求。当前REST API只支持application/json作为请求(request)和响应(response)的内容类型(content type),因此在发送POST请求时必须显式指定HTTP的Accept头部为application/json,以设置response的content type。另外,我们还需要设置Content-Type头部信息为application/json,以指定request
的content type。在上面命令中我们只是把standalone模式下配置文件中的所有属性封装成JSON字符串传递给curl工具。注意,connector的name字段和其他字段是分开的,即其他字段首先要被封装到config下,然后和name一起做成JSON串。
下面再次获取当前所有connector以检查之前的两个connector是否已被创建出来:
这次我们可以看到两个connector都已经被创建出来了。REST API还提供了/connectors//{name}/config
,允许用户查询某个connector的具体配置信息,我们使用这个endpoint来查询file sink connector的信息:
同时使用GET/connectors//{name}/status
查询两connector的运行状态:
目前两个connector都正常工作。下面开始写入输入文件foo.txt:
echo 'one' >> ./foo.txt
echo 'two' >> ./foo.txt
echo 'three' >> ./foo.txt
查看bar.txt:
做完这些之后,我们删除这两个connector把系统还原回初始状态。若要删除connector,可以使用REST API–DELETE/connectors/.{name}
,如下:
curl -i -X DELETE http://localhost:8083/connectors/test-file-source
curl -i -X DELETE http://localhost:8083/connectors/test-file-sink
3.2、REST API
我们可以通过Kafka Connect提供的基于REST风格的API接口来管理连接器,默认端口号为8083,可以通过Worker进程的配置文件中的rest,port参数来修改端口号。Kafka ConnectREST API接口如表所示。
REST API | 描述 |
---|---|
GET / | 查看Kafka集群版本信息 |
GET /connectors | 查看当前活跃的连接器列表,显示连接器的名字 |
POST /connectors | 根据指定配置,创建一个新的连接器 |
GET /connectors/{name} | 查看指定连接器的信息 |
GET /connectors/{name}/config | 查看指定连接器的配置信息 |
PUT /connectors/{name}/config | 修改指定连接器的配置信息 |
GET /connectors/{name}/status | 查看指定连接器的状态 |
POST /connectors/{name}/restart | 重启指定的连接器 |
PUT /connectors/{name}/pause | 暂停指定的连接器 |
GET /connectors/{name}/tasks | 查看指定连接器正在运行的Task |
POST /connectors/{name}/tasks | 修改Task的配置 |
GET /connectors/{name}/tasks/{taskId}/status | 查看指定连接器中指定Task的状态 |
POST /connectors/{name}/tasks/{tasked}/restart | 重启指定连接器中指定的Task |
DELETE /connectors/{name}/ | 删除指定的连接器 |
3.3、其它连接器类
connector.class用来设置连接器类的全限定名称,有时候设置为类名也是可以的,Kafka Connect会在classpath中自动搜索这个类并加载。Kafka中默认只提供了与文件相关的连接器,如果要实现与其他数据存储系统相连接,那么可以参考文件连接器的具体实现来自定义一套连接器,或者搜寻开源的实现,比如Confluent公司提供的
一些产品:
kafka-connect-elasticsearch
(https://github.com/confluentinc/kafka-connect-elasticsearch);kafka-connect-jdbc
(https://github.com/confluentinc/kafka-connect-jdbc);kafka-connect-hdfs
(https://github.com/confluentinc/kafka-connect-hdfs);kafka-connect-storage-cloud
(https://github.com/confluentinc/kafka-connect-storage-cloud).
4、示例MySQL数据同步到Redis
4.1、准备连接器
下载连接器
MySQL连接器:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
Redis连接器:https://www.confluent.io/hub/jcustenborder/kafka-connect-redis
安装插件
在kafka目录下新建connect文件夹:
cd /usr/local/kafka_2.12-3.1.0
mkdir connect
将下载的插件移动到connect文件夹中:
cp confluentinc-kafka-connect-jdbc-10.7.4 /usr/local/kafka_2.12-3.1.0/confluentinc-kafka-connect-jdbc-10.7.4
cp jcustenborder-kafka-connect-redis-0.0.4 /usr/local/kafka_2.12-3.1.0/jcustenborder-kafka-connect-redis-0.0.4
下载mysql对应的驱动,放到confluentinc-kafka-connect-jdbc-10.7.4/lib目录下
mv mysql-connector-java-8.0.20.jar /usr/local/kafka_2.12-3.1.0/confluentinc-kafka-connect-jdbc-10.7.4/lib/mysql-connector-java-8.0.20.jar
修改distributed配置:
vim /usr/local/kafka_2.12-3.1.0/config/connect-distributed.properties
指定插件位置:
plugin.path=../connect
启动Connect,查看插件是否加载成功:
./connect-distributed.sh ../config/connect-distributed.properties
4.2、准备MySQL
创建表及数据
CREATE TABLE `login` (
`id` bigint(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`username` varchar(30) DEFAULT NULL,
`login_time` datetime
);
INSERT INTO `login` VALUES(1, 'aaa', NOW());
INSERT INTO `login` VALUES(2, 'bbb', NOW());
INSERT INTO `login` VALUES(3, 'ccc', NOW());
INSERT INTO `login` VALUES(4, 'ddd', NOW());
INSERT INTO `login` VALUES(5, 'eee', NOW());
INSERT INTO `login` VALUES(6, 'fff', NOW());
INSERT INTO `login` VALUES(7, 'ggg', NOW());
INSERT INTO `login` VALUES(8, 'hhh', NOW());
INSERT INTO `login` VALUES(9, 'iii', NOW());
INSERT INTO `login` VALUES(10, 'jjj', NOW());
创建连接器,新建source.json
{
"name": "example-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://127.0.0.1:3306/kafka_db",
"connection.user": "root",
"connection.password": "123456",
"table.whitelist": "login",
"mode":"incrementing",
"incrementing.column.name":"id"
}
}
向Worker发送请求,创建连接器:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @source.json
确认数据是否写入kafka
./kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets: 记录所有Kafka Consumer Group的Offset
connect-configs: 存储连接器的配置,对应Connect 配置文件中config.storage.topic
connect-offsets: 存储Source 的Offset,对应Connect 配置文件中offset.storage.topic
connect-status: 连接器与Task的状态,对应Connect 配置文件中status.storage.topic
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic login --from-beginning
4.3、准备redis
创建sink.json
{
"name": "example-sink",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"topics": "login",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
"redis.client.mode": "Standalone",
"redis.database": "1",
"redis.hosts": "localhost:6379",
"redis.password": "123456"
}
}
启动:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @sink.json
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)