SASL_PLAINTEXT 实现认证

1. server.properties中添加:

listeners=SASL_PLAINTEXT://hostname:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

 

2. config目录下新建三个文件:

kafka_server_jaas.conf

 

KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-pwd"
        user_admin="admin-pwd"
        user_alice="alice-pwd"
        user_bob="bob-pwd";
};

kafka_cilent_jaas.conf

 

 

KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="alice"
        password="alice-pwd";
};

kafka_zoo_jaas.conf

 

 

zookeeper {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-pwd";
};

 

 

3. 修改kafka各项sh脚本 
zookeeper-server-start.sh中添加(要在最后一行之前添加,别添加在最后一行了):

export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/ocdc/app/kafka_2.10-0.10.1.1/config/kafka_zoo_jaas.conf"

kafka-server-start.sh中添加:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/ocdc/app/kafka_2.10-0.10.1.1/config/kafka_server_jaas.conf"

kafka-console-consumer.sh和kafka-console-producer.sh中添加:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/ocdc/app/kafka_2.10-0.10.1.1/config/kafka_client_jaas.conf"

 

4.启动zookeeper 和Kafka集群

上述步骤,为集群添加了SASL_PLAINTEXT认证。Kafka Server间/Kafka和ZK间,利用admin账号互联。

同时Kafka Server还保留了两个普通用户:alice和bob,密码分别为alice-pwd,bob-pwd。

5.测试

利用kafka-console-consumer.sh和kafka-console-producer.sh,同时修改kafka_cilent_jaas.conf中的用户名和密码,测试认证机制是否生效。

 

 

Kafka-ACL实现权限控制

1. server.properties中添加:

authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

Kafka Server间是使用admin账号互联的,使用得把admin配为超级账号

 

2. 重启Kafka集群

 

3.利用kafka-acls.sh为topic设置ACL

sh kafka-acls.sh --authorizer-properties zookeeper.connect=hostname:2181 --add --allow-principal  User:alice  --group test-consumer-group --topic alice-topic

这里注意,如果alice要作为消费端连接alice-topic的话,必须对其使用的group(test-consumer-group)也赋权

 

 

4. 利用kafka-acls.sh设置不同的权限,同时修改kafka_cilent_jaas.conf中的用户名和密码,验证ACL和认证机制。

 

JAVA producer端示例

public class NewProducer
{
    public static void main(String[] args) throws IOException
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol","SASL_PLAINTEXT");
        props.put("sasl.mechanism","PLAIN");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        System.out.println("send msg");
        producer.send(
                new ProducerRecord<String, String>("topic_name", "123123123"));
        System.out.println("send one msg : 123123123");
        producer.close();
    }
}

 

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