1、问题
我使用Flask和Flask-SocketIO 来做 Websocket 链接。前期正常使用,但是后期布置修改什么导致Websocket连接失败。排查需求,才发现初始化不正常导致。
SocketIO 和 Flask 应用的初始化顺序和引用循环的问题
2、环境
python-engineio==4.11.1
python-socketio==5.12.0
Flask-SocketIO==5.3.6
3、正常初始化【单文件】
from flask import Flask, render_template
from flask_socketio import SocketIOapp = Flask(__name__)
socketio = SocketIO(app) # 初始化 SocketIO# 默认路由,用于渲染 HTML 页面
@app.route('/')
def index():return render_template('index.html')# 出现消息后,率先执行此处
@socketio.on("message", namespace="/ws")
def socket(message):print(f"接收到消息: {message}")for i in range(1, 10):socketio.sleep(1)print(f"发送消息: {i}")socketio.emit("response", # 绑定通信{"data": i}, # 返回socket数据namespace="/ws")# 当websocket连接成功时,自动触发connect默认方法
@socketio.on("connect", namespace="/ws")
def connect():print("链接建立成功..")# 当websocket连接失败时,自动触发disconnect默认方法
@socketio.on("disconnect", namespace="/ws")
def disconnect():print("链接建立失败..")if __name__ == '__main__':socketio.run(app, debug=True, host='0.0.0.0', port=5000,allow_unsafe_werkzeug=True)
4、多文件初始化【两个文件】
主要是 manage.py 和 init.py 两个文件。
app/init.py
from flask import Flask
from flask_socketio import SocketIO
from flasgger import Swagger
from flask_cors import CORS
from flask_migrate import Migrate
from app.config import config
from app.extension import db# 创建一个全局的 socketio 对象,但不立即初始化
socketio = SocketIO()# 其他 swagger 配置保持不变...def create_app(DevelopmentConfig=None):if DevelopmentConfig is None:DevelopmentConfig = 'development'app = Flask(__name__)app.config['SECRET_KEY'] = '123456'# 加载配置项app.config.from_object(config.get(DevelopmentConfig))from app.api import config_blueprint# 注册蓝图config_blueprint(app)# 数据库配置初始化config_extensions(app)# 数据库迁移相关代码...from app.api.models.EsModels import sysEsLogs,sysCretitLogsfrom app.api.models.NetModels import sysNetLogsfrom app.api.models.UpsModels import sysUpsLogsmigrate = Migrate(app, db)# Swagger初始化Swagger(app, config=swagger_config, template=swagger_template)# CORS配置CORS(app, resources={r'/*': {'origins': '*'}}, supports_credentials=True)# 初始化 SocketIOsocketio.init_app(app, cors_allowed_origins="*")return app
socket_events.py
from app import socketio# Socket.IO 事件处理
@socketio.on("message", namespace="/ws")
def socket(message):print(f"接收到消息: {message}")for i in range(1, 10):socketio.sleep(1)print(f"发送消息: {i}")socketio.emit("response",{"data": i},namespace="/ws")@socketio.on("connect", namespace="/ws")
def connect():print("链接建立成功..")@socketio.on("disconnect", namespace="/ws")
def disconnect():print("链接建立失败..")
manage.py
# -*- coding: utf-8 -*-
# @Time : 2024/5/2 12:01
# @Author : 南宫乘风
# @Email : 1794748404@qq.com
# @File : manage.py
# @Software: PyCharm
import atexit
import os
import sysfrom apscheduler.schedulers.background import BackgroundSchedulerfrom app import create_app, socketio
from app.common.util.LogHandler import log
from app.crontab.NetCronTab import NetworkMonitor
from app.crontab.UpsCronTab import UPSMonitor
from app.socket_events import *
# 默认为开发环境,按需求修改 production development
config_name = 'development'app = create_app(config_name)
# 解决中文乱码
# app.json.ensure_ascii = False# from skywalking import agent, config
# config.init(agent_collector_backend_services='192.168.82.105:11800', agent_name='python-flask@devTenant',agent_instance_name="python-flask")
# agent.start()# 数据库迁移
def start_scheduler():# 创建一个后台调度器scheduler = BackgroundScheduler(timezone="Asia/Shanghai")from app.crontab.EsCronTab import EsIndexCronscheduler.add_job(func=EsIndexCron, trigger='interval', minutes=1)log.info("EsIndexCron 定时任务启动成功")# scheduler.add_job(func=send_alert, trigger="interval", seconds=20)# 启动调度器scheduler.start()atexit.register(lambda: scheduler.shutdown())if __name__ == '__main__':# 获取当前文件的绝对路径current_file = os.path.abspath(__file__)base_dir = os.path.dirname(current_file)# 将项目目录添加到 sys.pathif base_dir not in sys.path:sys.path.append(base_dir)if not app.debug or app.testing:start_scheduler()log.info("定时任务启动成功")socketio.run(app, debug=True, use_reloader=True, host='0.0.0.0', port=5001)# use_reloader=False, threaded=True,
在postman输入地址和监听事件