Canal介绍

Canal是阿里巴巴的数据同步工具,最初主要为了应对杭州和美国的双机房部署问题,目前也是国内互联网企业经常使用的数据增量同步解决方案。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OWMmiASD-1645175287373)(canal.assets/image-20220218111615160.png)]

原理:

  1. canal将自己伪装为MySQL的slave,向master发送dump协议
  2. master收到dump协议,数据发生修改后推送binary log给canal
  3. canal解析binary log对象,转换为增量数据,同步到ES、Redis等

Canal 安装

  1. MySQL配置

    注:本案例的mysql在windows上,linux环境的配置没有太大区别

    首先要让mysql开启binlog模式

    1) 进入mysql查看是否启动binlog

    SHOW VARIABLES LIKE '%log_bin%'
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SY7g1LgV-1645175287374)(canal.assets/image-20220218102532501.png)]

    log_bin为ON表示启动,为OFF则未启动,需要修改mysql配置文件启动log_bin

    windows配置文件是MySQL安装目录的my.ini

    linux在/etc/my.cnf

    修改:

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1
    

    2) 创建用户

    进入mysql,创建canal用户并授权

    create user canal@'%'IDENTIFIED WITH mysql_native_password BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    
  2. 下载和安装canal

    到官网下载 https://github.com/alibaba/canal/releases

    这里使用的是1.1.4版本

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2WCwP6Bs-1645175287375)(canal.assets/image-20220218101915230.png)]

    上传文件到Linux,解压到canal目录中

    cd /usr/local
    mkdir canal
    tar -vxf canal.deployer-1.1.4.tar.gz -C canal
    
  3. 配置Canal

    进入mysql,输入命令,记录文件名和位置

    show master status;
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VCLRaltm-1645175287376)(canal.assets/image-20220218103306017.png)]

    进入canal目录,修改配置文件

    vi conf/example/instance.properties
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rVNdFCXz-1645175287376)(canal.assets/image-20220218103504891.png)]

  4. 启动Canal

    进入bin目录启动服务

    ./startup.sh
    

    关闭服务使用 stop.sh

    查看启动日志文件

    cat /usr/local/canal/logs/canal/canal.log
    cat /usr/local/canal/logs/example/example.log
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dWGljKci-1645175287377)(canal.assets/image-20220218104002375.png)]

    以上效果表示已经运行,如果出现异常可以按日志情况解决

    主要问题总结:

    1. 异常信息 authentication error,数据库账号和密码配置错误
    2. 异常信息 can’t find position,检查配置的文件名和位置,再删除conf/example/meta.dat 重启
    3. 客户端版本兼容问题,canal的版本和客户端的版本要一致

Canal 客户端

官方客户端

1) 引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

2) Java代码

package com.blb.canal_demo;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * 客户端测试
 */
public class ClientTest {
    public static void main(String args[]) {
        // 创建canal连接对象
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.223.223",
                11111), "example", "canal", "canal");
        try {
            //连接
            connector.connect();
            //订阅所有数据库和表
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(1000);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    //没有数据,就休眠1秒
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    //有数据就打印
                    printEntry(message.getEntries());
                }
                // 提交确认
                connector.ack(batchId);
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR parse data:" + entry.toString(),e);
            }
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChage.getRowDatasList()) {
                //判断增删改操作
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}

修改了数据库中任意一张表的数据,canal客户端监听到mysql数据的修改

在这里插入图片描述

在这里插入图片描述

第三方客户端

官方客户端的代码比较繁琐,这里使用了第三方客户端采用SpringBoot整合,使用比较简单

https://github.com/chenqian56131/spring-boot-starter-canal

1) 引入依赖

首先下载该开源项目,安装到本地的maven中,在项目中就可以使用该依赖

<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

2) 启动类添加注解

@EnableCanalClient

3)配置文件

canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000

4) 监听器

package com.blb.canal_demo;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;

/**
 * 事件监听器
 */
@CanalEventListener
public class CanalListener {

