Zookeeper的ACL权限

ACL官方文档链接

1. ACL的简介

首先说明一下为什么需要ACL
  简单来说 :在通常情况下,zookeeper允许未经授权的访问,因此在安全漏洞扫描中暴漏未授权访问漏洞。这在一些监控很严的系统中是不被允许的,所以需要ACL来控制权限.

接下来贴出来的截图是:实际环境中网路检测出来需要整改的zookeeper漏洞
下图是一个实际遇到的需求

既然需要ACL来控制权限,那么Zookeeper的权限有哪些呢?

权限包括以下几种:

CREATE: 能创建子节点
READ:能获取节点数据和列出其子节点
WRITE: 能设置节点数据
DELETE: 能删除子节点
ADMIN: 能设置权限
说到权限,就要介绍一下zookeeper的认证方式:

包括以下四种:

world:默认方式,相当于全世界都能访问
auth:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
digest:即用户名:密码这种方式认证,这也是业务系统中最常用的
ip:使用Ip地址认证

ACL基本介绍就到这里

2. 没有ACL认证时zookeeper的操作

直接上代码 : 更改一下服务器地址和端口号即可!

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ZkConn {
	 public static void main(String[] args) 
             throws IOException, KeeperException, InterruptedException {
     /**
      *  创建一个与服务器的连接
      *  参数一:服务器地址和端口号(该端口号值服务器允许客户端连接的端口号)
      *  参数二:连接会话超时时间
      *  参数三:观察者,连接成功会触发该观察者。不过只会触发一次。
      *      该Watcher会获取各种事件的通知
      */
     ZooKeeper zk = new ZooKeeper("node005:4180", 60000, new Watcher() {
         // 监控所有被触发的事件
         public void process(WatchedEvent event) {
             System.out.println("监控所有被触发的事件:EVENT:" + event.getType());
         }
     });
     System.out.println("*******************************************************");
     // 查看根节点的子节点
     System.out.println("查看根节点的子节点:ls / => " + zk.getChildren("/", true));
     System.out.println("*******************************************************");
     // 创建一个目录节点
     if (zk.exists("/node", true) == null) {
         /**
          * 参数一:路径地址
          * 参数二:想要保存的数据,需要转换成字节数组
          * 参数三:ACL访问控制列表(Access control list),
          *      参数类型为ArrayList<ACL>,Ids接口提供了一些默认的值可以调用。
          *      OPEN_ACL_UNSAFE     This is a completely open ACL 
          *                          这是一个完全开放的ACL,不安全
          *      CREATOR_ALL_ACL     This ACL gives the
          *                           creators authentication id's all permissions.
          *                          这个ACL赋予那些授权了的用户具备权限
          *      READ_ACL_UNSAFE     This ACL gives the world the ability to read.
          *                          这个ACL赋予用户读的权限,也就是获取数据之类的权限。
          * 参数四:创建的节点类型。枚举值CreateMode
          *      PERSISTENT (0, false, false)
          *      PERSISTENT_SEQUENTIAL (2, false, true)
          *          这两个类型创建的都是持久型类型节点,回话结束之后不会自动删除。
          *          区别在于,第二个类型所创建的节点名后会有一个单调递增的数值
          *      EPHEMERAL (1, true, false)
          *      EPHEMERAL_SEQUENTIAL (3, true, true)
          *          这两个类型所创建的是临时型类型节点,在回话结束之后,自动删除。
          *          区别在于,第二个类型所创建的临时型节点名后面会有一个单调递增的数值。
          * 最后create()方法的返回值是创建的节点的实际路径
          */
         zk.create("/node", "conan".getBytes(),
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         System.out.println("创建一个目录节点:create /node conan");
         /**
          *  查看/node节点数据,这里应该输出"conan"
          *  参数一:获取节点的路径
          *  参数二:说明是否需要观察该节点,设置为true,则设定共享默认的观察器
          *  参数三:stat类,保存节点的信息。例如数据版本信息,创建时间,修改时间等信息
          */
         System.out.println("查看/node节点数据:get /node => "
                 + new String(zk.getData("/node", false, null)));
         /**
          * 查看根节点
          * 在此查看根节点的值,这里应该输出上面所创建的/node节点
          */
         System.out.println("查看根节点:ls / => " + zk.getChildren("/", true));
     }
     System.out.println("*******************************************************");
     // 创建一个子目录节点
     if (zk.exists("/node/sub1", true) == null) {
         zk.create("/node/sub1", "sub1".getBytes(),
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         System.out.println("创建一个子目录节点:create /node/sub1 sub1");
         // 查看node节点
         System.out.println("查看node节点:ls /node => "
                 + zk.getChildren("/node", true));
     }
     System.out.println("*******************************************************");
     /**
      *  修改节点数据
      *  修改的数据会覆盖上次所设置的数据
      *  setData()方法参数一、参数二不多说,与上面类似。
      *  参数三:数值型。需要传入该界面的数值类型版本号!!!
      *      该信息可以通过Stat类获取,也可以通过命令行获取。
      *      如果该值设置为-1,就是忽视版本匹配,直接设置节点保存的值。
      */
     if (zk.exists("/node", true) != null) {
         zk.setData("/node", "changed".getBytes(), -1);
         // 查看/node节点数据
         System.out.println("修改节点数据:get /node => "
                 + new String(zk.getData("/node", false, null)));
     }
     System.out.println("*******************************************************");
     // 删除节点
     if (zk.exists("/node/sub1", true) != null) {
         zk.delete("/node/sub1", -1);
         zk.delete("/node", -1);
         // 查看根节点
         System.out.println("删除节点:ls / => " + zk.getChildren("/", true));
     }
     // 关闭连接
     zk.close();
 }
}

以下是代码的运行结果 :
(程序运行开始控制台打印出来的很多日志信息就不一一截取了)

2018-05-22 08:41:14,942 INFO  [main-SendThread(node005:4180)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(975)) - Opening socket connection to server node005/192.168.1.54:4180. Will not attempt to authenticate using SASL (unknown error)
2018-05-22 08:41:14,944 INFO  [main-SendThread(node005:4180)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to node005/192.168.1.54:4180, initiating session
2018-05-22 08:41:15,007 INFO  [main-SendThread(node005:4180)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1235)) - Session establishment complete on server node005/192.168.1.54:4180, sessionid = 0x36010c48e1200017, negotiated timeout = 40000
监控所有被触发的事件:EVENT:None
查看根节点的子节点:ls / => [super, kaishuntest1, testWatch, zookeeper, kaishuntest, zks10000000041, demo, hbase, kaishun]
*******************************************************
监控所有被触发的事件:EVENT:NodeCreated
监控所有被触发的事件:EVENT:NodeChildrenChanged
创建一个目录节点:create /node conan
查看/node节点数据:get /node => conan
查看根节点:ls / => [super, kaishuntest1, node, testWatch, zookeeper, kaishuntest, zks10000000041, demo, hbase, kaishun]
*******************************************************
监控所有被触发的事件:EVENT:NodeCreated
创建一个子目录节点:create /node/sub1 sub1
查看node节点:ls /node => [sub1]
*******************************************************
监控所有被触发的事件:EVENT:NodeDataChanged
修改节点数据:get /node => changed
*******************************************************
监控所有被触发的事件:EVENT:NodeDeleted
监控所有被触发的事件:EVENT:NodeChildrenChanged
监控所有被触发的事件:EVENT:NodeChildrenChanged
删除节点:ls / => [super, kaishuntest1, testWatch, zookeeper, kaishuntest, zks10000000041, demo, hbase, kaishun]
2018-05-22 08:41:15,178 INFO  [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x36010c48e1200017 closed
2018-05-22 08:41:15,178 INFO  [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down

从这里面可以看到,在没有进行任何设置的前提下,所有的操作都时被允许的!

3.接下来开始添加ACL认证

a. 首先进入到zookeeper
在zookeeper安装目录下的bin目录执行 : 
zkCli.sh -server 192.168.1.54:4180 // 服务器地址 : zookeeper端口号(默认2181)
b. 没有添加ACL认证的节点信息
create /test // 创建一个进行ACL认证测试的节点
getAcl /test // 获取该节点信息
控制台输出 :
'world,'anyone
: cdrwa
// 从结果来看,结合之前的介绍来看,新创建的节点默认是全世界都可以访问,并且具有全部的5种权限的.
c. 添加ACL认证
create /test 'test-data' //创建节点 
addauth digest xmr:123456 //增加一个认证用户  格式 addauth digest 用户名:密码
 setAcl /test auth:xmr:123456:r //对新增加的用户设置权限,r代表只读权限
 getAcl /test //获取节点信息

这里写图片描述
从控制台的输出可以看到,密码已经被加密,加密规则如下:

static public String generateDigest(String idPassword)
        throws NoSuchAlgorithmException {
    String parts[] = idPassword.split(":", 2);
    byte digest[] = MessageDigest.getInstance("SHA1").digest(
            idPassword.getBytes());
    return parts[0] + ":" + base64Encode(digest);
}

SHA1加密,然后base64编码

这个时候我们来试验一下对刚刚的节点进行非读取操作

set /test nihao // 向该节点里面写入数据(如果该节点存在数据,此操作为修改数据)
控制台返回如下 :
Authentication is not valid : /test // 说明我们配置的ACL认证已经生效

** 刚刚介绍的API操作再走一发,这次只列出部分代码**

   if (zk.exists("/test", true) != null) {
         zk.setData("/test", "ACL认证".getBytes(), -1);
         // 查看/node节点数据
         System.out.println("修改节点数据:get /test => "
                 + new String(zk.getData("/test", false, null)));
     }
     System.out.println("*******************************************************");

尝试修改,test节点的数据, 之前怎么做都是没问题的,现在呢?

控制台输出结果如下:
Exception in thread "main" org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /test
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:113)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1270)
	at mastercom.cn.zookeeper.ZkConn.main(ZkConn.java:94)

可见,ACL权限设置的作用就在这儿了!

注意事项:
要修改某个节点的ACL属性,必须具有read、admin二种权限。
要删除某个节点下的子节点,必须具有对父节点的read权限,以及父节点的delete权限。

遇到的问题 :

在本地集群,以及代码测试 : 设置的ACL认证确实有效,成功的避免了一些用户对于zookeeper的任意操作!
但是在实际中,遇到了巨大的问题!
由于现场的集群配置的是高可用的,zookeeper下面有如下节点 :

zookeeper,hadoop-ha,spark等等

对集群下面所有节点 都设置了ACL权限认证之后,集群的启动直接报错!集群彻底爆炸!

尝试了各种各样的办法都宣告失败之后,使用通过zookeeper超级用户模式访问这些节点,我们修改了zookeeper的zookeeper.DigestAuthenticationProvider.superDigest参数。

在zkServer.sh配置启动参数:

nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" "-Dzookeeper.DigestAuthenticationProvider.superDigest=super:g9oN2HttPfn8MMWJZ2r45Np/LIA=" \

提示 : /nohup 找到该位置, 然后添加进去即可
重要的是添加下图中的这一行!!!这里的super: 后面跟的是:superpw的密文
添加后的结果如下所示:
重启zookeeper之后,执行:

addauth digest super:superpw //添加超级用户,设置密码superpw(对应于: g9oN2HttPfn8MMWJZ2r45Np/LIA=")

将zookeeper下面的所有节点: 设置为

'world,'anyone
: cdrwa
命令如下 : 
setAcl /testAcl world:anyone:cdrwa //将该节点设置为最高权限!

然后重启集群, 问题得到解决!!!
最终,我们使用ACL ip认证的方式,完美解决系统漏洞的问题!
参考的博客链接

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