Raft在etcd中的实现(二)节点发送消息和接收消息流程

2018-04-02

发送消息流程

raft中节点发送消息的流程.png

流程简介

上图是raft中发送消息的流程。大致分为以下的过程。

  1. 由raftexample/raft.go部分发起。前文提到过该部分主要是为应用层提供服务。
  2. 经由raft/node.go部分转发到共识模块。前文提到过该部分主要起到应用层和共识模块的衔接作用。
  3. 共识模块raft/raft.go,对消息进行处理后,将消息append到msgs。注意共识模块自身不管是如何将消息通过通讯传输到其他节点的。这边只是放入到msgs中。
  4. raft/node.go部分监测到msgs中有内容了。使用newReady生成一个一个新的ready对象,并发送给readyc通道。
  5. raftexample/raft.go中,监测到readyc通道有内容之后,会进行写wal日志,增加Storage日志条目,判断是否需要打快照等动作。最后通过transoport组件,经http协议将消息发送给对应的节点。

    代码详解

  6. 消息的发起部分。raft中有3类RPC,分别为AppendEntries RPCs,RequestVote RPCs,以及InstallSnapshot RPCs。通过发起的形式分成以下两种。 1.1 raft自行触发的比方说leader通过AppendEntries发送心跳包,以及RequestVote RPC等。图中最左部分的tickerC是发起者,是通过设置时钟来完成的。
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
/*其他逻辑*/
  for {
		select {
		case <-ticker.C:
			rc.node.Tick()
          /*其他情况*/
        }
      }
}

1.2 首先由客户端发送请求给leader,然后leader向其他节点发送AppendEntires来进行日志备份。图中最左部分的proposeC即是http server接受到客户端请求后,将消息通过proposeC传递到raftNode中。

// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
/*其他逻辑*/
  // send proposals over raft
	go func() {
		var confChangeCount uint64 = 0

		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
			case prop, ok := <-rc.proposeC:
				if !ok {
					rc.proposeC = nil
				} else {
					// blocks until accepted by raft state machine
					rc.node.Propose(context.TODO(), []byte(prop))
				}

			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					confChangeCount += 1
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// client closed channel; shutdown raft if not already
		close(rc.stopc)
	}()
}
  1. 经由node.go转发消息。 2.1 tick消息,调用node的Tick方法。方法中为向tickc发送消息。后由run中逻辑处理。见2.3
//github.com/coreos/raft/node.go
// Tick increments the internal logical clock for this Node. Election timeouts
// and heartbeat timeouts are in units of ticks.
func (n *node) Tick() {
	select {
	case n.tickc <- struct{}{}:
	case <-n.done:
	default:
		n.logger.Warningf("A tick missed to fire. Node blocks too long!")
	}
}

2.2 propose消息,调用node的Propose方法。方法中调用node的step对消息进行判断,因为这边消息类型是MsgProp,所以是将消息m发送给propc通道。后由run中逻辑处理。见2.3

//github.com/coreos/raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry})
}

// Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any.
func (n *node) step(ctx context.Context, m pb.Message) error {
	ch := n.recvc
	if m.Type == pb.MsgProp {
		ch = n.propc
	}

	select {
	case ch <- m:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
}

2.3 run中处理各种消息。收到tickc时调用raft的tick方法。在收到propc消息时,将m的发送者标记为raft的id,然后调用raft的Step方法。

//github.com/coreos/raft/node.go
func (n *node) run(r *raft) {
	/*其他逻辑暂时忽略*/
	for {
		/*其他逻辑暂时忽略,下文接收消息部分会再讨论*/
		select {
		case m := <-propc:
			m.From = r.id
			r.Step(m)
		case <-n.tickc:
			r.tick()
                /*其他情况暂时忽略*/
		}
	}
}

  1. 共识模块raft中处理消息。 3.1 对于tick的情况根据当前节点的状态,有两个tick方法。leader调用的是tickHeartbeat,follower和candidate调用的是tickElection。两个方法都是达到相应的配置的时间的时候调用Step方法进行相应的处理。
//github.com/coreos/etcd/raft/raft.go
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}

// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		if r.checkQuorum {
			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
		}
		// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
		if r.state == StateLeader && r.leadTransferee != None {
			r.abortLeaderTransfer()
		}
	}

	if r.state != StateLeader {
		return
	}

	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}

3.2 Step方法。共识模块中最重要的方法,根据消息的类型和自身的状态进行响应。该部分逻辑留作下文分析。不过可以观察到Step方法在判断出该消息是需要共识给其他节点的时候,会通过调用sendAppend、sendHeartbeat、bcastAppend、bcastHeartbeat、bcastHeartbeatWithCtx几个方法,或者是直接调用send方法。 3.3 send方法,send方法结合term和msg的type对消息进行处理。然后append到msgs中。注意这里只进行了append,而不是真正的发送。

func (r *raft) send(m pb.Message) {
	m.From = r.id
	if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
		if m.Term == 0 {
			panic(fmt.Sprintf("term should be set when sending %s", m.Type))
		}
	} else {
		if m.Term != 0 {
			panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
		}
		if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
			m.Term = r.Term
		}
	}
	r.msgs = append(r.msgs, m)
}

  1. node部分中run方法中调用newReady方法,将新的消息,一些未写入到文件的日志条目等放入生成ready对象rd,如果有更新,则会走select中的case readyc<-rd,然后将各个消息重置为nil。
