Leader Latch

Zookeeper在分布式系统中,常常被用于选主。在执行某个任务时,让所有的节点都知道有一个特别的,唯一的节点是任务的主节点,由主节点进行任务的执行,其他节点作为备用节点。通过这种热备方式,为分布式系统中任务执行的可控性,以及系统高可用性。

而Curator提供了两种选主机制,可以根据实际情况进行选用。

  1. 关键API

org.apache.curator.framework.recipes.leader.LeaderLatch
2. 机制说明

LeaderLatch的方式,就是以一种抢占的方式来决定选主。比较简单粗暴,逻辑相对简单。类似非公平锁的抢占,所以,多节点是一个随机产生主节点的过程。基本就是,谁抢到就算谁的。

多个参与者(如:逻辑节点;某个线程等),指定在一个分组之下,每个分组内进行主节点抢占。
3. 用法
3.1 创建
方法1

public LeaderLatch(CuratorFramework client,
                   String latchPath)

参数说明:

client : zk客户端链接
latchPath : 分组路径(zk中的path)

方法2

public LeaderLatch(CuratorFramework client,
                   String latchPath,
                   String id)

参数说明:

client : zk客户端链接
latchPath : 分组路径(zk中的path)
id : 参与者ID

3.2 使用

LeaderLatch创建好之后,必须执行:

leaderLatch.start();

这样,才能让leaderLatch开始参与选主过程。

由于LeaderLatch是一个不断抢占的过程,所以需要调用:

public boolean hasLeadership()

来检测当前参与者是否选主成功。这个方法是非阻塞的(立即返回),其结果只代表调用时的选主结果。所以,可以轮询此方法,或者当执行完本地逻辑后,需要执行分布式任务前检擦此方法。

不过,类似JDK中的CountDownLatch,LeaderLatch也提供了阻塞方法:

方法1

public void await()
          throws InterruptedException,
                 EOFException

这个方法,会阻塞,直到选主成功。

方法2 为了避免方法1的长时间选主失败
public boolean await(long timeout,
                     TimeUnit unit)
             throws InterruptedException

这个方法会根据参数中指定的时间,作为等待的期限。到期后,返回选主结果。

对于LeaderLatch实例,无论是否轩主成功,最后都应该调用:

leaderLatch.close();

这样,才会把当前参与者的信息从选主分组中移除出去。如果,当前参与者是主,还会释放主的资格。避免死锁。
4. 错误处理

在实际使用中,必须考虑链接问题引起的主身份丢失问题。 例如:当hasLeadership()返回true,之后链接出问题。 强烈建议:使用LeaderLatch时为其添加一个ConnectionStateListener

LeaderLatch实例会添加一个ConnectionStateListener来监听当前zk链接。 如果,链接不可用(SUSPENDED)则LeaderLatch会认为自己不在是主,等到链接恢复可用时,才可继续。 如果,链接断开(LOST),则LeaderLatch会认为自己不在是主,等到链接重新建立后,删除之前的参与者信息,然后重新参与选主。
5. 源码分析
5.1 LeaderLatch
5.1.1 类定义

先来看看类定义:

import java.io.Closeable;

public class LeaderLatch implements Closeable
{...}

注意:实现了java.io.Closeable,所以你懂的, try()…catch{}。(3.2中的leaderLatch.close();)
5.1.2 成员变量

public class LeaderLatch implements Closeable
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final WatcherRemoveCuratorFramework client;
    private final String latchPath;
    private final String id;
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
    private final AtomicReference<String> ourPath = new AtomicReference<String>();
    private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
    private final CloseMode closeMode;
    private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();

    private final ConnectionStateListener listener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
            handleStateChange(newState);
        }
    };

    private static final String LOCK_NAME = "latch-";

    private static final LockInternalsSorter sorter = new LockInternalsSorter()
    {
        @Override
        public String fixForSorting(String str, String lockName)
        {
            return StandardLockInternalsDriver.standardFixForSorting(str, lockName);
        }
    };

    public enum State { LATENT, STARTED, CLOSED }

    public enum CloseMode { SILENT, NOTIFY_LEADER }
    
    
    @VisibleForTesting
    volatile CountDownLatch debugResetWaitLatch = null;
