一个轻量级的分布式RPC框架
在互联网中,随着访问需求的不断扩大,单一的MVC架构已经不能满足用户的访问需求,这个时候就需要RPC分布式架构。常见的RPC分布式框架:RPC server(生产者)提供RPC服务,并将服务注册到ZooKeeper中。ZooKeeper管理分布式服务,负责服务节点选取、Master节点选举、分布式一致性、注册功能。RPC client作为消费者订阅RPC server提供的服务。
·
背景:
在互联网中,随着访问需求的不断扩大,单一的MVC架构已经不能满足用户的访问需求,这个时候就需要RPC分布式架构。
常见的RPC框架:
跨语言调用型:Apache的Thrift、Google的gRPC、微博开源的Motan。
服务治理型:阿里巴巴的Dubbo(Java)、Twitter的Finagle(基于Scala语言)。
框架比较:RPC比较
常见的RPC分布式框架架构:
RPC server(生产者)提供RPC服务,并将服务注册到ZooKeeper中。ZooKeeper管理分布式服务,负责服务节点选取、Master节点选举、分布式一致性、注册功能。RPC client作为消费者订阅RPC server提供的服务并发起请求,ZooKeeper选取合适的节点响应client的请求。
根据已有的技术选型:
1 ,spring提供强大的依赖注入功能。
2 , Netty高性能的网络底层通信框架。
3 , zookeeper提供注册和一致性保障机制。
4 , protostuff 序列化和反序列化。
轻量级分布式框架:
1 , 编写server可提供的服务,暴露给client。
/**
 * Service contract that the RPC server exposes to clients.
 *
 * @author pc
 */
public interface HelloService {

    /** Returns the greeting text for the given person. */
    String hello(Person person);

    /** Returns the greeting text for the given name. */
    String hello(String name);
}
2 , 编写服务的实现类,为client提供的具体服务。
import com.nettyRpc.server.RpcService;
import com.nettyRpc.test.client.HelloService;
import com.nettyRpc.test.client.Person;
/**
 * Server-side implementation of {@link HelloService}.
 *
 * <p>A class may implement several interfaces, so the remote interface it serves is named
 * explicitly in the {@code @RpcService} annotation.
 *
 * @author pc
 */
// @RpcService names the remote interface; the annotation is meta-annotated with @Component,
// so this class is found by Spring component scanning.
@RpcService(HelloService.class)
public class HelloServiceImpl implements HelloService {

    /** Greets a person by first and last name. */
    @Override
    public String hello(Person person) {
        StringBuilder greeting = new StringBuilder("hello");
        greeting.append(person.getFirstName());
        greeting.append(" ");
        greeting.append(person.getLastName());
        return greeting.toString();
    }

    /** Greets by plain name. */
    @Override
    public String hello(String name) {
        return new StringBuilder("hello").append(name).toString();
    }
}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.stereotype.Component;
/**
 * Marks a class as an RPC service implementation and names the remote interface it serves.
 *
 * <p>The annotation is meta-annotated with Spring's {@code @Component} (the generic stereotype,
 * next to {@code @Controller}, {@code @Service} and {@code @Repository}), so annotated classes
 * are candidates for component scanning.
 *
 * @author pc
 */
// JDK meta-annotation: keep the annotation in the class file and make it readable at runtime.
@Retention(RetentionPolicy.RUNTIME)
// JDK meta-annotation: ElementType.TYPE allows use on classes, interfaces and enums.
@Target({ElementType.TYPE})
// Spring stereotype annotation.
@Component
public @interface RpcService {

    /** The remote interface implemented by the annotated class. */
    Class<?> value();
}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.stereotype.Component;
/**
 * Marks a class as an RPC service implementation and names the remote interface it serves.
 *
 * <p>The annotation is meta-annotated with Spring's {@code @Component} (the generic stereotype,
 * next to {@code @Controller}, {@code @Service} and {@code @Repository}), so annotated classes
 * are candidates for component scanning.
 *
 * @author pc
 */
// JDK meta-annotation: keep the annotation in the class file and make it readable at runtime.
@Retention(RetentionPolicy.RUNTIME)
// JDK meta-annotation: ElementType.TYPE allows use on classes, interfaces and enums.
@Target({ElementType.TYPE})
// Spring stereotype annotation.
@Component
public @interface RpcService {

