Hyperledger fabric 1.0 代码解析 之 channel create
Client端逻辑代码
“github.com/hyperledger/fabric/peer/channel/create.go”
1.client发起channel create –c channel_name请求,其中参数channel_name为channel的名称。
2.client端从ConfigTx文件中(createChannelFromConfigTx)或者按照默认配置(createChannelFromDefaults)生成包体Envelope,并对生成的包体Envelope进行校验与签名,通过broadcast接口向orderer发送创建channel消息即签过名的包体Envelope。 (sendCreateChainTransaction)
3.client端收到channel创建成功消息后, 通过deliver接口获取orderer中第0块block(getGenesisBlock),并将该block存入本地生成channelname.block文件,作为该channel的创始块(WriteFile)。
Orderer端逻辑代码
“github.com/hyperledger/fabric/orderer”
1.列表orderer端对于broadcast、deliver和join分别有3个对应的handler来处理(server.go).Handle为broadcast主处理函数(broadcast/broadcast.go),负责peer消息的处理,包括channel的创建以及其他类型的消息,其中channel通过类型HeaderType_CONFIG_UPDATE进行区分。
2.若channelHeaderType是HeaderType_CONFIG_UPDATE,则为create channel或者是update channel的操作。此时调用process函数来获得新建或者更新的包体Envelope.
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
logger.Debugf("Preprocessing CONFIG_UPDATE")
msg, err = bh.sm.Process(msg)
//...
}
3.Process(Configupdate/configupdate.go)以channel相关信息生成Envelope消息,并将该Envelope消息重新打包到以TYPE: HeaderType_CONFIG_UPDATE为类型的消息体中。函数中通过判断包体传递过来的chainID是否已存在来判断是create还是update操作。chainID不存在,则调用newChannelConfig函数生成新的channelconfig。(这里其实有很多内容待会再分析) Configupdate/configupdate.go
func (p *Processor) Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error) {
channelID, err := channelID(envConfigUpdate)
if err != nil {
return nil, err
}
support, ok := p.manager.GetChain(channelID)
if ok {
logger.Debugf("Processing channel reconfiguration request for channel %s", channelID)
return p.existingChannelConfig(envConfigUpdate, channelID, support)
}
logger.Debugf("Processing channel creation request for channel %s", channelID)
return p.newChannelConfig(channelID, envConfigUpdate)
}
4.通过GetChain获得System channel的ChainSupport。(现在这个chain还不存在)
support, ok := bh.sm.GetChain(chdr.ChannelId)
if !ok {
logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
}
5.support.Filters().Apply(msg),其中filter由一个名为systemChainFilter(mutichain/systemchain.go) 负责,其功能为生成一个新的chain(这个时候还暂时不会生成真正的chain,只是会得到一个相应的commiter)。
_, filterErr := support.Filters().Apply(msg)
if filterErr != nil {
logger.Warningf("[channel: %s] Rejecting broadcast message because of filter error: %s", chdr.ChannelId, filterErr)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
mutichain/systemchain.go
func (scf *systemChainFilter) Apply(env *cb.Envelope) (filter.Action, filter.Committer) {
msgData := &cb.Payload{}
//msgData 的各种检查
err := proto.Unmarshal(env.Payload, msgData)
if err != nil {
return filter.Forward, nil
}
//...
//当前是否可以创建channel的检查,(检查maxChannels)
maxChannels := scf.support.SharedConfig().MaxChannelsCount()
//...
//验证是否有权限操作,主要涉及签名的验证等
err = scf.authorizeAndInspect(configTx)
if err != nil {
logger.Debugf("Rejecting channel creation because: %s", err)
returnfilter.Reject, nil
}
return filter.Accept,&systemChainCommitter{
filter: scf,
configTx: configTx,
}
}
6.Enqueue,这个地方在通道里等待orderer的处理,orderer处理方式可以在相应的consensus.go文件中找到。这边orderer何时生成block的具体代码分析,参见Hyperledger fabric 代码解析 之 Orderer Service) 中的2.6部分
if !support.Enqueue(msg) {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}
consensus.go
func (ch *chain) main() {
var timer <-chan time.Time
for {
select {
case msg := <-ch.sendChan:
batches, committers, ok := ch.support.BlockCutter().Ordered(msg)
if ok && len(batches) == 0 && timer == nil {
timer = time.After(ch.batchTimeout)
continue
}
//将batches里的block挨个处理
for i, batch := range batches {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, committers[i], nil)
}
if len(batches) > 0 {
timer = nil
}
//超时处理
case <-timer:
//clear the timer
timer = nil
batch, committers := ch.support.BlockCutter().Cut()
if len(batch) == 0 {
logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
continue
}
logger.Debugf("Batch timer expired, creating block")
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, committers, nil)
case <-ch.exitChan:
logger.Debugf("Exiting")
return
}
}
}
7.WriteBlock,等待orderer这边满足写块的条件。这个地方第五步filter.commiter返回的systemChainCommitter的commit,这个地方才会去调用newChain,才是真正的创建channel。 muitichain/chainsupport.go
func (cs *chainSupport) WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
for _, committer := range committers {
committer.Commit()
}
// Set the orderer-related metadata field
if encodedMetadataValue != nil {
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
}
cs.addBlockSignature(block)
cs.addLastConfigSignature(block)
err := cs.ledger.Append(block)
if err != nil {
logger.Panicf("[channel: %s] Could not append block: %s", cs.ChainID(), err)
}
logger.Debugf("[channel: %s] Wrote block %d", cs.ChainID(), block.GetHeader().Number)
return block
}
multichain/systemchain.go
func (scc *systemChainCommitter) Commit() {
logger.Warningf("Commit channel creation in systemCommitter")
scc.filter.cc.newChain(scc.configTx)
}
multichain/manager.go
func (ml *multiLedger) newChain(configtx *cb.Envelope) {
ledgerResources := ml.newLedgerResources(configtx)
ledgerResources.ledger.Append(ledger.CreateNextBlock(ledgerResources.ledger, []*cb.Envelope{configtx}))
// Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is
newChains := make(map[string]*chainSupport)
for key, value := range ml.chains {
newChains[key] = value
}
cs := newChainSupport(createStandardFilters(ledgerResources), ledgerResources, ml.consenters, ml.signer)
chainID := ledgerResources.ChainID()
logger.Infof("Created and starting new chain %s", chainID)
newChains[string(chainID)] = cs
cs.start()
ml.chains = newChains
}
8.Send