基于Sambanova API 实现流式同声传译功能
效果基于大模型 TTS流式输出实现同声传译_哔哩哔哩_bilibili
import asyncio
import aiohttp
import websockets
import json
import io
from pydub import AudioSegment
from pydub.playback import play
import logging
from concurrent.futures import ThreadPoolExecutor# 配置日志记录器
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')style_id = "ICL_7****296"
api_endpoint = 'https://api.sambanova.ai/v1/chat/completions'
api_key = '****************'
model = 'Meta-Llama-3.2-3B-Instruct'# 调用 Sambanova API 获取流式响应
async def get_sambanova_response(prompt):headers = {'Authorization': f'Bearer {api_key}','Content-Type': 'application/json',}data = {'model': model,'messages': [{"role": "system", "content": "你是一个同声传译的翻译官,请将中文翻译成英文,将英文翻译成中文。不要输出其他信息"},{"role": "user", "content": prompt}],'prompt': prompt,'stream': True}translated_text = ""try:async with aiohttp.ClientSession() as session:async with session.post(api_endpoint, headers=headers, json=data) as response:async for line in response.content:if line:line = line.decode('utf-8').strip()if line.startswith("data: "):line = line[len("data: "):]if line == "[DONE]":breaktry:chunk = json.loads(line)if "choices" in chunk and "delta" in chunk["choices"][0] and "content" in chunk["choices"][0]["delta"]:part = chunk["choices"][0]["delta"]["content"]print(part, end="", flush=True)translated_text += partexcept json.JSONDecodeError:logging.error(f"Failed to decode JSON: {line}")except Exception as e:logging.error(f"Error while processing response: {e}")return translated_text# WebSocket 服务器地址
wss_url = f"wss://wss.*****.com/audio/tts"# 自定义请求头
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 SamanthaDoubao/1.25.3","Origin": "chrome-extension://obkci***********cldeggd","Sec-WebSocket-Key": "V1******************4w==","Sec-WebSocket-Extensions": "permessage-deflate; client_max_window_bits","Cookie": "***************************"
}# 播放 AAC 音频流(同步函数)
def play_audio_stream(aac_data):audio = AudioSegment.from_file(io.BytesIO(aac_data), format="aac")play(audio)# 使用线程池异步播放音频,避免阻塞事件循环
async def play_audio_async(aac_data, executor):loop = asyncio.get_event_loop()await loop.run_in_executor(executor, play_audio_stream, aac_data)# 处理音频播放任务,尽可能减少等待时间
async def audio_player(queue: asyncio.Queue, executor: ThreadPoolExecutor):while True:aac_data = await queue.get()if aac_data is None:breaktry:logging.info("播放音频数据...")await play_audio_async(aac_data, executor)except Exception as e:logging.error(f"音频播放过程中发生错误: {e}")# 连接并接收音频数据
async def listen_and_receive(queue: asyncio.Queue, translated_text):try:async with websockets.connect(wss_url, extra_headers=headers) as websocket:text_data = {"event": "text","text": translated_text}finish_data = {"event": "finish"}await websocket.send(json.dumps(text_data))await websocket.send(json.dumps(finish_data))current_audio_data = bytearray()while True:try:message = await websocket.recv()if isinstance(message, str):try:message_json = json.loads(message)logging.info(f'Received message: {message_json}')except json.JSONDecodeError:continueif message_json.get("event") == "sentence_start":logging.info(f"开始新的一段: {message_json['sentence_start_result']['readable_text']}")current_audio_data = bytearray()elif message_json.get("event") == "sentence_end":if current_audio_data:await queue.put(current_audio_data)elif message_json.get("event") == "finish":logging.info("所有段落接收完毕")breakelif isinstance(message, bytes):current_audio_data.extend(message)except websockets.ConnectionClosedOK:logging.info("WebSocket 连接正常关闭")breakexcept websockets.ConnectionClosedError as e:logging.error(f"WebSocket 连接意外关闭: {e}")breakawait websocket.send(json.dumps(finish_data))except Exception as e:logging.error(f"发生错误: {e}")async def main():prompt = """October 12, a city railway guard road joint defense and stability and railway line safety environment governance conference was held at the city administrative conference center.October 12, a city railway guard road joint defense and stability and railway line safety environment governance conference was held at the city administrative conference center。""" translated_text = await get_sambanova_response(prompt)print(translated_text)audio_queue = asyncio.Queue()executor = ThreadPoolExecutor()player_task = asyncio.create_task(audio_player(audio_queue, executor))await listen_and_receive(audio_queue, translated_text)await audio_queue.put(None)await player_taskexecutor.shutdown()# 运行 WebSocket 客户端
asyncio.run(main())