Shodan-Python自动化网络资产盘点与安全评估实战指南

📅 2026/6/30 19:32:28
Shodan-Python自动化网络资产盘点与安全评估实战指南
1. 项目概述为什么我们需要Shodan-Python如果你负责过网络运维、安全评估或者资产梳理肯定遇到过这样的问题我们到底有多少设备暴露在公网上这些设备运行着什么服务版本是否老旧有没有不该出现的端口被意外打开靠人工一台台去扫效率低不说还可能触发安全告警。Shodan这个被称为“互联网的黑暗面”或“最可怕的搜索引擎”恰恰是解决这个问题的利器。它不像Google那样搜索网页内容而是持续扫描全球互联网索引所有联网设备的横幅信息比如服务器、摄像头、路由器、工控系统等。而Shodan-Python是Shodan官方提供的Python客户端库。它把Shodan强大的搜索和数据分析能力封装成了几行简单的Python代码。这意味着你可以用脚本自动化地完成以前需要手动在Shodan网站上点点点的所有工作。比如批量查询你们公司ASN下的所有设备统计开放了22端口的Linux服务器有多少台或者找出所有还在使用老旧Apache版本的Web服务器。这不仅仅是安全人员的工具对于运维和架构师来说也是进行资产盘点、风险发现和合规检查的“外挂眼睛”。我最初接触它是为了做一次全面的外部攻击面评估。手动导出CSV再分析数据一多就非常麻烦。自从用上Shodan-Python把查询、过滤、统计、报告生成全部自动化效率提升了不止一个量级。接下来我就带你从零开始手把手搭建一个能快速分析网络设备统计信息的自动化工具链。2. 环境准备与核心库解析工欲善其事必先利其器。在写代码之前我们需要把环境和核心概念搞清楚。2.1 安装Python与必要库首先确保你有一个可用的Python环境建议使用Python 3.7及以上版本。你可以通过命令行输入python --version来检查。如果没有去Python官网下载安装包安装时务必勾选“Add Python to PATH”。接下来我们需要安装的核心库就是shodan。打开你的终端或命令提示符使用pip进行安装pip install shodan这个命令会安装Shodan官方库及其依赖。为了后续数据分析更方便我强烈建议再安装两个辅助库pip install pandas matplotlibpandas用于数据处理和统计分析matplotlib用于生成图表让我们的报告更直观。2.2 获取并配置Shodan API密钥使用Shodan-Python的所有功能都需要一个有效的Shodan API密钥。你需要去Shodan官网注册一个账户。注册后登录网站在个人资料页面就能找到你的API密钥。它通常是一串由数字和字母组成的字符串。注意Shodan的API有免费版和付费版。免费版每月有查询次数限制通常足够个人和小规模使用并且无法使用某些高级搜索过滤器。对于本指南中的基础统计和分析免费版完全够用。但如果你需要大规模、高频次查询需要考虑升级计划。拿到API密钥后有两种方式在代码中使用它环境变量推荐更安全在命令行中设置Linux/macOS:export SHODAN_API_KEY你的密钥Windows:set SHODAN_API_KEY你的密钥。或者在代码中通过os.environ读取。代码中直接初始化在创建Shodan客户端对象时传入。我推荐使用环境变量避免将密钥硬编码在脚本中特别是当你打算分享或版本控制代码时。2.3 理解Shodan搜索语法与返回数据结构这是最关键的一步。Shodan的搜索语法非常强大类似于一个专门针对设备信息的搜索引擎。基础查询你可以直接搜索产品名如apachenginx。过滤词使用filter:value格式进行精确过滤。例如net:8.8.8.0/24- 搜索特定IP段。port:80- 搜索开放80端口的设备。country:CN- 搜索位于中国的设备。org:Google- 搜索属于某个组织的设备。product:MySQL- 搜索运行MySQL数据库的设备。vuln:CVE-2014-0160- 搜索存在特定漏洞的设备通常需要付费API。组合查询使用空格连接多个条件表示“与”关系。例如apache country:US port:443搜索美国境内运行在443端口的Apache服务器。当你执行一次搜索后Shodan返回的数据是一个包含多个“匹配项”的列表。每个匹配项我们称之为一个“主机”或“设备”是一个庞大的JSON字典。对我们统计最有用的字段通常包括ip_str: 设备的IP地址。port: 开放的端口号。org: 所属组织或ISP。location.country_name: 国家。location.city: 城市。product: 检测到的产品名称如Nginx httpd。version: 产品版本号。os: 操作系统信息。data: 设备返回的原始横幅信息。timestamp: 最后一次扫描到的时间。理解这些字段是我们后续进行数据筛选和统计的基础。3. 核心功能实现从查询到基础统计现在我们开始编写核心代码。我会把功能模块化你可以像搭积木一样组合使用。3.1 初始化客户端与执行搜索首先我们创建一个Python脚本文件比如叫shodan_analyzer.py。import shodan import os import json # 方法1从环境变量读取API密钥推荐 API_KEY os.environ.get(SHODAN_API_KEY) if not API_KEY: print(错误请设置环境变量 SHODAN_API_KEY) exit(1) # 方法2直接写密钥不推荐用于生产 # API_KEY YOUR_API_KEY_HERE # 初始化Shodan客户端 api shodan.Shodan(API_KEY) def basic_search(query, limit100): 执行基础Shodan搜索 :param query: 搜索查询字符串 :param limit: 返回结果的最大数量 :return: 搜索结果列表 try: print(f正在执行搜索: {query}) # 使用Shodan的search方法它会自动处理分页但免费API有单次返回数量限制 # 这里我们使用search_cursor或多次search来获取更多结果 results api.search(query) print(f搜索完成。总计发现: {results[total]} 个设备) print(f本次返回前 {len(results[matches])} 个结果。) # 返回匹配的设备列表 return results[matches] except shodan.APIError as e: print(fShodan API 错误: {e}) return []这个basic_search函数是核心。你可以这样调用它# 示例搜索中国境内开放的Redis服务 devices basic_search(product:Redis country:CN, limit50)注意results[‘total’]是Shodan索引中符合条件的所有设备总数而results[‘matches’]是本次API调用实际返回的设备列表受API计划限制。3.2 解析结果与提取关键字段拿到设备列表后我们需要从中提取出对我们有用的统计信息。我们来写一个数据解析函数def parse_devices(devices): 解析设备列表提取关键信息 :param devices: basic_search返回的设备列表 :return: 包含提取信息的字典列表 parsed_list [] for device in devices: info { ip: device.get(ip_str, N/A), port: device.get(port, N/A), org: device.get(org, Unknown), country: device.get(location, {}).get(country_name, Unknown), city: device.get(location, {}).get(city, Unknown), product: device.get(product, N/A), version: device.get(version, N/A), os: device.get(os, N/A), timestamp: device.get(timestamp, N/A) } parsed_list.append(info) return parsed_list这个函数把每个设备复杂的JSON对象简化成了一个只包含我们关心字段的字典。这样后续处理起来就方便多了。3.3 实现基础统计功能有了清晰的数据统计就变得非常简单。我们可以用Python内置的collections.Counter来快速计数。from collections import Counter def generate_basic_stats(parsed_devices): 生成基础统计信息 :param parsed_devices: parse_devices函数返回的列表 :return: 包含各类统计结果的字典 if not parsed_devices: return {} stats {} # 1. 按国家统计 countries [d[country] for d in parsed_devices] stats[country_dist] Counter(countries) # 2. 按组织/ISP统计 orgs [d[org] for d in parsed_devices] stats[org_dist] Counter(orgs) # 3. 按产品统计 products [d[product] for d in parsed_devices] stats[product_dist] Counter(products) # 4. 按端口统计 ports [d[port] for d in parsed_devices] stats[port_dist] Counter(ports) # 5. 按产品版本统计示例统计特定产品的版本分布 # 例如统计所有Nginx的版本 nginx_versions [d[version] for d in parsed_devices if d[product] and nginx in d[product].lower()] stats[nginx_version_dist] Counter(nginx_versions) return stats def print_stats(stats): 以友好格式打印统计结果 for stat_name, counter in stats.items(): print(f\n {stat_name.upper().replace(_, )} ) if not counter: print( 无数据) continue for item, count in counter.most_common(10): # 只显示前10个 print(f {item}: {count}) if len(counter) 10: print(f ... 以及另外 {len(counter) - 10} 项)现在你可以将搜索、解析、统计串联起来# 主程序流程示例 if __name__ __main__: query net:203.0.113.0/24 # 替换成你想查询的IP段或条件 devices basic_search(query, limit200) parsed parse_devices(devices) stats generate_basic_stats(parsed) print_stats(stats)运行这个脚本你就能快速得到一个关于目标网络的基础设备统计报告包括哪些国家、哪些运营商、运行了什么服务、开放了哪些端口。实操心得免费API的api.search()单次返回结果有限通常100条。如果你需要分析成百上千的设备需要使用api.count()先获取总数然后结合api.search_cursor()或者通过循环和offset参数来分批获取所有结果。这是第一个容易遇到的“坑”处理不好会导致数据不全。4. 进阶分析深度数据处理与可视化基础统计能给我们一个概览但要做深入的洞察我们需要更强大的数据处理和可视化能力。这里就是pandas和matplotlib大显身手的地方。4.1 使用Pandas进行灵活的数据透视Pandas的DataFrame是处理表格数据的瑞士军刀。我们把解析后的设备列表转换成DataFrame分析能力直接提升一个维度。import pandas as pd def analyze_with_pandas(parsed_devices): 使用Pandas进行深度数据分析 :param parsed_devices: 解析后的设备列表 :return: 包含各种分析结果的字典或直接打印/保存报告 if not parsed_devices: print(没有数据可供分析。) return # 创建DataFrame df pd.DataFrame(parsed_devices) print( 数据概览 ) print(df.info()) print(\n 前5行数据 ) print(df.head()) # 示例分析1找到最常用的10个端口及其对应的常见产品 print(\n 端口-产品关联分析前10端口 ) top_ports df[port].value_counts().head(10).index for port in top_ports: port_df df[df[port] port] top_product port_df[product].value_counts().head(3) print(f端口 {port} 上最常见的产品: {top_product.to_dict()}) # 示例分析2按组织和产品交叉统计 print(\n 组织-产品分布交叉表) # 选取数据量前5的组织和前5的产品避免表格过大 top_orgs df[org].value_counts().head(5).index top_products df[product].value_counts().head(5).index filtered_df df[df[org].isin(top_orgs) df[product].isin(top_products)] crosstab pd.crosstab(filtered_df[org], filtered_df[product]) print(crosstab) # 示例分析3识别老旧或存在风险的版本 print(\n 潜在风险版本检查 ) # 假设我们关注Apache httpd 2.2.x 和 Nginx 1.14.x 以下的版本 risk_conditions ( (df[product].str.contains(apache, caseFalse, naFalse)) (df[version].str.startswith(2.2., naFalse)) | (df[product].str.contains(nginx, caseFalse, naFalse)) (df[version].apply(lambda x: isinstance(x, str) and [int(i) for i in x.split(.)[:2]] [1, 14] if x.replace(.,).isdigit() else False)) ) risk_devices df[risk_conditions][[ip, port, product, version, org]] if not risk_devices.empty: print(f发现 {len(risk_devices)} 个设备运行潜在老旧版本:) print(risk_devices.to_string(indexFalse)) else: print(未发现明显的潜在风险版本。) return df这个函数展示了如何用Pandas做关联分析、交叉统计和基于条件的风险筛选。你可以根据自己的需求轻松地修改和添加更多的分析维度。4.2 利用Matplotlib生成直观图表数字和表格有时不够直观一张图能更快地传达信息。我们来生成几个常见的统计图。import matplotlib.pyplot as plt def visualize_stats(df, save_path./shodan_report): 生成可视化图表 :param df: pandas DataFrame包含设备数据 :param save_path: 图表保存路径前缀 if df.empty: return plt.style.use(seaborn-v0_8-darkgrid) # 使用一个好看的样式 fig, axes plt.subplots(2, 2, figsize(14, 10)) fig.suptitle(网络设备统计信息可视化, fontsize16) # 1. 国家分布柱状图取前10 country_top10 df[country].value_counts().head(10) axes[0, 0].bar(country_top10.index, country_top10.values) axes[0, 0].set_title(设备数量TOP10国家分布) axes[0, 0].tick_params(axisx, rotation45) axes[0, 0].set_ylabel(设备数量) # 2. 端口分布水平柱状图取前15 port_top15 df[port].value_counts().head(15) axes[0, 1].barh(port_top15.index.astype(str), port_top15.values) # 端口转为字符串 axes[0, 1].set_title(开放端口TOP15分布) axes[0, 1].set_xlabel(设备数量) # 3. 产品分布饼图取前8其余归为“其他” product_counts df[product].value_counts() top_products product_counts.head(8) other_count product_counts[8:].sum() if other_count 0: top_products[其他] other_count axes[1, 0].pie(top_products.values, labelstop_products.index, autopct%1.1f%%, startangle90) axes[1, 0].set_title(主要产品分布) # 4. 组织分布折线图/点图展示头部集中度 org_counts df[org].value_counts().head(20).reset_index() org_counts.columns [org, count] axes[1, 1].plot(range(len(org_counts)), org_counts[count], markero) axes[1, 1].set_title(TOP20组织设备数量分布) axes[1, 1].set_xlabel(组织排名) axes[1, 1].set_ylabel(设备数量) axes[1, 1].set_xticks(range(len(org_counts))) axes[1, 1].set_xticklabels(org_counts[org], rotation90, fontsize8) plt.tight_layout(rect[0, 0.03, 1, 0.95]) # 调整布局避免标题重叠 # 保存图片 plt.savefig(f{save_path}_charts.png, dpi300, bbox_inchestight) print(f可视化图表已保存至: {save_path}_charts.png) # plt.show() # 如果在本地Jupyter或IDE中运行可以显示图表在主要分析流程后调用这个函数df analyze_with_pandas(parsed) if df is not None: visualize_stats(df, save_pathmy_network_analysis)运行后你会得到一个名为my_network_analysis_charts.png的图片文件里面包含了四张专业的统计图表。注意事项Matplotlib默认的字体可能对中文支持不好。如果你的数据中有中文比如城市名需要额外设置中文字体。例如plt.rcParams[‘font.sans-serif’] [‘SimHei’, ‘DejaVu Sans’]。另外图表样式 (plt.style.use) 可以按喜好更换如‘ggplot’,‘fivethirtyeight’等。5. 构建自动化分析脚本与报告生成到目前为止我们已经有了各个功能模块。现在我们将它们整合成一个完整的、可配置的自动化分析脚本并生成一份结构化的报告。5.1 整合功能模块与参数化设计我们创建一个主类或主函数通过命令行参数或配置文件来接受查询条件、输出格式等设置。import argparse import sys from datetime import datetime class ShodanNetworkAnalyzer: def __init__(self, api_keyNone): self.api_key api_key or os.environ.get(SHODAN_API_KEY) if not self.api_key: raise ValueError(未提供Shodan API密钥。请设置环境变量 SHODAN_API_KEY 或通过参数传入。) self.api shodan.Shodan(self.api_key) self.results [] self.parsed_devices [] self.df None def search(self, query, limit500, max_pages5): 增强的搜索函数尝试获取更多结果 all_matches [] try: # 首先获取总数 count_info api.count(query) total count_info[total] print(f查询 {query} 总计匹配设备: {total}) if total 0: return [] # 计算需要获取的页数每页最多100条 needed_pages min(max_pages, (min(limit, total) 99) // 100) for page in range(1, needed_pages 1): print(f正在获取第 {page} 页数据...) results api.search(query, pagepage) all_matches.extend(results[matches]) if len(all_matches) limit: all_matches all_matches[:limit] break self.results all_matches print(f成功获取 {len(self.results)} 条设备信息。) return self.results except shodan.APIError as e: print(f搜索过程中发生API错误: {e}) if limit in str(e).lower(): print(提示免费API有查询频率或数量限制请稍后再试或升级API计划。) return [] def run_analysis(self, query, output_prefixreport): 执行完整的分析流程 print(f\n{*50}) print(f开始分析: {query}) print(f时间: {datetime.now().strftime(%Y-%m-%d %H:%M:%S)}) print(*50) # 1. 搜索 devices self.search(query) if not devices: print(未获取到数据分析终止。) return # 2. 解析 self.parsed_devices parse_devices(devices) # 3. 基础统计并打印 print(\n[基础统计结果]) stats generate_basic_stats(self.parsed_devices) print_stats(stats) # 4. 使用Pandas深度分析 print(\n[Pandas深度分析]) self.df analyze_with_pandas(self.parsed_devices) # 5. 可视化 if self.df is not None and not self.df.empty: print(\n[生成可视化图表]) visualize_stats(self.df, save_pathoutput_prefix) # 6. 保存原始数据和解析后的数据可选 self.save_data(output_prefix) print(f\n分析完成报告和图表已基于前缀 {output_prefix} 保存。) def save_data(self, prefix): 保存数据到JSON和CSV文件 if self.results: with open(f{prefix}_raw_data.json, w, encodingutf-8) as f: json.dump(self.results, f, indent2, ensure_asciiFalse) if self.parsed_devices: pd.DataFrame(self.parsed_devices).to_csv(f{prefix}_parsed_data.csv, indexFalse, encodingutf-8-sig) def main(): parser argparse.ArgumentParser(descriptionShodan网络设备统计信息分析工具) parser.add_argument(-q, --query, requiredTrue, helpShodan搜索查询语句例如net:203.0.113.0/24 product:nginx) parser.add_argument(-o, --output, defaultshodan_report, help输出文件的前缀名默认为 shodan_report) parser.add_argument(-l, --limit, typeint, default300, help最大获取设备数量默认为300) args parser.parse_args() analyzer ShodanNetworkAnalyzer() try: analyzer.run_analysis(queryargs.query, output_prefixargs.output) except Exception as e: print(f程序运行出错: {e}, filesys.stderr) sys.exit(1) if __name__ __main__: main()现在你可以通过命令行来运行这个强大的分析工具了python shodan_analyzer.py -q org:”Your Company Name” -o my_company_assets -l 500这个命令会搜索属于“Your Company Name”组织的所有设备最多获取500条并生成以my_company_assets为前缀的报告文件。5.2 生成结构化文本报告除了屏幕输出和图表一份格式良好的文本报告更容易分享和存档。我们可以扩展run_analysis方法或单独创建一个报告生成函数。def generate_text_report(stats, df, query, output_file): 生成详细的文本报告 with open(output_file, w, encodingutf-8) as f: f.write(fShodan网络设备分析报告\n) f.write(f{*40}\n) f.write(f生成时间: {datetime.now().strftime(%Y-%m-%d %H:%M:%S)}\n) f.write(f搜索语句: {query}\n) f.write(f{*40}\n\n) f.write(一、 基础统计摘要\n) f.write(-*30 \n) if stats: for stat_name, counter in stats.items(): f.write(f\n{stat_name.replace(_, ).title()}:\n) for item, count in counter.most_common(5): f.write(f {item}: {count}\n) else: f.write( 无统计数据。\n) f.write(f\n二、 数据概览 (总计 {len(df) if df is not None else 0} 台设备)\n) f.write(-*30 \n) if df is not None and not df.empty: f.write(f数据字段: {, .join(df.columns)}\n) # 可以添加一些DataFrame的描述性统计 f.write(f\n端口开放情况摘要:\n) port_summary df[port].describe() for idx, val in port_summary.items(): f.write(f {idx}: {val}\n) f.write(f\n三、 潜在风险设备列表\n) f.write(-*30 \n) # 复用之前风险检查的逻辑 risk_df df[(df[product].str.contains(apache, caseFalse, naFalse)) (df[version].str.startswith(2.2., naFalse))] if not risk_df.empty: f.write(f发现 {len(risk_df)} 台设备运行Apache 2.2.x (已终止支持的老旧版本):\n) f.write(risk_df[[ip, port, version, org]].to_string(indexFalse)) f.write(\n\n) else: f.write(未发现明显的潜在风险版本。\n) else: f.write(无有效数据。\n) f.write(f\n{*40}\n) f.write(报告结束。详细数据请查看同目录下生成的CSV和JSON文件。\n) print(f文本报告已生成: {output_file})然后在run_analysis方法的最后调用它# 在 run_analysis 方法内保存数据之后 report_file f{output_prefix}_report.txt generate_text_report(stats, self.df, query, report_file)至此一个功能相对完整的自动化分析工具就成型了。它可以从Shodan获取数据进行多维度统计生成可视化图表并输出结构化的文本报告。6. 常见问题、排查技巧与最佳实践实录在实际使用中你肯定会遇到各种各样的问题。下面是我踩过的一些坑和总结的经验。6.1 API限制与查询优化问题shodan.APIError: Invalid API key或shodan.APIError: Access denied。排查首先检查API密钥是否正确是否在环境变量中。然后登录Shodan官网确认账户状态是否正常API密钥是否还有查询额度。问题搜索速度慢或者很快收到shodan.APIError: Request rate limit exceeded错误。排查与解决这是免费API最常见的限制。Shodan对API调用频率有严格限制。添加延迟在循环调用API如分页获取时使用time.sleep(1)或更长的间隔。减少请求量优化你的查询语句使其更精确减少返回的总结果数。避免使用过于宽泛的查询如port:22。使用计数和摘要对于只需要统计数量的场景优先使用api.count()和api.search()中的facets参数来获取聚合数据而不是拉取所有主机详情。问题查询结果不准确或遗漏。排查语法检查确认Shodan查询语法正确。例如net:192.168.1.0/24是正确的而ip:192.168.1.0/24可能无效。数据新鲜度Shodan的数据不是实时的。设备可能已下线或配置已更改。timestamp字段可以帮助你判断数据的扫描时间。扫描覆盖Shodan可能没有扫描到某些IP或端口。它的覆盖范围虽广但并非100%。6.2 数据处理中的坑问题KeyError当访问device[‘location’][‘country_name’]时。解决Shodan返回的JSON结构中某些字段可能缺失。这就是为什么我在parse_devices函数中大量使用.get(‘key’, ‘default’)方法。永远不要假设字段一定存在做好默认值处理。问题Pandas处理时出现SettingWithCopyWarning。解决这是一个常见的Pandas警告。确保在创建数据子集时使用.copy()方法或者使用.loc进行明确的索引赋值。例如risk_df df[condition].copy()。问题图表中文乱码。解决在调用Matplotlib绘图前设置中文字体。import matplotlib.pyplot as plt plt.rcParams[font.sans-serif] [SimHei, DejaVu Sans] # 用来正常显示中文标签 plt.rcParams[axes.unicode_minus] False # 用来正常显示负号6.3 脚本优化与最佳实践错误处理与日志我们的示例代码包含了基本的try…except但在生产脚本中你应该记录更详细的日志包括查询参数、获取的数据量、遇到的错误等方便后期排查。可以使用Python的logging模块。配置分离将API密钥、常用的搜索查询、风险版本规则等写入一个配置文件如config.yaml或config.ini而不是硬编码在脚本中。结果缓存对于相同的查询如果不需要实时最新数据可以将结果缓存到本地文件或数据库。下次分析时先读取缓存可以节省API调用次数并加快分析速度。异步请求如果需要查询大量不同的目标可以考虑使用aiohttp等库进行异步请求但务必注意Shodan的API速率限制避免被封。尊重隐私与合规这是最重要的原则。仅将本工具用于你拥有合法权限的资产排查如你所在公司的公网IP段或用于授权的安全评估。未经授权扫描他人的网络资产不仅是非法的也违背了道德准则。Shodan本身是一个合法的研究工具但如何使用它取决于你。最后这个工具链只是一个起点。你可以根据实际需求无限扩展它例如集成到CI/CD流程中定期扫描、与CMDB系统对比发现未知资产、加入更复杂的漏洞指纹识别、或者开发一个简单的Web界面。核心在于你掌握了利用Shodan-Python这个强大的杠杆来撬动海量互联网设备数据的能力并将其转化为 actionable 的洞察。