Python 实现区块链

环境

python3(本次用的3.8)、postman、requests、Flask,pip,pipenv等工具

环境步骤

  1. 先安装一个环境
    pip install pipenv
    pipenv使用

  2. 创建环境
    pipenv install 会生成一个pipfile文件,用于管理库的依赖

  3. 在虚拟环境中安装依赖
    pipenv install flask==2.0.2
    pipenv install requests==2.18.4
    安装成功后可看到pipfile中看到

  4. 启动虚拟环境
    pipenv shell

  5. 新建一个blockchain.py 开始撸代码

代码思路

确定区块结构

{    "index":0, // 块序号    "timestamp":"",// 时间戳    "transactions":""[ // 交易信息        {            "sender":"",            "recipient":"",            "amount":5,        }    ],    "proof":"", // 工作量证明    "previous_hash":"",// 前区块的哈希值}

确定一下区块类功能

class Blockchain:    def __init__(self):        self.chain = [] # 存块        self.current_transactions = [] # 交易实体    def new_block(self): # 新建区块        pass    def new_transaction(self):# 新建交易        pass    @staticmethod    def hash(block):# 计算哈希值        pass    @property    def last_block(self):# 获取当前链中最后一个区块        pass        def proof_of_work(self):# 工作量证明计算    pass   def vaild_proof(self):# 验证计算值是否符合要求passdef vaild_chain(self):# 验证链是否符合要求passdef register_node(self):# 节点注册pass def resolve_conflicts:# 共识算法,解决冲突pass

添加交易

def new_transaction(self, sender: str, recipient: str, amount: int) -> int:        """添加新的交易        Args:            sender (str): 发送方            recipient (str): 接收方            amount (int): 金额        Returns:            int: 返回一个包含此交易的区块序号        """        self.current_transactions.append({            'sender': sender,            'recipient': recipient,            'amount': amount        })        return self.last_block['index'] + 1

添加新块

 def new_block(self, proof: int, previous_hash=None):  # 新建区块        block = {            'index': len(self.chain) + 1,            'timestamp': time(),            'transactions': self.current_transactions,            'proof': proof,            'previous_hash': previous_hash or self.hash(self.last_block),        }        self.current_transactions = []  # 新建区块打包后重置当前交易信息        self.chain.append(block)  # 把新建的区块加入链        return block

计算哈希值

@staticmethod    def hash(block: Dict[str, Any]) -> str:        """计算哈希值,返回哈希后的摘要信息        Args:            block (Dict[str, Any]): 传入一个块        Returns:            str: 摘要信息        """        block_string = json.dumps(block, sort_keys=True).encode()        return hashlib.sha256(block_string).hexdigest()

工作量证明与验证

def proof_of_work(self, last_proof: int) -> int:        """工作量计算,计算一个符合要求的哈希值        Args:            last_proof (int): 上一个块的工作量随机数        Returns:            int: 返回符合要求的工作量随机数        """        proof = 0        while self.valid_proof(last_proof, proof) is False:            proof += 1        # print(proof) 输出计算结果        return proofdef valid_proof(self, last_proof: int, proof: int) -> bool:        """工作量证明验证,验证计算结果是否以2个0开头        Args:            last_proof (int): 前工作证明            proof (int): 当前工作证明        Returns:            bool: 返回验证是否有效        """        guess = f'{last_proof}{proof}'.encode()        guess_hash = hashlib.sha256(guess).hexdigest()        # print(guess_hash) 输出计算过程        if guess_hash[0:2] == "00":            return True        else:            return False

验证一下,创建一个类,设定前一个工作量证明为100

尝试运行代码

可以看到这里算出前两位为0的就停止,结果为226

节点注册

def register_node(self, address: str) -> None:        """添加一个新节点到节点集中        Args:            address (str): 节点的地址。Eg:"http://127.0.0.1:5002"        """        parsed_url = urlparse(address)  # 解析url参数        self.nodes.add(parsed_url.netloc)  # 获取域名服务器

共识算法

def resolve_conflicts(self) -> bool:        """共识算法,解决冲突,以最长且有效的链为主        Returns:            bool: 冲突是否解决成功        """        neighbours = self.nodes  # 获取节点信息        new_chain = None  # 定义可能的新链        max_length = len(self.chain)  # 获取当前链长度        for node in neighbours:  # 获取节点的链条信息,如果更长且有效则直接替换            response = requests.get(f'http://{node}/chain')            if response.status_code == 200:                length = response.json()['length']                chain = response.json()['chain']                if length > max_length and self.vaild_chain(chain):                    max_length = length                    new_chain = chain        if new_chain:            self.chain = new_chain            return True        return False

Web功能

使用flask 部署服务器

app = Flask(__name__)if __name__ == "__main__":    app.run(host='0.0.0.0', port=5000)

尝试运行代码

从postman 中可看到一些信息,目前没有定义接口,自然是404

确定一下接口

@app.route('/transactions/new', methods=['POST'])def new_transaction():    pass@app.route('/mine', methods=['GET'])def mine():    pass@app.route('/chain', methods=['GET'])def full_chain():    pass@app.route('/nodes/register', methods=['POST'])def register_nodes():pass@app.route('/nodes/resolve', methods=['GET'])def consensus():pass

交易接口

# 添加新建交易接口@app.route('/transactions/new', methods=['POST'])def new_transaction():    values = request.get_json()    required = ['sender', 'recipient', 'amount']    if not all(k in values for k in required):        return '确少参数', 400    index = blockchain.new_transaction(values['sender'], values['recipient'],                                       values['amount'])    response = {'message': f'交易将会被添加到块 {index}'}    return jsonify(response), 201

查看链接口

# 查看链接口@app.route('/chain', methods=['GET'])def full_chain():    response = {        'chain': blockchain.chain,        'length': len(blockchain.chain),    }    return jsonify(response), 200

打包区块接口

@app.route('/mine', methods=['GET'])def mine():    last_block = blockchain.last_block  # 获取链上最后一个区块的信息    last_proof = last_block['proof']    proof = blockchain.proof_of_work(last_proof)    # 发送者为 "0" 表明是新挖出的币,为矿工提供奖励    blockchain.new_transaction(        sender="0",        recipient=node_identifier,        amount=1,    )    block = blockchain.new_block(proof, None)  # 生成一个新块    response = {        'message': "打包成功,新区块已生成!",        'index': block['index'],        'transactions': block['transactions'],        'proof': block['proof'],        'previous_hash': block['previous_hash'],    }    return jsonify(response), 200

去postman 依次请求chain(查看当前链)、transactions/new(新建交易,注意是post方法提交的json数据)、mine(打包区块)、chain


这时,一个单节点的区块链流程基本实现。

节点注册接口

# 节点注册接口@app.route('/nodes/register', methods=['POST'])def register_nodes():    values = request.get_json()    nodes = values.get('nodes')    if nodes is None:        return "Error: 请提供一个符合规则的节点", 400    for node in nodes:        blockchain.register_node(node)    response = {        'message': '新节点已经被添加!',        'total_nodes': list(blockchain.nodes),    }    return jsonify(response), 201

测试一下

共识接口

#  共识接口@app.route('/nodes/resolve', methods=['GET'])def consensus():    replaced = blockchain.resolve_conflicts()    if replaced:        response = {'message': '当前链不符合要求,已被替换', 'new_chain': blockchain.chain}    else:        response = {'message': '当前链符合要求', 'chain': blockchain.chain}    return jsonify(response), 200

完整运行

  • 打开两个终端构造两个节点5000 和 5001
  • 分别查看5000节点 和 5001节点 的当前链
    5000节点

    5001节点

    均只有一个创世区块
  • 接着对5000节点,建立新的交易,并封装区块,再次查看链,此时应该有链上应该有两个区块,而5001节点没有操作,所以链中只有一个块
  • 此时将5001节点注册到5000节点中,5000节点注册到5001节点中,由于共识机制,选取最长的链,所以此时查看5001节点的链时,会出现两个节点而不是一个。

    至此,简易区块链实现结束。

完整代码

import hashlibimport jsonfrom time import timefrom urllib.parse import urlparse  # url解析from uuid import uuid4  # 生成唯一idfrom flask import Flask, jsonify, requestfrom typing import Any, Dict, Listimport requestsfrom argparse import ArgumentParser  # 命令行参数解析class Blockchain:    def __init__(self):        self.chain = []  # 存块        self.current_transactions = []  # 交易实体        self.nodes = set()  # 无重复的节点集合        # 创建创世区块        self.new_block(previous_hash='1', proof=100)    def register_node(self, address: str) -> None:        """添加一个新节点到节点集中        Args:            address (str): 节点的地址。Eg:"http://127.0.0.1:5002"        """        parsed_url = urlparse(address)  # 解析url参数        self.nodes.add(parsed_url.netloc)  # 获取域名服务器    def new_block(self, proof: int, previous_hash=None):  # 新建区块        block = {            'index': len(self.chain) + 1,            'timestamp': time(),            'transactions': self.current_transactions,            'proof': proof,            'previous_hash': previous_hash or self.hash(self.last_block),        }        self.current_transactions = []  # 新建区块打包后重置当前交易信息        self.chain.append(block)  # 把新建的区块加入链        return block    def new_transaction(self, sender: str, recipient: str, amount: int) -> int:        """添加新的交易        Args:            sender (str): 发送方            recipient (str): 接收方            amount (int): 金额        Returns:            int: 返回一个包含此交易的区块序号        """        self.current_transactions.append({            'sender': sender,            'recipient': recipient,            'amount': amount        })        return self.last_block['index'] + 1    @staticmethod    def hash(block: Dict[str, Any]) -> str:        """计算哈希值,返回哈希后的摘要信息        Args:            block (Dict[str, Any]): 传入一个块        Returns:            str: 摘要信息        """        block_string = json.dumps(block, sort_keys=True).encode()        return hashlib.sha256(block_string).hexdigest()    @property    def last_block(self) -> Dict[str, Any]:  # 获取当前链中最后一个区块        return self.chain[-1]    def proof_of_work(self, last_proof: int) -> int:        """工作量计算,计算一个符合要求的哈希值        Args:            last_proof (int): 上一个块的工作量随机数        Returns:            int: 返回符合要求的工作量随机数        """        proof = 0        while self.valid_proof(last_proof, proof) is False:            proof += 1        # print(proof) 输出计算结果        return proof    def vaild_chain(self, chain: List[Dict[str, Any]]) -> bool:        """验证链是否合理:最长且有效        Args:            chain (List[Dict[str, Any]]): 传入链        Returns:            bool: 返回是否有效        """        last_block = chain[0]  # 从第一个创世区块开始遍历验证        current_index = 1        while current_index < len(chain):            block = chain[current_index]            print(f'{last_block}')            print(f'{block}')            print("\n-----------\n")            # 如果当前区块的前哈希和前一个计算出来的哈希值不同则是无效链            if block['previous_hash'] != self.hash(last_block):                return False            # 检验工作量证明是否符合要求            if not self.valid_proof(last_block['proof'], block['proof']):                return False            last_block = block            current_index += 1        return True    def valid_proof(self, last_proof: int, proof: int) -> bool:        """工作量证明验证,验证计算结果是否以2个0开头        Args:            last_proof (int): 前工作证明            proof (int): 当前工作证明        Returns:            bool: 返回验证是否有效        """        guess = f'{last_proof}{proof}'.encode()        guess_hash = hashlib.sha256(guess).hexdigest()        # print(guess_hash) 输出计算过程        if guess_hash[0:2] == "00":            return True        else:            return False    def resolve_conflicts(self) -> bool:        """共识算法,解决冲突,以最长且有效的链为主        Returns:            bool: 冲突是否解决成功        """        neighbours = self.nodes  # 获取节点信息        new_chain = None  # 定义可能的新链        max_length = len(self.chain)  # 获取当前链长度        for node in neighbours:  # 获取节点的链条信息,如果更长且有效则直接替换            response = requests.get(f'http://{node}/chain')            if response.status_code == 200:                length = response.json()['length']                chain = response.json()['chain']                if length > max_length and self.vaild_chain(chain):                    max_length = length                    new_chain = chain        if new_chain:            self.chain = new_chain            return True        return Falseapp = Flask(__name__)  # flask框架node_identifier = str(uuid4()).replace('-', '')  # 使者获取一个唯一的uidblockchain = Blockchain()# 添加新建交易接口@app.route('/transactions/new', methods=['POST'])def new_transaction():    values = request.get_json()    required = ['sender', 'recipient', 'amount']    if not all(k in values for k in required):        return '确少参数', 400    index = blockchain.new_transaction(values['sender'], values['recipient'],                                       values['amount'])    response = {'message': f'交易将会被添加到块 {index}'}    return jsonify(response), 201# 添加新建打包区块接口@app.route('/mine', methods=['GET'])def mine():    last_block = blockchain.last_block  # 获取链上最后一个区块的信息    last_proof = last_block['proof']    proof = blockchain.proof_of_work(last_proof)    # 发送者为 "0" 表明是新挖出的币,为矿工提供奖励    blockchain.new_transaction(        sender="0",        recipient=node_identifier,        amount=1,    )    block = blockchain.new_block(proof, None)  # 生成一个新块    response = {        'message': "打包成功,新区块已生成!",        'index': block['index'],        'transactions': block['transactions'],        'proof': block['proof'],        'previous_hash': block['previous_hash'],    }    return jsonify(response), 200# 查看链接口@app.route('/chain', methods=['GET'])def full_chain():    response = {        'chain': blockchain.chain,        'length': len(blockchain.chain),    }    return jsonify(response), 200# 节点注册接口@app.route('/nodes/register', methods=['POST'])def register_nodes():    values = request.get_json()    nodes = values.get('nodes')    if nodes is None:        return "Error: 请提供一个符合规则的节点", 400    for node in nodes:        blockchain.register_node(node)    response = {        'message': '新节点已经被添加!',        'total_nodes': list(blockchain.nodes),    }    return jsonify(response), 201#  共识接口@app.route('/nodes/resolve', methods=['GET'])def consensus():    replaced = blockchain.resolve_conflicts()    if replaced:        response = {'message': '当前链不符合要求,已被替换', 'new_chain': blockchain.chain}    else:        response = {'message': '当前链符合要求', 'chain': blockchain.chain}    return jsonify(response), 200if __name__ == "__main__":    parser = ArgumentParser()  # 命令行参数解析,端口默认5000    parser.add_argument('-p',                        '--port',                        default=5000,                        type=int,                        help='port to listen on')    args = parser.parse_args()    port = args.port    app.run(host='127.0.0.1', port=port)  # 启动web服务,默认本机