Raft在etcd中的实现(四)日志复制与执行
流程
- 客户端向raft集群发送消息。若当前节点为follower,则将消息转发给leader。
- leader节点发送AppendEntries给follower。消息包含
term //leader当前的term值
leaderId //follower在收到client request时,可以用该值转发给leader
prevLogIndex //上一条日志条目的索引
prevLogTerm //上一条日志条目的term
entries[] //日志条目,对于心跳包则该值为空,日志条目可以为多条
leaderCommit //leader服务器的commitIndex
- follower收到leader发送的MsgApp消息,对比自身日志条目判断是否需要append自身条目,是否需要更新自身的值和状态。follower返回的消息
Results:
term //当前任期
success //具体的判断如下
Receiver implementation:
//任期值比当前任期小,则该RPC已失效,或当前leader已变更
1. Reply false if term < currentTerm
//不包含匹配prevLogTerm的prevLogIndex所对应的条目,通常该情况为节点挂掉一段时间,落后leader节点
//leader会重新发包含较早的prevLogTerm及prevLogIndex的RPC给该节点
2. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm
// 以下均返回true
// 若日志条目已有内容与entries里的内容冲突,则删除已有及其后的条目
3. If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it
// 将新的日志条目追加到日志中
4. Append any new entries not already in the log
//如果leaderCommit比自身commitIndex大,则更新自身的commitIndex为min(leaderCommit,当前最新日志条目索引)
5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
- leader收到半数以上机器的正确回复,则可以认为该条目有效。将该条目交由state machine处理,将执行结果返回给客户端。leader节点在下次心跳等AppendEntries RPCs中,会记录可被提交的日志条目编号commitIndex。(所以follower节点对这条日志条目的执行是在下次leader发心跳包或者Entries,更新commitIndex的时候)
代码详解
- 客户端发送request给raft集群。对request的基本处理由业务控制,大概就是进行些检查和类型转化后send给raftNode的proposeC或者confChangeC。业务层调用node.Propose来处理一般请求,调用node.ProposeConfChangeC来处理集群配置变更请求。这里我们重点关注一般请求。
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
//...
go func() {
var confChangeCount uint64 = 0
for rc.proposeC != nil && rc.confChangeC != nil {
select {
//一般的业务请求
case prop, ok := <-rc.proposeC:
if !ok {
rc.proposeC = nil
} else {
// blocks until accepted by raft state machine
rc.node.Propose(context.TODO(), []byte(prop))
}
//集群配置信息变更请求
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount += 1
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
// client closed channel; shutdown raft if not already
close(rc.stopc)
}()
}
经由中间node层的几步操作(这里只是消息转一下),之后调用raft的Step方法。
// github.com/coreos/etcd/raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry})
}
// Step advances the state machine using msgs. The ctx.Err() will be returned,if any.
func (n *node) step(ctx context.Context, m pb.Message) error {
ch := n.recvc
if m.Type == pb.MsgProp {
ch = n.propc
}
select {
case ch <- m:
return nil
//...
}
}
func (n *node) run(r *raft) {
//...
for {
//...
//如果当前有leader,则会处理该条客户端请求,如果无leader,则不会处理当前的这条propc
if lead != r.lead {
if r.hasLeader() {
if lead == None {
r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
} else {
r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
}
propc = n.propc
} else {
r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
propc = nil
}
lead = r.lead
}
select {
// ...
case m := <-propc:
m.From = r.id
r.Step(m)
}
}
}
- Step函数中会根据当前节点的状态调用相应的step处理方法。最终leader向follower节点发送AppendEntries RPC。
// github.com/coreos/etcd/raft/raft.go
func (r *raft) Step(m pb.Message) error {
//...
switch m.Type {
default:
err := r.step(r, m)
if err != nil {
return err
}
}
return nil
}
如果当前的节点是leader,则leader发送appendEntry RPCs给其他节点。
// github.com/coreos/etcd/raft/raft.go
func stepLeader(r *raft, m pb.Message) error {
// These message types do not require any progress for m.From.
switch m.Type {
//...
case pb.MsgProp:
//...
r.appendEntry(m.Entries...)
r.bcastAppend()
return nil
//...
}
return nil
}
如果当前节点是candidate,candidate节点不处理MsgProp消息,直接返回错误。
// github.com/coreos/etcd/raft/raft.go
func stepCandidate(r *raft, m pb.Message) error {
//...
switch m.Type {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
}
}
如果当前节点是follower,follower会将消息转发给leader。这个时候就会在leader那边重新走1、2流程。
// github.com/coreos/etcd/raft/raft.go
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
} else if r.disableProposalForwarding {
r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
return ErrProposalDropped
}
m.To = r.lead
r.send(m)
}
}
发送的消息的内容:
Type: pb.MsgApp //消息类型为MsgApp
Index: pr.Next - 1 //Index值为当前Next-1,事实上就是自己最后一条日志条目的index
LogTerm: term //当前term
Entries: ents //日志条目
Commit: r.raftLog.committed //leader发的leaderCommitted,告知follower现在到committed的日志条目可以确认提交
- follower节点收到leader节点的appendEntries RPCs后进行处理。
- 根据term值,变更自身状态,或者直接回复。
- 正常情况下,调用handleAppendEntries,判断是否需要增加日志条目,是否有新可提交的日志。是否当前落后于Leader太多,需要leader发更早的日志条目等。
// github.com/coreos/etcd/raft/raft.go
// 收到MsgApp后的处理,若当前节点自己是leader,收到MsgApp。若比自己当前term高,则自身转变为follower。然后按照follower的处理。
//candidate状态收到MsgApp后,转变为follower。调用handleAppendEntries处理
// follower状态收到MsgApp后,调用handleAppendEntries处理。
func (r *raft) Step(m pb.Message) error {
// Handle the message term, which may result in our stepping down to a follower.
switch {
//...
//收到比自己当前Term高的MsgApp,说明集群中有新leader了,则转变为becomeFollower
case m.Term > r.Term:
//...
switch {
//...
default:
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
case m.Term < r.Term:
//若收到比当前term小的MsgApp,直接回复
if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
//
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
}
//...
return nil
}
}
func stepCandidate(r *raft, m pb.Message) error {
//...
switch m.Type {
//...
case pb.MsgApp:
//candidate状态下收到MsgApp说明已经选出leader,自己转变为follower
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
}
}
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
//...
case pb.MsgApp:
// election计数器重置,将当前lead置为消息来源者
r.electionElapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
}
}
handleAppendEntries
- m.Index < r.raftLog.committed,说明leader发过来的entries比较老。回复给leader自身当前的committed。告诉leader自身这边的日志已经提交到index。
- 调用raftLog的maybeAppend,判断日志条目是否需要Append,如果没有冲突可以Append,raftLog也会相应的更新自己committed值。
- 如果Append失败,则证明不包含匹配LogTerm的Index所对应的条目,通常该情况为节点挂掉一段时间,落后leader节点。则发送reject的回复给leader,并告诉leader当前自身的lastIndex。leader会重新发包含较早的prevLogTerm及prevLogIndex的RPC给该节点。
// github.com/coreos/etcd/raft/raft.go
//判断是否需要append
func (r *raft) handleAppendEntries(m pb.Message) {
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}
maybeAppend,判断并更新节点自身的日志条目
- 如果leader发来的消息中的index和logTerm都是和自身匹配的。则证明可以append。
- 如果leader发来的Entries有和自身日志冲突的,找到冲突开始的点。如果冲突开始的点早于自身committed值,则作panic处理(正常流程不会有该情况,已经commited的条目是不会修改的),否则更新自身日志条目。
- 比较leader传过来的committed的值,判断是否更新自身committed。
// github.com/coreos/etcd/raft/log.go
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if l.matchTerm(index, logTerm) {
lastnewi = index + uint64(len(ents))
//findConflict是逐条比较日志条目的ents里term和index的值。找到最早不一致的点
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
l.append(ents[ci-offset:]...)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}
return 0, false
}
- leader收到MsgAppResp,更新follower对应的Next值和Match值。
- 如果是拒绝消息,则需要更新leader维护的该follower的Next值,然后重发entries给该follower。
- 如果不是拒绝消息,更新该follower的Match值,更新该follower的状态。判断committed值是否需要更新。该值通过leader这边维护的Match值去判断是否更新。若committed更新,则发送AppendEntries消息给follower,follower利用这个committed的值去更新自身的committed。
// github.com/coreos/etcd/raft/raft.go
func stepLeader(r *raft, m pb.Message) error {
//...
switch m.Type {
//...
case pb.MsgAppResp:
pr.RecentActive = true
if m.Reject {
//拒绝说明follower可能漏掉了很多日志条目,需要更新leader存储的对应的该节点的Next值,然后重新向follower发AppendEntries
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
//调整该follower的Next值为Match + 1,然后重新发AppendEntries
if pr.maybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
//maybeUpdate是更新Match值
if pr.maybeUpdate(m.Index) {
switch {
//更新状态
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
}
//判断是否可提交,这边是统计维护的节点的Match值,进行个排序,
//然后拿中间的值即可(即可以证明多数值都比该值大)
//
if r.maybeCommit() {
r.bcastAppend()
} else if oldPaused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r.sendAppend(m.From)
}
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}
}
}
}
return nil
}
- committed值更新的时候,表示当前有新的日志可以提交给state machine来执行了。(这边交给state machine的过程实际上是通过发消息中带过去的,见 Raft在etcd中的实现(二)节点发送消息和接收消息流程,其中在发送消息时会判断是否有可以执行的日志条目)
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
/*其他逻辑*/
for {
select {
// store raft entries to wal, then publish over commit channel
//node那边发送readyc后,这边收到消息
case rd := <-rc.node.Ready():
//将日志条目写入到wal文件
rc.wal.Save(rd.HardState, rd.Entries)
//如果snap不空,则保存快照文件,并提交
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
//将日志条目写入到storage,storage是memorystorage
rc.raftStorage.Append(rd.Entries)
//将消息通过Transport发送给其他节点。
rc.transport.Send(rd.Messages)
//如果日志条目中有新的可提交日志,则提交到state machine那边执行
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot()
//发送advance给node
rc.node.Advance()
/*其他情况*/
}
}
}
// github.com/coreos/etcd/contrib/raftexample/raft.go
// publishEntries writes committed log entries to commit channel and returns
// whether all entries could be published.
func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
// ignore empty messages
break
}
s := string(ents[i].Data)
select {
//将可以提交的内容发送给commitC,业务端需要对commitC进行消费
case rc.commitC <- &s:
case <-rc.stopc:
return false
}
//...
}
// after commit, update appliedIndex
//appliedIndex记录的实际上已经发送给业务端(state machine)执行了的条目的index
rc.appliedIndex = ents[i].Index
// special nil commit to signal replay has finished
if ents[i].Index == rc.lastIndex {
select {
case rc.commitC <- nil:
case <-rc.stopc:
return false
}
}
}
return true
}