ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,提供的功能包括配置维护、名字服务、分布式同步、组服务等。
ZooKeeper会维护一个树形的数据结构,类似于Windows资源管理器目录,其中EPHEMERAL类型的节点会随着创建它的客户端断开而被删除,利用这个特性很容易实现软负载均衡。

基本原理是,每个应用的Server启动时创建一个EPHEMERAL节点,应用客户端通过读取节点列表获得可用服务器列表,并订阅节点事件,有Server宕机断开时触发事件,客户端监测到后把该Server从可用列表中删除。

来看示例,这里用了BIO模型编写了一个接收/应答的小程序用于演示效果,优点就是简单。为了方便后面的改造,客户端每次发送消息时都会读取服务器列表并从新建立连接。后边会看到只需要几十行代码即可改造为使用ZooKeeper的软负载模式。

Server代码

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SimpleServer implements Runnable {

	public static void main(String[] args) throws IOException {
		int port = 18080;
		SimpleServer server = new SimpleServer(port);
		Thread thread = new Thread(server);
		thread.start();
	}

	private int port;

	public SimpleServer(int port) {
		this.port = port;
	}

	@Override
	public void run() {
		ServerSocket server = null;
		try {
			server = new ServerSocket(port);
			System.out.println("Server started");
			Socket socket = null;
			while (true) {
				socket = server.accept();
				new Thread(new SimpleServerHandler(socket)).start();
			}
		} catch(IOException ex) {
			ex.printStackTrace();
		} finally {
			if (server != null) {
				try {
					server.close();
				} catch (IOException e) {}
			}
		}
	}
}

class SimpleServerHandler implements Runnable {

	private Socket socket;

	public SimpleServerHandler(Socket socket) {
		this.socket = socket;
	}

	@Override
	public void run() {
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			in = new BufferedReader(new InputStreamReader(
					this.socket.getInputStream()));
			out = new PrintWriter(this.socket.getOutputStream(), true);
			String body = null;
			while (true) {
				body = in.readLine();
				if (body == null)
					break;
				System.out.println("Receive : " + body);
				out.println("Hello, " + body);
			}

		} catch (Exception e) {
			if (in != null) {
				try {
					in.close();
				} catch (IOException e1) {
					e1.printStackTrace();
				}
			}
			if (out != null) {
				out.close();
			}
			if (this.socket != null) {
				try {
					this.socket.close();
				} catch (IOException e1) {
					e1.printStackTrace();
				}
				this.socket = null;
			}
		}
	}
}
客户端代码

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class SimpleClient {

	private static List<String> servers = new ArrayList<>();
	
	public static void main(String[] args) {
		
		initServerList();
		
		SimpleClient client = new SimpleClient();
		BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
		while (true) {
			String name;
			try {
				name = console.readLine();
				if("exit".equals(name)) {
					System.exit(0);
				}
				client.send(name);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	private static void initServerList() {
		servers.clear();
		servers.add("127.0.0.1:18080");
	}
	
	public static String getServer() {
		return servers.get(0);
	}
	
	public SimpleClient() {
	}
	
	public void send(String name) {
		
		String server = SimpleClient.getServer();
		String[] cfg = server.split(":");
		
		Socket socket = null;
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			out = new PrintWriter(socket.getOutputStream(), true);
			
			out.println(name);
			while(true) {
				String resp = in.readLine();
				if(resp == null)
					break;
				else if(resp.length() > 0) {
					System.out.println("Receive : " + resp);
					break;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (out != null) {
				out.close();
			}
			if (in != null) {
				try {
					in.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			if (socket != null) {
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
}
运行测试,服务器端输出截图:


客户端输出截图:


很好,一切运行正常。

接下来添加ZooKeeper部分。为了演示效果更好,修改一下ZooKeeper的配置文件,以便于服务器断开后能更快的被监测到。主要是减小Session的超时时间

zookeeper/conf/zoo.cfg

tickTime=2000
initLimit=2
syncLimit=5
dataDir=D:\\ZooKeeper\\zookeeper-3.4.8\\data
clientPort=2181
http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
minSessionTimeout=2000
maxSessionTimeout=5000
在项目中添加zkclient的maven依赖

		<!-- http://mvnrepository.com/artifact/com.101tec/zkclient -->
		<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.8</version>
		</dependency>

Server代码

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

import org.I0Itec.zkclient.ZkClient;

public class SimpleServer implements Runnable {

	public static void main(String[] args) throws IOException {
		int port = 18080;
		SimpleServer server = new SimpleServer(port);
		Thread thread = new Thread(server);
		thread.start();
	}

	private int port;

	public SimpleServer(int port) {
		this.port = port;
	}
	
	private void regServer() {
		//向ZooKeeper注册当前服务器
		ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000);
		String path = "/test/server" + port;
		if(client.exists(path))
			client.delete(path);
		client.createEphemeral(path, "127.0.0.1:" + port);
	}

	@Override
	public void run() {
		ServerSocket server = null;
		try {
			server = new ServerSocket(port);
			regServer();
			System.out.println("Server started at " + port);
			Socket socket = null;
			while (true) {
				socket = server.accept();
				new Thread(new SimpleServerHandler(socket)).start();
			}
		} catch(IOException ex) {
			ex.printStackTrace();
		} finally {
			if (server != null) {
				try {
					server.close();
				} catch (IOException e) {}
			}
		}

	}
}
//SimpleServerHandler略

客户端代码

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

public class SimpleClient {

	private static List<String> servers = new ArrayList<>();
	
	public static void main(String[] args) {
		
		initServerList();
		
		SimpleClient client = new SimpleClient();
		BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
		while (true) {
			String name;
			try {
				name = console.readLine();
				if("exit".equals(name)) {
					System.exit(0);
				}
				client.send(name);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	private static void initServerList() {
		//启动时从ZooKeeper读取可用服务器
		String path = "/test";
		ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);
		List<String> childs = zkClient.getChildren(path);
		servers.clear();
		for(String p : childs) {
			servers.add(zkClient.readData(path + "/" + p));
		}
		//订阅节点变化事件
		zkClient.subscribeChildChanges("/test", new IZkChildListener() {
			@Override
			public void handleChildChange(String parentPath, List<String> currentChilds)
					throws Exception {
				System.out.println(String.format("[ZookeeperRegistry] service list change: path=%s, currentChilds=%s", parentPath, currentChilds.toString()));
				servers.clear();
				for(String p : currentChilds) {
					servers.add(zkClient.readData(path + "/" + p));
				}
				System.out.println("Servers: " + servers.toString());
			}
		});
		
	}
	
	public static String getServer() {
		return servers.get(new Random().nextInt(servers.size()));
	}
	//其他无变化, 略
}
分别启动Server和Client,然后修改Server的端口号,再启动一个实例,可以看到客户端检测到了这个新服务器的存在



在客户端发送一些消息,可以看到被随机的分发到两个Server上处理




接下来关闭其中一个Server,可以看到客户端几秒钟后监测到这个事件并自动删除了该服务器。


可以看到,基于ZooKeeper实现软负载均衡非常简单,与应用紧密结合,使用灵活。

源码下载

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