Hyperledger fabric 1.0 代码解析 之 verify and commit block
2017-08-15
peer节点对区块进行验证
1.peer从orderer那里得到block之后会首先通过VerifyBlock进行验证。该部分主要是从block的层面进行的验证,并不会对其中的交易进行验证。
// VerifyBlock returns nil if the block is properly signed, and the claimed seqNum is the
// sequence number that the block's header contains.
// else returns error
func (s *mspMessageCryptoService) VerifyBlock(chainID common.ChainID, seqNum uint64, signedBlock []byte) error {
// - Convert signedBlock to common.Block.
//基础验证
block, err := utils.GetBlockFromBlockBytes(signedBlock)
if err != nil {
return fmt.Errorf("Failed unmarshalling block bytes on channel [%s]: [%s]", chainID, err)
}
//header不为空
if block.Header == nil {
return fmt.Errorf("Invalid Block on channel [%s]. Header must be different from nil.", chainID)
}
//blockNum是否鱼seqNum一致
blockSeqNum := block.Header.Number
if seqNum != blockSeqNum {
return fmt.Errorf("Claimed seqNum is [%d] but actual seqNum inside block is [%d]", seqNum, blockSeqNum)
}
//chainId是否正确
// - Extract channelID and compare with chainID
channelID, err := utils.GetChainIDFromBlock(block)
if err != nil {
return fmt.Errorf("Failed getting channel id from block with id [%d] on channel [%s]: [%s]", block.Header.Number, chainID, err)
}
if channelID != string(chainID) {
return fmt.Errorf("Invalid block's channel id. Expected [%s]. Given [%s]", chainID, channelID)
}
// - Unmarshal medatada
if block.Metadata == nil || len(block.Metadata.Metadata) == 0 {
return fmt.Errorf("Block with id [%d] on channel [%s] does not have metadata. Block not valid.", block.Header.Number, chainID)
}
//取出metadata数据
metadata, err := utils.GetMetadataFromBlock(block, pcommon.BlockMetadataIndex_SIGNATURES)
if err != nil {
return fmt.Errorf("Failed unmarshalling medatata for signatures [%s]", err)
}
//Data.Hash是否与Header中纪录的DataHash一致
// - Verify that Header.DataHash is equal to the hash of block.Data
// This is to ensure that the header is consistent with the data carried by this block
if !bytes.Equal(block.Data.Hash(), block.Header.DataHash) {
return fmt.Errorf("Header.DataHash is different from Hash(block.Data) for block with id [%d] on channel [%s]", block.Header.Number, chainID)
}
// - Get Policy for block validation
// Get the policy manager for channelID
// 验证channel的policy是否满足
cpm, ok := s.channelPolicyManagerGetter.Manager(channelID)
if cpm == nil {
return fmt.Errorf("Could not acquire policy manager for channel %s", channelID)
}
// ok is true if it was the manager requested, or false if it is the default manager
mcsLogger.Debugf("Got policy manager for channel [%s] with flag [%s]", channelID, ok)
// Get block validation policy
policy, ok := cpm.GetPolicy(policies.BlockValidation)
// ok is true if it was the policy requested, or false if it is the default policy
mcsLogger.Debugf("Got block validation policy for channel [%s] with flag [%s]", channelID, ok)
// - Prepare SignedData
signatureSet := []*pcommon.SignedData{}
for _, metadataSignature := range metadata.Signatures {
shdr, err := utils.GetSignatureHeader(metadataSignature.SignatureHeader)
if err != nil {
return fmt.Errorf("Failed unmarshalling signature header for block with id [%d] on channel [%s]: [%s]", block.Header.Number, chainID, err)
}
signatureSet = append(
signatureSet,
&pcommon.SignedData{
Identity: shdr.Creator,
Data: util.ConcatenateBytes(metadata.Value, metadataSignature.SignatureHeader, block.Header.Bytes()),
Signature: metadataSignature.Signature,
},
)
}
// - Evaluate policy
return policy.Evaluate(signatureSet)
}
peer端进行区块提交
经过第一步验证的区块,会进入到提交区块的阶段。 1.commitBlock主要调用committer的Commit方法提交区块。
func (s *GossipStateProviderImpl) commitBlock(block *common.Block) error {
if err := s.committer.Commit(block); err != nil {
logger.Errorf("Got error while committing(%s)", err)
return err
}
// Update ledger level within node metadata
nodeMetastate := NewNodeMetastate(block.Header.Number)
// Decode nodeMetastate to byte array
b, err := nodeMetastate.Bytes()
if err == nil {
s.gossip.UpdateChannelMetadata(b, common2.ChainID(s.chainID))
} else {
logger.Errorf("Unable to serialize node meta nodeMetastate, error = %s", err)
}
logger.Warningf("Channel [%s]: Created block [%d] with %d transaction(s)",
s.chainID, block.Header.Number, len(block.Data.Data))
return nil
/*
s.asynCommitBlock(block,seqNum)
return nil
*/
}
2.commiter主要是调用Validate对区块进行进一步验证,主要是调用lscc验证。如果是configBlock,需要更新CSCC。然后通过调用ledger的Commit进行真正的写块和写数据库的操作。最后调用SendProducerBlockEvent方法,向客户端以及SDK等发送新区块的事件。
// Commit commits block to into the ledger
// Note, it is important that this always be called serially
func (lc *LedgerCommitter) Commit(block *common.Block) error {
// Validate and mark invalid transactions
logger.Debug("Validating block")
if err := lc.validator.Validate(block); err != nil {
return err
}
// Updating CSCC with new configuration block
if utils.IsConfigBlock(block) {
logger.Debug("Received configuration update, calling CSCC ConfigUpdate")
if err := lc.eventer(block); err != nil {
return fmt.Errorf("Could not update CSCC with new configuration update due to %s", err)
}
}
if err := lc.ledger.Commit(block); err != nil {
return err
}
// send block event *after* the block has been committed
if err := producer.SendProducerBlockEvent(block); err != nil {
logger.Errorf("Error publishing block %d, because: %v", block.Header.Number, err)
}
go write2Log(block)
return nil
}
2.1 Validate过程,主要是调用ValidateTransaction进行基本的格式检查。格式检查正确后调用VSCCValidateTx进行背书验证,检查交易是否符合chaincode初始化是要求的背书条件。
func (v *txValidator) Validate(block *common.Block) error {
logger.Debug("START Block Validation")
defer logger.Debug("END Block Validation")
// Initialize trans as valid here, then set invalidation reason code upon invalidation below
txsfltr := ledgerUtil.NewTxValidationFlags(len(block.Data.Data))
// txsChaincodeNames records all the invoked chaincodes by tx in a block
txsChaincodeNames := make(map[int]*sysccprovider.ChaincodeInstance)
// upgradedChaincodes records all the chaincodes that are upgrded in a block
txsUpgradedChaincodes := make(map[int]*sysccprovider.ChaincodeInstance)
for tIdx, d := range block.Data.Data {
if d != nil {
if env, err := utils.GetEnvelopeFromBlock(d); err != nil {
logger.Warningf("Error getting tx from block(%s)", err)
txsfltr.SetFlag(tIdx, peer.TxValidationCode_INVALID_OTHER_REASON)
} else if env != nil {
// validate the transaction: here we check that the transaction
// is properly formed, properly signed and that the security
// chain binding proposal to endorsements to tx holds. We do
// NOT check the validity of endorsements, though. That's a
// job for VSCC below
logger.Debug("Validating transaction peer.ValidateTransaction()")
var payload *common.Payload
var err error
var txResult peer.TxValidationCode
//对交易进行一些格式上的检查,比方说header是否正确,creator签名是否正确,交易类型是否正确,交易的参数等
if payload, txResult = validation.ValidateTransaction(env); txResult != peer.TxValidationCode_VALID {
logger.Errorf("Invalid transaction with index %d", tIdx)
txsfltr.SetFlag(tIdx, txResult)
continue
}
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Warningf("Could not unmarshal channel header, err %s, skipping", err)
txsfltr.SetFlag(tIdx, peer.TxValidationCode_INVALID_OTHER_REASON)
continue
}
channel := chdr.ChannelId
logger.Debugf("Transaction is for chain %s", channel)
if !v.chainExists(channel) {
logger.Errorf("Dropping transaction for non-existent chain %s", channel)
txsfltr.SetFlag(tIdx, peer.TxValidationCode_TARGET_CHAIN_NOT_FOUND)
continue
}
if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
// Check duplicate transactions
txID := chdr.TxId
if _, err := v.support.Ledger().GetTransactionByID(txID); err == nil {
logger.Error("Duplicate transaction found, ", txID, ", skipping")
txsfltr.SetFlag(tIdx, peer.TxValidationCode_DUPLICATE_TXID)
continue
}
// Validate tx with vscc and policy
logger.Debug("Validating transaction vscc tx validate")
err, cde := v.vscc.VSCCValidateTx(payload, d, env)
if err != nil {
txID := txID
logger.Errorf("VSCCValidateTx for transaction txId = %s returned error %s", txID, err)
txsfltr.SetFlag(tIdx, cde)
continue
}
invokeCC, upgradeCC, err := v.getTxCCInstance(payload)
if err != nil {
logger.Errorf("Get chaincode instance from transaction txId = %s returned error %s", txID, err)
txsfltr.SetFlag(tIdx, peer.TxValidationCode_INVALID_OTHER_REASON)
continue
}
txsChaincodeNames[tIdx] = invokeCC
if upgradeCC != nil {
logger.Infof("Find chaincode upgrade transaction for chaincode %s on chain %s with new version %s", upgradeCC.ChaincodeName, upgradeCC.ChainID, upgradeCC.ChaincodeVersion)
txsUpgradedChaincodes[tIdx] = upgradeCC
}
} else if common.HeaderType(chdr.Type) == common.HeaderType_CONFIG {
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
err := fmt.Errorf("Error unmarshaling config which passed initial validity checks: %s", err)
logger.Critical(err)
return err
}
if err := v.support.Apply(configEnvelope); err != nil {
err := fmt.Errorf("Error validating config which passed initial validity checks: %s", err)
logger.Critical(err)
return err
}
logger.Debugf("config transaction received for chain %s", channel)
} else {
logger.Warningf("Unknown transaction type [%s] in block number [%d] transaction index [%d]",
common.HeaderType(chdr.Type), block.Header.Number, tIdx)
txsfltr.SetFlag(tIdx, peer.TxValidationCode_UNKNOWN_TX_TYPE)
continue
}
if _, err := proto.Marshal(env); err != nil {
logger.Warningf("Cannot marshal transaction due to %s", err)
txsfltr.SetFlag(tIdx, peer.TxValidationCode_MARSHAL_TX_ERROR)
continue
}
// Succeeded to pass down here, transaction is valid
txsfltr.SetFlag(tIdx, peer.TxValidationCode_VALID)
} else {
logger.Warning("Nil tx from block")
txsfltr.SetFlag(tIdx, peer.TxValidationCode_NIL_ENVELOPE)
}
}
}
// Initialize metadata structure
utils.InitBlockMetadata(block)
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsfltr
return nil
}
2.2 提交区块,包括写入区块数据,写入statedb,写入hsitorydb。
// Commit commits the valid block (returned in the method RemoveInvalidTransactionsAndPrepare) and related state changes
func (l *kvLedger) Commit(block *common.Block) error {
var err error
blockNo := block.Header.Number
//判断是否有读写集冲突,准备好需要写statedb的内容
logger.Debugf("Channel [%s]: Validating block [%d]", l.ledgerID, blockNo)
err = l.txtmgmt.ValidateAndPrepare(block, true)
if err != nil {
return err
}
//写入到区块中
logger.Debugf("Channel [%s]: Committing block [%d] to storage", l.ledgerID, blockNo)
if err = l.blockStore.AddBlock(block); err != nil {
return err
}
logger.Infof("Channel [%s]: Created block [%d] with %d transaction(s)", l.ledgerID, block.Header.Number, len(block.Data.Data))
logger.Debugf("Channel [%s]: Committing block [%d] transactions to state database", l.ledgerID, blockNo)
// statedb更新
if err = l.txtmgmt.Commit(); err != nil {
panic(fmt.Errorf(`Error during commit to txmgr:%s`, err))
}
// History database could be written in parallel with state and/or async as a future optimization
// 更新历史
if ledgerconfig.IsHistoryDBEnabled() {
logger.Debugf("Channel [%s]: Committing block [%d] transactions to history database", l.ledgerID, blockNo)
if err := l.historyDB.Commit(block); err != nil {
panic(fmt.Errorf(`Error during commit to history db:%s`, err))
}
}
return nil
}
2.2.1验证区块中的每条交易,并确定有哪些是要写入更新的。
// ValidateAndPrepare implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidation bool) error {
logger.Debugf("Validating new block with num trans = [%d]", len(block.Data.Data))
batch, err := txmgr.validator.ValidateAndPrepareBatch(block, doMVCCValidation)
if err != nil {
return err
}
txmgr.currentBlock = block
txmgr.batch = batch
return err
}
验证每条交易的读写集,确定是否有读写集冲突。如果有冲突则该条交易会被标记invalid。
// ValidateAndPrepareBatch implements method in Validator interface
func (v *Validator) ValidateAndPrepareBatch(block *common.Block, doMVCCValidation bool) (*statedb.UpdateBatch, error) {
logger.Debugf("New block arrived for validation:%#v, doMVCCValidation=%t", block, doMVCCValidation)
updates := statedb.NewUpdateBatch()
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
// Committer validator has already set validation flags based on well formed tran checks
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
// Precaution in case committer validator has not added validation flags yet
if len(txsFilter) == 0 {
txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
for txIndex, envBytes := range block.Data.Data {
if txsFilter.IsInvalid(txIndex) {
// Skiping invalid transaction
logger.Warningf("Block [%d] Transaction index [%d] marked as invalid by committer. Reason code [%d]",
block.Header.Number, txIndex, txsFilter.Flag(txIndex))
continue
}
env, err := putils.GetEnvelopeFromBlock(envBytes)
if err != nil {
return nil, err
}
payload, err := putils.GetPayload(env)
if err != nil {
return nil, err
}
chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, err
}
txType := common.HeaderType(chdr.Type)
if txType != common.HeaderType_ENDORSER_TRANSACTION {
logger.Debugf("Skipping mvcc validation for Block [%d] Transaction index [%d] because, the transaction type is [%s]",
block.Header.Number, txIndex, txType)
continue
}
//验证区块中的交易
txRWSet, txResult, err := v.validateEndorserTX(envBytes, doMVCCValidation, updates)
if err != nil {
return nil, err
}
txsFilter.SetFlag(txIndex, txResult)
//txRWSet != nil => t is valid
if txRWSet != nil {
//验证通过的交易,将需要更新的东西都加入到Batch中,这个之后会批量一起写入数据库。
committingTxHeight := version.NewHeight(block.Header.Number, uint64(txIndex))
addWriteSetToBatch(txRWSet, committingTxHeight, updates)
txsFilter.SetFlag(txIndex, peer.TxValidationCode_VALID)
}
if txsFilter.IsValid(txIndex) {
logger.Debugf("Block [%d] Transaction index [%d] TxId [%s] marked as valid by state validator",
block.Header.Number, txIndex, chdr.TxId)
} else {
logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%d]",
block.Header.Number, txIndex, chdr.TxId, txsFilter.Flag(txIndex))
}
}
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
return updates, nil
}
读集验证的具体过程,大致就是去state db中查看读集中的字段,当前的Version(实际上就是上次写入时的blockNum和TranNum组合起来的),如果不一致则冲突,标记读写冲突的失败标记。
//validate endorser transaction
func (v *Validator) validateEndorserTX(envBytes []byte, doMVCCValidation bool, updates *statedb.UpdateBatch) (*rwsetutil.TxRwSet, peer.TxValidationCode, error) {
// extract actions from the envelope message
//从envlope中获取payload
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
return nil, peer.TxValidationCode_NIL_TXACTION, nil
}
//preparation for extracting RWSet from transaction
txRWSet := &rwsetutil.TxRwSet{}
// Get the Result from the Action
// and then Unmarshal it into a TxReadWriteSet using custom unmarshalling
//拿到读写集
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return nil, peer.TxValidationCode_INVALID_OTHER_REASON, nil
}
txResult := peer.TxValidationCode_VALID
//mvccvalidation, may invalidate transaction
if doMVCCValidation {
//验证读集
if txResult, err = v.validateTx(txRWSet, updates); err != nil {
return nil, txResult, err
} else if txResult != peer.TxValidationCode_VALID {
txRWSet = nil
}
}
return txRWSet, txResult, err
}
func (v *Validator) validateTx(txRWSet *rwsetutil.TxRwSet, updates *statedb.UpdateBatch) (peer.TxValidationCode, error) {
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
//验证读集,看读集是否冲突
if valid, err := v.validateReadSet(ns, nsRWSet.KvRwSet.Reads, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_MVCC_READ_CONFLICT, nil
}
if valid, err := v.validateRangeQueries(ns, nsRWSet.KvRwSet.RangeQueriesInfo, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_PHANTOM_READ_CONFLICT, nil
}
}
return peer.TxValidationCode_VALID, nil
}
func (v *Validator) validateReadSet(ns string, kvReads []*kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
for _, kvRead := range kvReads {
if valid, err := v.validateKVRead(ns, kvRead, updates); !valid || err != nil {
return valid, err
}
}
return true, nil
}
// validateKVRead performs mvcc check for a key read during transaction simulation.
// i.e., it checks whether a key/version combination is already updated in the statedb (by an already committed block)
// or in the updates (by a preceding valid transaction in the current block)
func (v *Validator) validateKVRead(ns string, kvRead *kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
if updates.Exists(ns, kvRead.Key) {
return false, nil
}
versionedValue, err := v.db.GetState(ns, kvRead.Key)
if err != nil {
return false, nil
}
var committedVersion *version.Height
if versionedValue != nil {
committedVersion = versionedValue.Version
}
if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvRead.Version)) {
logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%s], Version in readSet [%s]",
ns, kvRead.Key, committedVersion, kvRead.Version)
return false, nil
}
return true, nil
}
func (v *Validator) validateRangeQueries(ns string, rangeQueriesInfo []*kvrwset.RangeQueryInfo, updates *statedb.UpdateBatch) (bool, error) {
for _, rqi := range rangeQueriesInfo {
if valid, err := v.validateRangeQuery(ns, rqi, updates); !valid || err != nil {
return valid, err
}
}
return true, nil
}
// validateRangeQuery performs a phatom read check i.e., it
// checks whether the results of the range query are still the same when executed on the
// statedb (latest state as of last committed block) + updates (prepared by the writes of preceding valid transactions
// in the current block and yet to be committed as part of group commit at the end of the validation of the block)
func (v *Validator) validateRangeQuery(ns string, rangeQueryInfo *kvrwset.RangeQueryInfo, updates *statedb.UpdateBatch) (bool, error) {
logger.Debugf("validateRangeQuery: ns=%s, rangeQueryInfo=%s", ns, rangeQueryInfo)
// If during simulation, the caller had not exhausted the iterator so
// rangeQueryInfo.EndKey is not actual endKey given by the caller in the range query
// but rather it is the last key seen by the caller and hence the combinedItr should include the endKey in the results.
includeEndKey := !rangeQueryInfo.ItrExhausted
combinedItr, err := newCombinedIterator(v.db, updates,
ns, rangeQueryInfo.StartKey, rangeQueryInfo.EndKey, includeEndKey)
if err != nil {
return false, err
}
defer combinedItr.Close()
var validator rangeQueryValidator
if rangeQueryInfo.GetReadsMerkleHashes() != nil {
logger.Debug(`Hashing results are present in the range query info hence, initiating hashing based validation`)
validator = &rangeQueryHashValidator{}
} else {
logger.Debug(`Hashing results are not present in the range query info hence, initiating raw KVReads based validation`)
validator = &rangeQueryResultsValidator{}
}
validator.init(rangeQueryInfo, combinedItr)
return validator.validate()
}
2.2.2 调用AddBlock将区块写入到区块文件
// AddBlock adds a new block
func (store *fsBlockStore) AddBlock(block *common.Block) error {
return store.fileMgr.addBlock(block)
}
func (mgr *blockfileMgr) addBlock(block *common.Block) error {
if block.Header.Number != mgr.getBlockchainInfo().Height {
return fmt.Errorf("Block number should have been %d but was %d", mgr.getBlockchainInfo().Height, block.Header.Number)
}
blockBytes, info, err := serializeBlock(block)
if err != nil {
return fmt.Errorf("Error while serializing block: %s", err)
}
blockHash := block.Header.Hash()
//Get the location / offset where each transaction starts in the block and where the block ends
txOffsets := info.txOffsets
currentOffset := mgr.cpInfo.latestFileChunksize
if err != nil {
return fmt.Errorf("Error while serializing block: %s", err)
}
blockBytesLen := len(blockBytes)
blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)
//Determine if we need to start a new file since the size of this block
//exceeds the amount of space left in the current file
if currentOffset+totalBytesToAppend > mgr.conf.maxBlockfileSize {
mgr.moveToNextFile()
currentOffset = 0
}
//append blockBytesEncodedLen to the file
err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
if err == nil {
//append the actual block bytes to the file
err = mgr.currentFileWriter.append(blockBytes, true)
}
if err != nil {
truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if truncateErr != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error during block append: %s", err))
}
return fmt.Errorf("Error while appending block to file: %s", err)
}
//Update the checkpoint info with the results of adding the new block
currentCPInfo := mgr.cpInfo
newCPInfo := &checkpointInfo{
latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
latestFileChunksize: currentCPInfo.latestFileChunksize + totalBytesToAppend,
isChainEmpty: false,
lastBlockNumber: block.Header.Number}
//save the checkpoint information in the database
if err = mgr.saveCurrentInfo(newCPInfo, false); err != nil {
truncateErr := mgr.currentFileWriter.truncateFile(currentCPInfo.latestFileChunksize)
if truncateErr != nil {
panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s", err))
}
return fmt.Errorf("Error while saving current file info to db: %s", err)
}
//Index block file location pointer updated with file suffex and offset for the new block
blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
blockFLP.offset = currentOffset
// shift the txoffset because we prepend length of bytes before block bytes
for _, txOffset := range txOffsets {
txOffset.loc.offset += len(blockBytesEncodedLen)
}
//save the index in the database
mgr.index.indexBlock(&blockIdxInfo{
blockNum: block.Header.Number, blockHash: blockHash,
flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata})
//update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
mgr.updateCheckpoint(newCPInfo)
mgr.updateBlockchainInfo(blockHash, block)
return nil
}
2.2.3 调用txtmgmt的Commit更新statedb
// Commit implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) Commit() error {
logger.Debugf("Committing updates to state database")
txmgr.commitRWLock.Lock()
defer txmgr.commitRWLock.Unlock()
logger.Debugf("Write lock acquired for committing updates to state database")
if txmgr.batch == nil {
panic("validateAndPrepare() method should have been called before calling commit()")
}
defer func() { txmgr.batch = nil }()
if err := txmgr.db.ApplyUpdates(txmgr.batch,
version.NewHeight(txmgr.currentBlock.Header.Number, uint64(len(txmgr.currentBlock.Data.Data)-1))); err != nil {
return err
}
logger.Debugf("Updates committed to state database")
return nil
}
2.2.4 更新historydb
// Commit implements method in HistoryDB interface
func (historyDB *historyDB) Commit(block *common.Block) error {
blockNo := block.Header.Number
//Set the starting tranNo to 0
var tranNo uint64
dbBatch := leveldbhelper.NewUpdateBatch()
logger.Debugf("Channel [%s]: Updating history database for blockNo [%v] with [%d] transactions",
historyDB.dbName, blockNo, len(block.Data.Data))
// Get the invalidation byte array for the block
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
// Initialize txsFilter if it does not yet exist (e.g. during testing, for genesis block, etc)
if len(txsFilter) == 0 {
txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
// write each tran's write set to history db
for _, envBytes := range block.Data.Data {
// If the tran is marked as invalid, skip it
if txsFilter.IsInvalid(int(tranNo)) {
logger.Debugf("Channel [%s]: Skipping history write for invalid transaction number %d",
historyDB.dbName, tranNo)
tranNo++
continue
}
env, err := putils.GetEnvelopeFromBlock(envBytes)
if err != nil {
return err
}
payload, err := putils.GetPayload(env)
if err != nil {
return err
}
chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return err
}
if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
// extract actions from the envelope message
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
return err
}
//preparation for extracting RWSet from transaction
txRWSet := &rwsetutil.TxRwSet{}
// Get the Result from the Action and then Unmarshal
// it into a TxReadWriteSet using custom unmarshalling
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return err
}
// for each transaction, loop through the namespaces and writesets
// and add a history record for each write
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
for _, kvWrite := range nsRWSet.KvRwSet.Writes {
writeKey := kvWrite.Key
//composite key for history records is in the form ns~key~blockNo~tranNo
compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)
// No value is required, write an empty byte array (emptyValue) since Put() of nil is not allowed
dbBatch.Put(compositeHistoryKey, emptyValue)
}
}
} else {
logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
}
tranNo++
}
// add savepoint for recovery purpose
height := version.NewHeight(blockNo, tranNo)
dbBatch.Put(savePointKey, height.ToBytes())
// write the block's history records and savepoint to LevelDB
if err := historyDB.db.WriteBatch(dbBatch, false); err != nil {
return err
}
logger.Debugf("Channel [%s]: Updates committed to history database for blockNo [%v]", historyDB.dbName, blockNo)
return nil
}
Hyperledger fabric 1.0 代码解析 之 chaincode invoke or query
Hyperledger fabric 1.0 代码解析 之 system chaincode