一 副本与分片概述

副本(replica) 是指两个相同数据的表或表一部分,作用是为了数据备份与安全
分片(shard) 是指不同的服务器存储同一张表的不同部分,作用是为了水平切分表,缓解单一服务的压力.
针对于副本的需求,有两种不同的方式,后面会一一概述.

二 下载并安装zookeeper

clickhouse要实现副本与分片需要依赖于zookeeper,并且zookeeper版本要3.4.5以及以上.
ZK的下载与安装参考我另外一篇博文Zookeeper的下载与安装

三 ClickHouse配置zookeeper

安装启动好zookeeper后,我们需要在clickhouse中配置zookeeper
clickhouse两种配置zookeeper方式,一种是直接在config.xml中配置,另外一种是在外部文件中配置好了,在config.xml中进行引用.

1 内部配置方式:
vim /etc/clickhouse-server/config.xml

添加如下配置:

  <zookeeper>
    <node index="1">  #index是连接zk的顺序
        <host>node01</host> #znode地址
        <port>2181</port>   #znode端口
    </node>
    <node index="2">
        <host>node02</host>
        <port>2181</port>
    </node>
    <node index="3">
        <host>node03</host>
        <port>2181</port>
    </node>
  </zookeeper>
2 外部配置方式:

创建外部配置文件:

vim /etc/clickhouse-server/config.d/zks.xml
<?xml version="1.0"?>
<yandex>
   <zks>
     <node index="1">
            <host>node01</host>
            <port>2181</port>
     </node>
             <node index="2">
            <host>node02</host>
            <port>2181</port>
     </node>
             <node index="3">
            <host>node03</host>
            <port>2181</port>
     </node>
   </zks>
</yandex>

引入外部配置文件中的配置

vim /etc/clickhouse-server/config.xml
#文件的路径
<include_from>/etc/clickhouseserver/config.d/zks.xml</include_from>
#incl中指的是配置在外部配置文件中的标签
<zookeeper incl="zks" optional="true" />

上方是麻烦的写法,如果在zks中的标签<zks>...</zks>改为<zookeeper>..</zookeeper>,那么可以不要config.xml中的<zookeeper incl...>标签,只需要引用外部配置文件即可.

zk的配置不支持热更改,必须要重启clickhouse服务,但是在重启之前可以先使用以下sql查询:

 select * from system.zookeeper where path = '/';

会报表不存在.
重启服务后,再还执行上方sql,就可以查询到zookeeper表,说明zookeeper配置好了.

4 副本实现方式

1 复制合并树引擎实现副本

当前直接支持副本的引擎是Replicatedxxx复制合并树引擎,对于复制合并树引擎实现副本我已经写过了,所以这里只上链接
ClickHouse ReplicatedMergeTree家族引擎

2 通过分片配置实现副本

首先要配置cluster.

vim /etc/clickhouse-server/config.xml

在vim模式下输入:/remote_servers快速查找到配置cluster的标签
在这里插入图片描述
在<remote_servers>标签之中就是我们配置shard与replica的地方
先看下面配置,说明

 <remote_servers incl="clickhouse_remote_servers" >
   <my_shard>
               <shard>
                     <replica>
                            <host>node01</host>
                            <port>9000</port>
                     </replica>
                     <replica>
                            <host>node02</host>
                            <port>9000</port>
                     </replica>
              </shard>
              <shard>
                     <replica>
                            <host>node03</host>
                            <port>9000</port>
                     </replica>
                     <replica>
                            <host>node04</host>
                            <port>9000</port>
                     </replica>
              </shard>
      </my_shard>
 </remote_servers>

<my_shard>是cluster名称,任意填写,在后面通过这个名字引用如 on cluster my_shard
<shard>指分片,有几个分片就配置几个<shard>标签
<replica>指副本,有几个副本就有几个<replica>标签.
上方的配置是有两个分片,每个分片两个副本.这里理解也可以是有两个分片,每个分片一个副本.
打个比方说,一张test表基于以上配置,共有id 1,2,3,4 四条数据,那么node01中有1,2两条,node02中有1,2两条,node03中有3,4两条,node04中有3,4两条,这样子举例比较好理解.,至于到底是两个分片两个副本,还是两个分片一个副本,随意,但是本文会说是两个副本.
因为这里我们是演示副本的实现,所以就不考虑分片了,分片后面说,所以现在只创建一个分片,分片中有三个副本.
每台服务器中都增加以下配置,host换成自己的ip

<remote_servers incl="clickhouse_remote_servers" >
   <my_shard>
               <shard>
                     <replica>
                            <host>node01</host>
                            <port>9000</port>
                          <!-- 默认为default 
                          <user></user>
                          默认为空
                          <password></password> -->
                     </replica>
                     <replica>
                            <host>node02</host>
                            <port>9000</port>
                     </replica>
                     <replica>
                            <host>node03</host>
                            <port>9000</port>
                     </replica>    
              </shard>
      </my_shard>
 </remote_servers>

