Raft在etcd中的实现(五)snapshot相关

2018-04-12

snapshot概念回顾

在正常运行过程中,raft集群的日志增长非常的快。通常使用镜像快照来压缩日志。即通过将当前的state写入到存储的snapshot中,然后到该点的日志即可被丢弃。

问题

该篇问题主要从业务的角度入手,介绍snapshot的一些相关问题。问题中涉及到raft共识部分的仅有InstallSnapshot RPC的部分。

  • 何时触发snapshot操作
  • 将哪些数据保存到snapshot
  • 如何将数据保存到snapshot
  • 何时会触发InstallSnapshot RPC
  • 何时会用到snapshot中的数据

    代码详解

    以etcd中的raftexample为例。来讲解下example中是如何处理上述问题的。

    何时触发snapshot操作

  • node.Ready()有新的数据时,会触发一系列操作,其中包括写wal,将可以commit的条目交由state machine执行等,snapshot的触发也在这里,具体实现是调用的maybeTriggerSnapshot。
func (rc *raftNode) serveChannels() {
	//...

	// event loop on raft state machine updates
	for {
		select {
		//...
		// store raft entries to wal, then publish over commit channel
		case rd := <-rc.node.Ready():
			rc.wal.Save(rd.HardState, rd.Entries)
			if !raft.IsEmptySnap(rd.Snapshot) {
				rc.saveSnap(rd.Snapshot)
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				rc.publishSnapshot(rd.Snapshot)
			}
			rc.raftStorage.Append(rd.Entries)
			rc.transport.Send(rd.Messages)
			if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
				rc.stop()
				return
			}
            //尝试触发snapshot
			rc.maybeTriggerSnapshot()
			rc.node.Advance()
		}
	}
}
  • maybeTriggerSnapshot首先判断appliedIndex和snapshotIndex的大小差距,是否达到设定的snapCount。从这里可以看出example中是按照条目的数目来确认是否触发snapshot的。其他的触发条件只需要在此处作相应判断即可。
func (rc *raftNode) maybeTriggerSnapshot() {
   //判断日志条目数目,确定是否需要进行snapshot
	if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount {
		return
	}

	log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex)
  //获取snapshot的数据,这里是调用初始化时传进来的函数getSnapshot,
	data, err := rc.getSnapshot()
	if err != nil {
		log.Panic(err)
	}
 //创建snapshot,可以看出这里会记录appliedIndex,当前的confState,以及刚才的snapshot数据
	snap, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data)
	if err != nil {
		panic(err)
	}
 //将snapshot存储到文件中
	if err := rc.saveSnap(snap); err != nil {
		panic(err)
	}

	compactIndex := uint64(1)
	if rc.appliedIndex > snapshotCatchUpEntriesN {
		compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN
	}
	if err := rc.raftStorage.Compact(compactIndex); err != nil {
		panic(err)
	}

	log.Printf("compacted log at index %d", compactIndex)
	rc.snapshotIndex = rc.appliedIndex
}

将哪些数据保存到snapshot

  • 保存什么数据取决于业务的需求,一般来讲,应该如文章开始的图片那样,保存state machine的状态。example可以看到是把一个kv数据库(实质是一个map[string]string )直接进行了json Marshal操作。
  • 除了state machine的数据外,snapshot中还会保存一些当前日志状态的,如文章开始的图片中的last included index, last included term
  • etcd的实现中也可以将当前集群的节点配置状态也进行保存。
func (s *kvstore) getSnapshot() ([]byte, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return json.Marshal(s.kvStore)
}
// func maybeTriggerSnapshot
 //获取snapshot的数据,这里是调用初始化时传进来的函数getSnapshot,
	data, err := rc.getSnapshot()
	if err != nil {
		log.Panic(err)
	}
 //创建snapshot,可以看出这里会记录appliedIndex,当前的confState,以及刚才的snapshot数据
	snap, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data)
	if err != nil {
		panic(err)
	}
// CreateSnapshot makes a snapshot which can be retrieved with Snapshot() and
// can be used to reconstruct the state at that point.
// If any configuration changes have been made since the last compaction,
// the result of the last ApplyConfChange must be passed in.
func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error) {
	ms.Lock()
	defer ms.Unlock()
    //判断i的值是否有效,太小说明已经打在之前的snapshot中了,太大说明当前还没有那么多日志条目
	if i <= ms.snapshot.Metadata.Index {
		return pb.Snapshot{}, ErrSnapOutOfDate
	}

	offset := ms.ents[0].Index
	if i > ms.lastIndex() {
		raftLogger.Panicf("snapshot %d is out of bound lastindex(%d)", i, ms.lastIndex())
	}

 //last last included index, last included term
	ms.snapshot.Metadata.Index = i
	ms.snapshot.Metadata.Term = ms.ents[i-offset].Term
   //cs是当前集群节点状态,是一个id组成的数组
	if cs != nil {
		ms.snapshot.Metadata.ConfState = *cs
	}
  //state machine的数据
	ms.snapshot.Data = data
	return ms.snapshot, nil
}

