大数据Hadoop之——EFAK安全认证实现(kafka+zookeeper)
2023-09-14 09:12:57 时间
文章目录
一、概述
前面已经很详细的讲了
kafka
和zookeeper
的安全机制和实现,如果它两配置了安全机制认证,EFAK作为监控kafka和zookeeper的图像化软件,必然也离不开对kafka和zookeeper安全认证的配置。
关于EFAK的介绍和安装部署可以参考我这篇文章:大数据Hadoop之——Kafka 图形化工具 EFAK(EFAK环境部署)
官方文档:https://www.kafka-eagle.org/articles/docs/installation/security.html
二、EFAK kafka鉴权配置(zookeeper无鉴权)
最好先看完我这两篇文章再来配置哦:
分布式开源协调服务——Zookeeper
大数据Hadoop之——Kafka鉴权认证(Kafka kerberos认证+kafka账号密码认证+CDH Kerberos认证)
1)kafka Kerberos 鉴权 配置
1、启动zookeeper服务
$ cd $KAFKA_HOME
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
$ ./bin/zookeeper-shell.sh hadoop-node1:12181
2、启动kafka服务
$ cd $KAFKA_HOME
$ ./bin/kafka-server-start-sasl.sh -daemon config/server-sasl.properties
3、配置EFAK
$ cd $KE_HOME
$ vi conf/system-config.properties
配置如下:
######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=true
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=GSSAPI
# serviceName与kakfka 配置文件里的这个字段sasl.kerberos.service.name值一样
cluster1.efak.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/kerberos/kafka-client.keytab" storeKey=true useTicketCache=false serviceName=kafka-server principal="kafka-client@HADOOP.COM";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
4、把配置copy到几个节点
$ scp $KE_HOME/conf/system-config.properties hadoop-node2:$KE_HOME/conf/
$ scp $KE_HOME/conf/system-config.properties hadoop-node3:$KE_HOME/conf/
5、启动EFAK服务
$ ke.sh cluster restart
6、测试验证
2)kafka 账号密码鉴权 配置
1、启动zookeeper服务
$ cd $KAFKA_HOME
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
$ ./bin/zookeeper-shell.sh hadoop-node1:12181
2、启动kafka服务
$ cd $KAFKA_HOME
$ ./bin/kafka-server-start-pwd.sh config/server-pwd.properties
3、配置EFAK
$ cd $KE_HOME
$ vi conf/system-config.properties
配置如下:
######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=true
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=PLAIN
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="123456";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
4、把配置copy到几个节点
$ scp $KE_HOME/conf/system-config.properties hadoop-node2:$KE_HOME/conf/
$ scp $KE_HOME/conf/system-config.properties hadoop-node3:$KE_HOME/conf/
5、启动EFAK服务
$ ke.sh cluster restart
6、测试验证
三、EFAK kafka kerberos配置(zookeeper账号密码鉴权)
最好先看完我这篇文章再来配置哦: 大数据Hadoop之——Zookeeper鉴权认证(Kerberos认证+账号密码认证)
1)开启zookeeper 账号密码认证
不清楚的小伙伴可以先看我上面的文章哦
【温馨提示】看EFAK配置里只有zookeeper的账号密码认证的配置,所以这里只能选择开启zookeeper的账号密码认证了。
$ cd $KAFKA_HOME
$ ./bin/zookeeper-server-start-userpwd.sh -daemon ./config/zookeeper-userpwd.properties
$ ./bin/zookeeper-shell-userpwd.sh hadoop-node1:12181
2)开启kafka kerberos认证
不清楚的小伙伴可以先看我上面的文章哦
$ cd $KAFKA_HOME
$ ./bin/kafka-server-kerberos-zkcli-userpwd-start.sh -daemon ./config/server-kerberos-zkcli-userpwd.properties
3)配置EFAK
$ cd $KE_HOME
$ vi conf/system-config.properties
配置如下:
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=true
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=kafka
cluster1.zk.acl.password=123456
######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=true
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=GSSAPI
# serviceName与kakfka 配置文件里的这个字段sasl.kerberos.service.name值一样
cluster1.efak.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/kerberos/kafka-client.keytab" storeKey=true useTicketCache=false serviceName=kafka-server principal="kafka-client@HADOOP.COM";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
4、把配置copy到几个节点
$ scp $KE_HOME/conf/system-config.properties hadoop-node2:$KE_HOME/conf/
$ scp $KE_HOME/conf/system-config.properties hadoop-node3:$KE_HOME/conf/
5、启动EFAK服务
$ ke.sh cluster restart
6、测试验证
四、EFAK kafka 账号密码配置(zookeeper账号密码鉴权)
1)开启zookeeper 账号密码认证
不清楚的小伙伴可以先看我上面的文章哦
【温馨提示】看EFAK配置里只有zookeeper的账号密码认证的配置,所以这里只能选择开启zookeeper的账号密码认证了。
$ cd $KAFKA_HOME
$ ./bin/zookeeper-server-start-userpwd.sh -daemon ./config/zookeeper-userpwd.properties
2)开启kafka 账号密码认证
不清楚的小伙伴可以先看我上面的文章哦
$ cd $KAFKA_HOME
$ ./bin/kafka-server-start-zkcli-userpwd.sh -daemon ./config/server-zkcli-userpwd.properties
# 查看topic列表
$ ./bin/kafka-topics-pwd.sh --list --bootstrap-server hadoop-node1:19092 --command-config config/userpwd/client.properties
3)配置EFAK
$ cd $KE_HOME
$ vi conf/system-config.properties
配置如下:
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=true
### scheme采用何种方式授权
# 1. world :默认方式,相当于全部都能访问;
# 2.auth :代表已经认证通过的用户( cli中可以通过addauth digest user:pwd来添加当前上下文中的授权用户);
# digest :即用户名:密码这种方式认证,这也是业务系统中最常用的。用username:password字符串来产生一个MD5串,然后该 串被用来作为ACL ID。认证是通过明文发送username password来进行的,当用在ACL时,表达式为username:base64;
# ip :使用客户端的主机IP作为ACL ID。这个ACL表达式的格式为addr/Nts ,此时add「中的有效位与客户端addr中的有效位进行比对,base64是password的SHA1摘要的编码。
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=kafka
cluster1.zk.acl.password=123456
######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=true
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=PLAIN
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="123456";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
4、把配置copy到几个节点
$ scp $KE_HOME/conf/system-config.properties hadoop-node2:$KE_HOME/conf/
$ scp $KE_HOME/conf/system-config.properties hadoop-node3:$KE_HOME/conf/
5、启动EFAK服务
$ ke.sh cluster restart
6、测试验证
五、EFAK SSL配置(zookeeper无鉴权)
最好先看完我这篇文章再来配置哦: Kafka安全机制(Kafka SSL认证实现)
1)启动zookeeper服务
$ cd $KAFKA_HOME
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
# 登录客户端
$ ./bin/zookeeper-shell.sh hadoop-node1:12181
2)启动kafka服务
$ cd $KAFKA_HOME
# 【温馨提示】在“Kafka安全机制(Kafka SSL认证实现)”,zookeeper都是带鉴权的,所以这里启动脚本就用kafka-server-start.sh
$ ./bin/kafka-server-start.sh -daemon ./config/server-ssl.properties
# 查看topic列表
$ ./bin/kafka-topics.sh --list --bootstrap-server hadoop-node1.ssltest.com:19092 --command-config ./config/client-ssl.properties
3)配置EFAK
$ cd $KE_HOME
$ vi conf/system-config.properties
配置如下:
######################################
# kafka ssl authenticate
######################################
cluster1.efak.ssl.enable=true
cluster1.efak.ssl.protocol=SSL
# kafka server.properties "ssl.truststore.location" value
cluster1.efak.ssl.truststore.location=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/certificates/kafka.truststore
# kafka server.properties "ssl.truststore.password" value
cluster1.efak.ssl.truststore.password=123456
# kafka server.properties "ssl.keystore.location" value
cluster1.efak.ssl.keystore.location=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/certificates/kafka.keystore
# kafka server.properties "ssl.keystore.password" value
cluster1.efak.ssl.keystore.password=123456
# kafka server.properties "ssl.endpoint.identification.algorithm" value
cluster1.efak.ssl.endpoint.identification.algorithm=https
# kafka server.properties "ssl.key.password" value
cluster1.efak.ssl.key.password=123456
4)把配置copy到几个节点
$ scp $KE_HOME/conf/system-config.properties hadoop-node2:$KE_HOME/conf/
$ scp $KE_HOME/conf/system-config.properties hadoop-node3:$KE_HOME/conf/
5)启动EFAK服务
$ ke.sh cluster restart
6)测试验证
EFAK里配置kafka和zookeeper的安全认证就到这了,有疑问的小伙伴欢迎给我留言哦,后续文章更丰富,请小伙伴耐心等待哦~
相关文章
- kafka学习之-雅虎开源管理工具Kafka Manager
- Apache Kafka - How to Load Test with JMeter
- JMeter进行Apache Kafka负载测试
- Knative 实战:基于 Kafka 实现消息推送
- kafka文档
- KAFKA安装+配置详解+常用操作+监控
- 大叔问题定位分享(3)Kafka集群broker进程逐个报错退出
- centos8安装kafka(单机方式)
- Kafka详解二、如何配置Kafka集群
- kafka详解三:开发Kafka应用
- spark集成kafka数据源
- kafka详解三:开发Kafka应用
- 来吧,1分钟带你玩转Kafka
- 全网最详细地理解Kafka中的Topic和Partition以及关于kafka的消息分发、服务端如何消费指定分区、kafka的分区分配策略(range策略和RoundRobin策略)
- 一脸懵逼学习KafKa集群的安装搭建--(一种高吞吐量的分布式发布订阅消息系统)
- y151.第八章 Servless和Knative从入门到精通 -- Kafka 与Eventing(十五)
- Kafka简介及使用PHP处理Kafka消息
- 大数据开发笔记(七):Kafka分布式流式处理
- 大数据Hadoop之——Kafka鉴权认证(Kafka kerberos认证+kafka账号密码认证+CDH Kerberos认证)
- 解开Kafka神秘的面纱(一):kafka架构与应用场景
- 【大数据开发运维解决方案】OGG For Bigdata 12按操作类型同步Oracle数据到kafka不同topic