2019-07-19:完成基本RPC通信!

2019-07-22:优化此框架,实现单一长连接!

2019-07-24:继续优化此框架:1、增加服务提供注解(带版本号),然后利用Spring框架的在启动时立刻保存提供服务的实现类。2、优化NettyConfig(区分消费者和提供者配置),因为一个项目可同时作为服务提供者和服务消费者,所以增加两个配置来区分是提供服务还是消费服务,而且,因为如果都是本地启动着两个项目,那么IP必定是一样的,所以需要区分服务端口和消费端口。不然会有下面事故:先启动client,再启动server,但是他们同样依赖于netty包,所以client也启动了netty服务,只配置一个相同的端口会导致client的RPC通信也是通道自己启动的Netty服务。。。

2019-07-27:优化此框架:增加注册中心,使用Zookeeper作为注册中心。

接下来:我会优化Netty方面的,例如增加心跳检测、业务处理统一使用自定义业务线程池、客户端或服务端异常断开处理等,然后会优化一下项目的结构和rpc通信返回结果等,最后可能会考虑增加Redis作为注册中心。等完成所有的这些,就会对整个项目重新写一篇文章来介绍一下自己的整体思路,当然了,如果有同学需要的,可以在下方留言,我可以提前写文章,对完成注册中心及之前的代码进行详细介绍,之后再补充其他新增的功能实现过程!~

2019-07-30:已完成全部功能。放上连接:完整版RPC通信框架

下面的是2019-07-19写的文章,所以代码是没经过优化的,不过是核心代码,还是需要阅读一下的,需要看完整代码的请到最下面的github地址,大家可根据标签拉到对应的代码,麻烦啦~然后还有,测试方法是HelloController的sayHello方法呢,也可以自己再捣鼓一些测试一下~

前段时间,我花了两个星期的时间去重新学习Netty,因为之前总是看过一会就没看了,所以今次下定决心一定要全部看完,然后也思考做了一些的思考题,并且将简单的控制台版IM系统做出来了。虽然叫IM系统,但是是很简陋的,哈哈,只有登录、单聊、建群、加群、退群、群聊等简单的功能。大家可以到我github上看看:Netty-IM

写完这个IM系统后,我是打算自己写一个网页版的,可是考虑到自己前端的技能好像都退化得差不多了,而且时间上可能没那么充裕,就不了了之了。然后有一天,突然想起来之前使用的RPC框架->Dubbo,他的通信底层就是使用Netty,那么我就想着要不自己先搞个简单版试试呗,因为最主要的是学习技能得实践一番,不然学了好像没学一样。。。

在开始动手前,自己屡了一下思路,也参考了两篇文章,决定先做一个简版的RPC框架,不带注册中心的那种。那么来了老弟,首先我们看一下整个流程图是咋样的:

接下来重头戏来了,下面将会较详细得说一下流程:

先简单介绍一下项目结构:

simple-rpc-client:服务消费

simple-rpc-server:服务提供

simple-rpc-encapsulation:消费者和提供者公共接口

simple-rpc-netty:是关于Netty的东西,包括:自定义协议,序列化,通信实体Packet,各种Handler等等。

客户端:
        1、首先是两个注解,一个注解是:标识那些接口的调用会进行RPC通信,即@NettyRPC注解。
        另外一个注解是:告诉程序哪些包下的类会使用RPC通信,像@ComponentScan一样,即@EnableNettyRPC注解。