如何将数据保存到snapshot

  • raftexample中调用saveSnap。这里的saveSnapshot调用了wal以及snapshotter中的实现。业务中也可以自己实现该部分。具体wal和snapshotter中的实现,之后在相关模块中再分析。
 //将snapshot存储到文件中
	if err := rc.saveSnap(snap); err != nil {
		panic(err)
	}
func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
	// must save the snapshot index to the WAL before saving the
	// snapshot to maintain the invariant that we only Open the
	// wal at previously-saved snapshot indexes.
	walSnap := walpb.Snapshot{
		Index: snap.Metadata.Index,
		Term:  snap.Metadata.Term,
	}
	if err := rc.wal.SaveSnapshot(walSnap); err != nil {
		return err
	}
	if err := rc.snapshotter.SaveSnap(snap); err != nil {
		return err
	}
	return rc.wal.ReleaseLockTo(snap.Metadata.Index)
}

何时会触发InstallSnapshot RPC

  • 当有新节点加入或者有节点落后的比较多的时候,有可能会触发leader向其发送InstallSnapshot RPC。触发的情况基本上就是节点需要的数据leader这边已经打到snapshot中了,所以只能把snapshot发过去。具体逻辑在sendAppend中。
// sendAppend sends RPC, with entries to the given peer.
func (r *raft) sendAppend(to uint64) {
	pr := r.getProgress(to)
	if pr.IsPaused() {
		return
	}
	m := pb.Message{}
	m.To = to

      //寻找节点的Next对应的上一个index和term
	term, errt := r.raftLog.term(pr.Next - 1)
	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
       //出错说明应该是打到snapshot中了
	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
		if !pr.RecentActive {
			r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
			return
		}

		m.Type = pb.MsgSnap
         //拿到当前的snapshot信息
		snapshot, err := r.raftLog.snapshot()
		if err != nil {
			if err == ErrSnapshotTemporarilyUnavailable {
				r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
				return
			}
			panic(err) // TODO(bdarnell)
		}
		if IsEmptySnap(snapshot) {
			panic("need non-empty snapshot")
		}
		m.Snapshot = snapshot
		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
		r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
			r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
		pr.becomeSnapshot(sindex)
		r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
	} else {
		//...
	}
  //发送snapshot信息给节点
	r.send(m)
}

何时会用到snapshot中的数据

snapshot数据主要有两大用途

  1. 节点挂掉之后重启,可以从snapshot中快速恢复
  2. 新节点加入或者有节点落后leader特别多,可以通过snapshot快速同步

    重启时使用snapshot中的数据

    节点重新启动的时候会调用replayWAL对snapshot以及wal中的日志条目进行回放。回放主要是将snapshot内容放入snapshot,将wal放入自身的日志条目中。

func (rc *raftNode) startRaft() {
    //...
	oldwal := wal.Exist(rc.waldir)
	rc.wal = rc.replayWAL()
    //...
}

回放: a. 将snapshot文件中的内容读出来。 b. 并根据该内容中last include Index和last include Term将wal文件中对应的之后的日志条目内容读出来。 c. ApplySnapshot,将读到的snapshot放到raftStorage的snapshot中。SetHardState,将hardState内容放到raftStorage的hardState中。Append(ents),将wal读出的日志条目,放到raftStorage的ents中。 d. 标lastIndex,发送nil给commitC(nil的作用是告知接收到的地方,需要处理snapshot)

