ETCD实现及原理分析
etcd的优点:高可用性:保存各个服务的部署,运行信息,若他出现故障可导致集群无法变更,业务瘫痪数据一致性:集群之间没有单点故障,多节点之间保障数据一致性低容量:通过内存树仅存储key版本数据,value数据存储在内存增删改查功能:还可以监听数据变化运维的可维护性:提供API变更节点,降低运维成本从高可用、数据一致性、增删改查功能角度,zookeeper也是满足需求的,但是zookeeper不支持
etcd的优点:
- 高可用性:保存各个服务的部署,运行信息,若他出现故障可导致集群无法变更,业务瘫痪
- 数据一致性:集群之间没有单点故障,多节点之间保障数据一致性
- 低容量:通过内存树仅存储key版本数据,value数据存储在内存
- 增删改查功能:还可以监听数据变化
- 运维的可维护性:提供API变更节点,降低运维成本
从高可用、数据一致性、增删改查功能角度,zookeeper也是满足需求的,但是zookeeper不支持通过API变更,运维成本比较高,其次zookeeper是java编写,部署繁琐,zookeer使用的是HashMap存储,而etcd使用的是内存树存储。其次etcd内部的raft算法可实现API在线、安全变更;同时etcd使用go语言编写,无依赖,部署简单。
内存树节点:
type node struct {
Path string //节点路径
Parent *node //关联父亲节点
Value string //key的value值
ExpireTime time.Time //过期时间
Children map[string]*node //此节点的孩子节点
}
etcd v2缺陷:
1、不支持范围查询和分页查询
2、不支持多key事物
3、不支持保存key历史版本数据,使用滑动窗口最大保存1000条数据,数据容易丢失
4、集群规模过大时,cpu负载高出现性能瓶颈
5、内存开销大,内部仅维护一棵树保存节点的key和value,全量更新到内存
etcd3就是为解决上述问题而诞生,其次etcd3采用了grpc API,使用protobuf定义消息,使编码性能较json大幅提升,并通过http2.0多路复用较少watcher连接数。
etcd基础架构
- client层:提供简单易用的API
- API层:client和server之间在etcdv2使用http协议,在etcdv3使用grpc协议,server之间通信使用raft算法保证数据一致性使用http协议
- raft算法层:保证etcd多个节点之间数据的一致性,是etcd的基石和亮点
- 逻辑层:kvserver模块、lease模块、auth鉴权模块
- 存储层:boltdb数据库保证用户写入数据,日志模块WAL可保证etcd crash后数据不丢失
etcdctl 读请求过程
// 指定节点访问,生产环境下会有多个节点
etcdctl get hello --endpoints http://127.0.0.1:2379
1. client执行etcdctl get name后,API解析get是KVServer的API,便使用该API访问etcd server,通过负载均衡选择一个节点,选好节点之后,client使用etcd server的KVServer模块的rpc方法把请求发给etcd server(client与server通信在etcd3是基于http2的grpc)
2.进入KVServer模块后,有两种读的方式
假设当前更新一个key的value,当leader节点收到请求会持久化到WAL日志并广播给每一个节点,当一半以上的节点持久化成功后,etcdserver异步从raft模块获取日志条目应用到boltdb(状态机),当leader节点应用成功后同时返回client告知更新成功。(注意此时除了leader节点的其他节点状态机并没有同步过来)
串行读:就是直接读取状态机,若此时部分节点未从raft同步过来,可能会读取到未更新的旧数据,不会经过图片的3、4步骤
线性读:节点C收到线性读请求后,会先从leaderA节点获取日志index(如下图步骤2),一半以上节点确认leader身份后返回返回给节点C(如下图步骤3),此时C节点等待直到状态机的索引大于等于leader的索引,去通知读请求,去访问状态机数据.
KVServer收到线性读请求后向raft模块发起readindex请求,raft模块将leader最新日志索引封装在readstate结构体返回给线性读模块,线性读模块等待本节点状态机大于或等于leader后通知KVServer模块,此时可访问状态机模块
3.进入MVCC模块
treeIndex中获取key对应的版本号,先从缓存中根据版本号查询对应的value,若缓存中不存在,再到boltdb根据版本号查询对应的value信息(每一个key对应一棵树,类似于mysql的表)
etcd写请求过程
1. client端通过负载均衡算法选择一个etcd节点发起grpc调用,
2、3. 进入Quota模块,目的是检查当前etcd db的大小加上请求的key-value大小是否超过配额,若超过配额则会产生告警,并通过raft日志同步给其他节点告知db无空间了,而且该告警会持久化存储到db中,当集群内其他节点想写入时会先检查是否存在告警,若存在无法写入,最终集群内所有节点都无法写入只能读;其次还需检查压缩配置是否开启,压缩配置主要是回收旧版本,防止内存和db一直膨胀,会将旧版本占用的空间打上free标记,后续新数据写入可复用该空间而无需申请新的空间。配额的大小默认是0代表2G大小,可以根据业务适当调优,如果填入小于0的数便会禁用配额功能导致db的性能下降。当然在提交给raft模块之前还会检查限速和提交的内容大小,然后转给raft模块。
4. raft收到KVServer的put请求,若非leader节点则会将请求转给leader节点,因为只有leader节点可以处理写请求
5. raft持久化WAL到磁盘
6. raft将日志信息添加到队列
7. etcd server从队列读取消息同步到状态机
8.写事物会到treeIndex查询key的最大版本号,并获取对应的创建版本号、修改次数等,生成+1的新版本号写入boltdb和buffer,最后将最新版本号存到treeIndex
9.数据写入bolt仅仅存储在内存中,put操作完成,但此时并未提交事务更新到磁盘,为了提高吞吐量,每隔100ms批量提交到磁盘,但这又导致从boltdb无法获取最新数据,所以采用buffer暂时保存未提交的数据
etcd数据一致性
全同步复制:leader节点受到写请求后等全部节点都复制完成后返回client
异步复制:leader节点受到写请求后返回client同时异步转发给其他节点
半同步复制:leader节点受到写请求后至少一个节点收到数据后返回给client
raft算法可以保证所有节点的日志一致性,但将日志应用到状态机的过程于raft算法无关属于server本身的数据存储逻辑。
当发生数据不一致的情况,各个节点都包含最新数据和脏数据,如果最终无法修复只能使用备份数据来恢复,因此数据备份显得特别重要,所以在任何重要变更前一定要备份数据,同时也建议定期备份数据
leader选举
leader节点要周期性给其他follower节点发送心跳,若follower节点超时(超过了lease设置的时间)未收到心跳,该follower节点变成candidate状态,发起leader选举,集群其他节点收到leader选举发现该节点的任期号大于自己,则投票支持,当获取集群中大多数节点支持后,变成leader,而之前的leader发现集群中出现任期号大于自己的自动变成follower,注意在follower节点变成candidate状态之前会先发起预投票,若获得大多数节点支持才可以进入candidate状态。
etcd数据存储
随着时间增长,你每次修改操作,版本号都会递增。每修改一次,生成一条新的数据记录。当你指定版本号读取数据时,它实际上访问的是版本号生成那个时间点的快照数据。当你删除数据的时候,它实际也是新增一条带删除标识的数据记录。版本号是int64类型,目前还etcd没有考虑超出限制的情况。
etcd 保存用户 key 与版本号映射关系的数据结构 B-tree,为什么 etcd 使用它而不使用哈希表、平衡二叉树?
从 etcd 的功能特性上分析, 因 etcd 支持范围查询,因此保存索引的数据结构也必须支持范围查询才行。所以哈希表不适合,而 B-tree 支持范围查询。
从性能上分析,平横二叉树每个节点只能容纳一个数据、导致树的高度较高,而 B-tree 每个节点可以容纳多个数据,树的高度更低,更扁平,涉及的查找次数更少,具有优越的增、删、改、查性能。
watch特性
etcd 基于 HTTP/2 协议的多路复用等机制,实现了一个 client/TCP 连接支持多 gRPC Stream, 一个 gRPC Stream 又支持多个 watcher,事件通知模式也从 client 轮询优化成 server 流式推送,极大降低了 server 端 socket、内存等资源。
创建watcher
1、当通过 etcdctl 或 API 发起一个 watch key 请求的时候,etcd 的 gRPCWatchServer 收到 watch 请求后,会创建一个 serverWatchStream, 它负责接收 client 的 gRPC Stream 的 create/cancel watcher 请求,并将从 MVCC 模块接收的 Watch 事件转发给 client。
2、当 serverWatchStream 收到 create watcher 请求后,serverWatchStream 会调用 MVCC 模块的 WatchStream 子模块分配一个 watcher id,并将 watcher 注册到 MVCC 的 WatchableKV 模块。
3、在 etcd 启动的时候,WatchableKV 模块会运行 syncWatchersLoop 和 syncVictimsLoop goroutine,分别负责不同场景下的事件推送,它们也是 Watch 特性可靠性的核心之一。
4、当创建完成 watcher 后,此时执行 put hello 修改操作时,请求经过 KVServer、Raft 模块后 Apply 到状态机时,在 MVCC 的 put 事务中,它会将本次修改的后的 mvccpb.KeyValue 保存到一个 changes 数组中。
5、在 put 事务结束时,它会将 KeyValue 转换成 Event 事件,然后notify 会匹配出监听过此 key 并处于 synced watcherGroup 中的 watcher,同时事件中的版本号要大于等于 watcher 监听的最小版本号,才能将事件发送到此 watcher 的事件 channel 中。
6、serverWatchStream 的 sendLoop goroutine 监听到 channel 消息后,读出消息立即推送给 client,至此,完成一个最新修改事件推送。
事务
MYSQL:悲观锁,认为多个并发事务可能会发生冲突,因此它要求事务必须先获得锁,才能进行修改数据操作,比如数据库读写锁
ETCD:乐观锁,认为数据不会发生冲突,但是当事务提交时,具备检测数据是否冲突的能力
ETCD事务一致性
$ etcdctl txn -i
compares: //对应If语句
value("ip") = "116.85.225.12" //判断ip账号资金是否为116.85.225.12
success requests (get, put, del): //对应Then语句
del ip "116.85.225.12"
put uuid "116.85.225.12"
failure requests (get, put, del): //对应Else语句
get ip
SUCCESS
OK
OK
持久化存储boltdb
page 按照功能可分为元数据页 (meta page)、B+ tree 索引节点页 (branch page)、B+ tree 叶子节点页 (leaf page)、空闲页管理页 (freelist page)、空闲页 (free page)
meta page 中存储的是固定的 db 元数据
freelist page空闲页管理页记录了 db 中哪些页是空闲、可使用的,当你在 boltdb 中删除大量数据的时候,其对应的 page 就会被释放,页 ID 存储到 freelist 所指向的空闲页中。当你写入数据的时候,就可直接从空闲页中申请页面使用。
leaf page可以存储 bucket 数据,也可以存储 key-value 数据
branch page 中解析出所有 key,然后二分搜索匹配 key
// 打开boltdb文件,获取db对象
db,err := bolt.Open("db", 0600, nil)
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 参数true表示创建一个写事务,false读事务
tx,err := db.Begin(true)
if err != nil {
return err
}
defer tx.Rollback()
// 使用事务对象创建key bucket
b,err := tx.CreatebucketIfNotExists([]byte("key"))
if err != nil {
return err
}
// 使用bucket对象更新一个key
if err := b.Put([]byte("r94"),[]byte("world")); err != nil {
return err
}
// 提交事务
if err := tx.Commit(); err != nil {
return err
}
回收旧数据
可以通过 client API 发起人工的压缩 操作,也可以配置自动压缩策略。在自动压缩策略中,可以根据你的业务场景选择合适的压缩模式。目前 etcd 支持两种压缩模式,分别是时间周期性压缩和版本号压缩。
# 查看当前版本号
etcdctl endpoint status
# 压缩指定版本
etcdctl compact 8989
压缩的核心工作原理分为两大任务
第一个任务是压缩 treeIndex 中的各 key 历史索引,清理已删除 key,并将有效的版本号保存到 map 数据结构中。
第二个任务是删除 boltdb 中的无效 key。基本原理是根据版本号遍历 boltdb 已压缩区间范围的 key,通过 treeIndex 返回的有效索引 map 数据结构判断 key 是否有效,无效则通过 boltdb API 删除它。
最后在执行压缩的操作中,虽然我们删除了 boltdb db 的 key-value 数据,但是 db 大小并不会减少。db 大小不变的原因是存放 key-value 数据的 branch 和 leaf 页,它们释放后变成了空闲页,并不会将空间释放给磁盘。
查看集群节点
etcdctl endpoint status -w json | python -m json.tool
# 同一集群下所有节点的cluster_id相同
# member_id是每个节点的标识
# revision是每个节点的版本号,全局递增,同一集群下各节点的revision差距不会过大
[
{
"Endpoint":"127.0.0.1:2379",
"Status":{
"header":{
"cluster_id":17237436991929493444,
"member_id":9372538179322589801,
"raft_term":10,
"revision":1052950
},
"leader":9372538179322589801,
"raftAppliedIndex":1098420,
"raftIndex":1098430,
"raftTerm":10,
"version":"3.3.17"
}
},
{
"Endpoint":"10.255.78.12:2379",
"Status":{
"header":{
"cluster_id":17237436991929493444,
"member_id":10501334649042878790,
"raft_term":10,
"revision":1025860
},
"leader":9372538179322589801,
"raftAppliedIndex":1098418,
"raftIndex":1098428,
"raftTerm":10,
"version":"3.3.17"
}
}
]
boltdb大小8G限制
- 启动耗时:每次etcd启动时要打开boltdb文件读取所有的key-value数据重建treeIndex树,若数据过大会导致启动缓慢
- 索引性能下降:数据过多查询缓慢
- 集群稳定性:数据过大容易出现丢包
- boltdb性能:事务提交耗时大影响性能
etcd延时
分析整个etcd处理流程发现,延时可能出现的2个地方一个是leader通过网络协议将日志同步给follower的2380端口,还有一个是leader持久化日志到磁盘依赖磁盘的I/O性能。
etcd内存
- raft日志至少保存最近5000条数据在内存
- treeIndex保存在内存
- boltdb通过map调用将文件放入内存
- watcher内存占用连接数
etcd分布式锁
分布式锁的第一核心要素是互斥性,在同一时间内,不允许多个client同时获得锁,第二个核心就是设置超时机制自动释放锁,避免出现死锁.
若多个 client 同时申请锁,则 client 通过比较各个 key 的 revision 大小,小的获得锁,确保了锁的安全性、互斥性。通过 Lease 机制确保了锁的活性,client 需定期向 etcd 服务发送"特殊心跳"汇报健康状态,若未正常发送心跳,并超过和 etcd 服务约定的最大存活时间后,就会被 etcd 服务移除此 Lease 和其关联的数据。无论 client 发生 crash 还是网络分区,都能保证不会出现死锁。通过 Watch 机制使其他 client 能快速感知到原 client 持有的锁已释放,提升了锁的可用性。最重要的是 etcd 是基于 Raft 协议实现的高可靠、强一致存储,正常情况下,不存在 Redis 主备异步复制协议导致的数据丢失问题。
Golang代码实现etcd初始化及存取数据
package etcd
import (
"context"
"fmt"
"strconv"
"time"
"go.etcd.io/etcd/clientv3"
)
var (
endpoints = []string{"127.0.0.1:2379"}
etcdclient *clientv3.Client
)
func GetEtcdClient() *clientv3.Client {
return etcdclient
}
func InitEtcdClient(c context.Context, address []string, timeout time.Duration) (err error) {
if etcdclient, err = clientv3.New(clientv3.Config{
Context: context.Background(),
Endpoints: address,
DialTimeout: timeout}); err != nil {
if err == context.DeadlineExceeded {
fmt.Println(fmt.Sprintf("can not init db, exceeded: ", address, "timeout: ", timeout))
return err
}
fmt.Println("can not init db: ", address, "timeout: ", timeout)
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if _, err := etcdclient.Put(ctx, "init", strconv.Itoa(int(time.Now().UnixNano()))); err != nil {
logrus.Errorf("can not put db when init")
etcdclient.Close()
return err
}
if _, err := etcdclient.Delete(ctx, "init"); err != nil {
logrus.Errorf("can not Delete db when init")
etcdclient.Close()
return err
}
go func() {
<-c.Done()
etcdclient.Close()
}()
return nil
}
// 添加数据
func Save(ctx context.Context, key, value string) (*clientv3.PutResponse, error) {
kvc := clientv3.NewKV(GetEtcdClient())
resp, err := kvc.Put(ctx, key, value)
if err != nil {
fmt.Println(fmt.Sprintf("kvc.Put error: %v", err))
return nil, err
}
fmt.Println("Save to etcd success: key ", key, " value: ", value)
return resp, nil
}
// 删除数据
func Delete(ctx context.Context, key string) (err error) {
kvc := clientv3.NewKV(GetEtcdClient())
gresp, err := kvc.Get(ctx, key)
if err != nil {
fmt.Println("kvc.Get error: %v", err)
return err
}
dresp, err := kvc.Delete(ctx, key)
if err != nil {
fmt.Println("kvc.Delete error: %v", err)
return err
}
fmt.Println("Delete from etcd success: ", gresp.Kvs, " count ", dresp.Deleted)
return nil
}
// 获取一个数据
func Get(ctx context.Context, key string) (string, error) {
kvc := clientv3.NewKV(GetEtcdClient())
resp, err := kvc.Get(ctx, key)
if err != nil {
logCtx.Errorf("kvc.Get error: %v", err)
return "", err
}
if len(response.Kvs) != 1 {
return "", errors.New("key not found error")
}
return string(resp.Kvs[0].Value), nil
}
// 获取一段数据,相当于--prefix
func GetAll(ctx context.Context, key string) (map[string]string, error) {
kvc := clientv3.NewKV(GetEtcdClient())
resp, err := kvc.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
logCtx.Errorf("kvc.Get all error: %v", err)
return nil, err
}
if len(response.Kvs) == 0 {
return nil, errors.New("key not found error")
}
result := make(map[string]string, 0)
for _, kv := range resp.Kvs {
result[string(kv.Key)] = string(kv.Value)
}
return result, nil
}
func main() {
if err := InitEtcdClient(context.Background(), endpoints, time.Second*1); err != nil {
fmt.Println(fmt.Sprintf("error %+v", err))
}
Save(context.Background(), "name", "amber")
Get(context.Background(), "name")
GetAll(context.Background(), "name")
Delete(context.Background(), "name")
}
etcd写入json格式及watch
package etcd
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
type Student struct {
Name string
Age int
}
// json写入
func SaveInfo(ctx context.Context, key, stu *Student) error {
sStu, err := json.Marshal(stu)
if err != nil {
logctx.Error("marshal error")
return err
}
Save(ctx, key, sStu)
}
func Save(ctx context.Context, key, value string) (*clientv3.PutResponse, error) {
kvc := clientv3.NewKV(GetEtcdClient())
resp, err := kvc.Put(ctx, key, value)
if err != nil {
fmt.Println(fmt.Sprintf("kvc.Put error: %v", err))
return nil, err
}
fmt.Println("Save to etcd success: key ", key, " value: ", value)
return resp, nil
}
// json读取
func GetInfo(ctx context.Context, key string) error {
kvc := clientv3.NewKV(GetEtcdClient())
data, err := kvc.Get(ctx, key)
if err != nil {
fmt.Printf("kvc.Get error %v", err)
return nil, err
}
var stu Student
if err = json.Unmarshal(data, &stu); err != nil {
fmt.Printf("unmarshal json error %v", err)
}
fmt.Println("Get student info: name ", stu.Name, " age: ", stu.Age)
return nil
}
func WatchInfo(ctx context.Context, key string) error {
kvc := clientv3.NewKV(GetEtcdClient())
rch := kvc.Watch(ctx, key, clientv3.WithPrefix())
for wresp := range rch {
for _, event := range wresp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("receive put event %v", event)
case mvccpb.DELETE:
fmt.Println("key: %v, type: %v, ModRevision: %v\n", string(event.Kv.Key), event.Type, event.Kv.ModRevision)
}
}
}
func main() {
stu := &Student{
Name : "amber",
Age : 29,
}
// 初始化etcd的作用是将etcdclient赋值供后续使用
if err := InitEtcdClient(context.Background(), endpoints, time.Second*1); err != nil {
fmt.Println(fmt.Sprintf("error %+v", err))
}
SaveInfo(context.Background(), "info", stu)
GetInfo(context.Background(), "info")
}
etcd启动
/tmp/etcd --data-dir=cabin --name cabin.etcd01 --initial-advertise-peer-urls http://0.0.0.0:2380 --listen-peer-urls http://0.0.0.0:2380 --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 --initial-cluster-token captainmo.cabin --initial-cluster cabin.etcd01=http://0.0.0.0:2380 --initial-cluster-state new &
–name:etcd集群中的节点名,这里可以随意,可区分且不重复就行
–data-dir:集群member信息生成的路径名,若不存在,会在当前执行该命令的路径下新建cabin文件夹
–initial-advertise-peer-urls:2380建议用于节点之间通信的url,节点间将以该值进行通信。
–listen-peer-urls:监听的用于节点之间2380通信的url,可监听多个,集群内部将通过这些url进行数据交互(如选举,数据同步等)
–listen-client-urls:监听的用于客户端通信2379的url,同样可以监听多个。
–advertise-client-urls:建议使用的客户端2379之间通信 url,该值用于 etcd 代理或 etcd 成员与 etcd 节点通信。
–initial-cluster-token: etcd-cluster-1,节点的 token 值,设置该值后集群将生成唯一 id,并为每个节点也生成唯一 id,当使用相同配置文件再启动一个集群时,只要该 token 值不一样,etcd 集群就不会相互影响。
–initial-cluster:也就是集群中所有的 initial-advertise-peer-urls 的合集。
–initial-cluster-state:new,新建集群的标志。
&代表挂在后端跑
执行完成后
netstat -nap | grep 2379 查看2379这个端口起的服务
更多推荐
所有评论(0)