1、环境:spring boot

  1. 实现EnvironmentAware 接口可以取得properties里的配置文件(原来打算使用一个config类取,然后从注入Config取值,行不通的原因是@value 是在bean实例化后才注入进去的,这个我通过applicationaware取得的bean里面的属性值都是空的)
  2. 实现ApplicationContextAware 可以取得spring 容器跌bean值
    3)实现BeanDefinitionRegistryPostProcessor对bean进行注册
@Getter
@ConditionalOnBean (ConstomRabbitMqPropteties.class)
@Component
public class CreateCustomBean implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware, EnvironmentAware {
    //用来取上下文
    private static ApplicationContext applicationContext;

    //取配置文件
    private static Environment environment;

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {


          /*
         * 实例化交换机
         */
        BeanDefinitionBuilder exchangeBuilder = BeanDefinitionBuilder.genericBeanDefinition(DirectExchange.class);
        String exchangeName = environment.getProperty("spring.rabbitmq.template.exchange");
        exchangeBuilder.addConstructorArgValue(exchangeName);


        String queueNames = environment.getProperty("spring.rabbitmq.quenuename");
        String[] queues = queueNames.split(",");
        Stream.of(queues).forEach((e) -> {
            /*
             * 实例化queue
             */
            String queueName = e;
            BeanDefinitionBuilder queueBuilder = BeanDefinitionBuilder.genericBeanDefinition(Queue.class);
            queueBuilder.addConstructorArgValue(queueName);
            queueBuilder.addConstructorArgValue(true);
            queueBuilder.setAutowireMode(AUTOWIRE_CONSTRUCTOR);

            beanDefinitionRegistry.registerBeanDefinition(queueName,queueBuilder.getBeanDefinition());

            /*
             * 实例化bind
             */
            BeanDefinitionBuilder bindBuilder = BeanDefinitionBuilder.genericBeanDefinition(Binding.class);
            String routingKey = e;
            bindBuilder.addConstructorArgValue(queueName);
            bindBuilder.addConstructorArgValue(Binding.DestinationType.QUEUE);
            bindBuilder.addConstructorArgValue(exchangeName);
            bindBuilder.addConstructorArgValue(routingKey);
            Map<String,Object> arguments = new HashMap<>(1 << 4);
            arguments.put("x-dead-letter-exchange",exchangeName);
            arguments.put("x-dead-letter-routing-key",routingKey);
            bindBuilder.addConstructorArgValue(arguments);
            bindBuilder.setAutowireMode(AUTOWIRE_CONSTRUCTOR);
            beanDefinitionRegistry.registerBeanDefinition(e+"bind",bindBuilder.getBeanDefinition());

        });

    }


    //可以在bean实例化前做处理
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {

    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContextz) throws BeansException {
        applicationContext = applicationContextz;
    }


    public static Object getBean(Class<?> clazz) {
        return applicationContext.getBean(clazz);
    }

    public static Object getBean(String className) {
        return applicationContext.getBean(className);
    }

    @Override
    public void setEnvironment(Environment environmentz) {
        environment = environmentz;
    }
}

properties文件
在这里插入图片描述
消费者 ack确认
在这里插入图片描述
生产者

@Slf4j
@Service
public class ProductServiceImpl implements IProductService, RabbitTemplate.ConfirmCallback {

    //由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConstomRabbitMqPropteties rabbitMqConfig;
    /**
     * 构造方法注入rabbitTemplate
     */
    @Autowired
    public ProductServiceImpl(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
        rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData,boolean ack,String cause) {
        log.info(" 消息id:" + correlationData);
        if (ack) {
            log.info("消息发送确认成功");
        } else {
            log.info("消息发送确认失败:" + cause);
        }
    }

    @Override
    public void save(RetryVO retryVO,String routingKey) {
        //执行保存
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend(rabbitMqConfig.getMqRabbitExchange(),routingKey,retryVO,correlationId);

    }

}
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