【Golang】分布式 raft 共识算法 go 实现
本篇文章为 Raft 系列文章中的第一篇,Raft 的介绍。整个系列文章描述了 Raft 分布式共识算法及其在 Go 中的完整实现。Raft 是一种相对较新的算法(2014),但是它在业界已经被大量使用。最为大家所熟知的当属 K8s,它依赖于 Raft 通过 etcd 分布式键值存储。本系列文章的目的是描述 Raft 的功能齐全且经过严格测试的实现,并捎带介绍 Raft 的工作方式。我们假设读者至
一、介绍
本篇文章描述了 Raft 分布式共识算法及其在 Go 中的完整实现。
Raft 是一种相对较新的算法(2014),但是它在业界已经被大量使用。最为大家所熟知的当属 K8s,它依赖于 Raft 通过 etcd 分布式键值存储。
本系列文章的目的是描述 Raft 的功能齐全且经过严格测试的实现,并捎带介绍 Raft 的工作方式。我们假设读者至少了解过 Raft 相关文章。
不要指望在一天内完全掌握 Raft。尽管它的设计比 Paxos 更易于理解,但 Raft 仍然相当复杂。它要解决的问题 - 分布式共识 - 是一个难题,因此解决方案的复杂性自然有一个下限。
1.1 复制状态机
分布式共识算法可以看作是解决跨多个服务器复制确定性状态机的问题。状态机一词用来表示任意服务;毕竟,状态机是计算机科学的基础之一,并且一切都可以用它们来表示。数据库,文件服务器,锁服务器等都可以被认为是复杂的状态机。
考虑一些由状态机表示服务。多个客户端可以连接到它并发出请求,并期望得到响应:
只要执行状态机的服务器是可靠的,系统就可以正常工作。如果服务器崩溃,我们的服务将不可用,这可能是不可接受的。通常,我们系统的可靠性取决于运行它的单个服务器。
提高服务可靠性的一种常见方法是通过复制。我们可以在不同的服务器上运行服务的多个实例。这样就创建了一个集群,这些服务器可以协同工作以提供服务,并且任何一台服务器崩溃都不应导致该服务中断。通过消除会同时影响多台服务器的常见故障模式,将服务器彼此隔离进一步提高了可靠性。
客户端将跟整个集群请求服务,而不是单个服务器来执行服务。此外,组成集群的服务副本必须在它们之间进行通信以正确复制状态:
图中的每个状态机都是服务的副本。其思想是所有状态机都以锁步的方式执行,从客户端请求中获取相同的输入并执行相同的状态转换。这样可以确保即使某些服务器出现故障,它们也可以将相同的结果返回给客户端。Raft 就是实现此目的的算法。
介绍一些相关名词:
- 服务:是我们正在实现的分布式系统的逻辑任务。例如,键值数据库。
- 服务器或副本:一个启用 raft 的服务实例,它运行在一台与其他副本和客户端有网络连接的隔离机器上。
- 集群:一组 Raft 服务器进行协作以实现分布式服务。典型的群集大小为 3 或 5。
1.2 共识模块和 Raft 日志
作为一种通用算法,Raft 并没有规定如何使用状态机实现服务。它旨在实现的功能是可靠,确定性地记录和再现状态机的输入序列(在 Raft 中也称为命令)。给定初始状态和所有输入,就可以完全精确地重放状态机。另一种思考方法:如果我们从同一状态机获取两个单独的副本,并从相同的初始状态开始为它们提供相同的输入序列,则状态机将以相同的状态结束并产生相同的输出。
这是使用 Raft 的通用服务的结构:
关于该组件的更多细节:
- 状态机与我们上面看到的相同。它表示任意服务;在介绍 Raft 时,键值存储是一个常见例子。
- 日志是存储客户端发出的所有命令(输入)的地方。命令不直接应用于状态机;相反,当它们已成功复制到大多数服务器时,Raft 将应用它们。而且,该日志是持久性的 —— 它保存在稳定的存储中,可以在崩溃后幸免,并且可以用于在崩溃后回放状态机。
- 共识模块是 Raft 算法的核心。它接受来自客户端的命令,确保将它们保存在日志中,与集群中的其他 Raft 副本一起复制它们(与上图中的绿色箭头相同),并在确信安全时将它们提交给状态机。提交到状态机后会将实际更改通知客户。
1.3 领导者和追随者
Raft 使用了一个强大的领导模型,其中集群中的一个副本充当领导者,其他副本充当追随者。领导者负责根据客户的请求采取行动,将命令复制到追随者,并将响应返回给客户。
在正常操作期间,追随者的目标是简单地复制领导者的日志。如果领导者发生故障或网络分区,则一个追随者可以接管领导权,因此该服务仍然可用。
该模型有其优缺点。一个重要的优点是简单。数据总是从领导者流向跟随者,只有领导者才能响应客户请求。这使得 Raft 集群更容易分析、测试和调试。一个缺点是性能 —— 因为集群中只有一台服务器与客户机通信,这可能成为客户机活动激增时的瓶颈。答案通常是:Raft 不应该用于高流量的服务。它更适合于一致性非常重要的低流量场景,但可能会牺牲可用性 —— 我们将在容错一节中介绍。
1.4 客户端交互
前面说过:客户端将和整个集群通信,而不是和单个服务器通信来执行服务。什么意思呢?集群只是通过网络连接的一组服务器,那么将如何和整个集群通信?
答案很简单:
- 在使用 Raft 集群时,客户端知道集群副本的网络地址。
- 客户端最初向任意副本发送请求。如果该副本是领导者,它将立即接受请求,并且客户端将等待完整的响应。此后,客户端会记住该副本是领导者,而不必再次搜索它(直到出现某些故障,例如领导者崩溃)。
- 如果副本表示不是领导者,则客户端将尝试另一个副本。此处可能的优化是,跟随者副本可以告诉客户端哪个其他副本是领导者。由于副本之间不断进行通信,因此通常知道正确的答案。这样可以为客户端节省一些猜测的时间。在另一种情况下,客户端可能意识到与其通信的副本不是领导者,如果在一定的超时时间内未提交其请求。这可能意味着它通信的副本实际上不是领导者(即使它仍然认为是副本)—— 可能已经从其他 Raft 服务器中被分隔出来了。超时结束后,客户端将继续寻找其他领导者。
在多数情况下,第三点中提到的优化是不必要的。通常,在 Raft 中区分 “正常运行” 和 “故障情况” 很有用。很典型的服务将花费其 99.9%的时间用于 “正常运行”,该情况下,客户知道领导者是谁,因为首次跟该服务通信时就缓存了此信息。故障场景 —— 我们将在下一节中进行详细讨论 —— 肯定会造成混乱,但只是一小段时间。正如我们将在下一篇文章中详细了解的那样,一个 Raft 集群将很快地从服务器的临时故障或网络分区中恢复 —— 在大多数情况下,恢复间隔只有一秒钟。当新的领导者声明其领导权并且客户找到它是哪台服务器时,将会出现短暂的不可用状态,但是之后它将返回到 “正常操作模式”。
1.5 Raft 中的容错和 CAP 原则
让我们回顾一下这次没有连接客户端的三个 Raft 副本的示意图:
在集群中,我们可以预料到哪些故障?
现代计算机中的每个组件都可能发生故障,但是为了使讨论更加容易,我们将运行 Raft 实例的服务器视为原子单元。这给我们带来了两种主要的失败类型:
-
服务器崩溃,其中一台服务器在一段时间内停止响应所有网络流量。崩溃的服务器通常会重新启动,并可能在短暂中断后恢复联机。
-
一种网络分区,其中一个或多个服务器由于网络设备或传输介质的问题而与其他服务器和 / 或客户端断开连接。
从服务器 A 与服务器 B 进行通信的角度来看,B 崩溃与 A 和 B 之间的网络分区是无法区分的。它们都以相同的方式表现出来 ——A 停止接收来自 B 的任何消息或响应。在系统级看来,网络分区要隐蔽得多,因为它们会同时影响多台服务器。在本系列的下一部分中,我们将介绍一些由于分区而引起的棘手的情况。
为了能够优雅地处理任意网络分区和服务器崩溃,Raft 要求群集中的大多数服务器都可以启动,并且领导者可以在任何给定的时刻使用它来取得进展。对于 3 台服务器,Raft 可以容忍单个服务器故障。如果有 5 台服务器,它将容忍 2 台;对于 2N + 1 台服务器,它将容忍 N 个故障。
这就引出了 CAP 定理,它的实际结果是,在存在网络分区的情况下,我们必须权衡可用性和一致性。
在权衡中,Raft 处于一致性阵营中。其不变量旨在防止群集可能达到不一致状态的情况,在这种情况下,不同的客户端将获得不同的答案。为此,Raft 牺牲了可用性。
正如前面所说,Raft 并不是为高吞吐量,细粒度的服务而设计的。每个客户端请求都会触发大量工作 ——Raft 副本之间的通信,以将其复制到大多数副本并持久化;在客户得到回应之前。
因此,例如,我们不会设计一个所有客户端请求都通过 Raft 进行复制的数据库。那就太慢了。Raft 更适合粗粒度的分布式原语 —— 例如实现锁服务器,选举高层协议的领导者,在分布式系统中复制关键配置数据等等。
1.6 为什么选择 Go
本系列中介绍的 Raft 实现是用 Go 编写的。从作者角度来看,Go 具有三个强大的优势,这使其成为本系列以及一般网络服务的有希望的实现语言:
- 并发性:像 Raft 这样的算法,本质上是深度并发的。每个副本执行正在进行的操作,运行定时事件的计时器,并且必须响应来自其他副本和客户端的异步请求。
- 标准库:Go 具有功能强大的标准库,可以轻松编写复杂的网络服务器,而无需导入和学习任何第三方库。特别是在 Raft 的情况下,第一个必须回答的问题是 “如何在副本之间发送消息?”,许多人陷入设计协议和某些序列化或使用繁重的第三方库的细节中。Go 仅具有 net/rpc,它是用于此类任务的足够好的解决方案,它的建立速度非常快,并且不需要导入。
- 简便性:即使在我们开始考虑实现语言之前,实现分布式共识也已经足够复杂。可以用任何一种语言编写清晰,简单的代码,但是在 Go 语言中,这是默认的习惯用法,并且该语言在每个可能的级别上都反对复杂性。
二、选举
2.1 代码结构
关于 Raft 实现结构方式的相关信息;适用于系列文章中的所有部分。
通常,Raft 是作为引入到某些服务中的对象实现的。由于我们不在这里开发服务,而是仅研究 Raft 本身,因此我创建了一个简单的 Server 类型,该类型包裹 ConsensusModule 类型以尽可能地隔离代码中更感兴趣的部分:
共识模块(CM)实现了 Raft 算法的核心,位于 raft.go 文件中。它完全从与集群中其他副本的网络和连接的细节中抽象出来。ConsensusModule 中与网络相关的唯一字段是:
// CM中的服务器ID
id int
// 集群中节点的ID列表
peerIds []int
// 包含CM的服务器,处理节点间的RPC通信
server *Server
在实现中,每个 Raft 副本将群集中的其他副本称为” 端点 “。集群中的每个端点都有唯一的数字 ID ,以及所有端点的 ID 列表。server 字段是指向包含 Server 的指针(在 server.go 中实现),使 ConsensusModule 可以将消息发送给端点。稍后我们将看到它是如何实现的。
这样设计的目标是将所有网络细节都排除掉,把重点放在 Raft 算法上。通常,要将 Raft 论文映射到此实现上,只需要 ConsensusModule 类型及其方法。服务器代码是一个相当简单的 Go 网络框架,有一些小的复杂之处来支持严格的测试。在本系列文章中,我们不会花时间,但是如果有不清楚的地方,可以留言提问。
2.2 Raft 服务器状态
从总体上讲,Raft CM 是一个具有 3 种状态的状态机:
- 超时,开始选举
- 超时,新的选举
- 发现当前的领导人或者新的任期
- 接收来自大多数服务器的投票
- 发现更高的任期服务
- 这可能有点迷惑,因为上一部分花费了大量时间来解释 Raft 如何帮助实现状态机。通常情况下,状态一词在这里是重载的。Raft 是用于实现任意复制状态机的算法,但它内部也有一个小型状态机。后面,该状态意味着从上下文中可以清楚地确定,不明确的地方我们也会指出该状态。
在一个典型的稳定状态场景中,群集中的一台服务器是领导者,而其他所有服务器都是跟随者。尽管我们不希望出问题,但 Raft 的目标就是容错,因此我们将花费大部分时间来讨论非典型场景,失败情况,某些服务器崩溃,其他服务器断开连接等。
正如前面所说,Raft 使用了一个强大的领导模型。领导者响应客户端的请求,将新条目添加到日志中,并将其复制到跟随者。万一领导者失败或停止响应,每个跟随者都随时准备接管领导权。这是图中从 “跟随者” 到 “候选者” 的 “响应超时,开始选举” 的过渡。
2.3 任期
就像常规选举一样,在 Raft 中也有任期。任期是某个服务器担任领导者的时间段。新的选举触发一个新的任期,并且 Raft 算法可确保给定的任期只有一个领导者。
但是,这个类别有点牵强,因为 Raft 选举与真实选举有很大差别。在 Raft 中,选举更加合作;候选者的目标不是赢得选举,而是在任何一个特定的任期内有合适的候选者赢得选举。我们稍后将详细讨论 “合适” 的含义。
2.4 选举计时器
Raft 算法的关键组成部分是选举计时器。 这是每个跟随者连续运行的计时器,每次收到当前领导者的消息就会重新启动它。领导者发送周期性的心跳,因此当这些心跳停止到达时,跟随者会认为领导者已经崩溃或断开连接,并开始选举(切换到候选状态)。
问:不是所有的跟随者都会同时成为候选人?
选举计时器是随机的,这是 Raft 简单化的关键之一。Raft 使用此随机方法来降低多个关注者同时进行选举的机会。但是,即使他们确实同时成为候选人,在任何给定的任期中也只有一个当选为领导者。在极少数情况下,如果投票分裂,以致没有候选人能赢得选举,将进行新的选举(有新任期)。从理论上讲,永久地重新进行选举是可行的,但在每一轮选举中发生这种情况的可能性都大大降低。
问:如果跟随者与集群断开连接(分区)怎么办?它不会因为没有听到领导者的声音而开始选举吗?
答:这是网络分区的隐患,因为跟随者无法区分谁被分区。它确实将开始选举。但是,如果是这个跟随者被断开,那么这次选举将无济于事 - 因为它无法与其他端点联系,所以不会获得任何投票。它可能会继续保持候选状态(每隔一段时间重新启动一次新选举),直到重新连接到集群。稍后我们将更详细地研究这种情况。
2.5 对等 RPC
Raft 有两种 RPC 在端点之间互相发送。有关这些 RPC 的详细参数和规则,参见图 2。简要讨论它们的目标:
- RequestVotes(RV):仅在候选状态下使用;候选人使用它来请求选举中的端点投票。答复中包含是否批准投票的指示。
- AppendEntries(AE):仅在领导者状态下使用;领导者使用此 RPC 将日志条目复制到跟随者,也发送心跳。即使没有新的日志条目要复制,该 RPC 也会定期发送给每个跟随者。
从以上内容可以推断出跟随者没有发送任何 RPC。这是对的;跟随者不会向其他端点发起 RPC,但是他们在后台运行选举计时器。如果在没有当前领导者的通信的情况下经过了此计时器,则跟随者将成为候选者并开始发送 RV。
2.6 实现选举计时器
现在开始深入研究代码。以下代码文件会在文章末尾给出。关于 ConsensusModule 结构的字段的完整列表,可以在代码文件中查看。
我们的 CM 通过在 goroutine 中运行以下功能来实现选举计时器:
func (cm *ConsensusModule) runElectionTimer() {
timeoutDuration := cm.electionTimeout()
cm.mu.Lock()
termStarted := cm.currentTerm
cm.mu.Unlock()
cm.dlog("election timer started (%v), term=%d", timeoutDuration, termStarted)
// This loops until either:
// - we discover the election timer is no longer needed, or
// - the election timer expires and this CM becomes a candidate
// In a follower, this typically keeps running in the background for the
// duration of the CM's lifetime.
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
<-ticker.C
cm.mu.Lock()
if cm.state != Candidate && cm.state != Follower {
cm.dlog("in election timer state=%s, bailing out", cm.state)
cm.mu.Unlock()
return
}
if termStarted != cm.currentTerm {
cm.dlog("in election timer term changed from %d to %d, bailing out", termStarted, cm.currentTerm)
cm.mu.Unlock()
return
}
// Start an election if we haven't heard from a leader or haven't voted for
// someone for the duration of the timeout.
if elapsed := time.Since(cm.electionResetEvent); elapsed >= timeoutDuration {
cm.startElection()
cm.mu.Unlock()
return
}
cm.mu.Unlock()
}
}
首先通过调用 cm.electionTimeout 选择一个伪随机的选举超时时间。正如论文中的建议,我们在这里使用的范围是 150 到 300 毫秒。和 ConsensusModule 的大多数方法一样,runElectionTimer 在访问字段时会锁定结构。这是必须要做的,因为实现尝试尽可能地保持同步,这也是 Go 的优势之一。这意味着顺序代码是… 顺序执行的,并且不会拆分为多个事件处理程序。但是,RPC 仍然同时发生,因此我们必须保护共享数据结构。我们很快就会讲到 RPC 处理程序。
这个方法的主循环运行一个 10 毫秒的代码。有更有效的方法来等待事件,但是这种习惯用法代码最为简单。每次循环迭代都在 10 毫秒之后进行。从理论上讲,这可以使整个选举超时,但随后响应速度会变慢,并且在日志中进行调试 / 跟踪会更加困难。我们检查状态是否仍然如预期且任期未更改。如果有任何关闭,我们终止选举计时器。
如果自上次 “选举重置事件” 以来已经过去了足够的时间,则此端点开始选举并成为候选人。这是什么选举重置事件?可以终止选举的任何因素 - 例如,收到有效的心跳,或给另一个候选人投票。
2.7 成为候选人
上面可以看到,一旦经过足够的时间而没有跟随者收到领导者或其他候选人的消息,它将开始选举。在查看代码之前,我们考虑一下进行选举所需的事情:
- 将状态切换为候选项,并增加条件项,因为这是算法为每次选举指定的条件。
- 将 RV RPC 发送给所有端点,要求他们在这次选举中为我们投票。
- 等待对这些 RPC 的答复,然后计数是否获得足够的选票成为领导者。
在 Go 中,所有这些逻辑都可以在一个函数中实现:
func (cm *ConsensusModule) startElection() {
cm.state = Candidate
cm.currentTerm += 1
savedCurrentTerm := cm.currentTerm
cm.electionResetEvent = time.Now()
cm.votedFor = cm.id
cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)
var votesReceived int32 = 1
// Send RequestVote RPCs to all other servers concurrently.
for _, peerId := range cm.peerIds {
go func(peerId int) {
args := RequestVoteArgs{
Term: savedCurrentTerm,
CandidateId: cm.id,
}
var reply RequestVoteReply
cm.dlog("sending RequestVote to %d: %+v", peerId, args)
if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("received RequestVoteReply %+v", reply)
if cm.state != Candidate {
cm.dlog("while waiting for reply, state = %v", cm.state)
return
}
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in RequestVoteReply")
cm.becomeFollower(reply.Term)
return
} else if reply.Term == savedCurrentTerm {
if reply.VoteGranted {
votes := int(atomic.AddInt32(&votesReceived, 1))
if votes*2 > len(cm.peerIds)+1 {
// Won the election!
cm.dlog("wins election with %d votes", votes)
cm.startLeader()
return
}
}
}
}
}(peerId)
}
// Run another election timer, in case this election is not successful.
go cm.runElectionTimer()
}
候选人首先为自己投票 - 将 voiceReceived 初始化为 1 并设置 cm.votedFor =cm.id。
然后,它与所有其他端点并行发出 RPC 。每个 RPC 都在自己的 goroutine 中完成,因为我们的 RPC 调用是同步的 - 它们会阻塞直到收到响应为止,这可能需要一段时间。
rpc 实现:
cm.server.Call(peer, “ConsensusModule.RequestVote”, args, &reply)
我们使用 ConsensusModule.server 字段中包含的 Server 指针发出远程调用,使用 ConsensusModule.RequestVotes 作为远程方法名称。最终调用第一个参数中给出的端点的 RequestVote 方法。
如果 RPC 成功,已经过了一段时间,因此我们必须检查状态,看看有哪些选项。如果我们的状态不再是候选人,就退出。什么时候会发生这种情况?例如,我们可能赢得了选举,因为其他 RPC 调用中有足够的选票。或者收到其他 RPC 调用中的一个具有更高的任期,所以我们切换成跟随者。重要的是,在网络不稳定的情况下,RPC 可能需要很长时间才能到达 - 当我们收到答复时,其余代码可能会继续进行,因此在这种情况下妥协放弃非常重要。
如果响应返回时我们仍然是候选人,我们将检查响应的任期,并将其与发送请求时的原始任期进行比较。如果答复的任期较高,我们将恢复为跟随者状态。例如,如果其他候选人在我们收集选票时赢得了选举,就会发生这种情况。
如果该任期与我们发出的任期相同,请检查是否已投票。我们使用一个原子投票变量来安全地从多个 goroutine 中收集投票。如果此服务器拥有多数表决权(包括它自己授予的表决权),它将成为领导者。
请注意,startElection 方法不会阻塞。它更新一些状态,启动一堆 goroutines 并返回。因此,它还应该在 goroutine 中启动一个新的选举计数器 - 在最后一行进行。这样可以确保如果这次选举没有任何用处,则超时后将开始新的选举。这也解释了 runElectionTimer 中的状态检查:如果此选举确实将端点转变为领导者,则并发运行的 runElecionTimer 将在观察它不希望进入的状态时才返回。
2.8 成为领导者
当投票记录显示该端点获胜时,我们已经在 startElection 中看到了 startLeader 调用。
func (cm *ConsensusModule) startLeader() {
cm.state = Leader
cm.dlog("becomes Leader; term=%d, log=%v", cm.currentTerm, cm.log)
go func() {
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
// Send periodic heartbeats, as long as still leader.
for {
cm.leaderSendHeartbeats()
<-ticker.C
cm.mu.Lock()
if cm.state != Leader {
cm.mu.Unlock()
return
}
cm.mu.Unlock()
}
}()
}
这实际上是一个相当简单的方法:所有操作都是运行心跳计时器 - 一个 goroutine,只要此 CM 仍然是领导者,它将每 50 毫秒调用一次 leaderSendHeartbeats。
func (cm *ConsensusModule) leaderSendHeartbeats() {
cm.mu.Lock()
savedCurrentTerm := cm.currentTerm
cm.mu.Unlock()
for _, peerId := range cm.peerIds {
args := AppendEntriesArgs{
Term: savedCurrentTerm,
LeaderId: cm.id,
}
go func(peerId int) {
cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, 0, args)
var reply AppendEntriesReply
if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in heartbeat reply")
cm.becomeFollower(reply.Term)
return
}
}
}(peerId)
}
}
有点类似于 startElection,从某种意义上说,它为每个对等点启动了一个 goroutine 以发送 RPC 。这次 RPC 是没有日志内容的 AppendEntries(AE),在 Raft 中起着心跳的作用。
与处理 RV 响应类似,如果 RPC 返回的任期高于我们的任期,则此端点切换为跟随者。
func (cm *ConsensusModule) becomeFollower(term int) {
cm.dlog("becomes Follower with term=%d; log=%v", term, cm.log)
cm.state = Follower
cm.currentTerm = term
cm.votedFor = -1
cm.electionResetEvent = time.Now()
go cm.runElectionTimer()
}
它将 CM 的状态设置为跟随者,并重置其条件和其他重要状态字段。启动一个新的选举计时器。
2.9 答复 RPC
目前为止,我们已经实现了活动部分 - 初始化 RPC、计时器和状态转换。在我们看到服务器方法之前,演示还不完整。 从 RequestVote 开始:
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d]", args, cm.currentTerm, cm.votedFor)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in RequestVote")
cm.becomeFollower(args.Term)
}
if cm.currentTerm == args.Term &&
(cm.votedFor == -1 || cm.votedFor == args.CandidateId) {
reply.VoteGranted = true
cm.votedFor = args.CandidateId
cm.electionResetEvent = time.Now()
} else {
reply.VoteGranted = false
}
reply.Term = cm.currentTerm
cm.dlog("... RequestVote reply: %+v", reply)
return nil
}
注意检查是否为 “死亡” 状态。我们稍后再讨论。
检查该任期是否过时并成为跟随者。如果已经是跟随者,则状态不会更改,但其他状态字段将重置。
否则,如果调用者的任期与该任期一致,而我们还没有投票给其他候选人,将进行投票。不会对较旧的 RPC 投票。
func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
//检查是否为“死亡”状态
if cm.state == Dead {
return nil
}
cm.dlog("AppendEntries: %+v", args)
//如果新的任期大于自己的任期,则成为追随者
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in AppendEntries")
cm.becomeFollower(args.Term)
}
reply.Success = false
//如果任期相等,但自己的状态不是追随者,将成为追随者
if args.Term == cm.currentTerm {
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
cm.electionResetEvent = time.Now()
reply.Success = true
}
reply.Term = cm.currentTerm
cm.dlog("AppendEntries reply: %+v", *reply)
return nil
}
该逻辑也与上图的选择部分保持一致。需要了解的一个复杂情况是:
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
问:如果此端点是领导者怎么办?为什么它成为另一个领导者的跟随者?
答:Raft 在任何给定的任期内都保证只有一个领导者存在。如果仔细地遵循 RequestVote 的逻辑以及发送 RV 的 startElectio n 中的代码,将看到在集群中不能使用相同的任期存在两个领导者。对于发现其他端点赢得选举的候选人而言,这一条件很重要。
2.10 状态和 goroutine
回顾一下 CM 可能处于的所有状态,并在其中运行不同的 goroutine:
跟随者:将 CM 初始化为跟随者,并且在对 beginFollower 的每次调用中,一个新的 goroutine 开始运行 runElectionTimer 。注意,在短时间内一次可以运行多个。假设跟随者在较高的任期内从领导者那里获得了 RV ;将触发另一个 beginFollower 调用,该调用将启动新的计时器 goroutine。但是,旧的一旦发现任期发生变化,将不做任何事情直接退出。
2.11 服务器失控和任期增加
总结以上部分,我们研究一个可能发生的特殊场景以及 Raft 如何应对。这个例子非常有趣并且很有启发性。在这里,我试图将其呈现为一个故事,但是您可能希望使用一张纸来跟踪不同服务器的状态。 如果您不能遵循该示例 - 请给我发送电子邮件 - 我们将很乐意对其进行修复以使其更加清晰。
设想一个有三台服务器的群集:A,B 和 C。假设 A 是领导者,起始项为 1,并且该群集运行正常。A 每隔 50 毫秒向 B 和 C 发送一次心跳 AE RPC,并在几毫秒内获得快速响应;每个这样的 AE 都会重置 B 和 C 的 eletementResetEvent,因此它们仍然是的跟随者。
在某个时间点,由于网络路由器出现故障,服务器 B 从 A 和 C 中分区了。A 仍每 50 毫秒向其发送一次 AE,但是这些 AE 要么立即出错,要么在底层 RPC 引擎超时出错。A 对此无能为力,但这没什么大不了的。我们还没有讨论日志复制,但是由于三台服务器中的两台还处于活动状态,因此群集具有提交客户端命令的数量。
B 呢?假设当断开连接时,其选举超时设置为 200 毫秒。断开连接大约 200 毫秒后,B 的 runElectionTimer goroutine 意识到没有收到来自领导者的选举超时消息。B 无法区分谁存在,所以它将成为候选人并开始新的选举。
因此,B 的任期将变为 2(而 A 和 C 的项仍为 1)。B 会将 RV RPC 发送给 A 和 C,要求他们投票。当然,这些 RPC 在 B 的网络中丢失了。B 的 startElection 在开始时就启动了另一个 runElectionTimer goroutine,该 goroutine 等待 250 毫秒(超时范围在 150-300 毫秒之间是随机的),查看是否由于上次选举而发生了重要的事情。B 没做任何事情,因为它仍然是完全隔离的。因此,runElectionTimer 开始另一个新的选举,将期限增加到 3。
很长时间,B 的路由器需要花费几秒钟的时间来重置并恢复在线状态。同时,B 偶尔会重新选举一次,其任期已到 8。
此时,网络分区恢复,并且 B 重新连接到 A 和 C。
然后,AE RPC 从 A 到达。回想一下 A 一直每 50 ms 发送一次,尽管 B 暂时没有回复。
B 的 AppendEntries 被执行,并以 term = 8 发送回一个答复。
A 在 LeaderSendHeartbeats 中获得了此答复,检查了答复的任期,并发现其高于其本身。它将自己的任期更新为 8,并成为跟随者。集群暂时失去领导者。
现在可能会发生多种情况。B 是候选者,但它可能在网络恢复之前已经发送了 RV。C 是跟随者,但在其自身的选举超时时间内,它将成为候选者,因为它不再从 A 接收定期的 AE。A 成为跟随者,并且还将在其选举超时时间内成为候选者。
因此,这三台服务器中的任何一台都可以赢得下一次选举。这仅是因为我们实际上未在此处复制任何日志。正如我们将在下一部分中看到,在实际情况下,A 和 C 可能会在 B 不在时添加一些新的客户端命令,因此它们的日志将是最新的。因此,B 不能成为新的领导者 - 将会发生新的选举,由 A 或 C 赢得;我们将在下一部分中再次讨论该场景。
假设自从 B 断开连接以来未添加任何新命令,则由于重新连接而导致更换领导者也是完全可以的。
这看起来效率很低。因为领导者的更换并不是真正必要的,因为在整个场景中 A 都非常健康。但是,在个别情况下以不降低效率为代价来使不变量保持简单是 Raft 做出的设计选择之一。在最常见的情况下(没有任何中断),效率才是关键,因为 99.9% 的时间集群都是在正常状态。
候选人:也同时具有选举 goroutine 的计时器,除此之外,还有许多 goroutines 发送 RPC 。具有与跟随者相同的保护措施,可以在新的运行程序停止运行时停止 “旧的” 选举程序。请记住,RPC goroutine 可能需要很长时间才能完成,因此,如果他们注意到 RPC 调用返回时它们已过时,必须安静地退出,这一点至关重要。
领导者:领导者没有选举 goroutine ,但有心跳 goroutine 每 50 毫秒执行一次。
代码中还有一个附加状态 - 死亡状态。是为了有序地关闭 CM。调用 Stop 会将状态设置为 Dead,所有 goroutine 会在观察到该状态后立即退出。
使所有这些 goroutine 运行可能会令人担忧 - 如果其中一些仍在后台运行怎么办?或更糟糕的是,它们反复泄漏,其数量无边无际地增长?这就是泄漏检查的目的,并且一些测试启用了泄漏检查。
三、命令和日志复制
3.1 客户端交互
在第一篇文章中我们简要讨论了客户端交互,如果不清晰建议可以再回顾一下。这里我们先不关注客户端如何找到领导者,将重点讨论当找到一个领导者时会发生什么。
首先,客户端将命令提交给领导者。在 Raft 集群中,命令通常只提交给单个节点。
领导者将命令复制到其跟随者。
最后,如果大多数集群节点都承认在其日志中有该命令,该命令将被提交,并向所有客户端通知新的提交。
注意提交和提交命令之间的不对称性 - 在检查我们即将讨论的实现决策时,这一点很重要。命令被提交到单个 Raft 节点,但是多个节点(特别是所有已连接 / 活动的节点啊)会在一段时间后将其提交并通知其客户端。
回顾此图:
状态机代表使用 Raft 进行复制的任意服务。
然后我们在 Raft ConsensusModule 模块的上下文中讨论客户端,我们通常指的是此服务,因为这是将提交报告到的地方。换句话说,从 Consensus 模块到服务状态机的黑色箭头就是该通知。
3.2 提交管道
在我们的实现中,当一个 ConsensusModule 被创建时,它接受一个提交管道 - 一个用来向调用者发送提交命令的通道:commitChan chan<-CommitEntry。定义如下:
// CommitEntry is the data reported by Raft to the commit channel. Each commit
// entry notifies the client that consensus was reached on a command and it can
// be applied to the client's state machine.
type CommitEntry struct {
// Command is the client command being committed.
Command interface{}
// Index is the log index at which the client command is committed.
Index int
// Term is the Raft term at which the client command is committed.
Term int
}
使用通道是一种设计选择,但不是唯一方式。也可以改用回调。创建 ConsensusModule 时,调用者将注册一个回调函数,只要有要提交的命令,就会调用该回调函数。
在实现通道上发送条目的功能之前。我们需要先讨论 Raft 服务器如何复制命令并确定命令是否已提交。
3.3 Raft 日志
在文章中多次提到 Raft 日志,但还没有详细介绍。日志只是应该应用于状态机的线性命令序列;如果有需要,日志应该足以从某个开始状态 “重放” 状态机。在正常运行期间,所有 Raft 节点的日志都是相同的;当领导者收到新命令时,将其存放在自己的日志中,然后复制到跟随者。跟随者将命令放在日志中,并确认给领导者,领导者将保留已安全复制到群集中大多数服务器的最新日志索引的计数。
每个框都是一个日志条目;框顶部的数字是将其添加到日志中的任期。底部是此日志包含的键值命令。每个日志条目都有一个线性索引。框的颜色是任期的另一种表示形式。
如果将此日志应用于空键值存储,则最终结果将具有值 x = 4,y = 7。
如果不懂日志储存方式,传送门
在我们的实现中,日志条目由以下形式表示:
type LogEntry struct {
Command interface{}
Term int
}
每个 ConsensusModule 的日志都只是 log [] LogEntry。用户端通常不在乎任期。任期对 Raft 的正确性至关重要,在阅读代码时务必牢记。
3.4 提交新的命令
新的 Submit 方法,使客户端可以提交新命令:
func (cm *ConsensusModule) Submit(command interface{}) bool {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("Submit received by %v: %v", cm.state, command)
if cm.state == Leader {
cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
cm.dlog("... log=%v", cm.log)
return true
}
return false
}
很简单,如果此 CM 是领导者,则将新命令附加到日志中并返回 true。否则,将被忽略并返回 false。
问:“提交” 返回的真实值是否表明客户端已向领导者提交了命令?
答:在极少数情况下,领导者可能会与其他 Raft 服务器分开,而后者在一段时间后会继续选举新的领导者。但是,客户可能仍在与旧的领导者通信。客户端应等待一段合理的时间,以使其提交的命令出现在提交通道上;如果不是,则表示它联系了错误的领导者,应与其他领导者重试。
3.5 复制日志条目
我们看到,提交给领导者的新命令被添加到日志的末尾。这个新命令如何到达跟随者?领导者遵循的步骤在 Raft 论文中进行了精确描述。我们在 leaderSendHeartbeats 中完成实现。
func (cm *ConsensusModule) leaderSendHeartbeats() {
cm.mu.Lock()
savedCurrentTerm := cm.currentTerm
cm.mu.Unlock()
for _, peerId := range cm.peerIds {
go func(peerId int) {
cm.mu.Lock()
ni := cm.nextIndex[peerId]
prevLogIndex := ni - 1
prevLogTerm := -1
if prevLogIndex >= 0 {
prevLogTerm = cm.log[prevLogIndex].Term
}
entries := cm.log[ni:]
args := AppendEntriesArgs{
Term: savedCurrentTerm,
LeaderId: cm.id,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entries,
LeaderCommit: cm.commitIndex,
}
cm.mu.Unlock()
cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, ni, args)
var reply AppendEntriesReply
if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in heartbeat reply")
cm.becomeFollower(reply.Term)
return
}
if cm.state == Leader && savedCurrentTerm == reply.Term {
if reply.Success {
cm.nextIndex[peerId] = ni + len(entries)
cm.matchIndex[peerId] = cm.nextIndex[peerId] - 1
cm.dlog("AppendEntries reply from %d success: nextIndex := %v, matchIndex := %v", peerId, cm.nextIndex, cm.matchIndex)
savedCommitIndex := cm.commitIndex
for i := cm.commitIndex + 1; i < len(cm.log); i++ {
if cm.log[i].Term == cm.currentTerm {
matchCount := 1
for _, peerId := range cm.peerIds {
if cm.matchIndex[peerId] >= i {
matchCount++
}
}
if matchCount*2 > len(cm.peerIds)+1 {
cm.commitIndex = i
}
}
}
if cm.commitIndex != savedCommitIndex {
cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
} else {
cm.nextIndex[peerId] = ni - 1
cm.dlog("AppendEntries reply from %d !success: nextIndex := %d", peerId, ni-1)
}
}
}
}(peerId)
}
}
这比我们在上一部分中所做的要复杂得多,但实际上它仅遵循本文的图 2。关于此代码的一些注意事项:
- 现在已完全填充了 AE RPC 的字段:有关其含义,请参见本文中的图 2。
- AE 响应有一个 success 字段,该字段告诉领导者跟随者是否看到 prevLogIndex 和 prevLogTerm 匹配。领导者基于此字段更新此跟随者的 nextIndex。
- commitIndex 根据复制特定日志索引的关注者的数量进行更新。如果索引被多数复制,则 commitIndex 前进到该索引。
与我们之前讨论的用户端交互有关,这部分代码特别重要:
if cm.commitIndex != savedCommitIndex {
cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
newCommitReadyChan 是 CM 内部使用的通道,用于指示已准备好将新条目通过提交通道发送到客户端。它由在 CM 启动时在 goroutine 中运行的以下方法起作用:
func (cm *ConsensusModule) commitChanSender() {
for range cm.newCommitReadyChan {
// Find which entries we have to apply.
cm.mu.Lock()
savedTerm := cm.currentTerm
savedLastApplied := cm.lastApplied
var entries []LogEntry
if cm.commitIndex > cm.lastApplied {
entries = cm.log[cm.lastApplied+1 : cm.commitIndex+1]
cm.lastApplied = cm.commitIndex
}
cm.mu.Unlock()
cm.dlog("commitChanSender entries=%v, savedLastApplied=%d", entries, savedLastApplied)
for i, entry := range entries {
cm.commitChan <- CommitEntry{
Command: entry.Command,
Index: savedLastApplied + i + 1,
Term: savedTerm,
}
}
}
cm.dlog("commitChanSender done")
}
此方法更新 lastApplied 状态变量以确定哪些条目已经发送到客户端,并且仅发送新条目。
3.6 更新跟随者的日志
我们已经看到了领导者如何处理新的日志条目。现在介绍跟随者的代码实现。特别是 AppendEntries RPC。
func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog("AppendEntries: %+v", args)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in AppendEntries")
cm.becomeFollower(args.Term)
}
reply.Success = false
if args.Term == cm.currentTerm {
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
cm.electionResetEvent = time.Now()
// Does our log contain an entry at PrevLogIndex whose term matches
// PrevLogTerm? Note that in the extreme case of PrevLogIndex=-1 this is
// vacuously true.
if args.PrevLogIndex == -1 ||
(args.PrevLogIndex < len(cm.log) && args.PrevLogTerm == cm.log[args.PrevLogIndex].Term) {
reply.Success = true
// Find an insertion point - where there's a term mismatch between
// the existing log starting at PrevLogIndex+1 and the new entries sent
// in the RPC.
logInsertIndex := args.PrevLogIndex + 1
newEntriesIndex := 0
for {
if logInsertIndex >= len(cm.log) || newEntriesIndex >= len(args.Entries) {
break
}
if cm.log[logInsertIndex].Term != args.Entries[newEntriesIndex].Term {
break
}
logInsertIndex++
newEntriesIndex++
}
// At the end of this loop:
// - logInsertIndex points at the end of the log, or an index where the
// term mismatches with an entry from the leader
// - newEntriesIndex points at the end of Entries, or an index where the
// term mismatches with the corresponding log entry
if newEntriesIndex < len(args.Entries) {
cm.dlog("... inserting entries %v from index %d", args.Entries[newEntriesIndex:], logInsertIndex)
cm.log = append(cm.log[:logInsertIndex], args.Entries[newEntriesIndex:]...)
cm.dlog("... log is now: %v", cm.log)
}
// Set commit index.
if args.LeaderCommit > cm.commitIndex {
cm.commitIndex = intMin(args.LeaderCommit, len(cm.log)-1)
cm.dlog("... setting commitIndex=%d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
}
}
reply.Term = cm.currentTerm
cm.dlog("AppendEntries reply: %+v", *reply)
return nil
}
3.7 选举安全
目前为止,我们已经研究了添加的新代码以支持日志复制。但是,日志也会影响 Raft 的选举。Raft 使用选举程序来防止候选人赢得选举,除非其日志至少与集群中大多数节点的日志一样。
因此,RV 包含 lastLogIndex 和 lastLogTerm 字段。当候选人发出 RV 时,将使用有关其最后一个日志条目的信息填充这些 RV。跟随者将这些字段与自己的字段进行比较,并确定候选人是否是最新的才可以被选举
func (cm *ConsensusModule) startElection() {
cm.state = Candidate
cm.currentTerm += 1
savedCurrentTerm := cm.currentTerm
cm.electionResetEvent = time.Now()
cm.votedFor = cm.id
cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)
var votesReceived int32 = 1
// Send RequestVote RPCs to all other servers concurrently.
for _, peerId := range cm.peerIds {
go func(peerId int) {
cm.mu.Lock()
savedLastLogIndex, savedLastLogTerm := cm.lastLogIndexAndTerm()
cm.mu.Unlock()
args := RequestVoteArgs{
Term: savedCurrentTerm,
CandidateId: cm.id,
LastLogIndex: savedLastLogIndex,
LastLogTerm: savedLastLogTerm,
}
cm.dlog("sending RequestVote to %d: %+v", peerId, args)
var reply RequestVoteReply
if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("received RequestVoteReply %+v", reply)
if cm.state != Candidate {
cm.dlog("while waiting for reply, state = %v", cm.state)
return
}
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in RequestVoteReply")
cm.becomeFollower(reply.Term)
return
} else if reply.Term == savedCurrentTerm {
if reply.VoteGranted {
votes := int(atomic.AddInt32(&votesReceived, 1))
if votes*2 > len(cm.peerIds)+1 {
// Won the election!
cm.dlog("wins election with %d votes", votes)
cm.startLeader()
return
}
}
}
}
}(peerId)
}
// Run another election timer, in case this election is not successful.
go cm.runElectionTimer()
}
lastLogIndexAndTerm 是一个新的帮助器方法:
// lastLogIndexAndTerm returns the last log index and the last log entry's term
// (or -1 if there's no log) for this server.
// Expects cm.mu to be locked.
func (cm *ConsensusModule) lastLogIndexAndTerm() (int, int) {
if len(cm.log) > 0 {
lastIndex := len(cm.log) - 1
return lastIndex, cm.log[lastIndex].Term
} else {
return -1, -1
}
}
我们的实现是基于 0 的索引,而不是基于 1 的 Raft 索引。因此 - 1 经常作为一个标记值。
这是一个更新的 RV 处理程序,实现选举安全检查:
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()
cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in RequestVote")
cm.becomeFollower(args.Term)
}
if cm.currentTerm == args.Term &&
(cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&
(args.LastLogTerm > lastLogTerm ||
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
reply.VoteGranted = true
cm.votedFor = args.CandidateId
cm.electionResetEvent = time.Now()
} else {
reply.VoteGranted = false
}
reply.Term = cm.currentTerm
cm.dlog("... RequestVote reply: %+v", reply)
return nil
}
我们的实现是基于 0 的索引,而不是基于 1 的 Raft 索引。因此 - 1 经常作为一个标记值。
这是一个更新的 RV 处理程序,实现选举安全检查:
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()
cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in RequestVote")
cm.becomeFollower(args.Term)
}
if cm.currentTerm == args.Term &&
(cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&
(args.LastLogTerm > lastLogTerm ||
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
reply.VoteGranted = true
cm.votedFor = args.CandidateId
cm.electionResetEvent = time.Now()
} else {
reply.VoteGranted = false
}
reply.Term = cm.currentTerm
cm.dlog("... RequestVote reply: %+v", reply)
return nil
}
四、持久化和调优
4.1 持久化
像 Raft 这样的共识算法的目标是通过在隔离的服务器之间复制任务来创建一个比其各个部分具有更高可用性的系统。到目前为止,我们一直专注于网络分区的故障情况,其中群集中的某些服务器与其他服务器(或与客户端)断开连接。 失败的另一种模式是崩溃,其中服务器停止工作并重新启动。
对于其他服务器,它看起来像一个网络分区 - 服务器暂时断开连接,而对于崩溃的服务器本身,情况则大不相同,因为重新启动后,其所有内存状态都会丢失。
正是由于这个原因,Raft 论文中的图 2 清楚地标记了哪个状态应该保持不变;持久状态将在每次更新时写入并刷新到持久化存储中。在服务器发出下一个 RPC 或答复正在进行的 RPC 之前,服务器必须保留的任何状态都将保留。
Raft 只能通过保留其状态的子集来实现,即:
- currentTerm - 此服务器观察到的最新任期
- votedFor - 此服务器在最新任期为其投票的节点 ID
- log - Raft 日志条目
4.2 命令传递语义
在 Raft 中,视不同情况,一个命令可以多次传递给客户端。有几种可能发生这种情况的场景,包括崩溃导致重新启动(再次重播日志时)。
就消息传递语义而言,Raft 选择的是” 至少一次”。提交命令后,它将最终复制到所有客户端,但是某些客户端可能多次看到同一命令。因此,建议命令带有唯一的 ID,并且客户端应忽略已交付的命令。这在 Raft 论文中的第 8 节有更详细的描述。
4.3 存储接口
为了实现持久性,我们在代码中添加了以下接口:
type Storage interface {
Set(key string, value []byte)
Get(key string) ([]byte, bool)
// HasData returns true iff any Sets were made on this Storage.
HasData() bool
}
可以将它看作是一个映射,从字符串映射到一个由持久存储支持的通用字节切片。
4.4 恢复和保存状态
CM 构造函数现在将接受一个 Storage 作为参数并调用:
if cm.storage.HasData() {
cm.restoreFromStorage(cm.storage)
}
restoreFromStorage 方法也是新增。它从存储中加载持久状态变量,使用标准的 encoding/gob 包对它们进行反序列化
func (cm *ConsensusModule) restoreFromStorage(storage Storage) {
if termData, found := cm.storage.Get("currentTerm"); found {
d := gob.NewDecoder(bytes.NewBuffer(termData))
if err := d.Decode(&cm.currentTerm); err != nil {
log.Fatal(err)
}
} else {
log.Fatal("currentTerm not found in storage")
}
if votedData, found := cm.storage.Get("votedFor"); found {
d := gob.NewDecoder(bytes.NewBuffer(votedData))
if err := d.Decode(&cm.votedFor); err != nil {
log.Fatal(err)
}
} else {
log.Fatal("votedFor not found in storage")
}
if logData, found := cm.storage.Get("log"); found {
d := gob.NewDecoder(bytes.NewBuffer(logData))
if err := d.Decode(&cm.log); err != nil {
log.Fatal(err)
}
} else {
log.Fatal("log not found in storage")
}
}
镜像方法为 persistToStorage - 将所有这些状态变量编码并保存到提供的 Storage 中:
func (cm *ConsensusModule) persistToStorage() {
var termData bytes.Buffer
if err := gob.NewEncoder(&termData).Encode(cm.currentTerm); err != nil {
log.Fatal(err)
}
cm.storage.Set("currentTerm", termData.Bytes())
var votedData bytes.Buffer
if err := gob.NewEncoder(&votedData).Encode(cm.votedFor); err != nil {
log.Fatal(err)
}
cm.storage.Set("votedFor", votedData.Bytes())
var logData bytes.Buffer
if err := gob.NewEncoder(&logData).Encode(cm.log); err != nil {
log.Fatal(err)
}
cm.storage.Set("log", logData.Bytes())
}
我们只需在这些状态变量发生变化的每个点调用 pesistToStorage 来实现持久化。如果看一下第 2 部分中 CM 的代码与本部分之间的区别,会发现它们散布在少数地方。
当然,这不是实现持久性的最有效的方法,但是简单有效,所以足以满足我们的需要。效率最低的是保存整个日志,这在实际应用中可能很大。为了真正解决这个问题,Raft 有一个日志压缩机制,该机制在本文的第 7 节中进行了描述。我们不打算实现压缩,但是可以将其作为练习添加到我们的实现中。
4.5 崩溃伸缩
实施持久性后,我们的 Raft 集群在一定程度上可以应对崩溃。只要集群中的少数节点崩溃并在以后的某个时间点重新启动,集群就将对客户端保持可用。具有 2N + 1 个服务器的 Raft 群集将容忍 N 台故障服务器,并且只要其他 N + 1 台服务器仍保持相互连接,便会保持可用。
如果查看此部分的测试,会注意到添加了许多新测试。崩溃伸缩可以测试更大范围的人为情况组合,本文中也对此进行了一定程度的描述
4.6 不可靠的 RPC 交付
需要注意的另一个方面是不可靠的 RPC 交付。到目前为止,我们已经假设在连接的服务器之间发送的 RPC 可能到达目的地的时间很短。如果查看 server.go ,会注意到它使用了一种称为 RPCProxy 的类型来实现这些延迟。每个 RPC 都会延迟 1-5 毫秒,以模拟位于同一数据中心的节点的真实性。
RPCProxy 让我们实现的另一件事是可选的不可靠交付。启用 RAFT_UNRELIABLE_RPC 环境变量后,RPC 有时会明显延迟(延迟 75 毫秒)或完全中断。模拟了实际的网络故障。
我们可以在 RAFT_UNRELIABLE_RPC 开启的情况下重新运行所有测试,并观察 Raft 群集在出现这些故障时的行为。如果有兴趣,可以尝试调整 RPCProxy,不仅让 RPC 请求延迟,还可以让 RPC 答复延迟
4.7 优化发送 AppendEntries
正如在第 2 部分中简要提到的,当前的领导者执行效率很低。领导者在 LeaderSendHeartbeats 中发送 AE,定时器每隔 50 毫秒调用一次。假设提交了一条新命令;领导者将等到下一个 50 毫秒的边界,而不是立即通知跟随者。更糟的是,因为需要两次 AE 往返来通知跟随者命令已提交。如图:
在时间(1),领导者将心跳 AE 发送给跟随者,并在几毫秒内获得响应。例如,在 35 毫秒后提交了新命令。领导者一直等到下一个 50 毫秒边界(2)才将更新的日志发送给跟随者。跟随者答复该命令已成功添加到日志(3)。此时,领导者已经提高了提交索引(假设它获得了多数),可以立即通知跟随者,但是它一直等到下一个 50 毫秒边界(4)为止。最后,当跟随者收到更新的 leaderCommit 时,它可以将新提交的命令通知其自己的客户端。
在领导者的 Submit (X) 和跟随者的 commitChan <-X 之间经过的大部分时间对于实现来讲都是不必要的。
真正想要的是使序列看起来像这样:
看一下实现的新部分,从 startLeader 开始。
func (cm *ConsensusModule) startLeader() {
cm.state = Leader
for _, peerId := range cm.peerIds {
cm.nextIndex[peerId] = len(cm.log)
cm.matchIndex[peerId] = -1
}
cm.dlog("becomes Leader; term=%d, nextIndex=%v, matchIndex=%v; log=%v", cm.currentTerm, cm.nextIndex, cm.matchIndex, cm.log)
// This goroutine runs in the background and sends AEs to peers:
// * Whenever something is sent on triggerAEChan
// * ... Or every 50 ms, if no events occur on triggerAEChan
go func(heartbeatTimeout time.Duration) {
// Immediately send AEs to peers.
cm.leaderSendAEs()
t := time.NewTimer(heartbeatTimeout)
defer t.Stop()
for {
doSend := false
select {
case <-t.C:
doSend = true
// Reset timer to fire again after heartbeatTimeout.
t.Stop()
t.Reset(heartbeatTimeout)
case _, ok := <-cm.triggerAEChan:
if ok {
doSend = true
} else {
return
}
// Reset timer for heartbeatTimeout.
if !t.Stop() {
<-t.C
}
t.Reset(heartbeatTimeout)
}
if doSend {
cm.mu.Lock()
if cm.state != Leader {
cm.mu.Unlock()
return
}
cm.mu.Unlock()
cm.leaderSendAEs()
}
}
}(50 * time.Millisecond)
}
不仅要等待 50 ms 的计时,startLeader 中的循环还要等待两个可能的事件之一:
- 在 cm.triggerAEChan 上发送
- 计时器计数 50 毫秒
我们将很快看到触发 cm.triggerAEChan 的原因。这是现在应该发送 AE 的信号。每当触发通道时,计时器都会重置,并执行心跳逻辑 - 如果领导者没有新的要报告的内容,则最多等待 50 毫秒。
还要注意,实际发送 AE 的方法已从 leaderSendHeartbeats 重命名为 leaderSendAE,可以更好地在新代码中反映其目的。
我们所期望的,触发 cm.triggerAEChan 的方法之一是 SubmitL:
func (cm *ConsensusModule) Submit(command interface{}) bool {
cm.mu.Lock()
cm.dlog("Submit received by %v: %v", cm.state, command)
if cm.state == Leader {
cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
cm.persistToStorage()
cm.dlog("... log=%v", cm.log)
cm.mu.Unlock()
cm.triggerAEChan <- struct{}{}
return true
}
cm.mu.Unlock()
return false
}
修改成:
每当提交新命令时,都会调用 cm.persistToStorage 来保留新的日志条目。
一个空结构在 cm.triggerAEChan 上发送。将通知领导者 goroutine 中的循环。
锁定处理将重新排序;在发送 cm.triggerAEChan 时不想保持锁定,因为在某些情况下可能导致死锁。
在领导者中处理 AE 答复并推进提交索引的代码中 cm.triggerAEChan 将被通知。
if cm.commitIndex != savedCommitIndex {
cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
// Commit index changed: the leader considers new entries to be
// committed. Send new entries on the commit channel to this
// leader's clients, and notify followers by sending them AEs.
cm.newCommitReadyChan <- struct{}{}
cm.triggerAEChan <- struct{}{}
}
这个优化很重要,它使实现比以前对新命令的响应速度更快。
4.8 批量处理命令提交
现在,每次调用 Submit 都会触发很多活动 - 领导者立即向所有跟随者广播 RPC。如果想一次提交多个命令,连接 Raft 群集的网络可能会被 RPC 淹没。
尽管它看起来效率低,但是安全。Raft 的 RPC 都是幂等的,也就是说多次获得具有基本相同信息的 RPC 不会造成任何危害。
如果担心一次要频繁提交许多命令时的网络流量,那么批处理应该很容易实现。最简单的方法是提供一种将整个命令片段传递到 Submit 的方法。这样 Raft 实现中的代码改动会很小,并且客户端将能够提交整个命令组,而不会产生太多的 RPC 通信。有兴趣的可以尝试一下!
原文作者:CrazyZard
转自链接:https://learnku.com/articles/42697
Raft 参考:https://raft.github.io/raft.pdf
代码参考:https://github.com/eliben/raft/tree/master/part1
更多推荐
所有评论(0)