/**
 * @author Howinfun
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface NettyRPC {
}
/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableNettyRPC {
    //扫描的包名,如果为空,则根据启动类所在的包名扫描
    String[] basePackages() default {};
}


        2、因为我们使用@NettyRPC的将是一些接口,如果项目里头没有实现类,那是调用失败的,那么我们可以通过实现ImportBeanDefinitionRegistrar和自定义FactoryBean和InvocationHandler,利用动态代理使接口有实现,并且能动态注入Bean。ImportBeanDefinitionRegistrar接口可以详细说一下,因为这里是动态注入Bean,怎么注入规则是可以自定的,主要是靠ClassPathScanningCandidateComponentProvider这个类,它主要功能是扫描ClassPath下的所有类,并且根据isCandidateComponent方法来判断哪些类可以作为候选人,当然了,isCandidateComponent方法你可以重写,然后加上你自己的规则,我这里是必须是独立的并且是接口,才能成为候选人。然后ClassPathScanningCandidateComponentProvider还能添加过滤器,我这里主要添加的过滤器是注解过滤器,只要带有@NettyRPC注解的,其他的都不要。
    不过需要注意一点的是:记得在有@Configuration注解的配置类上使用@Import导入实现ImportBeanDefinitionRegistrar的类,不然实现动态注入Bean的作用,这里我们在客户端的启动类Import即可。

package com.hyf.rpc.netty.client.config;

import com.hyf.rpc.netty.anno.EnableNettyRPC;
import com.hyf.rpc.netty.anno.NettyRPC;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;


/**
 * 自定义注册带@NettyRPC注解的接口,利用动态代理使接口有实现
 * 然后在有@Configuration注解的配置类上使用@Import导入,不然不能注入这些实现@NettyRPC接口的BeanDefinition
 * @author Howinfun
 * @date 2019-07-18
 */
public class NettyRpcClientRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {

    private ClassLoader classLoader;

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

        ClassPathScanningCandidateComponentProvider scan = getScanner();

        //指定注解,类似于Feign注解,只扫描带@NettyRPC注解的接口
        scan.addIncludeFilter(new AnnotationTypeFilter(NettyRPC.class));

        Set<BeanDefinition> candidateComponents = new HashSet<>();
        for (String basePackage : getBasePackages(importingClassMetadata)) {
            candidateComponents.addAll(scan.findCandidateComponents(basePackage));
        }
        candidateComponents.stream().forEach(beanDefinition -> {
            if (!registry.containsBeanDefinition(beanDefinition.getBeanClassName())) {
                if (beanDefinition instanceof AnnotatedBeanDefinition) {
                    AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;
                    AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata();
                    Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(NettyRPC.class.getCanonicalName());

                    this.registerNettyRpcClient(registry, annotationMetadata,attributes);
                }
            }
        });
    }

    private void registerNettyRpcClient(BeanDefinitionRegistry registry,
                                        AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
        String className = annotationMetadata.getClassName();
        // 指定工厂,使用@NettyRPC注解的接口,当代码中注入时,是从指定工厂获取,而这里的工厂返回的是代理
        BeanDefinitionBuilder definition = BeanDefinitionBuilder
                .genericBeanDefinition(NettyClientFactoryBean.class);
        // @Autowrie:根据类型注入
        definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
        // 注定type属性
        definition.addPropertyValue("type", className);
        String name = attributes.get("name") == null ? "" :(String)(attributes.get("name"));
        // 别名
        String alias = name + "NettyRpcClient";
        AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
        beanDefinition.setPrimary(true);
        BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
                new String[] { alias });
        // 注册BeanDefinition
        BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
    }



    protected ClassPathScanningCandidateComponentProvider getScanner() {
        return new ClassPathScanningCandidateComponentProvider(false) {
            @Override
            protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
                // 判断候选人的条件:必须是独立的,然后是接口
                if (beanDefinition.getMetadata().isIndependent() && beanDefinition.getMetadata().isInterface()){
                    return true;
                }
                return false;
            }
        };
    }

    /**
     * 获取指定扫描@NettyRPC注解的包路径
     * @param importingClassMetadata
     * @return
     */
    protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
        Map<String, Object> attributes = importingClassMetadata
                .getAnnotationAttributes(EnableNettyRPC.class.getCanonicalName());

        Set<String> basePackages = new HashSet<>();
        // 如果指定的包路径为空,则获取启动类当前路径
        if (basePackages.isEmpty()) {
            basePackages.add(
                    ClassUtils.getPackageName(importingClassMetadata.getClassName()));
        }else{
            for (String pkg : (String[]) attributes.get("basePackages")) {
                if (StringUtils.hasText(pkg)) {
                    basePackages.add(pkg);
                }
            }
        }
        return basePackages;
    }
}

package com.hyf.rpc.netty.client.config;

