家蛙树

Hyperledger fabric链码数据模型的探讨(1)之转账和交易

Zealot
区块链
2019-01-06

编写过一些链码的人可能会觉得是在操作一个简单的key-value数据库, 就是GetState和PutState去操作键值对,而对复杂些的一对多,多对多等实体关系和数据模型不知怎么设计。我们先从官方的例子入手一起探讨下。

1.简单转账例子
/fabric-samples/chaincode/chaincode_example02/go/chaincode_example02.go

假设链码调用peer chaincode invoke … -c ‘{“Args”:[“invoke”,”a”,”b”,”10”]}’
转账逻辑简单, 就是把用户a上的余额加10, b账户上钱减10, 最后重新putState保存两个用户状态即可。

而现实当中, 我们关系户头上剩下多少钱, 也关心消费和收入的每笔流水账。所有流水账的总和应该等于账户余额,后面我们考虑如何记账本上记录流水账。

// Transaction makes payment of X units from A to B
func (t *SimpleChaincode) invoke(stub shim.ChaincodeStubInterface, args []string) pb.Response {
        var A, B string    // Entities
        var Aval, Bval int // Asset holdings
        var X int          // Transaction value
        var err error

        if len(args) != 3 {
                return shim.Error("Incorrect number of arguments. Expecting 3")
        }

        A = args[0]
        B = args[1]

        // Get the state from the ledger
        // TODO: will be nice to have a GetAllState call to ledger
        Avalbytes, err := stub.GetState(A)
        if err != nil {
                return shim.Error("Failed to get state")
        }
        if Avalbytes == nil {
                return shim.Error("Entity not found")
        }
        Aval, _ = strconv.Atoi(string(Avalbytes))

        Bvalbytes, err := stub.GetState(B)
        if err != nil {
                return shim.Error("Failed to get state")
        }
        if Bvalbytes == nil {
                return shim.Error("Entity not found")
        }
        Bval, _ = strconv.Atoi(string(Bvalbytes))

        // Perform the execution
        X, err = strconv.Atoi(args[2])
        if err != nil {
                return shim.Error("Invalid transaction amount, expecting a integer value")
        }
        Aval = Aval - X
        Bval = Bval + X
        fmt.Printf("Aval = %d, Bval = %d\n", Aval, Bval)

        // Write the state back to the ledger
        err = stub.PutState(A, []byte(strconv.Itoa(Aval)))
        if err != nil {
                return shim.Error(err.Error())
        }

        err = stub.PutState(B, []byte(strconv.Itoa(Bval)))
        if err != nil {
                return shim.Error(err.Error())
        }

        return shim.Success(nil)
}

2.Marbles简单资产转移
Marbles是个弹珠游戏, 模拟多个用户可创建和转移弹珠, 具体可以参考说明
https://github.com/IBM-Blockchain/marbles/blob/master/README-cn.md
链码位置fabric-samples/chaincode/marbles02/go/marbles_chaincode.go

弹珠数据结构, name作为key值,拥有颜色,大小和拥有者等属性。

type marble struct {
        ObjectType string `json:"docType"` //docType is used to distinguish the various types of objects in state database
        Name       string `json:"name"`    //the fieldtags are needed to keep case from bouncing around
        Color      string `json:"color"`
        Size       int    `json:"size"`
        Owner      string `json:"owner"`
}

初始化新增三个弹珠

// peer chaincode invoke -C myc1 -n marbles -c '{"Args":["initMarble","marble1","blue","35","tom"]}'
// peer chaincode invoke -C myc1 -n marbles -c '{"Args":["initMarble","marble2","red","50","tom"]}'
// peer chaincode invoke -C myc1 -n marbles -c '{"Args":["initMarble","marble3","blue","70","tom"]}'
// peer chaincode invoke -C myc1 -n marbles -c '{"Args":["transferMarble","marble2","jerry"]}'

把第二颗弹珠marble2的拥有者从tom修改为jerry,即tom把第二颗珠子给了jerry.
代码较为简单, 用marble2名字找到珠子后修改拥有人属性即可。

