springboot~ImportBeanDefinitionRegistrar在自定义RPC框架中的使用
·
入口注解设计:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RpcComponentRegistrar.class)
public @interface EnableRpc {
// 扫描的包路径
String[] basePackages() default {};
// 注册中心类型
RegistryType registry() default RegistryType.ZOOKEEPER;
// 协议类型
ProtocolType protocol() default ProtocolType.HTTP;
}
2. RpcComponentRegistrar的多阶段注册
public class RpcComponentRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {
private Environment environment;
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
@Override
public void registerBeanDefinitions(
AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry
) {
// 阶段1:解析配置
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
importingClassMetadata.getAnnotationAttributes(EnableRpc.class.getName()));
// 阶段2:根据配置动态注册核心组件
registerRegistryCenter(registry, attributes);
registerProtocolProcessor(registry, attributes);
registerLoadBalancer(registry, attributes);
// 阶段3:扫描并注册服务提供者和消费者
scanAndRegisterServices(registry, attributes);
}
private void registerRegistryCenter(
BeanDefinitionRegistry registry,
AnnotationAttributes attributes
) {
RegistryType type = attributes.getEnum("registry");
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClassName(getRegistryClassByType(type));
// 从Environment读取配置(如zookeeper地址)
beanDefinition.getPropertyValues().add("address",
environment.getProperty("rpc.registry.address"));
registry.registerBeanDefinition("rpcRegistryCenter", beanDefinition);
}
private void scanAndRegisterServices(
BeanDefinitionRegistry registry,
AnnotationAttributes attributes
) {
// 1. 扫描@ServiceProvider注解的服务提供者
Set<BeanDefinition> providers = scanForAnnotations(
attributes.getStringArray("basePackages"),
ServiceProvider.class);
// 2. 为每个服务提供者注册特殊的BeanDefinition
for (BeanDefinition providerDef : providers) {
GenericBeanDefinition enhancedDef = enhanceForProvider(providerDef);
registry.registerBeanDefinition(
providerDef.getBeanClassName(),
enhancedDef);
// 3. 同时自动注册到服务注册中心
registerToServiceDiscovery(providerDef);
}
// 4. 扫描@RpcReference注解的消费方
// 需要创建ReferenceBeanFactoryBean来处理动态代理
registerReferenceProcessor(registry, attributes);
}
private GenericBeanDefinition enhanceForProvider(BeanDefinition originalDef) {
GenericBeanDefinition definition = new GenericBeanDefinition(originalDef);
// 添加服务暴露的初始化逻辑
definition.setInitMethodName("exportService");
// 添加后置处理器来监听服务状态
definition.getPropertyValues().add("serviceRegistry",
new RuntimeBeanReference("rpcRegistryCenter"));
return definition;
}
private void registerReferenceProcessor(
BeanDefinitionRegistry registry,
AnnotationAttributes attributes
) {
// 创建处理@RpcReference注解的后置处理器
RootBeanDefinition processorDef = new RootBeanDefinition(
RpcReferenceAnnotationBeanPostProcessor.class);
// 注入必要的依赖
processorDef.getPropertyValues().add("registryCenter",
new RuntimeBeanReference("rpcRegistryCenter"));
processorDef.getPropertyValues().add("loadBalancer",
new RuntimeBeanReference("rpcLoadBalancer"));
registry.registerBeanDefinition(
"rpcReferenceAnnotationBeanPostProcessor",
processorDef);
}
}
3. RPC框架的关键扩展点设计
服务消费者代理工厂:
public class RpcReferenceFactoryBean implements FactoryBean<Object> {
private Class<?> interfaceType;
private String serviceName;
private LoadBalancer loadBalancer;
@Override
public Object getObject() throws Exception {
// 创建动态代理,实现RPC调用
return Proxy.newProxyInstance(
interfaceType.getClassLoader(),
new Class<?>[] {interfaceType},
new RpcInvocationHandler(serviceName, loadBalancer)
);
}
@Override
public Class<?> getObjectType() {
return interfaceType;
}
}
注解处理器:
public class RpcReferenceAnnotationBeanPostProcessor
implements BeanPostProcessor, BeanFactoryAware {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 扫描bean中所有@RpcReference注解的字段
Field[] fields = bean.getClass().getDeclaredFields();
for (Field field : fields) {
if (field.isAnnotationPresent(RpcReference.class)) {
RpcReference reference = field.getAnnotation(RpcReference.class);
// 为每个引用创建代理并注入
Object proxy = createProxy(field.getType(), reference);
field.setAccessible(true);
try {
field.set(bean, proxy);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
return bean;
}更多推荐
所有评论(0)