Raft在etcd中的实现(一)存储及重要的组件

2018-04-01

raftexample简介

本文中raft的实现以etcd中raftexample实现为例。 raftexample的例子中,启动了一个kv数据库作为raft的state machine,另外启动了一个http数据库来响应客户端的请求。其中proposeC以及commitC,errorC主要用来交互,proposeC为由客户端发送过来的条目,通过httpserver流向共识部分raftNode;commitC为经共识的条目,由raftNode流向kv数据库。

/*
github.com/coreos/etcd/contrib/raftexample/main.go
main()
*/
	var kvs *kvstore
	getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
        //启动raft节点
	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
       //启动kv数据库作为state machine
	kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

	// the key-value http handler will propose updates to raft
	serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)

raftexample中的存储

raftexample中的存储分为以下几部分

  • kv数据库, 存储state machine的状态。raftexample例子中是个简单的map。
  • 内存数据中的log,raft共识用到的日志条目,一般是一段较新的日志,可能包含一部分已共识的日志和一些尚未共识的日志条目。由于是内存维护,可以灵活的重写替换。该部分内容中位于commitIndex之前的部分是已共识的部分,内容不会被替换,commitIndex到lastIndex之间的内容是尚未共识的部分,index对应的条目内容可能被替换。该部分在raft.MemoryStorage实现。
  • wal部分,raft共识所用的日志文件。该文件只会追加,不会重写和覆盖。raft共识过程中收到的日志条目,都会记录在wal日志文件中。即可能在wal日志文件中看见同index不同term的日志条目。主要作用是在节点崩溃后可以通过wal部分重新启动。
  • snapshot文件部分,类似于存档点的概念,已经过共识的部分,可以在一定时间或者一点量消息后,生成snapshot,snapshot文件一般保存当前state machine的状态,及集群的相关配置等数据。snapshot可以帮助节点快速启动,以及新节点加入时的快速同步。

几个重要的组件

etcd中raft的实现了很好的模块化,其中raft共识模块主要是实现了算法的逻辑,而系统需要用到的存储、通讯等模块都从共识模块中很好的剥离了出来是单独模块的实现。这样做可以很方便对raft的共识模块进行移植,也可以很方便地支持多种底层的存储或者通讯方式。raftexample中的组件大概可以分为以下几部分。

  • 应用层部分:对应其raftNode组件,应用层可以根据自己的需要实现自己的raftNode。
  • 共识部分(github.com/coreos/etcd/raft):对应其raft组件。该部分实现了raft核心算法部分。其逻辑主要在raft结构体实现的方法中,其通过raftLog结构体以及progress结构体,实现raft算法中log部分的管理和节点对集群中其他节点信息的管理。并且通过node结构体提供给了应用层与共识部分沟通的渠道。
  • 通讯部分(github.com/coreos/etcd/rafthttp):对应其Transport组件。使用http协议完成节点间的相互通信。
  • 存储部分(github.com/coreos/etcd/raftsnap、github.com/coreos/etcd/wal):对应其snapshotter组件与wal组件。通过这两个组件分别对快照和日志条目进行持久化的存储。

raftNode组件:主要处理应用层服务

raftNode结构体,实际上是应用层的包装,真正的raft共识部分在其中的node(raft.Node)中。RaftNode结构体中可以放应用层需要的一些东西。大致有以下一些东西

  • 协程交互用的通道,包括proposeC,confChangeC,commitC,errorC。
  • raft节点的基本配置,包括id,peers,join,waldir,snapdir。
  • raft节点状态,包括lastIndex,confState,snapshotIndex,appliedIndex等。
  • raft共识部分的组件,包括node其是共识的关键部分,raftStorage是日志条目在内存中存储的实现,wal是日志条目存储在文件中的实现,snapshotter及其相关配置是snapshot处理的相关实现,transport是raft节点间通讯的组件。
/*
github.com/coreos/etcd/contrib/raftexample/raft.go
*/
// A key-value stream backed by raft
type raftNode struct {
	proposeC    <-chan string            // proposed messages (k,v)
	confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
	commitC     chan<- *string           // entries committed to log (k,v)
	errorC      chan<- error             // errors from raft session

	id          int      // client ID for raft session
	peers       []string // raft peer URLs
	join        bool     // node is joining an existing cluster
	waldir      string   // path to WAL directory
	snapdir     string   // path to snapshot directory
	getSnapshot func() ([]byte, error)
	lastIndex   uint64 // index of log at start

	confState     raftpb.ConfState
	snapshotIndex uint64                   //snapshotIndex
	appliedIndex  uint64                    //同论文中的appliedIndex,用于记录最新的已提交state machine执行的日志的索引

	// raft backing for the commit/error channel
	node        raft.Node                    //真正的共识部分的node
	raftStorage *raft.MemoryStorage          //raft中内存存储日志的部分
	wal         *wal.WAL                     //wal文件部分

	snapshotter      *raftsnap.Snapshotter
	snapshotterReady chan *raftsnap.Snapshotter // signals when snapshotter is ready

	snapCount uint64
	transport *rafthttp.Transport
	stopc     chan struct{} // signals proposal channel closed
	httpstopc chan struct{} // signals http server to shutdown
	httpdonec chan struct{} // signals http server shutdown complete
}

大致有以下几种函数

  • 启动raft函数:startRaft
  • snapshot相关的函数,主要处理snapshot的触发、将什么内容保存到snapshot、保存snapshot、加载snapshot等事项,包括:maybeTriggerSnapshot,publishSnapshot,saveSnap,loadSnapshot
  • 日志条目相关的函数,主要将条目提交给state machine去执行,包括:publishEntries,entriesToApply
  • 重新加载和重放WAL,包括:openWAL,replayWAL
  • 响应Raft和客户端请求的函数:serverRaft,serveChannels

    node组件,应用层与共识模块的沟通者

    node结构体的主要作用是应用层和共识模块的衔接。将应用层的消息传递给底层共识模块,并将底层共识模块共识后的结果反馈给应用层。结构体的主要成员都是用来作消息传递用处的通道。

type node struct {
	propc      chan pb.Message
	recvc      chan pb.Message
	confc      chan pb.ConfChange
	confstatec chan pb.ConfState
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
	stop       chan struct{}
	status     chan chan Status

	logger Logger
}

大致函数有几种:

  1. 启动、新建、停止:StartNode,RestartNode,newNode,Stop。
  2. 发送命令给共识模块的函数:Tick,Campaign,Propose,Step,ProposeConfChange,ApplyConfChange,TransferLeadership。
  3. 反馈给应用层的函数Ready,Advance,,Status,ReportUnreachable,ReadIndex。

    raft组件:共识组件

    raft结构体,共识组件结构体。 首先放一下论文中需要维护的state来对比

currentTerm                   //当前任期   
votedFor                      //当前任期的候选者编号,无则为null
log[]                         //日志条目

//Volatile state on all servers,所有服务器上维护
commitIndex             //已知的最高的可被提交的日志条目的索引,初始为0
lastApplied             //当前已提交给state machine执行的条目的索引,初始为0

//Volatile state on leaders:(Reinitialized after election),只在leader节点上维护
nextIndex[]          //对于每一台服务器,下一条将要发给该服务器的条目的索引,初始为leader最后一条条目索引+1
matchIndex[]         //每一个服务器已知的最高的已复制的条目的索引,初始为0

etcd中raft实现中raft结构体内容,可以看出其主要包含一下几部分

  • state的基本项,包括:id,Term(论文中currentTerm),vote(论文中voteFor),raftLog(论文中log[],并且维护了comitIndex和lastApplied),prs及learnerPrs(论文中的matchIndex与nextIndex以及一些其他关于对应节点状态的信息)。
  • 一些其他状态项,包括state用于判别当前节点状态、isLearner判别当前节点是否是learner、lead表示当前leaderid、以及leadTransferee、pendingConfIndex、readStates、checkQuorum、preVote等。
  • 一些其他配置项,包括maxInflight,maxMsgSize,heartbeatTimeout,electionTimeout,randomizedElectionTimeout,disableProposalForwarding等。
  • 重要的函数,tick用来计时,step用来处理各种RPC。
