需求来源

 跟踪表对比前后修改数据变化

方案

  1. AOP、拦截器

    优点: 实现简单

    缺点:

    1. 代码入侵性强 每加入一个模块就需要处理切面添加切入点,模块配置也需要增加。
    2. 项目依赖麻烦,档案模块需要依赖对应模块api包进行序列化实现对比,业务模块需要实现查询接口。
    3. 模块适配插拔式实现困难
    4. sql解析困难复杂容易出错(拦截mapper层解析update语句)
  2. 日志采集方案

    优点:

    1. 只需要处理数据变化对比,可以直接集成三方插件如es查询,无代码入侵。后续扩展强,无论任何任何模块需要加入类似功能只需要中间件配置,添加log监听即可。

    缺点:

    1. 需要部署其他服务,增加运维成本,依赖于三方服务可能存在数据丢失

canal部署

环境

这里使用的是mac os 搭配docker部署,几乎和linux环境差不多

canal + rocketMQ + ES 实现

基础同步技术介绍

canal

准备

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

快速启动参考:QuickStart

其中主要注意配置匹配数据库、表规则配置,可以选择匹配部分表做记录。 也可以通过配置不同的topic消费做消费实现不同的业务。参考文档:配置 主要关注Topic配置匹配规则

RoketMQ 搭建

docker 使用 foxiswho/rocketmq 镜像
参考博客

docker run -d -p 10911:10911 -p 10909:10909\
 --name rmqbroker --link rmqserver:namesrv\
 -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt"\
 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m"\
 foxiswho/rocketmq:broker-4.5.1

使用上面docker命令拉取镜像,快速搭建

搭建canal

docker搭建官方文档

下载docker启动文件

linux命令

wget  https://github.com/alibaba/canal/blob/master/docker/run.sh

mac 命令

curl -0 https://github.com/alibaba/canal/blob/master/docker/run.sh

配置canal配置项

参考配置

主要注意配置

#  按需修改成自己的数据库信息
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# mq config 这个配置使用动态topic就可以不配置,选其一
canal.mq.topic=example
# 针对库名或者表名发送动态topic,参考配置
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
sh run.sh -e canal.auto.scan=false \
		  -e canal.destinations=test \
		  -e canal.instance.master.address=172.17.0.7:3306  \
		  -e canal.instance.dbUsername=admin  \
		  -e canal.instance.dbPassword=123456  \
		  -e canal.instance.defaultDatabaseName=mytest  \
		  -e canal.instance.connectionCharset=UTF-8 \
		  -e canal.instance.tsdb.enable=true \
		  -e canal.instance.gtidon=false  \
		  -e canal.mq.dynamicTopic=mytest:mytest.sg_pop_base_info  \
		  -e canal.mq.partition=0 \
		  -e canal.serverMode=rocketmq \
		  -e canal.mq.servers=172.17.0.10:9876 \
		  -e canal.mq.retries=0 \
		  -e canal.mq.batchSize=16384 \
		  -e canal.mq.maxRequestSize=1048576 \
		  -e canal.mq.lingerMs=200 \
		  -e canal.mq.bufferMemory=33554432 \
		  -e canal.mq.canalBatchSize=50  \
		  -e canal.mq.canalGetTimeout=100 \
		  -e canal.mq.flatMessage=true \
		  -e canal.mq.compressionType=none \
		  -e canal.mq.acks=all \
		  -e canal.mq.transaction=false 

mysql、mq如果使用的容器,注意添加link配置 或者 通过命令docker inspect [容器名] 查看IPAddress配置到配置中

docker查看启动成功

#运行 docker logs 容器名称
docker logs canal-server

# 如下表示成功
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Generating SSH1 RSA host key:                              [  OK  ]
Starting sshd:                                             [  OK  ]
Starting crond:                                            [  OK  ]
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...

有兴趣的可以看一下run.sh这个文件,搞过服务器的肯定看得懂,就是各种变量然后组装docker 的run命令

查看MQ消息

修改监控表数据即可查看mq中消息,如果修改了数据没有消息,查看canal-server容器中日志报错

本地mq客户端查看消息:http://localhost:8180/#/message

实现技术通过canal获取日志拿到对比数据放入MQ,MQ消息数据如下:

新增消息

