背景
在处理分布式系统的问题时,经常需要从大量日志文件中提取特定时间段的日志进行分析。今天分享一个实用的Python工具,它能够从多个日志文件中精确提取指定时间范围的日志内容。
功能
- 支持多时区转换(上海时间与纽约时间)
- 处理压缩(.gz)和未压缩日志文件
- 支持批量处理多个日志文件
- 精确的时间范围过滤(目标时间±2分钟)
代码
import gzip
import sys
from datetime import datetime, timedelta
import pytz
from pathlib import Path
import osdef convert_shanghai_to_ny(shanghai_time_str):shanghai_tz = pytz.timezone('Asia/Shanghai')ny_tz = pytz.timezone('America/New_York')shanghai_time = shanghai_tz.localize(datetime.strptime(shanghai_time_str, '%Y-%m-%d %H:%M:%S'))ny_time = shanghai_time.astimezone(ny_tz)return ny_timedef get_output_filename(target_time_str):time_str = datetime.strptime(target_time_str, '%Y-%m-%d %H:%M:%S').strftime('%Y%m%d_%H%M%S')return f"logs_{time_str}.log"def parse_log_time(line):try:log_time_str = line[:23] # 提取时间戳部分log_time = datetime.strptime(log_time_str, '%Y-%m-%d %H:%M:%S.%f')return pytz.timezone('America/New_York').localize(log_time)except (ValueError, IndexError):return Nonedef process_file(file_path, time_range_start, time_range_end):matching_logs = []found_start = False# 根据文件类型选择打开方式open_func = gzip.open if file_path.endswith('.gz') else openmode = 'rt' if file_path.endswith('.gz') else 'r'try:with open_func(file_path, mode, encoding='utf-8') as f:for line in f:log_time = parse_log_time(line)if log_time is None:# 如果已经开始收集日志,则继续保存if found_start:matching_logs.append(line)continue# 找到开始时间点if not found_start and log_time >= time_range_start:found_start = Truematching_logs.append(line)# 在开始和结束时间之间elif found_start and log_time <= time_range_end:matching_logs.append(line)# 超过结束时间,停止收集elif found_start and log_time > time_range_end:breakexcept Exception as e:print(f"处理文件 {file_path} 时发生错误: {str(e)}")return []return matching_logsdef process_log_files(target_time_str, input_files):# 转换目标时间为纽约时间target_time = convert_shanghai_to_ny(target_time_str)time_range_start = target_time - timedelta(minutes=2)time_range_end = target_time + timedelta(minutes=2)output_file = get_output_filename(target_time_str)# 检查并删除已存在的输出文件if os.path.exists(output_file):try:os.remove(output_file)print(f"已删除已存在的输出文件: {output_file}")except Exception as e:print(f"删除已存在的输出文件时发生错误: {str(e)}")returnprint(f"\n搜索时间范围: ")print(f"上海时间: {target_time_str} ±2分钟")print(f"纽约时间: {target_time.strftime('%Y-%m-%d %H:%M:%S')} ±2分钟")all_matching_logs = []for input_file in input_files:print(f"\n处理文件: {input_file}")matching_logs = process_file(input_file, time_range_start, time_range_end)if matching_logs:all_matching_logs.extend(matching_logs)print(f"找到符合条件的日志 {len(matching_logs)} 行")else:print("未找到符合条件的日志")if all_matching_logs:try:with open(output_file, 'w', encoding='utf-8') as f:f.writelines(all_matching_logs)print(f"\n所有匹配的日志已写入文件: {output_file}")print(f"总计: {len(all_matching_logs)} 行")except Exception as e:print(f"写入输出文件时发生错误: {str(e)}")else:print("\n未找到任何符合条件的日志")def main():if len(sys.argv) < 2:print("使用方法: python script.py <目标时间> <日志文件1> [日志文件2 ...]")print("时间格式: YYYY-MM-DD HH:MM:SS")print("示例: python script.py \"2025-02-20 12:00:00\" app.log app.log.gz")returntarget_time = sys.argv[1]input_files = sys.argv[2:]if not input_files:print("请指定至少一个日志文件")return# 验证所有输入文件是否存在for input_file in input_files:if not Path(input_file).exists():print(f"文件不存在: {input_file}")returnprocess_log_files(target_time, input_files)if __name__ == "__main__":main()
~~~