type raft struct {
	//同论文中的state的一些基本项
	id         uint64               //节点id
	Term       uint64               //节点当前所处任期,即currentTerm
	Vote       uint64               // 即votedFor
	raftLog    *raftLog             //节点的日志,即log[]
	prs        map[uint64]*Progress //progress中存储了对应节点的matchIndex和nextIndex
	learnerPrs map[uint64]*Progress

	//一些其他状态
	readStates []ReadState
	state      StateType
	// isLearner is true if the local raft node is a learner.
	isLearner bool
	votes     map[uint64]bool
	msgs      []pb.Message
	readOnly  *readOnly
	lead      uint64 // the leader id
	// leadTransferee is id of the leader transfer target when its value is not zero.
	// Follow the procedure defined in raft thesis 3.10.
	leadTransferee   uint64
	pendingConfIndex uint64
	checkQuorum      bool
	preVote          bool

	//一些其他配置项
	maxInflight               int
	maxMsgSize                uint64
	electionElapsed           int
	heartbeatElapsed          int
	heartbeatTimeout          int
	electionTimeout           int
	randomizedElectionTimeout int
	disableProposalForwarding bool

	tick   func()
	step   stepFunc //按照leader,follower,candidate角色不同,有三个不同的函数
	logger Logger
}

raft结构体及其方法是算法中的主体部分。大致有以下几种函数:

  • 新建并启动:newRaft
  • 查询或者获取状态类:包括hasLeader,softState,hardState,quorum,nodes,learnerNodes,getProgress,maybeCommit,pastElectionTimeout,checkQuorumActive
  • 控制类函数:主要作用是根据消息类型调用响应的函数,包括:Step,stepLeader,stepCandidate,stepFollower
  • 发送消息类:包括send,sendAppend,sendHeartbeat,sendTimeoutNow,bcastAppend,bcastHearbeat,bcastHearbeatWithCtx。其中几个bcast打头函数表示向集群中的每个节点发送。
  • 处理消息类:包括handleAppendEntries,handleHeartbeat,handleSnapshot,
  • 处理集群成员变更类:addNode,addLearner,addNodeOrLearnerNode,removeNode,restoreNode,setProgress,delProgress
  • 实际操作类:包括appendEntry,
  • tick函数类:tickElection,tickHeartbeat
  • 角色转化类:becomeFollower,becomeCandidate,becomePreCandidate,becomeLeader
  • 辅助函数类:reset,restore,resetRandomizedElectionTimeout,loadState,campaign,poll

Progress:维护集群中的节点状态 Progress结构体,该结构体主要是在每个节点维护集群中其他节点的状态用。主要包括

  • 论文中用以共识用的Match(论文中的matchIndex),next(论文中的nextIndex)。
  • 包括一些节点的是否正常运行的状态RecentActive、Paused。
  • 节点的身份状态State及isLearner等。
type Progress struct {
	Match, Next     uint64
	State           ProgressStateType
	Paused          bool
	PendingSnapshot uint64
	RecentActive    bool
	ins             *inflights
	IsLearner       bool
}

大致有以下几种函数:

  • 控制类:pause、resume
  • 状态转变类函数: resetState, becomeProbe, becomeReplicate, becomeSnapshot
  • Match和Next维护类函数: maybeUpdate, optimisticUpdate, maybeDecrTo, -
  • 状态查询类函数:IsPaused
  • snapshot相关: snapshotFailure,needSnapshotAbort

raftLog:raft中的日志管理者 raftLog结构体,raft中的日志条目管理的结构体,前文中介绍raft中的日志存储包括文件存储和内存存储两部分。该结构体中storage即为内存存储的日志条目部分。unstable是在日志尚未存储到日志文件时的状态。commited和applied是论文中commitIndex和lastApplied的概念,及最新的已共识可提交的日志条目索引,和已提交给state machine执行的最新条目的索引。

