Raft在etcd中的实现(四)日志复制与执行

2018-04-11

流程

  1. 客户端向raft集群发送消息。若当前节点为follower,则将消息转发给leader。
  2. leader节点发送AppendEntries给follower。消息包含
term                               //leader当前的term值
leaderId                           //follower在收到client request时,可以用该值转发给leader
prevLogIndex                       //上一条日志条目的索引
prevLogTerm                        //上一条日志条目的term
entries[]                          //日志条目,对于心跳包则该值为空,日志条目可以为多条
leaderCommit                       //leader服务器的commitIndex
  1. follower收到leader发送的MsgApp消息,对比自身日志条目判断是否需要append自身条目,是否需要更新自身的值和状态。follower返回的消息
Results:
term                          //当前任期
success                       //具体的判断如下
Receiver implementation:
//任期值比当前任期小,则该RPC已失效,或当前leader已变更
1. Reply false if term < currentTerm 
//不包含匹配prevLogTerm的prevLogIndex所对应的条目,通常该情况为节点挂掉一段时间,落后leader节点
//leader会重新发包含较早的prevLogTerm及prevLogIndex的RPC给该节点
2. Reply false if log doesnt contain an entry at prevLogIndex whose term matches prevLogTerm 
// 以下均返回true
// 若日志条目已有内容与entries里的内容冲突,则删除已有及其后的条目
3. If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it 
// 将新的日志条目追加到日志中
4. Append any new entries not already in the log
//如果leaderCommit比自身commitIndex大,则更新自身的commitIndex为min(leaderCommit,当前最新日志条目索引)
5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
  1. leader收到半数以上机器的正确回复,则可以认为该条目有效。将该条目交由state machine处理,将执行结果返回给客户端。leader节点在下次心跳等AppendEntries RPCs中,会记录可被提交的日志条目编号commitIndex。(所以follower节点对这条日志条目的执行是在下次leader发心跳包或者Entries,更新commitIndex的时候)

代码详解

  1. 客户端发送request给raft集群。对request的基本处理由业务控制,大概就是进行些检查和类型转化后send给raftNode的proposeC或者confChangeC。业务层调用node.Propose来处理一般请求,调用node.ProposeConfChangeC来处理集群配置变更请求。这里我们重点关注一般请求。
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
	//...
	go func() {
		var confChangeCount uint64 = 0
		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
              //一般的业务请求
			case prop, ok := <-rc.proposeC:
				if !ok {
					rc.proposeC = nil
				} else {
					// blocks until accepted by raft state machine
					rc.node.Propose(context.TODO(), []byte(prop))
				}
            //集群配置信息变更请求
			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					confChangeCount += 1
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// client closed channel; shutdown raft if not already
		close(rc.stopc)
	}()
}

经由中间node层的几步操作(这里只是消息转一下),之后调用raft的Step方法。

// github.com/coreos/etcd/raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry})
}
// Step advances the state machine using msgs. The ctx.Err() will be returned,if any.
func (n *node) step(ctx context.Context, m pb.Message) error {
	ch := n.recvc
	if m.Type == pb.MsgProp {
		ch = n.propc
	}
	select {
	case ch <- m:
		return nil
	//...
	}
}
func (n *node) run(r *raft) {
	//...
	for {
		//...
        //如果当前有leader,则会处理该条客户端请求,如果无leader,则不会处理当前的这条propc
		if lead != r.lead {
			if r.hasLeader() {
				if lead == None {
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				} else {
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				}
				propc = n.propc
			} else {
				r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
				propc = nil
			}
			lead = r.lead
		}

		select {
		// ...
		case m := <-propc:
			m.From = r.id
			r.Step(m)
	
		}
	}
}
  1. Step函数中会根据当前节点的状态调用相应的step处理方法。最终leader向follower节点发送AppendEntries RPC。
// github.com/coreos/etcd/raft/raft.go
func (r *raft) Step(m pb.Message) error {
     //...
	switch m.Type {
	default:
		err := r.step(r, m)
		if err != nil {
			return err
		}
	}
	return nil
}

如果当前的节点是leader,则leader发送appendEntry RPCs给其他节点。

// github.com/coreos/etcd/raft/raft.go
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	//...
	case pb.MsgProp:
		//...
		r.appendEntry(m.Entries...)
		r.bcastAppend()
		return nil
	//...
	}
	return nil
}

如果当前节点是candidate,candidate节点不处理MsgProp消息,直接返回错误。

