基于CentOS7、java 1.8.0_281、kafka_2.12-2.6.0、zookeeper-3.5.9。
本文中,“#”表示注释;“$”表示命令输入;“>”表示命令输出;“=>”表示文件中写入。
一、安装
1.1 zookeeper
- 下载 zookeepeer-3.5.9:https://zookeeper.apache.org/releases.html
- 解压到 /opt/module/ (自行选择安装目录)
- 接下来进入 conf 文件夹中做配置(先拷贝 zoo_sample.cfg为zoo.cfg),然后编辑zoo.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/mnt/module/zookeeper-3.5.9/data/
clientPort=2181
server.1=0.0.0.0:2888:3888
- 添加环境变量
$ vi /etc/profile
=> export ZK_HOME=/opt/module/zookeeper-3.5.9
export PATH=$PATH:$ZK_HOME/bin
$ source /etc/profile
- 启动节点
# 启动
$ zkServer.sh start
# 查看状态
$ zkServer.sh status
1.2 kafka
- 下载 kafka_2.12-2.6.0:http://kafka.apache.org/downloads
- 解压到 /opt/module/,并新建 logs 文件夹
- 修改 config 目录下的 server.properties
broker.id=1 # 集群中唯一,且不能大于 181 ?
listeners=PLAINTEXT://0.0.0.0:9092 # 节点通信地址
log.dirs=/opt/module/kafka_2.12-2.6.0/logs # 数据存放路径
num.partitions=10 # 新建topic默认多少个分区
zookeeper.connect=127.0.0.1:2181 # zookeeper的通信地址
- 添加环境变量
$ vi /etc/profile
=> export KAFKA_HOME=/opt/module/kafka_2.12-2.6.0
export PATH=$PATH:$KAFKA_HOME/bin
$ source /etc/profile
- 启动节点
# 前台启动
kafka-server-start.sh /opt/module/kafka_2.12-2.6.0/config/server.properties
# 后台启动,通过jps命令及查看启动日志判断是否启动成功
kafka-server-start.sh -daemon /opt/module/kafka_2.12-2.6.0/config/server.properties
二、添加安全认证机制SASL
2.1 zookeeper安全认证配置
导入kafka相关jar包
从 /opt/module/kafka_2.12-2.6.0/lib 目录下导入以下的 jar 包到zookeeper的 lib 目录下。
kafka-clients-2.3.0.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.3.jar
zoo.cfg配置
添加如下内容
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zk_server_jaas.conf
在 /opt/module/zookeeper-3.5.9/conf 目录下新建文件 zk_server_jaas.conf ,内容如下:
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_kafka="kfk123;
};
这个文件定义连接到 zookeeper 客户端的用户名和密码,其中 username 和 password 是 zookeeper 集群节点间内部认证的用户名密码,而以 user_ 开头的字段则定义了外部程序的认证用户名(kafka)和密码(kfk123)。
修改zkEnv.sh
在 /opt/module/zookeeper-3.5.9/bin/zkEnv.sh 添加以下内容:
export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/module/zookeeper-3.5.9/conf/zk_server_jaas.conf "
重新启动 zookeeper ,无错误即配置成功,若报错请查看日志排查问题。
2.2 kafka安全认证配置
kafka_server_jaas.conf
在 kafka 的 config 目录下,新建 kafka_server_jaas.conf 文件,内容如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin123"
user_admin="admin123"
user_producer="prod123"
user_consumer="cons123";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kfk123";
};
KafkaServer配置的是kafka的账号和密码,Client配置节主要配置了broker到Zookeeper的链接用户名密码,这里要和前面zookeeper配置中的zk_server_jaas.conf中user_kafka的账号和密码相同。
配置server.properties
打开 /opt/module/kafka_2.12-2.6.0/config/server.properties ,修改或添加如下配置
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://192.168.1.89:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#super.users=User:admin
allow.everyone.if.no.acl.found=true
delete.topic.enable=true
auto.create.topics.enable=true
修改 kafka-server-start.sh
将 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 修改为
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.6.0/config/kafka_server_jaas.conf"
重新启动 kafka,无错误即配置成功,若报错请查看日志排查问题。
三、Python 连接 kafka
生产者
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers = ["192.168.1.89:9092"],
value_serializer = lambda m: json.dumps(m).encode('utf8'),
linger_ms = 5,
batch_size = 1*1024*1024,
api_version=(0, 10),
security_protocol='SASL_PLAINTEXT',
sasl_mechanism="PLAIN",
sasl_plain_username="producer",
sasl_plain_password="prod123"
)
topic_name = 'topic001'
msg = {"a":1, "b":2, "c":3}
producer.send(topic_name, value=msg)
producer.close()
消费者
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers = "192.168.1.89:9092",
value_deserializer = lambda m: json.loads(m.decode('utf8')),
auto_offset_reset = "earliest",
api_version=(0, 10),
security_protocol='SASL_PLAINTEXT',
sasl_mechanism="PLAIN",
sasl_plain_username="consume",
sasl_plain_password="cons123"
)
topics = ['topic001']
consumer.subscribe(topics=topics)
for msg in consumer:
print(msg)
17 评论
阿添 2021-07-22 18:02:58
55555
阿添 2021-07-22 18:00:13
rrr
阿添 2021-07-22 17:58:23
yyy
阿添 2021-07-22 17:56:11
ooo
阿添 2021-07-22 17:54:39
123
阿添 2021-07-22 17:51:05
iii
阿添 2021-07-22 17:49:06
uuu
阿添 2021-07-22 17:43:50
啦啦啦
阿添 2021-07-22 17:41:32
哈哈
阿添 2021-07-22 17:38:00
123
阿添 2021-07-22 14:22:23
111