Hyperledger fabric 1.0 代码解析 之 channel join

2017-07-21

channeljoin.png 1.sdk或client发起join channel请求,然后开始构建一个Proposal包体。 主要过程是通过getJoinCCSpec得到ChaincodeInvocationSpec(主要是指定chaincode为cscc,将block文件打包到input中),然后通过CreateProposalFromCIS以及GetSignedProposal 继续打包,主要是得到一个transcation id(txid)加上各种包头等。注意这里打包最后得到的Proposal请求理的channel id是““。

2.通过EndorserClient发起请求,调用endorser(通常是某个或某些peer)的ProcessProposal函数,处理第一步得到的Proposal包体。ProcessProposal中会调用simulateProposal对chaincode进行模拟执行。模拟执行过程中是通过一套状态机,通过消息变化在对transcation进行处理的时候,调用beforeTranscation,然后最终达到调用cscc的joinChain函数的。ProcessProposal的详细过程见Hyperledger fabric 1.0 代码解析 之 ProcessProposal

3.cscc端首先获得签名的Proposal,并进行policy的验证。验证成功后调用joinchain函数。 core/scc/cscc/configure.go

func (e *PeerConfiger) Invoke(stub shim.ChaincodeStubInterface) pb.Response {

	/*忽略Args参数个数检查部分*/

	cnflogger.Debugf("Invoke function: %s", fname)
	// Handle ACL:
	// 1. get the signed proposal
	sp, err := stub.GetSignedProposal()
	if err != nil {
		return shim.Error(fmt.Sprintf("Failed getting signed proposal from stub: [%s]", err))
	}

	switch fname {
	case JoinChain:
		/*以下代码中的错误处理暂时都忽略了*/
		block, err := utils.GetBlockFromBlockBytes(args[1])
		cid, err := utils.GetChainIDFromBlock(block)
		if err := validateConfigBlock(block); err != nil {
			/**/
		}

		// 2. check local MSP Admins policy
		if err = e.policyChecker.CheckPolicyNoChannel(mgmt.Admins, sp); err != nil {
			/**/
		}

		return joinChain(cid, block)
		/*忽略掉其他case情况*/
	}
}

4.根据发起请求自带的block参数,创建相应的chain信息,包括chain对应的ledger,并将该block数据提交到该ledger中。 core/scc/cscc/configure.go

func joinChain(chainID string, block *common.Block) pb.Response {
	if err := peer.CreateChainFromBlock(block); err != nil {
		return shim.Error(err.Error())
	}

	peer.InitChain(chainID)

	if err := producer.SendProducerBlockEvent(block); err != nil {
		cnflogger.Errorf("Error sending block event %s", err)
	}

	return shim.Success(nil)
}
// CreateChainFromBlock creates a new chain from config block
func CreateChainFromBlock(cb *common.Block) error {
	cid, err := utils.GetChainIDFromBlock(cb)
	if err != nil {
		return err
	}

	var l ledger.PeerLedger
	if l, err = ledgermgmt.CreateLedger(cb); err != nil {
		return fmt.Errorf("Cannot create ledger from genesis block, due to %s", err)
	}

	return createChain(cid, l, cb)
}

5.peer将创建好的chain信息同步到相同组织其它的peer中。并与orderer通信,并建立与orderer之间在该channerl上的数据传输通道。

// createChain creates a new chain object and insert it into the chains
func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {

	envelopeConfig, err := utils.ExtractEnvelope(cb, 0)
	if err != nil {
		return err
	}

	configtxInitializer := configtx.NewInitializer()

	gossipEventer := service.GetGossipService().NewConfigEventer()

	gossipCallbackWrapper := func(cm configtxapi.Manager) {
		ac, ok := configtxInitializer.ApplicationConfig()
		if !ok {
			// TODO, handle a missing ApplicationConfig more gracefully
			ac = nil
		}
		gossipEventer.ProcessConfigUpdate(&chainSupport{
			Manager:     cm,
			Application: ac,
		})
		service.GetGossipService().SuspectPeers(func(identity api.PeerIdentityType) bool {
			// TODO: this is a place-holder that would somehow make the MSP layer suspect
			// that a given certificate is revoked, or its intermediate CA is revoked.
			// In the meantime, before we have such an ability, we return true in order
			// to suspect ALL identities in order to validate all of them.
			return true
		})
	}

	trustedRootsCallbackWrapper := func(cm configtxapi.Manager) {
		updateTrustedRoots(cm)
	}

	configtxManager, err := configtx.NewManagerImpl(
		envelopeConfig,
		configtxInitializer,
		[]func(cm configtxapi.Manager){gossipCallbackWrapper, trustedRootsCallbackWrapper},
	)
	if err != nil {
		return err
	}

	// TODO remove once all references to mspmgmt are gone from peer code
	mspmgmt.XXXSetMSPManager(cid, configtxManager.MSPManager())

	ac, ok := configtxInitializer.ApplicationConfig()
	if !ok {
		ac = nil
	}
	cs := &chainSupport{
		Manager:     configtxManager,
		Application: ac, // TODO, refactor as this is accessible through Manager
		ledger:      ledger,
	}

	c := committer.NewLedgerCommitterReactive(ledger, txvalidator.NewTxValidator(cs), func(block *common.Block) error {
		chainID, err := utils.GetChainIDFromBlock(block)
		if err != nil {
			return err
		}
		return SetCurrConfigBlock(block, chainID)
	})

	ordererAddresses := configtxManager.ChannelConfig().OrdererAddresses()
	if len(ordererAddresses) == 0 {
		return errors.New("No ordering service endpoint provided in configuration block")
	}
	service.GetGossipService().InitializeChannel(cs.ChainID(), c, ordererAddresses)

	chains.Lock()
	defer chains.Unlock()
	chains.list[cid] = &chain{
		cs:        cs,
		cb:        cb,
		committer: c,
	}

	
	var vi *pp.VariableInspector
	vi = pp.GetInstance()
	vi.Register("chains",chains)
	return nil
}

6.返回创建chain创建成果通知消息。该部分从代码层面会回到core/chaincode/shim/chaincode.go的handleTranscation中触发Transcation的nextState即COMPELETED。状态发生变化后会执行chatWithPeer以及processStream的相应函数。最终会调用notify函数。通知加入channel完成。 core/chaincode/handler.go

func (handler *Handler) HandleMessage(msg *pb.ChaincodeMessage) error {
	chaincodeLogger.Debugf("[%s]Fabric side Handling ChaincodeMessage of type: %s in state %s", shorttxid(msg.Txid), msg.Type, handler.FSM.Current())

	if (msg.Type == pb.ChaincodeMessage_COMPLETED || msg.Type == pb.ChaincodeMessage_ERROR) && handler.FSM.Current() == "ready" {
		chaincodeLogger.Debugf("[%s]HandleMessage- COMPLETED. Notify", msg.Txid)
		handler.notify(msg)
		return nil
	}
}