![cover](https://img-blog.csdnimg.cn/e9335eaa5b644fb5869ed2f84d286c50.png)
Spring Boot整合Canal,完成数据库同步操作
Spring Boot整合Canal,完成数据库同步操作
Spring Boot整合Canal,完成数据库同步操作
canal简介
canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。
原理:
-
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议。
-
mysql master收到dump请求,开始推送binary log给slave(也就是canal)。
-
canal解析binary log对象(原始为byte流),拿到主库增删改的数据。
环境部署
本次部署以Linux为主:
首先需要安装mysql服务(过程略)。
mysql安装完成后,需要做以下操作:
- 检查binlog功能是否有开启
- 如果显示状态为OFF表示该功能未开启,开启binlog功能
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+
1 row in set (0.00 sec)
1)修改 mysql 的配置文件 my.cnf
vi /etc/my.cnf 追加内容:
log-bin=mysql-bin #binlog文件名
binlog_format=ROW #选择row模式
server_id=1 #mysql实例id,不能和canal的slaveId重复( instance.properties 文件中)
2)重启 mysql:
service mysql restart
3)再次查看变量值:
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON|
+---------------+-------+
1 row in set (0.00 sec)
4)进入mysql中执行下面语句查看binlog所在位置
show master status;
发现如果File中binlog文件不为 mysql-bin.000001,需要重置mysql
reset master;
Linux下载安装Canal服务:
下载地址:
https://github.com/alibaba/canal/releases
1)下载之后,放到目录中,解压文件(本次以1.1.3为例)
mkdir /usr/local/canal
cd /usr/local/canal
canal.deployer-1.1.3.tar.gz
tar zxvf canal.deployer-1.1.3.tar.gz
2)修改配置文件
vi conf/example/instance.properties
改成自己的地址,剩下的都不要动(表结构变化配置信息可以不配置,默认注释即可):
instance.properties 监听库或表的配置:
canal.instance.filter.regex=.*\\..* (默认为所有的库所有的表)
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal\\.test1
5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
3)进入bin目录下启动
./startup.sh
springboot整合canal
pom依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version> //此处跟服务器版本保持一致
</dependency>
监听代码:
@Component
public class CanalUtil {
public void run() throws Exception {
/*
* 此处Ip地址为linux虚拟机地址
* 端口号为固定 11111
*/
CanalConnector conn = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.2.231", 11111), "example", "", "");
while (true) {
conn.connect();
//订阅实例中所有的数据库和表
/*
这里注意下:
canal会一直向你的example.log日志文件写入日志,
测了一下大概12小时会写入20M大小的日志。
*/
//conn.subscribe(".*\\..*"); //此处监听的配置会使instance.properties里的配置失效
conn.subscribe("shuzijianzaopingtai_zhgd_db\\..*");
// 回滚到未进行ack的地方
conn.rollback();
// 获取数据 每次获取一百条改变数据
Message message = conn.getWithoutAck(100);
//获取这条消息的id
long id = message.getId();
int size = message.getEntries().size();
if (id != -1 && size > 0) {
// 数据解析
analysis(message.getEntries());
}else {
//暂停1秒防止重复链接数据库
Thread.sleep(1000);
}
// 确认消费完成这条消息
conn.ack(message.getId());
// 关闭连接
conn.disconnect();
}
}
/**
* 数据解析
*/
private void analysis(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
// 解析binlog
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); //此段代码重要,所有的数据从这里解析出来
} catch (Exception e) {
throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
}
if (CollUtil.isNotEmpty(rowChange.getRowDatasList())) {
//操作类型
EventType eventType = rowChange.getEventType();
// 获取当前操作所属的数据库
String dbName = entry.getHeader().getSchemaName();
// 获取当前操作所属的表
String tableName = entry.getHeader().getTableName();
// 事务提交时间
long timestamp = entry.getHeader().getExecuteTime();
System.out.println("事务提交时间:" + time(timestamp));
System.out.println("数据库:" + dbName);
System.out.println("表名:" + tableName);
System.out.println("操作类型:" + eventType);
if (CanalEntry.EventType.INSERT.equals(eventType)) {
System.out.println("这是一条新增的数据");
insertSql(entry);
} else if (CanalEntry.EventType.DELETE.equals(eventType)) {
System.out.println("这是一条删除数据:");
deleteSql(entry);
} else {
System.out.println("这是一条更新数据");
updateSql(entry);
}
System.out.println();
}
}
}
//新增的SQL
private void insertSql(Entry entry) {
//实现自己的业务逻辑
}
//删除的SQL
private void deleteSql(Entry entry) {
//实现自己的业务逻辑
}
//修改的sql
private void updateSql(Entry entry) {
//实现自己的业务逻辑
}
private String time(long timestamp){
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date(timestamp);
return simpleDateFormat.format(date);
}
}
监听自启动:
@Configuration
public class InitCanalConfig implements CommandLineRunner {
@Resource
private CanalUtil canalUtil;
@Override
public void run(String... args) throws Exception {
canalUtil.run();
}
}
以上所有配置完毕,看一下效果:
修改:
新增:
删除
完毕。
遇到的错误分享
启动后提示:
unsupported version at this client.
这个错误是pom的版本跟linux版本不一致,修改成统一版本即可。
配置都没有问题,项目和canal启动也没有报错,控制台就是接收不到数据
这个情况可以尝试一下修改/conf下的canal.properties
把 ##canal.instance.parser.parallelThreadSize = 16 改成 canal.instance.parser.parallelThreadSize = 16
更多推荐
所有评论(0)