Spring Boot封装RocketMQ普通消息发送

下面我将把RocketMQ普通消息发送功能封装为Spring Boot组件,与之前的事务消息发送形成完整解决方案。

1. 配置类增强

更新之前的RocketMQConfig类,添加普通Producer的配置:

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.TransactionChecker;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes the RocketMQ client beans: the
 * {@link ClientServiceProvider}, a {@link TransactionChecker}, and two
 * {@link Producer} instances (one for transactional messages, one for normal
 * messages). Both producers connect to the address configured under
 * {@code rocketmq.endpoints}.
 */
@Configuration
public class RocketMQConfig {
    private static final Logger log = LoggerFactory.getLogger(RocketMQConfig.class);

    /** Topic the normal producer is bound to. */
    @Value("${rocketmq.normal.topic:yourNormalTopic}")
    private String normalTopic;

    /** Topic the transactional producer is bound to. */
    @Value("${rocketmq.transaction.topic:yourTransactionTopic}")
    private String transactionTopic;

    /** Endpoint address handed to every producer via its client configuration. */
    @Value("${rocketmq.endpoints:localhost:8081}")
    private String endpoints;

    /**
     * Loads the client service provider used to create producers and messages.
     *
     * @return the provider returned by {@link ClientServiceProvider#loadService()}
     */
    @Bean
    public ClientServiceProvider clientServiceProvider() {
        return ClientServiceProvider.loadService();
    }

    /**
     * Creates the checker invoked when a transactional message needs its outcome resolved.
     *
     * @return a checker that logs the message and resolves it as COMMIT
     */
    @Bean
    public TransactionChecker transactionChecker() {
        return messageView -> {
            log.info("Receive transactional message check, message={}", messageView);
            // NOTE(review): this commits unconditionally; a real checker should look up the
            // local transaction state for the message before deciding COMMIT or ROLLBACK.
            return TransactionResolution.COMMIT;
        };
    }

    /**
     * Builds the producer used for transactional messages.
     *
     * @param provider client service provider used to obtain the producer builder
     * @param checker  checker registered on the producer
     * @return a started producer; closed by Spring on shutdown via {@code close}
     * @throws RuntimeException if the producer cannot be built
     */
    @Bean(destroyMethod = "close")
    public Producer transactionalProducer(ClientServiceProvider provider, TransactionChecker checker) {
        try {
            return provider.newProducerBuilder()
                    .setClientConfiguration(clientConfiguration())
                    .setTopics(transactionTopic)
                    .setTransactionChecker(checker)
                    .build();
        } catch (Exception e) {
            log.error("Failed to create transactional producer", e);
            throw new RuntimeException("Failed to create transactional producer", e);
        }
    }

    /**
     * Builds the producer used for normal (and delayed) messages.
     *
     * @param provider client service provider used to obtain the producer builder
     * @return a started producer; closed by Spring on shutdown via {@code close}
     * @throws RuntimeException if the producer cannot be built
     */
    @Bean(destroyMethod = "close")
    public Producer normalProducer(ClientServiceProvider provider) {
        try {
            return provider.newProducerBuilder()
                    .setClientConfiguration(clientConfiguration())
                    .setTopics(normalTopic)
                    .build();
        } catch (Exception e) {
            log.error("Failed to create normal producer", e);
            throw new RuntimeException("Failed to create normal producer", e);
        }
    }

    /**
     * Creates the client configuration shared by both producers. Without it the
     * configured endpoints would never reach the client.
     */
    private ClientConfiguration clientConfiguration() {
        return ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
    }
}

2. 普通消息服务类

import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.Map;

@Service
public class RocketMQNormalService {
    private static final Logger log = LoggerFactory.getLogger(RocketMQNormalService.class);

    private final ClientServiceProvider provider;
    private final Producer producer;
    private final String normalTopic;

    @Autowired
    public RocketMQNormalService(ClientServiceProvider provider, 
                               Producer normalProducer,
                               @Value("${rocketmq.normal.topic:yourNormalTopic}") String normalTopic) {
        this.provider = provider;
        this.producer = normalProducer;
        this.normalTopic = normalTopic;
    }

