[ClickHouse Series] How clickhouse-copier migrates data
clickhouse-copier is the official data migration tool for moving data between clusters.
For detailed configuration options, see the official documentation: https://clickhouse.tech/docs/en/operations/utilities/clickhouse-copier/
clickhouse-copier configuration files
There are two configuration files: zookeeper.xml, which holds the ZooKeeper configuration, and task.xml, which describes the migration task.
The zookeeper.xml configuration file
<yandex>
<logger>
<level>trace</level>
<size>100M</size>
<count>3</count>
</logger>
<zookeeper>
<node index="1">
<host>zookeeper1</host>
<port>2181</port>
</node>
<node index="2">
<host>zookeeper2</host>
<port>2181</port>
</node>
<node index="3">
<host>zookeeper3</host>
<port>2181</port>
</node>
</zookeeper>
</yandex>
This file consists of two parts: the ZooKeeper configuration of the source cluster and the log level settings. clickhouse-copier reads this file at startup.
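Before launching clickhouse-copier, it is worth confirming that the ZooKeeper ensemble listed above is reachable. A minimal check, assuming the same zkCli.sh used later in this article is available on the machine that will run the copier:
# Connect to one of the configured ZooKeeper nodes and list the root znode
bin/zkCli.sh -server zookeeper1:2181 ls /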
The task.xml configuration file
<yandex>
<remote_servers>
<src_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse1</host>
<port>9000</port>
<user>user</user>
<password>password</password>
</replica>
<replica>
<host>clickhouse2</host>
<port>9000</port>
<user>user</user>
<password>password</password>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse3</host>
<port>9000</port>
<user>user</user>
<password>password</password>
</replica>
<replica>
<host>clickhouse4</host>
<port>9000</port>
<user>user</user>
<password>password</password>
</replica>
</shard>
</src_cluster>
<dst_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse1_copy</host>
<port>9000</port>
<user>user</user>
<password>password</password>
</replica>
<replica>
<host>clickhouse2_copy</host>
<port>9000</port>
<user>user</user>
<password>password</password>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse3_copy</host>
<port>9000</port>
<user>user</user>
<password>password</password>
</replica>
<replica>
<host>clickhouse4_copy</host>
<port>9000</port>
<user>user</user>
<password>password</password>
</replica>
</shard>
</dst_cluster>
</remote_servers>
<!-- Maximum number of workers -->
<max_workers>4</max_workers>
<!-- Fetch (pull) data in read-only mode -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<!-- Insert (push) data in read-write mode -->
<settings_push>
<readonly>0</readonly>
</settings_push>
<settings>
<connect_timeout>3</connect_timeout>
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<!-- Task descriptions; multiple tasks can be configured and will be executed one after another -->
<tables>
<!-- A copy task; the element name can be anything -->
<table_test_copy>
<!-- cluster_pull must be defined in remote_servers -->
<cluster_pull>src_cluster</cluster_pull>
<database_pull>default</database_pull>
<table_pull>test</table_pull>
<!-- cluster_push must also be defined in remote_servers -->
<cluster_push>dst_cluster</cluster_push>
<database_push>default</database_push>
<table_push>test_copy</table_push>
<!-- If the table does not exist on the destination cluster, it will be created from the definition below -->
<engine>
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test_copy', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterId, EventDate)
</engine>
<!-- sharding_key controls how rows are distributed when writing to the destination cluster -->
<sharding_key>rand()</sharding_key>
</table_test_copy>
</tables>
</yandex>
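For reference, the engine definition above assumes that the source table default.test contains at least the EventDate and CounterId columns. A hypothetical DDL for such a source table (the Value column and the exact types are assumptions for illustration only; the {shard} and {replica} macros must be defined in each server's configuration) might look like this:
# Run on each replica of src_cluster (or with ON CLUSTER if the cluster is defined in the server config)
clickhouse-client --host clickhouse1 --user user --password password --query "
CREATE TABLE default.test
(
    EventDate Date,
    CounterId UInt64,
    Value String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterId, EventDate)"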
Before starting the task, the task.xml file has to be written to a ZooKeeper node, as follows:
bin/zkCli.sh create /clickhouse/copytasks ""
bin/zkCli.sh create /clickhouse/copytasks/test ""
bin/zkCli.sh create /clickhouse/copytasks/test/description "`cat task.xml`"
# View the task description
bin/zkCli.sh get /clickhouse/copytasks/test/description
# Update the task description
bin/zkCli.sh set /clickhouse/copytasks/test/description "`cat task.xml`"
When bin/zkCli.sh get returns the XML configuration, the task has been set up successfully.
Task execution process
Starting the task
clickhouse-copier --daemon --config zookeeper.xml --task-path /clickhouse/copytasks/test --base-dir /tmp/copylogs
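Because max_workers is set to 4 in task.xml, up to four copier processes can work on this task at the same time; any extra processes simply stay idle. A sketch of scaling out, assuming the same zookeeper.xml has been copied to each host:
# Run the same command on several hosts (ideally hosts that store the source data);
# each process registers itself under task_active_workers in ZooKeeper
clickhouse-copier --daemon --config zookeeper.xml --task-path /clickhouse/copytasks/test --base-dir /tmp/copylogs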
Task execution
After the task starts, it creates a number of directories to support the migration, laid out as follows:
.
├── clickhouse-copier_20201217150025_127706
│ ├── data
│ │ └── _local
│ │ ├── %2Eread_shard_0%2Edst_cluster%2Edefault%2Etest_copy
│ │ ├── %2Eread_shard_1%2Edst_cluster%2Edefault%2Etest_copy
│ │ ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy
│ │ ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_0
│ │ ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_1
│ │ ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_2
│ │ ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_3
│ │ ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_4
│ │ ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_5
│ │ ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_6
│ │ ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_7
│ │ ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_8
│ │ ├── %2Esplit%2Edst_cluster%2Edefault%2Etest_copy_piece_9
│ ├── log.err.log
│ ├── log.log
│ └── status
└── zookeeper.xml
1. First, the task creates the read_shard_n paths, which are used to read the specified table's data from each shard. Task log:
2020.12.22 07:52:09.263578 [ 21124 ] {} <Debug> ClusterCopier: There are 1 destination partitions in shard N1 (having a replica clickhouse1:9000, pull table default.test_copy of cluster src_cluster)
2. Next, the data is split by partition (partition_key), with cityHash64 over all of the ORDER BY columns modulo 10 used as the filter condition. Task log:
2020.12.22 07:52:10.615014 [ 23148 ] {} <Debug> ClusterCopier: Checking shard N1 (having a replica clickhouse1:9000, pull table default.test of cluster src_cluster for partition 202001 piece 0 existence, executing query: SELECT 1 FROM _local.`.read_shard_0.dst_cluster.default.test_copy` WHERE (toYYYYMM(EventDate) = (202001 AS partition_key)) AND (cityHash64(`CounterId`, `EventDate`) % 10 = 0 ) LIMIT 1
.....
2020.12.22 07:52:10.619445 [ 23148 ] {} <Debug> ClusterCopier: Checking shard N1 (having a replica clickhouse1:9000, pull table default.test of cluster src_cluster for partition 202001 piece 1 existence, executing query: SELECT 1 FROM _local.`.read_shard_0.dst_cluster.default.test_copy` WHERE (toYYYYMM(EventDate) = (202001 AS partition_key)) AND (cityHash64(`CounterId`, `EventDate`) % 10 = 1 ) LIMIT 1
......
Note: the splitting is done through %2Esplit%2Edst_cluster%2Edefault%2Etest_copy, it is executed asynchronously, and the data is streamed, so nothing is written directly to local disk.
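To get a feel for how rows are distributed across the 10 pieces, the same modulo rule can be reproduced manually on the source (purely for illustration; the copier does not require this step):
# Count rows per piece for one partition on a source replica
clickhouse-client --host clickhouse1 --user user --password password --query "
SELECT cityHash64(CounterId, EventDate) % 10 AS piece, count() AS rows
FROM default.test
WHERE toYYYYMM(EventDate) = 202001
GROUP BY piece
ORDER BY piece"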
3. For these 10 pieces, 10 matching tables named test_copy_piece_n (n = 0, 1, 2, ..., 9) are created on the destination cluster. Task log:
2020.12.22 07:55:09.439708 [ 21123 ] {} <Warning> ConnectionPoolWithFailover: There is no table `default`.`test_copy_piece_0` on server: clickhouse1_copy:9000
......
2020.12.22 07:55:09.474486 [ 21117 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.dst_cluster.default.test_copy_piece_0` VALUES
As the log above shows, clickhouse-copier also migrates data using INSERT INTO ... SELECT.
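Logically, for each partition and piece the copier performs something equivalent to the following (a simplified, hypothetical sketch; the real queries go through the temporary Distributed tables named .read_shard_N and .split shown in the directory listing above):
# Copy piece 0 of partition 202001 from one source shard into the corresponding piece table on the destination
clickhouse-client --host clickhouse1_copy --user user --password password --query "
INSERT INTO default.test_copy_piece_0
SELECT * FROM remote('clickhouse1:9000', default.test, 'user', 'password')
WHERE toYYYYMM(EventDate) = 202001 AND cityHash64(CounterId, EventDate) % 10 = 0"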
4. After the copying finishes, the data in the test_copy_piece_n tables is attached to test_copy. Task log:
2020.12.22 07:51:47.136833 [ 20963 ] {} <Debug> ClusterCopier: Executing ALTER query: ALTER TABLE default.test_copy ATTACH PARTITION 1992 FROM default.test_copy_piece_0
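After this step, the attached partitions can be inspected on a destination replica through system.parts (a sanity check that is not part of the copier itself):
# List active partitions and row counts of the destination table
clickhouse-client --host clickhouse1_copy --user user --password password --query "
SELECT partition, sum(rows) AS rows
FROM system.parts
WHERE database = 'default' AND table = 'test_copy' AND active
GROUP BY partition
ORDER BY partition"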
5. Finally, the intermediate test_copy_piece_n tables are dropped. Task log:
2020.12.22 07:51:47.999071 [ 20963 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS default.test_copy_piece_0
At this point, the migration task is complete.
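A simple final check is to compare row counts between the source and destination tables, assuming src_cluster and dst_cluster are also defined in the servers' own remote_servers configuration (otherwise query each shard individually):
# Total rows on the source and on the destination
clickhouse-client --host clickhouse1 --user user --password password --query "SELECT count() FROM cluster('src_cluster', default, test)"
clickhouse-client --host clickhouse1_copy --user user --password password --query "SELECT count() FROM cluster('dst_cluster', default, test_copy)"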
Additional notes
So that a failed task can be resumed, clickhouse-copier keeps some state information in ZooKeeper.
[zk: localhost:2181(CONNECTED) 0] ls /clickhouse/copytasks/test/
description tables task_active_workers task_active_workers_version
Besides the description node that we created manually earlier, three more nodes are generated automatically in ZooKeeper.
tables records the per-table sync tasks, for example:
[zk: localhost:2181(CONNECTED) 0] ls /clickhouse/copytasks/test/tables
dst_cluster.default.test_copy
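Drilling further down, the children of the table node should be the partitions being copied (for example 1992, matching the path format shown under task_active_workers_version below):
[zk: localhost:2181(CONNECTED) 1] ls /clickhouse/copytasks/test/tables/dst_cluster.default.test_copy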
task_active_workers records which workers are currently executing the task.
task_active_workers_version records which stage the task has reached, for example:
[zk: localhost:2181(CONNECTED) 1] get /clickhouse/copytasks/test/task_active_workers_version
/clickhouse/copytasks/test/tables/dst_cluster.default.test_copy/1992/piece_4/shards/1
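Once the migration has been verified, the task nodes can be removed from ZooKeeper so the path can be reused. With ZooKeeper 3.5+ this can be done with deleteall (older versions use rmr instead):
# Remove the finished task and all of its state recursively
bin/zkCli.sh deleteall /clickhouse/copytasks/test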