Raft在etcd中的实现(三)选举流程
Raft中的选举流程
Raft中使用心跳机制来出发leader选举。当服务器启动的时候,服务器成为follower。只要follower从leader或者candidate收到有效的RPCs就会保持follower状态。如果follower在一段时间内(该段时间被称为election timeout)没有收到消息,则它会假设当前没有可用的leader,然后开启选举新leader的流程。流程如下:
- follower增加当前的term,转变为candidate。
- candidate投票给自己,并发送RequestVote RPC给集群中的其他服务器。
- 收到RequestVote的服务器,在同一term中只会按照先到先得投票给至多一个candidate。且只会投票给log至少和自身一样新的candidate。
- candidate节点保持2的状态,直到下面三种情况中的一种发生。
4.1 该节点赢得选举。即收到大多数的节点的投票。则其转变为leader状态。
4.2 另一个服务器成为了leader。即收到了leader的合法心跳包(term值等于或大于当前自身term值)。则其转变为follower状态。
4.3 一段时间后依然没有胜者。该种情况下会开启新一轮的选举。
Raft中使用随机选举超时时间来保证当票数相同无法确定leader的问题可以被快速的解决。
代码详解
首先看下节点为follower状态时的情况。节点在启动的时候,以及在收到合法的心跳包的时候会转化为follower状态。此时会调用becomeFollower方法。该方法主要是设置节点的状态,以及相应的方法。即follower状态的step方法为stepFollower,tick方法为tickElecation。其中reset方法将重置ElectionTimeout的计时器等。
// github.com/coreos/etcd/raft/node.go
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c)
// become the follower at term 1 and apply initial configuration
// entries of term 1
//启动的时候节点进入follower状态,会调用becomeFollower函数。
r.becomeFollower(1, None)
//...
}
// github.com/coreos/etcd/raft/raft.go
// becomeFollower方法,设置step函数为stepFollower,tick函数为tickElection。
// 使用reset重置状态,修改当前状态为StateFollower,设置leader的id。
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
follower正常运行的时候会收到leader节点的心跳。此时其会重设其electionElapsed。若一段时间收不到心跳包,electionElapsed达到ElectionTimeout值,则会调用Step方法处理MsgHup的消息。准备进入candidate状态。
// github.com/coreos/etcd/raft/raft.go
// follower状态收到leader的心跳包,会重设electionElapsed,可以理解为重设ElectionTimeout的计时器。
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
//......各种其他情况
case pb.MsgHeartbeat:
r.electionElapsed = 0
r.lead = m.From
r.handleHeartbeat(m)
}
}
// github.com/coreos/etcd/raft/raft.go
// tickElection,每一个tick所运行的方法。在follower状态是增加electionElapsed的值,当其到达ElectionTimeout的值时,调用Step方法处理MsgHup消息。
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
开启新term,进入选举的流程如下:
- Step收到MsgHup消息(tickElection发过来的),调用campaign方法(这里会先确认是否有可提交但未执行的关于集群配置变更的消息,若有则不会调用campaign)。另未简化以下均不讨论preVote的情况。
// github.com/coreos/etcd/raft/raft.go
//Step方法处理MsgHup消息,调用campaign方法
func (r *raft) Step(m pb.Message) error {
switch m.Type {
//......
case pb.MsgHup:
if r.state != StateLeader {
//检查是否有未执行的配置变更,大致就是先取出可提交还未执行的这一段,
//然后检查里面是否有是变更集群配置的消息,如果有则直接return不进入candidate状态。
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return nil
}
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
if r.preVote {
r.campaign(campaignPreElection)
} else {
r.campaign(campaignElection)
}
} else {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
}
}
}
- 调用campaign方法,进入candidate状态方法。并向其他节点发送RequestVote RPCs。
// github.com/coreos/etcd/raft/raft.go
// campaign方法首先调用becomeCandidate方法进入candidate状态(暂时不考虑preVote的情况)
func (r *raft) campaign(t CampaignType) {
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r.Term + 1
} else {
//调用becomeCandidate进入candidate状态
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
//在发VoteRequest前,节点把票投个自己。如果只有一个节点,则这个时候会胜出
if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
//将自己的lastTerm,lastIndex信息,以及voteMsg即RequestVote RPC发送给其他节点。
for id := range r.prs {
if id == r.id {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
// github.com/coreos/etcd/raft/raft.go
// candidate状态,设置自己的状态,step方法设为stepCandidate,tick方法设置为tickElection
func (r *raft) becomeCandidate() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateLeader {
panic("invalid transition [leader -> candidate]")
}
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.state = StateCandidate
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}
- 其他节点收到RequestVote RPC。判断是否可以投给它(是否是重复消息之前也投的该节点;是否自身尚未投票,且目前没有leader。)并判断该节点的日志是否至少和自己一样新。若可以投且至少一样新则回复投票给该节点并记录,否则回复拒绝。
// github.com/coreos/etcd/raft/raft.go
//Step方法处理MsgVote消息,判断投票还是拒绝
func (r *raft) Step(m pb.Message) error {
//......
switch m.Type {
//......
case pb.MsgVote, pb.MsgPreVote:
// 初步判断是否可以投票
//1. 如果自身记录的Vote值和消息的来源者相同,说明是条重复消息
//2. 如果自身尚未投票,且当前没有leader,则可以投。
canVote := r.Vote == m.From ||
(r.Vote == None && r.lead == None) ||
// PreVote的情况暂时忽略
(m.Type == pb.MsgPreVote && m.Term > r.Term)
//如果初步判断可以投,则看日志是否至少和自己一样新。这里主要是比较lastTerm和lastIndex。
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
//判断成功,则把票回复该节点,把票投给它。自身记录Vote,并重设election的计数器。
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
}
} else {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
//否则回复拒绝投票给该节点
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
}
}
- candidate节点调用stepCandidate处理各种消息。若收到了其他节点的合法心跳包MsgHeartbeat,AppendEntries消息MsgApp,以及installSnapshot消息MsgSnap,说明当前已有其他节点成为了leader,则自身转为follower状态。若自身赢得选举。即收到大多数的节点的投票。则其转变为leader状态。并发送AppendEntries消息给其他节点。若收到了多数的拒绝消息,则也转化为follower状态。因为在candidate状态时,tick方法也是tickElection方法,依然在记录着electionTimout的超时,所以一段时间后依然没有转化为leader或者follower则会开启新一轮的选举。
// github.com/coreos/etcd/raft/raft.go
// stepCandidate处理收到的消息
func stepCandidate(r *raft, m pb.Message) error {
// Only handle vote responses corresponding to our candidacy (while in
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
var myVoteRespType pb.MessageType
if r.state == StatePreCandidate {
myVoteRespType = pb.MsgPreVoteResp
} else {
myVoteRespType = pb.MsgVoteResp
}
switch m.Type {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
// 收到AppendEntries RPC,或者心跳,或者installSnapshot RPC,都说明新leader已经选出来,则自身转变为follower状态
case pb.MsgApp:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
//收到的是RequestVote的回复消息。收到一条回复,判断一次是否是多数赞成还是多数反对
case myVoteRespType:
// 统计投给自己票的个数
gr := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
switch r.quorum() {
case gr:
//多数赞成,则转化为leader,并发送AppendEntries消息给其他节点(preVote情况暂不考虑)
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
//多数反对,则转化为follower状态
case len(r.votes) - gr:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}
- 某个candidate收到了多数的投票,转化为leader节点。发送空的日志条目的AppendEntries RPC给其他节点。此时选举完成,新的leader产生。leader会通过tickHeartbeat方法向其他节点发送心跳包。
// github.com/coreos/etcd/raft/raft.go
// becomeLeader 设置自身状态为leader,step方法设置为stepLeader,tick方法设置为tickHeartbeat,设置空的日志条目。
func (r *raft) becomeLeader() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateFollower {
panic("invalid transition [follower -> leader]")
}
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
}
if len(ents) > 0 {
r.pendingConfIndex = ents[len(ents)-1].Index
}
r.appendEntry(pb.Entry{Data: nil})
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
// github.com/coreos/etcd/raft/raft.go
// leader节点向其他节点发送心跳包。
func (r *raft) tickHeartbeat() {
r.heartbeatElapsed++
r.electionElapsed++
if r.electionElapsed >= r.electionTimeout {
r.electionElapsed = 0
if r.checkQuorum {
r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
}
// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
if r.state == StateLeader && r.leadTransferee != None {
r.abortLeaderTransfer()
}
}
if r.state != StateLeader {
return
}
if r.heartbeatElapsed >= r.heartbeatTimeout {
r.heartbeatElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
}
}