Python实现简单的区块链
记录自己假期所学相关内容
文章中的内容,开源代码地址见文末。
文章目录
- Python实现简单的区块链
- 1、分模块实现简单的单节点区块链
- 1.1 Transaction类
- 1.2 DaDaMessage类
- 1.3 Block类
- 1.4 Dada_BlockCoin类
- 1.5 主函数BlockChainApp类
- 1.6 主函数类中实现了可视化界面,以下为演示效果
- 2、网络共识区块链编程实现
- 2.1 DaDaCoinBlockChain类
- 2.2 flask框架部分
- 2.3 初始化三个节点,分别模拟网络中的三个矿工
- 2.4 效果演示
- 3、其他
1、分模块实现简单的单节点区块链
1.1 Transaction类
class Transaction:def __init__(self, payer,# 付款方 r ecer,# 收款方 money):# 数字货币的数额self.payer = payerself.recer = recerself.money = moneyself.timestamp = datetime.datetime.now()# 交易时间def __repr__(self):return str(self.payer) + " pays " + str(self.recer) + \ " " + str(self.money) + " " + str(self.timestamp)
1.2 DaDaMessage类
class DaDaMessage:# 交易记录类def __init__(self, data):self.hash = None# 自身的哈希self.prev_hash = None# 上一个信息记录的哈希self.timestamp = datetime.datetime.now()# 交易时间self.data = data# 交易信息self.payload_hash = self._hash_payload()# 交易后的哈希def _hash_payload(self):# 对于交易时间与交易数据进行哈希计算return hashlib.sha256((str(self.timestamp) + str(self.data)).encode()).hexdigest()def _hash_message(self):# 对于交易进行锁定return hashlib.sha256((str(self.prev_hash) + str(self.payload_hash)).encode()).hexdigest()def seal(self):# 密封self.hash = self._hash_message()# 对应数据锁定,对于交易前的链锁定def validate(self):# 验证if self.payload_hash != self._hash_payload():# 判断是否有人修改raise InvalidMessage("交易数据与时间被修改" + str(self))if self.hash != self._hash_message():# 判断消息链raise InvalidMessage("交易的哈希链接被修改" + str(self))return "数据正常" + str(self)def __repr__(self):# 返回对象的基本信息mystr = "hash:{}, prev_hash:{}, data:{}".format(self.hash, self.prev_hash, self.data)return mystrdef link(self, Message):self.prev_hash = Message.hash# 链接
1.3 Block类
class Block:def __init__(self, *args):# 初始化self.messageList = []# 存储多个交易记录,存放区块self.timestamp = None# 存储多个记录最终锁定的时间self.hash = None# 当前的哈希散列self.prev_hash = None# 上一块的哈希散列if args:for arg in args:self.add_message(arg)# self.messagelist.append(arg)def add_message(self, message):# 增加交易信息# 区分第一条与后面多条,是否需要链接if len(self.messageList) > 0:message.link(self.messageList[-1])# 链接message.seal()# 密封message.validate()# 校验self.messageList.append(message)# 追加记录def link(self, block):# 区块链链接self.prev_hash = block.hashdef seal(self):# 密封self.timestamp = datetime.datetime.now()# 密封确定当前时间self.hash = self._hash_block()# 密封当前的哈希值def _hash_block(self):# 密封 上一块哈希,时间线,交易记录的最后一个if len(self.messageList) > 0:return hashlib.md5((str(self.prev_hash) +str(self.timestamp) +str(self.messageList[-1].hash)).encode("utf-8")).hexdigest()# else:# return hashlib.sha256((str(self.prev_hash) +#str(self.timestamp) +#str(0)).encode("utf-8")).hexdigest()def validate(self):# 校验for i, message in enumerate(self.messageList):# 校验每一个交易记录message.validate()# 校验每一条if i > 0 and message.prev_hash != self.messageList[i - 1].hash:raise InvalidBlock("无效block,交易记录被修改为在第{}条记录".format(i) + str(self))# print("无效block,交易记录被修改为在第{}条记录".format(i))# return str(self) + "数据NO"return " " + str(self) + "数据OK"def __repr__(self):# 类的对象描述# return "money block = hash : {}, pre_hash : {}, len : {}, time : {}".\# format(self.hash, self.prev_hash, len(self.messageList), self.timestamp)return "money block \n hash : {} \n pre_hash : {} \n len : {} \n time : {}". \format(self.hash, self.prev_hash, len(self.messageList), self.timestamp)
1.4 Dada_BlockCoin类
class Dada_BlockCoin:# 区块链def __init__(self):# 初始化self.blockList = []def add_block(self, block):# 增加区块if (len(self.blockList) > 0):block.prev_hash = self.blockList[-1].hash# 区块链的哈希block.seal()# 密封block.validate()# 校验self.blockList.append(block)# 增加区块def validate(self):# 校验for i, block in enumerate(self.blockList):try:block.validate()except InvalidBlock as e:raise InvalidBlockCoin("区块校验错误,区块索引{}".format(i))except InvalidMessage as e:print(e)def __repr__(self):# 字符串格式化return "Dada_BlockCoin : {}".format(len(self.blockList))# 获取长度
1.5 主函数BlockChainApp类
class BlockChainApp:def __init__(self, root):self.root = rootself.root.title("区块链可视化")self.blockchain = Dada_BlockCoin()self.current_block_transactions = []self.selected_block_index = None# 添加滚动条self.canvas_frame = tk.Frame(root)self.canvas_frame.pack(expand=tk.YES, fill=tk.BOTH)self.scrollbar_x = tk.Scrollbar(self.canvas_frame, orient=tk.HORIZONTAL)self.scrollbar_x.pack(side=tk.BOTTOM, fill=tk.X)self.scrollbar_y = tk.Scrollbar(self.canvas_frame)self.scrollbar_y.pack(side=tk.RIGHT, fill=tk.Y)# 区块链信息显示框self.canvas = tk.Canvas(self.canvas_frame, width=800, yscrollcommand=self.scrollbar_y.set,xscrollcommand=self.scrollbar_x.set)self.canvas.pack(side=tk.LEFT, expand=tk.YES, fill=tk.BOTH)self.scrollbar_x.config(command=self.canvas.xview)self.scrollbar_y.config(command=self.canvas.yview)self.canvas.config(scrollregion=self.canvas.bbox(tk.ALL))# 交易信息输入框self.payer_label = tk.Label(root, text="付款方:")self.payer_label.pack()self.payer_entry = tk.Entry(root)self.payer_entry.pack()self.recer_label = tk.Label(root, text="收款方:")self.recer_label.pack()self.recer_entry = tk.Entry(root)self.recer_entry.pack()self.money_label = tk.Label(root, text="金额:")self.money_label.pack()self.money_entry = tk.Entry(root)self.money_entry.pack()# 添加交易按钮self.add_transaction_btn = tk.Button(root, text="添加交易", command=self.add_transaction)self.add_transaction_btn.pack()# 添加区块按钮self.add_block_btn = tk.Button(root, text="添加区块", command=self.add_block)self.add_block_btn.pack()# 模拟篡改按钮self.tamper_block_btn = tk.Button(root, text="模拟篡改数据", command=self.tamper_block)self.tamper_block_btn.pack()# 区块查询框self.query_frame = tk.Frame(root)self.query_frame.pack()self.query_label = tk.Label(self.query_frame, text="查询区块索引:")self.query_label.pack(side=tk.LEFT)self.query_entry = tk.Entry(self.query_frame)self.query_entry.pack(side=tk.LEFT)self.query_btn = tk.Button(self.query_frame, text="查询区块", command=self.query_block)self.query_btn.pack(side=tk.LEFT)# 交易信息显示框self.transaction_info_text = tk.Text(root, wrap=tk.WORD, width=60, height=10)self.transaction_info_text.pack()def add_transaction(self):payer = self.payer_entry.get()recer = self.recer_entry.get()money = self.money_entry.get()try:money = int(money)except ValueError:messagebox.showerror("错误", "金额必须为整数")returnif payer and recer and money > 0:transaction = Transaction(payer, recer, money)self.current_block_transactions.append(DaDaMessage(transaction))self.update_blockchain_info()messagebox.showinfo("成功", "交易添加成功")else:messagebox.showerror("错误", "请输入有效的交易信息")def add_block(self):if self.current_block_transactions:block = Block(*self.current_block_transactions)self.blockchain.add_block(block)self.current_block_transactions = []self.update_blockchain_info()messagebox.showinfo("成功", "区块添加成功")else:messagebox.showerror("错误", "当前区块没有交易信息")def tamper_block(self):try:block_index = int(input("请输入要篡改的区块索引:"))except ValueError:messagebox.showerror("错误", "请输入有效的区块索引")returnif 0 <= block_index < len(self.blockchain.blockList):block = self.blockchain.blockList[block_index]if len(block.messageList) > 0:# 模拟篡改第一条交易信息block.messageList[0].data = "篡改后的交易信息"try:self.blockchain.validate()except InvalidBlockCoin as e:messagebox.showerror("篡改数据", "区块链数据已被篡改,篡改发生在第{}个区块".format(e.args[0]))except InvalidMessage as e:messagebox.showerror("篡改数据", "区块链数据已被篡改,篡改发生在第{}个区块的第{}条交易信息".format(e.args[0][0], e.args[0][1]))else:messagebox.showinfo("篡改数据", "区块链数据未被篡改")finally:self.update_blockchain_info()else:messagebox.showerror("错误", "区块中没有交易信息,无法篡改")else:messagebox.showerror("错误", "区块索引超出范围")def query_block(self):try:block_index = int(self.query_entry.get())except ValueError:messagebox.showerror("错误", "请输入有效的区块索引")returnif 0 <= block_index < len(self.blockchain.blockList):block = self.blockchain.blockList[block_index]transactions = "\n".join(str(msg.data) for msg in block.messageList)self.transaction_info_text.delete("1.0", tk.END)self.transaction_info_text.insert(tk.END, transactions)self.selected_block_index = block_indexmessagebox.showinfo("成功", "查询成功")else:messagebox.showerror("错误", "区块索引超出范围")def update_blockchain_info(self):self.canvas.delete("all")# 清空画布上的内容x, y = 20, 50# 区块链的初始位置block_width, block_height = 250, 150# 区块的宽度和高度for block in self.blockchain.blockList:self.canvas.create_rectangle(x, y, x + block_width, y + block_height, outline="black")# 绘制区块框self.canvas.create_text(x + block_width // 2, y + block_height // 2, text=str(block))# 显示区块信息if block.prev_hash:# 绘制区块之间的连接线prev_x, prev_y = x - block_width, y + block_height // 2self.canvas.create_line(prev_x, prev_y, x, y + block_height // 2, fill="red")x += block_width + 50# 每个区块之间留一定的间隔# 设置Canvas可滚动范围self.canvas.config(scrollregion=self.canvas.bbox(tk.ALL))if __name__ == "__main__":root = tk.Tk()app = BlockChainApp(root)app.root.mainloop()
1.6 主函数类中实现了可视化界面,以下为演示效果
- 初始化界面
- 添加交易后添加区块
在上述区块链可视化界面中显示对应区块,并用红色连线连接,表示区块链的哈希连接。
- 查询区块信息
输入区块对应的索引(从0开始),即可查询对应区块的交易信息
2、网络共识区块链编程实现
2.1 DaDaCoinBlockChain类
class DaDaCoinBlockChain:def __init__(self):# 初始化self.current_transactions = []# 交易列表self.chain = []# 区块链管理多个区块self.nodes = set()# 保存网络中其他节点self.new_block(previous_hash="1", proof=100)# 创建创世区块def new_block(self,proof: int,# 确定proof为int类型previous_hash: Optional[str]# 上一块的哈希类型) -> Dict[str, Any]:# 创建一个区块,返回一个字典数据类型block = {"index": len(self.chain) + 1,# 索引"timestamp": time.time(),# 当前时间"transaction": self.current_transactions,# 交易记录"proof": proof,# 工作量证明"previous_hash": previous_hash or self.hash(self.chain[-1])# 前一区块哈希}self.current_transactions = []# 交易记录加入区块之后清空self.chain.append(block)# 区块加入区块链return blockdef new_transactions(self, sender: str, recipient: str, amount) -> int:# 创建一个交易self.current_transactions.append({"sender": sender,# 付款方"recipient": recipient,# 收款方"amount": amount# 数量})return self.last_block["index"] + 1# 索引标记交易数量@propertydef last_block(self) -> Dict[str, Any]:# 取得最后一个区块return self.chain[-1]@staticmethoddef hash(block: Dict[str, any]) -> str:# 哈希加密,传递一个字典,返回字符串blockString = json.dumps(block, sort_keys=True).encode()# 编码return hashlib.sha256(blockString).hexdigest()def proof_of_work(self, last_proof: int) -> int:# 工作量证明,挖矿过程proof = 0while self.valid_proof(last_proof, proof) is False:proof += 1return proof@staticmethod# 第N个区块依赖于N-1个区块,简单挖矿def valid_proof(last_proof: int, proof: int) -> bool:# 验证证明guess = f'{last_proof * proof}'.encode()guess_hash = hashlib.sha256(guess).hexdigest()# print(str(guess_hash))return guess_hash[-4:] == "1234"def valid_chain(self, chain: List[Dict[str, Any]]) -> bool:# 区块链校验# List[Dict[str, Any]]是一个列表,列表的每个元素都是字典last_block = chain[0]# 第一个区块curr_index = 1# 当前的第一个索引while curr_index < len(chain):block = chain[curr_index]# 当前区块# 哈希校验,校验区块链的链接if block["previous_hash"] != self.hash(last_block):return False# 工作量校验,挖矿的工作量校验if not self.valid_proof(last_block["proof"], block["proof"]):return Falselast_block = block# 轮询curr_index += 1# 索引自增return Truedef register_node(self, addr: str) -> None:# 加入网络的其他节点,用于更新now_url = urlparse(addr)# 解析self.nodes.add(now_url.netloc)# 增加网络节点def resolve_conflicts(self) -> bool:# 共识算法# 网络中的多个节点,取出最长的neighbours = self.nodes# 取得所有的节点new_chain = None# 新的区块链max_length = len(self.chain)# 当前的区块链长度for node in neighbours:response = requests.get(f"http://{node}/chain")# 访问网络节点if response.status_code == 200:length = response.json()["length"]# 取出长度chain = response.json()["chain"]# 取出区块链# 如果当前区块链比我长并且经得起校验,那么就更新if length > max_length and self.valid_chain(chain):max_length = lengthnew_chain = chain# 保存长度与区块链if new_chain:self.chain = new_chain# 替换区块链return Truereturn False
2.2 flask框架部分
dadacoin = DaDaCoinBlockChain()# 创建一个网络节点node_id = str(uuid4()).replace("-", "")# 节点替换,生成密钥print("当前钱包地址:", node_id)app = Flask(__name__)# 初始化flask框架@app.route("/")def index_page():return "你好,欢迎来到达达币系统!"@app.route("/chain")# 查看所有的区块链def index_chain():response = {"chain": dadacoin.chain,# 区块链"length": len(dadacoin.chain)# 区块链长度}return jsonify(response), 200@app.route("/mine")# 挖矿def index_mine():last_block = dadacoin.last_block# 取得最后一个区块last_proof = last_block["proof"]# 取得工作量证明proof = dadacoin.proof_of_work(last_proof)# 挖矿计算# 系统奖励比特币,挖矿产生交易dadacoin.new_transactions(sender="0",# 0代表系统奖励recipient=node_id,# 当前钱包地址amount=10# 奖励数量)block = dadacoin.new_block(proof, None)# 增加一个区块response = {"message": "新的区块创建","index": block["index"],# 仓建的索引"transaction": block["transaction"],# 交易"proof": block["proof"],# 工作量证明"previous_hash": block["previous_hash"]# 上一块的哈希}return jsonify(response), 200@app.route("/new_transaction", methods=["POST"])# 创建一个新的交易def new_transaction():values = request.get_json()# 抓取网络传输的信息required = ["sender", "recipient", "amount"]if not all(key in values for key in required):return "数据不完整", 400index = dadacoin.new_transactions(values["sender"],values["recipient"],values["amount"])# 新增一个交易response = {"message": f"交易加入到区块{index}",}return jsonify(response), 200@app.route("/new_node", methods=["POST"]) # 增加网络节点def new_node():values = request.get_json() # 获取json字符串nodes = values.get("nodes") # 获取所有节点if nodes is None:return "节点为空", 400for node in nodes:dadacoin.register_node(node)# 增加网络节点response = {"message": "网络节点已经增加","nodes": list(dadacoin.nodes) # 查看所有节点}return jsonify(response), 200@app.route("/node_refresh")def node_refresh():replaced = dadacoin.resolve_conflicts() # 共识算法进行最长替换message = ""if replaced:message += "区块链已经被替换为最长"else:message += "当前区块链已经是最长无需替换"response = {"message": message,"new--chain": dadacoin.chain}return jsonify(response), 200
2.3 初始化三个节点,分别模拟网络中的三个矿工
if __name__ == '__main__':app.run("127.0.0.1", 5000)if __name__ == '__main__':app.run("127.0.0.1", 5001)if __name__ == '__main__':app.run("127.0.0.1", 5002)
2.4 效果演示
分别启动三个矿工节点
- 初始化界面
- 初次访问chain路径时,显示创世区块
- 访问mine路径时,进行挖矿操作,挖出新的区块,矿工获得出块奖励
- 添加交易
使用postman进行操作
此时仅仅是将交易添加到区块,还没有挖出区块3,再次进行mine操作,可以看到区块3中除了出块奖励,还有此次交易
- 网络共识,解决分叉冲突
此时模拟节点1挖出6个区块,节点2挖出4个区块,节点3挖出3个区块。将节点1和节点2的信息同步给区块3,此时根据最长链原则,应该更新为节点1的6个区块。
使用postman添加节点
刷新节点信息
此时节点3已经被最长链代替。
3、其他
开源代码地址: Gitee仓库
b站参考视频:参考视频