// replayWAL replays WAL entries into the raft instance.
func (rc *raftNode) replayWAL() *wal.WAL {
	log.Printf("replaying WAL of member %d", rc.id)
     //loadSnapshot文件中的内容,这里实际上是调用的snapshotter那边的Load函数
	snapshot := rc.loadSnapshot()
    //从wal文件中读取日志条目信息
	w := rc.openWAL(snapshot)
	_, st, ents, err := w.ReadAll()
	if err != nil {
		log.Fatalf("raftexample: failed to read WAL (%v)", err)
	}
	rc.raftStorage = raft.NewMemoryStorage()
	if snapshot != nil {
          //ApplySnapshot,将读到的snapshot放到raftStorage的snapshot中
		rc.raftStorage.ApplySnapshot(*snapshot)
	}
	rc.raftStorage.SetHardState(st)

	// append to storage so raft starts at the right place in log
	rc.raftStorage.Append(ents)
	// send nil once lastIndex is published so client knows commit channel is current
	if len(ents) > 0 {
		rc.lastIndex = ents[len(ents)-1].Index
	} 
	rc.commitC <- nil
	return w
}

处理回放:

func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
	for data := range commitC {
		if data == nil {
			//处理回放的snapshot
			snapshot, err := s.snapshotter.Load()
			if err == raftsnap.ErrNoSnapshot {
				return
			}
			if err != nil && err != raftsnap.ErrNoSnapshot {
				log.Panic(err)
			}
			log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
              //从snapshot中拿到数据
			if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
				log.Panic(err)
			}
			continue
		}

		//...
	}
	//...
}
func (s *kvstore) recoverFromSnapshot(snapshot []byte) error {
	var store map[string]string
    //raftexample中只是简单的将map[string]string复制给了当前的kvStore
	if err := json.Unmarshal(snapshot, &store); err != nil {
		return err
	}
	s.mu.Lock()
	s.kvStore = store
	s.mu.Unlock()
	return nil
}

处理InstallSnapshot

  • 接收到installSnapshot的节点会调用handleSnapshot来处理消息。
  • 如果snapshot消息比较旧,说明本地已有包含该snapshot的日志条目。则返回自身的commited值给leader,告诉leader自己已有这些数据了。
  • 如果snapshot消息比较新,重储log和raft节点的成员配置信息。返回新的lastIndex值给leader。
func (r *raft) handleSnapshot(m pb.Message) {
	sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
	if r.restore(m.Snapshot) {
		r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
	} else {
               //snapshot消息比较旧
		r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
	}
}

// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of state machine.
func (r *raft) restore(s pb.Snapshot) bool {
//snapshot的index比自身committed要小,说明已有这些数据,返回false
	if s.Metadata.Index <= r.raftLog.committed {
		return false
	}
// 自身日志条目中有相应的term和index,说明已有这些数据,返回false
	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
		r.raftLog.commitTo(s.Metadata.Index)
		return false
	}
  //...
    //restore log
	r.raftLog.restore(s)
   //restore configuration of state machine.
	r.prs = make(map[uint64]*Progress)
	r.learnerPrs = make(map[uint64]*Progress)
//分别重储普通节点和learner的信息
	r.restoreNode(s.Metadata.ConfState.Nodes, false)
	r.restoreNode(s.Metadata.ConfState.Learners, true)
	return true
}
//重储节点信息,主要是更新progress的match和next值
func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
	for _, n := range nodes {
		match, next := uint64(0), r.raftLog.lastIndex()+1
		if n == r.id {
			match = next - 1
			r.isLearner = isLearner
		}
		r.setProgress(n, match, next, isLearner)
		r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n))
	}
}
  • state machine或者业务层接收到要处理snapshot的信息。将数据保存到本地snapshot文件中,以及MemoryStorage中,并通过publishSnapshot发送nil消息给处理消息的部分。及上一部分中的readCommits。这样就可以从snapshot中加载出当时的state machine的状态。
  • 节点只需从leader那里正常同步snapshotIndex之后的数据即可。
//func serveChannels
    if !raft.IsEmptySnap(rd.Snapshot) {
		rc.saveSnap(rd.Snapshot)
		rc.raftStorage.ApplySnapshot(rd.Snapshot)
		rc.publishSnapshot(rd.Snapshot)
}
func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) {
	if raft.IsEmptySnap(snapshotToSave) {
		return
	}

	log.Printf("publishing snapshot at index %d", rc.snapshotIndex)
	defer log.Printf("finished publishing snapshot at index %d", rc.snapshotIndex)

	if snapshotToSave.Metadata.Index <= rc.appliedIndex {
		log.Fatalf("snapshot index [%d] should > progress.appliedIndex [%d] + 1", snapshotToSave.Metadata.Index, rc.appliedIndex)
	}
	rc.commitC <- nil // trigger kvstore to load snapshot

	rc.confState = snapshotToSave.Metadata.ConfState
	rc.snapshotIndex = snapshotToSave.Metadata.Index
	rc.appliedIndex = snapshotToSave.Metadata.Index
}