Hyperledger fabric 1.0 代码解析 之 channel update
client端逻辑代码
“github.com/hyperledger/fabric/peer/channel/update.go”
1.client发起channel update –c channel_name -f tx_path请求,其中参数channel_name为channel的名称,tx_path为需要更新的tx文件的路径。目前channel的 update操作只支持从tx文件更新。
2.接收到update请求后,从ConfigTx文件中(ReadFile)生成包体Envelope,并对生成的包体Envelope进行校验与签名,通过broadcast接口向orderer发送update channel消息即签过名的包体Envelope。
orderer端逻辑代码
“github.com/hyperledger/fabric/orderer” 该部分有很多和channel create的内容是重复的,主要差别在第4步及之后的内容。
1.orderer端对于broadcast、deliver和join分别有3个对应的handler来处理(server.go).Handle为broadcast主处理函数(broadcast/broadcast.go),负责peer消息的处理,包括channel的创建以及其他类型的消息,其中channel通过类型HeaderType_CONFIG_UPDATE进行区分。
2.若channelHeaderType是HeaderType_CONFIG_UPDATE,则为create channel或者是update channel的操作。此时调用process函数来获得新建或者更新的包体Envelope.
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
logger.Debugf("Preprocessing CONFIG_UPDATE")
msg, err = bh.sm.Process(msg)
//...
}
3.Process(Configupdate/configupdate.go)以channel相关信息生成Envelope消息,并将该Envelope消息重新打包到以TYPE: HeaderType_CONFIG_UPDATE为类型的消息体中。函数中通过判断包体传递过来的chainID是否已存在来判断是create还是update操作。chainID存在,则调用existingChannelConfig函数。 Configupdate/configupdate.go
func (p *Processor) Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error) {
channelID, err := channelID(envConfigUpdate)
if err != nil {
return nil, err
}
support, ok := p.manager.GetChain(channelID)
if ok {
logger.Debugf("Processing channel reconfiguration request for channel %s", channelID)
return p.existingChannelConfig(envConfigUpdate, channelID, support)
}
logger.Debugf("Processing channel creation request for channel %s", channelID)
return p.newChannelConfig(channelID, envConfigUpdate)
}
4.通过GetChain获得该channel的ChainSupport。(在channel create的时候这个地方是system chain)
support, ok := bh.sm.GetChain(chdr.ChannelId)
if !ok {
logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
}
5.support.Filters().Apply(msg),该处的filter为该channel的filter。
_, filterErr := support.Filters().Apply(msg)
if filterErr != nil {
logger.Warningf("[channel: %s] Rejecting broadcast message because of filter error: %s", chdr.ChannelId, filterErr)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
以下通过代码看下普通channel的Filters与SystemChainFilters的区别。普通channel是没有SystemChainFilter的。
// createStandardFilters creates the set of filters for a normal (non-system) chain
func createStandardFilters(ledgerResources *ledgerResources) *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes),
sigfilter.New(policies.ChannelWriters, ledgerResources.PolicyManager()),
configtxfilter.NewFilter(ledgerResources),
filter.AcceptRule,
})
}
// createSystemChainFilters creates the set of filters for the ordering system chain
func createSystemChainFilters(ml *multiLedger, ledgerResources *ledgerResources) *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(ledgerResources.SharedConfig().BatchSize().AbsoluteMaxBytes),
sigfilter.New(policies.ChannelWriters, ledgerResources.PolicyManager()),
newSystemChainFilter(ledgerResources, ml),
configtxfilter.NewFilter(ledgerResources),
filter.AcceptRule,
})
}
6.Enqueue,这个地方在通道里等待orderer的处理,orderer处理方式可以在相应的consensus.go文件中找到。这边orderer何时生成block的具体代码分析,参见Hyperledger fabric 代码解析 之 Orderer Service中的2.6部分
if !support.Enqueue(msg) {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}
consensus.go
func (ch *chain) main() {
var timer <-chan time.Time
for {
select {
case msg := <-ch.sendChan:
batches, committers, ok := ch.support.BlockCutter().Ordered(msg)
if ok && len(batches) == 0 && timer == nil {
timer = time.After(ch.batchTimeout)
continue
}
//将batches里的block挨个处理
for i, batch := range batches {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, committers[i], nil)
}
if len(batches) > 0 {
timer = nil
}
//超时处理
case <-timer:
//clear the timer
timer = nil
batch, committers := ch.support.BlockCutter().Cut()
if len(batch) == 0 {
logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
continue
}
logger.Debugf("Batch timer expired, creating block")
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, committers, nil)
case <-ch.exitChan:
logger.Debugf("Exiting")
return
}
}
}
7.WriteBlock,等待orderer这边满足写块的条件。这个地方会调用第五步filter.commiter返回的Committer的commit。 muitichain/chainsupport.go
func (cs *chainSupport) WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
//之前filter返回的committer到这一步才真正的commit
for _, committer := range committers {
committer.Commit()
}
// Set the orderer-related metadata field
if encodedMetadataValue != nil {
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
}
cs.addBlockSignature(block)
//configGroup的那些
cs.addLastConfigSignature(block)
err := cs.ledger.Append(block)
if err != nil {
logger.Panicf("[channel: %s] Could not append block: %s", cs.ChainID(), err)
}
logger.Debugf("[channel: %s] Wrote block %d", cs.ChainID(), block.GetHeader().Number)
return block
}
func (cc *configCommitter) Commit() {
err := cc.manager.Apply(cc.configEnvelope)
if err != nil {
panic(fmt.Errorf("Could not apply config transaction which should have already been validated: %s", err))
}
}
// Apply attempts to apply a ConfigEnvelope to become the new config
func (cm *configManager) Apply(configEnv *cb.ConfigEnvelope) error {
//prepareApply进行一堆的验证
configMap, result, err := cm.prepareApply(configEnv)
if err != nil {
return err
}
result.commit()
cm.current = &configSet{
configMap: configMap,
channelID: cm.current.channelID,
sequence: configEnv.Config.Sequence,
configEnv: configEnv,
}
cm.commitCallbacks()
return nil
}
8.Send