rabbitmq添加stomp插件(具体操作就不描述了,自行百度)

在这里插入图片描述

项目结构

在这里插入图片描述
创建一个虚拟host命名为test
在这里插入图片描述

springboot添加maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.1.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.butterfly</groupId>
	<artifactId>stomp</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>stomp</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.62</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.9.8</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.datatype</groupId>
			<artifactId>jackson-datatype-joda</artifactId>
			<version>2.9.8</version>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

启动类上加上@EnableRabbit

package com.butterfly.stomp;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableRabbit
public class StompApplication {

	public static void main(String[] args) {
		SpringApplication.run(StompApplication.class, args);
	}

}

rabbitmq配置类如下:

package com.butterfly.stomp.config;

import com.butterfly.stomp.config.listener.ReceivePushMsgListener;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

import java.util.ArrayList;
import java.util.List;


@Configuration
public class RabbitmqConfig {
    private static final Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class);
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        /*channel缓存的大小*/
        connectionFactory.setChannelCacheSize(200);
        /**/
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.addConnectionListener(new ConnectionListener() {
            @Override
            public void onCreate(Connection connection) {
                logger.info("创建rabbitmq连接");
            }

            @Override
            public void onClose(Connection connection) {
                logger.info("关闭rabbitmq连接");
            }
        });
        //设置虚拟主机,默认/
        connectionFactory.setVirtualHost("test");
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * 配置MQ传输序列化对象
     *
     * @return Jackson2JsonMessageConverter
     */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JodaModule());
        mapper.enable(MapperFeature.USE_GETTERS_AS_SETTERS);
        mapper.enable(MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS);
        mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
        mapper.enable(SerializationFeature.WRITE_ENUMS_USING_TO_STRING);
        mapper.enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS);
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        return new Jackson2JsonMessageConverter(mapper);
    }

    /**
     * 配置模版对象
     *
     * @return RabbitTemplate
     */
    @Bean
    public RabbitTemplate template() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    /**
     * 配置管理器
     *
     * @return RabbitAdmin
     */
    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.setIgnoreDeclarationExceptions(true);
        return rabbitAdmin;
    }

    /**
     * RepublishMessageRecoverer允许在重试次数耗尽后,发布失败消息
     *
     * @return RetryOperationsInterceptor
     */
    @Bean
    public RetryOperationsInterceptor interceptor() {
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(5)
                .recoverer(new RepublishMessageRecoverer(template()))
                .build();
    }

    /**
     * 配置spring上下文监听容器 需要延迟启动
     *
     * @return SimpleMessageListenerContainer
     */
    public SimpleMessageListenerContainer newListenerContainer(MessageListener messageListener, String[] queues) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setAutoDeclare(false);
        //声明失败重复次数
        container.setDeclarationRetries(1);
        //可接受来自broker一个socket帧中的消息数目. 数值越大,消息分发速度就越快, 但无序处理的风险更高
        container.setPrefetchCount(10);
        container.addQueueNames(queues);
        container.setAcknowledgeMode(AcknowledgeMode.NONE);
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(100);
        container.setConcurrentConsumers(10);
        container.setMessageListener(messageListener);
        container.setAutoStartup(false);
        container.setExclusive(false);
         /*丢弃所有失败的消息 false丢弃 true 循环处理 或者,
         你可以抛出一个AmqpRejectAndDontRequeueException;这会阻止消息重新入列,不管defaultRequeueRejected 属性设置的是什么.*/
        container.setDefaultRequeueRejected(false);
        container.start();
        return container;
    }

    /**
     * 声明交互器
     *
     * @return
     */
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("exchange_pushmsg");
    }

    /**
     * 创建一个ReceivePushMsgListener,监听routingKey为“rk_recivemsg”的队列实现客户端收到消息后向此队列发送确认收到消息
     */
    @Bean
    public Object declareDirectQueue() {
        List<String> receiveQueueNames = new ArrayList<>();
        String receive = "queue_pushmsg";
        declare(receive, directExchange(), "rk_recivemsg");
        receiveQueueNames.add(receive);
        newListenerContainer(new ReceivePushMsgListener(), receiveQueueNames.toArray(new String[receiveQueueNames.size()]));
        return null;
    }

    private void declare(String queueName, DirectExchange exchange, String routingKey) {
        RabbitAdmin admin = rabbitAdmin();
        Queue queue = new Queue(queueName, true, false, false);
        admin.declareQueue(queue);
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
    }
}

消息推送生产者

package com.butterfly.stomp.config.component;

import com.butterfly.stomp.entity.WiselyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.UUID;

@Component
public class PushMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    private static Logger log = LoggerFactory.getLogger(PushMsgProducer.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    //消息发送确认回调方法
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("消息发送成功:" + correlationData);
    }

    //消息发送失败回调方法
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息发送失败:" + new String(message.getBody()));
    }

    /**
     * 发送消息
     *
     * @param messageVo
     */
    public void send(WiselyMessage messageVo) throws IOException {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("exchange_pushmsg", messageVo.getRoutingKey(), messageVo, correlationId);
    }
}

消息实体bean

package com.butterfly.stomp.entity;


public class WiselyMessage {
    private String name;
    private String routingKey;
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }
}

测试controller

package com.butterfly.stomp.controller;

import com.butterfly.stomp.config.component.PushMsgProducer;
import com.butterfly.stomp.entity.WiselyMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;


@RestController
public class TestController {
    @Autowired
    PushMsgProducer sender;
    @GetMapping("/test")
    public String send() throws IOException {
        WiselyMessage msg = new WiselyMessage();
        msg.setName("hello");
        msg.setRoutingKey("rk_pushmsg");
        sender.send(msg);
        return "success";
    }
}

vue集成stompjs插件

npm install net -S
npm install stompjs -S
代码片段

import Stomp from 'stompjs'

export default {
    name: 'messageNotice',
    data () {
    	return {
    		client: null
		}
    },
    created() {
        this.connect();
    },
    methods: {
        onConnected(frame) {
            console.log("Connected: " + frame);
            var exchange = "/exchange/exchange_pushmsg/rk_pushmsg";
            this.client.subscribe(exchange, this.responseCallback, this.onFailed);
            console.log(frame)
        },
        onFailed(frame) {
            console.log("Failed: " + frame);
        },
        responseCallback(frame) {
            console.log("得到的消息 msg=>" + frame.body);
            console.log(frame)
            //接收到服务器推送消息,向服务器定义的接收消息routekey路由rk_recivemsg发送确认消息
            this.client.send("/exchange/exchange_pushmsg/rk_recivemsg", {"content-type":"text/plain"}, frame.body);
        },
        connect() {
            this.client= Stomp.client("ws://localhost:15674/ws")
            var headers = {
                "login": "admin",
                "passcode": "admin",
                //虚拟主机,默认“/”
                "host": "test",
                "heart-beat":"0,0"
            };
            this.client.connect(headers, this.onConnected, this.onFailed);
        },
  }

实现效果

在这里插入图片描述
后台接收到的消息
在这里插入图片描述

简单的实现了基于stomp协议实现vue实时消息推送,以及消息确认机制。最后给大家领个支付宝红包,扫一扫得红包。
在这里插入图片描述

Logo

前往低代码交流专区

更多推荐