分布式一致性之 Raft

出于设计高可用调度器的原因,最近重新看了 Raft 作者 PHD 论文和 tidb/etcd 的 raft 实现,梳理一下相关的细节做一个笔记。

Raft 的核心可以分为三部分:

  1. leader 选举和日志复制
  2. 日志压缩
  3. 集群变更

Overview

Raft 会从集群所有节点中选举出一个 leader,其余节点称为 follower。leader 负责处理 client 的请求,将请求转化为 log,并复制到其他的节点上。当 leader 将 log 复制到大多数节点时,就能将这条 log commit 到状态机,同时返回给 client 请求成功。

为了保证 log entry 的有序性,Raft 引入了 term 的概念,term 是一个连续且递增的整数。当节点中发生 leader 选举时,term 就会加一。当节点观察到比自身更大的 term 时,进行相关判断后切换到该 term,注意不是 term 大于自身就一定切换。每条 log 具有一个 index 来唯一表示,index 也是递增的递增。Raft 中就是用 term 和 index 来表示 log。

为了证明 Raft 算法的正确性,Raft 可以保证下面几条性质,阅读 Raft Paper 以及实现 Raft 的时候可以多思考下面几条看看 Raft 是怎么保证的,下文也会对这几条性质进行解释证明:

  • Election Safety: 最多只有一个节点成为 leader。
  • Leader Append-Only: leader 不会删除或者覆盖自身的 log,只会不断的 append。
  • Log Matching: 如果两条 log entry(日志条目)的 term 和 index 相同,那么从这两条 log 开始,之前所有的 log entry 都是相同的。
  • Leader Completeness: 如果一条 log entry 在一个 term 中被 commit 了,那么在该 term 之后的所有 term,新 leader 都中都包含这条 log entry。
  • State Machine Safety: 如果一个节点 commit 了一条日志到状态机,那么其他机器不会 commit 一个 index 与此日志相同,内容却不同的日志到状态机。也就是说对于不同节点 commit 到状态机的日志,只要 index 相同,日志就是相同的。

Raft 中不同节点之间通过 Rpc 通信,暂时可以理解为有五种 RPC 请求,实际实现中会有扩展:

  1. RequestVote RPC: 由 candidate 调用,用于进行 leader 选举,其他节点根据相关规则进行回复。
  2. AppendEntries RPC: 用于 leader 复制 log 到其他节点,以及作为心跳信息让其他 follower 知道 leader 此时是正常的,心跳 rpc 时一个空的 AppendEntries RPC。
  3. InstallSnapshot Rpc: 用于将 leader 的 snapshot 发送给其他节点,对于新加入的节点可能落后 leader 太多数据,将 log compaction 后方便新 follower 追上自己非常有必要。
  4. AddServer RPC: 新增加节点。
  5. RemoveServer RPC: 移除相关的节点。

Leader Election

每个节点上会有以下几个变量来维护 log 信息,这几个变量可以分为需要持久化的和不需要持久化的。

  • 持久化:
    • currentTerm: 可以简单节点当前的最大的 term,初始为 0。最大的 term 这个措辞不太准确,因为为了避免网络分区下 term 无限增长加了一条约束,节点如果能和 leader 保持通信可以拒绝新的 leader election。
    • votedFor: 集群中某个节点的 id,表示当前节点同意该 id 对应节点成为 leader。初始为空。
    • log[]: 日志条目集,每一个 log entry 都包含请求对应的指令内容,以及指令对应的 term。
  • 非持久化:
    • commitIndex: 已知的最大的已经被 commit 的 log entry 的 index。
    • lastApplied: 最后被应用到状态机的 log entry 的索引值。
  • 对于 leader 还会维持两个变量来保存集群中其他节点的状态:
    • nextIndex[]: 需要发送给某节点的下一个 log entry 的 index,初始化为领导人最后一条 log entry 索引值加一。
    • matchIndex[]: 已经复制给某节点的最大索引值。如果存在一个满足 N > commitIndex 的 N,并且大多数的 matchIndex[i] ≥ N && log[N].term == currentTerm 成立,就代表 leader 的 commitIndex 可以为 N。

刚开始,所有节点都是 follower,这时候节点不知道集群中是否存在 leader,都会随机等待一段时间(election timeout):

  • 如果 election time 内一个节点收到其他节点(candidate)的 RequestVoteRpc,并且 candidate 的 term 大于等于自身的 term,就设置 voteFor 为 candidate 的 id,并承诺不再给投票给小于该 term 的节点投票,节点继续保持 follower 状态。如果一个 candidate 能收到超半数节点的投票,就会将自己变为 leader。
  • 如果过了 election timeout 后,follower 如果还没有收到来自 leader 的通知时,会由 follower 状态更新到到 candidate 状态参与 leader 选举,并将 voteFor 更新为自己。

candidate 会发起一个 RequestVoteRpc 请求:

  1. 请求参数如下:
    • term: candidate 的任期号。
    • candidateId: candidate 的 id。
    • lastLogIndex: candidate 最后一条 log 的 index,最后一条 log 包含 uncommited log。
    • lastLogTerm: candidate 最后一条 log 对应的 term。
  2. node 对 RequestVoteRpc 的返回值如下:
    • term: 节点当前的任期号 currentTerm,当 candidate 发现自身的 term 号小于返回的 term 时,将自己的状态由 candidate 切换为 follower。
    • voteGranted: 给该 candidate 投票时为 true。

leader 会定期给 follower 发送心跳,心跳就是一个空的 AppendEntriesRpc ,follower 会设置一个 timeout,timeout 后没收到心跳 follower 变为 candidate 重新发起选举。经过 pre-vote 后就会将自己的 term 加起将自己的状态由 follower 切换为 candiate 发起新一轮选举,什么是 pre-vote 将在下面的网络分区进行介绍。其他 follower 收到这个 candidate 会做如下处理:

  1. 如果 follower 当前能正常收到 leader 的心跳信息,会拒绝给该 candidate 投票。所以 Raft 并不是收到一个 RequestVoteRpc 后,发现 term>currentTerm 就就行投票的,这一点刚开始学习 Raft 时我没明白走了很多弯路。

  2. 如果不能收到 leader 的心跳信息,才会判断 candidate 发过来的 term 和 last log info,看是否 vote:

    1. 首先比较 term,如果 candidate 的 term 大于自己当前的 currentTerm 则进行投票,小于则拒绝投票。
    2. 如果 term 等于则比较 log index,注意这个 log index 是比较 uncommited log。如果 lastLogIndex 大于等于 follower 最后一条 log entry 的 index 则进行投票,反之则拒绝投票。

网络分区

假设现在有四台机器,a/b/c/d,现在 a 为 leader,a 和 d 之间发生了网络分区,但是 a 和 bc 之间能正常通信,d 和 bc 之间也能正常通信。d 因为不能收到 a 的心跳信息,会发起选举,d 增加自己的 term 后 bc 收到请求后 d 被选举为新的 leader,a 发现 b/c 的 term 比自己大变为 follower,过了一段时间后,a 收不到 b 的心跳信息,重复 d 的过程。最终会形成一个死循环。

为了避免网络分区中的这种问题,节点发起选举前需要进行一次 pre-vote,pre-vote 能获得超半数的选票再增加 term 发起 vote 进行选主。在 pre-vote 阶段,其他节点收到请求后,如果节点能和收到的 leader 的心跳会拒绝投票。对应上面的过程,b 和 c 会拒绝给 d 投票。

Log Replication

一个节点收到 majority node 的 vote 后,就可以开始接受 client 的请求,将请求转换成一条 log,会封装一个 AppendEntriesRpc 请求将 log 复制到其他节点:

  1. AppendEntriesRpc 请求的内容:
    • term: leader 的任期号。
    • leaderId: leader 的 id,用于 redirect request。Raft 中的读写都在 leader 中处理。
    • prevLogIndex: entries[] 之前的 log index。
    • prevLogTerm: prevLogIndex 处 log 的 term。
    • entries[]: 准备发送的日志条目,一次发送多个提高效率。
    • leaderCommit: leader 已经 commit 的 log 索引值。
  2. follower 收到 AppendEntriesRpc 请求后,会返回两个值:
    • term: follower 当前的 term,用于 leader 来更新自身。
    • success: follower 如果能匹配上 prevLogIndex 和 prevLogTerm 时为 true。

接受者收到 RPC 请求后处理如下:

  1. 收到的 term 如果小于 currentTerm 会返回 false。
  2. 如果日志在 prevLogIndex 位置处 log 的任期号和 prevLogTerm 不匹配,则返回 false。
  3. 如果已经存在的 log 和新的产生冲突,索引值相同但是任期号不同,删除这一条和之后所有的。
  4. 如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和新日志索引值中较小的一个。

leader 收到一个请求后的处理过程可以简单理解为一个 2PC:

  1. leader 收到 Client 发起的请求,将请求转换为日志写入到 uncommit 状态机中。
  2. leader 将这条日志通过 AppendEntiresRpc 转发给 follower,AppendEntiresRpc 中包含的内容下面在具体分析,包含当前 leader 的 term,log 的 next_index,以及 log 内容。
  3. follower 收到这条内容后写入到 uncommit 状态机中,并回复 leader。
  4. leader 收到超过半数的 follower 后,将日志写入更新成 commited。
  5. leader 回复 client 请求成功。
  6. leader 再次同步给 follower,follower 将这条日志更新成 commited。

上面是一个理想的过程,可以思考一下这样一种情况:leader 完成了第 5 步将日志 commit 并通知 client request success,但是还没来得及完成第 6 步同步给其他的 follower 就宕机了,这个时候 Raft 这条 commited log 不会丢失吗?

Raft 可以保证只要一条 log 被 commited,只要超过半数的机器是正常,那这条 log 一定不会丢失,新选举出来的 leader 一定包含这条 log,因为被选出来的 leader 需要获得超过半数机器的投票,上面提到 leader 进行选举的时候会比较 term 和 log 的索引,如果一个节点不包含这条 log,它的 log 索引小于超过包含这条 commited log 节点的索引,无法通过 pre-vote。

Safety

Raft 选举中还有一个非常重要的约束,新选举出来的 leader 只能 commit 当前 term 的 log,不能直接 commit 之前(old) term 的 log,这个约束非常重要,也比较难理解。

raft-safety

在图 a 中,S1 是 leader,正在将 term=2 index=2 处的 log 复制给其他的 follower;如图 b 所示,S1 突然 crash,S5 被选 S3/S4/S5 投票当选为新 leader;如图 c 所示,S5 刚当上 leader 也 crash 了,这时 S1 被选举为 leader,如果允许 old term 直接提交,S1 当选为 leader 后重新开始复制 term=2 index=2 的 log 到 S3,这个时候 S1 就可以 commit 这条日志了,这次提交 client 端也可以看到,但这个时候 S1 又 crash 了。

在图 d1 的情形,S5 可以被 S2/S3/S4/S5 选举为 leader,因为 S5 的 term(3) 大于 S2/S3/S4,它开始复制 term=3 index=2 的 log 到 majority 节点,这个时候图 c 提交的 term=2 index=2 的 log 就被 override 了,这就导致了 client 会看到数据前后不一致。

但如果有上面的约束,如图 d2,S1 不会像图 c 一样首先直接 replicate term=2 的 log,它会先 replicate term=3 index=2 的 log,replicate 完后间接将 term=2 index=2 的 log 通过 AppendEntiresRpc 复制到其他节点,如果 S1 能将 CurrentTerm(4) 的 log 复制到 majority 节点(S1/S2/S3),这时 S5 是不可能被选为 leader 的,因为 S5 term=3,Pre-Vote 阶段就不满足选主条件了。在 Raft 的实现中,leader 新当选后会根据当前 term 提交一条空日志(no-op)来保证这一点。

