当前位置: 首页> 教育> 培训 > 爱站网反链分析_人力资源外包公司靠谱吗_精准广告投放_广州网站优化多少钱

爱站网反链分析_人力资源外包公司靠谱吗_精准广告投放_广州网站优化多少钱

时间:2025/7/27 21:52:07来源:https://blog.csdn.net/xu_wenming/article/details/146183860 浏览次数:1次
爱站网反链分析_人力资源外包公司靠谱吗_精准广告投放_广州网站优化多少钱

结合这个ftdi_bin_decoder.py脚本文件,以及SDK的C文件,可总结录音文件格式是
基于 TLV 编码的 Apollo 设备数据解码
支持两种操作模式:离线解码 和 实时记录解码
数据帧结构:
+------------+--------+--------+--------+--------+---------------------+
| 帧头(2B)   | 长度(2)| 类型(2)| 包序号(2)| 时间戳(4) | 数据载荷(N)       |
+------------+--------+--------+--------+--------+---------------------+
0x4A 0x00    len      type    pkt_num  timestamp   payload
"""

from docopt import docopt          # 命令行参数解析
import struct                      # 二进制数据解包
import soundfile as sf             # 音频文件处理(未直接使用,可能为预留)
import numpy as np                 # 数据包序号校验
import pandas as pd                # 数据统计(未直接使用,可能为预留) 
import subprocess                  # 执行外部程序(AmU2S.exe)
import time                        # 时间处理(未直接使用)
import os                          # 文件系统操作

args = docopt(__doc__)             # 解析命令行参数

class U2SDecoder(object):
    """Apollo 设备数据解码器核心类"""
    
    def __init__(self, frame_header, input_bin_path, out_bin_name, integrity_check):
        """
        初始化解码器
        :param frame_header: 帧头标识,例如 [0x4a, 0x00]
        :param input_bin_path: 输入.bin文件路径 
        :param out_bin_name: 输出文件前缀名
        :param integrity_check: 是否启用数据完整性校验
        """
        # 帧头识别相关
        self.header_indx = 0               # 当前帧头位置索引
        self.header_pattern = frame_header # 帧头特征值(2字节)
        self.header_packet_len = 8         # 头部总长度(帧头+长度+类型+包序号)
        
        # 数据帧参数
        self.len = 0             # 当前帧数据长度(不含帧头)
        self.type = 0            # 数据类型 0-3
        self.packet_num = 0      # 包序号(用于完整性校验)
        
        # 缓冲区管理
        self.frame_buff = bytes()  # 当前帧有效载荷缓存
        self.file_end = False      # 文件结束标志
        
        # 文件操作
        self.usb_bin_handler = open(input_bin_path, 'rb')  # 输入文件句柄
        self.total_data_bytes = os.path.getsize(input_bin_path)  # 总字节数(进度计算)
        
        # 输出文件初始化(按类型分开存储)
        self.output_group_path = out_bin_name
        self.output_tp0 = open(out_bin_name + "_general", 'wb')  # 类型0:通用数据
        self.output_tp1 = open(out_bin_name + "_pcm", 'wb')      # 类型1:PCM音频
        self.output_tp2 = open(out_bin_name + "_spp", 'wb')      # 类型2:SPP协议数据
        self.output_tp3 = open(out_bin_name + "_opus", 'wb')     # 类型3:Opus音频
        
        # 完整性校验配置
        self.integrity_check = integrity_check
        self.generic_packet_num = np.array([], dtype=np.int32)  # 各类型包序号记录
        self.pcm_packet_num = np.array([], dtype=np.int32)
        self.spp_packet_num = np.array([], dtype=np.int32)
        self.codec_packet_num = np.array([], dtype=np.int32)
        
        # 数据量统计
        self.data_count_tp0 = 0  # 类型0数据字节数
        self.data_count_tp1 = 0  # 类型1数据字节数
        self.data_count_tp2 = 0  # 类型2数据字节数 
        self.data_count_tp3 = 0  # 类型3数据字节数

    def frame_check(self):
        """帧结构完整性检查"""
        # 读取帧头后的元数据(长度2B + 类型2B + 包序号2B = 6B)
        header_packet = self.usb_bin_handler.read(self.header_packet_len-2)  
        header_info = struct.unpack('<'+'B'*len(header_packet), header_packet)
        
        # 解析元数据(小端模式)
        self.len = header_info[0] + 256 * header_info[1]    # 数据长度
        self.type = header_info[2] + 256 * header_info[3]    # 数据类型
        self.packet_num = header_info[4] + 256 * header_info[5]  # 包序号
        
        # 长度合理性检查
        if self.len < (self.header_packet_len - 2):
            print(f"无效帧长度:{self.len}")
            self.usb_bin_handler.seek(-(self.header_packet_len-2), 1)
            return False
        
        # 类型有效性检查
        if self.type not in [0, 1, 2, 3]:
            print(f"未知数据类型:{self.type}")
            self.usb_bin_handler.seek(-(self.header_packet_len-2), 1)
            return False
            
        # 读取实际数据载荷(长度-头部已读部分)
        data_stream = self.usb_bin_handler.read(self.len - 4)
        
        # 检查下一帧头是否存在(验证帧完整性)
        if len(data_stream) == (self.len - 4):
            next_header = struct.unpack('BB', data_stream[-2:])
            if next_header == tuple(self.header_pattern):
                self.frame_buff += data_stream[:-2]  # 保存有效载荷(排除下一帧头)
                self.usb_bin_handler.seek(-2, 1)     # 回退文件指针
                
                # 记录包序号(完整性校验时使用)
                if self.integrity_check:
                    {  # 按类型存储包序号
                        0: self.generic_packet_num,
                        1: self.pcm_packet_num,
                        2: self.spp_packet_num,
                        3: self.codec_packet_num
                    }[self.type] = np.append(..., self.packet_num)
                return True
            else:
                self.usb_bin_handler.seek(-(self.len+2), 1)
                return False
        else:  # 文件末尾处理
            self.file_end = True
            return False

    def frame_load(self):
        """将解码后的帧数据写入对应文件"""
        if self.type == 0:  # 通用数据
            self.data_count_tp0 += self.output_tp0.write(self.frame_buff)
        elif self.type == 1:  # PCM音频
            self.data_count_tp1 += self.output_tp1.write(self.frame_buff)
        elif self.type == 2:  # SPP协议数据
            self.data_count_tp2 += self.output_tp2.write(self.frame_buff)
        elif self.type == 3:  # Opus音频
            self.data_count_tp3 += self.output_tp3.write(self.frame_buff)
        self.frame_buff = bytes()  # 清空缓冲区

    def data_pump_out(self):
        """主解码循环:查找帧头并处理数据"""
        data_input = self.usb_bin_handler.read(2)  # 每次读取2字节寻找帧头
        if len(data_input) == 2:
            header = struct.unpack('BB', data_input)
            if header == tuple(self.header_pattern):
                if self.frame_check():  # 验证帧完整性
                    self.frame_load()   # 写入数据

    def __del__(self):
        """析构函数:资源清理"""
        # 关闭所有文件句柄
        self.usb_bin_handler.close()
        self.output_tp0.close()
        self.output_tp1.close()
        self.output_tp2.close()
        self.output_tp3.close()
        
        # 删除空输出文件
        for path, size in [
            ('_general', self.data_count_tp0),
            ('_pcm', self.data_count_tp1),
            ('_spp', self.data_count_tp2),
            ('_opus', self.data_count_tp3)
        ]:
            if size == 0:
                os.remove(self.output_group_path + path)

# 命令行模式分发
if args['decoder']:  # 离线解码模式
    decoder = U2SDecoder(
        frame_header=[0x4a, 0x00],
        input_bin_path=args['<input_bin>'],
        out_bin_name=args['--output'],
        integrity_check=args['--integrity_check']
    )
    while not decoder.file_end:
        decoder.data_pump_out()
        # 打印进度信息...

elif args['logger']:  # 实时记录模式
    subprocess.call('AmU2S.exe apollo_spi.bin', shell=True)  # 调用硬件接口程序
    # 后续解码流程与离线模式相同...

关键设计说明

  1. 帧结构解析逻辑

    • 采用 TLV(Tag-Length-Value) 编码格式

    • 帧头验证:0x4A 0x00 作为每帧起始标识

    • 动态长度处理:根据Length字段动态读取后续数据

  2. 数据完整性保障

    • 包序号连续性检查:检测PacketNum的连续性和翻转(16位溢出)

    • 帧尾校验:通过下一帧头的存在验证当前帧完整性

  3. 资源管理策略

    • 自动清理空文件:删除未被写入数据的输出文件

    • 析构函数保障:确保文件句柄的可靠关闭

  4. 多数据类型支持

    • 并行维护4个输出流(通用/PCM/SPP/Opus)

    • 按数据类型独立统计数据量

关键字:爱站网反链分析_人力资源外包公司靠谱吗_精准广告投放_广州网站优化多少钱

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: