结合这个ftdi_bin_decoder.py脚本文件,以及SDK的C文件,可总结录音文件格式是
基于 TLV 编码的 Apollo 设备数据解码
支持两种操作模式:离线解码 和 实时记录解码
数据帧结构:
+------------+--------+--------+--------+--------+---------------------+
| 帧头(2B) | 长度(2)| 类型(2)| 包序号(2)| 时间戳(4) | 数据载荷(N) |
+------------+--------+--------+--------+--------+---------------------+
0x4A 0x00 len type pkt_num timestamp payload
"""
from docopt import docopt # 命令行参数解析
import struct # 二进制数据解包
import soundfile as sf # 音频文件处理(未直接使用,可能为预留)
import numpy as np # 数据包序号校验
import pandas as pd # 数据统计(未直接使用,可能为预留)
import subprocess # 执行外部程序(AmU2S.exe)
import time # 时间处理(未直接使用)
import os # 文件系统操作
args = docopt(__doc__) # 解析命令行参数
class U2SDecoder(object):
"""Apollo 设备数据解码器核心类"""
def __init__(self, frame_header, input_bin_path, out_bin_name, integrity_check):
"""
初始化解码器
:param frame_header: 帧头标识,例如 [0x4a, 0x00]
:param input_bin_path: 输入.bin文件路径
:param out_bin_name: 输出文件前缀名
:param integrity_check: 是否启用数据完整性校验
"""
# 帧头识别相关
self.header_indx = 0 # 当前帧头位置索引
self.header_pattern = frame_header # 帧头特征值(2字节)
self.header_packet_len = 8 # 头部总长度(帧头+长度+类型+包序号)
# 数据帧参数
self.len = 0 # 当前帧数据长度(不含帧头)
self.type = 0 # 数据类型 0-3
self.packet_num = 0 # 包序号(用于完整性校验)
# 缓冲区管理
self.frame_buff = bytes() # 当前帧有效载荷缓存
self.file_end = False # 文件结束标志
# 文件操作
self.usb_bin_handler = open(input_bin_path, 'rb') # 输入文件句柄
self.total_data_bytes = os.path.getsize(input_bin_path) # 总字节数(进度计算)
# 输出文件初始化(按类型分开存储)
self.output_group_path = out_bin_name
self.output_tp0 = open(out_bin_name + "_general", 'wb') # 类型0:通用数据
self.output_tp1 = open(out_bin_name + "_pcm", 'wb') # 类型1:PCM音频
self.output_tp2 = open(out_bin_name + "_spp", 'wb') # 类型2:SPP协议数据
self.output_tp3 = open(out_bin_name + "_opus", 'wb') # 类型3:Opus音频
# 完整性校验配置
self.integrity_check = integrity_check
self.generic_packet_num = np.array([], dtype=np.int32) # 各类型包序号记录
self.pcm_packet_num = np.array([], dtype=np.int32)
self.spp_packet_num = np.array([], dtype=np.int32)
self.codec_packet_num = np.array([], dtype=np.int32)
# 数据量统计
self.data_count_tp0 = 0 # 类型0数据字节数
self.data_count_tp1 = 0 # 类型1数据字节数
self.data_count_tp2 = 0 # 类型2数据字节数
self.data_count_tp3 = 0 # 类型3数据字节数
def frame_check(self):
"""帧结构完整性检查"""
# 读取帧头后的元数据(长度2B + 类型2B + 包序号2B = 6B)
header_packet = self.usb_bin_handler.read(self.header_packet_len-2)
header_info = struct.unpack('<'+'B'*len(header_packet), header_packet)
# 解析元数据(小端模式)
self.len = header_info[0] + 256 * header_info[1] # 数据长度
self.type = header_info[2] + 256 * header_info[3] # 数据类型
self.packet_num = header_info[4] + 256 * header_info[5] # 包序号
# 长度合理性检查
if self.len < (self.header_packet_len - 2):
print(f"无效帧长度:{self.len}")
self.usb_bin_handler.seek(-(self.header_packet_len-2), 1)
return False
# 类型有效性检查
if self.type not in [0, 1, 2, 3]:
print(f"未知数据类型:{self.type}")
self.usb_bin_handler.seek(-(self.header_packet_len-2), 1)
return False
# 读取实际数据载荷(长度-头部已读部分)
data_stream = self.usb_bin_handler.read(self.len - 4)
# 检查下一帧头是否存在(验证帧完整性)
if len(data_stream) == (self.len - 4):
next_header = struct.unpack('BB', data_stream[-2:])
if next_header == tuple(self.header_pattern):
self.frame_buff += data_stream[:-2] # 保存有效载荷(排除下一帧头)
self.usb_bin_handler.seek(-2, 1) # 回退文件指针
# 记录包序号(完整性校验时使用)
if self.integrity_check:
{ # 按类型存储包序号
0: self.generic_packet_num,
1: self.pcm_packet_num,
2: self.spp_packet_num,
3: self.codec_packet_num
}[self.type] = np.append(..., self.packet_num)
return True
else:
self.usb_bin_handler.seek(-(self.len+2), 1)
return False
else: # 文件末尾处理
self.file_end = True
return False
def frame_load(self):
"""将解码后的帧数据写入对应文件"""
if self.type == 0: # 通用数据
self.data_count_tp0 += self.output_tp0.write(self.frame_buff)
elif self.type == 1: # PCM音频
self.data_count_tp1 += self.output_tp1.write(self.frame_buff)
elif self.type == 2: # SPP协议数据
self.data_count_tp2 += self.output_tp2.write(self.frame_buff)
elif self.type == 3: # Opus音频
self.data_count_tp3 += self.output_tp3.write(self.frame_buff)
self.frame_buff = bytes() # 清空缓冲区
def data_pump_out(self):
"""主解码循环:查找帧头并处理数据"""
data_input = self.usb_bin_handler.read(2) # 每次读取2字节寻找帧头
if len(data_input) == 2:
header = struct.unpack('BB', data_input)
if header == tuple(self.header_pattern):
if self.frame_check(): # 验证帧完整性
self.frame_load() # 写入数据
def __del__(self):
"""析构函数:资源清理"""
# 关闭所有文件句柄
self.usb_bin_handler.close()
self.output_tp0.close()
self.output_tp1.close()
self.output_tp2.close()
self.output_tp3.close()
# 删除空输出文件
for path, size in [
('_general', self.data_count_tp0),
('_pcm', self.data_count_tp1),
('_spp', self.data_count_tp2),
('_opus', self.data_count_tp3)
]:
if size == 0:
os.remove(self.output_group_path + path)
# 命令行模式分发
if args['decoder']: # 离线解码模式
decoder = U2SDecoder(
frame_header=[0x4a, 0x00],
input_bin_path=args['<input_bin>'],
out_bin_name=args['--output'],
integrity_check=args['--integrity_check']
)
while not decoder.file_end:
decoder.data_pump_out()
# 打印进度信息...
elif args['logger']: # 实时记录模式
subprocess.call('AmU2S.exe apollo_spi.bin', shell=True) # 调用硬件接口程序
# 后续解码流程与离线模式相同...
关键设计说明
-
帧结构解析逻辑:
-
采用 TLV(Tag-Length-Value) 编码格式
-
帧头验证:0x4A 0x00 作为每帧起始标识
-
动态长度处理:根据Length字段动态读取后续数据
-
-
数据完整性保障:
-
包序号连续性检查:检测PacketNum的连续性和翻转(16位溢出)
-
帧尾校验:通过下一帧头的存在验证当前帧完整性
-
-
资源管理策略:
-
自动清理空文件:删除未被写入数据的输出文件
-
析构函数保障:确保文件句柄的可靠关闭
-
-
多数据类型支持:
-
并行维护4个输出流(通用/PCM/SPP/Opus)
-
按数据类型独立统计数据量
-