server端代码如下
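"""Mutual-TLS (mTLS) demo server built on ``socketserver``.

The server presents ``./server.crt`` to clients and only completes the
handshake with clients that present a certificate chaining to ``./ca.crt``.
"""
import socketserver
import ssl

# Purpose.CLIENT_AUTH yields a server-side context (used to authenticate clients).
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
# Refuse legacy protocol versions explicitly instead of relying on the
# interpreter's default.
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
# Certificate chain and private key sent to the client.
ssl_context.load_cert_chain(certfile="./server.crt", keyfile="./server.key")
# Trust store: CA certificate(s) used to validate the client certificate.
# (The original call was missing the opening quote and did not parse.)
ssl_context.load_verify_locations(cafile="./ca.crt")
# Client certificates are mandatory; a client without a valid one is rejected
# during the handshake.
ssl_context.verify_mode = ssl.CERT_REQUIRED
# Hostname matching is a check a client performs on the server certificate;
# it stays off on the server-side context.
ssl_context.check_hostname = False


class MyHandler(socketserver.BaseRequestHandler):
    """Handle one authenticated TLS connection: read a message, send a greeting."""

    def handle(self):
        # CERT_REQUIRED only proves the peer holds a certificate issued by the
        # trusted CA. If access must be limited to specific clients, inspect
        # self.request.getpeercert() here and authorize on its subject/SAN.

        # Read at most 1024 bytes from the client.
        data = self.request.recv(1024).strip()
        # The payload is untrusted: decode leniently so that bytes which are
        # not valid UTF-8 cannot abort the handler before the reply is sent.
        print(f'Received from client: {data.decode(errors="replace")}')
        # Reply to the client.
        response = b'Hello, Client!'
        self.request.sendall(response)


if __name__ == '__main__':
    # 0.0.0.0 listens on every interface; bind to a specific address when the
    # service is only meant to be reachable locally.
    with socketserver.TCPServer(('0.0.0.0', 1234), MyHandler) as server:
        # Wrapping the listening socket makes accept() return TLS sockets. The
        # handshake then runs inside the accept loop, so a slow peer can delay
        # other clients of this single-threaded demo server.
        server.socket = ssl_context.wrap_socket(server.socket, server_side=True)
        server.serve_forever()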
client端代码如下
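"""Mutual-TLS (mTLS) demo client.

Connects to the demo server, presents a client certificate, verifies the
server certificate against the private CA, and exchanges one message.
"""
import socket
import ssl

# Address and port of the server.
server_address = ('localhost', 1234)

# PROTOCOL_TLS_CLIENT replaces the deprecated PROTOCOL_TLS and starts from
# client-safe defaults (certificate and hostname verification enabled).
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
# Certificate chain and key sent to the peer (the "keystore").
# NOTE(review): this reuses the server's certificate and private key. That
# works for a local demo, but a real client should hold its own key pair and
# a certificate issued by the CA; the server key should never leave the server.
ssl_context.load_cert_chain(certfile="./server.crt", keyfile="./server.key")
ssl_context.verify_mode = ssl.CERT_REQUIRED
# Verify that the certificate was issued for the host being contacted.
# Without this, any certificate signed by the trusted CA would be accepted as
# the server. The server certificate must list the host name used in
# server_address in its subjectAltName, otherwise the handshake fails.
ssl_context.check_hostname = True
# Trust store: CA used to validate the server certificate. A raw string keeps
# the Windows backslashes from being read as (invalid) escape sequences.
ssl_context.load_verify_locations(cafile=r"D:\code\Quantity\ca.crt")

# Create the TCP socket; the context managers close it even when wrapping,
# connecting or the handshake fails.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
    with ssl_context.wrap_socket(
        client_socket,
        server_side=False,
        server_hostname=server_address[0],
    ) as csocket:
        # Connect to the server; the TLS handshake runs as part of connect().
        csocket.connect(server_address)
        # Send data to the server.
        data = b'Hello, Server!'
        csocket.sendall(data)
        # Receive the server's reply (at most 1024 bytes).
        response = csocket.recv(1024)
        # Decode leniently: the reply comes from the network and may not be
        # valid UTF-8.
        print('Received from server:', response.decode(errors="replace"))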
双向认证证书生成:https://blog.csdn.net/u014644574/article/details/126190061