1.从源码了解mq启动配置流程

1.1配置启动入口

1.1.1从factories我们可以看到mq的启动配置类
在这里插入图片描述

1.1.2然后我们找到 RabbitAutoConfiguration,发现它引入了RabbitAnnotationDrivenConfiguration这个配置类

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
}

1.1.3进入RabbitAnnotationDrivenConfiguration滑到最低部看到这里引入了@EnableRabbit这个注解,找个注解里面又引出RabbitBootstrapConfiguration这个配置类

@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {

}

//---------------------------------------------


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}

1.1.4这里定义了两个bean,其中RabbitListenerEndpointRegistry就是监听容器的注册操作实现类

@Configuration
public class RabbitBootstrapConfiguration {

	@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
		return new RabbitListenerAnnotationBeanPostProcessor();
	}

	@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
	public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
		return new RabbitListenerEndpointRegistry();
	}

}

1.1.5RabbitListenerEndpointRegistry里面有获取所有容器的方法getListenerContainerIds和注册监听容器的方法registerListenerContainer


	/**
	 * Create a message listener container for the given {@link RabbitListenerEndpoint}.
	 * <p>This create the necessary infrastructure to honor that endpoint
	 * with regards to its configuration.
	 * @param endpoint the endpoint to add
	 * @param factory the listener factory to use
	 * @see #registerListenerContainer(RabbitListenerEndpoint, RabbitListenerContainerFactory, boolean)
	 */
	public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
		registerListenerContainer(endpoint, factory, false);
	}

1.1.6触发监听容器的位置是在RabbitListenerEndpointRegistrar类里面的bean初始化完成调用的钩子方法里面,注册所有listenercontain。ps:而RabbitListenerEndpointRegistrar类是RabbitListenerAnnotationBeanPostProcessor的属性对象,RabbitListenerAnnotationBeanPostProcessor是在1.1.4哪里初始化的rabbitmq监听注解拓展对象

	@Override
	public void afterPropertiesSet() {
		registerAllEndpoints();
	}


//---------------------------------------
public class RabbitListenerAnnotationBeanPostProcessor
		implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
		SmartInitializingSingleton {

	/**
	 * The bean name of the default {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}.
	 */
	public static final String DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";


	private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
}

2.创建自定义消息处理对象和监听容器

2.1在自定义bean引入RabbitListenerEndpointRegistry

@Component
public class RabbitmqCustomerConfiguration {
    
    @Resource
    RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
    
 
}

2.2创建MyMessageListener类,实现消息监听接口


@Component
public class MyMessageListener implements  MessageListener {


    @Override
    public void onMessage(Message message) {
        System.out.println(new String(message.getBody()));

    }
}

2.3新建SimpleRabbitListenerEndpoint,设置需要监听的队列名和自定义消息接收处理器


@Component
public class RabbitmqCustomerConfiguration {

    @Resource
    RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    @Autowired
    MyMessageListener myMessageListener;

    public void registryCustomerContain() {
        SimpleRabbitListenerEndpoint simpleRabbitListenerEndpoint = new SimpleRabbitListenerEndpoint();
        simpleRabbitListenerEndpoint.setMessageListener(myMessageListener);
        simpleRabbitListenerEndpoint.setQueueNames("testQueue");
        
		//第三个参数是否马上启动监听容器        
		rabbitListenerEndpointRegistry.registerListenerContainer(simpleRabbitListenerEndpoint,null,true);


    }

}

ps:第二个参数的null为空的时候会调用default的factory。
从RabbitListenerEndpointRegistrar判断工厂是否为空,最后会根据containerFactoryBeanName获取

	private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
		if (descriptor.containerFactory != null) 
		//*return descriptor.containerFactory;
		}
		else if (this.containerFactory != null) {
			//*	return this.containerFactory;
		}
		else if (this.containerFactoryBeanName != null) {
			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
			this.containerFactory = this.beanFactory.getBean(
					this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
			return this.containerFactory;  // Consider changing this if live change of the factory is required
		}
		else {
			throw new IllegalStateException("Could not resolve the " +
					RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
					descriptor.endpoint + "] no factory was given and no default is set.");
		}
	}

而containerFactoryBeanName是来自RabbitListenerAnnotationBeanPostProcessor的

public static final String DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";

也就是1.1.2里面提到的RabbitAnnotationDrivenConfiguration里面定义的bean

	@Bean
	@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
	public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
			SimpleRabbitListenerContainerFactoryConfigurer configurer,
			ConnectionFactory connectionFactory) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		configurer.configure(factory, connectionFactory);
		return factory;
	}

2.4思路:运行rabbitListenerEndpointRegistry的stop和start重启刷新所有监听器,或者刷新容器会导致刷新

	@Override
	public void start() {
		for (MessageListenerContainer listenerContainer : getListenerContainers()) {
			startIfNecessary(listenerContainer);
		}
	}



//----------------

	private void startIfNecessary(MessageListenerContainer listenerContainer) {
		if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
			listenerContainer.start();
		}
	}

//----------------


	@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		if (event.getApplicationContext().equals(this.applicationContext)) {
			this.contextRefreshed = true;
		}
	}
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