Clickhouse副本与分片
一 副本与分片概述副本是指两个相同数据的表,作用是为了数据备份与安全分片是指不同的服务器存储同一张表的不同部分,作用是为了水平切分表,缓解单一服务的压力.针对于副本的需求,有两种不同的方式,后面会一一概述.二 下载并安装zookeeperclickhouse要实现副本与分片需要依赖于zookeeper,并且zookeeper版本要3.4.5以及以上.ZK的下载与安装参考我另外一篇博文Zookeep
一 副本与分片概述
副本(replica) 是指两个相同数据的表或表一部分,作用是为了数据备份与安全
分片(shard) 是指不同的服务器存储同一张表的不同部分,作用是为了水平切分表,缓解单一服务的压力.
针对于副本的需求,有两种不同的方式,后面会一一概述.
二 下载并安装zookeeper
clickhouse要实现副本与分片需要依赖于zookeeper,并且zookeeper版本要3.4.5以及以上.
ZK的下载与安装参考我另外一篇博文Zookeeper的下载与安装
三 ClickHouse配置zookeeper
安装启动好zookeeper后,我们需要在clickhouse中配置zookeeper
clickhouse两种配置zookeeper方式,一种是直接在config.xml中配置,另外一种是在外部文件中配置好了,在config.xml中进行引用.
1 内部配置方式:
vim /etc/clickhouse-server/config.xml
添加如下配置:
<zookeeper>
<node index="1"> #index是连接zk的顺序
<host>node01</host> #znode地址
<port>2181</port> #znode端口
</node>
<node index="2">
<host>node02</host>
<port>2181</port>
</node>
<node index="3">
<host>node03</host>
<port>2181</port>
</node>
</zookeeper>
2 外部配置方式:
创建外部配置文件:
vim /etc/clickhouse-server/config.d/zks.xml
<?xml version="1.0"?>
<yandex>
<zks>
<node index="1">
<host>node01</host>
<port>2181</port>
</node>
<node index="2">
<host>node02</host>
<port>2181</port>
</node>
<node index="3">
<host>node03</host>
<port>2181</port>
</node>
</zks>
</yandex>
引入外部配置文件中的配置
vim /etc/clickhouse-server/config.xml
#文件的路径
<include_from>/etc/clickhouseserver/config.d/zks.xml</include_from>
#incl中指的是配置在外部配置文件中的标签
<zookeeper incl="zks" optional="true" />
上方是麻烦的写法,如果在zks中的标签<zks>...</zks>
改为<zookeeper>..</zookeeper>
,那么可以不要config.xml中的<zookeeper incl...>
标签,只需要引用外部配置文件即可.
zk的配置不支持热更改,必须要重启clickhouse服务,但是在重启之前可以先使用以下sql查询:
select * from system.zookeeper where path = '/';
会报表不存在.
重启服务后,再还执行上方sql,就可以查询到zookeeper表,说明zookeeper配置好了.
4 副本实现方式
1 复制合并树引擎实现副本
当前直接支持副本的引擎是Replicatedxxx复制合并树引擎,对于复制合并树引擎实现副本我已经写过了,所以这里只上链接
ClickHouse ReplicatedMergeTree家族引擎
2 通过分片配置实现副本
首先要配置cluster.
vim /etc/clickhouse-server/config.xml
在vim模式下输入:/remote_servers
快速查找到配置cluster的标签
在<remote_servers>标签之中就是我们配置shard与replica的地方
先看下面配置,说明
<remote_servers incl="clickhouse_remote_servers" >
<my_shard>
<shard>
<replica>
<host>node01</host>
<port>9000</port>
</replica>
<replica>
<host>node02</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node03</host>
<port>9000</port>
</replica>
<replica>
<host>node04</host>
<port>9000</port>
</replica>
</shard>
</my_shard>
</remote_servers>
<my_shard>
是cluster名称,任意填写,在后面通过这个名字引用如 on cluster my_shard
<shard>
指分片,有几个分片就配置几个<shard>
标签
<replica>
指副本,有几个副本就有几个<replica>
标签.
上方的配置是有两个分片,每个分片两个副本.这里理解也可以是有两个分片,每个分片一个副本.
打个比方说,一张test表基于以上配置,共有id 1,2,3,4 四条数据,那么node01中有1,2两条,node02中有1,2两条,node03中有3,4两条,node04中有3,4两条,这样子举例比较好理解.,至于到底是两个分片两个副本,还是两个分片一个副本,随意,但是本文会说是两个副本.
因为这里我们是演示副本的实现,所以就不考虑分片了,分片后面说,所以现在只创建一个分片,分片中有三个副本.
每台服务器中都增加以下配置,host换成自己的ip
<remote_servers incl="clickhouse_remote_servers" >
<my_shard>
<shard>
<replica>
<host>node01</host>
<port>9000</port>
<!-- 默认为default
<user></user>
默认为空
<password></password> -->
</replica>
<replica>
<host>node02</host>
<port>9000</port>
</replica>
<replica>
<host>node03</host>
<port>9000</port>
</replica>
</shard>
</my_shard>
</remote_servers>
上面配置好了以后,cluster可以热加载,所以不需要重启服务,通过系统表可以查看cluster
select * from system.clusters where cluster='my_shard';
接下来的任务,需要借助于分布式表来完成,但是介绍分布式表之前,先说一下分布式ddl
分布式ddl指的是在一台服务器上执行sql,其他服务器同步执行,需要借助于cluster,如下方
create table tableName on cluster my_shard (id Int8,name String)engine=xxx ;
上面创建的一张表,除了on cluster my_shard
以外就是正常的创建表语句,my_shard我们上面配置过,里面一共包含了node01,node02,node03 三个replica,那么在执行的时候,就会到my_shard配置下的所有服务器中执行create table
语句.也就是这三台服务器任意一台执行以上sql,三台的表都会创建好,这就是分布式ddl,分布式ddl也需要zookeeper的支持,同样也支持drop table…
了解了分布式ddl,我们可以在任意一台机器上创建表:
create table default.replicaTest on cluster my_shard(id Int8,name String) engine =MergeTree order by id;
此时发现my_shard配置的node01,node02,node03上面都创建好了replicaTest表.表创建好了以后,接下来要创建分布式表,分布式表本身不存储任何数据,可以把它当成数据操作的工具,他会分发到被分布式表代理的其他表.
Distributed(cluster_name, db_name, table_name[, sharding_key[, policy_name]])
第一个是配置的cluster名称,第二个第三个分别代表代理的数据库,数据表,第四个参数是数据插入分发策略,指定一个字段名(必须是Int类型)或者rand(),第五个参数是策略名称.
下面基于上面的replicaTest创建一个分布式表:
create table replicaTest_all as replicaTest
engine = Distributed(my_shard,default,replicaTest,id);
我在node01机器上创建了一张分布式表(当然也可以通过分布式ddl,每台机器上都创建一张分布式表)
创建好了,我们插入数据:
insert into replicaTest_all values(1,'zhang');
插入成功后,每台机器查询replicaTest表数据,发现三台机器上都有1,zhang
这条数据.而查询replicaTest_all分布式表,也只查询到了1,zhang
一条数据.
插入分布式表时,基于my_shard的配置,会把插入的数据全部分发到代理的每一台服务器的replicaTest表,这也就是为什么我们插入replicaTest_all结果三台机器都有这条数据的原因.而查询时,会从副本中选择一个查询(查询有策略可以选择,感兴趣可以查询load_balancing
).针对于insert与select会作用于本地表,其他的操作基本都只会作用于分布式表.
通过分布式表的方式完成副本的实现,总结一下,1 配置文件配置cluster 2 cluster中配置的服务器上创建表 3 通过分布式表代理本地表,实现数据插入后分发到本地表.
这种数据副本的实现可以是任何引擎,但是对于某台Clickhouse的压力比较大(要负责向所有分片副本的发送),而第一种的副本实现必须是Repicated*合并树引擎,压力小于分布式表的方式(可以通过配置,只负责分片的发送).
通过分布式表的方式实现数据副本时,写入我们配置的分布式表时,分布式表会往shard下的所有replica都写入一份数据.如果我们这里的replicaTest表改成ReplicatedMergeTree
引擎,那么我们就只需要分布式表选择一个replica写入即可,由ReplicatedMergeTree完成数据同步,而不需要每个replica都要写入,这项配置需要添加 <internal_replication>true<internal_replication>
<my_shard>
<shard>
<internal_replication>true</internal_replication>
......
</shard>
</my_shard>
添加以上配置之后,每个shard只会写入一个<replica>
,由写入的replica负责数据同步给其他<replica>
五 分片
上面我们只使用了一个分片,接下来说一下分片
<remote_servers incl="clickhouse_remote_servers" >
<my_shard_2>
<shard>
<replica>
<host>node01</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node02</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node03</host>
<port>9000</port>
</replica>
</shard>
</my_shard_2>
</remote_servers>
我们每台服务器添加以上配置,此时没有使用副本,只是三个分片.
查询分片:
配置好后,通过分布式ddl先创建表
create table replicaTest2 on cluster my_shard_2 (id Int,name String)
engine=MergeTree order by id;
创建分布式表:
create table replicaTest2_all as replicaTest2
engine =Distributed(my_shard_2,default,replicaTest2,rand());
插入数据
insert into replicaTest2_all values(1,'zhang'),(2,'li'),(3,'zhao'),
(4,'qian'),(5,'sun'),(6,'wang'),(7,'tian'),(8,'he'),(9,'zheng'),(10,'dong');
查看数据:
node01:
node01.hadoop.com :) select * from replicaTest2;
┌─id─┬─name─┐
│ 4 │ qian │
│ 10 │ dong │
└────┴──────┘
node02:
node02.hadoop.com :) select * from replicaTest2;
┌─id─┬─name──┐
│ 1 │ zhang │
│ 2 │ li │
│ 3 │ zhao │
│ 5 │ sun │
│ 9 │ zheng │
└────┴───────┘
node03:
node03.hadoop.com :) select * from replicaTest2;
┌─id─┬─name─┐
│ 6 │ wang │
│ 7 │ tian │
│ 8 │ he │
└────┴──────┘
由此可见数据被replicaTest2_all分布式表随机分发到了三个分片中.
分发有策略,需要做配置<weight>
<shard>
<weight>2</weight>
<replica>
<host>node01</host>
<port>9000</port>
</replica>
</shard>
weight
默认为1
,既node01 node02 node03 都为1,那么总的权重为3,每个分片通过分布式表第四个参数sharding_key
分配到数据的概率是一样.
如果我们权重分别设置为1,2,3 那么总权重是6,那么总区间就是[0,6),排在shard配置第一位的node01,权重占比为1/6,所以属于区间[0,1),排在shard配置第二位的node02,占比2/6,所以区间为[1,3),至于最后的node03就是[3,6).所以如果rand()产生的数字除以6取余落在哪个区间,数据就会分发到哪个shard,通过权重配置,可以实现数据按照想要的比重分配.
六 副本与分片
上面分别讲述了副本与分片的实现,其实最合适的方法就是利用分布式表实现分片数据的写入,而每一分片内通过Replicated*引擎实现副本数据的同步,下面实现这个:
三台服务器配置my_shard_3
<my_shard_3>
<shard>
<internal_replication>true</internal_replication>
<weight>1</weight>
<replica>
<host>node01</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<weight>1</weight>
<replica>
<host>node02</host>
<port>9000</port>
</replica>
<replica>
<host>node03</host>
<port>9000</port>
</replica>
</shard>
</my_shard_3>
此时有两个shard 第二shard有两个副本.(因为本人只有三台服务器,故这样分配)
利用分布式ddl创建ReplicatedMergeTree表
CREATE TABLE default.replicaTest3 on cluster my_shard_3
( `id` Int32,`name` String)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard_name}/replicaTest3', '{replica_name}')
ORDER BY id
上方的创建表以及宏变量在上方贴的ClickHouse ReplicatedMergeTree家族引擎
链接中已经讲过了,这里不再赘述,下面是我的配置的宏变量:
node01:
node01.hadoop.com :) select * from system.macros;
┌─macro────────┬─substitution─┐
│ replica_name │ 01 │
│ shard_name │ 01 │
└──────────────┴──────────────┘
node02:
node02.hadoop.com :) select * from system.macros;
┌─macro────────┬─substitution─┐
│ replica_name │ 01 │
│ shard_name │ 02 │
└──────────────┴──────────────┘
node03
node03.hadoop.com :) select * from system.macros;
┌─macro────────┬─substitution─┐
│ replica_name │ 02 │
│ shard_name │ 02 │
└──────────────┴──────────────┘
创建好本地表后,创建分布式表:
CREATE TABLE default.replicaTest3_all
( `id` Int32, `name` String)
ENGINE = Distributed('my_shard_3', 'default', 'replicaTest3', id)
插入数据:
insert into replicaTest3_all values(1,'zhangfei'),(2,'guanyu'),(3,'liubie'),(4,'zhaoyun'),
(5,'machao'),(6,'caocao'),(7,'lvbu'),(8,'zhuge'),(9,'dianwei');
查看数据:
node01:
┌─id─┬─name────┐
│ 2 │ guanyu │
│ 4 │ zhaoyun │
│ 6 │ caocao │
│ 8 │ zhuge │
└────┴─────────┘
node02和node03
┌─id─┬─name─────┐
│ 1 │ zhangfei │
│ 3 │ liubie │
│ 5 │ machao │
│ 7 │ lvbu │
│ 9 │ dianwei │
└────┴──────────┘
至此,最适合生产的分片与副本机制就配置好了.
更多推荐
所有评论(0)