配置步骤

1. 在server.properties中添加

listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

说明:

配置上 authorizer.class.name 即开启ACL权限认证,可以通过配置来限定某个账号的操作权限。

User:admin 为超级账号,拥有所有权限。

2. 在config目录下新建文件
kafka-zk-jaas.conf

zookeeper {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-pwd";
};

说明:username 为kafka broker在zk中注册使用的账号密码,通常使用超级账号。

kafka-server-jaas.conf

KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-pwd"
        user_admin="admin-pwd"
        user_broker0="broker0-pwd";
};

说明:

username 为注册到zk时使用的账号密码。

user_admin="admin-pwd" 设定存在账号 admin,密码为 admin-pwd。此处admin密码必须与zk中配置的admin账号密码相同

user_broker0="broker0-pwd" 设定存在账号 broker0,密码为 broker0-pwd。客户端可以使用此账号进行连接,并通过ACL配置来限制该账号的操作权限。

kafka-client-jaas.conf

KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="broker0"
        password="broker0-pwd";
};

3. 在zookeeper, kafka和consumer、producer 的启动脚本中添加如下脚本(在最后一行之前添加)

zookeeper-server-start.sh

export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/******/kafka_2.13-2.5.0/config/kafka-zk-jaas.conf"

kafka-server-start.sh

export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/*******/kafka_2.13-2.5.0/config/kafka-server-jaas.conf"

 kafka-console-consumer.sh

 kafka-console-producer.sh

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/*******/kafka_2.13-2.5.0/config/kafka-client-jaas.conf"

4. 启动zk和kafka

./zookeeper-server-start.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties

5. 授权

给broker0账户赋予topic=[topic_name]的写权限

./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:broker0 --operation Write --topic topic_name

给broker0账户赋予topic=[topic_name]的读权限

./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:broker0 --operation Read --topic topic_name

修改consumer.properties中的group.id (默认为 test-consumer-group) 

# consumer group id
group.id=group_id

 给broker0账户赋予group=[group_id]读授权 (外部连接kafka需要指定group.id,此时必须对group进行授权,否则无法通过授权)

./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:broker0 --operation Read --group group_id

查看权限

