使用Zookeeper实现Leader(Master)选举
分布式系统最典型的架构就是一主多从。在很多时候,虽然处理大规模的数据、图像和文件等,这种工作极其耗资源而且数据、文件等都是共享的,若全部机器都计算处理一次会浪费保贵的计算资源;我们可以把这些工作交给一台机器处理,其它机器则通过数据库、分布式文件系统等方式共享计算成果Leader(Master)。另外,对于数据库、缓存等组件读写分离是惯用的提高性能的方式;读写分离是把写全部给leader(maste
·
应用场景
分布式系统最典型的架构就是
一主多从
。在很多时候,虽然处理大规模的数据、图像和文件等,这种工作极其耗资源而且数据、文件等都是共享的,若全部机器都计算处理一次会浪费保贵的计算资源;我们可以把这些工作交给一台机器处理,其它机器则通过数据库、分布式文件系统等方式共享计算成果Leader(Master)。另外,对于数据库、缓存等组件读写分离是惯用的提高性能的方式;读写分离是把写全部给leader(master),查询则使用follower的机器。使用Zookeeper提供的API可轻松实现leader选举。
具体步骤
1、客户端连接时,在指定的目录(这里假定为"/leader")创建一个
EPHEMERAL_SEQUENTIAL
的节点,把内网的IP数据存入创建节点。
2、获取目录的子点节,并取得序列号最小的节点,我们把这个节点设置为leader。当此节点被删除时,证明leader断线。
3、其它机器监听leader节点,当leader节点的删除时,再取目录的最小子点节作为leader。
详细代码
package org.massive.common;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Created by Massive on 2016/12/18.
*/
public class ZooKeeperClient {
private static String connectionString = "localhost:2181";
private static int sessionTimeout = 10000;
public static ZooKeeper getInstance() throws IOException, InterruptedException {
//--------------------------------------------------------------
// 为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss)
// 这里等Zookeeper的连接完成才返回实例
//--------------------------------------------------------------
final CountDownLatch connectedSignal = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}
});
connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);
return zk;
}
public static int getSessionTimeout() {
return sessionTimeout;
}
public static void setSessionTimeout(int sessionTimeout) {
ZooKeeperClient.sessionTimeout = sessionTimeout;
}
}
LeaderElection
package org.massive.group;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.massive.common.ZooKeeperClient;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
/**
* Created by Massive on 2016/12/25.
*/
public class LeaderElection {
private ZooKeeper zk;
private int sessionTimeout;
private static byte[] DEFAULT_DATA = {0x12,0x34};
private static Object mutex = new Object();
private static String ROOT = "/leader";
private byte[] localhost = getLocalIpAdressBytes();
private String znode;
private static CountDownLatch firstElectionSignal = new CountDownLatch(1);
//----------------------------------------------------
// leader的IP地址
//----------------------------------------------------
private static String leader;
public LeaderElection() throws IOException, InterruptedException, KeeperException {
this.zk = ZooKeeperClient.getInstance();
this.sessionTimeout = zk.getSessionTimeout();
ensureExists(ROOT);
ensureLocalNodeExists();
System.out.println("-------------------------------------");
System.out.println("local IP: " + getLocalIpAddress());
System.out.println("local created node: " + znode);
System.out.println("-------------------------------------");
}
/**
* 检查本机是否已创建节点,不存在则创建
* @throws KeeperException
* @throws InterruptedException
*/
public void ensureLocalNodeExists() throws KeeperException, InterruptedException {
List<String> list = zk.getChildren(ROOT,new NodeDeleteWatcher());
for (String node : list) {
Stat stat = new Stat();
String path = ROOT + "/" + node;
byte[] data = zk.getData(path,false,stat);
if (Arrays.equals(data,localhost)) {
znode = path;
return;
}
}
znode = zk.create(ROOT + "/", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
}
public void ensureExists(String path) {
try {
Stat stat = zk.exists(path, false);
if (stat == null) {
zk.create(path, DEFAULT_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void start() throws KeeperException, InterruptedException, UnsupportedEncodingException {
do{
synchronized (mutex) {
System.out.println("begin leader election...");
List<String> nodes = zk.getChildren(ROOT, null);
SortedSet<String> sortedNode = new TreeSet<String>();
for (String node : nodes) {
sortedNode.add(ROOT + "/" + node);
}
//----------------------------------------------------
// 取出序列号最小的消息
//----------------------------------------------------
String first = sortedNode.first();
leader = first;
//----------------------------------------------------
// 监控序列最小节点(非本机创建节点)
//----------------------------------------------------
NodeDeleteWatcher watcher = znode.equals(first) ? null : new NodeDeleteWatcher();
byte[] data = zk.getData(first, watcher, null);
leader = new String(data, "UTF-8");
System.out.println("leader election end, the leader is : " + leader);
if (firstElectionSignal.getCount() != 0) {
firstElectionSignal.countDown();
}
if (znode.equals(first)) {
return;
}
mutex.wait();
}
} while (true);
}
class NodeDeleteWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
synchronized (mutex) {
mutex.notify();
}
}
}
}
/**
* 获取本地内网IP地址
* @return
* @throws SocketException
*/
public static String getLocalIpAddress() throws SocketException {
// 获得本机的所有网络接口
Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();
while (nifs.hasMoreElements()) {
NetworkInterface nif = nifs.nextElement();
// 获得与该网络接口绑定的 IP 地址,一般只有一个
Enumeration<InetAddress> addresses = nif.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
String ip = addr.getHostAddress();
// 只关心 IPv4 地址
if (addr instanceof Inet4Address && !"127.0.0.1".equals(ip)) {
return ip;
}
}
}
return null;
}
public static byte[] getLocalIpAdressBytes() throws SocketException {
String ip = getLocalIpAddress();
return ip == null ? null : ip.getBytes();
}
public static String getLeader() throws InterruptedException {
//----------------------------------------------------
// 第一次leader选举完成后才释放阀门
//----------------------------------------------------
firstElectionSignal.await();
return leader;
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
//-------------------------------------------------------
// 启动一条线程用处理leader选举
//-------------------------------------------------------
new Thread(new Runnable() {
@Override
public void run() {
try {
LeaderElection leaderElection = new LeaderElection();
leaderElection.start();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
String leader = LeaderElection.getLeader();
}
}
测试
运行main方法,本机器的某次输出结果-------------------------------------
local IP: 192.168.1.103
local created node: /leader/0000000017
-------------------------------------
begin leader election...
leader election end, the leader is : 192.168.1.103
注:本文只进行了简单的测试,要在生产环境中使用请先同时多在个JVM上进行测试。
ZooKeeper实现分布式锁和队列可参考:
更多推荐
已为社区贡献5条内容
所有评论(0)