目录

开启SASL

控制台配置用户

ACL授权

Python客户端访问 

ACL常用命令


Kafka系列:

kafka 2.4.1单机版部署及使用

kafka监控系统kafka eagle安装使用

滴滴开源的kafka-manager编译及部署使用

kafka管理监控系统 CMAK(yahoo的kafka-manager)部署及使用

Kafka系列(一)、2.6.0版本kafka集群搭建

Kafka系列(二)、架构原理及存储机制

Kafka系列(三)、生产者分区策略、ISR、ACK机制、一致性语义

Kafka系列(四)、消费者策略、Rebalance机制、Offset存储机制 


本篇来讲解如何给kafka集群开启SASL安全认证以及配置ACL权限,还是之前搭建的那个kafka2.6集群。

本文主要内容:

  • wyk01,wyk02,wyk03 kafka集群配置SASL认证;
  • wyk01的消费者和生产者客户端配置用户并验证ACL;
  • 利用其它客户端(Python)访问开启了SASL的kafka集群;

开启SASL

在 wyk01 & wyk02 & wyk03 机器上执行下面的步骤: 

(1)、配置kafka软连接和环境变量 :

# 创建软连接
ln -s -f /opt/app/kafka_2.12-2.6.0 /opt/app/kafka

# 配置环境变量 
vim /etc/profile
# 添加下面的内容
export KAFKA_HOME=/opt/app/kafka

# 刷新环境变量
source /etc/profile

(2)、将kafka集群服务关闭:

$KAFKA_HOME/bin/kafka-server-stop.sh

(3)、修改kafka的server.properties 配置文件添加SASL认证,修改listeners中的ip为对应机器的:

cd $KAFKA_HOME/config
vim server.properties  
# 添加下面的内容
#--------------------------------------------------
# 配置ACL入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# SASL_PLAINTEXT 
#这里的listener中的wyk01 在三台机器上换成每台机器对应的hostname/ip
listeners=SASL_PLAINTEXT://wyk01:9092
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# 设置admin超级用户
super.users=User:admin

#设置为true,ACL机制为黑名单机制,只有黑名单中的用户无法访问
#默认为false,ACL机制为白名单机制,只有白名单中的用户可以访问
allow.everyone.if.no.acl.found=false
#--------------------------------------------------

(4)、新增 kafka_server_jaas.conf 配置文件添加用户,注意最后的两个分号。

前三行是配置管理员账户(该账户与上面server.properties中配置的super.users一样),后面的user_wyk_reader="wyk_reader_pwd"表示添加一个用户名为wyk_reader对应的密码为wyk_reader_pwd。即 user_用户名="该用户的密码"。之后配置ACL的时候需要用到这里配置的用户。

vim kafka_server_jaas.conf
#添加下面的内容
#--------------------------------------------------
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required
     username="admin"
     password="admin"
     user_admin="admin"
     user_wyk_reader="wyk_reader_pwd"
     user_wyk_writer="wyk_writer_pwd"
     user_no_acl="no_acl_pwd";
     };

 (5)、修改kafka-server-start.sh文件,将刚刚配置的用户列表添加到kafka启动脚本内:

vim ../bin/kafka-server-start.sh
#修改最后一行,改成下面的内容
#-------------------------------------------
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
source /etc/profile
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf  kafka.Kafka "$@"

至此,kafka集群已完成了开启SASL安全认证并添加了四个用户admin、wyk_writer、wyk_reader 、no_acl。

(6)、重启kafka集群

$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

控制台客户端配置认证文件

集群虽然开启了SASL认证,接下来我们可以在控制台使用consumer和producer命令行测试是否有权限,这个是客户端级别的测试,因此只需要随便找个kafka客户端就可以用,不需要在每台kafka broker机器上修改。如果不想要在控制台测试可以跳过直接看后面的对比以及python客户端访问。

要在控制台使用命令行需要先配置sasl用户,步骤如下:

生产者:

(1)、新增配置文件wyk_writer_jaas.conf,这里的用户名密码必须和服务器端配置的一样,我们的客户端是拿着这个认证信息去和kafka 的服务端做校验,如果不匹配就等同于登录失败。

vim wyk_writer_jaas.conf
#添加下面的内容
#-------------------------------
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required
     username="wyk_writer"
     password="wyk_writer_pwd";
     };

(2)、修改生产者启动脚本,在启动生产者客户端时会去加载该认证文件。

vim ../bin/kafka-console-producer.sh
#修改最后一行,改成下面的内容
#-------------------------------------------
#exec $(dirname $0)/kafka-run-class.sh  kafka.tools.ConsoleProducer "$@"
source /etc/profile
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/wyk_writer_jaas.conf  kafka.tools.ConsoleProducer "$@"

(3)、修改 producer.properties 添加SASL认证。

vim producer.properties
#添加下面的内容
#-------------------------------------------
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

消费者:

(1)、新增配置文件wyk_reader_jaas.conf,这里的用户名密码必须和服务器端配置的一样,我们的客户端是拿着这个认证信息去和kafka 的服务端做校验,如果不匹配就等同于登录失败。

vim wyk_reader_jaas.conf
#添加下面的内容
#-------------------------------
KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="wyk_reader"
        password="wyk_reader_pwd";
};

(2)、修改消费者启动脚本,在启动消费者客户端时会去加载该认证文件。

