JAVA搭配RabbitMQ使用

1.RabbitMq的优点:

  1. 实时性要求比较高,延迟敏感
  2. 路由规则复杂(多条件、动态绑定)
  3. 日消息量<1000万,堆积压力小
  4. 对顺序、事务消息没有极致要求

如果使用的模块不是互联网级的百亿消息洪峰,也不需要苛刻的金融级可靠性和顺序性,那么RabbitMQ凭借其低延迟、灵活路由、轻量运维和生态成熟,依然是比RocketMQ更务实选择

一、下载安装RabbitMq

windows下载地址:

根据JAVA版本现在对应的rabbitMq安装包
案例: java8+rabbitMq3.12.14+erlang3.2.21
erlang官网: https://www.erlang.org/downloads
下载erlang:https://www.erlang.org/patches/OTP-25.3.2.21
rabbitMq官网:https://www.rabbitmq.com/release-information
下载https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.12.14
安装后启动

1. win+输入cmd,选择管理员身份打开,
cd "C:\Program Files\RabbitMQ Server\rabbitmq_server-{版本号}\sbin"
2. 启用插件:在 sbin 目录下,执行以下命令启用管理插件
rabbitmq-plugins enable rabbitmq_management

当看到类似 Enabling plugins on node rabbit@… 的输出时,表示成功。

3. 重启服务:有几种方法,推荐在刚才的命令行中继续输入:
net stop RabbitMQ && net start RabbitMQ
4. 访问后台:打开浏览器,访问 http://localhost:15672。使用默认的用户名 guest 和密码 guest 登录
5. 添加email.collection.queue
  1. 点击 Queues 标签页。
    在页面下方的 Add a new queue 区域:
    Type: 保持默认的 Classic 队列类型即可。
    Name: 输入 email.collection.queue。
    Durability: 这是关键选项!
    在开发和测试中,队列临时存在可以选 Transient。
    但在生产环境中,为避免队列因服务器重启而丢失数据,必须选择 Durable。
    其他参数可以先保持默认。
    完成设置后,点击 Add queue 按钮,你的队列就会出现在列表中了。
6. 测试

找到队列 email.collection.queue,点击队列名称进入详情页
向下滚动到 Publish message 区域
填写:
Payload:
json
{“email”:“xxx@123.com”}
Payload encoding:选 string
Properties:一般无需填写
点击 Publish message 按钮


二、JAVA中的处理

1. 启动类

@EnableAsync
@EnableRabbit
@EnableScheduling
@EnableCustomConfig
@EnableCustomSwagger2
@EnableRyFeignClients
@SpringBootApplication
public class EmailCollectionApplaction {
    public static void main(String[] args)
    {
        SpringApplication.run(EmailCollectionApplaction.class, args);
        System.out.println("(♥◠‿◠)ノ゙  邮件归集服务启动成功   ლ(´ڡ`ლ)゙");
    }
}

2. 配置文件bootstrap.yml

spring:
  mvc:
    hiddenmethod:
      filter:
        enabled: true
  application:
    name: bgn-email-collection
  profiles:
    active: dev
server:
  port: 1111
email:
  maildir: /ewomail/mail
  aeskey: 11111
  collection:
    rabbit:
      queue: email.collection.queue
  cleanup:
    enabled: true
    cron: 0 27 19 * * ?
    expire-days: 365
    base-url: http://xxx.com
    api-key: 999999

3. POM文件

   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
   </dependency>

4.监听类EmailCollectionMqListener

package com.bgn.dc.mq;

import com.alibaba.fastjson2.JSON;
import com.bgn.common.core.utils.StringUtils;
import com.bgn.dc.email.ReadEmailService;
import com.bgn.dc.model.EmailAccount;
import com.bgn.dc.model.EmailCollectMqMessage;
import com.bgn.dc.service.ScanEmailService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Component
public class EmailCollectionMqListener {

    @Resource
    private ScanEmailService scanEmailService;

    @Resource
    private ReadEmailService readEmailService;

  @RabbitListener(queues = "${spring.rabbitmq.secondary.queue:dovecot_inbox}",
            containerFactory = "secondaryRabbitListenerContainerFactory")
    public void onMessage(String body) {
        if (StringUtils.isNullOrBlank(body)) {
            log.warn("收到空消息,忽略处理");
            return;
        }
        try {
            EmailCollectMqMessage message = JSON.parseObject(body, EmailCollectMqMessage.class);
            if (message == null) {
                log.warn("消息解析为空,body:{}", body);
                return;
            }

            if ("SCAN".equalsIgnoreCase(message.getTriggerType())) {
                scanEmailService.scan();
                return;
            }

            if (StringUtils.isNullOrBlank(message.getEmail())) {
                log.warn("消息缺少email字段,body:{}", body);
                return;
            }

            EmailAccount account = scanEmailService.getEmailAccountsByEmailNotFilterTime(message.getEmail());
            if (account == null) {
                log.warn("邮箱账号不存在或已禁用,email:{}", message.getEmail());
                return;
            }

            if (!StringUtils.isNullOrBlank(message.getMessageId())) {
                readEmailService.monitorAccountByMessageId(account, message.getMessageId());
                return;
            }

            readEmailService.monitorAccount(account);
        } catch (Exception e) {
            log.error("消费邮件归集消息异常,body:{}", body, e);
        }
    }
}

6.监听实体类EmailCollectMqMessage

package com.bgn.dc.model;

import lombok.Data;

import java.io.Serializable;

/**
 * @author 44510
 */
@Data
public class EmailCollectMqMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * 邮箱账号
     */
    private String email;

    /**
     * 邮件Message-ID,可为空
     */
    private String messageId;

    /**
     * 触发类型:SCAN/ACCOUNT/MESSAGE
     */
    private String triggerType;
}

如果配置第二个RabbitMQ实例的Bean:

1. 创建RabbitMQ配置类(必须)

package com.bgn.dc.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 44510
 */
@Configuration
public class SecondaryRabbitConfig {

    @Bean("secondaryConnectionFactory")
    @ConfigurationProperties(prefix = "spring.rabbitmq.secondary")
    public CachingConnectionFactory secondaryConnectionFactory() {
        return new CachingConnectionFactory();
    }

    @Bean("secondaryRabbitTemplate")
    public RabbitTemplate secondaryRabbitTemplate(
            @Qualifier("secondaryConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean("secondaryRabbitAdmin")
    public RabbitAdmin secondaryRabbitAdmin(
            @Qualifier("secondaryConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean("secondaryRabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory secondaryRabbitListenerContainerFactory(
            @Qualifier("secondaryConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(org.springframework.amqp.core.AcknowledgeMode.AUTO);
        factory.setDefaultRequeueRejected(false);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(5);
        factory.setMissingQueuesFatal(false);
        return factory;
    }
}

更多推荐