clickhouse-copier is ClickHouse's official data migration tool, used for migrating data between clusters.

For detailed configuration options, see the official documentation: https://clickhouse.tech/docs/en/operations/utilities/clickhouse-copier/

clickhouse-copier configuration files

There are two configuration files: zookeeper.xml, and task.xml, which describes the migration task.

The zookeeper.xml configuration file

<yandex>
    <logger>
        <level>trace</level>
        <size>100M</size>
        <count>3</count>
    </logger>

    <zookeeper>
        <node index="1">
            <host>zookeeper1</host>
            <port>2181</port>
        </node>
        <node index="2">
            <host>zookeeper2</host>
            <port>2181</port>
        </node>
        <node index="3">
            <host>zookeeper3</host>
            <port>2181</port>
        </node>
    </zookeeper>
</yandex>

This file has two main parts: the ZooKeeper configuration of the source cluster and the logging level. clickhouse-copier reads it at startup.

The task.xml configuration file

<yandex>
    <remote_servers>
        <src_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse1</host>
                    <port>9000</port>
                    <user>user</user>
                    <password>password</password>
                </replica>
                <replica>
                    <host>clickhouse2</host>
                    <port>9000</port>
                    <user>user</user>
                    <password>password</password>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse3</host>
                    <port>9000</port>
                    <user>user</user>
                    <password>password</password>
                </replica>
                <replica>
                    <host>clickhouse4</host>
                    <port>9000</port>
                    <user>user</user>
                    <password>password</password>
                </replica>
            </shard>
        </src_cluster>
        <dst_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse1_copy</host>
                    <port>9000</port>
                    <user>user</user>
                    <password>password</password>
                </replica>
                <replica>
                    <host>clickhouse2_copy</host>
                    <port>9000</port>
                    <user>user</user>
                    <password>password</password>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse3_copy</host>
                    <port>9000</port>
                    <user>user</user>
                    <password>password</password>
                </replica>
                <replica>
                    <host>clickhouse4_copy</host>
                    <port>9000</port>
                    <user>user</user>
                    <password>password</password>
                </replica>
            </shard>
        </dst_cluster>
    </remote_servers>
    <!-- Maximum number of workers -->
    <max_workers>4</max_workers>
    <!-- Settings for fetching (pulling) data: read-only -->
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>
    <!-- Settings for inserting (pushing) data: read-write -->
    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <settings>
        <connect_timeout>3</connect_timeout>
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings>
    
    <!-- Task descriptions; several can be configured, and they are executed in order -->
    <tables>
        <!-- A copy task; the name is user-defined -->
        <table_test_copy>
            <!-- cluster_pull must be defined in remote_servers -->
            <cluster_pull>src_cluster</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>test</table_pull>

            <!-- cluster_push must also be defined in remote_servers -->
            <cluster_push>dst_cluster</cluster_push>
            <database_push>default</database_push>
            <table_push>test_copy</table_push>

            <!-- If the table does not exist on the destination cluster, it is created from the definition below (a rough DDL sketch follows this config) -->
            <engine>
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test_copy', '{replica}')
            PARTITION BY toYYYYMM(EventDate)
            ORDER BY (CounterId, EventDate)
            </engine>
            
            <!-- sharding_key controls how rows are distributed across the destination shards on write -->
            <sharding_key>rand()</sharding_key>
        </table_test_copy>
    </tables>
</yandex>
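
When the destination table is missing, copier creates it from the engine clause above together with the column definitions of the source table. As a rough sketch only (the CounterId/EventDate types below are assumptions for illustration; the real column list is copied from the source table), the resulting table is roughly equivalent to:

CREATE TABLE default.test_copy
(
    -- column list comes from the source table; the types here are assumed
    CounterId UInt64,
    EventDate Date
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test_copy', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterId, EventDate)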

Before starting the copier, this task.xml file has to be written to a ZooKeeper node, as follows:

bin/zkCli.sh create /clickhouse/copytasks ""
bin/zkCli.sh create /clickhouse/copytasks/test ""
bin/zkCli.sh create /clickhouse/copytasks/test/description "`cat task.xml`"

# View the task configuration
bin/zkCli.sh get /clickhouse/copytasks/test/description

# Update the task configuration
bin/zkCli.sh set /clickhouse/copytasks/test/description "`cat task.xml`"

When bin/zkCli.sh get returns the XML configuration, the task has been set up successfully.
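
The official documentation linked above also describes --task-file and --task-upload-force options that let clickhouse-copier upload the task configuration to the task path by itself, making the zkCli steps unnecessary; availability depends on the copier version, so check --help first. A minimal sketch:

clickhouse-copier --daemon --config zookeeper.xml --task-path /clickhouse/copytasks/test --task-file task.xml --base-dir /tmp/copylogs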

Task execution process

Starting the task

clickhouse-copier --daemon --config zookeeper.xml --task-path /clickhouse/copytasks/test --base-dir /tmp/copylogs
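
task.xml sets max_workers to 4, which caps how many copier processes may work on this task at the same time; the same command can be started on several machines, and each instance registers itself in ZooKeeper and picks up work. A sketch, with placeholder hostnames:

# run the same command on copier-host-1, copier-host-2, ... (hostnames are placeholders)
clickhouse-copier --daemon --config zookeeper.xml --task-path /clickhouse/copytasks/test --base-dir /tmp/copylogs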

Task execution

After the task starts, several directories are created one after another to support the migration, laid out as follows:

.
├── clickhouse-copier_20201217150025_127706
│   ├── data
│   │   └── _local
│   │       ├── %2Eread_shard_0%2Edst_cluster%2Edefault%2Etest_copy
│   │       ├── %2Eread_shard_1%2Edst_cluster%2Edefault%2Etest_copy
│   │       ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy
│   │       ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_0
│   │       ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_1
│   │       ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_2
│   │       ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_3
│   │       ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_4
│   │       ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_5
│   │       ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_6
│   │       ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_7
│   │       ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_8
│   │       ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_9
│   ├── log.err.log
│   ├── log.log
│   └── status
└── zookeeper.xml

1. First, the task creates read_shard_n paths, which are used to read the specified table's data from each shard. Task log:

2020.12.22 07:52:09.263578 [ 21124 ] {} <Debug> ClusterCopier: There are 1 destination partitions in shard N1 (having a replica clickhouse1:9000, pull table default.test_copy of cluster src_cluster)

2. Next, within each partition (partition_key), the data is split into pieces using cityHash64 over all ORDER BY columns, modulo 10, as the filter condition. Task log:

2020.12.22 07:52:10.615014 [ 23148 ] {} <Debug> ClusterCopier: Checking shard N1 (having a replica clickhouse1:9000, pull table default.test of cluster src_cluster for partition 202001 piece 0 existence, executing query: SELECT 1 FROM _local.`.read_shard_0.dst_cluster.default.test_copy` WHERE (toYYYYMM(EventDate) = (202001 AS partition_key)) AND (cityHash64(`CounterId`, `EventDate`) % 10 = 0 ) LIMIT 1
.....
2020.12.22 07:52:10.619445 [ 23148 ] {} <Debug> ClusterCopier: Checking shard N1 (having a replica clickhouse1:9000, pull table default.test of cluster src_cluster for partition 202001 piece 1 existence, executing query: SELECT 1 FROM _local.`.read_shard_0.dst_cluster.default.test_copy` WHERE (toYYYYMM(EventDate) = (202001 AS partition_key)) AND (cityHash64(`CounterId`, `EventDate`) % 10 = 1 ) LIMIT 1
......

Note: the splitting goes through %2Esplit%2Edst_cluster%2Edefault%2Etest_copy, runs asynchronously, and the data is streamed; nothing is written directly to local disk.
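
The same predicate seen in the log can be run manually against the source table to see how a partition divides into pieces; a sketch using the table and columns from this example:

SELECT cityHash64(CounterId, EventDate) % 10 AS piece, count() AS rows
FROM default.test
WHERE toYYYYMM(EventDate) = 202001
GROUP BY piece
ORDER BY piece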

3. For these 10 pieces, 10 corresponding tables are created on the destination, named test_copy_piece_n (n = 0, 1, 2, …, 9). Task log:

2020.12.22 07:55:09.439708 [ 21123 ] {} <Warning> ConnectionPoolWithFailover: There is no table `default`.`test_copy_piece_0` on server: clickhouse1_copy:9000
......
2020.12.22 07:55:09.474486 [ 21117 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.dst_cluster.default.test_copy_piece_0` VALUES

As the log above shows, clickhouse-copier also migrates data with INSERT INTO ... SELECT.
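
In effect, each piece is moved with a query equivalent to the following simplified sketch (copier actually streams the SELECT result from the source shard into the .split distributed table shown in the log):

INSERT INTO default.test_copy_piece_0
SELECT *
FROM default.test
WHERE toYYYYMM(EventDate) = 202001
  AND cityHash64(CounterId, EventDate) % 10 = 0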

4. After the copy finishes, the data in the test_copy_piece_n tables is attached to test_copy. Task log:

2020.12.22 07:51:47.136833 [ 20963 ] {} <Debug> ClusterCopier: Executing ALTER query:  ALTER TABLE default.test_copy ATTACH PARTITION 1992 FROM default.test_copy_piece_0
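
ALTER TABLE ... ATTACH PARTITION ... FROM requires both tables to have the same structure and partitioning, and it copies the partition at the data-part level rather than re-inserting rows. Afterwards, the parts that landed in the final table can be inspected, for example:

SELECT partition, name, rows
FROM system.parts
WHERE database = 'default' AND table = 'test_copy' AND active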

5. Finally, the intermediate test_copy_piece_n tables are dropped. Task log:

2020.12.22 07:51:47.999071 [ 20963 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS default.test_copy_piece_0

At this point the migration task is complete.
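
A simple sanity check after the task finishes is to compare row counts between source and destination; a sketch using the cluster() table function, assuming src_cluster and dst_cluster are also defined in the ClickHouse servers' own remote_servers configuration:

SELECT count() FROM cluster('src_cluster', 'default', 'test');
SELECT count() FROM cluster('dst_cluster', 'default', 'test_copy');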

Additional notes

So that a failed task can be resumed, clickhouse-copier keeps some state information in ZooKeeper.

[zk: localhost:2181(CONNECTED) 0] ls /clickhouse/copytasks/test/
description    tables    task_active_workers     task_active_workers_version 

Besides the description node created manually above, the other three nodes are generated automatically.

tables records the per-table sync tasks, for example:

[zk: localhost:2181(CONNECTED) 0] ls /clickhouse/copytasks/test/tables
dst_cluster.default.test_copy 

task_active_workers records which workers are currently executing the task.

task_active_workers_version records which stage the task has reached, for example:

[zk: localhost:2181(CONNECTED) 1] get /clickhouse/copytasks/test/task_active_workers_version
/clickhouse/copytasks/test/tables/dst_cluster.default.test_copy/1992/piece_4/shards/1
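
Because of this retained state, re-running a completed or broken task from scratch requires clearing the old nodes first and re-uploading task.xml as shown earlier; a sketch using zkCli (deleteall is the ZooKeeper 3.5+ command, older versions use rmr):

bin/zkCli.sh deleteall /clickhouse/copytasks/test
bin/zkCli.sh create /clickhouse/copytasks/test ""
bin/zkCli.sh create /clickhouse/copytasks/test/description "`cat task.xml`"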