type raftLog struct {
	// storage contains all stable entries since the last snapshot.
	storage Storage

	// unstable contains all unstable entries and snapshot.
	// they will be saved into storage.
	unstable unstable

	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	committed uint64
	// applied is the highest log position that the application has
	// been instructed to apply to its state machine.
	// Invariant: applied <= committed
	applied uint64

	logger Logger
}

大致有以下几种函数

  • 新建: newLog
  • 查询状态类:获取索引、快照或者条目信息等,包括:unstableEntries, nextEnts, hasNextEnts, snapshot, firstIndex, lastIndex, lastTerm , entries, allEntries。
  • 状态更新类:更新commited或者applied的值,包括:commitTo, appliedTo, stableTo, stableSnapTo 。
  • 条目更新类:append
  • 判断状态类:判断条目是否可以被追加到当前,是否是当前term,找出冲突等。包括:maybeAppend, findConflict, isUpToDate matchTerm, maybeCommit, mustCheckOutOfBounds
  • 实际操作类:
  • 辅助类:term, slice,zeroTermOnErrCompacted

    Snapshotter组件:快照管理者

type Snapshotter struct {
	dir string
}

大致包含保存快照SaveSnap,读取快照Read,加载快照Load等。

WAL组件:日志文件管理者

WAL结构体,用来管理日志文件。

type WAL struct {
	dir string // the living directory of the underlay files
	// dirFile is a fd for the wal directory for syncing on Rename
	dirFile *os.File
	metadata []byte           // metadata recorded at the head of each WAL
	state    raftpb.HardState // hardstate recorded at the head of WAL
	start     walpb.Snapshot // snapshot to start reading
	decoder   *decoder       // decoder to decode records
	readClose func() error   // closer for decode reader

	mu      sync.Mutex
	enti    uint64   // index of the last entry saved to the wal
	encoder *encoder // encoder to encode records

	locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
	fp    *filePipeline
}

大致有以下几种函数:

  • 文件读取类:Open,OpenAtIndex,OpenForRead,ReadAll,
  • 文件写入类:saveEntry,saveState,Save,SaveSnapshot,saveCrc
  • 其他文件操作类:Create,renameWal,cut,Close

Transport组件:消息传输者

Transport组件是负责消息通信的,目前的实现方式是http的实现。

type Transport struct {
	DialTimeout time.Duration // maximum duration before timing out dial of the request
	// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
	// a distinct rate limiter is created per every peer (default value: 10 events/sec)
	DialRetryFrequency rate.Limit

	TLSInfo transport.TLSInfo // TLS information used when creating connection

	ID          types.ID   // local member ID
	URLs        types.URLs // local peer URLs
	ClusterID   types.ID   // raft cluster ID for request validation
	Raft        Raft       // raft state machine, to which the Transport forwards received messages and reports status
	Snapshotter *raftsnap.Snapshotter
	ServerStats *stats.ServerStats // used to record general transportation statistics
	// used to record transportation statistics with followers when
	// performing as leader in raft protocol
	LeaderStats *stats.LeaderStats
	// ErrorC is used to report detected critical errors, e.g.,
	// the member has been permanently removed from the cluster
	// When an error is received from ErrorC, user should stop raft state
	// machine and thus stop the Transport.
	ErrorC chan error

	streamRt   http.RoundTripper // roundTripper used by streams
	pipelineRt http.RoundTripper // roundTripper used by pipelines

	mu      sync.RWMutex         // protect the remote and peer map
	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
	peers   map[types.ID]Peer    // peers map

	prober probing.Prober
}

大致包含以下几种函数:

  • 启动和停止:Start,Stop
  • 消息发送和处理:Handler,Get,Send,SendSnapshot
  • 集群链接的维护:CutPeer,MendPeer,AddRemote,AddPeer,RemovePeer,RemoveAllPeers,UpdatePeer,ActiveSince,ActivePeers