当前位置: 首页> 财经> 股票 > Python 中的 WebSocket 使用

Python 中的 WebSocket 使用

时间:2025/8/23 16:28:25来源:https://blog.csdn.net/pumpkin84514/article/details/141115011 浏览次数:0次

什么是 WebSocket?

WebSocket 是一种网络通信协议,能够在客户端和服务器之间建立全双工(双向)通信通道。这种连接使得双方可以在无需重新建立连接的情况下随时发送数据,适合需要实时数据传输的场景。

WebSocket 的原理

  1. 握手阶段:WebSocket 通信始于客户端发送一个特殊的 HTTP 请求,用以请求服务器升级协议为 WebSocket。如果服务器同意,会返回一个状态码 101,表示协议升级成功,接下来的通信会使用 WebSocket 协议。

  2. 数据传输阶段:在连接建立后,客户端和服务器之间可以自由地进行双向数据传输。数据被打包成帧(frame)后,通过底层的 TCP 连接传输。

  3. 关闭连接:当通信完成或出现异常时,任意一方可以发送一个关闭帧来终止连接。

WebSocket 使用场景

  • 实时聊天应用:如在线聊天室、即时通讯工具。
  • 实时数据更新:如股票价格、体育比赛直播、在线游戏状态。
  • 多人协作:如实时协作编辑工具、在线白板。
  • 实时通知:如在线通知系统、推送服务。

Python 中的 WebSocket 库和 API

在 Python 中,常用的 WebSocket 库包括 websocketsFastAPI 等。

1. websockets

websockets 是一个用于构建 WebSocket 服务器和客户端的库。它提供了简单的 API 来建立 WebSocket 连接和发送接收消息。

安装:

pip install websockets

API 示例:

服务器端代码:

import asyncio
import websockets# WebSocket 服务器处理函数
async def echo(websocket, path):async for message in websocket:await websocket.send(f"Echo: {message}")# 启动 WebSocket 服务器
start_server = websockets.serve(echo, "localhost", 8765)# 运行事件循环
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
  • websockets.serve(handler, host, port): 创建 WebSocket 服务器。
  • websocket.send(data): 发送消息到客户端。
  • websocket.recv(): 接收客户端消息。

客户端代码:

import asyncio
import websockets# 客户端处理函数
async def hello():async with websockets.connect("ws://localhost:8765") as websocket:await websocket.send("Hello, WebSocket!")response = await websocket.recv()print(f"Received: {response}")# 运行客户端
asyncio.get_event_loop().run_until_complete(hello())
  • websockets.connect(uri): 连接到 WebSocket 服务器。
  • websocket.send(data): 发送消息到服务器。
  • websocket.recv(): 接收服务器消息。
2. FastAPI 库中的 WebSocket

FastAPI 是一个现代的、快速的 Web 框架,支持异步操作,并且原生支持 WebSocket。

安装:

pip install fastapi uvicorn

API 示例:

服务器端代码:

from fastapi import FastAPI, WebSocketapp = FastAPI()# WebSocket 端点
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):await websocket.accept()  # 接受 WebSocket 连接while True:data = await websocket.receive_text()  # 接收消息await websocket.send_text(f"Message text was: {data}")  # 发送消息
  • app.websocket("/ws"): 定义一个 WebSocket 端点,路径为 /ws
  • websocket.accept(): 接受客户端的 WebSocket 连接。
  • websocket.receive_text(): 接收来自客户端的文本消息。
  • websocket.send_text(data): 发送文本消息给客户端。

运行服务器:

uvicorn myapp:app --reload

在上面的命令中,myapp 是你的 Python 文件的名字,app 是 FastAPI 应用实例的名字。

场景应用示例:实时聊天应用

场景描述:用户可以在同一个聊天室中互相发送消息,消息会实时显示在所有用户的屏幕上。

实现思路:

  1. 使用 WebSocket 进行实时双向通信。
  2. 将所有连接的客户端保存在一个列表中,方便向所有客户端广播消息。
  3. 当一个客户端发送消息时,服务器将消息转发给所有其他客户端。

代码示例:

from fastapi import FastAPI, WebSocket, WebSocketDisconnectapp = FastAPI()clients = []  # 存储所有连接的客户端@app.websocket("/chat")
async def chat(websocket: WebSocket):await websocket.accept()clients.append(websocket)  # 将新连接的客户端添加到列表中try:while True:data = await websocket.receive_text()for client in clients:await client.send_text(f"User says: {data}")except WebSocketDisconnect:clients.remove(websocket)  # 移除断开连接的客户端

运行方式:
通过 uvicorn 运行服务器,并使用浏览器或其他 WebSocket 客户端工具连接到 /chat 端点。每个连接的用户发送的消息都会实时广播给所有其他用户。

总结

WebSocket 在需要实时双向通信的场景下非常有用,如聊天应用、实时数据更新等。在 Python 中,可以使用 websockets 库或 FastAPI 框架来实现 WebSocket 服务。通过掌握这些库的 API 和实际应用场景,可以轻松构建实时通信应用。

关键字:Python 中的 WebSocket 使用

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: