当前位置: 首页> 文旅> 酒店 > wss客户端请求(python websocket)

wss客户端请求(python websocket)

时间:2025/7/9 1:29:55来源:https://blog.csdn.net/weixin_46770425/article/details/139999598 浏览次数:0次
import asyncio
import websockets
import jsonasync def connect_to_wss(uri):# 连接到WSS服务器async with websockets.connect(uri) as websocket:# 执行一些操作,例如发送和接收消息data = {"脱敏处理"}await websocket.send(json.dumps(data))while 1:response = await websocket.recv()try:rjson = json.loads(response)except:with open('ok.mp3','wb')as f:f.write(response)print(response)else:print(rjson)if rjson['event'] == 'TaskFinished':print('tts is ok')breakprint('='*50)# WSS服务器地址,包含协议
wss_url = "wss://脱敏处理/internal/api/v2/ws?device_id=脱敏处理&iid=脱敏处理"# 运行异步事件循环
asyncio.get_event_loop().run_until_complete(connect_to_wss(wss_url))
asyncio.get_event_loop().close()

上面采用的是异步方式 等待可以直接

下方的如果不采用异步方式,使用websocket-client需要不停发送消息的就需要使用线程

例如

import json
import time
import _thread
import websocketweb_socket_url = "wss://appcomm-user.脱敏处理.com/app-commserv-user/websocket?qrToken=%s"
qr_token = "ca6e6cfb70de4f2f915b968aefcad404"
once_password = ""
uuid = ""def wss_on_message(ws, message):print("=============== [message] ===============")message = json.loads(message)print(message)if "扫码成功" in message["msg"]:global once_password, uuidonce_password = message["oncePassword"]uuid = message["uuid"]ws.close()def wss_on_error(ws, error):print("=============== [error] ===============")print(error)ws.close()def wss_on_close(ws, close_status_code, close_msg):print("=============== [closed] ===============")print(close_status_code)print(close_msg)def wss_on_open(ws):def run(*args):while True:ws.send(qr_token)time.sleep(8)_thread.start_new_thread(run, (qr_token,))def wss():# websocket.enableTrace(True)  # 是否显示连接详细信息ws = websocket.WebSocketApp(web_socket_url % qr_token, on_open=wss_on_open,on_message=wss_on_message, on_error=wss_on_error,on_close=wss_on_close)ws.run_forever()

在这里需要每间隔8s进行发送请求获取二维码状态 那么这个时候 如果直接主线程会导致阻塞 无法获取服务器响应 所以必须采用线程 那么如果使用异步就不用啦

await asyncio.sleep(8)

如果想要更加简洁明了 还是使用websocket-client吧 方便的话websocket异步

当然 你也可以创建一个属于自己的服务端

import asyncio
import websocketsasync def echo(websocket, path):async for message in websocket:message = "I got your message: {}".format(message)await websocket.send(message)asyncio.get_event_loop().run_until_complete(websockets.serve(echo, 'localhost', 8765))
asyncio.get_event_loop().run_forever()

抓包调试推荐apipost

注意这个服务端 你需要在发送消息之后使用

await websocket.close()这样才能主动关闭客户端的连接 当然他也只有一次通信

 

关键字:wss客户端请求(python websocket)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: