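Canal: Alibaba's Data Synchronization Tool
Canal, an incremental data synchronization tool
Introduction to Canal
Canal is Alibaba's data synchronization tool. It was originally built to handle a dual data center deployment spanning Hangzhou and the United States, and it is now an incremental data synchronization solution that Chinese internet companies use widely.
How it works:
- Canal poses as a MySQL slave and sends a dump request to the master over the MySQL replication protocol
- The master receives the dump request and, whenever data changes, pushes the binary log to Canal
- Canal parses the binary log events, converts them into incremental data, and synchronizes that data to targets such as ES and Redis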
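To make the last step more concrete, here is a minimal sketch of what "applying incremental data" to a target can look like. It does not use Canal's API: `IncrementalSyncSketch`, `RowChange`, and the `id` key column are hypothetical names chosen for illustration, and a plain `HashMap` stands in for Redis or ES.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a parsed binlog row change; not Canal's API.
class IncrementalSyncSketch {

    enum EventType { INSERT, UPDATE, DELETE }

    static final class RowChange {
        final EventType type;
        final Map<String, String> before; // column values before the change (empty for INSERT)
        final Map<String, String> after;  // column values after the change (empty for DELETE)

        RowChange(EventType type, Map<String, String> before, Map<String, String> after) {
            this.type = type;
            this.before = before;
            this.after = after;
        }
    }

    /** Applies one change to the target store, keyed by the row's "id" column (an assumption). */
    static void apply(Map<String, Map<String, String>> target, RowChange change) {
        if (change.type == EventType.DELETE) {
            // A deleted row only has a "before" image, so the key comes from there
            target.remove(change.before.get("id"));
        } else {
            // INSERT and UPDATE both overwrite the stored copy with the "after" image
            target.put(change.after.get("id"), new HashMap<>(change.after));
        }
    }

    /** Helper that builds a two-column row for the demo. */
    static Map<String, String> row(String id, String name) {
        Map<String, String> r = new HashMap<>();
        r.put("id", id);
        r.put("name", name);
        return r;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> cache = new HashMap<>(); // stands in for Redis or ES
        Map<String, String> none = new HashMap<>();
        apply(cache, new RowChange(EventType.INSERT, none, row("1", "Java")));
        apply(cache, new RowChange(EventType.UPDATE, row("1", "Java"), row("1", "Java 17")));
        apply(cache, new RowChange(EventType.INSERT, none, row("2", "Go")));
        apply(cache, new RowChange(EventType.DELETE, row("2", "Go"), none));
        System.out.println(cache); // only row 1 remains, with its updated name
    }
}
```

Only the changed rows travel to the target, which is what distinguishes incremental synchronization from re-exporting whole tables.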
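Installing Canal
- MySQL configuration
Note: in this example MySQL runs on Windows; configuration on Linux is essentially the same.
First, enable binlog in MySQL.
1) Log in to MySQL and check whether binlog is enabled
SHOW VARIABLES LIKE '%log_bin%';
If log_bin is ON, binlog is enabled. If it is OFF, enable it in the MySQL configuration file.
On Windows the configuration file is my.ini in the MySQL installation directory.
On Linux it is /etc/my.cnf.
Add or change the following settings, then restart MySQL so they take effect:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
2) Create a user
Log in to MySQL, create the canal user, and grant it privileges
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;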
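- Download and install Canal
Download a release from the project's GitHub releases page: https://github.com/alibaba/canal/releases
This walkthrough uses version 1.1.4.
Upload the archive to the Linux server and extract it into a canal directory
cd /usr/local
mkdir canal
tar -vxf canal.deployer-1.1.4.tar.gz -C canal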
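- Configure Canal
In MySQL, run the following command and note the binlog file name (File) and position (Position) it reports
show master status;
Go to the canal directory and edit the instance configuration file, setting the binlog file name and position recorded above together with the canal database account and password
vi conf/example/instance.properties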
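- Start Canal
Go to the bin directory and start the server
./startup.sh
To stop the server, run stop.sh
Check the startup logs
cat /usr/local/canal/logs/canal/canal.log
cat /usr/local/canal/logs/example/example.log
If the logs report no exceptions, Canal is running; otherwise, troubleshoot from the log output.
Common problems:
- "authentication error": the database username or password is configured incorrectly
- "can't find position": check the configured binlog file name and position, then delete conf/example/meta.dat and restart
- Client compatibility problems: the Canal server and the client library should be the same version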
Canal clients
Official client
1) Add the dependency
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
2) Java code
package com.blb.canal_demo;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Client test
 */
public class ClientTest {

    public static void main(String[] args) {
        // Create the Canal connector: server address, destination, username, password
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.223.223", 11111), "example", "canal", "canal");
        try {
            // Connect
            connector.connect();
            // Subscribe to all databases and tables
            connector.subscribe(".*\\..*");
            // Roll back so that batches fetched but never acknowledged are delivered again
            connector.rollback();
            while (true) {
                // Fetch up to 1000 entries without acknowledging them
                Message message = connector.getWithoutAck(1000);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // No data: sleep for 1 second
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop polling
                        Thread.currentThread().interrupt();
                        break;
                    }
                } else {
                    // Data available: print it
                    printEntry(message.getEntries());
                }
                // Acknowledge the batch
                connector.ack(batchId);
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are of interest
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR parse data:" + entry.toString(), e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                // Distinguish delete, insert, and other (update) operations
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
After you change data in any table of the database, the Canal client picks up the MySQL change and prints it.
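The filter passed to `subscribe` in the client above is a regular expression over `schema.table` names, and several expressions can be separated by commas. The sketch below only approximates that idea with plain `java.util.regex`: Canal evaluates the filter on the server side and its exact matching rules may differ, and `SubscribeFilterSketch` and `matches` are hypothetical names.

```java
import java.util.regex.Pattern;

// Approximation of a subscribe filter using java.util.regex; not Canal's own matcher.
class SubscribeFilterSketch {

    /** Returns true if any comma-separated expression fully matches "schema.table". */
    static boolean matches(String filter, String schema, String table) {
        String fullName = schema + "." + table;
        for (String expr : filter.split(",")) {
            if (Pattern.matches(expr.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // All schemas and tables, as in the client above
        System.out.println(matches(".*\\..*", "erp", "customer"));             // true
        // Every table in the erp schema
        System.out.println(matches("erp\\..*", "erp", "customer"));            // true
        // A single table; other tables do not match
        System.out.println(matches("erp\\.customer", "edu_course", "course")); // false
        // Two tables listed with a comma
        System.out.println(matches("erp\\.customer,edu_course\\.course", "edu_course", "course")); // true
    }
}
```

Narrowing the filter to the tables you care about keeps the client from receiving changes it would only discard.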
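Third-party client
The official client code is fairly verbose. The third-party client used here integrates with Spring Boot and is simpler to use:
https://github.com/chenqian56131/spring-boot-starter-canal
1) Add the dependency
First download that open-source project and install it into your local Maven repository; the dependency can then be used in your project.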
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
2) Add the annotation to the application's main class
@EnableCanalClient
3) Configuration file
canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
4) Listener
package com.blb.canal_demo;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;

/**
 * Event listener
 */
@CanalEventListener
public class CanalListener {

    /**
     * Listens to the customer table in the erp database
     */
    @ListenPoint(schema = "erp", table = "customer")
    public void updateData(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("Before change");
        // Print the column values before the change
        rowData.getBeforeColumnsList().forEach(c -> System.out.print(c.getName() + ":" + c.getValue() + "\t"));
        System.out.println("\nAfter change");
        // Print the column values after the change
        rowData.getAfterColumnsList().forEach(c -> System.out.print(c.getName() + ":" + c.getValue() + "\t"));
    }
}
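Incremental data synchronization with Canal + RabbitMQ
In real projects, Canal is often combined with RabbitMQ to synchronize MySQL incrementally with other storage systems. The following shows how a distributed online education system keeps its database and Elasticsearch in sync.
Steps:
- The course microservice inserts, updates, or deletes rows in the course table of the course database in MySQL, and MySQL sends the binlog to Canal
- The data sync microservice receives the changed rows through a Canal listener and sends them to the search microservice via RabbitMQ
- The search microservice listens for the RabbitMQ messages and updates the Elasticsearch course index accordingly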
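The flow in these steps can be tried out without any infrastructure. The following is only an in-memory sketch under stated assumptions: two `ArrayDeque` queues stand in for the RabbitMQ save and remove queues, a `HashMap` stands in for the Elasticsearch course index, and `CourseSyncPipelineSketch`, `onRowChange`, and `consume` are hypothetical names, not part of Canal, Spring AMQP, or Elasticsearch.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// In-memory stand-in for the sync pipeline; no Canal, RabbitMQ, or Elasticsearch involved.
class CourseSyncPipelineSketch {

    final Queue<Map<String, String>> saveQueue = new ArrayDeque<>();    // stands in for the save queue
    final Queue<Long> removeQueue = new ArrayDeque<>();                 // stands in for the remove queue
    final Map<Long, Map<String, String>> courseIndex = new HashMap<>(); // stands in for the ES index

    /** Step 2: the sync service turns a row change into a save or remove message. */
    void onRowChange(String eventType, Map<String, String> row) {
        if ("INSERT".equals(eventType) || "UPDATE".equals(eventType)) {
            saveQueue.add(new HashMap<>(row));            // send the full row after the change
        } else if ("DELETE".equals(eventType)) {
            removeQueue.add(Long.valueOf(row.get("id"))); // send only the id of the deleted row
        }
        // Other event types are ignored
    }

    /** Step 3: the search service consumes pending messages and updates the index. */
    void consume() {
        Map<String, String> course;
        while ((course = saveQueue.poll()) != null) {
            courseIndex.put(Long.valueOf(course.get("id")), course); // save or update
        }
        Long id;
        while ((id = removeQueue.poll()) != null) {
            courseIndex.remove(id);
        }
    }

    public static void main(String[] args) {
        CourseSyncPipelineSketch pipeline = new CourseSyncPipelineSketch();
        Map<String, String> course = new HashMap<>();
        course.put("id", "1");
        course.put("name", "Java");

        pipeline.onRowChange("INSERT", course);
        pipeline.consume();
        System.out.println(pipeline.courseIndex.containsKey(1L)); // true

        pipeline.onRowChange("DELETE", course);
        pipeline.consume();
        System.out.println(pipeline.courseIndex.isEmpty());       // true
    }
}
```

The demo consumes after every change because two separate queues do not, by themselves, preserve the relative order of a save and a delete for the same row.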
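The insert, update, and delete operations on the course table are not covered here; the focus is on the core code of the sync service.
- Dependencies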
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
- Configuration file
server.port=8701
# Canal settings
canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
# RabbitMQ settings
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=myhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
- MQ configuration
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration
 */
@Slf4j
@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "edu.course.exchange";

    @Bean
    public Queue queueCourseSave() {
        return new Queue(QUEUE_COURSE_SAVE);
    }

    @Bean
    public Queue queueCourseRemove() {
        return new Queue(QUEUE_COURSE_REMOVE);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(COURSE_EXCHANGE);
    }

    // Bind each queue to the topic exchange with its routing key
    @Bean
    public Binding bindCourseSave() {
        return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
    }

    @Bean
    public Binding bindCourseRemove() {
        return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
    }
}
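- Canal listener
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.HashMap;
import java.util.Map;

/**
 * Listener that synchronizes course table changes
 */
@Slf4j
@CanalEventListener
public class CourseSyncListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Listens for changes to the course table
     */
    @ListenPoint(schema = "edu_course", table = "course")
    public void handleCourseChange(EventType eventType, RowData rowData) {
        log.info("course table operation: {}", eventType);
        if (eventType == EventType.INSERT || eventType == EventType.UPDATE) {
            // Collect the column values after the change
            Map<String, String> map = new HashMap<>();
            rowData.getAfterColumnsList().forEach(c -> map.put(c.getName(), c.getValue()));
            String json = JSON.toJSONString(map);
            log.info("Saving data: {}", json);
            // Send to MQ so the search service adds or updates the course
            rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,
                    RabbitMQConfig.KEY_COURSE_SAVE, json);
        } else if (eventType == EventType.DELETE) {
            // Read the id from the column values before the delete
            Long[] id = new Long[1];
            rowData.getBeforeColumnsList().forEach(c -> {
                if ("id".equals(c.getName())) {
                    id[0] = Long.valueOf(c.getValue());
                }
            });
            log.info("Deleting data: {}", id[0]);
            // Send to MQ so the search service deletes the course
            rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,
                    RabbitMQConfig.KEY_COURSE_REMOVE, id[0]);
        } else {
            log.info("Other operations are not supported");
        }
    }
}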
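Message listener in the search service
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class CourseMQListener {

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    // Same exchange name as the one the sync service publishes to
    public static final String COURSE_EXCHANGE = "edu.course.exchange";

    @Autowired
    ICourseService courseService;

    /**
     * Listens for course insert and update messages
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true"),
                    key = KEY_COURSE_SAVE)})
    public void receiveCourseSaveMessage(String json, Channel channel, Message message) throws IOException {
        log.info("Saving course: {}", json);
        // Convert the message into a Course object
        Course course = JSON.parseObject(json, Course.class);
        // Save the course to Elasticsearch
        courseService.saveOrUpdate(course);
    }

    /**
     * Listens for course delete messages
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true"),
                    key = KEY_COURSE_REMOVE)})
    public void receiveCourseDeleteMessage(Long id) {
        courseService.removeById(id);
        log.info("Course deleted: {}", id);
    }
}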