etcd/raft选举源码解读 - Wildhunt
source link: https://www.cnblogs.com/lt6668964/p/17298416.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
ETCD-raft笔记
该篇博客基于etcd v3.5.7版本,首先会简单介绍etcd/raft对Raft选举部分的算法优化,然后通过源码分析etcd/raft的选举实现。
1. etcd对于raft选举算法优化措施
该优化措施均在raft博士论文中有讲解
etcd/raft实现的与选举有关的优化有Pre-Vote、Check Quorum、和Leader Lease。在这三种优化中,只有Pre-Vote和Leader Lease最初是对选举过程的优化,Check Quorum是为了更高效地实现线性一致性读(Linearizable Read)而做出的优化,但是由于Leader Lease需要依赖Check Quorum,因此也放在这讲。
1.1 Pre-Vote
如下图所示,当Raft集群的网络发生分区时,会出现节点数达不到quorum(达成共识至少需要的节点数)的分区,如图中的Partition 1。
在节点数能够达到quorum的分区中,选举流程会正常进行,该分区中的所有节点的term最终会稳定为新选举出的leader节点的term。不幸的是,在节点数无法达到quorum的分区中,如果该分区中没有leader节点,因为节点总是无法收到数量达到quorum的投票而不会选举出新的leader,所以该分区中的节点在election timeout超时后,会增大term并发起下一轮选举,这导致该分区中的节点的term会不断增大。
如果网络一直没有恢复,这是没有问题的。但是,如果网络分区恢复,此时,达不到quorum的分区中的节点的term值会远大于能够达到quorum的分区中的节点的term,这会导致能够达到quorum的分区的leader退位(step down)并增大自己的term到更大的term,使集群产生一轮不必要的选举。
Pre-Vote机制就是为了解决这一问题而设计的,其解决的思路在于不允许达不到quorum的分区正常进入投票流程,也就避免了其term号的增大。为此,Pre-Vote引入了“预投票”,也就是说,当节点election timeout超时时,它们不会立即增大自身的term并请求投票,而是先发起一轮预投票。收到预投票请求的节点不会退位。只有当节点收到了达到quorum的预投票响应时,节点才能增大自身term号并发起投票请求。这样,达不到quorum的分区中的节点永远无法增大term,也就不会在分区恢复后引起不必要的一轮投票。
1.2 Check Quorum
在Raft算法中,保证线性一致性读取的最简单的方式,就是讲读请求同样当做一条Raft提议,通过与其它日志相同的方式执行,因此这种方式也叫作Log Read。显然,Log Read的性能很差。而在很多系统中,读多写少的负载是很常见的场景。因此,为了提高读取的性能,就要试图绕过日志机制。
但是,直接绕过日志机制从leader读取,可能会读到陈旧的数据,也就是说存在stale read的问题。在下图的场景中,假设网络分区前,Node 5是整个集群的leader。在网络发生分区后,Partition 0分区中选举出了新leader,也就是图中的Node 1。
但是,由于网络分区,Node 5无法收到Partition 0中节点的消息,Node 5不会意识到集群中出现了新的leader。此时,虽然它不能成功地完成日志提交,但是如果读取时绕过了日志,它还是能够提供读取服务的。这会导致连接到Node 5的client读取到陈旧的数据。
Check Quorum可以减轻这一问题带来的影响,其机制也非常简单:让leader每隔一段时间主动地检查follower是否活跃。如果活跃的follower数量达不到quorum,那么说明该leader可能是分区前的旧leader,所以此时该leader会主动退位转为follower。
需要注意的是,Check Quorum并不能完全避免stale read的发生,只能减小其发生时间,降低影响。如果需要严格的线性一致性,需要通过其它机制实现。
1.3 Leader Lease
分布式系统中的网络环境十分复杂,有时可能出现网络不完全分区的情况,即整个整个网络拓补图是一个连通图,但是可能并非任意的两个节点都能互相访问。
这种现象不止会出现在网络故障中,还会出现在成员变更中。在通过ConfChange
移除节点时,不同节点应用该ConfChange
的时间可能不同,这也可能导致这一现象发生——TODO (举个例子)。
在上图的场景下,Node 1与Node 2之间无法通信。如果它们之间的通信中断前,Node 1是集群的leader,在通信中断后,Node 2无法再收到来自Node 1的心跳。因此,Node 2会开始选举。如果在Node 2发起选举前,Node 1和Node 3中都没有新的日志,那么Node 2仍可以收到能达到quorum的投票(来自Node 2本身的投票和来自Node 3的投票),并成为leader。
Leader Lease机制对投票引入了一条新的约束以解决这一问题:当节点在election timeout超时前,如果收到了leader的消息,那么它不会为其它发起投票或预投票请求的节点投票。也就是说,Leader Lease机制会阻止了正常工作的集群中的节点给其它节点投票。
Leader Lease需要依赖Check Quorum机制才能正常工作。接下来笔者通过一个例子说明其原因。
假如在一个5个节点组成的Raft集群中,出现了下图中的分区情况:Node 1与Node 2互通,Node 3、Node 4、Node 5之间两两互通、Node 5与任一节点不通。在网络分区前,Node 1是集群的leader。
在既没有Leader Lease也没有Check Quorum的情况下,Node 3、Node 4会因收不到leader的心跳而发起投票,因为Node 2、Node 3、Node 4互通,该分区节点数能达到quorum,因此它们可以选举出新的leader。
而在使用了Leader Lease而不使用Check Quorum的情况下,由于Node 2仍能够收到原leader Node 1的心跳,受Leader Lease机制的约束,它不会为其它节点投票。这会导致即使整个集群中存在可用节点数达到quorum的分区,但是集群仍无法正常工作。
而如果同时使用了Leader Lease和Check Quorum,那么在上图的情况下,Node 1会在election timeout超时后因检测不到数量达到quorum的活跃节点而退位为follower。这样,Node 2、Node 3、Node 4之间的选举可以正常进行。
1.4 引入的新问题与解决方案
引入Pre-Vote和Check Quorum(etcd/raft的实现中,开启Check Quorum会自动开启Leader Lease)会为Raft算法引入一些新的问题。
当一个节点收到了term比自己低的消息时,原本的逻辑是直接忽略该消息,因为term比自己低的消息仅可能是因网络延迟的迟到的旧消息。然而,开启了这些机制后,在如下的场景中会出现问题:
场景1: 如上图所示,在开启了Check Quorum / Leader Lease后(假设没有开启Pre-Vote,Pre-Vote的问题在下一场景中讨论),数量达不到quorum的分区中的leader会退位,且该分区中的节点永远都无法选举出leader,因此该分区的节点的term会不断增大。当该分区与整个集群的网络恢复后,由于开启了Check Quorum / Leader Lease,即使该分区中的节点有更大的term,由于原分区的节点工作正常,它们的选举请求会被丢弃。同时,由于该节点的term比原分区的leader节点的term大,因此它会丢弃原分区的leader的请求。这样,该节点永远都无法重新加入集群,也无法当选新leader。(详见issue #5451、issue #5468)。
场景2: Pre-Vote机制也有类似的问题。如上图所示,假如发起预投票的节点,在预投票通过后正要发起正式投票的请求时出现网络分区。此时,该节点的term会高于原集群的term。而原集群因没有收到真正的投票请求,不会更新term,继续正常运行。在网络分区恢复后,原集群的term低于分区节点的term,但是日志比分区节点更新。此时,该节点发起的预投票请求因没有日志落后会被丢弃,而原集群leader发给该节点的请求会因term比该节点小而被丢弃。同样,该节点永远都无法重新加入集群,也无法当选新leader。(详见issue #8501、issue #8525)。
场景3: 在更复杂的情况中,比如,在变更配置时,开启了原本没有开启的Pre-Vote机制。此时可能会出现与上一条类似的情况,即可能因term更高但是log更旧的节点的存在导致整个集群的死锁,所有节点都无法预投票成功。这种情况比上一种情况更危险,上一种情况只有之前分区的节点无法加入集群,在这种情况下,整个集群都会不可用。(详见issue #8501、issue #8525)。
为了解决以上问题,节点在收到term比自己低的请求时,需要做特殊的处理。处理逻辑也很简单:
- 如果收到了term比当前节点term低的leader的消息,且集群开启了Check Quorum / Leader Lease或Pre-Vote,那么发送一条term为当前term的消息,令term低的节点成为follower。(针对场景1、场景2)
- 对于term比当前节点term低的预投票请求,无论是否开启了Check Quorum / Leader Lease或Pre-Vote,都要通过一条term为当前term的消息,迫使其转为follower并更新term。(针对场景3)
2. etcd中Raft选举的实现
2.1 发起vote或pre-vote流程
2.1.1 Election timeout
在集群刚启动时,所有节点的状态都为 follower
,等待超时触发 leader election
。超时时间由 Config
设置。etcd/raft
没有用真实时间而是使用逻辑时钟,当调用 tick
的次数超过指定次数时触发超时事件。 对于 follower
和 candidate
而言,tick
中会判断是否超时,若超时则会本地生成一个 MsgHup
类型的消息触发 leader election
:
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
2.1.2 MsgHup消息处理与hup方法
etcd/raft通过raft
结构体的Step
方法实现Raft状态机的状态转移。Step
方法是消息处理的入口,不同 state
处理的消息不同且处理方式不同,所以有多个 step
方法:
raft.Step()
: 消息处理的入口,做一些共性的检查,如term
,或处理所有状态都需要处理的消息。若需要更进一步处理,会根据状态 调用下面的方法:raft.stepLeader()
:leader
状态的消息处理方法;raft.stepFollower()
:follower
状态的消息处理方法;raft.stepCandidate()
:candidate
状态的消息处理方法。
func (r *raft) Step(m pb.Message) error {
// ... ...
switch m.Type {
case pb.MsgHup:
if r.preVote {
r.hup(campaignPreElection)
} else {
r.hup(campaignElection)
}
// ... ...
}
// ... ...
}
Step
方法在处理MsgHup
消息时,会根据当前配置中是否开启了Pre-Vote
机制,以不同的CampaignType
调用hup
方法。CampaignType
是一种枚举类型(go语言的枚举实现方式),其可能值如下表所示。
值 | 描述 |
---|---|
campaignPreElection |
表示Pre-Vote的预选举阶段。 |
campaignElection |
表示正常的选举阶段(仅超时选举,不包括Leader Transfer)。 |
campaignTransfer |
表示Leader Transfer阶段。 |
接下来对hup
的实现进行分析。
func (r *raft) hup(t CampaignType) {
if r.state == StateLeader {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
return
}
if !r.promotable() {
r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
return
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return
}
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(t)
}
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
pr := r.prs.Progress[r.id]
return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
}
总结当节点出现以下情况时不能发起选举:
- 节点被移出集群
- 节点是learner
- 节点还有未保存到稳定存储的snapshot
- 节点有还未被应用的集群配置变更
ConfChange
消息
2.1.3 campaign
官方注释很详细了,因此不多废笔墨解释
// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
// 因为调用campaign的方法不止有hup,campaign方法首先还是会检查promotable()是否为真。
if !r.promotable() {
// This path should not be hit (callers are supposed to check), but
// better safe than sorry.
r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
}
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r.Term + 1
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
var ids []uint64
{
//won't send requestVote to learners, beacause learners[] are not in incoming[] and outgoing[]
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
至此,该节点已向其他节点发送MsgVote或MsgPreVote消息
2.2 节点收到vote或pre-vote消息处理流程
处理vote或pre-vote消息都在Step
方法内,不会进入各自的step方法,有效的MsgPreVote
必须满足其中一个条件(m.Term > r.Term)
官方注释很详细,简单易理解,因此不多废笔墨解释
func (r *raft) Step(m pb.Message) error {
// Handle the message term, which may result in our stepping down to a follower.
switch {
case m.Term == 0:
// local message
case m.Term > r.Term:
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
force := bytes.Equal(m.Context, []byte(campaignTransfer))
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
return nil
}
}
switch {
case m.Type == pb.MsgPreVote:
// Never change our term in response to a PreVote
case m.Type == pb.MsgPreVoteResp && !m.Reject:
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default:
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
case m.Term < r.Term:
// ........
}
switch m.Type {
case pb.MsgHup:
// ........
case pb.MsgVote, pb.MsgPreVote:
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(r.Vote == None && r.lead == None) ||
// ...or this is a PreVote for a future term...
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// ...and we believe the candidate is up to date.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
// Note: it turns out that that learners must be allowed to cast votes.
// This seems counter- intuitive but is necessary in the situation in which
// a learner has been promoted (i.e. is now a voter) but has not learned
// about this yet.
// For example, consider a group in which id=1 is a learner and id=2 and
// id=3 are voters. A configuration change promoting 1 can be committed on
// the quorum `{2,3}` without the config change being appended to the
// learner's log. If the leader (say 2) fails, there are de facto two
// voters remaining. Only 3 can win an election (due to its log containing
// all committed entries), but to do so it will need 1 to vote. But 1
// considers itself a learner and will continue to do so until 3 has
// stepped up as leader, replicates the conf change to 1, and 1 applies it.
// Ultimately, by receiving a request to vote, the learner realizes that
// the candidate believes it to be a voter, and that it should act
// accordingly. The candidate's config may be stale, too; but in that case
// it won't win the election, at least in the absence of the bug discussed
// in:
// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
// When responding to Msg{Pre,}Vote messages we include the term
// from the message, not the local term. To see why, consider the
// case where a single node was previously partitioned away and
// it's local term is now out of date. If we include the local term
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out of date messages).
// The term in the original message and current local term are the
// same in the case of regular votes, but different for pre-votes.
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
}
} else {
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
default:
// ...........
}
return nil
}
注意:节点同意投票消息带的是m.Term
,拒绝投票消息是r.Term
,如果拒接MsgPreVote
消息,那么发送pre-vote消息的节点就变为
在r.Term
的follower
,在2.3.1节内体现
2.3 节点收到处理MsgPreVoteResp或MsgVoteResp消息流程
2.3.1 Step内处理
根据2.2节可以看到Step
内有这样一段代码:在2.2节最后有解释,官方也给了详细注释
switch {
case m.Type == pb.MsgPreVote:
// Never change our term in response to a PreVote
case m.Type == pb.MsgPreVoteResp && !m.Reject:
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default:
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
2.3.2 stepCandidate内处理
case myVoteRespType:
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
switch res {
case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
}
如果预投票成功,则发起新一轮正式投票。如果正式投票成功,则转为leader,接着后续操作
2.4 转变领导者身份
2.4.1 becomeLeader()
func (r *raft) becomeLeader() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateFollower {
panic("invalid transition [follower -> leader]")
}
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
r.prs.Progress[r.id].BecomeReplicate()
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
r.pendingConfIndex = r.raftLog.lastIndex()
emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
r.logger.Panic("empty entry was dropped")
}
// As a special case, don't count the initial empty entry towards the
// uncommitted log quota. This is because we want to preserve the
// behavior of allowing one entry larger than quota if the current
// usage is zero.
r.reduceUncommittedSize([]pb.Entry{emptyEnt})
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
从candidate
转变为leader
,需要在自己的log中append一条当前term的日志,并广播给其他节点
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK