elasticsearch-数据同步
目录方案一:同步调用方案二:异步调用方案三:监听binlog利用MQ实现mysql与elasticsearch数据同步Hotel-Admin发送消息Hotel-Demo接收消息,完成对应操作在微服务情况下,负责酒店管理(对数据CRUD,操作mysql)的与负责酒店搜索(elasticsearch)的业务可能在两个不同的服务上,如何实现数据同步?方案一:同步调用缺点: 耦合较高,导致耗时较长,因为你
目录
在微服务情况下,负责酒店管理(对数据CRUD,操作mysql)的与负责酒店搜索(elasticsearch)的业务可能在两个不同的服务上,如何实现数据同步?
方案一:同步调用
缺点: 耦合较高,导致耗时较长,因为你酒店管理的服务除了数据库的操作,还有调用酒店搜索服务,然后酒店搜索服务还对elasticsearch作出操作,导致耗时较长,原本我只需要对数据库操作的,耗时一下变成了三个步骤之和;
方案二:异步调用
这里我们是通过mq进行解耦,酒店管理服务只需要管好自己的内容:写好数据库就行,写完之后服务再发一条消息给到MQ,其余操作就跟自己没有关系了,该干嘛干嘛,然后酒店搜索服务进行监听消息,更新elasticsearch,从而达到解耦效果
(21条消息) 初识MQ-01_Fairy要carry的博客-CSDN博客_mq全称
缺点:依赖于MQ的稳定性
方案三:监听binlog
这里我们利用mysql的中间件canal,当我们酒店管理服务对数据库进行CRUD时,会将变化的消息传给canal(相当于是一个监听的效果),然后我们的canal会通知给酒店搜索服务,然后搜索服务进行对应的更新;
特点:完全解除了服务之间的耦合,但是增加了数据库的压力,复杂较高
利用MQ实现mysql与elasticsearch数据同步
酒店数据修改和增加是一个消息,当hotel-admin服务完成这个操作时,会将消息传给消息队列,然后hotel-demo服务收到消息后完成对应的操作;删除也一样,只是在一个不同的消息队列中
Hotel-Admin发送消息
1.先导入mq依赖
<!--mq的依赖amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置yaml中mq信息
rabbitmq:
host: 192.168.184.129
port: 5672
username: itcast
password: 123321
virtual-host: /
3.将消息通过交换机路由给对应的消息队列进行缓存
@RestController
@RequestMapping("hotel")
public class HotelController {
@Autowired
private IHotelService hotelService;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 新增酒店数据
* @param hotel
*/
@PostMapping
public void saveHotel(@RequestBody Hotel hotel) {
hotelService.save(hotel);
// 这里我们将数据通过交换机传给对应的消息队列,数据的话给个id就行,另外一个服务可以通过id获取数据
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}
@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id) {
return hotelService.getById(id);
}
@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
) {
Page<Hotel> result = hotelService.page(new Page<>(page, size));
return new PageResult(result.getTotal(), result.getRecords());
}
// @PostMapping
// public void saveHotel(@RequestBody Hotel hotel) {
// hotelService.save(hotel);
// }
/**
* 更新酒店数据
* @param hotel
*/
@PutMapping()
public void updateById(@RequestBody Hotel hotel) {
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
/**
* 删除酒店数据
* @param id
*/
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
//将消息通过交换机给到消息队列,然后另一个服务得到消息进行处理
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
}
4.交换机,消息队列,以及连接的声明
public class MqConstants {
//交换机
public final static String HOTEL_EXCHANGE="hotel.topic";
// 监听新增和修改的消息队列
public final static String HOTEL_INSERT_QUEUE="hotel.insert.queue";
// 监听删除队列
public final static String HOTEL_DELETE_QUEUE="hotel.delete.queue";
// RoutingKey:作为消息队列与交换机绑定的一个表示码
public final static String HOTEL_INSERT_KEY="hotel.insert";
public final static String HOTEL_DELETE_KEY="hotel.delete";
}
Hotel-Demo接收消息,完成对应操作
1.首先也是导入依赖,并且完成yaml中的配置,连接到mq
2.定义交换机以及消息队列,完成两者之间的绑定
/**
* @author diao 2022/5/26
*/
@Configuration
public class MqConfig {
/**
* 得到一个交换机定义
*/
@Bean
public TopicExchange topicExchange() {
//第二个是保证交换机持久化
return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
}
/**
* 定义两个消息队列
*/
@Bean
public Queue insertQueue() {
return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
}
@Bean
public Queue deleteQueue() {
return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
}
/**
* 将交换机与消息队列进行绑定
*/
@Bean
public Binding insertQueueBinding() {
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_QUEUE);
}
@Bean
public Binding deleteQueueBinding() {
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_QUEUE);
}
}
package cn.itcast.hotel.constants;
/**
* @author diao 2022/5/26
*/
public class MqConstants {
//交换机
public final static String HOTEL_EXCHANGE="hotel.topic";
// 监听新增和修改的消息队列
public final static String HOTEL_INSERT_QUEUE="hotel.insert.queue";
// 监听删除队列
public final static String HOTEL_DELETE_QUEUE="hotel.delete.queue";
// RoutingKey:作为消息队列与交换机绑定的一个表示码
public final static String HOTEL_INSERT_KEY="hotel.insert";
public final static String HOTEL_DELETE_KEY="hotel.delete";
}
3.对消息队列进行监听,并且完成对应操作
/**
* @author diao 2022/5/26
*/
//监听hotel-admin酒店服务端给的消息
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
/**
* 监听insertQueue,得到消息中的id进行es中的操作
* @param id
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id) throws IOException {
hotelService.insertById(id);
}
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id) throws IOException {
hotelService.deleteById(id);
}
}
4.业务中对数据插入以及数据删除的编写
/**
* 完成插入数据操作(服务监听消息队列,然后服务完成操作)
*
* @param id:消息中的id
*/
@Override
public void insertById(Long id) throws IOException {
//0.根据id查询酒店数据
Hotel hotel = getById(id);
HotelDoc hotelDoc = new HotelDoc(hotel);
//1.准备Request
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
//2.准备DSL插入文档数据,将查询到的数据转为json
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
//3.发送请求
client.index(request,RequestOptions.DEFAULT);
}
@Override
public void deleteById(Long id) throws IOException {
//1.准备Request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
//2.发送请求
client.delete(request,RequestOptions.DEFAULT);
}
更多推荐
所有评论(0)