Python自动化安全测试:从Fofa资产收集到POC批量验证实战

📅 2026/6/23 17:56:19
Python自动化安全测试:从Fofa资产收集到POC批量验证实战
1. 项目概述与核心价值在安全测试和漏洞挖掘的日常工作中信息收集的效率直接决定了后续渗透测试的广度和深度。手动在Fofa、Shodan这类网络空间测绘引擎上一个个搜索、复制、整理资产信息不仅耗时耗力还容易遗漏关键目标。同时对于SRC安全应急响应中心项目或内部资产梳理如何从海量数据中快速定位可能存在漏洞的资产并自动化验证其脆弱性是每个安全工程师都想解决的痛点。这个项目就是利用Python将这几个环节串联起来形成一个自动化的工作流。核心思路是通过Python脚本自动化地从Fofa批量提取资产信息然后根据SRC项目或自定义规则筛选出目标最后调用对应的POC概念验证代码进行批量验证。它不是一个单一的工具而是一个可定制、可扩展的自动化框架。对于安全研究人员、渗透测试工程师和SRC漏洞挖掘者来说掌握这套方法意味着能将大量重复性劳动交给机器自己则专注于更核心的逻辑分析和漏洞利用上。我最初搭建这套脚本也是为了应对公司内部大规模的资产梳理和周期性漏洞扫描需求。手动操作了几天后我意识到必须用自动化来解放双手。经过多次迭代现在的脚本已经稳定运行了很长时间帮我发现了不少中低危漏洞也极大地提升了应急响应时的信息收集速度。下面我就把这套实战中打磨出来的思路、代码和踩过的坑毫无保留地分享出来。2. 整体架构设计与工具选型在动手写代码之前我们先要理清整个流程需要哪些模块以及为什么选择这些工具库。一个健壮的自动化脚本其架构应该清晰、模块解耦方便后续维护和扩展。2.1 核心流程拆解整个项目可以分解为三个核心阶段每个阶段对应一个Python模块信息收集模块负责与Fofa API交互根据用户提供的搜索语法批量获取资产信息如IP、端口、协议、标题、Banner等。资产处理与筛选模块负责解析和清洗从Fofa获取的原始数据。更重要的是要根据SRC目标范围例如特定域名、IP段或自定义规则如特定标题、特定端口的服务对资产进行过滤和整理生成最终的目标列表。POC验证模块负责加载POC脚本并发或顺序地对目标列表中的资产进行漏洞验证并输出结构化的结果报告。2.2 关键工具与库选型理由Requests这是进行HTTP网络请求的不二之选。相比于Python内置的urllibRequests的API更加人性化会话保持、代理设置、超时处理都非常方便。我们将用它来调用Fofa API和进行POC验证时的网络探测。注意务必使用Session对象来保持连接特别是在进行大量POC验证时可以显著提升效率并减少TCP握手开销。Pandas虽然对于小规模数据用Python内置的列表和字典也能处理但一旦数据量上千进行筛选、去重、合并等操作就会非常繁琐。Pandas的DataFrame数据结构就像一个内存中的电子表格能让你用一行代码完成复杂的查询和数据处理是处理Fofa返回的CSV/JSON数据的利器。Concurrent.futures / ThreadPoolExecutorPOC验证通常是I/O密集型任务等待网络响应非常适合使用多线程来提升速度。ThreadPoolExecutor提供了一个高级别的异步执行接口让我们能轻松实现批量目标的并发测试将数小时的验证任务压缩到几分钟内完成。实操心得线程数并非越多越好。一般设置为20-50个左右比较合适过多会导致网络连接数暴涨可能触发目标防火墙的规则或被封IP。建议根据网络环境和目标承受能力动态调整。Argparse为了让脚本更易用我们需要一个命令行参数解析器。Argparse是Python标准库的一部分功能强大可以轻松定义必选参数、可选参数、帮助信息等让我们的脚本看起来更专业。Logging任何严肃的脚本都需要完善的日志系统。Python标准库的logging模块可以分级DEBUG, INFO, WARNING, ERROR输出日志到控制台和文件在排查复杂问题时能救命。踩坑记录初期我直接用print打印信息当脚本在后台运行时一旦出错几乎无法追溯。引入logging后配合日志轮转可以清晰看到每个任务的执行状态和错误上下文。POC框架的考虑你可以自己编写简单的POC验证函数但对于维护大量POC建议借鉴或集成成熟的框架思路如Pocsuite3、Nuclei通过子进程调用或Xray的被动扫描模式。本项目为了讲解原理我们会从编写一个简单的POC模板开始。3. 核心模块一Fofa资产批量提取这是整个流程的源头稳定、高效地从Fofa获取数据是关键。3.1 Fofa API的申请与使用要点首先你需要一个Fofa会员账号来获取API密钥email和key。免费账户有调用频率和数量限制但对于个人学习和小规模使用足够了。API的核心调用参数是qbase64: 经过Base64编码的搜索语法如domainexample.com。fields: 指定返回的字段如host,title,ip,domain,port,protocol。size: 每次查询返回的最大条数有上限通常为10000。page: 翻页参数。我们的脚本需要实现1. 对查询语法进行Base64编码2. 构造带有认证头的请求3. 处理分页直到获取所有数据或达到数量上限。import base64 import requests import pandas as pd import time from typing import List, Dict class FofaClient: def __init__(self, email: str, key: str): self.email email self.key key self.base_url https://fofa.info/api/v1/search/all self.session requests.Session() # 设置公共请求头模拟浏览器 self.session.headers.update({ User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 }) def search(self, query: str, fields: str host,title,ip,port,domain, max_count: int 1000) - List[Dict]: 执行FOFA搜索 :param query: FOFA搜索语法 :param fields: 返回字段 :param max_count: 最大获取数量 :return: 资产信息列表 all_results [] page 1 size 100 # 单页最大数量根据API权限调整 # 编码查询语句 query_base64 base64.b64encode(query.encode()).decode() while len(all_results) max_count: params { qbase64: query_base64, fields: fields, page: page, size: size, full: false # 不返回完整数据节省流量 } # FOFA API认证方式通过email和key参数 auth_params {email: self.email, key: self.key} params.update(auth_params) try: resp self.session.get(self.base_url, paramsparams, timeout30) resp.raise_for_status() # 检查HTTP错误 data resp.json() if data[error]: print(fFOFA API错误: {data[errmsg]}) break results data[results] if not results: break # 没有更多数据了 # 将结果转换为字典列表 for item in results: # fields 字符串分割为字段名列表 field_list fields.split(,) result_dict {field: item[i] for i, field in enumerate(field_list)} all_results.append(result_dict) print(f已获取第 {page} 页累计 {len(all_results)} 条记录) page 1 # 礼貌性延迟避免请求过快 time.sleep(1) # 如果返回数量小于请求数量说明已是最后一页 if len(results) size: break except requests.exceptions.RequestException as e: print(f网络请求失败: {e}) break except ValueError as e: print(f解析JSON响应失败: {e}) break # 限制返回数量 return all_results[:max_count] # 使用示例 if __name__ __main__: client FofaClient(emailyour_emailexample.com, keyyour_api_key) # 搜索所有标题包含“OA”且开放80端口的资产 assets client.search(querytitleOA port80, max_count500) print(f共获取到 {len(assets)} 条资产信息)3.2 数据处理与去重策略从API拿到的原始数据往往存在重复同一IP不同端口、同一域名不同路径或字段不全的情况。直接用于后续验证会浪费资源。转换为DataFrame利用Pandas可以轻松处理。import pandas as pd df pd.DataFrame(assets)关键字段提取与构造我们最终验证需要的是host可能是IP:PORT或域名和protocolhttp/https。需要从原始数据中智能提取。def normalize_host(row): 将host字段标准化为可用于请求的URL或IP:PORT格式 host row.get(host, ) ip row.get(ip, ) port row.get(port, ) protocol row.get(protocol, http).lower() # 默认http if not host: if ip and port: return f{ip}:{port} else: return None # 如果host已经是域名或IP:PORT格式直接使用 # 否则尝试组合 if :// not in host: # 假设是域名或IP if port and port not in [80, 443]: host f{host}:{port} # 为host添加协议头便于requests直接使用 base_url f{protocol}://{host} else: base_url host return base_url df[target_url] df.apply(normalize_host, axis1) df df.dropna(subset[target_url]) # 删除无效目标去重根据ip或target_url进行去重避免对同一目标重复测试。df_deduplicated df.drop_duplicates(subset[ip]) # 基于IP去重 # 或者基于归一化后的URL去重 df_deduplicated df.drop_duplicates(subset[target_url])筛选结合SRC范围进行筛选。例如如果SRC目标域名为*.example.com我们可以src_domains [.example.com, .test-example.com] def is_in_scope(url): import urllib.parse try: parsed urllib.parse.urlparse(url) netloc parsed.netloc.split(:)[0] # 移除端口 for domain in src_domains: if netloc.endswith(domain): return True except: pass return False df_filtered df_deduplicated[df_deduplicated[target_url].apply(is_in_scope)]经过以上步骤我们就得到了一个干净、唯一且属于目标范围的目标列表target_list df_filtered[target_url].tolist()。4. 核心模块二POC验证引擎的设计与实现这是项目的核心一个灵活、健壮的POC验证引擎能极大提升漏洞发现的效率。4.1 POC脚本的标准化模板为了统一管理我们要求每个POC都遵循一个固定的结构。这里设计一个简单的类模板# poc_template.py import requests import logging from urllib.parse import urljoin class BasePOC: POC基类所有具体POC都应继承此类 # POC信息 vuln_id CVE-XXXX-XXXX # 漏洞编号 name 漏洞名称 version 1.0 def __init__(self, target_url: str, timeout: int 10): 初始化 :param target_url: 目标URL (e.g., http://example.com) :param timeout: 请求超时时间 self.target_url target_url.rstrip(/) self.timeout timeout self.session requests.Session() self.session.headers.update({ User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36, Accept: */*, Connection: close }) # 禁用SSL警告仅用于测试环境 requests.packages.urllib3.disable_warnings() self.verify_ssl False # 测试时可关闭SSL验证生产环境慎用 self.logger logging.getLogger(self.__class__.__name__) def verify(self) - dict: 验证漏洞的主函数 返回格式: { success: True/False, message: 描述信息, evidence: 漏洞证据如响应片段, extra: {} # 额外信息如回显内容 } raise NotImplementedError(子类必须实现 verify 方法) def _request(self, method, path, **kwargs): 统一的请求方法处理异常和日志 url urljoin(self.target_url, path) try: kwargs.setdefault(timeout, self.timeout) kwargs.setdefault(verify, self.verify_ssl) resp self.session.request(method, url, **kwargs) self.logger.debug(f{method} {url} - Status: {resp.status_code}) return resp except requests.exceptions.Timeout: self.logger.warning(f请求超时: {url}) return None except requests.exceptions.RequestException as e: self.logger.error(f请求失败: {url}, Error: {e}) return None # 一个具体的POC示例检测一个简单的信息泄露 class ExampleInfoDisclosurePOC(BasePOC): vuln_id EXAMPLE-2023-001 name 示例信息泄露漏洞 def verify(self): result_template {success: False, message: , evidence: , extra: {}} # 尝试访问一个常见的敏感路径 test_path /.git/config resp self._request(GET, test_path) if resp and resp.status_code 200 and [core] in resp.text: result_template[success] True result_template[message] f发现.git配置信息泄露 result_template[evidence] resp.text[:500] # 截取部分证据 return result_template result_template[message] 目标不存在此漏洞 return result_template4.2 并发验证引擎有了POC模板我们需要一个引擎来批量加载POC并对目标列表进行并发测试。# poc_engine.py import importlib.util import sys import os from concurrent.futures import ThreadPoolExecutor, as_completed import logging from typing import List, Dict, Any class POCEngine: def __init__(self, max_workers: int 20): self.max_workers max_workers self.results [] self.logger logging.getLogger(__name__) def load_poc_from_file(self, file_path: str): 从单个文件动态加载POC类 try: # 动态导入模块 module_name os.path.splitext(os.path.basename(file_path))[0] spec importlib.util.spec_from_file_location(module_name, file_path) module importlib.util.module_from_spec(spec) sys.modules[module_name] module spec.loader.exec_module(module) # 查找继承自BasePOC的类 for attr_name in dir(module): attr getattr(module, attr_name) if (isinstance(attr, type) and issubclass(attr, BasePOC) and attr ! BasePOC): self.logger.info(f加载POC: {attr.name} ({attr.vuln_id})) return attr return None except Exception as e: self.logger.error(f加载POC文件失败 {file_path}: {e}) return None def scan_target(self, target_url: str, poc_class): 对单个目标执行单个POC验证 try: poc_instance poc_class(target_url) result poc_instance.verify() result[target] target_url result[poc_name] poc_class.name result[vuln_id] poc_class.vuln_id return result except Exception as e: self.logger.error(f扫描目标 {target_url} 时发生异常: {e}) return {target: target_url, success: False, message: f扫描异常: {str(e)}} def batch_scan(self, target_list: List[str], poc_file_path: str): 批量扫描一个POC对多个目标 poc_class self.load_poc_from_file(poc_file_path) if not poc_class: self.logger.error(无法加载有效的POC类) return [] self.logger.info(f开始批量扫描目标数: {len(target_list)}, 线程数: {self.max_workers}) all_results [] with ThreadPoolExecutor(max_workersself.max_workers) as executor: # 提交所有任务 future_to_target {executor.submit(self.scan_target, target, poc_class): target for target in target_list} # 处理完成的任务 for future in as_completed(future_to_target): target future_to_target[future] try: result future.result(timeout30) # 每个任务超时 all_results.append(result) if result.get(success): self.logger.warning(f[] 漏洞发现: {target} - {result.get(poc_name)}) else: self.logger.debug(f[-] 未发现: {target}) except Exception as e: self.logger.error(f处理目标 {target} 的结果时出错: {e}) all_results.append({target: target, success: False, message: f任务执行异常: {str(e)}}) self.results all_results return all_results4.3 结果汇总与报告生成扫描结束后我们需要将结果清晰地呈现出来。可以生成控制台报告、文本报告或更结构化的JSON/CSV报告。def generate_report(self, output_format: str console): 生成扫描报告 if not self.results: print(暂无扫描结果) return vuln_found [r for r in self.results if r.get(success)] safe_count len(self.results) - len(vuln_found) if output_format console: print(\n *60) print(漏洞扫描报告) print(*60) print(f扫描目标总数: {len(self.results)}) print(f发现漏洞数: {len(vuln_found)}) print(f安全目标数: {safe_count}) print(-*60) if vuln_found: print([] 漏洞列表:) for vuln in vuln_found: print(f 目标: {vuln[target]}) print(f 漏洞: {vuln[poc_name]} ({vuln[vuln_id]})) print(f 证据: {vuln.get(evidence, N/A)[:200]}...) # 截断长证据 print(f 信息: {vuln.get(message)}) print( -*40) print(*60) elif output_format csv: import csv with open(scan_report.csv, w, newline, encodingutf-8-sig) as f: writer csv.DictWriter(f, fieldnames[target, vuln_id, poc_name, success, message, evidence]) writer.writeheader() for r in self.results: # 证据字段可能很长可以只存路径或摘要 row r.copy() if len(row.get(evidence, )) 500: row[evidence] row[evidence][:500] ... writer.writerow(row) print(f报告已生成至: scan_report.csv) # 可以扩展JSON、HTML等格式5. 项目集成与实战演练现在我们将前几个模块组合起来形成一个完整的命令行工具。5.1 主程序逻辑与参数解析# main.py import argparse import logging import sys from fofa_client import FofaClient from poc_engine import POCEngine import pandas as pd def setup_logging(levellogging.INFO): logging.basicConfig( levellevel, format%(asctime)s - %(name)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(scanner.log), logging.StreamHandler(sys.stdout) ] ) def main(): parser argparse.ArgumentParser(description自动化FOFA资产收集与POC验证工具) parser.add_argument(--email, requiredTrue, helpFOFA账号邮箱) parser.add_argument(--key, requiredTrue, helpFOFA API Key) parser.add_argument(--query, requiredTrue, helpFOFA搜索语法如titleOA countryCN) parser.add_argument(--poc, requiredTrue, helpPOC脚本文件路径) parser.add_argument(--max-assets, typeint, default1000, help最大资产获取数量) parser.add_argument(--threads, typeint, default20, help并发线程数) parser.add_argument(--filter-domain, help筛选特定域名范围内的资产如.example.com) parser.add_argument(--output, choices[console, csv, json], defaultconsole, help报告输出格式) args parser.parse_args() setup_logging() logger logging.getLogger(__name__) # 步骤1: 从FOFA获取资产 logger.info(步骤1: 正在从FOFA获取资产...) client FofaClient(args.email, args.key) raw_assets client.search(args.query, max_countargs.max_assets) if not raw_assets: logger.error(未从FOFA获取到任何资产请检查查询语法或API权限。) return df pd.DataFrame(raw_assets) logger.info(f原始获取资产数: {len(df)}) # 步骤2: 数据处理与筛选 logger.info(步骤2: 正在处理与筛选资产...) # 归一化URL (此处调用之前定义的normalize_host函数需导入或定义) from utils import normalize_host # 假设我们将数据处理函数放在utils.py df[target_url] df.apply(lambda row: normalize_host(row), axis1) df df.dropna(subset[target_url]) # 域名筛选 if args.filter_domain: target_domain args.filter_domain.lower() df df[df[target_url].str.contains(target_domain, caseFalse, naFalse)] logger.info(f按域名 {args.filter_domain} 筛选后资产数: {len(df)}) # 去重 df df.drop_duplicates(subset[target_url]) logger.info(f去重后资产数: {len(df)}) target_list df[target_url].tolist() if not target_list: logger.error(经过筛选后无有效目标程序退出。) return # 步骤3: POC批量验证 logger.info(f步骤3: 开始POC批量验证目标数: {len(target_list)}) engine POCEngine(max_workersargs.threads) engine.batch_scan(target_list, args.poc) # 步骤4: 生成报告 logger.info(步骤4: 生成扫描报告...) engine.generate_report(output_formatargs.output) if __name__ __main__: main()5.2 一个完整的实战案例假设某SRC项目范围是*.target-company.com我们想找其暴露在公网的Jenkins服务是否存在未授权访问漏洞。编写POC(poc_jenkins_unauth.py):from poc_template import BasePOC class JenkinsUnauthorizedPOC(BasePOC): vuln_id CVE-2017-1000353 # 示例实际是Jenkins未授权访问通用漏洞 name Jenkins未授权访问 def verify(self): result {success: False, message: , evidence: , extra: {}} # 尝试访问Jenkins控制台 test_paths [/manage, /script, /asynchPeople/] for path in test_paths: resp self._request(GET, path) if resp and resp.status_code 200: # 简单关键词判断实际应更严谨 if Jenkins in resp.text and (管理 in resp.text or Dashboard in resp.text): result[success] True result[message] f目标 {self.target_url} 可能存在Jenkins未授权访问 result[evidence] f访问路径 {path} 返回了Jenkins管理界面 return result result[message] 未发现Jenkins未授权访问漏洞 return result运行命令:python main.py \ --email your_fofa_email \ --key your_fofa_key \ --query appJenkins domaintarget-company.com \ --poc ./pocs/poc_jenkins_unauth.py \ --filter-domain .target-company.com \ --threads 30 \ --output csv结果脚本会自动从Fofa拉取所有归属于target-company.com且识别为Jenkins的资产去重筛选后用30个线程并发检测未授权访问漏洞最后将结果输出到scan_report.csv。6. 常见问题、优化思路与避坑指南在实际使用中你会遇到各种各样的问题。这里我总结了一些典型场景和解决方案。6.1 常见问题排查表问题现象可能原因解决方案FOFA API返回“查询语法错误”1. 语法不符合FOFA规则2. 特殊字符未转义1. 先在FOFA网页端验证语法是否正确。2. 将查询语句用双引号包裹或使用base64编码前确保它是字符串。获取资产数量远少于预期1. API权限限制免费用户数量少2. 查询语法过于宽泛但FOFA只返回部分结果3. 分页逻辑有误1. 检查API会员等级和剩余积分。2. 尝试更精确的语法或分多次不同条件查询后合并。3. 调试代码检查page和size参数以及判断数据是否已取完的逻辑。POC验证时大量超时或连接拒绝1. 目标网络不稳定或防火墙拦截2. 并发线程数过高3. 脚本中没有设置合理的超时和重试1. 在_request方法中增加随机延迟并降低并发数。2. 设置合理的超时时间如5-10秒并实现简单的重试机制最多2次。3. 考虑使用代理池来分散请求源。误报率很高1. POC的检测逻辑过于简单如仅检查状态码或关键字2. 目标有WAF返回了错误页面但包含了关键字1. 强化POC逻辑结合多个特征判断如检查响应头、特定JSON结构、计算MD5等。2. 在POC中增加对WAF指纹的识别如果检测到WAF则提高判断阈值或跳过。脚本被目标封禁IP1. 请求频率过高触发了目标速率限制2. User-Agent过于单一或可疑1. 在请求间增加随机睡眠时间time.sleep(random.uniform(0.5, 2))。2. 使用fake_useragent库轮换User-Agent头。处理大量数据时内存不足1. 一次性将数万条资产读入内存2. Pandas DataFrame操作占用高1. 使用分块处理或者逐行处理而非全部加载。2. 对于超大数据集考虑使用数据库如SQLite暂存中间结果。6.2 高级优化与扩展思路异步化升级将concurrent.futures替换为asyncioaiohttp可以更高效地管理成千上万的并发HTTP请求尤其适合大规模资产验证。POC仓库管理建立一个POC目录主程序自动遍历加载所有符合规范的POC脚本。可以给POC打上标签如info,rce,sqli实现按漏洞类型扫描。结果去重与聚合对于同一个目标多个POC可能发现不同漏洞。报告应支持按目标聚合漏洞并自动合并相同类型的发现如多个目录都泄露了源码。与扫描器联动可以将筛选后的资产列表导出为targets.txt直接喂给Nuclei、Xray等更专业的扫描器进行深度检测本脚本作为前置资产收集和初筛工具。定时任务与持续监控结合crontabLinux或Task SchedulerWindows定期执行脚本监控新增资产或漏洞状态变化实现持续的安全监控。图形化界面GUI使用PyQt或Tkinter为脚本制作一个简单的图形界面方便不熟悉命令行的同事使用通过界面输入API密钥、查询语法和选择POC。6.3 最重要的安全与法律提醒警告能力越大责任越大。请务必在合法授权的范围内使用此脚本。仅用于授权测试绝对不要对未授权的任何系统进行扫描或测试。遵守SRC规则在参与SRC项目时严格遵循该平台规定的测试范围、测试方法和漏洞提交规范。控制扫描力度设置合理的并发数、请求延迟避免对目标业务造成拒绝服务DoS影响。妥善保管数据扫描结果可能包含敏感信息请妥善存储切勿公开泄露。本脚本仅供学习交流使用者需自行承担因不当使用而产生的一切法律后果。这套脚本的搭建过程也是我个人对安全自动化理解的深化过程。从最初只能跑通单个功能到现在能处理从信息收集到漏洞验证的完整链路中间经历了无数次的调试和重构。最大的体会是自动化不是为了替代思考而是将人从重复劳动中解放出来去关注更本质的安全问题。希望这个分享能为你打开一扇门你可以基于这个框架融入更多自己的想法和技巧打造出更贴合自己工作流的利器。