kafka安装与使用(SASL认证)

  • 分类: 数据库
  • 发表日期:2021-06-20 20:32:00
  • 最后修改:2021-07-20 18:14:00

基于CentOS7、java 1.8.0_281、kafka_2.12-2.6.0、zookeeper-3.5.9。

本文中,“#”表示注释;“$”表示命令输入;“>”表示命令输出;“=>”表示文件中写入。

 

一、安装

1.1 zookeeper

  • 下载 zookeepeer-3.5.9:https://zookeeper.apache.org/releases.html
  • 解压到 /opt/module/ (自行选择安装目录)
  • 接下来进入 conf 文件夹中做配置(先拷贝 zoo_sample.cfg为zoo.cfg),然后编辑zoo.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/mnt/module/zookeeper-3.5.9/data/
clientPort=2181
server.1=0.0.0.0:2888:3888
  • 添加环境变量
$ vi /etc/profile
=> export ZK_HOME=/opt/module/zookeeper-3.5.9
   export PATH=$PATH:$ZK_HOME/bin
$ source /etc/profile
  • 启动节点
# 启动
$ zkServer.sh start

# 查看状态
$ zkServer.sh status

1.2 kafka

broker.id=1    # 集群中唯一,且不能大于 181 ?
listeners=PLAINTEXT://0.0.0.0:9092    # 节点通信地址
log.dirs=/opt/module/kafka_2.12-2.6.0/logs    # 数据存放路径
num.partitions=10    # 新建topic默认多少个分区
zookeeper.connect=127.0.0.1:2181    # zookeeper的通信地址
  • 添加环境变量
$ vi /etc/profile
=> export KAFKA_HOME=/opt/module/kafka_2.12-2.6.0
   export PATH=$PATH:$KAFKA_HOME/bin
$ source /etc/profile
  • 启动节点
# 前台启动
kafka-server-start.sh /opt/module/kafka_2.12-2.6.0/config/server.properties

# 后台启动,通过jps命令及查看启动日志判断是否启动成功
kafka-server-start.sh -daemon /opt/module/kafka_2.12-2.6.0/config/server.properties

 

二、添加安全认证机制SASL

2.1 zookeeper安全认证配置

导入kafka相关jar包

从 /opt/module/kafka_2.12-2.6.0/lib 目录下导入以下的 jar 包到zookeeper的 lib 目录下。

kafka-clients-2.3.0.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.3.jar

zoo.cfg配置

添加如下内容

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

zk_server_jaas.conf

在 /opt/module/zookeeper-3.5.9/conf 目录下新建文件 zk_server_jaas.conf ,内容如下:

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required 
    username="admin" 
    password="admin" 
    user_kafka="kfk123;
};

这个文件定义连接到 zookeeper 客户端的用户名和密码,其中 username 和 password 是 zookeeper 集群节点间内部认证的用户名密码,而以 user_ 开头的字段则定义了外部程序的认证用户名(kafka)和密码(kfk123)。

修改zkEnv.sh

在 /opt/module/zookeeper-3.5.9/bin/zkEnv.sh 添加以下内容:

export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/module/zookeeper-3.5.9/conf/zk_server_jaas.conf "

重新启动 zookeeper ,无错误即配置成功,若报错请查看日志排查问题。

2.2 kafka安全认证配置

kafka_server_jaas.conf

在 kafka 的 config 目录下,新建 kafka_server_jaas.conf 文件,内容如下:

KafkaServer {
	org.apache.kafka.common.security.plain.PlainLoginModule required
	username="admin"
	password="admin123"
  	user_admin="admin123"
	user_producer="prod123"
    user_consumer="cons123";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
	username="kafka"
	password="kfk123";
};

KafkaServer配置的是kafka的账号和密码,Client配置节主要配置了broker到Zookeeper的链接用户名密码,这里要和前面zookeeper配置中的zk_server_jaas.conf中user_kafka的账号和密码相同。

配置server.properties

打开 /opt/module/kafka_2.12-2.6.0/config/server.properties ,修改或添加如下配置

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://192.168.1.89:9092

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#super.users=User:admin
allow.everyone.if.no.acl.found=true

delete.topic.enable=true
auto.create.topics.enable=true

修改 kafka-server-start.sh

将 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 修改为

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.6.0/config/kafka_server_jaas.conf"

重新启动 kafka,无错误即配置成功,若报错请查看日志排查问题。

 

三、Python 连接 kafka

生产者

import json
from kafka import KafkaProducer

producer = KafkaProducer(
            bootstrap_servers = ["192.168.1.89:9092"],
            value_serializer = lambda m: json.dumps(m).encode('utf8'),
            linger_ms = 5,
            batch_size = 1*1024*1024,
            api_version=(0, 10),
            security_protocol='SASL_PLAINTEXT',
            sasl_mechanism="PLAIN",
            sasl_plain_username="producer",
            sasl_plain_password="prod123"
           )

topic_name = 'topic001'
msg = {"a":1, "b":2, "c":3}

producer.send(topic_name, value=msg)
producer.close()

消费者

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
            bootstrap_servers = "192.168.1.89:9092",
            value_deserializer = lambda m: json.loads(m.decode('utf8')),
            auto_offset_reset = "earliest",
            api_version=(0, 10),
            security_protocol='SASL_PLAINTEXT',
            sasl_mechanism="PLAIN",
            sasl_plain_username="consume",
            sasl_plain_password="cons123"
           )   

topics = ['topic001']
consumer.subscribe(topics=topics)

for msg in consumer:
    print(msg)

 

post
2020年12月26日 19:59 原创
post
2021年2月24日 18:06 原创 草稿

记一次elasticsearch项目的部署

post
2021年3月25日 22:09 原创
post
2021年4月15日 20:59 原创
post
2021年4月18日 16:32 原创
post
2021年7月30日 12:15 原创
post
2021年7月30日 15:07 原创
post
2021年7月30日 15:13 原创
post
2021年7月30日 15:24 原创
post
2021年7月30日 16:02 原创
post
2021年8月25日 16:11
post
2021年11月24日 16:45 原创
post
2024年6月13日 09:47 原创

17 评论

阿添 2021-07-22 18:02:58

55555

阿添 2021-07-22 18:00:13

rrr

阿添 2021-07-22 17:58:23

yyy

阿添 2021-07-22 17:56:11

ooo

阿添 2021-07-22 17:54:39

123

阿添 2021-07-22 17:51:05

iii

阿添 2021-07-22 17:49:06

uuu

阿添 2021-07-22 17:43:50

啦啦啦

阿添 2021-07-22 17:41:32

哈哈

阿添 2021-07-22 17:38:00

123

阿添 2021-07-22 14:22:23

111

阿添 回复 阿添 2021-07-22 14:22:27

222

阿添 回复 阿添 2021-07-22 14:22:32

333

阿添 回复 阿添 2021-07-22 15:03:13

666

阿添 回复 阿添 2021-07-22 15:03:25

qqq

阿添 回复 阿添 2021-07-22 15:03:33

666

阿添 回复 阿添 2021-07-22 15:03:46

qwe