Hyperledger fabric 1.0 代码解析 之 chaincode launch
2017-08-02
Launch chaincode整体流程
智能合约(chaincode)的执行一般是在docker中进行的,所以在执行之前,需要先完成将chaincode打包到docker容器中的过程。整个过程在core/chaincode/chaincode_support.go 的Launch函数中完成。
- 已经Launch的情况。对于正在运行(running)的则直接返回。
- 得到cds(ChaincodeDeploymentSpec),对于deploy情况在函数开始的地方便可以获得,对于非deploy情况下(cds == nil)需要先从lscc中得到chaincode data,然后从返回中得到cds。
- 对于系统container以及非dev模式需要lanuch容器。其它情况直接执行步骤6.(peer节点配置文件中的chaincode.mode,有两种模式分别为”dev”和”net”,”dev”chaincode是在本地执行,”net”模式chaincode是在docker中执行。”dev”的时候userRunsCC为true,即本地执行。执行环境exec分为有system和docker两种)
- 若codePackage为空,对于!userRunsCC且执行环境不是ChaincodeDeploymentSpec_SYSTEM,由于是需要部署在docker上的,必须要有chaincode信息,则尝试从文件系统中读取,若失败则直接返回。
- 创建builder(这里builder主要是读取cds的reader,之后在必要时用来创建镜像),然后交由launchAndWaitForRegister处理。该过程中会检查是否已经有镜像,没有则创建它,并启动相应的container。
- chaincodeSupport调用sendReady,保证container进入状态机的ready态。这部分是状态机的状态变化和事件。详见core/chaincode/handler.go
// Launch will launch the chaincode if not running (if running return nil) and will wait for handler of the chaincode to get into FSM ready state.
func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid *ccprovider.CCContext, spec interface{}) (*pb.ChaincodeID, *pb.ChaincodeInput, error) {
//build the chaincode
/*得到chaincodeID和chaincodeInput以及cds...*/
var cID *pb.ChaincodeID
var cMsg *pb.ChaincodeInput
var cds *pb.ChaincodeDeploymentSpec
var ci *pb.ChaincodeInvocationSpec
if cds, _ = spec.(*pb.ChaincodeDeploymentSpec); cds == nil {
if ci, _ = spec.(*pb.ChaincodeInvocationSpec); ci == nil {
panic("Launch should be called with deployment or invocation spec")
}
}
if cds != nil {
cID = cds.ChaincodeSpec.ChaincodeId
cMsg = cds.ChaincodeSpec.Input
} else {
cID = ci.ChaincodeSpec.ChaincodeId
cMsg = ci.ChaincodeSpec.Input
}
canName := cccid.GetCanonicalName()
/*canName已经 launch的情况*/
chaincodeSupport.runningChaincodes.Lock()
var chrte *chaincodeRTEnv
var ok bool
var err error
//if its in the map, there must be a connected stream...nothing to do
if chrte, ok = chaincodeSupport.chaincodeHasBeenLaunched(canName); ok {
if !chrte.handler.registered {
chaincodeSupport.runningChaincodes.Unlock()
chaincodeLogger.Debugf("premature execution - chaincode (%s) is being launched", canName)
err = fmt.Errorf("premature execution - chaincode (%s) is being launched", canName)
return cID, cMsg, err
}
if chrte.handler.isRunning() {
if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
chaincodeLogger.Debugf("chaincode is running(no need to launch) : %s", canName)
}
chaincodeSupport.runningChaincodes.Unlock()
return cID, cMsg, nil
}
chaincodeLogger.Debugf("Container not in READY state(%s)...send init/ready", chrte.handler.FSM.Current())
}
chaincodeSupport.runningChaincodes.Unlock()
if cds == nil {//非deploy的情况
if cccid.Syscc {
return cID, cMsg, fmt.Errorf("a syscc should be running (it cannot be launched) %s", canName)
}
if chaincodeSupport.userRunsCC {
chaincodeLogger.Error("You are attempting to perform an action other than Deploy on Chaincode that is not ready and you are in developer mode. Did you forget to Deploy your chaincode?")
}
var depPayload []byte
//从lscc中得到chaincode data
depPayload, err = GetCDSFromLSCC(context, cccid.TxID, cccid.SignedProposal, cccid.Proposal, cccid.ChainID, cID.Name)
if err != nil {
return cID, cMsg, fmt.Errorf("Could not get deployment transaction from LSCC for %s - %s", canName, err)
}
if depPayload == nil {
return cID, cMsg, fmt.Errorf("failed to get deployment payload %s - %s", canName, err)
}
//从返回中得到ChaincodeDeploymentSpec
cds = &pb.ChaincodeDeploymentSpec{}
//Get lang from original deployment
err = proto.Unmarshal(depPayload, cds)
if err != nil {
return cID, cMsg, fmt.Errorf("failed to unmarshal deployment transactions for %s - %s", canName, err)
}
}
//launch container if it is a System container or not in dev mode
if (!chaincodeSupport.userRunsCC || cds.ExecEnv == pb.ChaincodeDeploymentSpec_SYSTEM) && (chrte == nil || chrte.handler == nil) {
if cds.CodePackage == nil {
//no code bytes for these situations
if !(chaincodeSupport.userRunsCC || cds.ExecEnv == pb.ChaincodeDeploymentSpec_SYSTEM) {
//从文件系统中得到chaincode
ccpack, err := ccprovider.GetChaincodeFromFS(cID.Name, cID.Version)
if err != nil {
return cID, cMsg, err
}
cds = ccpack.GetDepSpec()
chaincodeLogger.Debugf("launchAndWaitForRegister fetched %d bytes from file system", len(cds.CodePackage))
}
}
builder := func() (io.Reader, error) { return platforms.GenerateDockerBuild(cds) }
cLang := cds.ChaincodeSpec.Type
err = chaincodeSupport.launchAndWaitForRegister(context, cccid, cds, cLang, builder)
if err != nil {
chaincodeLogger.Errorf("launchAndWaitForRegister failed %s", err)
return cID, cMsg, err
}
}
if err == nil {
//launch will set the chaincode in Ready state
err = chaincodeSupport.sendReady(context, cccid, chaincodeSupport.ccStartupTimeout)
if err != nil {
chaincodeLogger.Errorf("sending init failed(%s)", err)
err = fmt.Errorf("Failed to init chaincode(%s)", err)
errIgnore := chaincodeSupport.Stop(context, cccid, cds)
if errIgnore != nil {
chaincodeLogger.Errorf("stop failed %s(%s)", errIgnore, err)
}
}
chaincodeLogger.Debug("sending init completed")
}
chaincodeLogger.Debug("LaunchChaincode complete")
return cID, cMsg, err
}
launchAndWaitForRegister流程
- 调用getArgsAndEnv获取参数和环境,构建startImageReq(启动container的请求)。
- 调用VMCProcess来处理startImageReq。这里是执行实际的start container的一些操作。
- 状态机进入了register过程,等待register的完成。
// launchAndWaitForRegister will launch container if not already running. Use the targz to create the image if not found
func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.Context, cccid *ccprovider.CCContext, cds *pb.ChaincodeDeploymentSpec, cLang pb.ChaincodeSpec_Type, builder api.BuildSpecFactory) error {
canName := cccid.GetCanonicalName()
if canName == "" {
return fmt.Errorf("chaincode name not set")
}
chaincodeSupport.runningChaincodes.Lock()
//if its in the map, its either up or being launched. Either case break the
//multiple launch by failing
if _, hasBeenLaunched := chaincodeSupport.chaincodeHasBeenLaunched(canName); hasBeenLaunched {
chaincodeSupport.runningChaincodes.Unlock()
return fmt.Errorf("Error chaincode is being launched: %s", canName)
}
chaincodeSupport.runningChaincodes.Unlock()
//launch the chaincode
//获取参数和环境
args, env, err := chaincodeSupport.getArgsAndEnv(cccid, cLang)
if err != nil {
return err
}
chaincodeLogger.Debugf("start container: %s(networkid:%s,peerid:%s)", canName, chaincodeSupport.peerNetworkID, chaincodeSupport.peerID)
chaincodeLogger.Debugf("start container with args: %s", strings.Join(args, " "))
chaincodeLogger.Debugf("start container with env:\n\t%s", strings.Join(env, "\n\t"))
//sysCC是ChaincodeDeploymentSpec_SYSTEM,其它是ChaincodeDeploymentSpec_DOCKER
vmtype, _ := chaincodeSupport.getVMType(cds)
/*set up the shadow handler JIT before container launch to reduce window of when an external chaincode can sneak in and use the launching context and make it its own*/
var notfy chan bool
preLaunchFunc := func() error {
notfy = chaincodeSupport.preLaunchSetup(canName)
return nil
}
//构建startImageReq
sir := container.StartImageReq{CCID: ccintf.CCID{ChaincodeSpec: cds.ChaincodeSpec, NetworkID: chaincodeSupport.peerNetworkID, PeerID: chaincodeSupport.peerID, Version: cccid.Version}, Builder: builder, Args: args, Env: env, PrelaunchFunc: preLaunchFunc}
ipcCtxt := context.WithValue(ctxt, ccintf.GetCCHandlerKey(), chaincodeSupport)
resp, err := container.VMCProcess(ipcCtxt, vmtype, sir)
if err != nil || (resp != nil && resp.(container.VMCResp).Err != nil) {
if err == nil {
err = resp.(container.VMCResp).Err
}
err = fmt.Errorf("Error starting container: %s", err)
chaincodeSupport.runningChaincodes.Lock()
delete(chaincodeSupport.runningChaincodes.chaincodeMap, canName)
chaincodeSupport.runningChaincodes.Unlock()
return err
}
//wait for REGISTER state
select {
case ok := <-notfy:
if !ok {
err = fmt.Errorf("registration failed for %s(networkid:%s,peerid:%s,tx:%s)", canName, chaincodeSupport.peerNetworkID, chaincodeSupport.peerID, cccid.TxID)
}
case <-time.After(chaincodeSupport.ccStartupTimeout):
err = fmt.Errorf("Timeout expired while starting chaincode %s(networkid:%s,peerid:%s,tx:%s)", canName, chaincodeSupport.peerNetworkID, chaincodeSupport.peerID, cccid.TxID)
}
if err != nil {
chaincodeLogger.Debugf("stopping due to error while launching %s", err)
errIgnore := chaincodeSupport.Stop(ctxt, cccid, cds)
if errIgnore != nil {
chaincodeLogger.Debugf("error on stop %s(%s)", errIgnore, err)
}
}
return err
}
- VMCProcess主要过程是开启协程,尝试获得container的锁,执行startImageReq的do函数,然后释放锁.在do中主要是调用了start函数来启动相应的container。对于执行环境为system(即system chaincode)和执行环境为docker(即用户级chaincode),分别对应不同的start函数。 core/container/controller.go
func VMCProcess(ctxt context.Context, vmtype string, req VMCReqIntf) (interface{}, error) {
v := vmcontroller.newVM(vmtype)
if v == nil {
return nil, fmt.Errorf("Unknown VM type %s", vmtype)
}
c := make(chan struct{})
var resp interface{}
go func() {
defer close(c)
id, err := v.GetVMName(req.getCCID())
if err != nil {
resp = VMCResp{Err: err}
return
}
vmcontroller.lockContainer(id)
resp = req.do(ctxt, v)
vmcontroller.unlockContainer(id)
}()
select {
case <-c:
return resp, nil
case <-ctxt.Done():
//TODO cancel req.do ... (needed) ?
<-c
return nil, ctxt.Err()
}
}
func (si StartImageReq) do(ctxt context.Context, v api.VM) VMCResp {
var resp VMCResp
if err := v.Start(ctxt, si.CCID, si.Args, si.Env, si.Builder, si.PrelaunchFunc); err != nil {
resp = VMCResp{Err: err}
} else {
resp = VMCResp{}
}
return resp
}
- 对于用户级chaincode. Start函数中,主要是docker启动的操作。在启动之前检查container是否已经在运行,如果是则通过vm.stopInternal清理、停止、并移除container。然后尝试创建container,对于image不存在的情况在失败后会首先尝试创建image,然后重新创建container.之后对创建好的container加上需要的attach(这里的主要是log的处理),最后启动container. core/container/dockercontroller/dockercontroller.go
func (vm *DockerVM) Start(ctxt context.Context, ccid ccintf.CCID,
args []string, env []string, builder container.BuildSpecFactory, prelaunchFunc container.PrelaunchFunc) error {
imageID, err := vm.GetVMName(ccid)
if err != nil {
return err
}
client, err := vm.getClientFnc()
if err != nil {
dockerLogger.Debugf("start - cannot create client %s", err)
return err
}
containerID := strings.Replace(imageID, ":", "_", -1)
attachStdout := viper.GetBool("vm.docker.attachStdout")
//stop,force remove if necessary
dockerLogger.Debugf("Cleanup container %s", containerID)
//处理container已经存在的情况,将container停止并移除
vm.stopInternal(ctxt, client, containerID, 0, false, false)
dockerLogger.Debugf("Start container %s", containerID)
//创建容器
err = vm.createContainer(ctxt, client, imageID, containerID, args, env, attachStdout)
if err != nil {
//镜像不存在,则创建镜像并重试,其它错误则直接返回错误
if err == docker.ErrNoSuchImage {
if builder != nil {
dockerLogger.Debugf("start-could not find image <%s> (container id <%s>), because of <%s>..."+
"attempt to recreate image", imageID, containerID, err)
reader, err1 := builder()
if err1 != nil {
dockerLogger.Errorf("Error creating image builder for image <%s> (container id <%s>), "+
"because of <%s>", imageID, containerID, err1)
}
//部署镜像,该过程涉及docker镜像创建和部署过程,此处略
if err1 = vm.deployImage(client, ccid, args, env, reader); err1 != nil {
return err1
}
dockerLogger.Debug("start-recreated image successfully")
//创建容器
if err1 = vm.createContainer(ctxt, client, imageID, containerID, args, env, attachStdout); err1 != nil {
dockerLogger.Errorf("start-could not recreate container post recreate image: %s", err1)
return err1
}
} else {
dockerLogger.Errorf("start-could not find image <%s>, because of %s", imageID, err)
return err
}
} else {
dockerLogger.Errorf("start-could not recreate container <%s>, because of %s", containerID, err)
return err
}
}
if attachStdout {
/*该部分主要是关于docker 容器的log的,忽略*/
}
if prelaunchFunc != nil {
if err = prelaunchFunc(); err != nil {
return err
}
}
//Start Container
err = client.StartContainer(containerID, nil)
if err != nil {
dockerLogger.Errorf("start-could not start container: %s", err)
return err
}
dockerLogger.Debugf("Started container %s", containerID)
return nil
}
- 对于system chaincode.会启动协程执行launchInProc完成chaincode的launch工作。 core/container/inproccontroller/inproccontroller.go
//Start starts a previously registered system codechain
func (vm *InprocVM) Start(ctxt context.Context, ccid ccintf.CCID, args []string, env []string, builder container.BuildSpecFactory, prelaunchFunc container.PrelaunchFunc) error {
path := ccid.ChaincodeSpec.ChaincodeId.Path
ipctemplate := typeRegistry[path]
if ipctemplate == nil {
return fmt.Errorf(fmt.Sprintf("%s not registered", path))
}
instName, _ := vm.GetVMName(ccid)
ipc, err := vm.getInstance(ctxt, ipctemplate, instName, args, env)
if err != nil {
return fmt.Errorf(fmt.Sprintf("could not create instance for %s", instName))
}
if ipc.running {
return fmt.Errorf(fmt.Sprintf("chaincode running %s", path))
}
ccSupport, ok := ctxt.Value(ccintf.GetCCHandlerKey()).(ccintf.CCSupport)
if !ok || ccSupport == nil {
return fmt.Errorf("in-process communication generator not supplied")
}
if prelaunchFunc != nil {
if err = prelaunchFunc(); err != nil {
return err
}
}
ipc.running = true
go func() {
defer func() {
if r := recover(); r != nil {
inprocLogger.Criticalf("caught panic from chaincode %s", instName)
}
}()
ipc.launchInProc(ctxt, instName, args, env, ccSupport)
}()
return nil
}
Hyperledger fabric 1.0 代码解析 之 chaincode install
Hyperledger fabric 1.0 代码解析 之 chaincode instantiate or upgrade