// github.com/coreos/raft/node.go
func (n *node) run(r *raft) {
	for {
		if advancec != nil {
			readyc = nil
		} else {
                       
			rd = newReady(r, prevSoftSt, prevHardSt)
			if rd.containsUpdates() {
				readyc = n.readyc
			} else {
				readyc = nil
			}
		}
           /*其他逻辑*/
		select {
           /*其他情况*/
		case readyc <- rd:
			if rd.SoftState != nil {
				prevSoftSt = rd.SoftState
			}
			if len(rd.Entries) > 0 {
				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
				prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
				havePrevLastUnstablei = true
			}
			if !IsEmptyHardState(rd.HardState) {
				prevHardSt = rd.HardState
			}
			if !IsEmptySnap(rd.Snapshot) {
				prevSnapi = rd.Snapshot.Metadata.Index
			}

			r.msgs = nil
			r.readStates = nil
			advancec = n.advancec
		
		}
	}
}

将msgs等信息放入到Ready中

// github.com/coreos/raft/node.go
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs,
	}
	if softSt := r.softState(); !softSt.equal(prevSoftSt) {
		rd.SoftState = softSt
	}
	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
		rd.HardState = hardSt
	}
	if r.raftLog.unstable.snapshot != nil {
		rd.Snapshot = *r.raftLog.unstable.snapshot
	}
	if len(r.readStates) != 0 {
		rd.ReadStates = r.readStates
	}
	rd.MustSync = MustSync(rd.HardState, prevHardSt, len(rd.Entries))
	return rd
}

判断Ready中是否有新的信息

//github.com/coreos/raft/node.go
func (rd Ready) containsUpdates() bool {
	return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
		!IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
		len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
}
  1. 有新的消息时,node那边会将消息发送到readyc通道。raftexample中raftNode的serverChannels会收到Readc并进行处理。将日志条目写入到wal文件和memory storage中,如果有快照保存快照并提交给state machine去执行。通过transport将消息发送给其他节点。如果有的可提交的日志条目也提交给state machine去执行。判断是否需要触发snapshot,最后发送advance给node。 经过该步之后,日志条目才写入到wal文件和memorty storage。消息也才经Transport发送给其他节点。
// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
	/*其他逻辑*/
	for {
		select {

		// store raft entries to wal, then publish over commit channel
        //node那边发送readyc后,这边收到消息
		case rd := <-rc.node.Ready():
             //将日志条目写入到wal文件
			rc.wal.Save(rd.HardState, rd.Entries)
             //如果snap不空,则保存快照文件,并提交
			if !raft.IsEmptySnap(rd.Snapshot) {
				rc.saveSnap(rd.Snapshot)
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				rc.publishSnapshot(rd.Snapshot)
			}
          //将日志条目写入到storage,storage是memorystorage
			rc.raftStorage.Append(rd.Entries)
          //将消息通过Transport发送给其他节点。
			rc.transport.Send(rd.Messages)
           //如果日志条目中有新的可提交日志,则提交到state machine那边执行
			if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
				rc.stop()
				return
			}
			rc.maybeTriggerSnapshot()
            //发送advance给node
			rc.node.Advance()
                  /*其他情况*/
		}
	}
}

node节点收到advance消息,即确定了之前的消息已写入wal文件和storage等。然后更新raftLog信息。并将advancec赋值为空,即保证下一次新消息的执行。

//github.com/coreos/raft/node.go
func (n *node) run(r *raft) {
	for {
           /*其他逻辑*/
		select {
           /*其他情况*/
			case <-advancec:
            //更新raft中日志的信息
			if prevHardSt.Commit != 0 {
				r.raftLog.appliedTo(prevHardSt.Commit)
			}
			if havePrevLastUnstablei {
				r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
				havePrevLastUnstablei = false
			}
			r.raftLog.stableSnapTo(prevSnapi)
			advancec = nil
		
		}
	}
}

接收消息流程

raft处理接收到消息.png

流程简介

  1. 由transport模块流入,收到其他节点传递过来的消息。
  2. 通过raftexample中的raftNode的Process方法,调用node来处理消息。
  3. node部分对消息进行简单的判断和处理后,将消息转由共识模块处理。
  4. 共识模块处理消息。

    代码详解

  5. transport模块接收到消息。调用raftNode中的Process方法。
// github.com/coreos/etcd/rafthttp/peer.go
func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
	/*其他逻辑*/
	go func() {
		for {
			select {
          //接收到消息后,调用Process方法
			case mm := <-p.recvc:
				if err := r.Process(ctx, mm); err != nil {
					plog.Warningf("failed to process raft message (%v)", err)
				}
			case <-p.stopc:
				return
			}
		}
	}()
}

2.经由raftNode的Process调用node的Step。

// github.com/coreos/etcd/contrib/raftexample/raft.go
func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {
	return rc.node.Step(ctx, m)
}
  1. Step判断是否为自身的消息,如果不是则通过step处理。step中消息不是MsgProp类型,所以是将消息m发送给recvc通道。后由run 中逻辑处理。
// github.com/coreos/raft/node.go
func (n *node) Step(ctx context.Context, m pb.Message) error {
	// ignore unexpected local messages receiving over network
	if IsLocalMsg(m.Type) {
		// TODO: return an error?
		return nil
	}
	return n.step(ctx, m)
}

// github.com/coreos/raft/node.go
func (n *node) step(ctx context.Context, m pb.Message) error {
	ch := n.recvc
	if m.Type == pb.MsgProp {
		ch = n.propc
	}

	select {
	case ch <- m:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
}

3.1 run中将收到的消息发送给共识模块的Step方法。

// github.com/coreos/raft/node.go
func (n *node) run(r *raft) {
	for {
           /*其他逻辑*/
		select {
           /*其他情况*/
		case m := <-n.recvc:
			// filter out response message from unknown From.
			if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
				r.Step(m) // raft never returns an error
			}
		
		}
	}
}

  1. 共识模块中的Step根据消息类型和信息以及自身状态进行相应的操作。