vim ../bin/kafka-console-consumer.sh 
#修改最后一行,改成下面的内容
#-------------------------------------------
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
source /etc/profile
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/wyk_reader_jaas.conf   kafka.tools.ConsoleConsumer "$@"

(3)、修改 consumer.properties 添加SASL认证。需注意,这个配置文件中有一个消费者组group.id=test-consumer-group,待会在给控制台客户端授权消费者权限时还需要指定这个消费者组。

vim consumer.properties
#添加下面的内容
#-------------------------------------------
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

 

ACL授权及验证

我们已经开启了SASL,并给控制台客户端配置了生产者和消费者的认证文件,下面我们来测试授权之后和未授权用户的区别。

为了起到对比效果,我们先新增一个认证文件用户为no_acl:

(1)、新增配置文件no_acl_jaas.conf

vim no_acl_jaas.conf
#添加下面的内容
#-------------------------------
KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="no_acl"
        password="no_acl_pwd";
};

(2)、使用下面的命令行授权用户 wyk_reader 对主题csdn01 的读权限,授权用户 wyk_writer 对主题csdn01 的写权限,no_acl用户不设置任何权限用做对比。

# 给用户wyk_writer 添加csdn01主题的 生产者权限
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=wyk01:2181,wyk02:2181,wyk03:2181 --add --allow-principal User:wyk_writer --operation Write --topic csdn01

# 给用户wyk_reader 添加csdn01主题的 消费者权限
#需要注意这里消费者还需要给消费者组配置权限,消费者组名称要和consumer.properties中配置的group.id一致
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=wyk01:2181,wyk02:2181,wyk03:2181 --add --allow-principal User:wyk_reader --operation Read --topic csdn01 --group test-consumer-group

 

(3)、验证ACL权限:

(3.1)、当使用no_acl用户作为生产者时可以看到启动时会报错:Not authorized to access topics:[csdn01]。而使用拥有csdn01主题的生产者权限的用户wyk_writer启动时可以正常的使用:

(3.2)、 当使用no_acl用户作为消费者时可以看到启动时会报错:Not authorized to access topics:[csdn01]。而使用拥有csdn01主题的消费者权限的用户wyk_reader启动时可以正常的消费数据:

(3.3)、将生产者的sasl认证文件里的密码 改为错误密码后,启动时会报错invalid username or password:

Python客户端访问 

接下来我们用python客户端访问开启了sasl认证的kafka,首先我们在kafka_server_jaas.conf 配置文件中添加一个用户python_wyk (需要重启kafka集群),并授权对主题csdn01的读写权限用作python客户端的测试:

生产者脚本

from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['wyk01:9092','wyk02:9092','wyk03:9092'],
                         security_protocol="SASL_PLAINTEXT",
                         sasl_mechanism="PLAIN",
                         sasl_plain_username="python_wyk",
                         sasl_plain_password="python_wyk_pwd"
)

producer.send('csdn01', json.dumps({"id":"1","name":"wyk1","company":"csdn1"}).encode('utf-8'))
producer.send('csdn01', json.dumps({"id":"2","name":"wyk2","company":"csdn2"}).encode('utf-8'))
producer.send('csdn01', json.dumps({"id":"3","name":"wyk3","company":"csdn3"}).encode('utf-8'))
producer.flush()

消费者脚本: 

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import json

consumer = KafkaConsumer('csdn01',
                         bootstrap_servers=['wyk01:9092','wyk02:9092','wyk03:9092']
                         ,group_id='test-consumer-group'
                         ,auto_offset_reset='latest'   
                         ,enable_auto_commit=False
                         ,security_protocol="SASL_PLAINTEXT"
                         ,sasl_mechanism="PLAIN"
                         ,sasl_plain_username="python_wyk"
                         ,sasl_plain_password="python_wyk_pwd"
)
for msg in consumer:
     print(msg)

ACL常用命令

查看权限列表:

#查看所有权限列表
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=wyk01:2181 --list

#查看指定topic的权限列表
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=wyk01:2181 --list --topic csdn01

添加读写权限:

# 给用户wyk_writer 添加csdn01主题的 生产者权限
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=wyk01:2181,wyk02:2181,wyk03:2181 --add --allow-principal User:wyk_writer --operation Write --topic csdn01

# 给用户wyk_reader 添加csdn01主题的 消费者权限
#需要注意这里消费者还需要给消费者组配置权限,消费者组名称要和consumer.properties中配置的group.id一致
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=wyk01:2181,wyk02:2181,wyk03:2181 --add --allow-principal User:wyk_reader --operation Read --topic csdn01 --group test-consumer-group

移除权限:

#移除用户wyk_all 对csdn01的读写权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=wyk01:2181 --remove --allow-principal User:wyk_all --operation Read --operation Write --topic csdn01

添加黑名单:

# 禁止来自ip:192.168.145.100的用户 wyk_no对csdn01的消费权限,允许其他所有用户的消费权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=wyk01:2181 --add --allow-principal User:* --allow-host * --deny-principal User:wyk_no --deny-host 192.168.145.100 --operation Read --topic csdn01

添加白名单:

#添加用户wyk_yes在ip192.168.145.100 对csdn01的读写权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=wyk01:2181 --add --allow-principal User:wyk_yes --allow-host 192.168.145.100 --operation Read --operation Write --topic csdn01

希望本文对你有帮助,请点个赞鼓励一下作者吧~ 谢谢!

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