Raft在etcd中的实现(三)选举流程

2018-04-07

Raft中的选举流程

Raft中使用心跳机制来出发leader选举。当服务器启动的时候,服务器成为follower。只要follower从leader或者candidate收到有效的RPCs就会保持follower状态。如果follower在一段时间内(该段时间被称为election timeout)没有收到消息,则它会假设当前没有可用的leader,然后开启选举新leader的流程。流程如下:

  1. follower增加当前的term,转变为candidate。
  2. candidate投票给自己,并发送RequestVote RPC给集群中的其他服务器。
  3. 收到RequestVote的服务器,在同一term中只会按照先到先得投票给至多一个candidate。且只会投票给log至少和自身一样新的candidate。
  4. candidate节点保持2的状态,直到下面三种情况中的一种发生。 4.1 该节点赢得选举。即收到大多数的节点的投票。则其转变为leader状态。 4.2 另一个服务器成为了leader。即收到了leader的合法心跳包(term值等于或大于当前自身term值)。则其转变为follower状态。 4.3 一段时间后依然没有胜者。该种情况下会开启新一轮的选举。 Raft中使用随机选举超时时间来保证当票数相同无法确定leader的问题可以被快速的解决。

    代码详解

    首先看下节点为follower状态时的情况。节点在启动的时候,以及在收到合法的心跳包的时候会转化为follower状态。此时会调用becomeFollower方法。该方法主要是设置节点的状态,以及相应的方法。即follower状态的step方法为stepFollower,tick方法为tickElecation。其中reset方法将重置ElectionTimeout的计时器等。

// github.com/coreos/etcd/raft/node.go
func StartNode(c *Config, peers []Peer) Node {
	r := newRaft(c)
	// become the follower at term 1 and apply initial configuration
	// entries of term 1
      //启动的时候节点进入follower状态,会调用becomeFollower函数。
	r.becomeFollower(1, None)
      //...
}
// github.com/coreos/etcd/raft/raft.go
// becomeFollower方法,设置step函数为stepFollower,tick函数为tickElection。
// 使用reset重置状态,修改当前状态为StateFollower,设置leader的id。
func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower
	r.reset(term)
	r.tick = r.tickElection
	r.lead = lead
	r.state = StateFollower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

follower正常运行的时候会收到leader节点的心跳。此时其会重设其electionElapsed。若一段时间收不到心跳包,electionElapsed达到ElectionTimeout值,则会调用Step方法处理MsgHup的消息。准备进入candidate状态。

// github.com/coreos/etcd/raft/raft.go
// follower状态收到leader的心跳包,会重设electionElapsed,可以理解为重设ElectionTimeout的计时器。
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	//......各种其他情况
	case pb.MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)
     }
}
// github.com/coreos/etcd/raft/raft.go
// tickElection,每一个tick所运行的方法。在follower状态是增加electionElapsed的值,当其到达ElectionTimeout的值时,调用Step方法处理MsgHup消息。
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}

开启新term,进入选举的流程如下:

  1. Step收到MsgHup消息(tickElection发过来的),调用campaign方法(这里会先确认是否有可提交但未执行的关于集群配置变更的消息,若有则不会调用campaign)。另未简化以下均不讨论preVote的情况。
// github.com/coreos/etcd/raft/raft.go
//Step方法处理MsgHup消息,调用campaign方法
func (r *raft) Step(m pb.Message) error {
     switch m.Type {   
  //......
	 case pb.MsgHup:
		if r.state != StateLeader {
           //检查是否有未执行的配置变更,大致就是先取出可提交还未执行的这一段,
          //然后检查里面是否有是变更集群配置的消息,如果有则直接return不进入candidate状态。
			ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
			if err != nil {
				r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
			}
			if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
				r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
				return nil
			}

			r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
			if r.preVote {
				r.campaign(campaignPreElection)
			} else {
				r.campaign(campaignElection)
			}
		} else {
			r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
		}
    }
}
  1. 调用campaign方法,进入candidate状态方法。并向其他节点发送RequestVote RPCs。
// github.com/coreos/etcd/raft/raft.go
// campaign方法首先调用becomeCandidate方法进入candidate状态(暂时不考虑preVote的情况)
func (r *raft) campaign(t CampaignType) {
	var term uint64
	var voteMsg pb.MessageType
	if t == campaignPreElection {
		r.becomePreCandidate()
		voteMsg = pb.MsgPreVote
		// PreVote RPCs are sent for the next term before we've incremented r.Term.
		term = r.Term + 1
	} else {
        //调用becomeCandidate进入candidate状态
		r.becomeCandidate()
		voteMsg = pb.MsgVote
		term = r.Term
	}
 //在发VoteRequest前,节点把票投个自己。如果只有一个节点,则这个时候会胜出
	if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
		// We won the election after voting for ourselves (which must mean that
		// this is a single-node cluster). Advance to the next state.
		if t == campaignPreElection {
			r.campaign(campaignElection)
		} else {
			r.becomeLeader()
		}
		return
	}
  //将自己的lastTerm,lastIndex信息,以及voteMsg即RequestVote RPC发送给其他节点。
	for id := range r.prs {
		if id == r.id {
			continue
		}
		r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

		var ctx []byte
		if t == campaignTransfer {
			ctx = []byte(t)
		}
		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}
