Spring Boot封装RocketMQ普通消息发送代码案例
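本文将 RocketMQ 普通消息发送封装为 Spring Boot 组件。使用方式有两种:直接使用特定服务类(普通消息注入 RocketMQNormalService,事务消息注入 RocketMQTransactionalService),或通过统一接口(注入 MessageSender 接口)。这种封装代码复用性高、配置集中管理、异常统一处理,支持普通消息、事务消息和延迟消息,并且易于扩展新功能。根据项目复杂度可以选择适合的使用方式:简单项目可以直接使用服务类,复杂项目建议使用统一接口以便于维护和扩展。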
·
Spring Boot封装RocketMQ普通消息发送
下面我将把RocketMQ普通消息发送功能封装为Spring Boot组件,与之前的事务消息发送形成完整解决方案。
1. 配置类增强
更新之前的 RocketMQConfig 类,添加普通 Producer 的配置:
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.TransactionChecker;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that creates the RocketMQ client beans used for sending
 * normal and transactional messages.
 *
 * <p>Both producers connect to the address configured under {@code rocketmq.endpoints}
 * and are closed by Spring on context shutdown ({@code destroyMethod = "close"}).
 */
@Configuration
public class RocketMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RocketMQConfig.class);

    /** Topic the normal producer declares up front. */
    @Value("${rocketmq.normal.topic:yourNormalTopic}")
    private String normalTopic;

    /** Topic the transactional producer declares up front. */
    @Value("${rocketmq.transaction.topic:yourTransactionTopic}")
    private String transactionTopic;

    /** Access point of the RocketMQ cluster, e.g. {@code host:port}. */
    @Value("${rocketmq.endpoints:localhost:8081}")
    private String endpoints;

    /**
     * Exposes the RocketMQ client service provider, the factory for producers
     * and message builders.
     */
    @Bean
    public ClientServiceProvider clientServiceProvider() {
        return ClientServiceProvider.loadService();
    }

    /**
     * Checker invoked when the broker asks about an unresolved transactional message.
     *
     * <p>NOTE(review): this implementation logs the message and always answers
     * {@code COMMIT}. That is only acceptable for a demo; a real checker should look up
     * the outcome of the local transaction before committing or rolling back.
     */
    @Bean
    public TransactionChecker transactionChecker() {
        return messageView -> {
            log.info("Receive transactional message check, message={}", messageView);
            return TransactionResolution.COMMIT;
        };
    }

    /**
     * Creates the producer used for transactional messages.
     *
     * @param provider factory used to obtain the producer builder
     * @param checker  callback used to resolve unresolved transactions
     * @return a started producer bound to the transaction topic
     * @throws RuntimeException if the producer cannot be created
     */
    @Bean(destroyMethod = "close")
    public Producer transactionalProducer(ClientServiceProvider provider, TransactionChecker checker) {
        try {
            return provider.newProducerBuilder()
                    // The builder refuses to build without a client configuration.
                    .setClientConfiguration(clientConfiguration())
                    .setTopics(transactionTopic)
                    .setTransactionChecker(checker)
                    .build();
        } catch (Exception e) {
            log.error("Failed to create transactional producer", e);
            throw new RuntimeException("Failed to create transactional producer", e);
        }
    }

    /**
     * Creates the producer used for normal (and delayed) messages.
     *
     * @param provider factory used to obtain the producer builder
     * @return a started producer bound to the normal topic
     * @throws RuntimeException if the producer cannot be created
     */
    @Bean(destroyMethod = "close")
    public Producer normalProducer(ClientServiceProvider provider) {
        try {
            return provider.newProducerBuilder()
                    // The builder refuses to build without a client configuration.
                    .setClientConfiguration(clientConfiguration())
                    .setTopics(normalTopic)
                    .build();
        } catch (Exception e) {
            log.error("Failed to create normal producer", e);
            throw new RuntimeException("Failed to create normal producer", e);
        }
    }

    /** Builds the client configuration pointing at the configured endpoints. */
    private ClientConfiguration clientConfiguration() {
        return ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
    }
}
2. 普通消息服务类
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@Service
public class RocketMQNormalService {
private static final Logger log = LoggerFactory.getLogger(RocketMQNormalService.class);
private final ClientServiceProvider provider;
private final Producer producer;
private final String normalTopic;
@Autowired
public RocketMQNormalService(ClientServiceProvider provider,
Producer normalProducer,
@Value("${rocketmq.normal.topic:yourNormalTopic}") String normalTopic) {
this.provider = provider;
this.producer = normalProducer;
this.normalTopic = normalTopic;
}
/**
* 发送普通消息
* @param body 消息内容
* @param tag 消息标签
* @param keys 消息keys
* @param properties 消息属性
* @return 消息ID
*/
public String sendNormalMessage(String body, String tag, String keys, Map<String, String> properties) {
try {
Message.Builder builder = provider.newMessageBuilder()
.setTopic(normalTopic)
.setTag(tag)
.setKeys(keys)
.setBody(body.getBytes(StandardCharsets.UTF_8));
if (properties != null) {
properties.forEach(builder::addProperty);
}
Message message = builder.build();
SendReceipt sendReceipt = producer.send(message);
log.info("Send normal message successfully, messageId={}", sendReceipt.getMessageId());
return sendReceipt.getMessageId().toString();
} catch (Exception e) {
log.error("Failed to send normal message", e);
throw new RuntimeException("Failed to send normal message", e);
}
}
/**
* 发送延迟消息
* @param body 消息内容
* @param tag 消息标签
* @param keys 消息keys
* @param properties 消息属性
* @param delaySeconds 延迟时间(秒)
* @return 消息ID
*/
public String sendDelayMessage(String body, String tag, String keys,
Map<String, String> properties, long delaySeconds) {
try {
Message.Builder builder = provider.newMessageBuilder()
.setTopic(normalTopic)
.setTag(tag)
.setKeys(keys)
.setBody(body.getBytes(StandardCharsets.UTF_8))
.setDelayTimestamp(System.currentTimeMillis() + delaySeconds * 1000);
if (properties != null) {
properties.forEach(builder::addProperty);
}
Message message = builder.build();
SendReceipt sendReceipt = producer.send(message);
log.info("Send delay message successfully, messageId={}", sendReceipt.getMessageId());
return sendReceipt.getMessageId().toString();
} catch (Exception e) {
log.error("Failed to send delay message", e);
throw new RuntimeException("Failed to send delay message", e);
}
}
}
3. 使用示例
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
 * Usage example: publishes order notifications through {@link RocketMQNormalService}.
 */
@Service
public class NotificationService {

    @Autowired
    private RocketMQNormalService rocketMQNormalService;

    /**
     * Sends an immediate notification for the given order, tagged
     * {@code ORDER_NOTIFICATION} and carrying the order and user IDs as properties.
     *
     * @param order order to notify about
     */
    public void sendOrderNotification(Order order) {
        // Message payload, tag and key
        final String payload = "Order notification: " + order.getId();
        final String notificationTag = "ORDER_NOTIFICATION";
        final String messageKey = "notification-" + order.getId();

        // Custom properties attached to the message
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("orderId", order.getId());
        attributes.put("userId", order.getUserId());

        // Send as a normal message; the returned ID is available for follow-up processing
        String messageId = rocketMQNormalService.sendNormalMessage(
                payload, notificationTag, messageKey, attributes);
    }

    /**
     * Sends a delayed notification for the given order, tagged {@code DELAY_NOTIFICATION}.
     *
     * @param order        order to notify about
     * @param delaySeconds delay in seconds, passed through to the messaging service
     */
    public void sendDelayNotification(Order order, long delaySeconds) {
        final String payload = "Delay notification for order: " + order.getId();
        final String notificationTag = "DELAY_NOTIFICATION";
        final String messageKey = "delay-" + order.getId();

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("orderId", order.getId());

        // Send as a delayed message; the returned message ID is not needed here
        rocketMQNormalService.sendDelayMessage(
                payload, notificationTag, messageKey, attributes, delaySeconds);
    }
}
4. 统一消息发送接口(可选)
如果需要更统一的使用方式,可以创建一个统一的发送接口:
/**
 * Unified entry point for sending normal, delayed and transactional messages.
 */
public interface MessageSender {
/**
 * Sends a normal message.
 *
 * @param topic      topic the message is addressed to; how it is honoured is up to the implementation
 * @param body       message content
 * @param tag        message tag
 * @param keys       message keys
 * @param properties custom message properties
 * @return the message ID
 */
String send(String topic, String body, String tag, String keys, Map<String, String> properties);
/**
 * Sends a delayed message.
 *
 * @param topic        topic the message is addressed to; how it is honoured is up to the implementation
 * @param body         message content
 * @param tag          message tag
 * @param keys         message keys
 * @param properties   custom message properties
 * @param delaySeconds delay in seconds
 * @return the message ID
 */
String sendDelay(String topic, String body, String tag, String keys,
Map<String, String> properties, long delaySeconds);
/**
 * Sends a transactional message.
 *
 * @param topic      topic the message is addressed to; how it is honoured is up to the implementation
 * @param body       message content
 * @param tag        message tag
 * @param keys       message keys
 * @param properties custom message properties
 * @return presumably the transaction handle the caller must later commit or roll back —
 *         TODO confirm against RocketMQTransactionalService
 */
Transaction sendTransactional(String topic, String body, String tag,
String keys, Map<String, String> properties);
}
然后实现这个接口:
/**
 * {@link MessageSender} implementation that delegates to the dedicated normal and
 * transactional RocketMQ services.
 *
 * <p>The delegates always publish to their configured topic. The {@code topic} argument
 * of each method is therefore validated against that configuration rather than silently
 * ignored: a {@code null} or blank topic means "use the configured topic", and any other
 * value that differs from the configured topic is rejected.
 */
@Service
public class RocketMQMessageSender implements MessageSender {

    @Autowired
    private RocketMQNormalService normalService;

    @Autowired
    private RocketMQTransactionalService transactionalService;

    /** Topic the normal service publishes to. */
    @Value("${rocketmq.normal.topic:yourNormalTopic}")
    private String normalTopic;

    /** Topic the transactional service publishes to. */
    @Value("${rocketmq.transaction.topic:yourTransactionTopic}")
    private String transactionTopic;

    /**
     * Sends a normal message.
     *
     * @return the message ID
     * @throws IllegalArgumentException if {@code topic} names a topic other than the
     *                                  configured normal topic
     */
    @Override
    public String send(String topic, String body, String tag, String keys, Map<String, String> properties) {
        requireConfiguredTopic(topic, normalTopic);
        return normalService.sendNormalMessage(body, tag, keys, properties);
    }

    /**
     * Sends a delayed message.
     *
     * @return the message ID
     * @throws IllegalArgumentException if {@code topic} names a topic other than the
     *                                  configured normal topic
     */
    @Override
    public String sendDelay(String topic, String body, String tag, String keys,
                            Map<String, String> properties, long delaySeconds) {
        requireConfiguredTopic(topic, normalTopic);
        return normalService.sendDelayMessage(body, tag, keys, properties, delaySeconds);
    }

    /**
     * Sends a transactional message.
     *
     * @return whatever the transactional service returns for the sent message
     * @throws IllegalArgumentException if {@code topic} names a topic other than the
     *                                  configured transaction topic
     */
    @Override
    public Transaction sendTransactional(String topic, String body, String tag,
                                         String keys, Map<String, String> properties) {
        requireConfiguredTopic(topic, transactionTopic);
        return transactionalService.sendTransactionalMessage(body, tag, keys, properties);
    }

    /**
     * Fails fast when the caller asks for a topic the delegate cannot publish to, so a
     * message never ends up on a different topic than the caller requested.
     */
    private static void requireConfiguredTopic(String requested, String configured) {
        if (requested == null || requested.trim().isEmpty()) {
            return; // no explicit topic: fall back to the configured one
        }
        if (!requested.equals(configured)) {
            throw new IllegalArgumentException(
                    "Unsupported topic '" + requested + "': this sender only publishes to '" + configured + "'");
        }
    }
}
5. 配置参数更新
更新 application.yml:
# RocketMQ client settings read by the @Value placeholders in the Java classes.
rocketmq:
  # Access point of the RocketMQ cluster
  endpoints: localhost:8081
  normal:
    # Topic for normal (and delayed) messages
    topic: yourNormalTopic
  transaction:
    # Topic for transactional messages
    topic: yourTransactionTopic
总结
现在项目中可以通过两种方式使用RocketMQ消息发送:
- 直接使用特定服务类:
  - 普通消息:注入 RocketMQNormalService
  - 事务消息:注入 RocketMQTransactionalService
- 通过统一接口:注入 MessageSender 接口
这种封装提供了以下优点:
- 代码复用性高
- 配置集中管理
- 异常统一处理
- 支持普通消息和事务消息
- 支持延迟消息
- 易于扩展新功能
根据项目复杂度可以选择适合的使用方式,简单项目可以直接使用服务类,复杂项目建议使用统一接口以便于维护和扩展。
更多推荐
所有评论(0)