log : caurtor依赖slf4j
client : zk客户端(curator-framework提供)
latchPath : 分组路径(zk中的path)
id : 参与者ID
state
    内部枚举
    状态
        LATENT 休眠
        STARTED 已启动
        CLOSED 已关闭
    使用AtomicReference原子化包装
hasLeadership
    是否为主
    使用AtomicBoolean原子化包装
ourPath
    使用AtomicReference原子化包装
listeners
    一组LeaderLatchListener监听器
closeMode
    内部枚举
    LeaderLatch关闭方式
        SILENT : 静默关闭,不触发相关监听器
        NOTIFY_LEADER :关闭时触发监听器
startTask
    异步Future
    使用AtomicReference原子化包装
listener
    链接状态监听器
    参见 : 4. 错误处理
LOCK_NAME
    私有常量
sorter
    私有常量
    用于锁处理时,规范path
    对参与者进行排序
debugResetWaitLatch
    volatile 可见性
    reset()使用
    在测试时控制启动的时机,防止环境未初始化完成就处理了启动逻辑

注意:

这些成员变量都是final类型。
并且,对于引用类型都进行原子化包装,避免并发问题

5.1.3 构造器

提供多个构造器模板,最终都是调用:

 public LeaderLatch(CuratorFramework client, String latchPath)
    {
        this(client, latchPath, "", CloseMode.SILENT);
    }

    public LeaderLatch(CuratorFramework client, String latchPath, String id)
    {
        this(client, latchPath, id, CloseMode.SILENT);
    }

    public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
    {
        this.client = Preconditions.checkNotNull(client, "client cannot be null").newWatcherRemoveCuratorFramework();
        this.latchPath = PathUtils.validatePath(latchPath);
        this.id = Preconditions.checkNotNull(id, "id cannot be null");
        this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
    }

可以发现:

默认是采用CloseMode.SILENT方式关闭
默认id是空字符串
client、latchPath、id、closeMode不能为空

5.1.4 启动

第3节,介绍过LeaderLatch是由start()启动选主过程:

public void start() throws Exception {
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

    startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
    {
        @Override
        public void run()
        {
            try
            {
                internalStart();
            }
            finally
            {
                startTask.set(null);
            }
        }
    }));
}

可以发现

调用原子性CAS方法,将状态由休眠更新到已启动
执行了一个异步任务来完成启动过程
    使用一个链接可用后回调方式
        AfterConnectionEstablished.execute()
            内部使用了一个ThreadUtils.newSingleThreadExecutor
            单线程的线程池
            所以本地多个LeaderLatch实例的启动过程是序列化方式执行的
    使用成员变量startTask持有异步Future
    启动完成后会制空startTask
        说明启动过程可能会有状态变化
启动的过程实际是由internalStart()方法来完成
private synchronized void internalStart() {
    if ( state.get() == State.STARTED )
    {
        client.getConnectionStateListenable().addListener(listener);
        try
        {
            reset();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            log.error("An error occurred checking resetting leadership.", e);
        }
    }
}
internalStart()使用synchronized
    同步调用
    使用this进行互斥锁对象
    同一个LeaderLatch对象的多次启动同样序列化执行
        即便绕过第2布,也同样可以保证不会重复启动
进行状态判断
    synchronized内部,再次判断
    相当于Double check
在当前连接上注册自带的监听器
调用reset()完成启动逻辑
处理了异常
    触发线程中断
        internalStart()是异步执行,通过中断可以进行更细节的控制
    避免粗暴的抛出异常
        internalStart()是异步执行
        避免当前线程意外中断
        同时也避免了第2.1步骤中那个单线程的线程池频繁的进行线程开/关所带来的额外开销
@VisibleForTesting
void reset() throws Exception {
    setLeadership(false);
    setNode(null);

    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( debugResetWaitLatch != null )
            {
                debugResetWaitLatch.await();
                debugResetWaitLatch = null;
            }

            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                setNode(event.getName());
                if ( state.get() == State.CLOSED )
                {
                    setNode(null);
                }
                else
                {
                    getChildren();
                }
            }
            else
            {
                log.error("getChildren() failed. rc = " + event.getResultCode());
            }
        }
    };
    client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