    /** The remote interface implemented by the annotated class. */
    Class<?> value();
}
将实现好的服务放入server中,作为server向client提供的服务。
3 , 配置服务器端。
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring configuration for the RPC server side. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Scan this package for service implementations (classes annotated with @RpcService). -->
    <context:component-scan base-package="com.nettyRpc.test.server" />

    <!-- Load registry.address and server.address from rpc.properties. -->
    <context:property-placeholder location="classpath:rpc.properties"/>

    <!-- Service registry component (ZooKeeper). -->
    <bean id="serviceRegistry" class="com.nettyRpc.registry.ServiceRegistry">
        <constructor-arg name="registryAddress" value="${registry.address}"/>
    </bean>

    <!--
        RPC server. RpcServer uses RpcService without importing it while importing
        com.nettyRpc.registry.ServiceRegistry, and the service implementation imports
        com.nettyRpc.server.RpcService, so the class lives in com.nettyRpc.server
        (not com.nettyRpc.registry).
    -->
    <bean id="rpcServer" class="com.nettyRpc.server.RpcServer">
        <constructor-arg name="serverAddress" value="${server.address}"/>
        <constructor-arg name="serviceRegistry" ref="serviceRegistry"/>
    </bean>
</beans>
# ZooKeeper servers used as the service registry.
# A properties key can hold only one value (a repeated key overwrites the earlier ones),
# so all servers are listed in a single comma-separated ZooKeeper connect string.
registry.address=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# Address (host:port) the RPC server binds to and publishes in the registry.
server.address=127.0.0.1:18867
4 , 启动服务器,并发布服务。
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Entry point of the RPC server: loads the Spring configuration, which in turn creates the
 * server beans.
 *
 * @author pc
 */
public class RpcBootstrap {

    /** Creates the application context from the classpath resource {@code server-spring.xml}. */
    @SuppressWarnings("resource")
    public static void main(String[] args) {
        final String configLocation = "server-spring.xml";
        new ClassPathXmlApplicationContext(configLocation);
    }
}
5 , 实现服务注册。
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 服务注册进入zookeeper
* @author pc
*
*/
public class ServiceRegistry {
private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);
private String registryAddress;
// 添加监听注册事件。
private CountDownLatch countDown = new CountDownLatch(1);
public ServiceRegistry(String registryAddress){
this.registryAddress = registryAddress;
}
// 注册服务。
public void register(String data){
if (data != null){
ZooKeeper zk = connectZK();
if (zk != null){
AddRootNode(zk);
createNode(zk , data);
}
}
}
// 连接zk。
public ZooKeeper connectZK(){
ZooKeeper zk = null;
try {
zk = new ZooKeeper(this.registryAddress , Constant.ZK_SESSION_TIMEOUT , new Watcher(){
// watcher 事件监听函数,成功Zk的时候调用process。
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected){
countDown.countDown();
}
}
});
countDown.await();
} catch (IOException e) {
logger.error("注册没有成功:" + e);
} catch (InterruptedException e) {
logger.error("注册被中断:" + e);
}
return zk;
}
// 添加根节点。
public void AddRootNode(ZooKeeper zk ){
try {
Stat s = zk.exists(Constant.ZK_REGISTRY_PATH , false);
if (s == null){
zk.create(Constant.ZK_REGISTRY_PATH,new byte[0]
, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
logger.error(e.toString());
} catch (InterruptedException e) {
logger.error(e.toString());
}
}
//创建节点。
public void createNode(ZooKeeper zk , String data){
byte[] bytes = data.getBytes();
try {
zk.create(Constant.ZK_DATA_PATH, bytes
, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException e) {
logger.error("添加节点:" + e.toString());
} catch (InterruptedException e) {
logger.error("添加节点" + e.toString());
}
}
}
6 , 实现RPC服务器。
客户端通过spring的ClassPathXmlApplicationContext()加载配置文件,spring bean的加载顺序:先构造函数——>然后是bean的set方法注入——>InitializingBean的afterPropertiesSet方法——>init-method方法。
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections4.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import com.nettyRpc.protocol.RpcDecoder;
import com.nettyRpc.protocol.RpcEncoder;
import com.nettyRpc.protocol.RpcRequest;
import com.nettyRpc.protocol.RpcResponse;
import com.nettyRpc.registry.ServiceRegistry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
 * Netty based RPC server.
 *
 * <p>Spring first hands over the application context, from which all beans annotated with
 * {@code @RpcService} are collected, and then calls {@link #afterPropertiesSet()}, which starts
 * the Netty server, registers the server address and blocks until the server channel closes.
 *
 * @author pc
 */
public class RpcServer implements ApplicationContextAware, InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

    /** Number of threads that execute service invocations off the Netty I/O threads. */
    private static final int BUSINESS_THREADS = 16;

    /** Pool used by {@link #submit(Runnable)}; daemon threads so it never keeps the JVM alive. */
    private static final java.util.concurrent.ExecutorService BUSINESS_EXECUTOR =
            java.util.concurrent.Executors.newFixedThreadPool(BUSINESS_THREADS, task -> {
                Thread worker = new Thread(task, "rpc-business");
                worker.setDaemon(true);
                return worker;
            });

    private final String serverAddress;
    private final ServiceRegistry serviceRegistry;

    // Maps the remote interface name to the bean that implements it.
    Map<String, Object> handlerMap = new HashMap<String, Object>();

    /**
     * Creates a server that is not published in any registry.
     *
     * @param serverAddress address to bind, in {@code host:port} form
     */
    public RpcServer(String serverAddress) {
        this(serverAddress, null);
    }

    /**
     * @param serverAddress   address to bind, in {@code host:port} form
     * @param serviceRegistry registry in which the address is published; may be {@code null}
     */
    public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {
        this.serverAddress = serverAddress;
        this.serviceRegistry = serviceRegistry;
    }

    /**
     * Runs a task on the business thread pool. Request handlers use this so that service
     * invocations do not block the Netty event loop.
     *
     * @param task the work to execute
     */
    public static void submit(Runnable task) {
        BUSINESS_EXECUTOR.execute(task);
    }

    /**
     * Collects every bean annotated with {@code @RpcService} and indexes it by the name of the
     * interface given in the annotation.
     */
    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        Map<String, Object> beanMap = ctx.getBeansWithAnnotation(RpcService.class);
        if (MapUtils.isNotEmpty(beanMap)) {
            for (Object bean : beanMap.values()) {
                String interfaceName =
                        bean.getClass().getAnnotation(RpcService.class).value().getName();
                handlerMap.put(interfaceName, bean);
            }
        }
    }

    /**
     * Starts the Netty server on {@code serverAddress}, registers the address when a registry
     * was supplied, and then blocks until the server channel is closed. Failures are logged.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Boss group accepts connections, worker group serves the accepted channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            // Frames carry a 4-byte length prefix at offset 0; the prefix is
                            // not stripped because RpcDecoder reads it itself.
                            channel.pipeline().addLast(
                                    new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0),
                                    new RpcDecoder(RpcRequest.class),
                                    new RpcEncoder(RpcResponse.class),
                                    new RpcHandler(handlerMap));
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            String[] hostAndPort = this.serverAddress.split(":");
            if (hostAndPort.length != 2) {
                throw new IllegalArgumentException(
                        "serverAddress must have the form host:port but was: " + serverAddress);
            }
            String host = hostAndPort[0];
            int port = Integer.parseInt(hostAndPort[1]);

            ChannelFuture future = bootstrap.bind(host, port).sync();
            if (serviceRegistry != null) {
                serviceRegistry.register(serverAddress);
            }
            // Blocks the calling thread until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error(e.toString(), e);
        } catch (Exception e) {
            logger.error(e.toString(), e);
        } finally {
            // The groups own threads, so they must be released explicitly.
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
对于request进行解码,ByteToMessageDecoder。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
 * Decodes length-prefixed bytes into objects of a given class.
 *
 * <p>Wire format: a 4-byte length followed by that many bytes, which are handed to
 * {@code SerializationUtil.deserialize}.
 *
 * @author huangyong
 */
public class RpcDecoder extends ByteToMessageDecoder {
    /** Size in bytes of the length prefix. */
    private static final int LENGTH_FIELD_BYTES = 4;

    private final Class<?> genericClass;

    /**
     * @param genericClass the class that decoded messages are deserialized into
     */
    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Emits one decoded object once a complete message is buffered; otherwise leaves the
     * buffer untouched and waits for more bytes.
     */
    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < LENGTH_FIELD_BYTES) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // A negative length can only come from a corrupt or hostile peer and would make
            // the array allocation below throw. Drop what is buffered and close the channel.
            in.skipBytes(in.readableBytes());
            ctx.close();
            return;
        }
        if (in.readableBytes() < dataLength) {
            // Body not complete yet: rewind so the length is read again next time.
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, genericClass));
    }
}
对Response进行编码,MessageToByteEncoder。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Encodes objects of a given class as a 4-byte length followed by the serialized bytes.
 * Messages that are not instances of that class produce no output.
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {
    private final Class<?> genericClass;

    /**
     * @param rpcResponse the class of the messages this encoder serializes
     */
    public RpcEncoder(Class<?> rpcResponse) {
        this.genericClass = rpcResponse;
    }

    /** Writes the length prefix and the bytes produced by {@code SerializationUtil.serialize}. */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if (genericClass.isInstance(msg)) {
            byte[] data = SerializationUtil.serialize(msg);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}
需要定义Request和Response的数据格式。
// Presumably the fields of RpcRequest: they match the getters RpcHandler calls on the request
// (getRequestId, getClassName, getMethodName, getParameterTypes, getParameters).
private String requestId;
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;
// Presumably the fields of RpcResponse: RpcHandler calls setRequestId and setResult on the
// response. NOTE(review): confirm that the two lists belong to two separate classes.
private String requestId;
private String error;
private Object result;
import java.lang.reflect.Method;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nettyRpc.protocol.RpcRequest;
import com.nettyRpc.protocol.RpcResponse;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Server-side handler that executes an incoming {@code RpcRequest} against the registered
 * service beans and writes back an {@code RpcResponse}.
 */
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {
    private static final Logger logger = LoggerFactory.getLogger(RpcHandler.class);

    private final Map<String, Object> handlerMap;

    /**
     * @param handlerMap service beans keyed by the name of the remote interface they implement
     */
    public RpcHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    /**
     * Hands the request to {@code RpcServer.submit} so that the service call does not run on
     * the Netty I/O thread, then writes the response on the same channel.
     */
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) {
        RpcServer.submit(new Runnable() {
            @Override
            public void run() {
                logger.debug("channelRead0 start :{}", request.getRequestId());
                RpcResponse response = new RpcResponse();
                response.setRequestId(request.getRequestId());
                try {
                    response.setResult(handle(request));
                } catch (Throwable e) {
                    logger.error("channelRead0 is wrong.", e);
                    // Report the failure to the caller instead of an empty response.
                    // Assumes RpcResponse has a setter for its `error` field, like the
                    // setters used for requestId and result.
                    response.setError(e.toString());
                }
                ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            logger.debug("send response for request :{}", request.getRequestId());
                        } else {
                            logger.error("failed to send response for request :"
                                    + request.getRequestId(), future.cause());
                        }
                    }
                });
            }
        });
    }

    /**
     * Looks up the service bean by the requested class name and invokes the requested method
     * reflectively.
     *
     * @param request the request describing class, method, parameter types and arguments
     * @return the value returned by the service method
     * @throws Throwable the exception thrown by the service method, or a lookup failure
     */
    private Object handle(RpcRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);
        if (serviceBean == null) {
            throw new IllegalArgumentException("no service registered for: " + className);
        }
        Method method = serviceBean.getClass()
                .getMethod(request.getMethodName(), request.getParameterTypes());
        try {
            return method.invoke(serviceBean, request.getParameters());
        } catch (java.lang.reflect.InvocationTargetException e) {
            // Surface the exception raised by the service itself, not the reflection wrapper.
            Throwable cause = e.getCause();
            throw cause != null ? cause : e;
        }
    }
}
import java.lang.reflect.Method;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nettyRpc.protocol.RpcRequest;
import com.nettyRpc.protocol.RpcResponse;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Server-side handler that executes an incoming {@code RpcRequest} against the registered
 * service beans and writes back an {@code RpcResponse}.
 */
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {
    private static final Logger logger = LoggerFactory.getLogger(RpcHandler.class);

    private final Map<String, Object> handlerMap;

    /**
     * @param handlerMap service beans keyed by the name of the remote interface they implement
     */
    public RpcHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    /**
     * Hands the request to {@code RpcServer.submit} so that the service call does not run on
     * the Netty I/O thread, then writes the response on the same channel.
     */
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) {
        RpcServer.submit(new Runnable() {
            @Override
            public void run() {
                logger.debug("channelRead0 start :{}", request.getRequestId());
                RpcResponse response = new RpcResponse();
                response.setRequestId(request.getRequestId());
                try {
                    response.setResult(handle(request));
                } catch (Throwable e) {
                    logger.error("channelRead0 is wrong.", e);
                    // Report the failure to the caller instead of an empty response.
                    // Assumes RpcResponse has a setter for its `error` field, like the
                    // setters used for requestId and result.
                    response.setError(e.toString());
                }
                ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            logger.debug("send response for request :{}", request.getRequestId());
                        } else {
                            logger.error("failed to send response for request :"
                                    + request.getRequestId(), future.cause());
                        }
                    }
                });
            }
        });
    }

    /**
     * Looks up the service bean by the requested class name and invokes the requested method
     * reflectively.
     *
     * @param request the request describing class, method, parameter types and arguments
     * @return the value returned by the service method
     * @throws Throwable the exception thrown by the service method, or a lookup failure
     */
    private Object handle(RpcRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);
        if (serviceBean == null) {
            throw new IllegalArgumentException("no service registered for: " + className);
        }
        Method method = serviceBean.getClass()
                .getMethod(request.getMethodName(), request.getParameterTypes());
        try {
            return method.invoke(serviceBean, request.getParameters());
        } catch (java.lang.reflect.InvocationTargetException e) {
            // Surface the exception raised by the service itself, not the reflection wrapper.
            Throwable cause = e.getCause();
            throw cause != null ? cause : e;
        }
    }
}
7 , 配置客户端。
8 , 实现服务发现。
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.internal.ThreadLocalRandom;
/**
 * Discovers RPC server addresses published in ZooKeeper.
 *
 * <p>On construction the instance connects to ZooKeeper and reads the data of every child of
 * the registry path. A child watch re-reads the list whenever the children change.
 *
 * @author pc
 */
