SpringBoot 整合canal 实现数据同步
微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变更进行增量订阅&消费的业务。无论是canal实验需要还是为了增量备份、主从复制和恢复,都是需要开启mysql-binlog日志,数据目录设置到不同的磁盘分区可以降低io等待。官网:https://github.com/alibaba/canalcana
微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变更进行增量订阅&消费的业务。无论是canal实验需要还是为了增量备份、主从复制和恢复,都是需要开启mysql-binlog日志,数据目录设置到不同的磁盘分区可以降低io等待。
官网:https://github.com/alibaba/canal
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
canal 搭建
搭建mysql环境
1,修改配置文件
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复;
重启MySQL服务后,确认是否开启了binlog(注意一点是MySQL8.x默认开启binlog)SHOW VARIABLES LIKE '%bin%';:
2,授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant(省略第三步)
CREATE USER canal IDENTIFIED BY 'root';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' ;
FLUSH PRIVILEGES;
3,新建一个用户名canal
密码为QWqw12!@
的新用户,赋予REPLICATION SLAVE
和 REPLICATION CLIENT
权限:
CREATE USER canal IDENTIFIED BY '123456!@';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456!@';
搭建canal环境
下载Linux最新稳定版(canal.deployer-1.1.4.tar.gz):https://github.com/alibaba/canal/releases
解压后修改/canal/conf/example下的instance.properties配置文件:
canal.instance.master.address,数据库地址,这里指定为127.0.0.1:3306。
canal.instance.dbUsername,监听的数据库用户名。
canal.instance.dbPassword,监听的数据库密码。
新增canal.instance.defaultDatabaseName,默认那个库,这里指定为test(需要在MySQL中建立一个test库)
启动
sh /canal/bin/startup.sh
# 查看服务日志
tail -100f /canal/logs/canal/canal
# 查看实例日志 -- 一般情况下,关注实例日志即可
tail -100f /canal/logs/example/example.log
到目前为止 canal的服务端我们已经搭建好了 但是到目前 我们只是把数据库的binlog 拉到canal中,我们还得编写客户端消费数据
properties配置文件
properties配置分为两部分:
- canal.properties (系统根配置文件)
- instance.properties (instance级别的配置文件,每个instance一份)
- instance列表定义 (列出当前server上有多少个instance,每个instance的加载方式是spring/manager等)
- common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享. 【instance.properties配置定义优先级高于canal.properties】
instance.properties介绍:
a. 在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件
比如:
canal.destinations = example1,example2 #spring客户端注意指定的不同名字
这时需要创建example1和example2两个目录,每个目录里各自有一份instance.properties.
两种方式,官方提供的demo和springboot starter
1,官方提供的
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
package com.example.demo.test;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalTest {
public static void main(String[] args) throws Exception {
//canal.ip = 192.168.56.104
//canal.port = 11111
//canal.destinations = example
//canal.user =
//canal.passwd =
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.56.104", 11111), "example", "", "");
try {
connector.connect();
//监听的表, 格式为数据库.表名,数据库.表名
connector.subscribe(".*\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100); // 获取指定数量的数据
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
Thread.sleep(1000);
continue;
}
// System.out.println(message.getEntries());
printEntries(message.getEntries());
connector.ack(batchId);// 提交确认,消费成功,通知server删除数据
// connector.rollback(batchId);// 处理失败, 回滚数据,后续重新获取数据
}
}catch (Exception e){
}finally {
connector.disconnect();
}
}
private static void printEntries(List<Entry> entries) throws Exception {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) {
continue;
}
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChange.getRowDatasList()) {
switch (rowChange.getEventType()) {
case INSERT:
System.out.println("INSERT ");
printColumns(rowData.getAfterColumnsList());
break;
case UPDATE:
System.out.println("UPDATE ");
printColumns(rowData.getAfterColumnsList());
break;
case DELETE:
System.out.println("DELETE ");
printColumns(rowData.getBeforeColumnsList());
break;
default:
break;
}
}
}
}
private static void printColumns(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
操作数据库增删改,控制台则会打印
参考:https://github.com/gxmanito/canal-client
https://gitee.com/zhiqishao/canal-client/tree/master
2,springboot starter
https://github.com/NormanGyllenhaal/canal-client
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
canal:
server: 127.0.0.1:11111 #canal 默认端口11111
destination: example
spring:
application:
name: canal-example
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/test
username: root
password: root
jooq:
sql-dialect: mysql
flyway:
baseline-on-migrate: true
package com.example.demo.test;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@Slf4j
@Component
@CanalTable(value = "test")
public class UserHandler implements EntryHandler<Test> {
@Override
public void insert(Test user) {
log.info("insert message {}", user);
}
@Override
public void update(Test before, Test after) {
log.info("update before {} ", before);
log.info("update after {}", after);
}
@Override
public void delete(Test user) {
log.info("delete {}", user);
}
}
package com.example.demo.test;
import lombok.Data;
import java.io.Serializable;
/**
* @Description //TODO
* @Author GaoX
* @Date 2020/6/28 14:44
*/
@Data
//@Table(name = "test")
public class Test implements Serializable {
private Integer id;
private String name;
}
更多推荐
所有评论(0)