前言

RPC是远程过程调用(Remote Procedure Call)的缩写形式。SAP系统RPC调用的原理其实很简单,有一些类似于三层构架的C/S系统,第三方的客户程序通过接口调用SAP内部的标准或自定义函数,获得函数返回的数据进行处理后显示或打印。

随着微服务、分布式的大热,开发者慢慢趋向于将一个大的服务拆分成多个独立的小的服务。
服务经过拆分后,服务与服务之间的通信就变得至关重要。

RPC说白了就是节点A去调用节点B的服务,站在Java的角度看,就是像调用本地函数一样调用远程函数


需要解决的问题

在这里插入图片描述

手写RPC实战

首先看下目录结构:
在这里插入图片描述

1、定义通信协议

消费者发起一个调用请求,服务者必须知道你要调哪个服务,参数是什么,这些需要封装好。

@Data
public class RpcMessage implements Serializable {
	private static final long serialVersionUID = 1L;

	private String interfaceName;//调用的Service接口名
	private String methodName;//调用的方法名
	private Class<?>[] argsType;//参数类型列表
	private Object[] args;//参数
}

2、自定义注解

分别是服务的提供者和消费者。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service//引入Spring Service,自动注入IOC容器
// 服务提供者
public @interface MyRpcService {

}


@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 服务消费者
public @interface MyRpcReference {

}

3、定义接口

public interface UserService {

	// 根据UserId查找用户
	R<UserResp> findById(Long userId);
}

4、实现接口

加上自定义注解@MyRpcService,后续需要扫描这些实现类,并暴露服务。

@MyRpcService
public class UserServiceImpl implements UserService{

	@Override
	public R<UserResp> findById(Long userId) {
		UserResp userResp = new UserResp();
		userResp.setId(userId);
		userResp.setName("张三");
		userResp.setPwd("root@abc");
		return R.ok(userResp);
	}
}

5、暴露服务并监听处理请求

应用程序启动后,从Spring的IOC容器中,找到加了@MyRpcService注解的服务,并暴露出去。

/**
 * @author: pch
 * @description: 程序启动,暴露Service服务
 * @date: 2020/10/13
 **/
@Component
public class ProviderListener implements ApplicationListener<ApplicationStartedEvent> {

	@Override
	public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
		ConfigurableApplicationContext context = applicationStartedEvent.getApplicationContext();
		for (Object bean : context.getBeansWithAnnotation(MyRpcService.class).values()) {
			ProviderHolder.addService(bean);
		}
		try {
			ProviderHolder.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.err.println("provider...启动");
	}
}

暴露服务,处理消费者请求的核心代码

/**
 * @author: pch
 * @description: 服务持有者
 * @date: 2020/10/13
 **/
public class ProviderHolder {
	// 缓存所有的服务提供者
	private static final Map<String, Provider> SERVICES = new ConcurrentHashMap<>();
	// 起一个线程池,处理消费者的请求
	private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

	// 添加服务
	public static void addService(Object bean) {
		Class<?> beanClass = bean.getClass();
		String interfaceName = beanClass.getInterfaces()[0].getName();
		SERVICES.put(interfaceName, new Provider(bean));
	}

	/**
	 * 启动服务
	 * @throws Exception
	 */
	public static void start() throws Exception {
		if (SERVICES.isEmpty()) {
			return;
		}
		// 开启ServerSocket,端口3333,监听消费者发起的请求。
		ServerSocket serverSocket = new ServerSocket(3333);
		while (true) {
			// 当有请求到达,提交一个任务到线程池
			Socket socket = serverSocket.accept();
			EXECUTOR_SERVICE.submit(() -> {
				try {
					// 从网络IO中读取消费者发送的参数
					Object o = new ObjectInputStream(socket.getInputStream()).readObject();
					if (o instanceof RpcMessage) {
						RpcMessage message = (RpcMessage) o;
						// 找到消费者要调用的服务
						Provider provider = SERVICES.get(message.getInterfaceName());
						if (provider == null) {
							return;
						}
						// 利用反射调用服务
						Object result = provider.invoke(message.getMethodName(), message.getArgsType(), message.getArgs());
						OutputStream outputStream = socket.getOutputStream();
						// 将返回结果序列化为字节数组并通过Socket写回
						outputStream.write(ObjectUtil.serialize(result));
						outputStream.flush();
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
	}
}

6、生成RPC动态代理对象

/**
 * @author: pch
 * @description: 基于JDK动态代理生成代理对象,发起RPC调用
 * @date: 2020/10/13
 **/
public class RpcProxy implements InvocationHandler {
	private Object origin = new Object();

	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
		if (Object.class.equals(method.getDeclaringClass())) {
			return method.invoke(origin, args);
		}
		// 开启一个Socket
		Socket socket = new Socket("127.0.0.1", 3333);
		// 封装请求协议
		RpcMessage message = new RpcMessage();
		message.setInterfaceName(method.getDeclaringClass().getName());
		message.setMethodName(method.getName());
		message.setArgsType(method.getParameterTypes());
		message.setArgs(args);
		// 将请求参数序列化成字节数组通过网络IO写回
		OutputStream outputStream = socket.getOutputStream();
		outputStream.write(ObjectUtil.serialize(message));
		outputStream.flush();
		// 阻塞,等待服务端处理完毕返回结果
		Object o = new ObjectInputStream(socket.getInputStream()).readObject();
		// 返回给调用者
		return o;
	}
}

7、消费者注入RPC动态代理对象

/**
 * @author: pch
 * @description: 注入加了@MyRpcReference注解的属性
 * @date: 2020/10/13
 **/
@Component
public class RpcBeanPostProcessor implements BeanPostProcessor {

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
		Class<?> beanClass = bean.getClass();
		Field[] fields = ClassUtil.getDeclaredFields(beanClass);
		for (Field field : fields) {
			if (field.getAnnotation(MyRpcReference.class) == null) {
				continue;
			}
			Object proxy = Proxy.newProxyInstance(beanClass.getClassLoader(), new Class[]{field.getType()}, new RpcProxy());
			field.setAccessible(true);
			try {
				field.set(bean, proxy);
			} catch (IllegalAccessException e) {
				e.printStackTrace();
			}
		}
		return bean;
	}
}

功能测试

核心代码写好了,那就可以开始测试功能是否符合预期了。

1、启动服务提供者
在这里插入图片描述
2、启动消费者,并发起一个请求
在这里插入图片描述

尾巴

基于篇幅原因,本文只是实现了RPC最基本最简单的功能,主要是理解RPC的思想。
当然,还有很多可以优化的点:

  1. Service暴露的所有方法缓存起来,每次调用再反射查找开销还是很大的。
  2. 使用Netty提升网络IO的通信性能。
  3. 连接池的引入。
  4. 注册中心的加入。
  5. 写回的数据没有包装协议。
  6. 数据格式的扩展,请求头的加入。

你可能感兴趣的文章:

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