private synchronized void setLeadership(boolean newValue)
{
    boolean oldValue = hasLeadership.getAndSet(newValue);

    if ( oldValue && !newValue )
    { // Lost leadership, was true, now false
        listeners.forEach(new Function<LeaderLatchListener, Void>()
            {
                @Override
                public Void apply(LeaderLatchListener listener)
                {
                    listener.notLeader();
                    return null;
                }
            });
    }
    else if ( !oldValue && newValue )
    { // Gained leadership, was false, now true
        listeners.forEach(new Function<LeaderLatchListener, Void>()
            {
                @Override
                public Void apply(LeaderLatchListener input)
                {
                    input.isLeader();
                    return null;
                }
            });
    }

    notifyAll();
}
private void setNode(String newValue) throws Exception
    {
        String oldPath = ourPath.getAndSet(newValue);
        if ( oldPath != null )
        {
            client.delete().guaranteed().inBackground().forPath(oldPath);
        }
    }
reset()的可见范围
    利于测试
    使用了com.google.common.annotations.VisibleForTesting

初始化选主状态false
    getAndSet设置
    根据不同的情况触发不同的监听器
        得到
        失去
    notifyAll()
        唤醒所有的synchronized等待

制空上次path
    如果上一次path有残留,则delete服务器上的信息

在latchPath下创建一个EPHEMERAL_SEQUENTIAL节点
    临时顺序节点
    并注册了回调
        回掉获取latchPath的子节点
        并判断自身是否为主

5.1.5 选主

LeaderLatch的选主判断逻辑,是由上一节中第12步中注册的回调方法来触发。 实际由checkLeadership()方法处理:

private void checkLeadership(List<String> children) throws Exception
    {
        final String localOurPath = ourPath.get();
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
        if ( ourIndex < 0 )
        {
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        }
        else if ( ourIndex == 0 )
        {
            setLeadership(true);
        }
        else
        {
            String watchPath = sortedChildren.get(ourIndex - 1);
            Watcher watcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
                    {
                        try
                        {
                            getChildren();
                        }
                        catch ( Exception ex )
                        {
                            ThreadUtils.checkInterrupted(ex);
                            log.error("An error occurred checking the leadership.", ex);
                        }
                    }
                }
            };

            BackgroundCallback callback = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                    {
                        // previous node is gone - reset
                        reset();
                    }
                }
            };
            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    }

当获取到最新的参与者列表后:

对列表进行排序
如果自身处于列表第一位,则当选为主
否则,在latchPath上增加监听/回调
    监听列表中上一位参与者
        当上一位参与者退出(节点被删除时)
        重新getChildren()再次进行选主
    当latchPath发生变动(如:删除)
        调用reset(),重新进行启动过程
            即可导致hasLeadership()失效
  1. 测试
package com.roc.curator.demo.leader.latch

import org.apache.commons.lang3.RandomStringUtils
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry
import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.TimeUnit

/**
 * Created by roc on 2017/5/25.
 */
class LatchParticipant() {

    val LATCH_PATH: String = "/test/leader/latch"

    var client: CuratorFramework = CuratorFrameworkFactory.builder()
            .connectString("0.0.0.0:8888")
            .connectionTimeoutMs(5000)
            .retryPolicy(ExponentialBackoffRetry(1000, 10))
            .sessionTimeoutMs(3000)
            .build()

    @Before fun init() {
        client.start()
    }

    @Test fun runTest() {
        var id: String = RandomStringUtils.randomAlphabetic(10)
        println("id : $id ")
        val time = Date()
        var latch: LeaderLatch = LeaderLatch(client, LATCH_PATH, id)
        latch.start()
        println("$id 开始竞选 $time")
        while(!latch.await(3, TimeUnit.SECONDS)){
            println("$id 选主失败 : $time")
            println("当前主是:${latch.leader.id}")
            println("参与者:${latch.participants}")
        }
        println("$id 选主成功 $time")
        while (latch.hasLeadership()) {
            println("$id 执行 $time")
            TimeUnit.SECONDS.sleep(2)
            if (Math.random() > 0.89) {
                break;
            }
        }
        println("$id 结束此轮: $time")


        latch.close()
    }
}

zookeeper节点:

get /test/leader/latch/_c_9b313527-e0ed-410f-9510-30e5fd92b5c6-latch-0000000208
zNillKMfuB
cZxid = 0x1db19
ctime = Thu May 25 20:53:22 CST 2017
mZxid = 0x1db19
mtime = Thu May 25 20:53:22 CST 2017
pZxid = 0x1db19
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07e9
dataLength = 10
numChildren = 0
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