在分布式架构的系统中,系统经常被暴露为服务以供其他系统调用,这也是SOA或微服务架构常用的模式。

      为了使服务之间能够互相通信,需要有一个协调系统来管理这些服务,以便这些服务能够互相找到对方,这就是服务注册以发现机制。这个协调系统有时也被称作“注册中心”;

      下面,我们将基于zookeeper来实现服务注册与发现功能。

       新建springboot项目,添加一下依赖:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

服务注册接口与实现:

package com.fish.learn.zookeeper.service.registry;

/**
 * @Description:
 * @Author devin.jiang
 * @CreateDate 2019/4/30 20:41
 */
public interface ServiceRegistry {

    /**
     * 注册服务
     * @param serviceName
     * @param serviceAddress
     */
    void registry(String serviceName, String serviceAddress);

}
package com.fish.learn.zookeeper.service.registry.impl;

import com.fish.learn.zookeeper.service.constant.Constant;
import com.fish.learn.zookeeper.service.registry.ServiceRegistry;
import org.I0Itec.zkclient.ZkClient;

/**
 * @Description:
 * @Author devin.jiang
 * @CreateDate 2019/4/30 20:44
 */
public class ZkServiceRegistry implements ServiceRegistry {

    private String zkAddress = "localhost";

    private ZkClient zkClient;

    public void init() {

        zkClient = new ZkClient(zkAddress,
                Constant.ZK_SESSION_TIMEOUT,
                Constant.ZK_CONNECTION_TIMEOUT);
        System.out.println(">>> connect to zookeeper");

    }

    @Override
    public void registry(String serviceName, String serviceAddress) {

        //创建registry节点(持久)
        String registryPath = Constant.ZK_REGISTRY;
        if (!zkClient.exists(registryPath)) {
            zkClient.createPersistent(registryPath);
            System.out.println(">>> create registry node:" + registryPath);
        }

        //创建service节点(持久)
        String servicePath = registryPath + "/" + serviceName;
        if (!zkClient.exists(servicePath)) {
            zkClient.createPersistent(servicePath);
            System.out.println(">>>create service node:" + servicePath);
        }

        //创建address节点(临时)
        String addressPath = servicePath + "/address-";
        String addressNode = zkClient.createEphemeralSequential(addressPath,serviceAddress);
        System.out.println(">>> create address node:" + addressNode);

    }

}

服务发现实现:

package com.fish.learn.zookeeper.service.discovery;

/**
 * @Description:
 * @Author devin.jiang
 * @CreateDate 2019/4/30 21:01
 */
public interface ServiceDiscovery {

    /**
     * 服务发现
     * @param name
     * @return
     */
    String discover(String name);

}
package com.fish.learn.zookeeper.service.discovery.impl;

import com.fish.learn.zookeeper.service.constant.Constant;
import com.fish.learn.zookeeper.service.discovery.ServiceDiscovery;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;

/**
 * @Description:
 * @Author devin.jiang
 * @CreateDate 2019/4/30 21:02
 */
public class ZkServiceDiscovery implements ServiceDiscovery {

    private String zkAddress = "localhost";

    private final List<String> addressCache = new CopyOnWriteArrayList<>();

    private ZkClient zkClient;

    public void init() {

        zkClient = new ZkClient(zkAddress,
                Constant.ZK_SESSION_TIMEOUT,
                Constant.ZK_CONNECTION_TIMEOUT);
        System.out.println(">>> connect to zookeeper");

    }

    @Override
    public String discover(String name) {

        try {
            String servicePath = Constant.ZK_REGISTRY + "/" + name;

            //获取服务节点
            if (!zkClient.exists(servicePath)) {
                throw new RuntimeException(String.format(">>>can't find any service node on path {}",servicePath));
            }

            //从本地缓存获取某个服务地址
            String address;
            int addressCacheSize = addressCache.size();
            if (addressCacheSize > 0) {
                if (addressCacheSize == 1) {
                    address = addressCache.get(0);
                } else {
                    address = addressCache.get(ThreadLocalRandom.current().nextInt(addressCacheSize));
                }
                System.out.println(">>>get only address node:" + address);

                //从zk服务注册中心获取某个服务地址
            } else {
                List<String> addressList = zkClient.getChildren(servicePath);
                addressCache.addAll(addressList);

                //监听servicePath下的子文件是否发生变化
                zkClient.subscribeChildChanges(servicePath,(parentPath,currentChilds)->{
                    System.out.println(">>>servicePath is changed:" + parentPath);
                    addressCache.clear();
                    addressCache.addAll(currentChilds);

                });

                if (CollectionUtils.isEmpty(addressList)) {
                    throw new RuntimeException(String.format(">>>can't find any address node on path {}", servicePath));
                }

                int nodeSize = addressList.size();
                if (nodeSize == 1) {
                    address = addressList.get(0);
                } else {

                    //如果多个,则随机取一个
                    address = addressList.get(ThreadLocalRandom.current().nextInt(nodeSize));
                }
                System.out.println(">>>get address node:" + address);

            }

            //获取IP和端口号
            String addressPath = servicePath + "/" + address;
            String hostAndPort = zkClient.readData(addressPath);

            return hostAndPort;

        } catch (Exception e) {
            System.out.println(">>> service discovery exception: " + e.getMessage());
            zkClient.close();
        }

        return null;

    }

}

运行测试用例:

package com.fish.learn.zookeeper;

import com.fish.learn.zookeeper.service.discovery.impl.ZkServiceDiscovery;
import com.fish.learn.zookeeper.service.registry.impl.ZkServiceRegistry;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ZookeeperApplication {

    private static final String SERVICE_NAME = "fish.com";

    private static final String SERVER_ADDRESS = "localhost:2181";

    public static void main(String[] args) {

        SpringApplication.run(ZookeeperApplication.class, args);

        ZkServiceRegistry registry = new ZkServiceRegistry();
        registry.init();
        registry.registry(SERVICE_NAME,SERVER_ADDRESS);

        ZkServiceDiscovery discovery = new ZkServiceDiscovery();
        discovery.init();
        discovery.discover(SERVICE_NAME);

        while (true){}

    }

}

       先启动zookeeper服务,再执行测试用例。我们分别启动三个测试用例,以模拟多个客户端同时进行服务注册场景,程序执行后,观察控制台的输出信息。

      idea多实例启动项目:

第一个测试用例输出如下:

第二个测试用例输出如下:

第三个测试用例输出如下:

       从上面例子运行的结果可以看出,第一个测试用例先运行,而其他服务还没有注册,所以在获取可用服务的时候获取了自己。第二个和第三个测试用例运行后,可用的服务实例就有多个,所以在获取服务时,有可能获取自己,也可能获取其他服务。当有新的服务进行注册时,所有服务实例都能感知到新服务的加入。

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