./kafka-acls.sh --list --authorizer-properties zookeeper.connect=127.0.0.1:2181
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=group_name, patternType=LITERAL)`:
        (principal=User:broker0, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=group_id, patternType=LITERAL)`:
        (principal=User:broker0, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=topic_name, patternType=LITERAL)`:
        (principal=User:broker0, host=*, operation=READ, permissionType=ALLOW)
        (principal=User:broker0, host=*, operation=WRITE, permissionType=ALLOW)

 ACL命令操作(--operation)对应功能:

OperationResourceAPI
ALTERTopicCreatePartitions
ALTER_CONFIGSTopicAlterConfigs
CREATETopicMetadata if auto.create.topics.enable
CREATETopicCreateTopics
DELETETopicDeleteRecords
DELETETopicDeleteTopics
DESCRIBETopicListOffsets
DESCRIBETopicMetadata
DESCRIBETopicOffsetFetch
DESCRIBETopicOffsetForLeaderEpoch
DESCRIBE_CONFIGSTopicDescribeConfigs
READTopicFetch
READTopicOffsetCommit
READTopicTxnOffsetCommit
WRITETopicProduce
WRITETopicAddPartitionsToTxn
OperationResourceAPI
DELETEGroupDeleteGroups
DESCRIBEGroupDescribeGroup
DESCRIBEGroupFindCoordinator
DESCRIBEGroupListGroups
READGroupAddOffsetsToTxn
READGroupHeartbeat
READGroupJoinGroup
READGroupLeaveGroup
READGroupOffsetCommit
READGroupOffsetFetch
READGroupSyncGroup
READGroupTxnOffsetCommit

6. 启动生产者和消费者

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic_name --from-beginning --consumer.config ../config/consumer.properties
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic topic_name --producer.config ../config/producer.properties

7. 连接成功

Springboot-kafka SASL/PLANTEXT 配置

yaml配置

spring:
  kafka:    
    properties:
      sasl.mechanism: PLAIN
      security.protocol: SASL_PLAINTEXT
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="broker0" password="broker0-pwd";

该效果等同于在代码中设置系统配置

System.setProperty("java.security.auth.login.config", "/home/*******/kafka_2.13-2.5.0/config/kafka-client-jaas.conf");

 Spring-kafka加载配置逻辑:

首先由Springboot加载application.yaml(或application.properties)可识别的配置,不可识别的配置均无效,如下图所示,IDE提示不可识别的配置springboot都不会加载。

然后在检测到 security.protocol: SASL_PLAINTEXT 之后再读取 sasl.jaas.config 配置,若配置表中不存在则使用

System.getProperty("java.security.auth.login.config");

加载配置。

PS:consumer和producer可以统一配置也可以单独配置(Kafka stream SASL/PLANTEXT配置也一样,这块网上资料比较少,所以特别提一下,统一配置对stream同样生效)

单独配置

spring:
  kafka:
    consumer:
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="broker0" password="broker0-pwd";


spring:
  kafka:
    producer:
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="broker0" password="broker0-pwd";


spring:
  kafka:
    streams:
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="broker0" password="broker0-pwd";

异常分析

1. Windows系统启动Kafka报错异常,ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed 

分析:无

方案:删除 /tmp/kafka-logs(Win10内置ubuntu) 文件夹内所有文件

2. ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: Could not find a 'KafkaServer' or 'sasl_plaintext.KafkaServer' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /home/******/kafka_2.13-2.5.0/config/kafka-server-jaas.conf 

[2020-07-09 17:03:05,360] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: Could not find a 'KafkaServer' or 'sasl_plaintext.KafkaServer' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /home/******/kafka_2.13-2.5.0/config/kafka-server-jaas.conf
        at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
        at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
        at org.apache.kafka.common.security.JaasContext.loadServerContext(JaasContext.java:70)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:131)
        at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
        at kafka.network.Processor.<init>(SocketServer.scala:724)
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
        at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
        at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
        at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
        at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:921)
        at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
        at kafka.network.SocketServer.startup(SocketServer.scala:122)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)

分析:kafka-server-jaas.conf 配置文件找不到,或者配置的 KafkaServer元素找不到

方案:确认kafka-server.start.sh配置的路径地址正确,确认 kafka-server-jaas.conf 配置文件中存在 KafkaServer 元素,区分大小写

3. INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

[2020-07-09 17:29:49,356] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
[2020-07-09 17:29:49,765] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
[2020-07-09 17:29:50,172] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

分析:存在未认证的方式连接Kafka的请求。

方案:检查一下是否有kafka-console-consumer ,kafka-console-producer或者外部进程在尝试连接

4. java.io.EOFException: null,同时kafka控制台刷问题3出现的日志

[18:10:54:133] [TRACE] - org.apache.kafka.common.network.Selector.maybeReadFromClosingChannel(Selector.java:707) - [Producer clientId=producer-1] Read from closing channel failed, ignoring exception
java.io.EOFException: null
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:97) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.Selector.maybeReadFromClosingChannel(Selector.java:704) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.Selector.close(Selector.java:909) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:629) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.Selector.poll(Selector.java:485) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) [kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-2.5.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]

分析 :Spring-kafka尝试连接kafka未通过授权,nio读取结果为空抛出的异常。

方案:检查授权文件及配置是否正确,检查用户名密码是否正确。

5. Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
	at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:134) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:454) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.clients.admin.Admin.create(Admin.java:71) ~[kafka-clients-2.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getAdmin(DefaultKafkaClientSupplier.java:41) ~[kafka-streams-2.5.0.jar:?]
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:747) ~[kafka-streams-2.5.0.jar:?]
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:584) ~[kafka-streams-2.5.0.jar:?]
	at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:266) ~[spring-kafka-2.5.0.RELEASE.jar:2.5.0.RELEASE]
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	... 14 more

分析:sasl.jaas.config 未设置或 System.setProperty("java.security.auth.login.config", "kafka-client-jaas.conf"); 未设置。

方案:参照上文提到的 Springboot-kafka SASL/PLANTEXT 配置 进行配置。

 

6. failed authentication due to: Authentication failed: Invalid username or password

[2020-07-14 09:53:52,921] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9092) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2020-07-14 09:53:52,922] WARN [Producer clientId=console-producer] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

分析:用户名或密码错误

方案:请确认 kafka-client-jaas.conf 配置的用户名和密码,和 kafka-server-jaas.conf 中配置的用户名密码一致

7. org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [topic_name]

[2020-07-14 09:55:29,838] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {topic_name=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2020-07-14 09:55:29,841] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [topic_name] (org.apache.kafka.clients.Metadata)
[2020-07-14 09:55:29,843] ERROR Error when sending message to topic topic_name with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [topic_name]

分析:对 topics:[topic_name] 操作未授权,

方案:使用命令./kafka-acls.sh --list --authorizer-properties zookeeper.connect=127.0.0.1:2181 查看授权配置,是否有登录账号对topic的操作权限

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