使用RabbitMQ同步ES数据
借助rabbitMQ,当文章发布、修改、删除时发送一条消息,消息数据为文章id,搜索服务编写监听器接收消息,保存新的数据。一、文章微服务添加依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring...
·
借助rabbitMQ,当文章发布、修改、删除时发送一条消息,消息数据为文章id,搜索服务编写监听器接收消息,保存新的数据。
一、文章微服务添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、配置rabbitmq
指定自定义交换机:WEBLOG.ARTICLE.EXCHANGE
rabbitmq:
host: 你的rabbitmq的host
username: **
password: **
virtual-host: **
publisher-confirms: true
template:
exchange: WEBLOG.ARTICLE.EXCHANGE
三、文章Service层编写发送消息方法
/**
* 发送消息
* @param aid
*/
private void sendMsg(String type,String aid) {
try {
rabbitTemplate.convertAndSend("article."+type,aid);
}catch (AmqpException e){
e.printStackTrace();
}
}
在指定位置调用:
/**
* 发布文章
* @param map
*/
@Transactional(rollbackFor = Exception.class)
public void publish(Map<String,Object> map) throws ParseException {
...............
.............
//发送消息
sendMsg("publish",article.getAid());
}
TIP:
文章的发布、修改、软删除、强删除、点赞、评论分别对应的type为:
publish、update、black、delete、thumbup、comment
四、搜索微服务编写监听类
同上引入依赖加上配置
4.1 listener:
注:
- @Queue:绑定指定队列名
- exchange:交换机名,这里为配置文件中配置的交换机名
- key:接收的routingKey
/**
* @author MaoLin Wang
* @date 2019/10/1917:35
*/
@Component
public class ArticleListener {
@Autowired
private ArticleService articleService;
/**
* 修改或发布文章
* @param aid
* @throws InvocationTargetException
* @throws IllegalAccessException
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "WEBLOG.ARTICLE.SAVE.QUEUE",durable = "true"),
exchange = @Exchange(value = "WEBLOG.ARTICLE.EXCHANGE",ignoreDeclarationExceptions ="true",type = ExchangeTypes.TOPIC),
key = {"article.publish","article.update"}
))
public void save(String aid) throws InvocationTargetException, IllegalAccessException {
if("".equals(aid)){
return;
}
articleService.save(aid);
}
/**
* 删除文章
* @param aid
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "WEBLOG.ARTICLE.DELETE.QUEUE",durable = "true"),
exchange = @Exchange(value = "WEBLOG.ARTICLE.EXCHANGE",ignoreDeclarationExceptions ="true",type = ExchangeTypes.TOPIC),
key = {"article.delete"}
))
public void delete(String aid) {
if("".equals(aid)){
return;
}
articleService.delete(aid);
}
/**
* 后台软删除文章
* @param aid
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "WEBLOG.ARTICLE.BLACK.QUEUE",durable = "true"),
exchange = @Exchange(value = "WEBLOG.ARTICLE.EXCHANGE",ignoreDeclarationExceptions ="true",type = ExchangeTypes.TOPIC),
key = {"article.black"}
))
public void black(String aid){
if("".equals(aid)){
return;
}
articleService.black(aid);
}
/**
* 点赞
* @param aid
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "WEBLOG.ARTICLE.THUMBUP.QUEUE",durable = "true"),
exchange = @Exchange(value = "WEBLOG.ARTICLE.EXCHANGE",ignoreDeclarationExceptions ="true",type = ExchangeTypes.TOPIC),
key = {"article.thumbup"}
))
public void thumbup(String aid){
if("".equals(aid)){
return;
}
articleService.thumbup(aid);
}
/**
* 增加评论
* @param aid
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "WEBLOG.ARTICLE.COMMENT.QUEUE",durable = "true"),
exchange = @Exchange(value = "WEBLOG.ARTICLE.EXCHANGE",ignoreDeclarationExceptions ="true",type = ExchangeTypes.TOPIC),
key = {"article.comment"}
))
public void comment(String aid) {
if("".equals(aid)){
return;
}
articleService.comment(aid);
}
}
4.2 保存文章
/**
* 发布文章、修改文章
* @param aid
* @throws InvocationTargetException
* @throws IllegalAccessException
*/
@Transactional(rollbackFor = Exception.class)
public void save(String aid) throws InvocationTargetException, IllegalAccessException {
.........相关业务.......
articleDao.save(article);
}
更多推荐
已为社区贡献1条内容
所有评论(0)