// ===========================================================
// transfer a marble by setting a new owner name on the marble
// ===========================================================
func (t *SimpleChaincode) transferMarble(stub shim.ChaincodeStubInterface, args []string) pb.Response {

        //   0       1
        // "name", "bob"
// ===========================================================
func (t *SimpleChaincode) transferMarble(stub shim.ChaincodeStubInterface, args []string) pb.Response {

        //   0       1
        // "name", "bob"
        if len(args) < 2 {
                return shim.Error("Incorrect number of arguments. Expecting 2")
        }

        marbleName := args[0]
        newOwner := strings.ToLower(args[1])
        fmt.Println("- start transferMarble ", marbleName, newOwner)

        marbleAsBytes, err := stub.GetState(marbleName)
        if err != nil {
                return shim.Error("Failed to get marble:" + err.Error())
        } else if marbleAsBytes == nil {
                return shim.Error("Marble does not exist")
        }

        marbleToTransfer := marble{}
        err = json.Unmarshal(marbleAsBytes, &marbleToTransfer) //unmarshal it aka JSON.parse()
        if err != nil {
                return shim.Error(err.Error())
        }
        marbleToTransfer.Owner = newOwner //change the owner

        marbleJSONasBytes, _ := json.Marshal(marbleToTransfer)
        err = stub.PutState(marbleName, marbleJSONasBytes) //rewrite the marble
        if err != nil {
                return shim.Error(err.Error())
        }

        fmt.Println("- end transferMarble (success)")
        return shim.Success(nil)
}

相比第一个例子转账加强了些些, Marble是资产, 对应有拥有人。如果人还拥有其它资产例如汽车, 就有点一对多的味道了吧。我们继续看例子。

3.High-through高吞吐交易例子
参考fabric-samples/high-throughput/chaincode/high-throughput.go

(1)update使用一个组合key聚合, 名字~操作符+或-~数值~交易ID, value为0, 即主要的信息都是保存在组合键中, 组合键也方便用于类似模糊部分查询, 该例是使用name作为部分查询条件。 这里保存的实际每条交易就是上面我们说没记录的流水了。

/**
 * Updates the ledger to include a new delta for a particular variable. If this is the first time
 * this variable is being added to the ledger, then its initial value is assumed to be 0. The arguments
 * to give in the args array are as follows:
 *      - args[0] -> name of the variable
 *      - args[1] -> new delta (float)
 *      - args[2] -> operation (currently supported are addition "+" and subtraction "-")
 *
 * @param APIstub The chaincode shim
 * @param args The arguments array for the update invocation
 *
 * @return A response structure indicating success or failure with a message
 */
func (s *SmartContract) update(APIstub shim.ChaincodeStubInterface, args []string) sc.Response {
        // Check we have a valid number of args
        if len(args) != 3 {
                return shim.Error("Incorrect number of arguments, expecting 3")
        }

        // Extract the args
        name := args[0]
        op := args[2]
        _, err := strconv.ParseFloat(args[1], 64)
        if err != nil {
                return shim.Error("Provided value was not a number")
        }

        // Make sure a valid operator is provided
        if op != "+" && op != "-" {
                return shim.Error(fmt.Sprintf("Operator %s is unrecognized", op))
        }

        // Retrieve info needed for the update procedure
        txid := APIstub.GetTxID()
        compositeIndexName := "varName~op~value~txID"

        // Create the composite key that will allow us to query for all deltas on a particular variable
        compositeKey, compositeErr := APIstub.CreateCompositeKey(compositeIndexName, []string{name, op, args[1], txid})
        if compositeErr != nil {
                return shim.Error(fmt.Sprintf("Could not create a composite key for %s: %s", name, compositeErr.Error()))
        }

        // Save the composite key index
        compositePutErr := APIstub.PutState(compositeKey, []byte{0x00})
        if compositePutErr != nil {
                return shim.Error(fmt.Sprintf("Could not put operation for %s in the ledger: %s", name, compositePutErr.Error()))
        }

        return shim.Success([]byte(fmt.Sprintf("Successfully added %s%s to %s", op, args[1], name)))
}

(2)get则是按照name作为部分查询条件, 查询到对应name归属的所有交易流水了, 最后可以计算出交易流水的总和, 就是账户的余额。

/**
 * Retrieves the aggregate value of a variable in the ledger. Gets all delta rows for the variable
 * and computes the final value from all deltas. The args array for the invocation must contain the
 * following argument:
 *      - args[0] -> The name of the variable to get the value of
 *
 * @param APIstub The chaincode shim
 * @param args The arguments array for the get invocation
 *
 * @return A response structure indicating success or failure with a message
 */
func (s *SmartContract) get(APIstub shim.ChaincodeStubInterface, args []string) sc.Response {
        // Check we have a valid number of args
        if len(args) != 1 {
                return shim.Error("Incorrect number of arguments, expecting 1")
        }

        name := args[0]
        // Get all deltas for the variable
        deltaResultsIterator, deltaErr := APIstub.GetStateByPartialCompositeKey("varName~op~value~txID", []string{name})
        if deltaErr != nil {
                return shim.Error(fmt.Sprintf("Could not retrieve value for %s: %s", name, deltaErr.Error()))
        }
        defer deltaResultsIterator.Close()

        // Check the variable existed
        if !deltaResultsIterator.HasNext() {
                return shim.Error(fmt.Sprintf("No variable by the name %s exists", name))
        }

        // Iterate through result set and compute final value
        var finalVal float64
        var i int
        for i = 0; deltaResultsIterator.HasNext(); i++ {
                // Get the next row
                responseRange, nextErr := deltaResultsIterator.Next()
                if nextErr != nil {
                        return shim.Error(nextErr.Error())
                }

                // Split the composite key into its component parts
                _, keyParts, splitKeyErr := APIstub.SplitCompositeKey(responseRange.Key)
                if splitKeyErr != nil {
                        return shim.Error(splitKeyErr.Error())
                }

                // Retrieve the delta value and operation
                operation := keyParts[1]
                valueStr := keyParts[2]

                // Convert the value string and perform the operation
                value, convErr := strconv.ParseFloat(valueStr, 64)
                if convErr != nil {
                        return shim.Error(convErr.Error())
                }

                switch operation {
                case "+":
                        finalVal += value
                case "-":
                        finalVal -= value
                default:
                        return shim.Error(fmt.Sprintf("Unrecognized operation %s", operation))
                }
        }

        return shim.Success([]byte(strconv.FormatFloat(finalVal, 'f', -1, 64)))
}

(3)prueFast也是使用name作为查询条件查出某个用户所有的交易流水, 但累加流水记录的时候计算一条删除一条, 最后计算出余额后, 作为新的一条总流水调用update添加到该账户上, 注意这个操作是不安全的, 这个迭代过程因为删除流水如果中途出错会导致流水丢失。

/**
 * Prunes a variable by deleting all of its delta rows while computing the final value. Once all rows
 * have been processed and deleted, a single new row is added which defines a delta containing the final
 * computed value of the variable. This function is NOT safe as any failures or errors during pruning
 * will result in an undefined final value for the variable and loss of data. Use pruneSafe if data
 * integrity is important. The args array contains the following argument:
 *      - args[0] -> The name of the variable to prune
 *
 * @param APIstub The chaincode shim
 * @param args The args array for the pruneFast invocation
 *
 * @return A response structure indicating success or failure with a message
 */
func (s *SmartContract) pruneFast(APIstub shim.ChaincodeStubInterface, args []string) sc.Response {
        // Check we have a valid number of ars
        if len(args) != 1 {
                return shim.Error("Incorrect number of arguments, expecting 1")
        }

        // Retrieve the name of the variable to prune
        name := args[0]

        // Get all delta rows for the variable
        deltaResultsIterator, deltaErr := APIstub.GetStateByPartialCompositeKey("varName~op~value~txID", []string{name})
        if deltaErr != nil {
                return shim.Error(fmt.Sprintf("Could not retrieve value for %s: %s", name, deltaErr.Error()))
        }
        defer deltaResultsIterator.Close()

        // Check the variable existed
        if !deltaResultsIterator.HasNext() {
                return shim.Error(fmt.Sprintf("No variable by the name %s exists", name))
        }

        // Iterate through result set computing final value while iterating and deleting each key
        var finalVal float64
        var i int
        for i = 0; deltaResultsIterator.HasNext(); i++ {
                // Get the next row
                responseRange, nextErr := deltaResultsIterator.Next()
                if nextErr != nil {
                        return shim.Error(nextErr.Error())
                }

                // Split the key into its composite parts
                _, keyParts, splitKeyErr := APIstub.SplitCompositeKey(responseRange.Key)
                if splitKeyErr != nil {
                        return shim.Error(splitKeyErr.Error())
                }

                // Retrieve the operation and value
                operation := keyParts[1]
                valueStr := keyParts[2]

                // Convert the value to a float
                value, convErr := strconv.ParseFloat(valueStr, 64)
                if convErr != nil {
                        return shim.Error(convErr.Error())
                }

                // Delete the row from the ledger
                deltaRowDelErr := APIstub.DelState(responseRange.Key)
                if deltaRowDelErr != nil {
                        return shim.Error(fmt.Sprintf("Could not delete delta row: %s", deltaRowDelErr.Error()))
                }

                // Add the value of the deleted row to the final aggregate
                switch operation {
                case "+":
                        finalVal += value
                case "-":
                        finalVal -= value
                default:
                        return shim.Error(fmt.Sprintf("Unrecognized operation %s", operation))
                }
        }

        // Update the ledger with the final value and return
        updateResp := s.update(APIstub, []string{name, strconv.FormatFloat(finalVal, 'f', -1, 64), "+"})
        if updateResp.Status == OK {
                return shim.Success([]byte(fmt.Sprintf("Successfully pruned variable %s, final value is %f, %d rows pruned", args[0], finalVal, i)))
        }

        return shim.Error(fmt.Sprintf("Failed to prune variable: all rows deleted but could not update value to %f, variable no longer exists in ledger", finalVal))
}

(4)prueSafe则是安全版本, 首先会调用get方法获取对用name的总账, 先保存在key值为name_PRUE_BACKUP的值中, 接着删除所有的流水记录, 之后把总账作为新的流水update插入, 最后删除备份的总账。

/**
 * This function performs the same function as pruneFast except it provides data backups in case the
 * prune fails. The final aggregate value is computed before any deletion occurs and is backed up
 * to a new row. This back-up row is deleted only after the new aggregate delta has been successfully
 * written to the ledger. The args array contains the following argument:
 *      args[0] -> The name of the variable to prune
 *
 * @param APIstub The chaincode shim
 * @param args The arguments array for the pruneSafe invocation
 *
 * @result A response structure indicating success or failure with a message
 */
func (s *SmartContract) pruneSafe(APIstub shim.ChaincodeStubInterface, args []string) sc.Response {
        // Verify there are a correct number of arguments
        if len(args) != 1 {
                return shim.Error("Incorrect number of arguments, expecting 1 (the name of the variable to prune)")
        }

        // Get the var name
        name := args[0]

        // Get the var's value and process it
        getResp := s.get(APIstub, args)
        if getResp.Status == ERROR {
                return shim.Error(fmt.Sprintf("Could not retrieve the value of %s before pruning, pruning aborted: %s", name, getResp.Message))
        }

        valueStr := string(getResp.Payload)
        val, convErr := strconv.ParseFloat(valueStr, 64)
        if convErr != nil {
                return shim.Error(fmt.Sprintf("Could not convert the value of %s to a number before pruning, pruning aborted: %s", name, convErr.Error()))
        }

        // Store the var's value temporarily
        backupPutErr := APIstub.PutState(fmt.Sprintf("%s_PRUNE_BACKUP", name), []byte(valueStr))
        if backupPutErr != nil {
                return shim.Error(fmt.Sprintf("Could not backup the value of %s before pruning, pruning aborted: %s", name, backupPutErr.Error()))
        }

        // Get all deltas for the variable
        deltaResultsIterator, deltaErr := APIstub.GetStateByPartialCompositeKey("varName~op~value~txID", []string{name})
        if deltaErr != nil {
                return shim.Error(fmt.Sprintf("Could not retrieve value for %s: %s", name, deltaErr.Error()))
        }
        defer deltaResultsIterator.Close()

        // Delete each row
        var i int
        for i = 0; deltaResultsIterator.HasNext(); i++ {
                responseRange, nextErr := deltaResultsIterator.Next()
                if nextErr != nil {
                        return shim.Error(fmt.Sprintf("Could not retrieve next row for pruning: %s", nextErr.Error()))
                }

                deltaRowDelErr := APIstub.DelState(responseRange.Key)
                if deltaRowDelErr != nil {
                        return shim.Error(fmt.Sprintf("Could not delete delta row: %s", deltaRowDelErr.Error()))
                }
        }

        // Insert new row for the final value
        updateResp := s.update(APIstub, []string{name, valueStr, "+"})
        if updateResp.Status == ERROR {
                return shim.Error(fmt.Sprintf("Could not insert the final value of the variable after pruning, variable backup is stored in %s_PRUNE_BACKUP: %s", name, updateResp.Message))
        }

        // Delete the backup value
        delErr := APIstub.DelState(fmt.Sprintf("%s_PRUNE_BACKUP", name))
        if delErr != nil {
                return shim.Error(fmt.Sprintf("Could not delete backup value %s_PRUNE_BACKUP, this does not affect the ledger but should be removed manually", name))
        }

        return shim.Success([]byte(fmt.Sprintf("Successfully pruned variable %s, final value is %f, %d rows pruned", name, val, i)))
}

这个操作可以认为的类似人工保证数据库事务一样的操作, 是有点绕, 不过当然不是严格的ACID, 只是如果迭代删除流水的时候出错了, 有个备份总账, 后面还能恢复,保证总余额没丢。

可能大家也有不少问题, 为什么用组合键, 用couchdb的index似乎也可以记录流水, key就用交易ID, 能不能像传统数据库那样设计, 一个账户表, 保存名字和余额, 另外一个账户交易流水表, 保存用户的每个交易流水。

带着这些疑问, 我们下次再扯,good night bro.

t_b1057db7ee264b96905f35aad9bf0eaa.png

点赞 0
0条评论
其他心得
1. 问题场景 Fabric peer节点使用文件保存区块, 使用level db或couchdb数据库保存状态, 数据很多state db会膨胀, 我们探讨下一些解决方案。 2. couchdb集群 couchdb2.x支持集群, 分片, 应该能把数据分散到集群的其它节点。先简单过一下如何安装。 2.1 couchdb集群搭建 Fabric用到的couchdb镜像是自己打包的, 1.4对应的是hyperledger/fabric-couchdb:0.4.14, 不过很悲催, 笔者
Zealot · 21天前 
1.简介 Fabric 1.4引入operation service即运维服务接口, orderer,peer节点可提供http服务, 方便外部获取节点的运行指标,管理日志级别,健康检查。 2.如何使用运维服务 以fabirc-sample/first-network为例, ./byfn.sh up 2.1 Orderer节点运维服务 启动后连接到orderer容器 docker exec -it -e LINES=$(tput lines) -e COLUMNS=$(tput co
Zealot · 30天前 
1.使用场景 Fabric区块链网络一个channel即一个记账本, 在很多业务场景,一个记账本的数据自身组织可以读写,也可以提供给其它组织只读,部分读或部分写。数据隔离使用channel是粗粒度的,private data私有数据是fabric 1.2引入, 是为了在更细的粒度上控制数据访问。 2.如何使用私有数据? 以fabric-sample/chaincode/marble02_private弹珠游戏为例. (1)文件collections_config.json
Zealot · 31天前 
1.简介 Fabric CA基于开源项目CFSSL开发, 主要为fabric网络提供PKI证书服务,是MSP生成的基础。可能有人会问, 官方不是有cryptogen工具批量生成MSP吗? cryptogen实际是辅助测试工具,默认不同orderer,org都有不同的CA, 如果一个org要追加个peer或user, cryptogen就不管用了。生产环境我们建议使用fabric ca全面管理证书, 如果想简单来而区块链组织,节点和用户基本不会变, cryptogen也没问题。 2.
1.Kafka排序服务原理 官方文档在google doc上, 参考翻译 https://www.jianshu.com/p/db006359133d 2. kafka 排序服务安装 所有的代码已分享在https://github.com/zealzeng/kafka-orderer-demo 2.1 安装环境 官方文档有一些简单的描述 https://hyperledger-fabric.readthedocs.io/en/release-1.4/kafka.h