// github.com/coreos/etcd/raft/raft.go
// candidate状态,设置自己的状态,step方法设为stepCandidate,tick方法设置为tickElection
func (r *raft) becomeCandidate() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateLeader {
		panic("invalid transition [leader -> candidate]")
	}
	r.step = stepCandidate
	r.reset(r.Term + 1)
	r.tick = r.tickElection
	r.Vote = r.id
	r.state = StateCandidate
	r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}
  1. 其他节点收到RequestVote RPC。判断是否可以投给它(是否是重复消息之前也投的该节点;是否自身尚未投票,且目前没有leader。)并判断该节点的日志是否至少和自己一样新。若可以投且至少一样新则回复投票给该节点并记录,否则回复拒绝。
// github.com/coreos/etcd/raft/raft.go
//Step方法处理MsgVote消息,判断投票还是拒绝
func (r *raft) Step(m pb.Message) error {
    //......
    switch m.Type {
   //......
    case pb.MsgVote, pb.MsgPreVote:
		// 初步判断是否可以投票
        //1. 如果自身记录的Vote值和消息的来源者相同,说明是条重复消息
       //2. 如果自身尚未投票,且当前没有leader,则可以投。 
		canVote := r.Vote == m.From ||
			(r.Vote == None && r.lead == None) ||
			// PreVote的情况暂时忽略
			(m.Type == pb.MsgPreVote && m.Term > r.Term)
       //如果初步判断可以投,则看日志是否至少和自己一样新。这里主要是比较lastTerm和lastIndex。
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			
           //判断成功,则把票回复该节点,把票投给它。自身记录Vote,并重设election的计数器。			
            r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
			if m.Type == pb.MsgVote {
				// Only record real votes.
				r.electionElapsed = 0
				r.Vote = m.From
			}
		} else {
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
            //否则回复拒绝投票给该节点
			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
		}
      }
}
  1. candidate节点调用stepCandidate处理各种消息。若收到了其他节点的合法心跳包MsgHeartbeat,AppendEntries消息MsgApp,以及installSnapshot消息MsgSnap,说明当前已有其他节点成为了leader,则自身转为follower状态。若自身赢得选举。即收到大多数的节点的投票。则其转变为leader状态。并发送AppendEntries消息给其他节点。若收到了多数的拒绝消息,则也转化为follower状态。因为在candidate状态时,tick方法也是tickElection方法,依然在记录着electionTimout的超时,所以一段时间后依然没有转化为leader或者follower则会开启新一轮的选举。
// github.com/coreos/etcd/raft/raft.go
// stepCandidate处理收到的消息
func stepCandidate(r *raft, m pb.Message) error {
	// Only handle vote responses corresponding to our candidacy (while in
	// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
	// our pre-candidate state).
	var myVoteRespType pb.MessageType
	if r.state == StatePreCandidate {
		myVoteRespType = pb.MsgPreVoteResp
	} else {
		myVoteRespType = pb.MsgVoteResp
	}
	switch m.Type {
	case pb.MsgProp:
		r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
		return ErrProposalDropped
     // 收到AppendEntries RPC,或者心跳,或者installSnapshot RPC,都说明新leader已经选出来,则自身转变为follower状态
	case pb.MsgApp:
		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
		r.handleAppendEntries(m)
	case pb.MsgHeartbeat:
		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
		r.handleHeartbeat(m)
	case pb.MsgSnap:
		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
		r.handleSnapshot(m)
     //收到的是RequestVote的回复消息。收到一条回复,判断一次是否是多数赞成还是多数反对
	case myVoteRespType:
        // 统计投给自己票的个数
		gr := r.poll(m.From, m.Type, !m.Reject)
		r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
		switch r.quorum() {
		case gr:
           //多数赞成,则转化为leader,并发送AppendEntries消息给其他节点(preVote情况暂不考虑)
			if r.state == StatePreCandidate {
				r.campaign(campaignElection)
			} else {
				r.becomeLeader()
				r.bcastAppend()
			}
          //多数反对,则转化为follower状态
		case len(r.votes) - gr:
			// pb.MsgPreVoteResp contains future term of pre-candidate
			// m.Term > r.Term; reuse r.Term
			r.becomeFollower(r.Term, None)
		}
	case pb.MsgTimeoutNow:
		r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
	}
	return nil
}
  1. 某个candidate收到了多数的投票,转化为leader节点。发送空的日志条目的AppendEntries RPC给其他节点。此时选举完成,新的leader产生。leader会通过tickHeartbeat方法向其他节点发送心跳包。
// github.com/coreos/etcd/raft/raft.go
// becomeLeader 设置自身状态为leader,step方法设置为stepLeader,tick方法设置为tickHeartbeat,设置空的日志条目。
func (r *raft) becomeLeader() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateFollower {
		panic("invalid transition [follower -> leader]")
	}
	r.step = stepLeader
	r.reset(r.Term)
	r.tick = r.tickHeartbeat
	r.lead = r.id
	r.state = StateLeader

	ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
	if err != nil {
		r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
	}
	if len(ents) > 0 {
		r.pendingConfIndex = ents[len(ents)-1].Index
	}

	r.appendEntry(pb.Entry{Data: nil})
	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
// github.com/coreos/etcd/raft/raft.go
// leader节点向其他节点发送心跳包。
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		if r.checkQuorum {
			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
		}
		// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
		if r.state == StateLeader && r.leadTransferee != None {
			r.abortLeaderTransfer()
		}
	}

	if r.state != StateLeader {
		return
	}

	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}