使用Netty搭建Rpc框架,整合Nacos实现服务治理
基本介绍Netty是一个基于JAVA NIO的高性能网络通信框架,它屏蔽了NIO复杂的API操作,让我们得以编写很简单的程序就能获得一个高性能的通信组件。目前主流微服务之间调用方式有:RPC(Remote Procedure Call Protocol远程过程调用协议)和 HTTP restful 两种。比较有代表性的微服务框架就是dubbox 和 springcloud。分别主要以RPC和Res
本文干货满满,涉及Rpc、Netty、Java反射、代理、注解等知识。并借鉴spring和springcloud部分的设计思想
基本介绍
Netty是一个基于JAVA NIO的高性能网络通信框架,它屏蔽了NIO复杂的API操作,让我们得以编写很简单的程序就能获得一个高性能的通信组件。
目前主流微服务之间调用方式有:RPC(Remote Procedure Call Protocol 远程过程调用协议)和 HTTP restful 两种。比较有代表性的微服务框架就是dubbox 和 springcloud。分别主要以RPC和RestFul方式进行服务之间调用。
本文旨在学会如何使用Netty搭建一个简易的RPC框架。
搭建原理
首先了解什么叫RPC,为什么要RPC,RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。说白了,用过dubbox的都知道,dubbox之间远程服务调用就跟本地服务调用一样简单。
既然我们要搭建一个自己的RPC框架,先要对这个过程进行一个剖析。首先,RPC框架要做到封装所有调用过程,对调用者只返回结果。其次,这个调用过程,由于是远程服务调用,通信方面要么选择http方式,要么socket方式。这里选择socket方式,采用netty框架。
通信问题解决后,下一个就是服务调用,这个就很简单。使用java反射和proxy代理就行。
搭建过程
此次搭建总流程如下:
- 服务提供者(Netty服务端):自定义一个@RpcService注解,声明一个name属性,统一修饰提供出去的服务。在服务提供者启动时,统一将带有@rpcservice注解的类通过反射方式创建对象。以@rpcservice注解提供的name属性当key加入到Map容器中(是不是感觉很熟悉?spring就是这么干的)
- 服务消费者(Netty客户端):自定义一个@RpcReference,声明一个name属性。在需调用服务的类中,声明被调用对象的属性,以@RpcReference修饰并指定name跟@RpcService的name保持一致。(类似于@autowired注解)然后在调用服务时,通过java proxy创建服务提供者的代理对象,并重写调用逻辑,通过netty将调用信息(类名称、方法名、参数类型、参数值、返回类型、@RpcReference修饰的name属性值)等发送到服务提供方。服务提供方拿到信息后既可以解析并根据类名称、方法名称等通过反射方式创建被调用对象,也可以通过@RpcReference修饰的name属性值去Map容器里查找服务实例去调用并返回。
实战演示
代码结构图示:
该工程需要引入netty、fastjson以及reflections依赖。关于三个依赖作用下图解释:
<dependency>
<!--netty依赖,只需这一个即可。里面包含了所有netty会用到的依赖-->
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.63.Final</version>
</dependency>
<!--nacos依赖,如需为rpc服务增加服务治理功能,可以引入nacos,来完成服务注册发现、配置管理等。具体参展
https://nacos.io/zh-cn/docs/sdk.html nacos官方手册地址-->
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.3.1</version>
</dependency>
<!-- 用在自定义编解码器转换数据所用 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<!-- java反射框架,用于netty服务端启动时加载注解修饰的类。功能非常强大 -->
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.12</version>
</dependency>
代码方面
首先是编写提供服务接口、实现类以及实体类
public class Book {
private Integer id;
private String bookName;
public Book() {
}
public Book(Integer id, String bookName) {
this.id = id;
this.bookName = bookName;
}
//省略set、get、toString方法
}
public interface BookService {
Book getBook(Integer id);
Book updateBook(Book book);
Integer delBook(Integer id);
Integer insertBook(Book book);
}
@RpcService(serviceName = "BookServiceImpl")//自定义注解,服务注入
public class BookServiceImpl implements BookService {
@Override
public Book getBook(Integer id) {
return new Book(id,"大悲赋");
}
@Override
public Book updateBook(Book book) {
book.setBookName("修改后:"+book.getBookName());
return book;
}
@Override
public Integer delBook(Integer id) {
return id;
}
@Override
public Integer insertBook(Book book) {
return book.getId();
}
}
编写注解
@Target({ElementType.FIELD})//用于属性字段上
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcReference {//服务依赖,类似于spring的@autowired理解
public String referenceName();//根据name确认对象,类似于spring的@autowired的byName匹配模式,name要唯一
}
@Target({ElementType.TYPE})//用于类上
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcService {//服务注入,此注解修饰的类,会在netty服务端启动时加载到Bean容器中
public String serviceName();//唯一,根据name确认对象。类似于spring的@autowired的byName匹配模式,name要唯一
}
编写自定义消息,请求消息和返回消息体
public class MessageContext implements Serializable { //消息体,发送消息和接收的就是它
private String messageId;
private Boolean isSuccess;
private Object context;//具体的消息内容。分为请求数据和返回数据两种?
public MessageContext() {
}
public MessageContext(String messageId, Boolean isSuccess, Object context) {
this.messageId = messageId;
this.isSuccess = isSuccess;
this.context = context;
}
//省略set、get、toString方法
public class Request {//请求数据内容
private String className;//需调用的类全限定名
private String methodName;//需调用的方法名
private Class<?>[] parameterType;//参数类型
private Object[] parameterValue;//参数值
private String serviceNameByAnnotated;//bean名称,服务端根据此名称去容器中查找对象
public Request() {
}
public Request(String className, String methodName, Class<?>[] parameterType, Object[] parameterValue) {
this.className = className;
this.methodName = methodName;
this.parameterType = parameterType;
this.parameterValue = parameterValue;
}
//省略set、get、toString方法
}
public class Response{//响应数据内容
private Boolean isSuccess;
private Object context;//返回具体数据
private Class<?> contextType;//返回数据类型
public Response(Boolean isSuccess, Object context,Class<?> contextType) {
this.contextType = contextType;
this.isSuccess = isSuccess;
this.context = context;
}
public Response() {
}
//省略set、get、toString方法
}
然后,编写自定义编解码器(Netty知识)。Netty的编解码其实就是一个二进制数据和我们用到的数据之间的一个转换过程。编码器就是将我们自定义的数据转成二进制数据,通过网络发送到另外一端。另外一端再通过解码器将二进制数据解码成我们自定义的数据。然后进行一个处理。
public class MessageEncoder extends MessageToByteEncoder {//自定义编码器。继承Netty的MessageToByteEncoder以提供编码功能
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
String s = JSONObject.toJSONString(o);
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
System.out.println("已成功写入到通道:"+s);
}
}
public class MessageDecoder extends ByteToMessageDecoder {//自定义解码器。继承Netty的ByteToMessageDecoder 以提供解码功能
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
byte[] body = new byte[byteBuf.readInt()];//创建一个从readerIndex开始可读取总数的byte数组
byteBuf.readBytes(body);//将数据读到byte数组当中
String context = new String(body, StandardCharsets.UTF_8);//转回String
System.out.println("decoder出来的字符串" + context);
MessageContext messageContext = JSON.parseObject(context, MessageContext.class);//
list.add(messageContext);
byteBuf.skipBytes(byteBuf.readableBytes());
}
}
重点
Netty服务端
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//NacosTemplate nacosTemplate = new NacosTemplate(); //集成nacos才用到,可以去掉
NettyServerHandler nettyServerHandler = new NettyServerHandler();
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MessageDecoder());//自定义的解码器
pipeline.addLast(new MessageEncoder());//自定义的编码器
pipeline.addLast(nettyServerHandler);
}
});
ChannelFuture future = bootstrap.bind(8881).sync();
if (future.isSuccess()){
nettyServerHandler.setBeans(InitAnnotatedObject.init());//将带有@RpcService注解的类加载到“Bean容器”中
System.out.println("成功加载bean,数量:"+nettyServerHandler.getBeans().size());
System.out.println("已加载的beans:--["+nettyServerHandler.getBeans().toString()+"]--");
System.out.println("服务器已启动。。。");
}
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
//来看一下是如何将带有@RpcService注解的类加载到“Bean容器”中的
public class InitAnnotatedObject {
public static Map<String,Object> init(){
Map<String, Object> map = new HashMap<>();
try{
Reflections reflections = new Reflections("com.yxj.netty");//创建一个Reflections 对象,并指定要扫描的包路径
Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcService.class);//该方法会扫描指定包路径下带有@RpcService注解的类的Class对象,并封装成一个set集合
typesAnnotatedWith.stream().forEach(aClass -> {
if (aClass.isAnnotationPresent(RpcService.class)) {
RpcService annotation = aClass.getAnnotation(RpcService.class);
try {
Object o = aClass.newInstance();//遍历set集合,通过反射创建Class的对象
map.put(annotation.serviceName(), o);//以@RpcService的name属性为key放到Bean容器中
} catch (Exception e) {
e.printStackTrace();
}
}
});
}catch (Exception e){
e.printStackTrace();
}
return map;
}
}
编写服务端handler处理
public class NettyServerHandler extends ChannelInboundHandlerAdapter{
// private NacosTemplate nacosTemplate;//集成nacos才用到,可以去掉
private Map<String,Object> beans;
public Map<String, Object> getBeans() {
return beans;
}
public void setBeans(Map<String, Object> beans) {
this.beans = beans;
}
public NettyServerHandler() {
super();
// nacosTemplate = new NacosTemplate();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务端 NioServerSocketChannel 已经和主eventLoopGroup中的某一个eventLoop绑定注册完成 Address ====== " + ctx.channel());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到来自客户端的消息 ====== " + msg);
if (msg instanceof MessageContext){
channelRead0(ctx,(MessageContext) msg);
}
}
//服务端主要调用逻辑在这里
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageContext messageContext) throws Exception {
Object context = messageContext.getContext();
Request request = JSON.parseObject(JSONObject.toJSONString(context),Request.class);//拿到消息体内的请求数据
//将参数值和参数类型匹配对应
Class<?>[] parameterType = request.getParameterType();
Object[] parameterValue = request.getParameterValue();
//由于参数值是通过json转换的,这边拿到的参数值全是JSONObject类型。所以需要将参数值按照参数类型重新转换一遍。
if (parameterType.length == parameterValue.length){
for (int i = 0; i < parameterValue.length; i++) {
Object o = JSON.parseObject(JSONObject.toJSONString(parameterValue[i]), parameterType[i]);
parameterValue[i] = o;
}
}
System.out.println(context.getClass().getName());
Object result = null ;
Class<?> aClass = Class.forName(request.getClassName());
//先判断bean容器是否为空,如果为空则可以拿到请求参数的类名(全限定名)反射去创建对象
if (beans == null || beans.size() == 0) {//我这里逻辑没写,可以自己加(request.getClassName()传类的全限定名就可以)
System.out.println("没有找到被调用的bean实例");
channelHandlerContext.writeAndFlush(new MessageContext(String.valueOf(new Random().nextInt(9999)),false,new Response(true,new Object(),Object.class)));
return;
}
Object o = beans.get(request.getServiceNameByAnnotated());
//根据请求传的bean的name去bean容器里查,没查到则通过类全限定名反射创建对象
if (o == null){
System.out.println("beans里没找到实例,根据Class创建");
o = aClass.newInstance();
}
Method method = aClass.getMethod(request.getMethodName(), request.getParameterType());
Class<?> returnType = method.getReturnType();
Object invoke = method.invoke(o, request.getParameterValue());
if (returnType.isInstance(invoke)){//将调用结果转换为方法返回类型。
result = returnType.cast(invoke);
}
System.out.println("调用完成,返回结果:"+result.toString());
channelHandlerContext.writeAndFlush(new MessageContext(String.valueOf(new Random().nextInt(9999)),true,new Response(true,result,returnType)));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("发生了异常");
cause.printStackTrace();
ctx.close();
}
}
Netty客户端
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// NacosTemplate nacosTemplate = new NacosTemplate();//集成nacos用,这里可以去掉
NettyClientHandler nettyClientHandler = new NettyClientHandler();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MessageDecoder());//自定义解码器
pipeline.addLast(new MessageEncoder());//自定义编码器
pipeline.addLast(nettyClientHandler);
}
});
//集成nacos用,这里可以去掉。从nacos中负载均衡获取一个健康的服务实例
/*Instance nettyServices = nacosTemplate.getOneHealthyInstance("nettyServices");
ChannelFuture future = bootstrap.connect(nettyServices.getIp(), nettyServices.getPort()).sync();*/
ChannelFuture future = bootstrap.connect("localhost", 8881).sync();
if (future.isSuccess()){
System.out.println("连接到服务端,开始调用服务");
//客户端核心处理都在这个InvockService里
InvockService invockService = new InvockService(BookService.class,nettyClientHandler);
Book book = invockService.getBook(1);//调用远程服务,对于客户端来接屏蔽了调用过程,只返回数据
System.out.println("---getBook返回结果:"+book.toString());
Integer integer = invockService.insertBook(book);
System.out.println("---insertBook返回结果:"+integer.toString());
Book book1 = invockService.updateBook(book);
System.out.println("---updateBook返回结果:"+book1.toString());
Integer integer1 = invockService.delBook(integer);
System.out.println("---delBook返回结果:"+integer1.toString());
}
ChannelFuture sync = future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
if(eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
}
}
}
先来单独看看InvockService的处理逻辑
public class InvockService {
@RpcReference(referenceName = "BookServiceImpl")//自定义注解,服务依赖。指定名称跟服务端保持一致
private BookService bookService;
private ExecutorService executorService = Executors.newFixedThreadPool(8);//创建一个线程池
public InvockService(Class<?> className,NettyClientHandler nettyClientHandler) {
this.bookService = (BookService) getBean(className,nettyClientHandler);//初始化时会调用一个getBean方法。该方法会创建被调用服务的代理对象
}
public Object getBean(Class<?> className,NettyClientHandler nettyClientHandler){
//通过proxy创建被调用者className的代理对象。
Object proxyInstance = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{className}, ((proxy, method, args) -> {
//并重写调用逻辑。当对象被代理后,你调用原对象方法,会拦截进入到该逻辑处理
//封装请求体数据
Request request = new Request();
request.setClassName("com.yxj.netty.server.pService.impl.BookServiceImpl");
request.setMethodName(method.getName());
request.setParameterType(method.getParameterTypes());
request.setParameterValue(args);
Field userServiceField = InvockService.class.getDeclaredField("bookService");
//获取bookService字段的@RpcReference注解的name属性传给服务端,服务端通过这个bean的name属性去bean容器查找对象
request.setServiceNameByAnnotated(userServiceField.getAnnotation(RpcReference.class).referenceName());
MessageContext messageContext = new MessageContext(String.valueOf(new Random().nextInt(9999)),true,request);
//把消息体放入到nettyClientHandler中,以供nettyClientHandler去发送数据
nettyClientHandler.setMessageContext(messageContext);
//把具体远程调用处理传给线程池去异步处理。
Object o = executorService.submit(nettyClientHandler).get();
return o;
}));
return proxyInstance;
}
public User getUser(){
return userService.getUser(1);
}
public User updateUser(){
return userService.updateUser(getUser());
}
public int delUser(){
return userService.deleteUser(1);
}
public int insertUser(){
return userService.insertUser(getUser());
}
Book getBook(Integer id){
return bookService.getBook(id);
}
Book updateBook(Book book){
return bookService.updateBook(book);
}
Integer delBook(Integer id){
return bookService.delBook(id);
}
Integer insertBook(Book book){
return bookService.insertBook(book);
}
}
客户端handler处理实现Callable。封装成一个线程处理
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {//实现Callable接口,重写call方法
private ChannelHandlerContext channelHandlerContext;
private MessageContext messageContext;
private NacosTemplate nacosTemplate;
public NettyClientHandler() {
super();
// nacosTemplate = new NacosTemplate();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("调用ctx方法");
this.channelHandlerContext = ctx;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("来自服务端的消息:"+msg);
if (msg instanceof MessageContext){
System.out.println("调用read0方法");
channelRead0(ctx,(MessageContext) msg);
}
}
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, MessageContext response) throws Exception {
System.out.println("调用服务返回值:"+response.toString());
this.messageContext = response;
//接收到服务端返回后,notify通知下面的call方法苏醒去做返回处理
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("发生了异常");
cause.printStackTrace();
ctx.close();
}
@Override
public synchronized Object call() throws Exception {//重写call方法
System.out.println("调用call方法");
channelHandlerContext.writeAndFlush(messageContext);
//发送完数据后,wait等待服务端响应返回。
wait();
//拿到数据解析并转换成相应的返回类型
Response response = JSON.parseObject(JSONObject.toJSONString(messageContext.getContext()), Response.class);
return JSON.parseObject(JSONObject.toJSONString(response.getContext()), response.getContextType());
}
public MessageContext getMessageContext() {
return messageContext;
}
public void setMessageContext(MessageContext messageContext) {
this.messageContext = messageContext;
}
}
验证
服务端日志:
客户端日志:
至此,netty搭建的一个简单Rpc框架就完成了!
番外
最后说一下,用我们搭建Rpc框架整合nacos客户端完成服务注册发现等功能的过程
除上面工程依赖那里需添加nacos依赖以外,还要有一个nacos的处理template
public class NacosTemplate {
private ConfigService configService;//主要用作配置方面的管理功能
private NamingService namingService;//主要用作服务方面的管理功能
//初始化namingService和configService;
public NacosTemplate(){
try {
configService = NacosFactory.createConfigService("localhost:8848");
namingService = NacosFactory.createNamingService("localhost:8848");
} catch (NacosException e) {
e.printStackTrace();
}
}
//注册服务
public void registerServer(Instance instance) throws Exception{
namingService.registerInstance(instance.getServiceName(),instance);
}
//删除服务
public void deleteServer(Instance instance) throws Exception{
namingService.deregisterInstance(instance.getServiceName(),instance.getIp(),instance.getPort());
}
//随机全部(有可能获取到的不健康)。拿到全部实例后,我们可以按照自己的负载均衡算法进行调用。类似于springcloud的ribbon。
public List<Instance> getAllServer(String serverName) throws Exception{
return namingService.getAllInstances(serverName);
}
//根据负载均衡算法获取一个健康的实例
public Instance getOneHealthyInstance(String serverName) throws Exception{
return namingService.selectOneHealthyInstance(serverName);
}
//更多api请参数:https://nacos.io/zh-cn/docs/open-api.html
}
具体的整合逻辑就是:服务端在启动时,调用注册服务方法,将自己注册到nacos上,客户端在启动前,调用getAllServer或者getOneHealthyInstance拿到服务端实例,然后进行bind连接。
更多推荐
所有评论(0)