摊牌了,我要手写一个RPC
文章目录前言需要解决的问题手写RPC实战1、定义通信协议2、自定义注解3、定义接口4、实现接口5、暴露服务并监听处理请求6、生成RPC动态代理对象7、消费者注入RPC动态代理对象功能测试尾巴前言RPC是远程过程调用(Remote Procedure Call)的缩写形式。SAP系统RPC调用的原理其实很简单,有一些类似于三层构架的C/S系统,第三方的客户程序通过接口调用SAP内部的标准或自定义函数
前言
RPC是远程过程调用(Remote Procedure Call)的缩写形式。SAP系统RPC调用的原理其实很简单,有一些类似于三层构架的C/S系统,第三方的客户程序通过接口调用SAP内部的标准或自定义函数,获得函数返回的数据进行处理后显示或打印。
随着微服务、分布式的大热,开发者慢慢趋向于将一个大的服务拆分成多个独立的小的服务。
服务经过拆分后,服务与服务之间的通信就变得至关重要。
RPC说白了就是节点A去调用节点B的服务,站在Java的角度看,就是像调用本地函数一样调用远程函数。
需要解决的问题
手写RPC实战
首先看下目录结构:
1、定义通信协议
消费者发起一个调用请求,服务者必须知道你要调哪个服务,参数是什么,这些需要封装好。
@Data
public class RpcMessage implements Serializable {
private static final long serialVersionUID = 1L;
private String interfaceName;//调用的Service接口名
private String methodName;//调用的方法名
private Class<?>[] argsType;//参数类型列表
private Object[] args;//参数
}
2、自定义注解
分别是服务的提供者和消费者。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service//引入Spring Service,自动注入IOC容器
// 服务提供者
public @interface MyRpcService {
}
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 服务消费者
public @interface MyRpcReference {
}
3、定义接口
public interface UserService {
// 根据UserId查找用户
R<UserResp> findById(Long userId);
}
4、实现接口
加上自定义注解@MyRpcService
,后续需要扫描这些实现类,并暴露服务。
@MyRpcService
public class UserServiceImpl implements UserService{
@Override
public R<UserResp> findById(Long userId) {
UserResp userResp = new UserResp();
userResp.setId(userId);
userResp.setName("张三");
userResp.setPwd("root@abc");
return R.ok(userResp);
}
}
5、暴露服务并监听处理请求
应用程序启动后,从Spring的IOC容器中,找到加了@MyRpcService
注解的服务,并暴露出去。
/**
* @author: pch
* @description: 程序启动,暴露Service服务
* @date: 2020/10/13
**/
@Component
public class ProviderListener implements ApplicationListener<ApplicationStartedEvent> {
@Override
public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
ConfigurableApplicationContext context = applicationStartedEvent.getApplicationContext();
for (Object bean : context.getBeansWithAnnotation(MyRpcService.class).values()) {
ProviderHolder.addService(bean);
}
try {
ProviderHolder.start();
} catch (Exception e) {
e.printStackTrace();
}
System.err.println("provider...启动");
}
}
暴露服务,处理消费者请求的核心代码
/**
* @author: pch
* @description: 服务持有者
* @date: 2020/10/13
**/
public class ProviderHolder {
// 缓存所有的服务提供者
private static final Map<String, Provider> SERVICES = new ConcurrentHashMap<>();
// 起一个线程池,处理消费者的请求
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
// 添加服务
public static void addService(Object bean) {
Class<?> beanClass = bean.getClass();
String interfaceName = beanClass.getInterfaces()[0].getName();
SERVICES.put(interfaceName, new Provider(bean));
}
/**
* 启动服务
* @throws Exception
*/
public static void start() throws Exception {
if (SERVICES.isEmpty()) {
return;
}
// 开启ServerSocket,端口3333,监听消费者发起的请求。
ServerSocket serverSocket = new ServerSocket(3333);
while (true) {
// 当有请求到达,提交一个任务到线程池
Socket socket = serverSocket.accept();
EXECUTOR_SERVICE.submit(() -> {
try {
// 从网络IO中读取消费者发送的参数
Object o = new ObjectInputStream(socket.getInputStream()).readObject();
if (o instanceof RpcMessage) {
RpcMessage message = (RpcMessage) o;
// 找到消费者要调用的服务
Provider provider = SERVICES.get(message.getInterfaceName());
if (provider == null) {
return;
}
// 利用反射调用服务
Object result = provider.invoke(message.getMethodName(), message.getArgsType(), message.getArgs());
OutputStream outputStream = socket.getOutputStream();
// 将返回结果序列化为字节数组并通过Socket写回
outputStream.write(ObjectUtil.serialize(result));
outputStream.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
6、生成RPC动态代理对象
/**
* @author: pch
* @description: 基于JDK动态代理生成代理对象,发起RPC调用
* @date: 2020/10/13
**/
public class RpcProxy implements InvocationHandler {
private Object origin = new Object();
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(origin, args);
}
// 开启一个Socket
Socket socket = new Socket("127.0.0.1", 3333);
// 封装请求协议
RpcMessage message = new RpcMessage();
message.setInterfaceName(method.getDeclaringClass().getName());
message.setMethodName(method.getName());
message.setArgsType(method.getParameterTypes());
message.setArgs(args);
// 将请求参数序列化成字节数组通过网络IO写回
OutputStream outputStream = socket.getOutputStream();
outputStream.write(ObjectUtil.serialize(message));
outputStream.flush();
// 阻塞,等待服务端处理完毕返回结果
Object o = new ObjectInputStream(socket.getInputStream()).readObject();
// 返回给调用者
return o;
}
}
7、消费者注入RPC动态代理对象
/**
* @author: pch
* @description: 注入加了@MyRpcReference注解的属性
* @date: 2020/10/13
**/
@Component
public class RpcBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> beanClass = bean.getClass();
Field[] fields = ClassUtil.getDeclaredFields(beanClass);
for (Field field : fields) {
if (field.getAnnotation(MyRpcReference.class) == null) {
continue;
}
Object proxy = Proxy.newProxyInstance(beanClass.getClassLoader(), new Class[]{field.getType()}, new RpcProxy());
field.setAccessible(true);
try {
field.set(bean, proxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
return bean;
}
}
功能测试
核心代码写好了,那就可以开始测试功能是否符合预期了。
1、启动服务提供者
2、启动消费者,并发起一个请求
尾巴
基于篇幅原因,本文只是实现了RPC最基本最简单的功能,主要是理解RPC的思想。
当然,还有很多可以优化的点:
- Service暴露的所有方法缓存起来,每次调用再反射查找开销还是很大的。
- 使用Netty提升网络IO的通信性能。
- 连接池的引入。
- 注册中心的加入。
- 写回的数据没有包装协议。
- 数据格式的扩展,请求头的加入。
你可能感兴趣的文章:
更多推荐
所有评论(0)