    /**
     * 发送普通消息
     * @param body 消息内容
     * @param tag 消息标签
     * @param keys 消息keys
     * @param properties 消息属性
     * @return 消息ID
     */
    public String sendNormalMessage(String body, String tag, String keys, Map<String, String> properties) {
        try {
            Message.Builder builder = provider.newMessageBuilder()
                    .setTopic(normalTopic)
                    .setTag(tag)
                    .setKeys(keys)
                    .setBody(body.getBytes(StandardCharsets.UTF_8));

            if (properties != null) {
                properties.forEach(builder::addProperty);
            }

            Message message = builder.build();
            SendReceipt sendReceipt = producer.send(message);
            log.info("Send normal message successfully, messageId={}", sendReceipt.getMessageId());
            return sendReceipt.getMessageId().toString();
        } catch (Exception e) {
            log.error("Failed to send normal message", e);
            throw new RuntimeException("Failed to send normal message", e);
        }
    }

    /**
     * 发送延迟消息
     * @param body 消息内容
     * @param tag 消息标签
     * @param keys 消息keys
     * @param properties 消息属性
     * @param delaySeconds 延迟时间(秒)
     * @return 消息ID
     */
    public String sendDelayMessage(String body, String tag, String keys, 
                                 Map<String, String> properties, long delaySeconds) {
        try {
            Message.Builder builder = provider.newMessageBuilder()
                    .setTopic(normalTopic)
                    .setTag(tag)
                    .setKeys(keys)
                    .setBody(body.getBytes(StandardCharsets.UTF_8))
                    .setDelayTimestamp(System.currentTimeMillis() + delaySeconds * 1000);

            if (properties != null) {
                properties.forEach(builder::addProperty);
            }

            Message message = builder.build();
            SendReceipt sendReceipt = producer.send(message);
            log.info("Send delay message successfully, messageId={}", sendReceipt.getMessageId());
            return sendReceipt.getMessageId().toString();
        } catch (Exception e) {
            log.error("Failed to send delay message", e);
            throw new RuntimeException("Failed to send delay message", e);
        }
    }
}

3. 使用示例

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**
 * Usage example: publishes order notifications through
 * {@link RocketMQNormalService}, either immediately or after a delay.
 */
@Service
public class NotificationService {
    @Autowired
    private RocketMQNormalService rocketMQNormalService;

    /**
     * Sends an immediate notification for the given order.
     *
     * @param order the order the notification is about
     */
    public void sendOrderNotification(Order order) {
        // Body and key both embed the order id.
        final String payload = "Order notification: " + order.getId();
        final String messageKey = "notification-" + order.getId();

        // Custom properties carried along with the message.
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("orderId", order.getId());
        attributes.put("userId", order.getUserId());

        final String messageId = rocketMQNormalService.sendNormalMessage(
                payload, "ORDER_NOTIFICATION", messageKey, attributes);

        // messageId is available here for any follow-up handling.
    }

    /**
     * Sends a notification for the given order that is delivered after a delay.
     *
     * @param order        the order the notification is about
     * @param delaySeconds delay in seconds, passed through to the messaging service
     */
    public void sendDelayNotification(Order order, long delaySeconds) {
        final String payload = "Delay notification for order: " + order.getId();
        final String messageKey = "delay-" + order.getId();

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("orderId", order.getId());

        // The returned message id is not needed by this example.
        rocketMQNormalService.sendDelayMessage(
                payload, "DELAY_NOTIFICATION", messageKey, attributes, delaySeconds);
    }
}

4. 统一消息发送接口(可选)

如果需要更统一的使用方式,可以创建一个统一的发送接口:

/**
 * Unified entry point for sending normal, delayed and transactional messages.
 */
public interface MessageSender {

    /**
     * Sends a normal message.
     *
     * @return the message id
     */
    String send(
            String topic,
            String body,
            String tag,
            String keys,
            Map<String, String> properties);

    /**
     * Sends a delayed message.
     *
     * @param delaySeconds delay in seconds
     * @return the message id
     */
    String sendDelay(
            String topic,
            String body,
            String tag,
            String keys,
            Map<String, String> properties,
            long delaySeconds);

