gRpc服务注册到zookeeper实现
将Grpc服务注册到zookeeper中,具体zookeeper代码实现如下:服务注册在http://blog.csdn.net/July_whj/article/details/79423459中体现。注册中心代码:package cn.org.bjca.anysign.server.service.grpcanywrite.grpcserverinzk;import org.a...
·
将Grpc服务注册到zookeeper中,具体zookeeper代码实现如下:服务注册在http://blog.csdn.net/July_whj/article/details/79423459中体现。
注册中心代码:
package cn.org.bjca.anysign.server.service.grpcanywrite.grpcserverinzk;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
/***************************************************************************
* <pre>ZK注册中心</pre>
*
* @文件名称: GrpcAnyWritePDFSignFacadeZKServer.class
* @包 路 径: xxx
* @版权所有: xxx (C) 2018
* @类描述:
* @版本: V2.0
* @创建人: july_whj
* @创建时间:2018/3/1 13:16
***************************************************************************/
public class GrpcAnyWritePDFSignFacadeZKServer {
private org.slf4j.Logger logger = LoggerFactory.getLogger(getClass());// 日志
private ZooKeeper zk = null;
private String zkHosts = null;
private int sessiontimeout = 0;
public String parentznodepath = null;
public GrpcAnyWritePDFSignFacadeZKServer(){
}
public GrpcAnyWritePDFSignFacadeZKServer(String zkHosts,int sessiontimeout,String parentznodepath){
this.zkHosts = zkHosts;
this.sessiontimeout = sessiontimeout;
this.parentznodepath = parentznodepath;
}
public void getZkClient() throws Exception {
zk = new ZooKeeper(zkHosts,
sessiontimeout, null);
}
/**
* 向zookeeper中的/下创建子节点
* 默认本机IP
* @param port
* @throws InterruptedException
*/
public void connectZK(String port) throws Exception {
InetAddress address = InetAddress.getLocalHost();
String data = address.getHostAddress()+":"+port;
// 先创建出父节点
if (zk.exists(parentznodepath, false) == null) {
zk.create(parentznodepath, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 连接zk创建znode
zk.create(parentznodepath + "/",
data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("server "+data+" is online ......");
}
/**
* 向zookeeper中的/下创建子节点
* @param ip
* @param port
* @throws Exception
*/
public void connectZK(String ip,String port) throws Exception {
String data = ip+":"+port;
// 先创建出父节点
if (zk.exists(parentznodepath, false) == null) {
zk.create(parentznodepath, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 连接zk创建znode
zk.create(parentznodepath + "/",
data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("server "+data+" is online ......");
}
}
客户端实现:
package cn.org.bjca.anysign.server.service.grpcanywrite.grpcserverinzk;
import cn.org.bjca.anysign.server.service.grpcanywrite.GrpcAnyWritePDFSignFacadeClient;
import cn.org.bjca.seal.esspdf.platform.single.SysConfigSingle;
import cn.org.bjca.seal.esspdf.platform.utils.PropertiesUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.ArrayList;
import java.util.List;
/***************************************************************************
* <pre></pre>
*
* @文件名称: ${CLASS_NAME}
* @包 路 径: xxx
* @版权所有:xxx (C) 2018
* @类描述:
* @版本: V2.0
* @创建人: july_whj
* @创建时间:2018/3/1 15:11
***************************************************************************/
public class ZKClient {
private volatile List<String> servers = null;
private ZooKeeper zk = null;
String parentZNodePath = null;
// 获取zk连接
private void getZkClient() throws Exception {
try {
PropertiesUtil propertiesUtil = SysConfigSingle.getInstance().getSysConfigProperties();
String zkHosts = propertiesUtil.read("zkServerPort");
String sessionTimeout = propertiesUtil.read("sessionTimeout");
parentZNodePath = propertiesUtil.read("parentZNodePath");
// 服务器在需求中并不需要做任何监听
zk = new ZooKeeper(zkHosts,
Integer.parseInt(sessionTimeout), new Watcher() {
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.None)
return;
try {
// 获取新的服务器列表,重新注册监听
updateServers();
} catch (Exception e) {
e.printStackTrace();
}
}
});
} catch (Exception ex) {
}
}
/**
* 从zk中获取在线服务器信息
*/
public void updateServers() throws Exception {
// 从servers父节点下获取到所有子节点,并注册监听
List<String> children = zk.getChildren(parentZNodePath,
true);
ArrayList<String> serverList = new ArrayList<String>();
for (String child : children) {
byte[] data = zk.getData(parentZNodePath + "/"
+ child, false, null);
serverList.add(new String(data));
}
// 如果客户端是一个多线程程序,而且各个线程都会竞争访问servers列表,所以,在成员中用volatile修饰了一个servers变量
// 而在更新服务器信息的这个方法中,是用一个临时List变量来进行更新
servers = serverList;
// 将更新之后的服务器列表信息打印在控制台观察一下
for (String server : serverList) {
System.out.println(server);
}
System.out.println("===================");
}
/**
* 业务逻辑
*
* @throws InterruptedException
*/
private void requestService() throws InterruptedException {
String ipport = servers.get(0);//这里可以获取服务列表中任何值可做负载均衡
if (StringUtils.isBlank(ipport)){
System.out.println("zk获取服务失败");
}else{
String [] strs = ipport.split(":");
GrpcAnyWritePDFSignFacadeClient client = new GrpcAnyWritePDFSignFacadeClient(strs[0], Integer.parseInt(strs[1]));//获取服务实现
try {
client.Test(null);//调用服务
} finally {
client.shutdown();
}
}
}
public static void main(String[] args) throws Exception {
ZKClient client = new ZKClient();
// 先构造一个zk的连接
client.getZkClient();
// 获取服务器列表
client.updateServers();
// 客户端进入业务流程,请求服务器的服务
client.requestService();
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)