docker快速搭建alibaba-canal
docker快速搭建alibaba-canal需求来源方案canal部署环境基础同步技术介绍准备RoketMQ 搭建搭建canal查看MQ消息消费消息写入ElasticSearch需求来源跟踪表对比前后修改数据变化方案AOP、拦截器优点: 实现简单缺点:代码入侵性强 每加入一个模块就需要处理切面添加切入点,模块配置也需要增加。项目依赖麻烦,档案模块需要依赖对应模块api包...
docker快速搭建alibaba-canal
需求来源
跟踪表对比前后修改数据变化
方案
-
AOP、拦截器
优点: 实现简单
缺点:
- 代码入侵性强 每加入一个模块就需要处理切面添加切入点,模块配置也需要增加。
- 项目依赖麻烦,档案模块需要依赖对应模块api包进行序列化实现对比,业务模块需要实现查询接口。
- 模块适配插拔式实现困难
- sql解析困难复杂容易出错(拦截mapper层解析update语句)
-
日志采集方案
优点:
- 只需要处理数据变化对比,可以直接集成三方插件如es查询,无代码入侵。后续扩展强,无论任何任何模块需要加入类似功能只需要中间件配置,添加log监听即可。
缺点:
- 需要部署其他服务,增加运维成本,依赖于三方服务可能存在数据丢失
canal部署
环境
这里使用的是mac os 搭配docker部署,几乎和linux环境差不多
canal + rocketMQ + ES 实现
基础同步技术介绍
准备
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
快速启动参考:QuickStart
其中主要注意配置匹配数据库、表规则配置,可以选择匹配部分表做记录。 也可以通过配置不同的topic消费做消费实现不同的业务。参考文档:配置 主要关注Topic配置匹配规则
RoketMQ 搭建
docker 使用 foxiswho/rocketmq 镜像
参考博客
docker run -d -p 10911:10911 -p 10909:10909\
--name rmqbroker --link rmqserver:namesrv\
-e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt"\
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m"\
foxiswho/rocketmq:broker-4.5.1
使用上面docker命令拉取镜像,快速搭建
搭建canal
下载docker启动文件
linux命令
wget https://github.com/alibaba/canal/blob/master/docker/run.sh
mac 命令
curl -0 https://github.com/alibaba/canal/blob/master/docker/run.sh
配置canal配置项
主要注意配置
# 按需修改成自己的数据库信息
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# mq config 这个配置使用动态topic就可以不配置,选其一
canal.mq.topic=example
# 针对库名或者表名发送动态topic,参考配置
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
sh run.sh -e canal.auto.scan=false \
-e canal.destinations=test \
-e canal.instance.master.address=172.17.0.7:3306 \
-e canal.instance.dbUsername=admin \
-e canal.instance.dbPassword=123456 \
-e canal.instance.defaultDatabaseName=mytest \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.mq.dynamicTopic=mytest:mytest.sg_pop_base_info \
-e canal.mq.partition=0 \
-e canal.serverMode=rocketmq \
-e canal.mq.servers=172.17.0.10:9876 \
-e canal.mq.retries=0 \
-e canal.mq.batchSize=16384 \
-e canal.mq.maxRequestSize=1048576 \
-e canal.mq.lingerMs=200 \
-e canal.mq.bufferMemory=33554432 \
-e canal.mq.canalBatchSize=50 \
-e canal.mq.canalGetTimeout=100 \
-e canal.mq.flatMessage=true \
-e canal.mq.compressionType=none \
-e canal.mq.acks=all \
-e canal.mq.transaction=false
mysql、mq如果使用的容器,注意添加link配置 或者 通过命令docker inspect [容器名] 查看IPAddress配置到配置中
docker查看启动成功
#运行 docker logs 容器名称
docker logs canal-server
# 如下表示成功
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Generating SSH1 RSA host key: [ OK ]
Starting sshd: [ OK ]
Starting crond: [ OK ]
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...
有兴趣的可以看一下run.sh这个文件,搞过服务器的肯定看得懂,就是各种变量然后组装docker 的run命令
查看MQ消息
修改监控表数据即可查看mq中消息,如果修改了数据没有消息,查看canal-server容器中日志报错
本地mq客户端查看消息:http://localhost:8180/#/message
实现技术通过canal获取日志拿到对比数据放入MQ,MQ消息数据如下:
新增消息
{
"data": [
{
"id": "111",
"name": "ad ",
"create_date": "2020-04-15 14:31:03",
"update_date": "2020-04-15 14:31:08",
"create_user": "asdf ",
"update_user": "sadf ",
"is_deleted": "0"
}
],
"database": "grid_comm_center",
"es": 1586932274000,
"id": 66,
"isDdl": false,
"mysqlType": {
"id": "bigint(64)",
"name": "varchar(200)",
"create_date": "timestamp",
"update_date": "timestamp",
"create_user": "varchar(32)",
"update_user": "varchar(32)",
"is_deleted": "tinyint(2) unsigned"
},
"old": null,
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"name": 12,
"create_date": 93,
"update_date": 93,
"create_user": 12,
"update_user": 12,
"is_deleted": -6
},
"table": "demo_id",
"ts": 1586932275053,
"type": "INSERT"
}
修改消息
{
"data": [
{
"id": "111",
"name": "ad asdf ",
"create_date": "2020-04-15 14:31:03",
"update_date": "2020-04-15 14:31:54",
"create_user": "asdf adf ",
"update_user": "sadf asdf",
"is_deleted": "0"
}
],
"database": "grid_comm_center",
"es": 1586932314000,
"id": 67,
"isDdl": false,
"mysqlType": {
"id": "bigint(64)",
"name": "varchar(200)",
"create_date": "timestamp",
"update_date": "timestamp",
"create_user": "varchar(32)",
"update_user": "varchar(32)",
"is_deleted": "tinyint(2) unsigned"
},
"old": [
{
"name": "ad ",
"update_date": "2020-04-15 14:31:08",
"create_user": "asdf ",
"update_user": "sadf "
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"name": 12,
"create_date": 93,
"update_date": 93,
"create_user": 12,
"update_user": 12,
"is_deleted": -6
},
"table": "demo_id",
"ts": 1586932314983,
"type": "UPDATE"
}
删除消息
{
"data": [
{
"id": "1111",
"name": "123123",
"create_date": "2020-03-17 17:48:23",
"update_date": "2020-03-17 17:48:26",
"create_user": "你好",
"update_user": "你好",
"is_deleted": "1"
}
],
"database": "grid_comm_center",
"es": 1587366810000,
"id": 1049,
"isDdl": false,
"mysqlType": {
"id": "bigint(64)",
"name": "varchar(200)",
"create_date": "timestamp",
"update_date": "timestamp",
"create_user": "varchar(32)",
"update_user": "varchar(32)",
"is_deleted": "tinyint(2) unsigned"
},
"old": null,
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"name": 12,
"create_date": 93,
"update_date": 93,
"create_user": 12,
"update_user": 12,
"is_deleted": -6
},
"table": "demo_id",
"ts": 1587366810548,
"type": "DELETE"
}
如上消息可以清楚拿到数据库、表详细变更,通过拿到变更数据解析插入数据库即可。
官方中还可以通过ClientAdapter 适配器入库到RDB、HBase、ES,这里根据需求做部署操作吧。目前这个里自己做档案模块进行消费写入档案数据,预计搜索可以放入es可以加快查询效率。
查看这些消息,然后做消费即可做自己想做的事情,至此已经完成了canal的基础搭建
消费消息写入ElasticSearch
maven配置
<elasticsearch.version>7.2.0</elasticsearch.version>
<!-- Java High Level REST Client -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</exclusion>
</exclusions>
<version>${elasticsearch.version}</version>
</dependency>
mq使用的是公司封装的包,所以这里省略mq相关代码。直接集成mq消费相关消费做自己业务处理即可。
更多推荐
所有评论(0)