本文来自网易云社区

作者:乔安然

etcd是CoreOS开发的分布式高可用键值存储系统。随着CoreOS和K8s等项目在开源社区日益火热,etcd组件也渐渐为开发人员所关注。

etcd也是受到ZooKeeper与doozer启发而催生的项目,除了拥有类似功能,更专注于以下四点。

简单:基于HTTP+JSON的API让你用curl就可以轻松使用(V3版本不再使用JSON)。

安全:可选SSL客户认证机制。

快速:每个实例每秒支持一千次写操作。

可信:使用Raft算法充分实现了分布式。

在项目对比etcd和zookeeper之后,etcd更轻型容易部署安装使用,zk特性比较丰富,但已老态龙钟,需要点新鲜选择。在去年我党生日迎来了etcd v3(使用gRPC、改变key ttl使用租约等),蛋疼的发现java客户端etcd4j不支持v3版本,v2版本目前可以满足我们需求,继续使用etcd,后续会关注etcd4j更新。本文基于etcd v2版本使用。

etcd事件监听

etcd没有提供被动监听的实现,我们可以主动轮训监听key的变化。如果想监听其子节点可以通过recursive=true参数

“curl http://127.0.0.1:2379/v2/keys/foo?wait=true”

对/foo的改变会受到通知和返回相关变化事件

HTTP/1.1 200 OK

Content-Type: application/json

X-Etcd-Cluster-Id: e88d54f6225f06ad

X-Etcd-Index: 271

X-Raft-Index: 872202

X-Raft-Term: 5

Date: Sat, 26 Sep 2015 08:43:17 GMT

Transfer-Encoding: chunked

{

"action":

"set",

"node": {

"createdIndex": 7,

"key": "/foo",

"modifiedIndex": 7,

"value": "bar"},

"prevNode": {

"createdIndex": 6,

"key": "/foo",

"modifiedIndex": 6,

"value": "bar"}}

etcd中的数据变化(包括目录和key、value变化)相关类型事件:set、get、create、update、delete、expire、compareAndSwap、compareAndDelete。在返回的http response中的action属性就是事件类型,如上图所示。

etcd记录下最近1000次事件变化,使用index我们可以watch其key在过去发生的变化。使用node的modifiedIndex+1就可以监听下一次事件:

curl

'http://127.0.0.1:2379/v2/keys/foo?wait=true&waitIndex=8'

但当事件突发比如1秒内产生几千条事件,事件监听处理比较慢或者未监听是发生了客户端事件丢失。当我们index超过etcd记录的返回,就返回如下消息:

{

"errorCode"

:

401

,

"message"

:

"The event in requested index is outdated and cleared"

,

"cause"

:

"the requested history has been cleared [1008/8]"

,

"index"

:

2007

}

官方文档中推荐使用X-Etcd-Index+1作为waitIndex代替使用node的modifiedIndex+1:

1. X-Etcd-Index代表etcd当前index,为所有key共享。单挑递增总是大于或等于modifiedIndex,而这个modifiedIndex是etcd已经存储事件的index

2. modifiedIndex和X-Etcd-Index之间不代表有事件发生,当fetch 相关key不会有事件返回。

使用modifiedIndex+1只是功能表示后续监听,它比X-Etcd-Index+1小,很可能相关事件已经被清除,可能会受到401EventIndexCleared 错误。在初始监听或断开重新监听,index应该使用X-Etcd-Index+1,而不是modifiedIndex+1。

上述只能监听一次事件变化,Etcd还提供流式监听,在curl的时候加stream=true参数,会和etcd服务端建议http长连接,后续的每个事件都会通过这个http chunk推送给客户端。相对比一次性监听,更简洁,可靠性更高。但有一问题,流式监听监听从命令发出之后的时间,先前的时间是监听不到的,如果再加waitIndex参数,waitIndex小于或等于启动监听时etcd的X-Etcd-Index,只能接受到满足条件的事件,后续不再接收,只能收到一个事件。waitIndex大于X-Etcd-Index无效。

Etcd4j并没有支持流式监听,而且流式监听无法做到监听时的客户端和服务端数据同步。我们使用Etcd4j实现监听持久化,实现如下特性:

1. 事件类型简单化,etcd的set、get、create、update、delete、expire、compareAndSwap、compareAndDelete事件类型,简化成Add、Upate、Removed三种类型

2. 支持目录、key全量变化监听,支持子节点变化监听。例如etcd删除目录时子节点也随之删除,只产生删除目录事件,而子节点删除也需要通知其删除事件。

3. 需要解决事件突发(超过etcd记录event数量)造成监听index丢失,重新再同步需要保证数据一致、同步前后的变化通知上层监听。

事件和处理接口定义:

