Learning etcd in Practice, Part 4: Service Discovery and Management with etcd in Java


1. Introduction

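Java projects usually rely on ZooKeeper for service management in distributed systems, and ZooKeeper also provides key-value storage. This article does not compare the merits of ZooKeeper and etcd; it only notes that ZooKeeper is an alternative for implementing similar functionality in Java.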

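As before, the work splits into two parts: service registration and service discovery. The approach is the same as in the Go implementation, so we go straight to the code and test it.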

2. Code

The code uses jetcd. At the time of writing it was still in beta, but it appeared to be the only Java client that supported the etcd v3 API.

References:
https://github.com/etcd-io/jetcd/tree/master/jetcd-examples
https://xinchen.blog.csdn.net/article/details/115434576
https://www.jianshu.com/p/bd7eed1f250c

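2.1 Service registration

  • Set the endpoints and create the etcd client
  • Grant a lease, register the service, and bind the service key to the lease
  • Keep the lease alive continuously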
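
To make the role of the lease concrete, here is a toy in-memory model of the steps above, written without jetcd. It is only a sketch: the class `LeaseModel`, its methods, and the explicit `now` parameter (time in seconds) are hypothetical names introduced for illustration. It shows that a key bound to a lease stays visible only while the lease is renewed before its TTL runs out, which is what the keep-alive in Register.java is for.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of lease-bound keys. Hypothetical; this is not etcd or jetcd code. */
class LeaseModel {
    private final Map<Long, Long> ttlByLease = new HashMap<>();     // lease ID -> TTL in seconds
    private final Map<Long, Long> expiryByLease = new HashMap<>();  // lease ID -> expiry time in seconds
    private final Map<String, Long> leaseByKey = new HashMap<>();   // key -> lease ID
    private final Map<String, String> values = new HashMap<>();     // key -> value
    private long nextLeaseId = 1;

    /** Creates a lease that expires ttlSeconds after now and returns its ID. */
    long grant(long ttlSeconds, long now) {
        long id = nextLeaseId++;
        ttlByLease.put(id, ttlSeconds);
        expiryByLease.put(id, now + ttlSeconds);
        return id;
    }

    /** Stores a key and binds it to an existing lease. */
    void put(String key, String value, long leaseId) {
        if (!expiryByLease.containsKey(leaseId)) {
            throw new IllegalArgumentException("unknown lease " + leaseId);
        }
        values.put(key, value);
        leaseByKey.put(key, leaseId);
    }

    /** Resets the expiry to now + TTL; returns false if the lease is unknown or already expired. */
    boolean keepAlive(long leaseId, long now) {
        Long expiry = expiryByLease.get(leaseId);
        if (expiry == null || expiry <= now) {
            return false;
        }
        expiryByLease.put(leaseId, now + ttlByLease.get(leaseId));
        return true;
    }

    /** Returns the value, or null if the key is absent or its lease has expired at time now. */
    String get(String key, long now) {
        Long leaseId = leaseByKey.get(key);
        if (leaseId == null || expiryByLease.get(leaseId) <= now) {
            return null;
        }
        return values.get(key);
    }

    public static void main(String[] args) {
        LeaseModel model = new LeaseModel();
        long lease = model.grant(60, 0);
        model.put("/web/node0", "localhost:7999", lease);
        model.keepAlive(lease, 50);                        // expiry moves from 60 to 110
        System.out.println(model.get("/web/node0", 100));  // still registered
        System.out.println(model.get("/web/node0", 110));  // lease ran out, key is gone
    }
}
```

In the real client the server tracks time and removes expired keys itself; the model passes `now` explicitly only so that the behaviour is deterministic.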

Register.java:

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Response;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.options.PutOption;
import io.grpc.stub.CallStreamObserver;

import static java.nio.charset.StandardCharsets.UTF_8;

public class Register {
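    // volatile is required for the double-checked locking in getClient() to be safe
    private volatile Client client;
    private final String endpoints;
    private final Object lock = new Object();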


    public Register(String endpoints) {
        super();
        this.endpoints = endpoints;
    }

    /**
     * Returns the key-value client, creating the underlying etcd client on first use.
     * @return the KV client
     */
    private KV getKVClient() {
        return getClient().getKVClient();
    }

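    public void close() {
        synchronized (lock) {
            // Guard against close() being called before the client was ever created
            if (client != null) {
                client.close();
                client = null;
            }
        }
    }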

    public Response.Header put(String key, String value) throws Exception {
        return getKVClient().put(bytesOf(key), bytesOf(value)).get().getHeader();
    }

    /**
     * Converts a string into the ByteSequence instance the client requires.
     * @param val the string to convert
     * @return the UTF-8 encoded ByteSequence
     */
    public static ByteSequence bytesOf(String val) {
        return ByteSequence.from(val, UTF_8);
    }

    private Client getClient() {
        if (null==client) {
            synchronized (lock) {
                if (null==client) {
                    client = Client.builder().endpoints(endpoints.split(",")).build();
                }
            }
        }

        return client;
    }

    public void putWithLease(String key, String value) throws Exception {
        Lease leaseClient = getClient().getLeaseClient();

        leaseClient.grant(60).thenAccept(result -> {
            // Lease ID
            long leaseId = result.getID();

            // Client used for the put operation
            KV kvClient = getClient().getKVClient();

            // Options for the put operation; the lease ID is specified here
            PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();

            // Put the key-value pair
            kvClient.put(bytesOf(key), bytesOf(value), putOption)
                    .thenAccept(putResponse -> {
                        // Once the put has completed, start renewing the lease indefinitely
                        leaseClient.keepAlive(leaseId, new CallStreamObserver<LeaseKeepAliveResponse>() {
                            @Override
                            public boolean isReady() {
                                return false;
                            }

                            @Override
                            public void setOnReadyHandler(Runnable onReadyHandler) {

                            }

                            @Override
                            public void disableAutoInboundFlowControl() {

                            }

                            @Override
                            public void request(int count) {
                            }

                            @Override
                            public void setMessageCompression(boolean enable) {

                            }

                            /**
                             * Called each time a lease renewal (keep-alive) completes.
                             * @param value the keep-alive response
                             */
                            @Override
                            public void onNext(LeaseKeepAliveResponse value) {
                                // Prints "续租完成" ("lease renewed")
                                System.out.println("续租完成");
                            }

                            @Override
                            public void onError(Throwable t) {
                                System.out.println(t);
                            }

                            @Override
                            public void onCompleted() {
                            }
                        });
                    });
            });
    }
}

RegisterTest.java:


public class RegisterTest {
    public static void main(String[] args) {
        Register register = new Register("http://localhost:2379");
        String key = "/web/node0";
        String value = "localhost:7999";
//        try {
//            register.put(key, value);
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
        try {
            register.putWithLease(key, value);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

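2.2 Service discovery

  • Set the endpoints and create the etcd client
  • Load the current services and watch the service prefix (watching a single key also works, but it is less flexible; watching a prefix is the better choice)
  • Handle each watched key change according to its operation type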
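
The last step, turning watch events into an up-to-date local service list, can be sketched without jetcd as follows. The class `ServiceRegistry` and its `EventType` enum are hypothetical stand-ins introduced for illustration; they only mirror the PUT and DELETE cases that Discovery.java handles, and a `ConcurrentHashMap` is used here as one possible way to make the map safe for the watcher thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the local service list kept by the discovery side. */
class ServiceRegistry {
    /** Hypothetical event kinds, mirroring the PUT and DELETE operations a watch reports. */
    enum EventType { PUT, DELETE }

    // Thread-safe map: the watcher thread writes while other threads may read
    private final Map<String, String> servers = new ConcurrentHashMap<>();

    /** Applies one watch event to the local view of registered services. */
    void apply(EventType type, String key, String value) {
        switch (type) {
            case PUT:     // a service was added or its address changed
                servers.put(key, value);
                break;
            case DELETE:  // a service was removed or its lease expired
                servers.remove(key);
                break;
        }
    }

    /** Returns a read-only copy of the current service list. */
    Map<String, String> snapshot() {
        return Collections.unmodifiableMap(new HashMap<>(servers));
    }

    public static void main(String[] args) {
        ServiceRegistry registry = new ServiceRegistry();
        registry.apply(EventType.PUT, "/web/node0", "localhost:7999");
        System.out.println(registry.snapshot());
        registry.apply(EventType.DELETE, "/web/node0", "");
        System.out.println(registry.snapshot());
    }
}
```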

Discovery.java:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Response;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.Watch.Watcher;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;

import static java.nio.charset.StandardCharsets.UTF_8;

public class Discovery {
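    // volatile is required for the double-checked locking in newServiceDiscovery() to be safe
    private volatile Client client;
    private final String endpoints;
    private final Object lock = new Object();
    private final HashMap<String, String> serverList = new HashMap<String, String>();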

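    /**
     * Initializes the service discovery client.
     * @param endpoints etcd endpoints to connect to, each with scheme, IP or host, and port, e.g. "http://localhost:2379";
     *                  separate multiple endpoints with commas, e.g. "http://localhost:2379,http://192.168.2.1:2330"
     */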
    public Discovery(String endpoints) {
        this.endpoints = endpoints;
        newServiceDiscovery();
    }

    public Client newServiceDiscovery() {
        if (null == client) {
            synchronized (lock) {
                if (null == client) {
                    client = Client.builder().endpoints(endpoints.split(",")).build();
                }
            }
        }

        return client;
    }

    public void watchService(String prefixAddress) {
        // Request all keys currently stored under the prefix
        CompletableFuture<GetResponse> getResponseCompletableFuture =
                client.getKVClient().get(ByteSequence.from(prefixAddress,
                        UTF_8),
                        GetOption.newBuilder().withPrefix(ByteSequence.from(prefixAddress, UTF_8)).build());

        try {
            // Fetch the services currently under the prefix and store them
            List<KeyValue> kvs = getResponseCompletableFuture.get().getKvs();
            for (KeyValue kv : kvs) {
                setServerList(kv.getKey().toString(UTF_8), kv.getValue().toString(UTF_8));
            }

            // Start a thread that watches the prefix
            new Thread(new Runnable() {

                @Override
                public void run() {
                    watcher(prefixAddress);
                }
            }).start();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    private void watcher(String prefixAddress) {
        Watcher watcher;

        System.out.println("watching prefix:" + prefixAddress);
        WatchOption watchOpts = WatchOption.newBuilder().withPrefix(ByteSequence.from(prefixAddress,
                UTF_8)).build();

        // Create a listener that is invoked whenever a watched key changes
        Watch.Listener listener = Watch.listener(watchResponse -> {
            watchResponse.getEvents().forEach(watchEvent -> {
                WatchEvent.EventType eventType = watchEvent.getEventType();
                KeyValue keyValue = watchEvent.getKeyValue();
                System.out.println("type="+eventType+",key="+keyValue.getKey().toString(UTF_8)+",value="+keyValue.getValue().toString(UTF_8));

                switch (eventType) {
                    case PUT:  // key added or modified
                        setServerList(keyValue.getKey().toString(UTF_8), keyValue.getValue().toString(UTF_8));
                        break;
                    case DELETE: // key deleted
                        delServerList(keyValue.getKey().toString(UTF_8), keyValue.getValue().toString(UTF_8));
                        break;
                }
            });
        });

        client.getWatchClient().watch(ByteSequence.from(prefixAddress, UTF_8), watchOpts,
                listener);
    }

    private void setServerList(String key, String value) {
        synchronized (lock) {
            serverList.put(key, value);
            System.out.println("put key:" + key + ",value:" + value);
        }
    }

    private void delServerList(String key, String value) {
        synchronized (lock) {
            serverList.remove(key);
            System.out.println("del key:" + key);
        }
    }

    public void close() {
        synchronized (lock) {
            // Guard against close() being called twice
            if (client != null) {
                client.close();
                client = null;
            }
        }
    }
}

DiscoveryTest.java:

public class DiscoveryTest {
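    public static void main(String[] args) throws InterruptedException {
        String endpoints = "http://localhost:2379";
        Discovery ser = new Discovery(endpoints);
        ser.watchService("/web/");
        ser.watchService("/grpc/");
        // Block the main thread without busy-spinning; the watchers keep running in the background
        Thread.currentThread().join();
    }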
}

2.3 Results

// Start service discovery first
$ java -jar discovery.jar 
put key:/web/node0,value:localhost:7999
watching prefix:/web/
watching prefix:/grpc/
type=PUT,key=/web/node0,value=localhost:7999
put key:/web/node0,value:localhost:7999
...

// Then start service registration ("续租完成" means "lease renewed")
$ java -jar register.jar 
续租完成
续租完成
续租完成
续租完成
...

2.4 Troubleshooting

六月 07, 2021 4:53:24 下午 io.grpc.internal.ManagedChannelImpl$NameResolverListener handleErrorInSyncContext
警告: [Channel<1>: (etcd)] Failed to resolve name. status=Status{code=NOT_FOUND, description=null, cause=null}
...

This warning is caused by a malformed endpoint: the address must start with http://, i.e. "http://localhost:2379" rather than "localhost:2379".
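
One way to guard against this mistake is to normalize the endpoint string before handing it to the client builder. The following is a minimal sketch, assuming plaintext http is the intended scheme; `EndpointNormalizer` and `normalize` are hypothetical names and not part of jetcd.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; not part of jetcd. */
class EndpointNormalizer {
    /**
     * Splits a comma-separated endpoint list and prefixes "http://" to every
     * entry that has no scheme. Assumption: plaintext http is what is wanted.
     */
    static String[] normalize(String endpoints) {
        List<String> result = new ArrayList<>();
        for (String raw : endpoints.split(",")) {
            String endpoint = raw.trim();
            if (endpoint.isEmpty()) {
                continue;  // skip empty entries such as a trailing comma
            }
            if (!endpoint.contains("://")) {
                endpoint = "http://" + endpoint;
            }
            result.add(endpoint);
        }
        return result.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // prints "http://localhost:2379,http://192.168.2.1:2379"
        System.out.println(String.join(",", normalize("localhost:2379, http://192.168.2.1:2379")));
    }
}
```

The resulting array could then be passed to `Client.builder().endpoints(...)` in place of `endpoints.split(",")`.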

3. Closing remarks

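In production environments today, etcd with gRPC is a better fit for Go. On the Java side, jetcd is currently the only client that supports gRPC, while most other Java clients still target the v2 API. jetcd also uses a fairly old gRPC version and is still in beta.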