// github.com/coreos/etcd/raft/raft.go
func stepCandidate(r *raft, m pb.Message) error {
	//...
	switch m.Type {
	case pb.MsgProp:
		r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
		return ErrProposalDropped
          }
}

如果当前节点是follower,follower会将消息转发给leader。这个时候就会在leader那边重新走1、2流程。

// github.com/coreos/etcd/raft/raft.go
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
			return ErrProposalDropped
		} else if r.disableProposalForwarding {
			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
			return ErrProposalDropped
		}
		m.To = r.lead
		r.send(m)
    }
}

发送的消息的内容:

Type:    pb.MsgApp              //消息类型为MsgApp
Index:   pr.Next - 1            //Index值为当前Next-1,事实上就是自己最后一条日志条目的index
LogTerm: term                   //当前term 
Entries: ents                   //日志条目
Commit:  r.raftLog.committed    //leader发的leaderCommitted,告知follower现在到committed的日志条目可以确认提交
  1. follower节点收到leader节点的appendEntries RPCs后进行处理。
    • 根据term值,变更自身状态,或者直接回复。
    • 正常情况下,调用handleAppendEntries,判断是否需要增加日志条目,是否有新可提交的日志。是否当前落后于Leader太多,需要leader发更早的日志条目等。
// github.com/coreos/etcd/raft/raft.go
// 收到MsgApp后的处理,若当前节点自己是leader,收到MsgApp。若比自己当前term高,则自身转变为follower。然后按照follower的处理。
//candidate状态收到MsgApp后,转变为follower。调用handleAppendEntries处理
// follower状态收到MsgApp后,调用handleAppendEntries处理。
func (r *raft) Step(m pb.Message) error {
	// Handle the message term, which may result in our stepping down to a follower.
	switch {
       //...
       //收到比自己当前Term高的MsgApp,说明集群中有新leader了,则转变为becomeFollower
	case m.Term > r.Term:
		//...
		switch {
		//...
		default:
			r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
				r.id, r.Term, m.Type, m.From, m.Term)
			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
				r.becomeFollower(m.Term, m.From)
			} else {
				r.becomeFollower(m.Term, None)
			}
		}

	case m.Term < r.Term:
          //若收到比当前term小的MsgApp,直接回复
		if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
			//
			r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
		} 
           //...
		return nil
	}
}

func stepCandidate(r *raft, m pb.Message) error {
      //...
	switch m.Type {
	//...
	case pb.MsgApp:
        //candidate状态下收到MsgApp说明已经选出leader,自己转变为follower
		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
		r.handleAppendEntries(m)
      }
}
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	//...
	case pb.MsgApp:
        // election计数器重置,将当前lead置为消息来源者
		r.electionElapsed = 0
		r.lead = m.From
		r.handleAppendEntries(m)
        }
}

handleAppendEntries

  • m.Index < r.raftLog.committed,说明leader发过来的entries比较老。回复给leader自身当前的committed。告诉leader自身这边的日志已经提交到index。
  • 调用raftLog的maybeAppend,判断日志条目是否需要Append,如果没有冲突可以Append,raftLog也会相应的更新自己committed值。
  • 如果Append失败,则证明不包含匹配LogTerm的Index所对应的条目,通常该情况为节点挂掉一段时间,落后leader节点。则发送reject的回复给leader,并告诉leader当前自身的lastIndex。leader会重新发包含较早的prevLogTerm及prevLogIndex的RPC给该节点。
// github.com/coreos/etcd/raft/raft.go
//判断是否需要append
func (r *raft) handleAppendEntries(m pb.Message) {
	if m.Index < r.raftLog.committed {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}

	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
			r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
	}
}

maybeAppend,判断并更新节点自身的日志条目

  • 如果leader发来的消息中的index和logTerm都是和自身匹配的。则证明可以append。
  • 如果leader发来的Entries有和自身日志冲突的,找到冲突开始的点。如果冲突开始的点早于自身committed值,则作panic处理(正常流程不会有该情况,已经commited的条目是不会修改的),否则更新自身日志条目。
  • 比较leader传过来的committed的值,判断是否更新自身committed。