public class EtcdEvent{

public enum Type{

added, removed, updated

}

private EtcdEvent.Type type;

private T preValue;

private T value;

private String key;

private Throwable cause;

private long index;

public EtcdEvent(Type type, T preValue, T value, String key, Throwable cause, long index){

this.type = type;

this.preValue = preValue;

this.value = value;

this.key = key;

this.cause = cause;

this.index = index;

}

}

{

void handle(EtcdEvent event);

}

watch接口定义

public interface Watch{

// 是否属于该watch的范围,是则处理建构事件通知上层EtcdEventHandler

Watch accept(EtcdKeysResponse response);

void stop();

//用于初始同步和401异常再同步

Watch sync();

boolean isSync();

// 当前watch的index

long currentIndex()

}

当前watch有三种实现。

1、ValueWatch 针对单一key的value值变化监听

2、DirectoryWatch 针对目录的变化监听,不支持子节点

3、EtcdNodeWatch 针对某一key及子节点(包括目录和值变化)监听

WatchService是根据syncIndex定时监听etcd变化,将其获取结果交给注册的watch进行处理。

启动流程:

1、确保监听的根目录存在,并获取初始X-Etcd-Index

protected void ensureDirectoryExists(){

try {

logger.debug("attempting to put directory {}", directory);

EtcdKeysResponse response = client.get().putDir(directory).prevExist(false).send().get();

logger.debug("put directory {} successful", directory);

lock.writeRunnable(()->{

syncIndex.set(response.node.modifiedIndex);

etcdIndex.set(response.node.modifiedIndex);

});

logger.debug("directory {} index recorded", directory);

} catch (EtcdException ee) {

logger.debug("put directory {} failed, it exists", directory);

lock.writeRunnable(()->{

syncIndex.set(ee.index);

etcdIndex.set(ee.index);

});

logger.debug("directory {} index recorded", directory);

} catch (Exception e) {

throw new CommonException(String.format("could not create directory %s", directory), e);

}

}

2. 开始watch节点,使用ScheduledExecutorService提供FiexDelay任务,该任务就是监听变化

将返回的reponse交给watch处理。如果发生异常,让所有watch重新同步

protected void startWatchingNodes(){

logger.debug("start watch nodes thread.");

future = this.executorService.scheduleWithFixedDelay(()->{

long currentIndex = syncIndex.get();

try{

EtcdKeysResponse response =

client.get().getDir(directory)

.recursive()

.sorted()

.waitForChange(currentIndex + 1)

.timeout(timeoutInMs, TimeUnit.MILLISECONDS)

.send()

.get();

lock.writeRunnable(()->{

etcdIndex.set(response.etcdIndex);

if(syncIndex.compareAndSet(currentIndex, response.node.modifiedIndex)){

indexDelta.set(etcdIndex.get() - syncIndex.get());

watchList.forEach(watch -> watch.accept(response));

}

}).run();

} catch (EtcdException e) {

logger.warn("watch nodes exception, directory: "+ directory, e);

ensureDirectoryExists();

lock.writeRunnable(()->{

etcdIndex.set(e.index);

syncIndex.set(watchList.stream().mapToLong(w -> w.sync().currentIndex())

.min().orElse(e.index));

indexDelta.set(etcdIndex.get() - syncIndex.get());

}).run();

} catch (TimeoutException e) {

logger.debug("etcd watch timeout , directory:{}, waitIndex:{}", directory, currentIndex + 1);

} catch (Exception e) {

logger.warn("exception thrown while watching "+ directory + ", waitIndex:" + (currentIndex+1), e);

}

},1,1, TimeUnit.MILLISECONDS);

}

注册watch流程

1、调用watch.sync初始同步

2、将watch添加到watch列表中

3、更新watchService中SyncIndex,以watch列表中最小的index为主。

我们看下EtcdNodeWatch的实现

public class EtcdNodeWatch extends AbstractWatch{

//该watch的目录范围

private String directory;

//正则匹配,判断watchService的监听是否属于其范围

private Pattern keyMatcher;

//存储当前节点情况

private Map> nodeMap = new HashMap<>();

。。。

}

同步实现:

@Override