{
    "data": [
        {
            "id": "111",
            "name": "ad ",
            "create_date": "2020-04-15 14:31:03",
            "update_date": "2020-04-15 14:31:08",
            "create_user": "asdf ",
            "update_user": "sadf ",
            "is_deleted": "0"
        }
    ],
    "database": "grid_comm_center",
    "es": 1586932274000,
    "id": 66,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(64)",
        "name": "varchar(200)",
        "create_date": "timestamp",
        "update_date": "timestamp",
        "create_user": "varchar(32)",
        "update_user": "varchar(32)",
        "is_deleted": "tinyint(2) unsigned"
    },
    "old": null,
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "name": 12,
        "create_date": 93,
        "update_date": 93,
        "create_user": 12,
        "update_user": 12,
        "is_deleted": -6
    },
    "table": "demo_id",
    "ts": 1586932275053,
    "type": "INSERT"
}

修改消息

{
    "data": [
        {
            "id": "111",
            "name": "ad asdf ",
            "create_date": "2020-04-15 14:31:03",
            "update_date": "2020-04-15 14:31:54",
            "create_user": "asdf adf ",
            "update_user": "sadf asdf",
            "is_deleted": "0"
        }
    ],
    "database": "grid_comm_center",
    "es": 1586932314000,
    "id": 67,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(64)",
        "name": "varchar(200)",
        "create_date": "timestamp",
        "update_date": "timestamp",
        "create_user": "varchar(32)",
        "update_user": "varchar(32)",
        "is_deleted": "tinyint(2) unsigned"
    },
    "old": [
        {
            "name": "ad ",
            "update_date": "2020-04-15 14:31:08",
            "create_user": "asdf ",
            "update_user": "sadf "
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "name": 12,
        "create_date": 93,
        "update_date": 93,
        "create_user": 12,
        "update_user": 12,
        "is_deleted": -6
    },
    "table": "demo_id",
    "ts": 1586932314983,
    "type": "UPDATE"
}

删除消息

{
    "data": [
        {
            "id": "1111",
            "name": "123123",
            "create_date": "2020-03-17 17:48:23",
            "update_date": "2020-03-17 17:48:26",
            "create_user": "你好",
            "update_user": "你好",
            "is_deleted": "1"
        }
    ],
    "database": "grid_comm_center",
    "es": 1587366810000,
    "id": 1049,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(64)",
        "name": "varchar(200)",
        "create_date": "timestamp",
        "update_date": "timestamp",
        "create_user": "varchar(32)",
        "update_user": "varchar(32)",
        "is_deleted": "tinyint(2) unsigned"
    },
    "old": null,
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "name": 12,
        "create_date": 93,
        "update_date": 93,
        "create_user": 12,
        "update_user": 12,
        "is_deleted": -6
    },
    "table": "demo_id",
    "ts": 1587366810548,
    "type": "DELETE"
}

如上消息可以清楚拿到数据库、表详细变更,通过拿到变更数据解析插入数据库即可。

官方中还可以通过ClientAdapter 适配器入库到RDB、HBase、ES,这里根据需求做部署操作吧。目前这个里自己做档案模块进行消费写入档案数据,预计搜索可以放入es可以加快查询效率。

查看这些消息,然后做消费即可做自己想做的事情,至此已经完成了canal的基础搭建

消费消息写入ElasticSearch

参考博客

maven配置

<elasticsearch.version>7.2.0</elasticsearch.version>
<!-- Java High Level REST Client -->
<dependency>
     <groupId>org.elasticsearch</groupId>
     <artifactId>elasticsearch</artifactId>
     <version>${elasticsearch.version}</version>
 </dependency>
 <dependency>
     <groupId>org.elasticsearch.client</groupId>
     <artifactId>elasticsearch-rest-client</artifactId>
     <version>${elasticsearch.version}</version>
 </dependency>
 <dependency>
     <groupId>org.elasticsearch.client</groupId>
     <artifactId>elasticsearch-rest-high-level-client</artifactId>
     <exclusions>
         <exclusion>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
         </exclusion>
         <exclusion>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
         </exclusion>
     </exclusions>
     <version>${elasticsearch.version}</version>
 </dependency>

Spring boot封装工具类

mq使用的是公司封装的包,所以这里省略mq相关代码。直接集成mq消费相关消费做自己业务处理即可。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