
PyTorch Deep Learning Framework Advanced Study Plan - Day 20

End-to-End Image Generation System

Today we will bring together what we have learned so far and build a complete end-to-end artistic style transfer system. The system combines CycleGAN with neural style transfer, provides an interactive interface built with Gradio, and covers model optimization, deployment, and monitoring.

1. Review of CycleGAN and Style Transfer Fundamentals

CycleGAN learns a mapping between two domains without paired training data, while style transfer applies a specific artistic style to a content image. We will combine the two techniques to build a flexible artistic style transfer system.

Core CycleGAN concepts:

  • Two generators (G, F) and two discriminators (DX, DY)
  • Cycle consistency loss (see the sketch after this list)
  • Identity mapping loss
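
To make the cycle consistency idea concrete, here is a minimal sketch (my own illustration, not code from this project), assuming generators G: X→Y and F: Y→X:

import torch.nn as nn

l1 = nn.L1Loss()

def cycle_consistency(real_A, real_B, G, F, lambda_cyc=10.0):
    # Forward cycle: A -> G(A) -> F(G(A)) should reconstruct A
    loss_fwd = l1(F(G(real_A)), real_A)
    # Backward cycle: B -> F(B) -> G(F(B)) should reconstruct B
    loss_bwd = l1(G(F(real_B)), real_B)
    # lambda_cyc weights the cycle term against the adversarial losses
    return lambda_cyc * (loss_fwd + loss_bwd)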

Core style transfer concepts:

  • Content loss
  • Style loss
  • Gram matrix computation (see the sketch after this list)
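
And a minimal sketch of the Gram matrix and style loss as commonly implemented in PyTorch; `features` is assumed to be an activation map from a pretrained CNN such as VGG (not shown here):

import torch
import torch.nn.functional as F

def gram_matrix(features: torch.Tensor) -> torch.Tensor:
    # features: (batch, channels, height, width) activation map
    b, c, h, w = features.size()
    flat = features.view(b, c, h * w)             # flatten the spatial dims
    gram = torch.bmm(flat, flat.transpose(1, 2))  # channel-by-channel correlations
    return gram / (c * h * w)                     # normalize by layer size

def style_loss(generated_feats: torch.Tensor, style_feats: torch.Tensor) -> torch.Tensor:
    # MSE between the Gram matrices of the generated and style images
    return F.mse_loss(gram_matrix(generated_feats), gram_matrix(style_feats))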

2. Multi-GPU Training and Mixed-Precision Training

import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
from torchvision import datasets, transforms
import os
from torch.cuda.amp import autocast, GradScaler

# Define a simplified generator and discriminator
class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super(ResidualBlock, self).__init__()
        self.block = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(channels, channels, 3),
            nn.InstanceNorm2d(channels),
            nn.ReLU(inplace=True),
            nn.ReflectionPad2d(1),
            nn.Conv2d(channels, channels, 3),
            nn.InstanceNorm2d(channels)
        )

    def forward(self, x):
        return x + self.block(x)

class Generator(nn.Module):
    def __init__(self, input_channels=3, output_channels=3, n_residual_blocks=9):
        super(Generator, self).__init__()
        # Initial convolution
        self.initial = nn.Sequential(
            nn.ReflectionPad2d(3),
            nn.Conv2d(input_channels, 64, 7),
            nn.InstanceNorm2d(64),
            nn.ReLU(inplace=True)
        )
        # Downsampling
        self.downsample = nn.Sequential(
            nn.Conv2d(64, 128, 3, stride=2, padding=1),
            nn.InstanceNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 256, 3, stride=2, padding=1),
            nn.InstanceNorm2d(256),
            nn.ReLU(inplace=True)
        )
        # Residual blocks
        res_blocks = []
        for _ in range(n_residual_blocks):
            res_blocks.append(ResidualBlock(256))
        self.res_blocks = nn.Sequential(*res_blocks)
        # Upsampling
        self.upsample = nn.Sequential(
            nn.ConvTranspose2d(256, 128, 3, stride=2, padding=1, output_padding=1),
            nn.InstanceNorm2d(128),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1),
            nn.InstanceNorm2d(64),
            nn.ReLU(inplace=True)
        )
        # Output layer
        self.output = nn.Sequential(
            nn.ReflectionPad2d(3),
            nn.Conv2d(64, output_channels, 7),
            nn.Tanh()
        )

    def forward(self, x):
        x = self.initial(x)
        x = self.downsample(x)
        x = self.res_blocks(x)
        x = self.upsample(x)
        return self.output(x)

class Discriminator(nn.Module):
    def __init__(self, input_channels=3):
        super(Discriminator, self).__init__()
        # PatchGAN-style discriminator
        self.model = nn.Sequential(
            nn.Conv2d(input_channels, 64, 4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 128, 4, stride=2, padding=1),
            nn.InstanceNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, 4, stride=2, padding=1),
            nn.InstanceNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 512, 4, padding=1),
            nn.InstanceNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(512, 1, 4, padding=1)
        )

    def forward(self, x):
        return self.model(x)

# Multi-GPU distributed training setup
def setup_ddp(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # Initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    # Bind this process to its GPU
    torch.cuda.set_device(rank)

def cleanup():
    dist.destroy_process_group()

def train_model(rank, world_size, epochs):
    # Set up the distributed training environment
    setup_ddp(rank, world_size)

    # Data preprocessing
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])

    # Load the dataset (CIFAR-10 as an example)
    dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)

    # Initialize the models
    generator = Generator().to(rank)
    discriminator = Discriminator().to(rank)

    # Wrap the models in DDP
    generator = DDP(generator, device_ids=[rank])
    discriminator = DDP(discriminator, device_ids=[rank])

    # Optimizers
    optimizer_G = torch.optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
    optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))

    # Loss functions
    criterion_GAN = nn.MSELoss()
    criterion_cycle = nn.L1Loss()  # would pair with a second generator; see the sketch in section 1
    criterion_identity = nn.L1Loss()

    # Gradient scalers (for mixed-precision training)
    scaler_G = GradScaler()
    scaler_D = GradScaler()

    # Training loop
    for epoch in range(epochs):
        sampler.set_epoch(epoch)  # reshuffle data each epoch
        for i, (real_A, _) in enumerate(dataloader):
            real_A = real_A.to(rank)

            # Train the discriminator
            optimizer_D.zero_grad()
            with autocast():  # mixed-precision forward pass
                # Generate fake images
                fake_B = generator(real_A)
                # Discriminator losses
                pred_real = discriminator(real_A)
                pred_fake = discriminator(fake_B.detach())
                real_label = torch.ones_like(pred_real)
                fake_label = torch.zeros_like(pred_fake)
                loss_D_real = criterion_GAN(pred_real, real_label)
                loss_D_fake = criterion_GAN(pred_fake, fake_label)
                loss_D = (loss_D_real + loss_D_fake) * 0.5
            # Backward pass and optimization with gradient scaling
            scaler_D.scale(loss_D).backward()
            scaler_D.step(optimizer_D)
            scaler_D.update()

            # Train the generator
            optimizer_G.zero_grad()
            with autocast():
                # Identity loss
                identity_B = generator(real_A)
                loss_identity = criterion_identity(identity_B, real_A) * 5.0
                # GAN loss
                fake_B = generator(real_A)
                pred_fake = discriminator(fake_B)
                loss_GAN = criterion_GAN(pred_fake, real_label)
                # Total loss
                loss_G = loss_identity + loss_GAN
            scaler_G.scale(loss_G).backward()
            scaler_G.step(optimizer_G)
            scaler_G.update()

            if i % 100 == 0 and rank == 0:
                print(f"[Epoch {epoch}/{epochs}] [Batch {i}/{len(dataloader)}] "
                      f"[D loss: {loss_D.item():.4f}] [G loss: {loss_G.item():.4f}]")

    # Save the models (main process only)
    if rank == 0:
        torch.save(generator.module.state_dict(), 'generator.pth')
        torch.save(discriminator.module.state_dict(), 'discriminator.pth')

    cleanup()

# Launch multi-GPU training
if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    if world_size > 1:
        print(f"Using {world_size} GPUs!")
        torch.multiprocessing.spawn(train_model, args=(world_size, 10), nprocs=world_size, join=True)
    else:
        # Note: this fallback still assumes at least one CUDA device,
        # since the NCCL backend and .to(rank) require a GPU.
        print("Only one GPU available or no GPU, running on single device")
        train_model(0, 1, 10)

Multi-GPU training and mixed-precision training can substantially accelerate the training of complex models such as CycleGAN. The code above shows how to implement these optimizations with PyTorch's DistributedDataParallel and torch.cuda.amp.

Key optimization techniques:

  1. DistributedDataParallel (DDP): data-parallel training where each GPU processes a different subset of the data
  2. Mixed-precision training: mixes FP16 and FP32 computation to reduce memory consumption and increase throughput
  3. GradScaler: prevents gradient underflow in mixed-precision training (see the distilled pattern after this list)
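
For reference, here is the autocast/GradScaler pattern distilled from the training loop above into its canonical form — a sketch assuming a generic `model`, `optimizer`, and `loss_fn` (placeholders, not names from this project):

import torch
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()

def training_step(model, optimizer, loss_fn, inputs, targets):
    optimizer.zero_grad()
    with autocast():                  # run the forward pass in mixed precision
        outputs = model(inputs)
        loss = loss_fn(outputs, targets)
    scaler.scale(loss).backward()     # scale the loss to avoid FP16 gradient underflow
    scaler.step(optimizer)            # unscale gradients, then take the optimizer step
    scaler.update()                   # adjust the scale factor for the next step
    return loss.item()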

3. Building an Interactive UI with Gradio

import torch
import gradio as gr
import torchvision.transforms as transforms
from PIL import Image
import numpy as np
import os

# Assumes an already-trained model
class StyleTransferModel:
    def __init__(self, model_path='generator.pth'):
        from model import Generator  # assumes the Generator class is defined in model.py
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = Generator()
        # Load pretrained weights
        if os.path.exists(model_path):
            self.model.load_state_dict(torch.load(model_path, map_location=self.device))
            print(f"Model loaded from {model_path}")
        else:
            print(f"Model {model_path} not found. Using untrained model.")
        self.model.to(self.device)
        self.model.eval()
        # Preprocessing pipeline
        self.transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])

    def inference(self, image, style_name):
        # Preprocess the image
        image = self.transform(image).unsqueeze(0).to(self.device)
        # Run the model
        with torch.no_grad():
            output = self.model(image)
        # Postprocess
        output = output.squeeze().cpu().detach()
        output = output * 0.5 + 0.5  # denormalize
        output = output.clamp(0, 1)
        output = transforms.ToPILImage()(output)
        return output

# Initialize a model for the requested style
def load_model(style_name):
    model_path = f'models/{style_name}_generator.pth'
    return StyleTransferModel(model_path)

# Style transfer function
def apply_style(input_image, style_name):
    if input_image is None:
        return None
    # Load the model for the selected style
    # (note: this reloads the weights on every call; see the caching sketch below)
    model = load_model(style_name)
    # Run the style transfer
    output_image = model.inference(input_image, style_name)
    return output_image

# Available styles
styles = ["monet", "vangogh", "cezanne", "ukiyoe"]

# Build the Gradio interface
def create_gradio_app():
    with gr.Blocks(title="Artistic Style Transfer") as app:
        gr.Markdown("# Artistic Style Transfer System")
        gr.Markdown("Upload an image and choose an artistic style; the system will restyle the image automatically.")

        with gr.Row():
            with gr.Column():
                input_image = gr.Image(label="Input image", type="pil")
                style_selector = gr.Dropdown(choices=styles, label="Artistic style", value="monet")
                submit_btn = gr.Button("Apply style")
            with gr.Column():
                output_image = gr.Image(label="Stylized result")

        # Wire up the submit button
        submit_btn.click(fn=apply_style, inputs=[input_image, style_selector], outputs=output_image)

        # Examples
        gr.Examples(
            examples=[
                ["examples/landscape.jpg", "monet"],
                ["examples/portrait.jpg", "vangogh"],
                ["examples/cityscape.jpg", "cezanne"],
            ],
            inputs=[input_image, style_selector],
        )

        # Additional information
        gr.Markdown("""
        ## About style transfer

        This system combines CycleGAN with style transfer techniques to render your images in a variety of artistic styles.

        Available styles:
        - Monet: Impressionist style with soft colors and visible brushstrokes
        - Van Gogh: Post-Impressionist style with vivid colors and forceful strokes
        - Cezanne: pioneer of modern art, structured brushwork
        - Ukiyo-e: Japanese woodblock print style with flat colors and bold outlines

        The system is built with PyTorch; the model is optimized to run on CPU, but a GPU gives better performance.
        """)

    return app

# Launch the Gradio app
if __name__ == "__main__":
    app = create_gradio_app()
    app.launch(share=True)  # share=True generates a public link

Gradio is a powerful yet simple tool for quickly creating interactive web interfaces for deep learning models. The code above shows how to build a user-friendly interface for our style transfer system.

Gradio interface features:

  1. Image upload area
  2. Style selection dropdown
  3. Transform button
  4. Result display area
  5. Example images

(A note on model loading follows this list.)
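
One caveat: `apply_style` above reloads the generator weights on every request. A simple per-style cache (a sketch of my own, not part of the original code) keeps each loaded model in a dict so repeated requests are fast:

# Cache one StyleTransferModel per style so weights are loaded only once
_model_cache = {}

def load_model_cached(style_name):
    if style_name not in _model_cache:
        _model_cache[style_name] = StyleTransferModel(f'models/{style_name}_generator.pth')
    return _model_cache[style_name]

def apply_style_cached(input_image, style_name):
    if input_image is None:
        return None
    return load_model_cached(style_name).inference(input_image, style_name)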

4. ONNX Model Export and Runtime Deployment

import torch
import onnx
import onnxruntime as ort
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
import time
import os

# Load the PyTorch model
def load_pytorch_model(model_path):
    from model import Generator  # assumes Generator is defined in model.py
    model = Generator()
    model.load_state_dict(torch.load(model_path, map_location='cpu'))
    model.eval()
    return model

# Export to ONNX
def export_to_onnx(pytorch_model, onnx_path, input_shape=(1, 3, 256, 256)):
    # Create a dummy input
    dummy_input = torch.randn(input_shape)
    # Export the model
    torch.onnx.export(
        pytorch_model,               # PyTorch model
        dummy_input,                 # example input
        onnx_path,                   # output path
        export_params=True,          # store the trained parameter weights
        opset_version=12,            # ONNX opset version
        do_constant_folding=True,    # constant-folding optimization
        input_names=['input'],       # input name
        output_names=['output'],     # output name
        dynamic_axes={               # dynamic axes (batch dimension)
            'input': {0: 'batch_size'},
            'output': {0: 'batch_size'}
        }
    )
    # Validate the ONNX model
    onnx_model = onnx.load(onnx_path)
    onnx.checker.check_model(onnx_model)
    print(f"Model exported to {onnx_path} and validated successfully")
    return onnx_path

# Create an ONNX Runtime inference session
def create_onnx_session(onnx_path):
    # Check the available execution providers and pick the most suitable one
    if 'CUDAExecutionProvider' in ort.get_available_providers() and torch.cuda.is_available():
        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        print("Using CUDA for ONNX inference")
    else:
        providers = ['CPUExecutionProvider']
        print("Using CPU for ONNX inference")
    # Create the inference session
    session_options = ort.SessionOptions()
    session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    session = ort.InferenceSession(onnx_path, session_options, providers=providers)
    return session

# Run inference with ONNX Runtime
def onnx_inference(session, image_path):
    # Image preprocessing
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])
    # Load and preprocess the image
    image = Image.open(image_path).convert('RGB')
    input_tensor = transform(image).unsqueeze(0).numpy()
    # Get the input name
    input_name = session.get_inputs()[0].name
    # Run inference
    start_time = time.time()
    output = session.run(None, {input_name: input_tensor})[0]
    inference_time = time.time() - start_time
    # Postprocess
    output = output.squeeze()
    output = output * 0.5 + 0.5  # denormalize
    output = np.clip(output, 0, 1)
    output = np.transpose(output, (1, 2, 0)) * 255.0
    output_image = Image.fromarray(output.astype(np.uint8))
    return output_image, inference_time

# Performance comparison: PyTorch vs ONNX
def compare_performance(pytorch_model, onnx_session, image_path, num_runs=10):
    # PyTorch inference
    pytorch_model.eval()
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])
    image = Image.open(image_path).convert('RGB')
    input_tensor = transform(image).unsqueeze(0)

    # Warm-up
    with torch.no_grad():
        _ = pytorch_model(input_tensor)
    # Timing
    pytorch_times = []
    for _ in range(num_runs):
        start_time = time.time()
        with torch.no_grad():
            _ = pytorch_model(input_tensor)
        pytorch_times.append(time.time() - start_time)

    # ONNX inference
    input_name = onnx_session.get_inputs()[0].name
    input_array = input_tensor.numpy()
    # Warm-up
    _ = onnx_session.run(None, {input_name: input_array})
    # Timing
    onnx_times = []
    for _ in range(num_runs):
        start_time = time.time()
        _ = onnx_session.run(None, {input_name: input_array})
        onnx_times.append(time.time() - start_time)

    # Statistics
    avg_pytorch_time = sum(pytorch_times) / len(pytorch_times)
    avg_onnx_time = sum(onnx_times) / len(onnx_times)
    speedup = avg_pytorch_time / avg_onnx_time
    results = {
        'pytorch_avg_time': avg_pytorch_time,
        'onnx_avg_time': avg_onnx_time,
        'speedup': speedup
    }
    return results

# Main: export and use the ONNX model
def main(model_path, onnx_path, image_path):
    # Step 1: load the PyTorch model
    print("Loading PyTorch model...")
    pytorch_model = load_pytorch_model(model_path)

    # Step 2: export to ONNX
    print("Exporting to ONNX...")
    if not os.path.exists(onnx_path):
        export_to_onnx(pytorch_model, onnx_path)

    # Step 3: create the ONNX inference session
    print("Creating ONNX inference session...")
    onnx_session = create_onnx_session(onnx_path)

    # Step 4: run ONNX inference
    print("Running ONNX inference...")
    output_image, inference_time = onnx_inference(onnx_session, image_path)
    output_image.save('onnx_output.jpg')
    print(f"Inference completed in {inference_time:.4f} seconds, result saved to onnx_output.jpg")

    # Step 5: compare performance
    print("Comparing performance...")
    perf_results = compare_performance(pytorch_model, onnx_session, image_path)
    print(f"PyTorch average inference time: {perf_results['pytorch_avg_time']:.4f} seconds")
    print(f"ONNX average inference time: {perf_results['onnx_avg_time']:.4f} seconds")
    print(f"Speedup: {perf_results['speedup']:.2f}x")

if __name__ == "__main__":
    main(
        model_path="models/monet_generator.pth",
        onnx_path="models/monet_generator.onnx",
        image_path="examples/landscape.jpg"
    )

ONNX (Open Neural Network Exchange) is an open format for representing deep learning models. ONNX Runtime is a high-performance inference engine that can significantly improve model performance in production.

Advantages of ONNX:

  1. Cross-platform compatibility: supports many hardware targets and frameworks
  2. Optimized inference: typically faster than the original PyTorch model
  3. Smaller models: quantization and graph optimization reduce model size (see the sketch after this list)
  4. Simpler deployment: streamlines the path from training to production
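
As an illustration of point 3, here is a hedged sketch of dynamic weight quantization with the onnxruntime.quantization module; the file paths are placeholders, and the exact API should be verified against the onnxruntime version you use:

from onnxruntime.quantization import quantize_dynamic, QuantType

# Quantize weights to 8-bit integers; activations stay in floating point
quantize_dynamic(
    model_input="models/monet_generator.onnx",        # placeholder path
    model_output="models/monet_generator.int8.onnx",  # placeholder path
    weight_type=QuantType.QUInt8,
)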

5. Implementing a Performance Monitoring Dashboard

import time
import threading
import queue
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from PIL import Image
import onnxruntime as ort
import psutil
import GPUtil
import datetime
import os

# Queue for passing performance samples from the monitor thread to the UI
performance_queue = queue.Queue()

# Performance-monitoring thread
def monitor_system_performance():
    while True:
        try:
            # CPU utilization
            cpu_percent = psutil.cpu_percent(interval=1)
            # Memory utilization
            memory = psutil.virtual_memory()
            memory_percent = memory.percent
            # GPU utilization (if a GPU is present)
            gpu_data = []
            try:
                gpus = GPUtil.getGPUs()
                for gpu in gpus:
                    gpu_data.append({
                        'id': gpu.id,
                        'load': gpu.load * 100,
                        'memory_used': gpu.memoryUsed,
                        'memory_total': gpu.memoryTotal
                    })
            except Exception:
                pass
            # Current time
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Build a sample and put it on the queue
            perf_data = {
                'timestamp': timestamp,
                'cpu_percent': cpu_percent,
                'memory_percent': memory_percent,
                'gpu_data': gpu_data
            }
            performance_queue.put(perf_data)
            # Sample once per second
            time.sleep(1)
        except Exception as e:
            print(f"Error in monitoring thread: {e}")
            time.sleep(5)

# Model inference performance test
def inference_performance_test(onnx_session, input_data, n_runs=10):
    input_name = onnx_session.get_inputs()[0].name
    # Warm-up
    _ = onnx_session.run(None, {input_name: input_data})
    # Measure latency over several runs
    latencies = []
    for _ in range(n_runs):
        start_time = time.time()
        _ = onnx_session.run(None, {input_name: input_data})
        latencies.append((time.time() - start_time) * 1000)  # milliseconds
    return {
        'mean_latency': np.mean(latencies),
        'median_latency': np.median(latencies),
        'min_latency': np.min(latencies),
        'max_latency': np.max(latencies),
        'p95_latency': np.percentile(latencies, 95),
        'p99_latency': np.percentile(latencies, 99)
    }

# Streamlit dashboard app
def create_monitoring_dashboard():
    st.set_page_config(
        page_title="Model Performance Dashboard",
        page_icon="📊",
        layout="wide"
    )
    st.title("Artistic Style Transfer System - Performance Dashboard")

    # Start the monitoring thread only once per session
    # (spawning it on every rerun would leak threads)
    if 'monitor_started' not in st.session_state:
        monitor_thread = threading.Thread(target=monitor_system_performance, daemon=True)
        monitor_thread.start()
        st.session_state.monitor_started = True

    # Historical data. Inference latency is kept in its own list because it is
    # sampled on demand and would otherwise break the equal-length DataFrame.
    if 'history' not in st.session_state:
        st.session_state.history = {
            'timestamp': [],
            'cpu_percent': [],
            'memory_percent': [],
            'gpu_load': []
        }
    if 'inference_latency' not in st.session_state:
        st.session_state.inference_latency = []

    # Load the model (cached across reruns)
    @st.cache_resource
    def load_model():
        model_path = "models/monet_generator.onnx"
        if os.path.exists(model_path):
            # Pick a suitable execution provider
            if 'CUDAExecutionProvider' in ort.get_available_providers():
                providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
            else:
                providers = ['CPUExecutionProvider']
            return ort.InferenceSession(model_path, providers=providers)
        else:
            st.error(f"Model file {model_path} does not exist!")
            return None

    onnx_session = load_model()
    has_gpu = len(GPUtil.getGPUs()) > 0

    # Dashboard layout
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("System Resource Monitoring")
        cpu_chart = st.empty()      # CPU utilization chart
        memory_chart = st.empty()   # memory utilization chart
        if has_gpu:
            gpu_chart = st.empty()  # GPU utilization chart
    with col2:
        st.subheader("Model Inference Performance")
        # Model selection
        model_options = ["monet", "vangogh", "cezanne", "ukiyoe"]
        selected_model = st.selectbox("Select model", model_options)
        # Upload a test image
        uploaded_file = st.file_uploader("Upload test image", type=["jpg", "jpeg", "png"])
        latency_chart = st.empty()  # inference latency chart
        perf_metrics = st.empty()   # latency metrics
        test_button = st.button("Run inference performance test")

    # Chart update function
    def update_charts():
        # Drain the queue
        while not performance_queue.empty():
            try:
                data = performance_queue.get_nowait()
                st.session_state.history['timestamp'].append(data['timestamp'])
                st.session_state.history['cpu_percent'].append(data['cpu_percent'])
                st.session_state.history['memory_percent'].append(data['memory_percent'])
                if data['gpu_data']:
                    # assume the first GPU
                    st.session_state.history['gpu_load'].append(data['gpu_data'][0]['load'])
                else:
                    # no GPU: record 0
                    st.session_state.history['gpu_load'].append(0)
                # Cap the history length
                max_history = 100
                if len(st.session_state.history['timestamp']) > max_history:
                    for key in st.session_state.history:
                        st.session_state.history[key] = st.session_state.history[key][-max_history:]
            except queue.Empty:
                break

        df = pd.DataFrame(st.session_state.history)

        # CPU chart
        cpu_fig = px.line(df, x='timestamp', y='cpu_percent',
                          title='CPU utilization (%)',
                          labels={'cpu_percent': 'CPU %', 'timestamp': 'Time'})
        cpu_fig.update_layout(yaxis_range=[0, 100])
        cpu_chart.plotly_chart(cpu_fig, use_container_width=True)

        # Memory chart
        memory_fig = px.line(df, x='timestamp', y='memory_percent',
                             title='Memory utilization (%)',
                             labels={'memory_percent': 'Memory %', 'timestamp': 'Time'})
        memory_fig.update_layout(yaxis_range=[0, 100])
        memory_chart.plotly_chart(memory_fig, use_container_width=True)

        # GPU chart, if a GPU is available
        if has_gpu:
            gpu_fig = px.line(df, x='timestamp', y='gpu_load',
                              title='GPU utilization (%)',
                              labels={'gpu_load': 'GPU %', 'timestamp': 'Time'})
            gpu_fig.update_layout(yaxis_range=[0, 100])
            gpu_chart.plotly_chart(gpu_fig, use_container_width=True)

        # Inference latency chart, if we have measurements
        if st.session_state.inference_latency:
            latency_fig = go.Figure(
                go.Scatter(y=st.session_state.inference_latency, mode='lines'))
            latency_fig.update_layout(title='Inference latency (ms)',
                                      yaxis_title='Latency (ms)')
            latency_chart.plotly_chart(latency_fig, use_container_width=True)

    # Handle the inference performance test
    if test_button and uploaded_file is not None and onnx_session is not None:
        with st.spinner("Running inference performance test..."):
            # Preprocess the uploaded image
            image = Image.open(uploaded_file).convert('RGB')
            from torchvision import transforms
            transform = transforms.Compose([
                transforms.Resize((256, 256)),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
            ])
            input_tensor = transform(image).unsqueeze(0).numpy()
            # Run the performance test
            perf_results = inference_performance_test(onnx_session, input_tensor, n_runs=20)
            # Display the metrics
            with perf_metrics.container():
                mcol1, mcol2, mcol3 = st.columns(3)
                mcol1.metric("Mean latency (ms)", f"{perf_results['mean_latency']:.2f}")
                mcol2.metric("Median latency (ms)", f"{perf_results['median_latency']:.2f}")
                mcol3.metric("P95 latency (ms)", f"{perf_results['p95_latency']:.2f}")
            # Record the latency for the history chart
            st.session_state.inference_latency.append(perf_results['mean_latency'])

    # Refresh the charts periodically
    update_charts()
    time.sleep(0.1)
    st.experimental_rerun()

# Main
if __name__ == "__main__":
    create_monitoring_dashboard()

6. CI/CD Pipeline and Unit Tests

To round things out, the GitHub Actions workflow below defines the CI/CD pipeline, followed by the pytest suite that covers the unit tests.

name: Style Transfer CI/CD Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main, develop ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cpu
        pip install pytest pytest-cov flake8 black isort
        if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
    - name: Lint with flake8
      run: |
        # stop the build if there are Python syntax errors or undefined names
        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
        # exit-zero treats all errors as warnings
        flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
    - name: Format check with black
      run: |
        black --check .
    - name: Import order check with isort
      run: |
        isort --check-only --profile black .
    - name: Run unit tests
      run: |
        pytest --cov=./ --cov-report=xml
    - name: Upload coverage to Codecov
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml
        fail_ci_if_error: true

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop')
    steps:
    - uses: actions/checkout@v3
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cpu
        pip install -r requirements.txt
        pip install onnx onnxruntime
    - name: Build and export ONNX models
      run: |
        python scripts/export_models.py
    - name: Package application
      run: |
        mkdir -p release
        cp -r models release/
        cp -r scripts release/
        cp app.py release/
        cp requirements.txt release/
        tar -czvf style-transfer-app.tar.gz release/
    - name: Upload artifacts
      uses: actions/upload-artifact@v3
      with:
        name: style-transfer-app
        path: style-transfer-app.tar.gz

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    steps:
    - uses: actions/checkout@v3
    - name: Download artifact
      uses: actions/download-artifact@v3
      with:
        name: style-transfer-app
    - name: Deploy to staging
      run: |
        # the deployment script for the staging environment would go here
        echo "Deploying to staging environment"
    - name: Run integration tests
      run: |
        # run integration tests in the staging environment
        echo "Running integration tests"
    - name: Deploy to production
      if: success()
      run: |
        # deploy to the production environment
        echo "Deploying to production environment"

And the accompanying pytest unit tests:
import pytest
import torch
import numpy as np
import os
import sys
from PIL import Image

# Add the project root to the system path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Import the models and utility functions
from models.generator import Generator
from models.discriminator import Discriminator
from utils.transforms import preprocess_image, postprocess_image
from utils.onnx_utils import export_to_onnx

class TestGenerator:
    @pytest.fixture
    def generator(self):
        """Create a generator instance for testing"""
        return Generator()

    def test_generator_output_shape(self, generator):
        """Check that the generator output shape is correct"""
        batch_size = 2
        input_shape = (batch_size, 3, 256, 256)
        x = torch.randn(input_shape)
        output = generator(x)
        # The output shape should match the input
        assert output.shape == input_shape

    def test_generator_output_range(self, generator):
        """Check that the generator output lies in [-1, 1]"""
        batch_size = 2
        input_shape = (batch_size, 3, 256, 256)
        x = torch.randn(input_shape)
        # Keep the input within a sensible range
        x = torch.clamp(x, -1, 1)
        output = generator(x)
        # Allow a small numerical tolerance around [-1, 1]
        assert output.min() >= -1.1
        assert output.max() <= 1.1

class TestDiscriminator:
    @pytest.fixture
    def discriminator(self):
        """Create a discriminator instance for testing"""
        return Discriminator()

    def test_discriminator_output_shape(self, discriminator):
        """Check that the discriminator output shape is correct"""
        batch_size = 2
        input_shape = (batch_size, 3, 256, 256)
        x = torch.randn(input_shape)
        output = discriminator(x)
        # A PatchGAN discriminator should output (batch_size, 1, H/16, W/16)
        expected_shape = (batch_size, 1, 256 // 16, 256 // 16)
        assert output.shape == expected_shape

class TestImageProcessing:
    @pytest.fixture
    def sample_image(self):
        """Create a sample image for testing"""
        # Build a simple RGB image
        img = np.zeros((100, 100, 3), dtype=np.uint8)
        img[:50, :50] = [255, 0, 0]    # top-left: red
        img[50:, :50] = [0, 255, 0]    # bottom-left: green
        img[:50, 50:] = [0, 0, 255]    # top-right: blue
        img[50:, 50:] = [255, 255, 0]  # bottom-right: yellow
        return Image.fromarray(img)

    def test_preprocess_image(self, sample_image):
        """Test the image preprocessing function"""
        tensor = preprocess_image(sample_image)
        # The output should be a PyTorch tensor
        assert isinstance(tensor, torch.Tensor)
        # Shape should be (C, H, W)
        assert tensor.dim() == 3
        assert tensor.shape[0] == 3  # RGB channels
        # Values should be within [-1, 1] (with tolerance)
        assert tensor.min() >= -1.1
        assert tensor.max() <= 1.1

    def test_postprocess_image(self):
        """Test the image postprocessing function"""
        # Create a tensor in the [-1, 1] range
        tensor = torch.rand(3, 64, 64) * 2 - 1
        img = postprocess_image(tensor)
        # The output should be a PIL image
        assert isinstance(img, Image.Image)
        # Check the image size
        assert img.size == (64, 64)
        # Check the image mode
        assert img.mode == "RGB"

class TestONNXExport:
    @pytest.fixture
    def temp_onnx_path(self, tmp_path):
        """Create a temporary file path for the ONNX export"""
        return str(tmp_path / "temp_model.onnx")

    def test_onnx_export(self, temp_onnx_path):
        """Test the ONNX export functionality"""
        # Skip if onnx is not installed
        pytest.importorskip("onnx")
        # Create a simple model
        generator = Generator()
        # Export it
        success = export_to_onnx(generator, temp_onnx_path)
        # The export should succeed
        assert success
        # The file should exist
        assert os.path.exists(temp_onnx_path)
        # The file size should be sensible (at least 1 MB)
        assert os.path.getsize(temp_onnx_path) > 1_000_000

class TestCycleGANTraining:
    def test_cycle_consistency_loss(self):
        """Check that the cycle consistency loss is computed correctly"""
        from utils.losses import cycle_consistency_loss
        # Example data
        real_images = torch.rand(2, 3, 64, 64)
        cycled_images = real_images + 0.1 * torch.rand(2, 3, 64, 64)
        # Compute the loss
        loss = cycle_consistency_loss(real_images, cycled_images)
        # The loss should be positive
        assert loss > 0
        # The loss for identical images should be zero
        identical_loss = cycle_consistency_loss(real_images, real_images)
        assert identical_loss < 1e-5

    def test_identity_loss(self):
        """Check that the identity mapping loss is computed correctly"""
        from utils.losses import identity_mapping_loss
        # Example data
        real_images = torch.rand(2, 3, 64, 64)
        identity_mapped = real_images + 0.1 * torch.rand(2, 3, 64, 64)
        # Compute the loss
        loss = identity_mapping_loss(real_images, identity_mapped)
        # The loss should be positive
        assert loss > 0
        # The loss for identical images should be zero
        identical_loss = identity_mapping_loss(real_images, real_images)
        assert identical_loss < 1e-5

# Integration test
class TestEndToEnd:
    def test_style_transfer_pipeline(self, monkeypatch):
        """Test the full style transfer pipeline"""
        # Mock image loading and model inference
        def mock_load_image(*args, **kwargs):
            return Image.new('RGB', (256, 256), color='white')

        def mock_inference(*args, **kwargs):
            # Return a random image
            tensor = torch.rand(3, 256, 256) * 2 - 1
            return postprocess_image(tensor)

        # Patch the real functions
        import utils.image_utils
        import utils.inference
        monkeypatch.setattr(utils.image_utils, "load_image", mock_load_image)
        monkeypatch.setattr(utils.inference, "run_inference", mock_inference)

        # Run the end-to-end pipeline
        from pipeline import StyleTransferPipeline
        pipeline = StyleTransferPipeline(style="monet")
        result = pipeline.process("dummy_input.jpg")
        # The result should be a PIL image
        assert isinstance(result, Image.Image)
        # Check the image size
        assert result.size == (256, 256)

if __name__ == "__main__":
    pytest.main(["-v"])


Summary of the Complete Study Plan

Today we learned how to build a complete end-to-end image generation system, combining CycleGAN with style transfer and applying a range of engineering practices for efficient training, deployment, and monitoring.

1. Combining CycleGAN and Style Transfer

We reviewed the core principles of CycleGAN: two generators and two discriminators enable domain-to-domain translation without paired data, while cycle consistency loss and identity mapping loss safeguard translation quality. Style transfer hinges on content loss and style loss, with the Gram matrix used to extract and apply style features.

Combining the two techniques gives us a flexible artistic style transfer system that preserves the integrity of the content while supporting multiple artistic styles.

2. Multi-GPU Training and Mixed-Precision Training

To speed up training, we used PyTorch's DistributedDataParallel for data-parallel training, letting multiple GPUs process different batches simultaneously. We also introduced mixed-precision training with autocast and GradScaler from the torch.cuda.amp module, mixing FP16 and FP32 computation to raise training speed substantially while reducing memory usage.

3. Interactive Gradio Interface

With Gradio we built a clean yet powerful web interface supporting image upload, style selection, and result display, greatly improving the system's usability. Gradio's strengths are rapid development and easy deployment, making it well suited to prototyping and demoing AI applications.

4. ONNX Model Export and Optimization

By exporting the PyTorch model to ONNX format we achieved cross-platform deployment, and ONNX Runtime significantly increased inference speed. ONNX's graph optimization and operator fusion further improve performance, while quantization reduces model size.

5. Performance Monitoring Dashboard

We built a real-time performance monitoring dashboard with Streamlit, tracking key metrics such as CPU, memory, and GPU utilization and inference latency, which helps identify bottlenecks and optimize system behavior.

6. CI/CD Pipeline and Unit Tests

We built a complete CI/CD pipeline covering code quality checks, unit tests, build and packaging, and deployment to staging and production. We also wrote a comprehensive unit test suite covering core functionality such as model structure, image processing, and loss computation, ensuring code quality and system stability.

Hands-On Project Demo Workflow

  1. Environment setup: install PyTorch, ONNX, Gradio, and the other dependencies
  2. Model training: train the CycleGAN model with multiple GPUs and mixed precision
  3. Model optimization: export to ONNX and accelerate with ONNX Runtime
  4. Interactive interface: launch the Gradio interface for the demo
  5. Performance monitoring: start the monitoring dashboard to observe system performance
  6. CI/CD deployment: deploy automatically via GitHub Actions