因为这个约束的存在,在 Raft 的 Client 端实现中,需要注意一种情况,一个 leader 当选后,会将它自己的 log 都通过 AppendEntiresRpc 复制给其他 follower,即使 leader 中有一条 log 之前没有被 majority 通过也会被 replicate 到其他节点,client 端需要注意这一点。正如上图 a-d2 的情形,term 为 2 index 为 2 的 log 最终会被间接复制。

集群变更

上面讨论的都是建立在集群中节点数量不变的情况,真实场景中会有增加或者下线节点的需求。将 Raft 集群中节点的数量以及每个节点的地址信息表示为集群配置文件,每个节点都会保存一份集群配置文件,在集群中增删节点实际上就是对集群所有节点的这个配置文件进行修改。

在分布式下不能保证能同时修改所有节点的配置,可能会导致脑裂。比如在一个有 3 个节点(s1, s2, s3)的集群中增加 2 个节点(s4, s5),在某一时刻会出现 s1 和 s2 节点仍然是旧配置,s3/s4/s5 已经更新为新配置。这时候 s1, s2 就可以选举出一个 leader,s3, s4, s5 同样可以选举出一个 leader。会导致集群中同一时刻出现了两个 leader,违背了 Election Safety 原则。

在 Raft 作者的 PHD 论文中,对于集群变更提供了两种办法。第一种方法比较简单,每次只增删一个节点,这样旧配置的和新配置的节点不可能同时满足 quorum。

第二种增删节点的方法则较为复杂,但是能同时处理任意数量的节点增删。当 leader 收到 admin 变更配置的请求后,会针对这条请求新生成一条 log,将 old config 和 new config 都记下来,然后将这条log append 到 log[] 中,并将其复制到他节点(所有新旧节点)。任一节点收到该请求后,都会立即切换到一个状态,称为 $C_{old,new}$。注意此时新旧配置中的节点都可以成为 leader。

此时集群中同时存在不同配置的节点, Raft 引入 joint consensus 来描述这种状态。对于处于 $C_{old,new}$ 状态的节点, RequestVote RPC 和 AppendEntries RPC 调用要成功,必须要分别得到新的配置和旧的配置中的大多数节点的同意。

一旦 leader 能 commit 此日志,即新配置下的大多数节点均复制了此日志,leader 就会将新的配置对应的日志复制到新配置下的其他节点,复制成功的节点就会转移到 $C_{new}$ 状态。当这条日志成功 commit 后 集群配置就修改成功了。

上面这样一种情况中可以避免同一时刻集群中出现两个 leader,但是可能会出现旧的配置的节点当选为 leader,此时拥有新配置的节点发现集群中没有 leader 后会自动重新发起一种选举。

日志压缩

按照上面的机制,各个节点的日志会不断增长,这样会导致几个问题:

  • 日志过长占用大量存储空间。
  • 机器重启时需要重放日志,会重启花费的时间过长。
  • 新增节点,需要复制大量的日志,占用带宽过多而且花费时间过长。

Raft 提出了 Log Compaction 机制来压缩日志长度,PHD 论文中提到了多种不同的快照方案,小论文中只针对内容版的快照有所提及。

每个节点都可以独立判断是否需要进行日志压缩,节点可以根据日志长度或剩余空间等来进行判断。snapshot 完成后就可以将 Snapshot 的最后一条日志及其前面的所有日志删除,上一次的快照也可以删除。snapshot 中会记录最后一条日志的 index 以及 term。

虽然各个节点的 Snapshot 操作是独立进行的,但是设想这样的情况,leader 进行一次 Snapshot 后发现某个节点的日志落后太多,需要传送那些已经删除的日志给该节点(比如新增节点的时候)。这时候由于日志已经删除,就需要将节点的 Snapshot 发送给该节点。Raft 中会通过 InstallSnapshot RPC 将自身的 Snapshot 发送给其他节点。节点对 InstallSnapshot RPC 的处理比较简单,有可能存在 snapshot 对应的最后一条日志节点中也有,可能是由于延迟导致的,这时节点不对本地日志做处理。