// github.com/coreos/etcd/raft/log.go
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
	if l.matchTerm(index, logTerm) {
		lastnewi = index + uint64(len(ents))
         //findConflict是逐条比较日志条目的ents里term和index的值。找到最早不一致的点
		ci := l.findConflict(ents)
		switch {
		case ci == 0:
		case ci <= l.committed:
			l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
		default:
			offset := index + 1
			l.append(ents[ci-offset:]...)
		}
		l.commitTo(min(committed, lastnewi))
		return lastnewi, true
	}
	return 0, false
}
  1. leader收到MsgAppResp,更新follower对应的Next值和Match值。
    • 如果是拒绝消息,则需要更新leader维护的该follower的Next值,然后重发entries给该follower。
    • 如果不是拒绝消息,更新该follower的Match值,更新该follower的状态。判断committed值是否需要更新。该值通过leader这边维护的Match值去判断是否更新。若committed更新,则发送AppendEntries消息给follower,follower利用这个committed的值去更新自身的committed。
// github.com/coreos/etcd/raft/raft.go
func stepLeader(r *raft, m pb.Message) error {
	//...
	switch m.Type {
      //...
	case pb.MsgAppResp:
		pr.RecentActive = true

		if m.Reject {
                        //拒绝说明follower可能漏掉了很多日志条目,需要更新leader存储的对应的该节点的Next值,然后重新向follower发AppendEntries
			r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
				r.id, m.RejectHint, m.From, m.Index)
                    //调整该follower的Next值为Match + 1,然后重新发AppendEntries
			if pr.maybeDecrTo(m.Index, m.RejectHint) {
				r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
				if pr.State == ProgressStateReplicate {
					pr.becomeProbe()
				}
				r.sendAppend(m.From)
			}
		} else {
			oldPaused := pr.IsPaused()
               //maybeUpdate是更新Match值
			if pr.maybeUpdate(m.Index) {
				switch {
                 //更新状态
				case pr.State == ProgressStateProbe:
					pr.becomeReplicate()
				case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
					r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
					pr.becomeProbe()
				case pr.State == ProgressStateReplicate:
					pr.ins.freeTo(m.Index)
				}
                //判断是否可提交,这边是统计维护的节点的Match值,进行个排序,
                //然后拿中间的值即可(即可以证明多数值都比该值大)
                //
				if r.maybeCommit() {
					r.bcastAppend()
				} else if oldPaused {
					// update() reset the wait state on this node. If we had delayed sending
					// an update before, send it now.
					r.sendAppend(m.From)
				}
				// Transfer leadership is in progress.
				if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
					r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
					r.sendTimeoutNow(m.From)
				}
			}
		}
	}
	return nil
}
  1. committed值更新的时候,表示当前有新的日志可以提交给state machine来执行了。(这边交给state machine的过程实际上是通过发消息中带过去的,见 Raft在etcd中的实现(二)节点发送消息和接收消息流程,其中在发送消息时会判断是否有可以执行的日志条目)
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
    /*其他逻辑*/
    for {
        select {

        // store raft entries to wal, then publish over commit channel
        //node那边发送readyc后,这边收到消息
        case rd := <-rc.node.Ready():
             //将日志条目写入到wal文件
            rc.wal.Save(rd.HardState, rd.Entries)
             //如果snap不空,则保存快照文件,并提交
            if !raft.IsEmptySnap(rd.Snapshot) {
                rc.saveSnap(rd.Snapshot)
                rc.raftStorage.ApplySnapshot(rd.Snapshot)
                rc.publishSnapshot(rd.Snapshot)
            }
          //将日志条目写入到storage,storage是memorystorage
            rc.raftStorage.Append(rd.Entries)
          //将消息通过Transport发送给其他节点。
            rc.transport.Send(rd.Messages)
           //如果日志条目中有新的可提交日志,则提交到state machine那边执行
            if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
                rc.stop()
                return
            }
            rc.maybeTriggerSnapshot()
            //发送advance给node
            rc.node.Advance()
                  /*其他情况*/
        }
    }
}

// github.com/coreos/etcd/contrib/raftexample/raft.go
// publishEntries writes committed log entries to commit channel and returns
// whether all entries could be published.
func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
	for i := range ents {
		switch ents[i].Type {
		case raftpb.EntryNormal:
			if len(ents[i].Data) == 0 {
				// ignore empty messages
				break
			}
			s := string(ents[i].Data)
			select {
               //将可以提交的内容发送给commitC,业务端需要对commitC进行消费
			case rc.commitC <- &s:
			case <-rc.stopc:
				return false
			}
		//...
		}

		// after commit, update appliedIndex
             //appliedIndex记录的实际上已经发送给业务端(state machine)执行了的条目的index
		rc.appliedIndex = ents[i].Index

		// special nil commit to signal replay has finished
		if ents[i].Index == rc.lastIndex {
			select {
			case rc.commitC <- nil:
			case <-rc.stopc:
				return false
			}
		}
	}
	return true
}