乐优商城(二十六)——RabbitMQ及数据同步
四、项目改造改造项目,实现搜索服务、商品静态页的数据同步。4.1 思路分析发送方:商品微服务什么时候发?当商品服务对商品进行写操作:增、删、改的时候、商品上下架(属于修改),需要发送一条消息,通知其它服务。发送什么内容?对商品的增删改时其它服务可能需要新的商品数据,但是如果消息内容中包含全部商品信息,数据量太大,而且并不是每个服务都需要全部的信息。因...
四、项目改造
改造项目,实现搜索服务、商品静态页的数据同步。
4.1 思路分析
发送方:商品微服务
-
什么时候发?
当商品服务对商品进行写操作:增、删、改的时候、商品上下架(属于修改),需要发送一条消息,通知其它服务。
-
发送什么内容?
对商品的增删改时其它服务可能需要新的商品数据,但是如果消息内容中包含全部商品信息,数据量太大,而且并不是每个服务都需要全部的信息。因此只发送商品id,其它服务可以根据id查询自己需要的信息。
接收方:搜索微服务、静态页微服务
接收消息后如何处理?
-
搜索微服务:
-
增/改:添加新的数据到索引库
-
删:删除索引库数据
-
-
静态页微服务:
-
增:创建新的静态页
-
删:删除原来的静态页
-
改:创建新的静态页并删除原来的
-
4.2 商品服务发送消息
先在商品微服务leyou-item-service中实现发送消息
4.2.1 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2.2 配置文件
在application.yml中添加一些有关RabbitMQ的配置:
-
template:有关
AmqpTemplate
的配置-
retry:失败重试
-
enabled:开启失败重试
-
initial-interval:第一次重试的间隔时长
-
max-interval:最长重试间隔,超过这个间隔将不再重试
-
multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
-
-
exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
-
-
publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
4.2.3 改造GoodsService
在GoodsService接口中封装一个发送消息到mq的方法:
实现:
这里没有指定交换机,因此默认发送到了配置中的:leyou.item.exchange
注意:这里要把所有异常都try起来,不能让消息的发送影响到正常的业务逻辑
4.2.4 业务调整
新增时调用
修改时调用
删除时调用
上下架的时候调用
4.3 搜索服务接收消息
搜索服务接收到消息后要做的事情:
-
增:添加新的数据到索引库
-
删:删除索引库数据
-
改:修改索引库数据
因为索引库的新增和修改方法是合二为一的,因此可以将这两类消息一同处理,删除另外处理。
4.3.1 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.3.2 添加配置
这里只是接收消息而不发送,所以不用配置template相关内容。
4.3.3 编写监听器
代码:
package com.leyou.listener;
import com.leyou.service.SearchService;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author: 98050
* @Time: 2018-10-21 12:57
* @Feature: mq监听器,消费者
*/
@Component
public class GoodsListener {
@Autowired
private SearchService searchService;
/**
* 处理insert和update的消息
* @param id
* @throws Exception
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "leyou.create.index.queue",durable = "true"), //队列持久化
exchange = @Exchange(
value = "leyou.item.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"item.insert","item.update"}
))
public void listenCreate(Long id) throws Exception{
if (id == null){
return;
}
//创建或更新索引
this.searchService.createIndex(id);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "leyou.delete.index.queue",durable = "true"), //队列持久化
exchange = @Exchange(
value = "leyou.item.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"item.delete"}
))
public void listenDelete(Long id){
if (id == null){
return;
}
//删除索引
this.searchService.deleteIndex(id);
}
}
4.3.4 编写创建和删除索引的方法
这里因为要创建和删除索引,需要在SearchService中拓展两个方法,创建和删除索引
接口
实现
4.4 静态页服务接收消息
商品静态页服务接收到消息后的处理:
-
增:创建新的静态页
-
删:删除原来的静态页
-
改:创建新的静态页并删除原来的
不过,编写的创建静态页的方法也具备覆盖以前页面的功能,因此:增和改的消息可以放在一个方法中处理,删除消息放在另一个方法处理。
4.4.1 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.4.2 添加配置
这里只是接收消息而不发送,所以不用配置template相关内容。
4.4.3 编写监听器
代码:
package com.leyou.listener;
import com.leyou.service.GoodsHtmlService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author: 98050
* @Time: 2018-10-21 14:45
* @Feature: mq监听器,消费者
*/
@Component
public class GoodsListener {
@Autowired
private GoodsHtmlService goodsHtmlService;
/**
* 处理insert和update的消息
* @param id
* @throws Exception
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "leyou.create.web.queue",durable = "true"), //队列持久化
exchange = @Exchange(
value = "leyou.item.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"item.insert","item.update"}
))
public void listenCreate(Long id) throws Exception{
if (id == null){
return;
}
//创建或更新索引
this.goodsHtmlService.createHtml(id);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "leyou.delete.web.queue",durable = "true"), //队列持久化
exchange = @Exchange(
value = "leyou.item.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"item.delete"}
))
public void listenDelete(Long id){
if (id == null){
return;
}
//删除索引
this.goodsHtmlService.deleteHtml(id);
}
}
4.4.4 添加删除页面方法
创建页面的方法已经有了,直接调用即可,添加删除页面的方法。
接口
实现
4.5 测试
4.5.1 查看RabbitMQ控制台
重新启动项目,并且登录RabbitMQ管理界面:http://192.168.56.101:15672
可以看到,交换机已经创建出来了:
队列也已经创建完毕:
查看交换机绑定的队列:
4.5.2 修改数据
4.5.3 结果
更多推荐
所有评论(0)