public Watch sync(){

isSync = false;

try {

EtcdKeysResponse response = etcdClient.get().get(directory).dir().recursive().sorted().send().get();

currentIndex = response.etcdIndex;

// 初始加载diretcory下所有节点,加载到nodeMap中

if(response.node.dir){

for(EtcdNode etcdNode : response.node.nodes){

loadEtcdNode(etcdNode, currentIndex);

}

}

//同步前后nodemap中的数据变化需要通知上层

nodeMap = nodeMap.entrySet().stream().filter((entry) ->{

// etcdIndex相同数据无变化

if(entry.getValue().loadIndex == currentIndex){

return true;

}

//nodeMap etcdIndex不一样,说明在etcd中已不存在该节点,统一removed处理。

EtcdValue etcdValue = entry.getValue();

EtcdEvent event = EtcdEvent.builder()

.withType(EtcdEvent.Type.removed)

.withPreValue(etcdValue.value)

.withIndex(currentIndex)

.withKey(entry.getKey())

.build();

fireEvent(eventHandler, event);

return false;

}).collect(Collectors.toMap(e -> e.getKey(), e->e.getValue()));

isSync = true;

} catch (EtcdException e) {

currentIndex = e.index;

// key不存在,说明已被删除,nodeMap中所有都要清理,并通知上层removed

if(e.errorCode == EtcdErrorCode.KeyNotFound){

logger.debug("directory:{} watch sync KeyNotFound , will clear all value and fire removed event.",directory);

for (Map.Entry> entry : nodeMap.entrySet()){

EtcdValue etcdValue = entry.getValue();

EtcdEvent event = EtcdEvent.builder()

.withType(EtcdEvent.Type.removed)

.withPreValue(etcdValue.value)

.withIndex(currentIndex)

.withKey(entry.getKey())

.build();

fireEvent(eventHandler, event);

}

nodeMap.clear();

isSync = true;

}else{

logger.warn("directory watch sync EtcdException:", e);

}

} catch (IOException | TimeoutException | EtcdAuthenticationException e) {

logger.error(format("failed to sync watch for directory %s", directory), e);

}catch (Throwable t){

logger.error("sync watch for director " + directory + " error.", t);

}

return this;

}

构造事件,将etcd事件统一成Add、Update、Removed类型

@Override

protected OptionalbuildEvent(ObjectMapper mapper, String directory, EtcdKeysResponse response){

EtcdNode nodeValue = response.node;

EtcdNode preNodeValue = response.prevNode;

if(nodeValue == null && preNodeValue == null){

return Optional.empty();

}

long etcdIndex = response.node.modifiedIndex;

switch (response.action){

case create:

case set:

if(filterValueChange(nodeValue, preNodeValue)){

return Optional.of(EtcdEvent.builder()

.withType(preNodeValue == null ? EtcdEvent.Type.added : EtcdEvent.Type.updated)

.withValue(nodeValue)

.withPreValue(preNodeValue)

.withIndex(etcdIndex)

.withKey(removeDirectory(directory, nodeValue.key))

.build());

}

break;

case expire:

case delete:

case compareAndDelete:

return Optional.of(EtcdEvent.builder()

.withType(EtcdEvent.Type.removed)

.withValue(nodeValue)

.withPreValue(preNodeValue)

.withIndex(etcdIndex)

.withKey(removeDirectory(directory, response.prevNode.key))

.build());

case compareAndSwap:

case update:

return Optional.of(EtcdEvent.builder()

.withType(EtcdEvent.Type.updated)

.withValue(nodeValue)

.withPreValue(preNodeValue)

.withIndex(etcdIndex)

.withKey(removeDirectory(directory, response.node.key))

.build());

default:

break;

}

return Optional.empty();

}

accept处理

@Override

public Watch accept(EtcdKeysResponse response){

//判断是否属于该watch范围

if(!keyMatcher.matcher(key(response)).matches()){

return this;

}

//构建事件通知上层EtcdEventHandler

long responseIndex = indexResponse(response);

if(currentIndex < responseIndex){

buildEvent(null, directory, response).ifPresent(etcdEvent -> nodeMap.compute(etcdEvent.getKey(), (key, entry) ->{

switch (etcdEvent.getType()){

case added:

case updated:

fireEvent(eventHandler, etcdEvent);

return new EtcdValue((EtcdNode) etcdEvent.getValue(), etcdEvent.getIndex(), response.etcdIndex);

case removed:

// 目录删除,需要清理子节点

if(((EtcdNode)etcdEvent.getValue()).isDir()){

Iterator keyIterator = nodeMap.keySet().iterator();

while (keyIterator.hasNext()){

if(keyIterator.next().startsWith(key)){

keyIterator.remove();

}

}

}

fireEvent(eventHandler, etcdEvent);

return null;

default:

throw new IllegalStateException("unknown event type "+etcdEvent.getType().name());

}

}));

currentIndex = responseIndex;

}else{

logger.debug("filtering event for {}", key(response));

logger.debug("response index {} current index {}", responseIndex, currentIndex);

}

return this;

}

本文来自网易云社区,经作者乔安然授权发布

网易云免费体验馆,0成本体验20+款云产品!

更多网易研发、产品、运营经验分享请访问网易云社区。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