Spring Boot整合Canal,完成数据库同步操作

canal简介

在这里插入图片描述
canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。

原理:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议。

  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)。

  3. canal解析binary log对象(原始为byte流),拿到主库增删改的数据。

环境部署

本次部署以Linux为主:

首先需要安装mysql服务(过程略)。

mysql安装完成后,需要做以下操作:

  • 检查binlog功能是否有开启
  • 如果显示状态为OFF表示该功能未开启,开启binlog功能
	mysql> show variables like 'log_bin';
	+---------------+-------+
	| Variable_name | Value |
	+---------------+-------+
	| log_bin | OFF |
	+---------------+-------+
	1 row in set (0.00 sec)

1)修改 mysql 的配置文件 my.cnf
vi /etc/my.cnf 追加内容:

log-bin=mysql-bin #binlog文件名
binlog_format=ROW #选择row模式
server_id=1 #mysql实例id,不能和canal的slaveId重复( instance.properties 文件中)

2)重启 mysql:

service mysql restart

3)再次查看变量值:

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON|
+---------------+-------+
1 row in set (0.00 sec)

4)进入mysql中执行下面语句查看binlog所在位置

show master status;

在这里插入图片描述
发现如果File中binlog文件不为 mysql-bin.000001,需要重置mysql

reset master;

在这里插入图片描述

Linux下载安装Canal服务:

下载地址:
https://github.com/alibaba/canal/releases
1)下载之后,放到目录中,解压文件(本次以1.1.3为例)

mkdir /usr/local/canal
cd /usr/local/canal
canal.deployer-1.1.3.tar.gz
tar zxvf canal.deployer-1.1.3.tar.gz

2)修改配置文件

vi conf/example/instance.properties

在这里插入图片描述
改成自己的地址,剩下的都不要动(表结构变化配置信息可以不配置,默认注释即可):
在这里插入图片描述
instance.properties 监听库或表的配置:

canal.instance.filter.regex=.*\\..*  (默认为所有的库所有的表)
mysql 数据解析关注的表,Perl正则表达式.

多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal\\.test1

5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

3)进入bin目录下启动

./startup.sh

在这里插入图片描述

springboot整合canal

pom依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.3</version>  //此处跟服务器版本保持一致
</dependency>

监听代码:

@Component
public class CanalUtil {

    public void run() throws Exception {

        /*
         * 此处Ip地址为linux虚拟机地址
         * 端口号为固定 11111
         */
        CanalConnector conn = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.2.231", 11111), "example", "", "");
        while (true) {
            conn.connect();
            //订阅实例中所有的数据库和表
            /*
             这里注意下:
               canal会一直向你的example.log日志文件写入日志,
               测了一下大概12小时会写入20M大小的日志。
            */
            //conn.subscribe(".*\\..*");  //此处监听的配置会使instance.properties里的配置失效
            conn.subscribe("shuzijianzaopingtai_zhgd_db\\..*"); 

            // 回滚到未进行ack的地方
            conn.rollback();
            // 获取数据 每次获取一百条改变数据
            Message message = conn.getWithoutAck(100);
            //获取这条消息的id
            long id = message.getId();
            int size = message.getEntries().size();
            if (id != -1 && size > 0) {
                // 数据解析
                analysis(message.getEntries());
            }else {
                //暂停1秒防止重复链接数据库
                Thread.sleep(1000);
            }
            // 确认消费完成这条消息
            conn.ack(message.getId());
            // 关闭连接
            conn.disconnect();
        }
    }

    /**
     * 数据解析
     */
    private void analysis(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // 解析binlog
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());  //此段代码重要,所有的数据从这里解析出来
            } catch (Exception e) {
                throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
            }

            if (CollUtil.isNotEmpty(rowChange.getRowDatasList())) {
                //操作类型
                EventType eventType = rowChange.getEventType();
                // 获取当前操作所属的数据库
                String dbName = entry.getHeader().getSchemaName();
                // 获取当前操作所属的表
                String tableName = entry.getHeader().getTableName();
                // 事务提交时间
                long timestamp = entry.getHeader().getExecuteTime();

                System.out.println("事务提交时间:" + time(timestamp));
                System.out.println("数据库:" + dbName);
                System.out.println("表名:" + tableName);
                System.out.println("操作类型:" + eventType);
                if (CanalEntry.EventType.INSERT.equals(eventType)) {
                    System.out.println("这是一条新增的数据");
                    insertSql(entry);
                } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
                    System.out.println("这是一条删除数据:");
                    deleteSql(entry);
                } else {
                    System.out.println("这是一条更新数据");
                    updateSql(entry);
                }
                System.out.println();
            }
        }
    }

    //新增的SQL
    private void insertSql(Entry entry) {
    //实现自己的业务逻辑
    }

    //删除的SQL
    private void deleteSql(Entry entry) {
         //实现自己的业务逻辑
    }

    //修改的sql
    private void updateSql(Entry entry) {
        //实现自己的业务逻辑
    }

    private String time(long timestamp){
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date(timestamp);
        return simpleDateFormat.format(date);
    }
}

监听自启动:

@Configuration
public class InitCanalConfig implements CommandLineRunner {

    @Resource
    private CanalUtil canalUtil;


    @Override
    public void run(String... args) throws Exception {
        canalUtil.run();
    }
}

以上所有配置完毕,看一下效果:
修改:
在这里插入图片描述在这里插入图片描述
新增:
在这里插入图片描述
在这里插入图片描述
删除
在这里插入图片描述
在这里插入图片描述
完毕。

遇到的错误分享

 启动后提示:
 unsupported version at this client.

这个错误是pom的版本跟linux版本不一致,修改成统一版本即可。

配置都没有问题,项目和canal启动也没有报错,控制台就是接收不到数据

这个情况可以尝试一下修改/conf下的canal.properties
把 ##canal.instance.parser.parallelThreadSize = 16 改成 canal.instance.parser.parallelThreadSize = 16

更多推荐