本篇文章主要讲述怎么通过zookeeper作为注册中心实现分布式系统中服务注册与发现的具体实现(具体的细节可能因不同的框架而不同,但原理其实都是一样的)

本文章设计的主要思路:

利用zookeeper临时顺序节点的性质,为每个应用服务在zookeeper上创建临时顺序的节点就(这个节点成为服务节点),而实现注册功能;服务消费者去相应服务节点下取出服务节点的信息,从而实现服务发现功能。具体实现如下:

zookeeper的环境搭建在这就不累述了大家可以在网上轻松的找到相关的文档

本实例采用zookeeper-java客户端原生api设计(大家额可以使用封装过的开源框架zkClient,apache 的Curator框架)

1 搭建项目(采用的开发工具是intellij IDEA)-- 用什么开发工具都无所谓啦

①我们采用spring boot项目快速搭建项目

②建立spring boot项目后添加zookeeper的maven依赖,创建zookeeper--java客户端

这里我们使用的3.4.8版本的,同学也可以随意但建议使用3.3以上的版本,因为3.3.6以后版本变更较大。

<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
   <version>3.4.8</version>
</dependency>
 
@Service
public class Zook {

    public static ZooKeeper zooKeeper ;
    //获得配置资源
    @Autowired
    Environment env;

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public ZooKeeper create() {
        String connectString = env.getProperty("zookeeper.address");
        String outtime = env.getProperty("zookeeper.sessionouttime");
        try {
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("创建zk会话成功");
                    countDownLatch.countDown();
                }
            };
            //默认6s
            if (outtime.isEmpty()) {
                outtime = "6000";
            }
            ZooKeeper zooKeeper = new ZooKeeper(connectString, Integer.parseInt(outtime), watcher);
            //zookeeper会话建立的特点是:触发一次defaultWatcher(就是注册时传入的watcher)才能真正的被建立
            countDownLatch.await();
            return zooKeeper;
        } catch (Exception e) {
            return null;
        }
    }
}

③在zookeeper上创建一个所有服务节点的根节点(例如:公司英文简写)这样方便统一路径,当然这个节点一定是持久节点;

④创建服务节点(这里是持久节点),而实际保存服务信息的节点是服务节点下子节点,子节点是临时顺序的节点(这一点与阿里的dubbo不同,如果同学门感兴趣可以关注我的博客,后期我会编写dubbo与Eureka对比的相关文章,里面将会详细介绍dubbo知识,着急了解的这方面知识的同学可以给我留言)

@Service
public class ZookRegister {
    //获得配置资源
    @Autowired
    Environment env;

    //固定的根目录比如公司名
    final String fixedpath = "/xxs";

    @Value("spring.application.name")
    String servername;
    @Autowired
    Zook zook;

    //spring容器初始化ZookRegister的实例时执行
    @PostConstruct
    public void register() throws Exception {
        String servername = env.getProperty("spring.application.name");
        String port = env.getProperty("server.port");
        String ip = env.getProperty("server.address");
        Zook.zooKeeper = zook.create();
        ZooKeeper zooKeeper = Zook.zooKeeper;
        Stat existsFixedpath = zooKeeper.exists(fixedpath, false);
        if (existsFixedpath == null) {
            //参数分别是创建的节点路径、节点的数据、权限(此处对所有用户开放)、节点的类型(此处是持久节点)
            zooKeeper.create(fixedpath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        String svnode = fixedpath + "/" + servername;
        Stat existsSvnode = zooKeeper.exists(svnode, false);
        //create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
        if (existsSvnode == null) {
            zooKeeper.create(svnode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        if (ip == null || "".equals(null)) {
            //如果配置文件中没有指定服务ip获取本机ip
            ip = InetAddress.getLocalHost().getHostAddress();
        }
        if (port == null || "".equals(null)) {
            port = "8080";
        }
        String address = ip + ":" + port;
        //临时节点的前缀是服务名称,节点数据是服务address
        String svipmlNode = fixedpath + "/" + servername + "/" + servername;
        zooKeeper.create(svipmlNode, address.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

}

 

配置文件:

server.port=8081
zookeeper.address=localhost:2181
zookeeper.sessionouttime=4000
spring.application.name=test


服务注册得工作至此完成

服务发现工作:

@Service
public class GetServer {

    //本地缓存服务列表
    private static Map<String, List<String>> servermap;

    public Watcher myWatcher = new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            //如果服务节点数据发生变化则清空本地缓存(这种做法有点欠佳)
            if (watchedEvent.getType().equals(Event.EventType.NodeChildrenChanged)) {
                servermap = null;
            }
        }
    };

    private List<String> getNodeList(String serverName) throws KeeperException, InterruptedException {
        if (servermap == null) {
            servermap = new HashMap<>();
        }
        ZooKeeper zooKeeper = Zook.zooKeeper;
        Stat exists = null;
        Watcher existsWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("sssss");
            }
        };
        try {
            String s = "/xxs/" + serverName;
            exists = zooKeeper.exists(s, existsWatcher);
        } catch (Exception e) {
        }

        //判断是否存在该服务
        if (exists == null) return null;
        List<String> serverList = servermap.get("serverName");
        if (serverList != null && serverList.size() > 0) {
            return serverList;
        }
        List<String> children = zooKeeper.getChildren("/xxs/" + serverName, myWatcher);
        List<String> list = new ArrayList<>();
        for (String s : children) {
            byte[] data = zooKeeper.getData("/xxs/" + serverName + "/" + s, myWatcher, null);
            list.add(new String(data));
        }
        servermap.put(serverName, list);
        return list;
    }

    public String getServerinfo(String serverName) {
        try {
            List<String> nodeList = getNodeList(serverName);
            if (nodeList == null|| nodeList.size()<1) {
                return null;
            }
            //这里使用得随机负载策略,如需需要自己可以实现其他得负载策略
            String s = nodeList.get((int) (Math.random() * nodeList.size()));
            return s;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

服务发现调用getServerinfo()方法传入servername即可获得服务提供者得ip+port,然后去条用服务即可;

至此简单得服务注册与发现工作已完成;而一个框架得功能远不止这一点功能,比如还需要容错机制,负载的策略等功能;这些功能需要自己去完善;(有时间的话我会更新容错机制的实现)




Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