etcd学习和实战:4、Java使用etcd实现服务发现和管理
etcd学习和实战:4、Java使用etcd实现服务发现和管理文章目录etcd学习和实战:4、Java使用etcd实现服务发现和管理1. 前言2. 代码2.1 服务注册2.2 服务发现2.3 运行结果2.4 问题3. 最后1. 前言Java一般使用zookeeper来实现分布式系统下服务管理,zookeeper也具备key-value的存取功能,这里我们不讨论zookeeper和etcd的优劣,只
etcd学习和实战:4、Java使用etcd实现服务发现和管理
1. 前言
Java一般使用zookeeper来实现分布式系统下服务管理,zookeeper也具备key-value的存取功能,这里我们不讨论zookeeper和etcd的优劣,只提一下对于Java实现类似功能可能也有zookeeper这样的方案。
同样分为服务注册和发现两大部分,思路和go实现时相同,所以直接上代码并进行测试即可。
2. 代码
使用了jetcd,目前还是beta版本,但是目前似乎只有这个支持etcd v3版本。
参考自:
https://github.com/etcd-io/jetcd/tree/master/jetcd-examples
https://xinchen.blog.csdn.net/article/details/115434576
https://www.jianshu.com/p/bd7eed1f250c
2.1 服务注册
- 设置endpoints(端点)并创建etcd客户端
- 设置租约注册服务并绑定
- 持续监听租约
Register.java:
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Response;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.options.PutOption;
import io.grpc.stub.CallStreamObserver;
import static com.google.common.base.Charsets.UTF_8;
public class Register {
private Client client;
private String endpoints;
private Object lock = new Object();
public Register(String endpoints) {
super();
this.endpoints = endpoints;
}
/**
* 新建key-value客户端实例
* @return
*/
private KV getKVClient(){
if (null==client) {
synchronized (lock) {
if (null==client) {
client = Client.builder().endpoints(endpoints.split(",")).build();
}
}
}
return client.getKVClient();
}
public void close() {
client.close();
client = null;
}
public Response.Header put(String key, String value) throws Exception {
return getKVClient().put(bytesOf(key), bytesOf(value)).get().getHeader();
}
/**
* 将字符串转为客户端所需的ByteSequence实例
* @param val
* @return
*/
public static ByteSequence bytesOf(String val) {
return ByteSequence.from(val, UTF_8);
}
private Client getClient() {
if (null==client) {
synchronized (lock) {
if (null==client) {
client = Client.builder().endpoints(endpoints.split(",")).build();
}
}
}
return client;
}
public void putWithLease(String key, String value) throws Exception {
Lease leaseClient = getClient().getLeaseClient();
leaseClient.grant(60).thenAccept(result -> {
// 租约ID
long leaseId = result.getID();
// 准备好put操作的client
KV kvClient = getClient().getKVClient();
// put操作时的可选项,在这里指定租约ID
PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();
// put操作
kvClient.put(bytesOf(key), bytesOf(value), putOption)
.thenAccept(putResponse -> {
// put操作完成后,再设置无限续租的操作
leaseClient.keepAlive(leaseId, new CallStreamObserver<LeaseKeepAliveResponse>() {
@Override
public boolean isReady() {
return false;
}
@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
}
@Override
public void disableAutoInboundFlowControl() {
}
@Override
public void request(int count) {
}
@Override
public void setMessageCompression(boolean enable) {
}
/**
* 每次续租操作完成后,该方法都会被调用
* @param value
*/
@Override
public void onNext(LeaseKeepAliveResponse value) {
System.out.println("续租完成");
}
@Override
public void onError(Throwable t) {
System.out.println(t);
}
@Override
public void onCompleted() {
}
});
});
});
}
}
RegisterTest.java:
public class RegisterTest {
public static void main(String[] args) {
Register register = new Register("http://localhost:2379");
String key = "/web/node0";
String value = "localhost:7999";
// try {
// register.put(key, value);
// } catch (Exception e) {
// e.printStackTrace();
// }
try {
register.putWithLease(key, value);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.2 服务发现
- 设置endpoints创建etcd客户端
- 初始化配置并监听服务前缀(实际上也可以直接监听key,但不够灵活,监听前缀值更好一些)
- 根据监听到的对key的操作类型进行进一步处理
Discovery.java:
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Response;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.Watch.Watcher;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import static java.nio.charset.StandardCharsets.UTF_8;
public class Discovery {
private Client client;
private String endpoints;
private final Object lock = new Object();
private HashMap<String, String> serverList = new HashMap<String, String>();
/**
* 发现服务类信息初始化
* @param endpoints:监听端点,包含ip和端口,如:"http://localhost:2379“,多个端点则使用逗号分割, 比如:”http://localhost:2379,http://192.168.2.1:2330“
*/
public Discovery(String endpoints) {
this.endpoints = endpoints;
newServiceDiscovery();
}
public Client newServiceDiscovery() {
if (null == client) {
synchronized (lock) {
if (null == client) {
client = Client.builder().endpoints(endpoints.split(",")).build();
}
}
}
return client;
}
public void watchService(String prefixAddress) {
//请求当前前缀
CompletableFuture<GetResponse> getResponseCompletableFuture =
client.getKVClient().get(ByteSequence.from(prefixAddress,
UTF_8),
GetOption.newBuilder().withPrefix(ByteSequence.from(prefixAddress, UTF_8)).build());
try {
//获取当前前缀下的服务并存储
List<KeyValue> kvs = getResponseCompletableFuture.get().getKvs();
for (KeyValue kv : kvs) {
setServerList(kv.getKey().toString(UTF_8), kv.getValue().toString(UTF_8));
}
//创建线程监听前缀
new Thread(new Runnable() {
@Override
public void run() {
watcher(prefixAddress);
}
}).start();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
private void watcher(String prefixAddress) {
Watcher watcher;
System.out.println("watching prefix:" + prefixAddress);
WatchOption watchOpts = WatchOption.newBuilder().withPrefix(ByteSequence.from(prefixAddress,
UTF_8)).build();
//实例化一个监听对象,当监听的key发生变化时会被调用
Watch.Listener listener = Watch.listener(watchResponse -> {
watchResponse.getEvents().forEach(watchEvent -> {
WatchEvent.EventType eventType = watchEvent.getEventType();
KeyValue keyValue = watchEvent.getKeyValue();
System.out.println("type="+eventType+",key="+keyValue.getKey().toString(UTF_8)+",value="+keyValue.getValue().toString(UTF_8));
switch (eventType) {
case PUT: //修改或者新增
setServerList(keyValue.getKey().toString(UTF_8), keyValue.getValue().toString(UTF_8));
break;
case DELETE: //删除
delServerList(keyValue.getKey().toString(UTF_8), keyValue.getValue().toString(UTF_8));
break;
}
});
});
client.getWatchClient().watch(ByteSequence.from(prefixAddress, UTF_8), watchOpts,
listener);
}
private void setServerList(String key, String value) {
synchronized (lock) {
serverList.put(key, value);
System.out.println("put key:" + key + ",value:" + value);
}
}
private void delServerList(String key, String value) {
synchronized (lock) {
serverList.remove(key);
System.out.println("del key:" + key);
}
}
public void close() {
client.close();
client = null;
}
}
DiscoveryTest.java:
public class DiscoveryTest {
public static void main(String[] args) {
String endpoints = "http://localhost:2379";
Discovery ser = new Discovery(endpoints);
ser.watchService("/web/");
ser.watchService("/grpc/");
while (true) {
}
}
}
2.3 运行结果
//先运行服务发现
$ java -jar discovery.jar
put key:/web/node0,value:localhost:7999
watching prefix:/web/
watching prefix:/grpc/
type=PUT,key=/web/node0,value=localhost:7999
put key:/web/node0,value:localhost:7999
...
//再运行服务注册
$ java -jar register.jar
续租完成
续租完成
续租完成
续租完成
...
2.4 问题
六月 07, 2021 4:53:24 下午 io.grpc.internal.ManagedChannelImpl$NameResolverListener handleErrorInSyncContext
警告: [Channel<1>: (etcd)] Failed to resolve name. status=Status{code=NOT_FOUND, description=null, cause=null}
...
这个是endpoints错误导致的,需要在前面添加http://,即"http://localhost:2379"而不是"localhost:2379"。
3. 最后
实际生产环境中目前etcd+grpc更适合Go,java目前仅有jetcd支持gRPC,更多的还是v2版本,使用的gRPC的版本也比较底,还处于beat版本。
更多推荐
所有评论(0)