Flask-SocketIO 是基于 Flask 的一个扩展,用于简化在 Flask 应用中集成 WebSocket 功能。WebSocket 是一种在客户端和服务器之间实现实时双向通信的协议,常用于实现实时性要求较高的应用,如聊天应用、实时通知等,使得开发者可以更轻松地构建实时性要求较高的应用。通过定义事件处理函数,可以实现双向实时通信,为应用提供更加丰富和实时的用户体验。

前端参数拼接

Flask 提供了针对WebSocket的支持插件flask_socketio直接通过pip命令安装即可导入使用,同时前端也需要引入SocketIO.js库文件。

如下代码通过ECharts图表库和WebSocket技术实现了一个实时监控主机CPU负载的动态折线图。通过WebSocket连接到Flask应用中的Socket.IO命名空间,前端通过实时接收后端传来的CPU负载数据,动态更新折线图,展示1分钟、5分钟和15分钟的CPU负载趋势。同时,通过控制台打印实时数据,实现了方便的调试和监控功能。

                
var display = function(time,x,y,z) { var echo = echarts.init(document.getElementById("Linechart")); var option = { title: { left: 'left', text: 'CPU 利用表动态监控', }, // 调节大小 grid: { left: '3%', right: '4%', bottom: '3%', containLabel: true }, // tooltip 鼠标放上去之后会自动出现坐标 tooltip: { trigger: 'axis', axisPointer: { type: 'cross', label: { backgroundColor: '#6a7985' } } }, legend: { data: ['1分钟负载', '5分钟负载', '15分钟负载'] }, xAxis: { type: 'category', // data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] data: time }, yAxis: { type: 'value' }, series: [ { name: "1分钟负载", stack: "总量", //data: [10, 25, 99, 87, 54, 66, 2], data: x, type: 'line', areaStyle: {} }, { name: "5分钟负载", stack: "总量", //data: [89, 57, 85, 44, 25, 4, 54], data: y, type: 'line', areaStyle: {} }, { name: "15分钟负载", stack: "总量", //data: [1, 43, 2, 12, 5, 4, 7], data: z, type: 'line', areaStyle: {} } ] }; echo.setOption(option,true); } var time =["","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","","",""]; var cpu_load1 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]; var cpu_load5 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]; var cpu_load15 = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]; var update_function = function(recv) { time.push(recv.datetime); cpu_load1.push(parseFloat(recv.load1)); cpu_load5.push(parseFloat(recv.load5)); cpu_load15.push(parseFloat(recv.load15)); if(time.length >=10) { time.shift(); cpu_load1.shift(); cpu_load5.shift(); cpu_load15.shift(); console.log("时间数组: " + time); console.log("1分钟: " + cpu_load1); console.log("5分钟: " + cpu_load5); console.log("15分钟: " + cpu_load15); // 调用绘图函数 display(time,cpu_load1,cpu_load5,cpu_load15); } }; $(document).ready(function() { namespace = '/Socket'; var socket = io.connect("http://" + document.domain + ":" + location.port + namespace); socket.emit("message",{"data":"hello lyshark"}); // 初始化完成后,发送一条消息. socket.on('response', function(recv) { console.log("时间: " + recv.datetime); console.log("1分钟: " + recv.load1); console.log("5分钟: " + recv.load5); console.log("15分钟: " + recv.load15); // 调用函数完成数据填充 update_function(recv); }); });

后台代码使用Flask和Flask-SocketIO搭建了一个实时监控主机CPU负载的WebSocket应用,并将数据通过socketio.emit函数将数据推送给前端展示。

关键点概括如下:

Flask和SocketIO集成:

  • 使用Flask框架创建了一个Web应用,并通过Flask-SocketIO集成了WebSocket功能,实现了实时双向通信。

消息接收与实时推送:

  • 定义了socket事件处理函数,用于接收前端通过WebSocket发送的消息。在无限循环中,通过socketio.sleep方法设置每2秒推送一次实时的CPU负载数据给前端。

前端连接和断开事件:

  • 定义了connectdisconnect事件处理函数,分别在WebSocket连接建立和断开时触发。在控制台打印相应信息,用于监控连接状态。

实时数据推送:

  • 使用socketio.emit方法实时将CPU负载数据推送给前端,以更新折线图。推送的数据包括当前时间、1分钟负载、5分钟负载和15分钟负载。

前端页面渲染:

  • 通过Flask的render_template方法渲染了一个HTML页面,用于展示实时更新的CPU负载折线图。

调试信息输出:

  • 在每个事件处理函数中使用print语句输出调试信息,方便监测WebSocket连接和消息的传递过程。

总体来说,该应用实现了一个简单而实用的实时监控系统,通过WebSocket技术实时推送主机CPU负载数据至前端,为用户提供了实时可视化的监控体验。

from flask import Flask,render_template,requestfrom flask_socketio import SocketIOimport time,psutilasync_mode = Noneapp = Flask(__name__)app.config['SECRET_KEY'] = "lyshark"socketio = SocketIO(app)@app.route("/")def index():    return render_template("index.html")# 出现消息后,率先执行此处@socketio.on("message",namespace="/Socket")def socket(message):    print("接收到消息:",message['data'])    while True:        socketio.sleep(2)        data = time.strftime("%M:%S",time.localtime())        cpu = psutil.cpu_percent(interval=None,percpu=True)        socketio.emit("response",                      {"datetime": data, "load1": cpu[0], "load5": cpu[1], "load15": cpu[2]},                      namespace="/Socket")# 当websocket连接成功时,自动触发connect默认方法@socketio.on("connect",namespace="/Socket")def connect():    print("链接建立成功..")# 当websocket连接失败时,自动触发disconnect默认方法@socketio.on("disconnect",namespace="/Socket")def disconnect():    print("链接建立失败..")if __name__ == '__main__':    socketio.run(app,debug=True)

运行后,即可输出当前系统下CPU的负载情况,如下图所示;

后端参数拼接

如上所示的代码是在前端进行的数据拼接,如果我们想要在后端进行数据的拼接,则需要对代码进行一定的改进。

前端编写以下代码,通过WebSocket建立通信隧道,而后台则每隔2秒向前台推送传递字典数据。

                
var display = function(time,x,y,z) { var echo = echarts.init(document.getElementById("Linechart")); var option = { title: { left: 'left', text: 'CPU 利用表动态监控', }, // 调节大小 grid: { left: '3%', right: '4%', bottom: '3%', containLabel: true }, // tooltip 鼠标放上去之后会自动出现坐标 tooltip: { trigger: 'axis', axisPointer: { type: 'cross', label: { backgroundColor: '#6a7985' } } }, legend: { data: ['1分钟负载', '5分钟负载', '15分钟负载'] }, xAxis: { type: 'category', // data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] data: time }, yAxis: { type: 'value' }, series: [ { name: "1分钟负载", stack: "总量", //data: [10, 25, 99, 87, 54, 66, 2], data: x, type: 'line', areaStyle: {} }, { name: "5分钟负载", stack: "总量", //data: [89, 57, 85, 44, 25, 4, 54], data: y, type: 'line', areaStyle: {} }, { name: "15分钟负载", stack: "总量", //data: [1, 43, 2, 12, 5, 4, 7], data: z, type: 'line', areaStyle: {} } ] }; echo.setOption(option,true); } $(document).ready(function() { namespace = '/Socket'; var socket = io.connect("http://" + document.domain + ":" + location.port + namespace); socket.emit("message",{"data":"hello lyshark"}); // 初始化完成后,发送一条消息. socket.on('response', function(recv) { console.log("时间: " + recv.datetime); console.log("1分钟: " + recv.load1); console.log("5分钟: " + recv.load5); console.log("15分钟: " + recv.load15); // 调用绘图函数 display(recv.datetime,recv.load1,recv.load5,recv.load15); }); });

后台代码则是收集数据,并将数据通过socketio.emit函数,推送给前端。

from flask import Flask,render_template,requestfrom flask_socketio import SocketIOimport time,psutilasync_mode = Noneapp = Flask(__name__)app.config['SECRET_KEY'] = "lyshark"socketio = SocketIO(app)# 填充数据表local_time = ["", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]cpu_load1 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]cpu_load5 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]cpu_load15 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]# 左移填充def shift(Array, Size, Push):    if len(Array) = 0:        Array.pop(0)        Array.append(Push)        return True    return False# 右移填充def unshift(Array, Size, Push):    if len(Array) = 0:        Array.pop(Size-1)        Array.insert(0,Push)@app.route("/")def index():    return render_template("index.html")# 出现消息后,率先执行此处@socketio.on("message",namespace="/Socket")def socket(message):    print("接收到消息:",message['data'])    while True:        socketio.sleep(1)        data = time.strftime("%M:%S",time.localtime())        cpu = psutil.cpu_percent(interval=None,percpu=True)        # 实现数组最大35,每次左移覆盖第一个        shift(local_time,35,data)        shift(cpu_load1,35,cpu[0])        shift(cpu_load5, 35, cpu[1])        shift(cpu_load15, 35, cpu[2])        socketio.emit("response",                      {"datetime": local_time, "load1": cpu_load1, "load5": cpu_load5, "load15": cpu_load15},                      namespace="/Socket")# 当websocket连接成功时,自动触发connect默认方法@socketio.on("connect",namespace="/Socket")def connect():    print("链接建立成功..")# 当websocket连接失败时,自动触发disconnect默认方法@socketio.on("disconnect",namespace="/Socket")def disconnect():    print("链接建立失败..")if __name__ == '__main__':    socketio.run(app,debug=True)

运行动态图如下所示;

文章出处:https://www.cnblogs.com/LyShark/p/17860158.html
本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!