上面配置好了以后,cluster可以热加载,所以不需要重启服务,通过系统表可以查看cluster

select * from system.clusters where cluster='my_shard';

在这里插入图片描述
接下来的任务,需要借助于分布式表来完成,但是介绍分布式表之前,先说一下分布式ddl
分布式ddl指的是在一台服务器上执行sql,其他服务器同步执行,需要借助于cluster,如下方

create table tableName on cluster my_shard (id Int8,name String)engine=xxx ;

上面创建的一张表,除了on cluster my_shard以外就是正常的创建表语句,my_shard我们上面配置过,里面一共包含了node01,node02,node03 三个replica,那么在执行的时候,就会到my_shard配置下的所有服务器中执行create table语句.也就是这三台服务器任意一台执行以上sql,三台的表都会创建好,这就是分布式ddl,分布式ddl也需要zookeeper的支持,同样也支持drop table…
了解了分布式ddl,我们可以在任意一台机器上创建表:

create table default.replicaTest on cluster my_shard(id Int8,name String) engine =MergeTree order by id;

在这里插入图片描述

此时发现my_shard配置的node01,node02,node03上面都创建好了replicaTest表.表创建好了以后,接下来要创建分布式表,分布式表本身不存储任何数据,可以把它当成数据操作的工具,他会分发到被分布式表代理的其他表.

Distributed(cluster_name, db_name, table_name[, sharding_key[, policy_name]])

第一个是配置的cluster名称,第二个第三个分别代表代理的数据库,数据表,第四个参数是数据插入分发策略,指定一个字段名(必须是Int类型)或者rand(),第五个参数是策略名称.
下面基于上面的replicaTest创建一个分布式表:

create table replicaTest_all as replicaTest 
engine = Distributed(my_shard,default,replicaTest,id);

我在node01机器上创建了一张分布式表(当然也可以通过分布式ddl,每台机器上都创建一张分布式表)
创建好了,我们插入数据:

insert into replicaTest_all values(1,'zhang');

插入成功后,每台机器查询replicaTest表数据,发现三台机器上都有1,zhang这条数据.而查询replicaTest_all分布式表,也只查询到了1,zhang一条数据.

插入分布式表时,基于my_shard的配置,会把插入的数据全部分发到代理的每一台服务器的replicaTest表,这也就是为什么我们插入replicaTest_all结果三台机器都有这条数据的原因.而查询时,会从副本中选择一个查询(查询有策略可以选择,感兴趣可以查询load_balancing).针对于insert与select会作用于本地表,其他的操作基本都只会作用于分布式表.

通过分布式表的方式完成副本的实现,总结一下,1 配置文件配置cluster 2 cluster中配置的服务器上创建表 3 通过分布式表代理本地表,实现数据插入后分发到本地表.
这种数据副本的实现可以是任何引擎,但是对于某台Clickhouse的压力比较大(要负责向所有分片副本的发送),而第一种的副本实现必须是Repicated*合并树引擎,压力小于分布式表的方式(可以通过配置,只负责分片的发送).

通过分布式表的方式实现数据副本时,写入我们配置的分布式表时,分布式表会往shard下的所有replica都写入一份数据.如果我们这里的replicaTest表改成ReplicatedMergeTree引擎,那么我们就只需要分布式表选择一个replica写入即可,由ReplicatedMergeTree完成数据同步,而不需要每个replica都要写入,这项配置需要添加 <internal_replication>true<internal_replication>

 <my_shard>
               <shard>
                     <internal_replication>true</internal_replication>
                     ......
              </shard>
 </my_shard>

添加以上配置之后,每个shard只会写入一个<replica>,由写入的replica负责数据同步给其他<replica>

五 分片

上面我们只使用了一个分片,接下来说一下分片

<remote_servers incl="clickhouse_remote_servers" >
   <my_shard_2>
               <shard>
                     <replica>
                            <host>node01</host>
                            <port>9000</port>
                     </replica>
               </shard>
               <shard>       
                     <replica>
                            <host>node02</host>
                            <port>9000</port>
                     </replica>
               </shard>
               <shard>       
                     <replica>
                            <host>node03</host>
                            <port>9000</port>
                     </replica>    
              </shard>
      </my_shard_2>
 </remote_servers>

我们每台服务器添加以上配置,此时没有使用副本,只是三个分片.
查询分片:
在这里插入图片描述
配置好后,通过分布式ddl先创建表

create table replicaTest2 on cluster my_shard_2 (id Int,name String)
engine=MergeTree order by id;

创建分布式表:

create table replicaTest2_all as replicaTest2 
engine =Distributed(my_shard_2,default,replicaTest2,rand());

插入数据

insert into replicaTest2_all values(1,'zhang'),(2,'li'),(3,'zhao'),
(4,'qian'),(5,'sun'),(6,'wang'),(7,'tian'),(8,'he'),(9,'zheng'),(10,'dong');

查看数据:
node01:

node01.hadoop.com :) select * from replicaTest2;
┌─id─┬─name─┐
│  4 │ qian │
│ 10 │ dong │
└────┴──────┘

node02:

node02.hadoop.com :) select * from replicaTest2;
┌─id─┬─name──┐
│  1 │ zhang │
│  2 │ li    │
│  3 │ zhao  │
│  5 │ sun   │
│  9 │ zheng │
└────┴───────┘

node03:

node03.hadoop.com :) select * from replicaTest2;
┌─id─┬─name─┐
│  6 │ wang │
│  7 │ tian │
│  8 │ he   │
└────┴──────┘

由此可见数据被replicaTest2_all分布式表随机分发到了三个分片中.
分发有策略,需要做配置<weight>

           <shard>  
                 <weight>2</weight>     
                 <replica>
                        <host>node01</host>
                        <port>9000</port>
                 </replica>    
          </shard>

weight默认为1,既node01 node02 node03 都为1,那么总的权重为3,每个分片通过分布式表第四个参数sharding_key分配到数据的概率是一样.
如果我们权重分别设置为1,2,3 那么总权重是6,那么总区间就是[0,6),排在shard配置第一位的node01,权重占比为1/6,所以属于区间[0,1),排在shard配置第二位的node02,占比2/6,所以区间为[1,3),至于最后的node03就是[3,6).所以如果rand()产生的数字除以6取余落在哪个区间,数据就会分发到哪个shard,通过权重配置,可以实现数据按照想要的比重分配.

六 副本与分片

上面分别讲述了副本与分片的实现,其实最合适的方法就是利用分布式表实现分片数据的写入,而每一分片内通过Replicated*引擎实现副本数据的同步,下面实现这个:
三台服务器配置my_shard_3


    <my_shard_3>
               <shard>
                      <internal_replication>true</internal_replication>
                     <weight>1</weight>
                     <replica>
                            <host>node01</host>
                            <port>9000</port>
                     </replica>
               </shard>
               <shard>
                      <internal_replication>true</internal_replication>
                      <weight>1</weight>
                     <replica>
                            <host>node02</host>
                            <port>9000</port>
                     </replica>
                     <replica>
                            <host>node03</host>
                            <port>9000</port>
                     </replica>
              </shard>
     </my_shard_3>

此时有两个shard 第二shard有两个副本.(因为本人只有三台服务器,故这样分配)
利用分布式ddl创建ReplicatedMergeTree表

CREATE TABLE default.replicaTest3 on cluster my_shard_3
( `id` Int32,`name` String)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard_name}/replicaTest3', '{replica_name}')
ORDER BY id

上方的创建表以及宏变量在上方贴的ClickHouse ReplicatedMergeTree家族引擎链接中已经讲过了,这里不再赘述,下面是我的配置的宏变量:
node01:

node01.hadoop.com :) select * from system.macros;

┌─macro────────┬─substitution─┐
│ replica_name │ 01           │
│ shard_name   │ 01           │
└──────────────┴──────────────┘

node02:

node02.hadoop.com :) select * from system.macros;

┌─macro────────┬─substitution─┐
│ replica_name │ 01           │
│ shard_name   │ 02           │
└──────────────┴──────────────┘

node03

node03.hadoop.com :) select * from system.macros;

┌─macro────────┬─substitution─┐
│ replica_name │ 02           │
│ shard_name   │ 02           │
└──────────────┴──────────────┘

创建好本地表后,创建分布式表:


CREATE TABLE default.replicaTest3_all
( `id` Int32, `name` String)
ENGINE = Distributed('my_shard_3', 'default', 'replicaTest3', id) 

插入数据:

insert into replicaTest3_all values(1,'zhangfei'),(2,'guanyu'),(3,'liubie'),(4,'zhaoyun'),
(5,'machao'),(6,'caocao'),(7,'lvbu'),(8,'zhuge'),(9,'dianwei');

查看数据:
node01:

┌─id─┬─name────┐
│  2 │ guanyu  │
│  4 │ zhaoyun │
│  6 │ caocao  │
│  8 │ zhuge   │
└────┴─────────┘

node02和node03

┌─id─┬─name─────┐
│  1 │ zhangfei │
│  3 │ liubie   │
│  5 │ machao   │
│  7 │ lvbu     │
│  9 │ dianwei  │
└────┴──────────┘

至此,最适合生产的分片与副本机制就配置好了.

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