Python 实现区块链
环境
python3(本次用的3.8)、postman、requests、Flask,pip,pipenv等工具
环境步骤
先安装一个环境
pip install pipenv
pipenv使用创建环境
pipenv install
会生成一个pipfile文件,用于管理库的依赖在虚拟环境中安装依赖
pipenv install flask==2.0.2
pipenv install requests==2.18.4
安装成功后可看到pipfile中看到
启动虚拟环境
pipenv shell
新建一个blockchain.py 开始撸代码
代码思路
确定区块结构
{ "index":0, // 块序号 "timestamp":"",// 时间戳 "transactions":""[ // 交易信息 { "sender":"", "recipient":"", "amount":5, } ], "proof":"", // 工作量证明 "previous_hash":"",// 前区块的哈希值}
确定一下区块类功能
class Blockchain: def __init__(self): self.chain = [] # 存块 self.current_transactions = [] # 交易实体 def new_block(self): # 新建区块 pass def new_transaction(self):# 新建交易 pass @staticmethod def hash(block):# 计算哈希值 pass @property def last_block(self):# 获取当前链中最后一个区块 pass def proof_of_work(self):# 工作量证明计算 pass def vaild_proof(self):# 验证计算值是否符合要求passdef vaild_chain(self):# 验证链是否符合要求passdef register_node(self):# 节点注册pass def resolve_conflicts:# 共识算法,解决冲突pass
添加交易
def new_transaction(self, sender: str, recipient: str, amount: int) -> int: """添加新的交易 Args: sender (str): 发送方 recipient (str): 接收方 amount (int): 金额 Returns: int: 返回一个包含此交易的区块序号 """ self.current_transactions.append({ 'sender': sender, 'recipient': recipient, 'amount': amount }) return self.last_block['index'] + 1
添加新块
def new_block(self, proof: int, previous_hash=None): # 新建区块 block = { 'index': len(self.chain) + 1, 'timestamp': time(), 'transactions': self.current_transactions, 'proof': proof, 'previous_hash': previous_hash or self.hash(self.last_block), } self.current_transactions = [] # 新建区块打包后重置当前交易信息 self.chain.append(block) # 把新建的区块加入链 return block
计算哈希值
@staticmethod def hash(block: Dict[str, Any]) -> str: """计算哈希值,返回哈希后的摘要信息 Args: block (Dict[str, Any]): 传入一个块 Returns: str: 摘要信息 """ block_string = json.dumps(block, sort_keys=True).encode() return hashlib.sha256(block_string).hexdigest()
工作量证明与验证
def proof_of_work(self, last_proof: int) -> int: """工作量计算,计算一个符合要求的哈希值 Args: last_proof (int): 上一个块的工作量随机数 Returns: int: 返回符合要求的工作量随机数 """ proof = 0 while self.valid_proof(last_proof, proof) is False: proof += 1 # print(proof) 输出计算结果 return proofdef valid_proof(self, last_proof: int, proof: int) -> bool: """工作量证明验证,验证计算结果是否以2个0开头 Args: last_proof (int): 前工作证明 proof (int): 当前工作证明 Returns: bool: 返回验证是否有效 """ guess = f'{last_proof}{proof}'.encode() guess_hash = hashlib.sha256(guess).hexdigest() # print(guess_hash) 输出计算过程 if guess_hash[0:2] == "00": return True else: return False
验证一下,创建一个类,设定前一个工作量证明为100
尝试运行代码
可以看到这里算出前两位为0的就停止,结果为226
节点注册
def register_node(self, address: str) -> None: """添加一个新节点到节点集中 Args: address (str): 节点的地址。Eg:"http://127.0.0.1:5002" """ parsed_url = urlparse(address) # 解析url参数 self.nodes.add(parsed_url.netloc) # 获取域名服务器
共识算法
def resolve_conflicts(self) -> bool: """共识算法,解决冲突,以最长且有效的链为主 Returns: bool: 冲突是否解决成功 """ neighbours = self.nodes # 获取节点信息 new_chain = None # 定义可能的新链 max_length = len(self.chain) # 获取当前链长度 for node in neighbours: # 获取节点的链条信息,如果更长且有效则直接替换 response = requests.get(f'http://{node}/chain') if response.status_code == 200: length = response.json()['length'] chain = response.json()['chain'] if length > max_length and self.vaild_chain(chain): max_length = length new_chain = chain if new_chain: self.chain = new_chain return True return False
Web功能
使用flask 部署服务器
app = Flask(__name__)if __name__ == "__main__": app.run(host='0.0.0.0', port=5000)
尝试运行代码
从postman 中可看到一些信息,目前没有定义接口,自然是404
确定一下接口
@app.route('/transactions/new', methods=['POST'])def new_transaction(): pass@app.route('/mine', methods=['GET'])def mine(): pass@app.route('/chain', methods=['GET'])def full_chain(): pass@app.route('/nodes/register', methods=['POST'])def register_nodes():pass@app.route('/nodes/resolve', methods=['GET'])def consensus():pass
交易接口
# 添加新建交易接口@app.route('/transactions/new', methods=['POST'])def new_transaction(): values = request.get_json() required = ['sender', 'recipient', 'amount'] if not all(k in values for k in required): return '确少参数', 400 index = blockchain.new_transaction(values['sender'], values['recipient'], values['amount']) response = {'message': f'交易将会被添加到块 {index}'} return jsonify(response), 201
查看链接口
# 查看链接口@app.route('/chain', methods=['GET'])def full_chain(): response = { 'chain': blockchain.chain, 'length': len(blockchain.chain), } return jsonify(response), 200
打包区块接口
@app.route('/mine', methods=['GET'])def mine(): last_block = blockchain.last_block # 获取链上最后一个区块的信息 last_proof = last_block['proof'] proof = blockchain.proof_of_work(last_proof) # 发送者为 "0" 表明是新挖出的币,为矿工提供奖励 blockchain.new_transaction( sender="0", recipient=node_identifier, amount=1, ) block = blockchain.new_block(proof, None) # 生成一个新块 response = { 'message': "打包成功,新区块已生成!", 'index': block['index'], 'transactions': block['transactions'], 'proof': block['proof'], 'previous_hash': block['previous_hash'], } return jsonify(response), 200
去postman 依次请求chain(查看当前链)、transactions/new(新建交易,注意是post方法提交的json数据)、mine(打包区块)、chain
这时,一个单节点的区块链流程基本实现。
节点注册接口
# 节点注册接口@app.route('/nodes/register', methods=['POST'])def register_nodes(): values = request.get_json() nodes = values.get('nodes') if nodes is None: return "Error: 请提供一个符合规则的节点", 400 for node in nodes: blockchain.register_node(node) response = { 'message': '新节点已经被添加!', 'total_nodes': list(blockchain.nodes), } return jsonify(response), 201
测试一下
共识接口
# 共识接口@app.route('/nodes/resolve', methods=['GET'])def consensus(): replaced = blockchain.resolve_conflicts() if replaced: response = {'message': '当前链不符合要求,已被替换', 'new_chain': blockchain.chain} else: response = {'message': '当前链符合要求', 'chain': blockchain.chain} return jsonify(response), 200
完整运行
- 打开两个终端构造两个节点5000 和 5001
- 分别查看5000节点 和 5001节点 的当前链
5000节点
5001节点
均只有一个创世区块 - 接着对5000节点,建立新的交易,并封装区块,再次查看链,此时应该有链上应该有两个区块,而5001节点没有操作,所以链中只有一个块
- 此时将5001节点注册到5000节点中,5000节点注册到5001节点中,由于共识机制,选取最长的链,所以此时查看5001节点的链时,会出现两个节点而不是一个。
至此,简易区块链实现结束。
完整代码
import hashlibimport jsonfrom time import timefrom urllib.parse import urlparse # url解析from uuid import uuid4 # 生成唯一idfrom flask import Flask, jsonify, requestfrom typing import Any, Dict, Listimport requestsfrom argparse import ArgumentParser # 命令行参数解析class Blockchain: def __init__(self): self.chain = [] # 存块 self.current_transactions = [] # 交易实体 self.nodes = set() # 无重复的节点集合 # 创建创世区块 self.new_block(previous_hash='1', proof=100) def register_node(self, address: str) -> None: """添加一个新节点到节点集中 Args: address (str): 节点的地址。Eg:"http://127.0.0.1:5002" """ parsed_url = urlparse(address) # 解析url参数 self.nodes.add(parsed_url.netloc) # 获取域名服务器 def new_block(self, proof: int, previous_hash=None): # 新建区块 block = { 'index': len(self.chain) + 1, 'timestamp': time(), 'transactions': self.current_transactions, 'proof': proof, 'previous_hash': previous_hash or self.hash(self.last_block), } self.current_transactions = [] # 新建区块打包后重置当前交易信息 self.chain.append(block) # 把新建的区块加入链 return block def new_transaction(self, sender: str, recipient: str, amount: int) -> int: """添加新的交易 Args: sender (str): 发送方 recipient (str): 接收方 amount (int): 金额 Returns: int: 返回一个包含此交易的区块序号 """ self.current_transactions.append({ 'sender': sender, 'recipient': recipient, 'amount': amount }) return self.last_block['index'] + 1 @staticmethod def hash(block: Dict[str, Any]) -> str: """计算哈希值,返回哈希后的摘要信息 Args: block (Dict[str, Any]): 传入一个块 Returns: str: 摘要信息 """ block_string = json.dumps(block, sort_keys=True).encode() return hashlib.sha256(block_string).hexdigest() @property def last_block(self) -> Dict[str, Any]: # 获取当前链中最后一个区块 return self.chain[-1] def proof_of_work(self, last_proof: int) -> int: """工作量计算,计算一个符合要求的哈希值 Args: last_proof (int): 上一个块的工作量随机数 Returns: int: 返回符合要求的工作量随机数 """ proof = 0 while self.valid_proof(last_proof, proof) is False: proof += 1 # print(proof) 输出计算结果 return proof def vaild_chain(self, chain: List[Dict[str, Any]]) -> bool: """验证链是否合理:最长且有效 Args: chain (List[Dict[str, Any]]): 传入链 Returns: bool: 返回是否有效 """ last_block = chain[0] # 从第一个创世区块开始遍历验证 current_index = 1 while current_index < len(chain): block = chain[current_index] print(f'{last_block}') print(f'{block}') print("\n-----------\n") # 如果当前区块的前哈希和前一个计算出来的哈希值不同则是无效链 if block['previous_hash'] != self.hash(last_block): return False # 检验工作量证明是否符合要求 if not self.valid_proof(last_block['proof'], block['proof']): return False last_block = block current_index += 1 return True def valid_proof(self, last_proof: int, proof: int) -> bool: """工作量证明验证,验证计算结果是否以2个0开头 Args: last_proof (int): 前工作证明 proof (int): 当前工作证明 Returns: bool: 返回验证是否有效 """ guess = f'{last_proof}{proof}'.encode() guess_hash = hashlib.sha256(guess).hexdigest() # print(guess_hash) 输出计算过程 if guess_hash[0:2] == "00": return True else: return False def resolve_conflicts(self) -> bool: """共识算法,解决冲突,以最长且有效的链为主 Returns: bool: 冲突是否解决成功 """ neighbours = self.nodes # 获取节点信息 new_chain = None # 定义可能的新链 max_length = len(self.chain) # 获取当前链长度 for node in neighbours: # 获取节点的链条信息,如果更长且有效则直接替换 response = requests.get(f'http://{node}/chain') if response.status_code == 200: length = response.json()['length'] chain = response.json()['chain'] if length > max_length and self.vaild_chain(chain): max_length = length new_chain = chain if new_chain: self.chain = new_chain return True return Falseapp = Flask(__name__) # flask框架node_identifier = str(uuid4()).replace('-', '') # 使者获取一个唯一的uidblockchain = Blockchain()# 添加新建交易接口@app.route('/transactions/new', methods=['POST'])def new_transaction(): values = request.get_json() required = ['sender', 'recipient', 'amount'] if not all(k in values for k in required): return '确少参数', 400 index = blockchain.new_transaction(values['sender'], values['recipient'], values['amount']) response = {'message': f'交易将会被添加到块 {index}'} return jsonify(response), 201# 添加新建打包区块接口@app.route('/mine', methods=['GET'])def mine(): last_block = blockchain.last_block # 获取链上最后一个区块的信息 last_proof = last_block['proof'] proof = blockchain.proof_of_work(last_proof) # 发送者为 "0" 表明是新挖出的币,为矿工提供奖励 blockchain.new_transaction( sender="0", recipient=node_identifier, amount=1, ) block = blockchain.new_block(proof, None) # 生成一个新块 response = { 'message': "打包成功,新区块已生成!", 'index': block['index'], 'transactions': block['transactions'], 'proof': block['proof'], 'previous_hash': block['previous_hash'], } return jsonify(response), 200# 查看链接口@app.route('/chain', methods=['GET'])def full_chain(): response = { 'chain': blockchain.chain, 'length': len(blockchain.chain), } return jsonify(response), 200# 节点注册接口@app.route('/nodes/register', methods=['POST'])def register_nodes(): values = request.get_json() nodes = values.get('nodes') if nodes is None: return "Error: 请提供一个符合规则的节点", 400 for node in nodes: blockchain.register_node(node) response = { 'message': '新节点已经被添加!', 'total_nodes': list(blockchain.nodes), } return jsonify(response), 201# 共识接口@app.route('/nodes/resolve', methods=['GET'])def consensus(): replaced = blockchain.resolve_conflicts() if replaced: response = {'message': '当前链不符合要求,已被替换', 'new_chain': blockchain.chain} else: response = {'message': '当前链符合要求', 'chain': blockchain.chain} return jsonify(response), 200if __name__ == "__main__": parser = ArgumentParser() # 命令行参数解析,端口默认5000 parser.add_argument('-p', '--port', default=5000, type=int, help='port to listen on') args = parser.parse_args() port = args.port app.run(host='127.0.0.1', port=port) # 启动web服务,默认本机