public class ServiceDiscovery {
    private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);

    private final CountDownLatch countDown = new CountDownLatch(1);
    private final String registryAddress;
    private final ZooKeeper zk;

    // Replaced as a whole from the ZooKeeper event thread and read by callers of discover(),
    // hence volatile. Starts empty so discover() is safe before the first successful refresh.
    private volatile List<String> dataList = new ArrayList<>();

    /**
     * @param registryAddress ZooKeeper connect string passed to the {@link ZooKeeper} constructor
     */
    public ServiceDiscovery(String registryAddress) {
        this.registryAddress = registryAddress;
        zk = connectServer();
        if (zk != null) {
            watchNode(zk);
        }
    }

    /**
     * Opens a ZooKeeper session and waits until it reports {@code SyncConnected}.
     *
     * @return the client, or {@code null} if it could not be constructed
     */
    public ZooKeeper connectServer() {
        ZooKeeper client = null;
        try {
            client = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        countDown.countDown();
                    }
                }
            });
            countDown.await();
        } catch (IOException e) {
            logger.error("连接异常。", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("中断连接异常。", e);
        }
        return client;
    }

    /**
     * Picks one of the known server addresses.
     *
     * @return the only address when exactly one is known, a randomly chosen one when several
     *         are known, or {@code null} when none is known
     */
    public String discover() {
        // Work on one snapshot: the field may be replaced concurrently by the watcher.
        List<String> snapshot = dataList;
        int size = snapshot.size();
        if (size == 0) {
            return null;
        }
        String data;
        if (size == 1) {
            data = snapshot.get(0);
            logger.debug("使用唯一的服务" + data);
        } else {
            data = snapshot.get(java.util.concurrent.ThreadLocalRandom.current().nextInt(size));
            logger.debug("选取任意服务" + data);
        }
        return data;
    }

    /**
     * Reads the data of all children of the registry path into the address list and installs
     * a watch that repeats the read when the children change.
     *
     * @param zk a connected ZooKeeper client
     */
    public void watchNode(final ZooKeeper zk) {
        try {
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // Watches fire once, so re-reading also re-installs the watch.
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk);
                    }
                }
            });
            List<String> refreshed = new ArrayList<>(nodeList.size());
            for (String node : nodeList) {
                byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                refreshed.add(new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
            }
            logger.debug("node data" + refreshed);
            this.dataList = refreshed;
            UpdateConnectedServer();
        } catch (KeeperException e) {
            logger.error("保持连接异常。", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("中断异常。", e);
        }
    }

    /** Hook called after the address list was refreshed; currently does nothing. */
    public void UpdateConnectedServer() {
    }

    /** Closes the ZooKeeper session if one was opened. */
    public void stop() {
        if (zk != null) {
            try {
                zk.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("关闭zk", e);
            }
        }
    }
}
9 , 实现RPC代理。
10 , 发送RPC请求。
更多推荐
已为社区贡献1条内容
所有评论(0)