本文章利用 Python 实现一个简单的功能较为完善的区块链系统(包括区块链结构、账户、钱包、转账),采用的共识机制是 POW。
一、区块与区块链结构
Block.py
import hashlibfrom datetime import datetimeclass Block:"""区块链结构:prev_hash:父区块哈希值data: 区块内容timestamp:区块创建时间hash: 区块哈希值"""def __init__(self, data, prev_hash):# 将传入的父区块哈希值和数据保存到变量中self.prev_hash = prev_hashself.data = data# 获得当前的时间self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")# 计算区块哈希值# 获取哈希对象message = hashlib.sha256()# 先将数据内容转为字符串并进行编码,再将它们哈希# 注意:update() 方法现在只接受 bytes 类型的数据,不接收 str 类型message.update(str(self.prev_hash).encode('utf-8'))message.update(str(self.prev_hash).encode('utf-8'))message.update(str(self.prev_hash).encode('utf-8'))# update() 更新 hash 对象,连续的调用该方法相当于连续的追加更新# 返回字符串类型的消息摘要self.hash = message.hexdigest()
BlockChain.py
from Block import Blockclass BlockChain:"""区块链结构体blocks: 包含区块的列表"""def __init__(self):self.blocks = []def add_block(self, block):"""添加区块:param block::return:"""self.blocks.append(block)# 新建区块genesis_block = Block(data="创世区块", prev_hash="")new_block1 = Block(data="张三转给李四一个比特币", prev_hash=genesis_block.hash)new_block2 = Block(data="张三转给王五三个比特币", prev_hash=genesis_block.hash)# 新建一个区块链对象blockChain = BlockChain()# 将刚才新建的区块加入区块链blockChain.add_block(genesis_block)blockChain.add_block(new_block1)blockChain.add_block(new_block2)# 打印区块链信息print("区块链包含区块个数为:%d\n" % len(blockChain.blocks))blockHeight = 0for block in blockChain.blocks:print(f"本区块高度为:{blockHeight}")print(f"父区块哈希:{block.prev_hash}")print(f"区块内容:{block.data}")print(f"区块哈希:{block.hash}")print()blockHeight += 1
测试结果
二、加入工作量证明(POW)
将工作量证明加入到 Block.py 中
import hashlibfrom datetime import datetimefrom time import timeclass Block:"""区块链结构:prev_hash:父区块哈希值data: 区块内容timestamp:区块创建时间hash: 区块哈希值nonce:随机数"""def __init__(self, data, prev_hash):# 将传入的父区块哈希值和数据保存到变量中self.prev_hash = prev_hashself.data = data# 获得当前的时间self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")# 设置随机数、哈希初始值为 Noneself.nonce = Noneself.hash = None# 类的 __repr__() 方法定义了实例化对象的输出信息def __repr__(self):return f"区块内容:{self.data}\n区块哈希值:{self.hash}"class ProofOfWork:"""工作量证明:block:区块difficulty: 难度值"""def __init__(self, block, difficult=5):self.block = block# 定义出块难度,默认为 5,表示有效哈希值以 5 个零开头self.difficulty = difficultdef mine(self):"""挖矿函数:return:"""i = 0prefix = '0' * self.difficultywhile True:message = hashlib.sha256()message.update(str(self.block.prev_hash).encode('utf-8'))message.update(str(self.block.data).encode('utf-8'))message.update(str(self.block.timestamp).encode('utf-8'))message.update(str(i).encode('utf-8'))# digest() 返回摘要,作为二进制数据字符串值# hexdigest() 返回摘要,作为十六进制数据字符串值digest = message.hexdigest()# str.startswith(prefix) 检测字符串是否是以 prefix(字符串)开头,返回布尔值if digest.startswith(prefix):# 幸运数字self.block.nonce = i# 区块哈希值为十六进制数据字符串摘要self.block.hash = digestreturn self.blocki += 1def validate(self):"""验证有效性:return:"""message = hashlib.sha256()message.update(str(self.block.prev_hash).encode('utf-8'))message.update(str(self.block.data).encode('utf-8'))message.update(str(self.block.timestamp).encode('utf-8'))message.update(str(self.block.nonce).encode('utf-8'))digest = message.hexdigest()prefix = '0' * self.difficultyreturn digest.startswith(prefix)# ++++++++测试++++++++# 定义一个区块b = Block(data="测试", prev_hash="")# 定义一个工作量证明w = ProofOfWork(b)# 开始时间start_time = time()# 挖矿,并统计函数执行时间print("+++开始挖矿+++")valid_block = w.mine()# 结束时间end_time = time()print(f"挖矿花费时间:{end_time - start_time}秒")# 验证区块print(f"区块哈希值是否符合规则:{w.validate()}")print(f"区块哈希值为:{b.hash}")
测试结果
更新 BlockChain.py
from Block import Block, ProofOfWorkclass BlockChain:"""区块链结构体blocks: 包含区块的列表"""def __init__(self):self.blocks = []def add_block(self, block):"""添加区块:param block::return:"""self.blocks.append(block)# 新建一个区块链对象blockChain = BlockChain()# 新建区块block1 = Block(data="创世区块", prev_hash="")w1 = ProofOfWork(block1)genesis_block = w1.mine()blockChain.add_block(genesis_block)block2 = Block(data="张三转给李四一个比特币", prev_hash=genesis_block.hash)w2 = ProofOfWork(block2)block = w2.mine()blockChain.add_block(block)block3 = Block(data="张三转给王五三个比特币", prev_hash=block.hash)w3 = ProofOfWork(block3)block = w3.mine()blockChain.add_block(block)# 打印区块链信息print("区块链包含区块个数为:%d\n" % len(blockChain.blocks))blockHeight = 0for block in blockChain.blocks:print(f"本区块高度为:{blockHeight}")print(f"父区块哈希:{block.prev_hash}")print(f"区块内容:{block.data}")print(f"区块哈希:{block.hash}")print()blockHeight += 1
测试结果
三、实现钱包、账户、交易功能
实现钱包、账户、交易功能要先安装非对称加密算法库 ecdsa。如果网速慢,引用下面这个网站
-i https://pypi.tuna.tsinghua.edu.cn/simple
添加钱包、账户功能 Wallet.py
import base64import binasciifrom hashlib import sha256# 导入椭圆曲线算法from ecdsa import SigningKey, SECP256k1, VerifyingKeyclass Wallet:"""钱包"""def __init__(self):"""钱包初始化时基于椭圆曲线生成一个唯一的秘钥对,代表区块链上一个唯一的账户"""# 生成私钥self._private_key = SigningKey.generate(curve=SECP256k1)# 基于私钥生成公钥self._public_key = self._private_key.get_verifying_key()@propertydef address(self):"""这里通过公钥生成地址"""h = sha256(self._public_key.to_pem())# 地址先由公钥进行哈希算法,再进行 Base64 计算而成return base64.b64encode(h.digest())@propertydef pubkey(self):"""返回公钥字符串"""return self._public_key.to_pem()def sign(self, message):"""生成数字签名"""h = sha256(message.encode('utf8'))# 利用私钥生成签名# 签名生成的是一串二进制字符串,为了便于查看,这里转换为 ASCII 字符串进行输出return binascii.hexlify(self._private_key.sign(h.digest()))def verify_sign(pubkey, message, signature):"""验证签名"""verifier = VerifyingKey.from_pem(pubkey)h = sha256(message.encode('utf8'))return verifier.verify(binascii.unhexlify(signature), h.digest())
实现转账功能 Transaction.py
import jsonclass Transaction:"""交易的结构"""def __init__(self, sender, recipient, amount):"""初始化交易,设置交易的发送方、接收方和交易数量"""# 交易发送者的公钥self.pubkey = None# 交易的数字签名self.signature = Noneif isinstance(sender, bytes):sender = sender.decode('utf-8')self.sender = sender# 发送方if isinstance(recipient, bytes):recipient = recipient.decode('utf-8')self.recipient = recipient# 接收方self.amount = amount# 交易数量def set_sign(self, signature, pubkey):"""为了便于验证这个交易的可靠性,需要发送方输入他的公钥和签名"""self.signature = signature# 签名self.pubkey = pubkey# 发送方公钥def __repr__(self):"""交易大致可分为两种,一是挖矿所得,而是转账交易挖矿所得无发送方,以此进行区分显示不同内容"""if self.sender:s = f"从{self.sender}转自{self.recipient}{self.amount}个加密货币"elif self.recipient:s = f"{self.recipient}挖矿所得{self.amount}个加密货币"else:s = "error"return sclass TransactionEncoder(json.JSONEncoder):"""定义Json的编码类,用来序列化Transaction"""def default(self, obj):if isinstance(obj, Transaction):return obj.__dict__else:return json.JSONEncoder.default(self, obj)# return super(TransactionEncoder, self).default(obj)
更新 Block.py
import hashlibimport jsonfrom datetime import datetimefrom Transaction import Transaction, TransactionEncoderclass Block:"""区块结构prev_hash:父区块哈希值transactions: 交易对timestamp:区块创建时间hash: 区块哈希值Nonce:随机数"""def __init__(self, transactions, prev_hash):# 将传入的父哈希值和数据保存到类变量中self.prev_hash = prev_hash# 交易列表self.transactions = transactions# 获取当前时间self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")# 设置Nonce和哈希的初始值为Noneself.nonce = Noneself.hash = None# 类的 __repr__() 方法定义了实例化对象的输出信息def __repr__(self):return f"区块内容:{self.transactions}\n区块哈希值:{self.hash}"class ProofOfWork:"""工作量证明block:区块difficulty: 难度值"""def __init__(self, block, miner, difficult=5):self.block = blockself.miner = miner# 定义工作量难度,默认为5,表示有效的哈希值以5个“0”开头self.difficulty = difficult# 添加挖矿奖励self.reward_amount = 1def mine(self):"""挖矿函数"""i = 0prefix = '0' * self.difficulty# 设置挖矿自动生成交易信息,添加挖矿奖励t = Transaction(sender="",recipient=self.miner.address,amount=self.reward_amount,)sig = self.miner.sign(json.dumps(t, cls=TransactionEncoder))t.set_sign(sig, self.miner.pubkey)self.block.transactions.append(t)while True:message = hashlib.sha256()message.update(str(self.block.prev_hash).encode('utf-8'))# 更新区块中的交易数据# message.update(str(self.block.data).encode('utf-8'))message.update(str(self.block.transactions).encode('utf-8'))message.update(str(self.block.timestamp).encode('utf-8'))message.update(str(i).encode("utf-8"))digest = message.hexdigest()if digest.startswith(prefix):self.block.nonce = iself.block.hash = digestreturn self.blocki += 1def validate(self):"""验证有效性"""message = hashlib.sha256()message.update(str(self.block.prev_hash).encode('utf-8'))# 更新区块中的交易数据# message.update(str(self.block.data).encode('utf-8'))message.update(json.dumps(self.block.transactions).encode('utf-8'))message.update(str(self.block.timestamp).encode('utf-8'))message.update(str(self.block.nonce).encode('utf-8'))digest = message.hexdigest()prefix = '0' * self.difficultyreturn digest.startswith(prefix)
更新 BlockChain.py
from Block import Block, ProofOfWorkfrom Transaction import Transactionfrom Wallet import Wallet, verify_signclass BlockChain:"""区块链结构体blocks:包含的区块列表"""def __init__(self):self.blocks = []def add_block(self, block):"""添加区块"""self.blocks.append(block)def print_list(self):print(f"区块链包含个数为:{len(self.blocks)}")for block in self.blocks:height = 0print(f"区块链高度为:{height}")print(f"父区块为:{block.prev_hash}")print(f"区块内容为:{block.transactions}")print(f"区块哈希值为:{block.hash}")height += 1print()
为了方便我们对区块链进行操作,我们可以在 BlockChain.py 中补充一些方法
# 传入用户和区块链,返回用户的“余额”def get_balance(user, blockchain):balance = 0for block in blockchain.blocks:for t in block.transactions:if t.sender == user.address.decode():balance -= t.amountelif t.recipient == user.address.decode():balance += t.amountreturn balance# user生成创世区块(新建区块链),并添加到区块链中def generate_genesis_block(user):blockchain = BlockChain()new_block = Block(transactions=[], prev_hash="")w = ProofOfWork(new_block, user)genesis_block = w.mine()blockchain.add_block(genesis_block)# 返回创世区块return blockchain# 用户之间进行交易并记入交易列表def add_transaction(sender, recipient, amount):# 新建交易new_transaction = Transaction(sender=sender.address,recipient=recipient.address,amount=amount)# 生成数字签名sig = sender.sign(str(new_transaction))# 传入付款方的公钥和签名new_transaction.set_sign(sig, sender.pubkey)return new_transaction# 验证交易,若验证成功则加入交易列表def verify_new_transaction(new_transaction, transactions):if verify_sign(new_transaction.pubkey, str(new_transaction), new_transaction.signature ):# 验证交易签名没问题,加入交易列表print("交易验证成功")transactions.append(new_transaction)else:print("交易验证失败")# 矿工将全部验证成功的交易列表打包出块def generate_block(miner, transactions, blockchain):new_block = Block(transactions=transactions,prev_hash=blockchain.blocks[len(blockchain.blocks) - 1].hash)print("生成新的区块...")# 挖矿w = ProofOfWork(new_block, miner)block = w.mine()print("将新区块添加到区块链中")blockchain.add_block(block)
进行测试
# 新建交易列表transactions = []# 创建 3 个用户alice = Wallet()tom = Wallet()bob = Wallet()print("alice创建创世区块...")blockchain = generate_genesis_block(alice)print()print(f"alice 的余额为{get_balance(alice, blockchain)}个比特币")print(f"tom 的余额为{get_balance(tom, blockchain)}个比特币")print(f"bob 的余额为{get_balance(bob, blockchain)}个比特币")print()# 打印区块链信息blockchain.print_list()print("新增交易:alice 转账 0.5 比特币给 tom")nt = add_transaction(alice, tom, 0.5)print()verify_new_transaction(nt, transactions)print(f"矿工 bob 将全部验证成功的交易列表打包出块...")generate_block(bob, transactions, blockchain)print("添加完成\n")# 打印区块链信息blockchain.print_list()
测试结果