import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.stereotype.Component;

import java.lang.reflect.Proxy;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Component
public class NettyClientFactoryBean implements FactoryBean<Object> {

    private Class<?> type;

    @Override
    public Object getObject() throws Exception {
        // 这里的interfaces注意是就是type,因为我们现在是给接口做代理,千万别写type.getInterfaces(),不然启动会报错
        return Proxy.newProxyInstance(type.getClassLoader(),new Class[]{type},new NettyRPCInvocationHandler(this.type));
    }

    @Override
    public Class<?> getObjectType() {
        return this.type;
    }
}


        3、在动态代理的invoke方法里头,我们将启动Netty的一个客户端,带上接口调用的信息,然后等待Netty服务端返回结果结果再返回到前端即可。

package com.hyf.rpc.netty.client.config;

import com.hyf.rpc.netty.client.NettyClient;
import com.hyf.rpc.netty.packet.RPCRequestPacket;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@NoArgsConstructor
@Component
public class NettyRPCInvocationHandler implements InvocationHandler {

    private Class<?> type;

    public NettyRPCInvocationHandler(Class<?> type){
        this.type = type;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RPCRequestPacket requestPacket = new RPCRequestPacket();
        requestPacket.setClazz(type);
        requestPacket.setMethodName(method.getName());
        requestPacket.setParamTypes(method.getParameterTypes());
        requestPacket.setParams(args);
        Object result = NettyClient.callRPC(requestPacket);
        return result;
    }
}

    有一个坑是:当客户端接收到服务端的返回结果后,记得关闭通道[ctx.channel().close()],因为在客户端中RPC调用后是同步等待Channel关闭的,不然不能响应给前端。

服务端:服务端的流程稍微会简单很多
        1、启动Netty服务端服务,然后接收客户端的链接请求,解析请求
        2、然后根据接口调用信息,利用反射获取到实现类和对应的方法,最后调用方法得到结果,然后封装一下结果就可以相应给客户端了。

package com.hyf.rpc.netty.server.handler;

import com.hyf.rpc.netty.packet.RPCRequestPacket;
import com.hyf.rpc.netty.packet.RPCResponsePacket;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.reflections.Reflections;

import java.lang.reflect.Method;
import java.util.Set;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/16
 */
@ChannelHandler.Sharable
public class RPCRequestPacketHandler extends SimpleChannelInboundHandler<RPCRequestPacket> {

    public static final RPCRequestPacketHandler INSTANCE = new RPCRequestPacketHandler();
    private RPCRequestPacketHandler(){}

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RPCRequestPacket msg) throws Exception {
        RPCResponsePacket responsePacket = new RPCResponsePacket();
        // 获取rpc调用信息,利用反射执行方法,返回结果
        Class clazz = msg.getClazz();
        String methodName = msg.getMethodName();
        Object[] params = msg.getParams();
        Class[] paramTypes = msg.getParamTypes();
        // 扫面路径下所有元数据
        Reflections reflections = new Reflections("com.hyf.rpc.serviceImpl");
        Set<Class> subTypes = reflections.getSubTypesOf(clazz);
        if (subTypes.isEmpty()){
            responsePacket.setSuccess(false);
            responsePacket.setMsg("没有实现类");
        }else if (subTypes.size() > 1){
            responsePacket.setSuccess(false);
            responsePacket.setMsg("多个实现类,无法判断执行哪一个");
        }else{
            Class subClass = subTypes.toArray(new Class[1])[0];
            Method method = subClass.getMethod(methodName,paramTypes);
            Object result = method.invoke(subClass.newInstance(),params);
            responsePacket.setSuccess(true);
            responsePacket.setResult(result);
        }
        ctx.channel().writeAndFlush(responsePacket);
    }
}


        3、这里的反射我推荐一个很好用的框架->Reflections。简单介绍一下我使用了哪些API,首先是根据路径扫描反射元数据,
    然后根据接口获取它的所有实现类,然后就可以获取实现类的反射信息,得到方法执行结果了。

如果同学们对此比较简陋的代码还略感兴趣,可以到我github上看看:Netty-RPC

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