    /**
     * 监听 erp数据库的customer表
     */
    @ListenPoint(schema = "erp",table = "customer")
    public void updateData(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        System.out.println("修改前");
        //打印改变之前的数据
        rowData.getBeforeColumnsList().forEach((c)-> System.out.print(c.getName()+":"+c.getValue()+"\t"));
        System.out.println("\n修改后");
        //打印改变之后的数据
        rowData.getAfterColumnsList().forEach((c)-> System.out.print(c.getName()+":"+c.getValue()+"\t"));
    }
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-K4K1ZwqR-1645175287379)(canal.assets/image-20220218111323022.png)]

Canal+RabbitMQ实现数据增量同步

实际开发过程中,我们常使用Canal配合RabbitMQ实现MySQL和其它存储系统的增量同步,下面是分布式在线教育系统中实现数据库和Elasticsearch的同步过程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-F4CRSzBg-1645175287380)(canal.assets/image-20220218164826702.png)]

步骤:

  1. 课程微服务对MySQL中的课程数据库课程表进行增删改操作,MySQL发送binlog给Canal
  2. 数据同步微服务通过Canal监听器获得具体的数据,通过RabbitMQ发送给搜索微服务
  3. 搜索微服务监听RabbitMQ消息,对Elasticsearch课程索引进行同步更新

课程表的增删改这里就不介绍了,主要看看同步服务的核心代码

  1. 依赖
<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
  1. 配置文件
server.port=8701
# canal配置
canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
# rabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=myhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
  1. MQ配置
/**
 * RabbitMQ的配置
 */
@Slf4j
@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "edu.course.exchange";

    @Bean
    public Queue queueCourseSave() {
        return new Queue(QUEUE_COURSE_SAVE);
    }

    @Bean
    public Queue queueCourseRemove() {
        return new Queue(QUEUE_COURSE_REMOVE);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(COURSE_EXCHANGE);
    }

    @Bean
    public Binding bindCourseSave() {
        return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
    }

    @Bean
    public Binding bindCourseRemove() {
        return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
    }
}
  1. Canal监听器
/**
 * 课程表数据同步监听器
 */
@Slf4j
@CanalEventListener
public class CourseSyncListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 监听课程表的修改
     */
    @ListenPoint(schema = "edu_course",table = "course")
    public void handleCourseChange(EventType eventType, RowData rowData){
        log.info("course表操作:{}",eventType);
        if(eventType == EventType.INSERT || eventType == EventType.UPDATE){
            //获得修改后的数据
            Map<String,String> map = new HashMap<>();
            rowData.getAfterColumnsList().forEach(c -> {
                map.put(c.getName(),c.getValue());
            });
            String json = JSON.toJSONString(map);
            log.info("保存数据:{}",json);
            //发送给mq,通知搜索服务进行添加
            rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,
                RabbitMQConfig.KEY_COURSE_SAVE, json));
        }else if(eventType == EventType.DELETE){
            //获得删除前的id
            Long[] id = new Long[1];
            rowData.getBeforeColumnsList().forEach(c -> {
                if("id".equals(c.getName())){
                    id[0] = Long.valueOf(c.getValue());
                }
            });
            log.info("删除数据:{}",id[0]);
            //发送给mq,通知搜索服务进行删除
            rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,
                    RabbitMQConfig.KEY_COURSE_REMOVE, Long.valueOf(id[0]));
        }else{
            log.info("不支持其它操作");
        }
    }
}

搜索服务的消息监听

@Slf4j
@Component
public class CourseMQListener {

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "course.exchange";
    
    @Autowired
    ICourseService courseService;

    /**
     * 监听课程添加和更新操作
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_SAVE)})
    public void receiveCourseSaveMessage(String json, Channel channel, Message message) throws IOException {
        log.info("保存课程课程:{}",json);
        //将消息转为课程,保存到es中
        Course course = JSON.parseObject(json,Course.class);
        //保存课程到ElasticSearch中
        courseService.saveOrUpdate(course);
    }

    /**
     * 监听课程删除操作
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_REMOVE)})
    public void receiveCourseDeleteMessage(Long id) {
        courseService.removeById(id);
        log.info("课程删除完成:{}",id);
    }
}

Logo

本社区面向用户介绍CSDN开发云部门内部产品使用和产品迭代功能,产品功能迭代和产品建议更透明和便捷

更多推荐