    /**
     * Sends a transactional message.
     *
     * @return the transaction associated with the sent message
     */
    Transaction sendTransactional(
            String topic,
            String body,
            String tag,
            String keys,
            Map<String, String> properties);
}

然后实现这个接口:

/**
 * {@link MessageSender} implementation that delegates to the normal and
 * transactional message services.
 *
 * <p>The delegate services always publish to their configured topic, so the
 * {@code topic} argument of each method is validated against that configured
 * topic instead of being silently ignored.
 */
@Service
public class RocketMQMessageSender implements MessageSender {
    @Autowired
    private RocketMQNormalService normalService;

    @Autowired
    private RocketMQTransactionalService transactionalService;

    /** Topic used for normal and delayed messages. */
    @Value("${rocketmq.normal.topic:yourNormalTopic}")
    private String normalTopic;

    /** Topic used for transactional messages. */
    @Value("${rocketmq.transaction.topic:yourTransactionTopic}")
    private String transactionTopic;

    /**
     * Sends a normal message.
     *
     * @param topic null/empty for the configured normal topic, otherwise it must equal it
     * @return the message id
     * @throws IllegalArgumentException if {@code topic} names a different topic
     */
    @Override
    public String send(String topic, String body, String tag, String keys, Map<String, String> properties) {
        requireConfiguredTopic(topic, normalTopic);
        return normalService.sendNormalMessage(body, tag, keys, properties);
    }

    /**
     * Sends a delayed message.
     *
     * @param topic        null/empty for the configured normal topic, otherwise it must equal it
     * @param delaySeconds delay in seconds
     * @return the message id
     * @throws IllegalArgumentException if {@code topic} names a different topic
     */
    @Override
    public String sendDelay(String topic, String body, String tag, String keys,
                            Map<String, String> properties, long delaySeconds) {
        requireConfiguredTopic(topic, normalTopic);
        return normalService.sendDelayMessage(body, tag, keys, properties, delaySeconds);
    }

    /**
     * Sends a transactional message.
     *
     * @param topic null/empty for the configured transaction topic, otherwise it must equal it
     * @return the transaction returned by the transactional service
     * @throws IllegalArgumentException if {@code topic} names a different topic
     */
    @Override
    public Transaction sendTransactional(String topic, String body, String tag,
                                         String keys, Map<String, String> properties) {
        requireConfiguredTopic(topic, transactionTopic);
        return transactionalService.sendTransactionalMessage(body, tag, keys, properties);
    }

    /**
     * Rejects a requested topic that differs from the one the delegate will actually use,
     * so a message is never routed to a topic the caller did not ask for.
     */
    private static void requireConfiguredTopic(String requested, String configured) {
        if (requested == null || requested.isEmpty() || requested.equals(configured)) {
            return;
        }
        throw new IllegalArgumentException(
                "Unsupported topic '" + requested + "'; this sender only publishes to '" + configured + "'");
    }
}

5. 配置参数更新

更新application.yml,加入以下配置:

# RocketMQ settings read by the Spring beans via @Value placeholders.
rocketmq:
  # Endpoint address (property rocketmq.endpoints).
  endpoints: localhost:8081
  normal:
    # Topic for normal and delayed messages (property rocketmq.normal.topic).
    topic: yourNormalTopic
  transaction:
    # Topic for transactional messages (property rocketmq.transaction.topic).
    topic: yourTransactionTopic

总结

现在项目中可以通过两种方式使用RocketMQ消息发送:

  1. 直接使用特定服务类:

    • 普通消息:注入RocketMQNormalService
    • 事务消息:注入RocketMQTransactionalService

  2. 通过统一接口:注入MessageSender接口

这种封装提供了以下优点:

  • 代码复用性高
  • 配置集中管理
  • 异常统一处理
  • 支持普通消息和事务消息
  • 支持延迟消息
  • 易于扩展新功能

根据项目复杂度可以选择适合的使用方式,简单项目可以直接使用服务类,复杂项目建议使用统一接口以便于维护和扩展。

Logo

数据库是今天社会发展不可缺少的重要技术,它可以把大量的信息进行有序的存储和管理,为企业的数据处理提供了强大的保障。

更多推荐