当前位置:首页 > 健康 > 美食 > 泉州手机端建站模板_工程接单网_站长网_互联网广告营销是什么

泉州手机端建站模板_工程接单网_站长网_互联网广告营销是什么

时间:2025/7/12 2:03:11 来源:https://blog.csdn.net/yyfhq/article/details/143093328 浏览次数:0次
泉州手机端建站模板_工程接单网_站长网_互联网广告营销是什么
"""
This code started out as a PyTorch port of Ho et al's diffusion models:
https://github.com/hojonathanho/diffusion/blob/1e0dceb3b3495bbe19116a5e1b3596cd0706c543/diffusion_tf/diffusion_utils_2.py

Docstrings have been added, as well as DDIM sampling and a new collection of beta schedules.
"""
import enum
import math
import numpy as np
import torch as th
from dataset.path_manager import *
from diffusion.nn import mean_flat, mask_img, decompose_featmaps
from diffusion.losses import normal_kl, discretized_gaussian_log_likelihood
from diffusion.scheduler import get_schedule_jump


def get_named_beta_schedule(schedule_name, num_diffusion_timesteps):
    """
    Get a pre-defined beta schedule for the given name.

    The beta schedule library consists of beta schedules which remain similar
    in the limit of num_diffusion_timesteps.
    Beta schedules may be added, but should not be removed or changed once
    they are committed to maintain backwards compatibility.

    :param schedule_name: either "linear" or "cosine".
    :param num_diffusion_timesteps: the number of betas to produce.
    :return: a 1-D numpy array of betas.
    :raises NotImplementedError: for any other schedule name.
    """
    if schedule_name == "linear":
        # Linear schedule from Ho et al, extended to work for any number of
        # diffusion steps.
        scale = 1000 / num_diffusion_timesteps
        beta_start = scale * 0.0001
        beta_end = scale * 0.02
        return np.linspace(
            beta_start, beta_end, num_diffusion_timesteps, dtype=np.float64
        )
    elif schedule_name == "cosine":
        return betas_for_alpha_bar(
            num_diffusion_timesteps,
            lambda t: math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2,
        )
    else:
        raise NotImplementedError(f"unknown beta schedule: {schedule_name}")


def betas_for_alpha_bar(num_diffusion_timesteps, alpha_bar, max_beta=0.999):
    """
    Create a beta schedule that discretizes the given alpha_t_bar function,
    which defines the cumulative product of (1-beta) over time from t = [0,1].

    :param num_diffusion_timesteps: the number of betas to produce.
    :param alpha_bar: a lambda that takes an argument t from 0 to 1 and
                      produces the cumulative product of (1-beta) up to that
                      part of the diffusion process.
    :param max_beta: the maximum beta to use; use values lower than 1 to
                     prevent singularities.
    :return: a 1-D numpy array of betas.
    """
    betas = []
    for i in range(num_diffusion_timesteps):
        t1 = i / num_diffusion_timesteps
        t2 = (i + 1) / num_diffusion_timesteps
        betas.append(min(1 - alpha_bar(t2) / alpha_bar(t1), max_beta))
    return np.array(betas)


class ModelMeanType(enum.Enum):
    """
    Which type of output the model predicts.
    """

    PREVIOUS_X = enum.auto()  # the model predicts x_{t-1}
    START_X = enum.auto()  # the model predicts x_0
    EPSILON = enum.auto()  # the model predicts epsilon


class ModelVarType(enum.Enum):
    """
    What is used as the model's output variance.

    The LEARNED_RANGE option has been added to allow the model to predict
    values between FIXED_SMALL and FIXED_LARGE, making its job easier.
    """

    LEARNED = enum.auto()
    FIXED_SMALL = enum.auto()
    FIXED_LARGE = enum.auto()
    LEARNED_RANGE = enum.auto()


class LossType(enum.Enum):
    """Which loss the diffusion training objective uses."""

    MSE = enum.auto()  # use raw MSE loss (and KL when learning variances)
    RESCALED_MSE = (
        enum.auto()
    )  # use raw MSE loss (with RESCALED_KL when learning variances)
    KL = enum.auto()  # use the variational lower-bound
    RESCALED_KL = enum.auto()  # like KL, but rescale to estimate the full VLB

    def is_vb(self):
        """Return True for the two variational-bound loss types."""
        return self == LossType.KL or self == LossType.RESCALED_KL


class GaussianDiffusion:
    """
    Utilities for training and sampling diffusion models.

    Ported directly from here, and then adapted over time to further experimentation.
    https://github.com/hojonathanho/diffusion/blob/1e0dceb3b3495bbe19116a5e1b3596cd0706c543/diffusion_tf/diffusion_utils_2.py#L42

    :param args: a configuration object; `ssc_refine`, `triplane_loss_type`
                 and (in training_losses) `voxel_fea` are read from it.
    :param betas: a 1-D numpy array of betas for each diffusion timestep,
                  starting at T and going to 1.
    :param model_mean_type: a ModelMeanType determining what the model outputs.
    :param model_var_type: a ModelVarType determining how variance is output.
    :param loss_type: a LossType determining the loss function to use.
    :param rescale_timesteps: if True, pass floating point timesteps into the
                              model so that they are always scaled like in the
                              original paper (0 to 1000).
    """

    def __init__(
        self,
        *,
        args,
        betas,
        model_mean_type,
        model_var_type,
        loss_type,
        rescale_timesteps,
    ):
        self.model_mean_type = model_mean_type
        self.model_var_type = model_var_type
        self.loss_type = loss_type
        self.rescale_timesteps = rescale_timesteps
        self.ssc_refine = args.ssc_refine
        self.triplane_loss_type = args.triplane_loss_type
        self.args = args

        # attention_masking() reads these three attributes, which were never
        # initialised anywhere in this class. NOTE(review): presumably they
        # are meant to come from `args` -- confirm. They default to None so
        # that constructing the object keeps working without them.
        self.sel_attn_depth = getattr(args, "sel_attn_depth", None)
        self.sel_attn_block = getattr(args, "sel_attn_block", None)
        self.num_heads = getattr(args, "num_heads", None)

        # Use float64 for accuracy.
        betas = np.array(betas, dtype=np.float64)
        self.betas = betas
        assert len(betas.shape) == 1, "betas must be 1-D"
        assert (betas > 0).all() and (betas <= 1).all()

        self.num_timesteps = int(betas.shape[0])

        alphas = 1.0 - betas
        self.alphas_cumprod = np.cumprod(alphas, axis=0)
        self.alphas_cumprod_prev = np.append(1.0, self.alphas_cumprod[:-1])
        self.alphas_cumprod_next = np.append(self.alphas_cumprod[1:], 0.0)
        assert self.alphas_cumprod_prev.shape == (self.num_timesteps,)

        # calculations for diffusion q(x_t | x_{t-1}) and others
        self.sqrt_alphas_cumprod = np.sqrt(self.alphas_cumprod)
        self.sqrt_one_minus_alphas_cumprod = np.sqrt(1.0 - self.alphas_cumprod)
        self.log_one_minus_alphas_cumprod = np.log(1.0 - self.alphas_cumprod)
        self.sqrt_recip_alphas_cumprod = np.sqrt(1.0 / self.alphas_cumprod)
        self.sqrt_recipm1_alphas_cumprod = np.sqrt(1.0 / self.alphas_cumprod - 1)

        # calculations for posterior q(x_{t-1} | x_t, x_0)
        self.posterior_variance = (
            betas * (1.0 - self.alphas_cumprod_prev) / (1.0 - self.alphas_cumprod)
        )
        # log calculation clipped because the posterior variance is 0 at the
        # beginning of the diffusion chain.
        self.posterior_log_variance_clipped = np.log(
            np.append(self.posterior_variance[1], self.posterior_variance[1:])
        )
        self.posterior_mean_coef1 = (
            betas * np.sqrt(self.alphas_cumprod_prev) / (1.0 - self.alphas_cumprod)
        )
        self.posterior_mean_coef2 = (
            (1.0 - self.alphas_cumprod_prev)
            * np.sqrt(alphas)
            / (1.0 - self.alphas_cumprod)
        )

    def undo(self, img_out, t, debug=False):
        """
        Re-noise a sample by one forward step, p(x_t | x_{t-1}).

        :param img_out: the tensor to add noise to.
        :param t: a 1-D tensor of indices into `self.betas`.
        :param debug: unused.
        :return: sqrt(1 - beta_t) * img_out + sqrt(beta_t) * fresh noise.
        """
        beta = _extract_into_tensor(self.betas, t, img_out.shape)
        img_in_est = th.sqrt(1 - beta) * img_out + th.sqrt(beta) * th.randn_like(
            img_out
        )
        return img_in_est

    def q_mean_variance(self, x_start, t):
        """
        Get the distribution q(x_t | x_0).

        :param x_start: the [N x C x ...] tensor of noiseless inputs.
        :param t: the number of diffusion steps (minus 1). Here, 0 means one step.
        :return: A tuple (mean, variance, log_variance), all of x_start's shape.
        """
        mean = (
            _extract_into_tensor(self.sqrt_alphas_cumprod, t, x_start.shape) * x_start
        )
        variance = _extract_into_tensor(1.0 - self.alphas_cumprod, t, x_start.shape)
        log_variance = _extract_into_tensor(
            self.log_one_minus_alphas_cumprod, t, x_start.shape
        )
        return mean, variance, log_variance

    def q_sample(self, x_start, t, noise=None):
        """
        Diffuse the data for a given number of diffusion steps.

        In other words, sample from q(x_t | x_0).

        :param x_start: the initial data batch.
        :param t: the number of diffusion steps (minus 1). Here, 0 means one step.
        :param noise: if specified, the split-out normal noise.
        :return: A noisy version of x_start.
        """
        if noise is None:
            noise = th.randn_like(x_start)
        assert noise.shape == x_start.shape
        return (
            _extract_into_tensor(self.sqrt_alphas_cumprod, t, x_start.shape) * x_start
            + _extract_into_tensor(self.sqrt_one_minus_alphas_cumprod, t, x_start.shape)
            * noise
        )

    def q_posterior_mean_variance(self, x_start, x_t, t):
        """
        Compute the mean and variance of the diffusion posterior:

            q(x_{t-1} | x_t, x_0)

        :return: a tuple (posterior_mean, posterior_variance,
                 posterior_log_variance_clipped).
        """
        assert x_start.shape == x_t.shape
        posterior_mean = (
            _extract_into_tensor(self.posterior_mean_coef1, t, x_t.shape) * x_start
            + _extract_into_tensor(self.posterior_mean_coef2, t, x_t.shape) * x_t
        )
        posterior_variance = _extract_into_tensor(self.posterior_variance, t, x_t.shape)
        posterior_log_variance_clipped = _extract_into_tensor(
            self.posterior_log_variance_clipped, t, x_t.shape
        )
        assert (
            posterior_mean.shape[0]
            == posterior_variance.shape[0]
            == posterior_log_variance_clipped.shape[0]
            == x_start.shape[0]
        )
        return posterior_mean, posterior_variance, posterior_log_variance_clipped

    def p_mean_variance(
        self, model, x, t, clip_denoised=True, denoised_fn=None, model_kwargs=None
    ):
        """
        Apply the model to get p(x_{t-1} | x_t), as well as a prediction of
        the initial x, x_0.

        :param model: the model, which takes a signal and a batch of timesteps
                      as input.
        :param x: the [N x C x ...] tensor at time t.
        :param t: a 1-D Tensor of timesteps.
        :param clip_denoised: if True, clip the denoised signal into [-1, 1].
        :param denoised_fn: if not None, a function which applies to the
            x_start prediction before it is used to sample. Applies before
            clip_denoised.
        :param model_kwargs: a dict of extra arguments for the model. The keys
            'H', 'W', 'D' and 'y' are read unconditionally, so in practice
            this must be supplied.
        :return: a dict with the following keys:
                 - 'mean': the model mean output.
                 - 'variance': the model variance output.
                 - 'log_variance': the log of 'variance'.
                 - 'pred_xstart': the prediction for x_0.
        """
        if model_kwargs is None:
            model_kwargs = {}

        B, C = x.shape[:2]
        assert t.shape == (B,)
        model_output = model(
            x,
            self._scale_timesteps(t),
            model_kwargs['H'],
            model_kwargs['W'],
            model_kwargs['D'],
            model_kwargs['y'],
        )

        if self.model_var_type in [ModelVarType.LEARNED, ModelVarType.LEARNED_RANGE]:
            assert model_output.shape == (B, C * 2, *x.shape[2:])
            model_output, model_var_values = th.split(model_output, C, dim=1)
            if self.model_var_type == ModelVarType.LEARNED:
                model_log_variance = model_var_values
                model_variance = th.exp(model_log_variance)
            else:
                min_log = _extract_into_tensor(
                    self.posterior_log_variance_clipped, t, x.shape
                )
                max_log = _extract_into_tensor(np.log(self.betas), t, x.shape)
                # The model_var_values is [-1, 1] for [min_var, max_var].
                frac = (model_var_values + 1) / 2
                model_log_variance = frac * max_log + (1 - frac) * min_log
                model_variance = th.exp(model_log_variance)
        else:
            model_variance, model_log_variance = {
                # for fixedlarge, we set the initial (log-)variance like so
                # to get a better decoder log likelihood.
                ModelVarType.FIXED_LARGE: (
                    np.append(self.posterior_variance[1], self.betas[1:]),
                    np.log(np.append(self.posterior_variance[1], self.betas[1:])),
                ),
                ModelVarType.FIXED_SMALL: (
                    self.posterior_variance,
                    self.posterior_log_variance_clipped,
                ),
            }[self.model_var_type]
            model_variance = _extract_into_tensor(model_variance, t, x.shape)
            model_log_variance = _extract_into_tensor(model_log_variance, t, x.shape)

        def process_xstart(x):
            if denoised_fn is not None:
                x = denoised_fn(x)
            if clip_denoised:
                return x.clamp(-1, 1)
            return x

        if self.model_mean_type == ModelMeanType.PREVIOUS_X:
            pred_xstart = process_xstart(
                self._predict_xstart_from_xprev(x_t=x, t=t, xprev=model_output)
            )
            model_mean = model_output
        elif self.model_mean_type in [ModelMeanType.START_X, ModelMeanType.EPSILON]:
            if self.model_mean_type == ModelMeanType.START_X:
                pred_xstart = process_xstart(model_output)
            else:
                pred_xstart = process_xstart(
                    self._predict_xstart_from_eps(x_t=x, t=t, eps=model_output)
                )
            model_mean, _, _ = self.q_posterior_mean_variance(
                x_start=pred_xstart, x_t=x, t=t
            )
        else:
            raise NotImplementedError(self.model_mean_type)

        assert (
            model_mean.shape == model_log_variance.shape == pred_xstart.shape == x.shape
        )
        return {
            "mean": model_mean,
            "variance": model_variance,
            "log_variance": model_log_variance,
            "pred_xstart": pred_xstart,
        }

    def _predict_xstart_from_eps(self, x_t, t, eps):
        """Recover the x_0 estimate implied by x_t and a noise prediction."""
        assert x_t.shape == eps.shape
        return (
            _extract_into_tensor(self.sqrt_recip_alphas_cumprod, t, x_t.shape) * x_t
            - _extract_into_tensor(self.sqrt_recipm1_alphas_cumprod, t, x_t.shape) * eps
        )

    def _predict_xstart_from_xprev(self, x_t, t, xprev):
        """Recover the x_0 estimate implied by x_t and an x_{t-1} prediction."""
        assert x_t.shape == xprev.shape
        return (  # (xprev - coef2*x_t) / coef1
            _extract_into_tensor(1.0 / self.posterior_mean_coef1, t, x_t.shape) * xprev
            - _extract_into_tensor(
                self.posterior_mean_coef2 / self.posterior_mean_coef1, t, x_t.shape
            )
            * x_t
        )

    def _predict_eps_from_xstart(self, x_t, t, pred_xstart):
        """Recover the noise estimate implied by x_t and an x_0 prediction."""
        return (
            _extract_into_tensor(self.sqrt_recip_alphas_cumprod, t, x_t.shape) * x_t
            - pred_xstart
        ) / _extract_into_tensor(self.sqrt_recipm1_alphas_cumprod, t, x_t.shape)

    def _scale_timesteps(self, t):
        """Map t onto a 0..1000 float scale when rescale_timesteps is set."""
        if self.rescale_timesteps:
            return t.float() * (1000.0 / self.num_timesteps)
        return t

    def condition_mean(self, cond_fn, p_mean_var, x, t, model_kwargs=None):
        """
        Compute the mean for the previous step, given a function cond_fn that
        computes the gradient of a conditional log probability with respect to
        x. In particular, cond_fn computes grad(log(p(y|x))), and we want to
        condition on y.

        This uses the conditioning strategy from Sohl-Dickstein et al. (2015).

        Only the 'mean' and 'variance' entries of p_mean_var are read.
        """
        gradient = cond_fn(
            x,
            self._scale_timesteps(t),
            model_kwargs['H'],
            model_kwargs['W'],
            model_kwargs['D'],
            model_kwargs['y'],
        )
        new_mean = (
            p_mean_var["mean"].float() + p_mean_var["variance"] * gradient.float()
        )
        return new_mean

    def condition_score(self, cond_fn, p_mean_var, x, t, model_kwargs=None):
        """
        Compute what the p_mean_variance output would have been, should the
        model's score function be conditioned by cond_fn.

        See condition_mean() for details on cond_fn.

        Unlike condition_mean(), this instead uses the conditioning strategy
        from Song et al (2020).
        """
        alpha_bar = _extract_into_tensor(self.alphas_cumprod, t, x.shape)

        eps = self._predict_eps_from_xstart(x, t, p_mean_var["pred_xstart"])
        eps = eps - (1 - alpha_bar).sqrt() * cond_fn(
            x,
            self._scale_timesteps(t),
            model_kwargs['H'],
            model_kwargs['W'],
            model_kwargs['D'],
            model_kwargs['y'],
        )

        out = p_mean_var.copy()
        out["pred_xstart"] = self._predict_xstart_from_eps(x, t, eps)
        out["mean"], _, _ = self.q_posterior_mean_variance(
            x_start=out["pred_xstart"], x_t=x, t=t
        )
        return out

    def attention_masking(
        self,
        x,
        t,
        attn_map,
        prev_noise,
        blur_sigma,
        model_kwargs=None,
    ):
        """
        Blur the regions of `x` selected by an attention map and re-noise the
        result to timestep `t`.

        :param x: a [B x C x H x W] tensor (an x_0 prediction in p_sample()).
        :param t: a 1-D tensor of B timesteps.
        :param attn_map: attention weights that can be reshaped to
            [B, num_heads, attn_res**2, attn_res**2].
        :param prev_noise: the noise used to diffuse the blurred tensor.
        :param blur_sigma: standard deviation of the Gaussian blur.
        :param model_kwargs: unused.
        :return: the masked-and-blurred tensor diffused to timestep t.
        :raises ValueError: if sel_attn_depth / num_heads are not configured.
        """
        B, C, H, W = x.shape
        assert t.shape == (B,)

        if self.sel_attn_depth in [0, 1, 2] or self.sel_attn_block == "middle":
            attn_res = 8
        elif self.sel_attn_depth in [3, 4, 5]:
            attn_res = 16
        elif self.sel_attn_depth in [6, 7, 8]:
            attn_res = 32
        else:
            raise ValueError("sel_attn_depth must be in [0, 1, 2, 3, 4, 5, 6, 7, 8]")
        if self.num_heads is None:
            raise ValueError("num_heads must be set to reshape the attention map")

        # Average over heads, sum the attention each location receives and
        # threshold it into a binary mask.
        attn_mask = (
            attn_map.reshape(B, self.num_heads, attn_res ** 2, attn_res ** 2)
            .mean(1, keepdim=False)
            .sum(1, keepdim=False)
            > 1.0
        )
        # Repeat over the actual channel count (previously hard-coded to 3,
        # which only broadcast against 3-channel inputs).
        attn_mask = (
            attn_mask.reshape(B, attn_res, attn_res)
            .unsqueeze(1)
            .repeat(1, C, 1, 1)
            .int()
            .float()
        )
        attn_mask = th.nn.functional.interpolate(attn_mask, (H, W))

        x_curr = _gaussian_blur2d(x, kernel_size=31, sigma=blur_sigma)
        # Blurred content inside the mask, original content elsewhere.
        x_curr = x_curr * attn_mask + x * (1 - attn_mask)

        x_curr = self.q_sample(x_curr, t, noise=prev_noise)
        return x_curr

    def p_sample(
        self,
        model,
        x,
        t,
        clip_denoised=True,
        denoised_fn=None,
        cond_fn=None,
        model_kwargs=None,
        guidance_kwargs=None,
    ):
        """
        Sample x_{t-1} from the model at the given timestep.

        When the p_mean_variance() result carries an 'attn_map' entry and the
        timestep is below `guide_start`, the mean is recomputed from an
        epsilon extrapolated between the normal prediction and a prediction
        on an attention-masked, blurred input. Otherwise this is plain
        ancestral sampling. p_mean_variance() as written here never returns
        'attn_map', so the guided path is inactive unless that changes.

        :param model: the model to sample from.
        :param x: the current tensor at x_t.
        :param t: a 1-D tensor of timesteps; t[0] is compared to guide_start.
        :param clip_denoised: if True, clip the x_start prediction to [-1, 1].
        :param denoised_fn: if not None, a function which applies to the
            x_start prediction before it is used to sample.
        :param cond_fn: if not None, this is a gradient function that acts
                        similarly to the model.
        :param model_kwargs: a dict of extra arguments for the model; 'y' is
            also added to the sample for the residual triplane loss types.
        :param guidance_kwargs: optional dict overriding 'guide_scale'
            (default 1.5), 'guide_start' (default 50) and 'blur_sigma'
            (default 2.0).
        :return: a dict containing the following keys:
                 - 'sample': a random sample from the model.
                 - 'pred_xstart': a prediction of x_0.
        """
        guidance = {
            'guide_scale': 1.5,  # guidance scale
            'guide_start': 50,  # guidance applies to timesteps below this
            'blur_sigma': 2.0,  # standard deviation of the Gaussian blur
        }
        if guidance_kwargs:
            guidance.update(guidance_kwargs)

        out = self.p_mean_variance(
            model,
            x,
            t,
            clip_denoised=clip_denoised,
            denoised_fn=denoised_fn,
            model_kwargs=model_kwargs,
        )
        mean = out["mean"]

        attn_map = out.get("attn_map")
        # t[0] rather than t.item(): t holds one entry per batch element.
        if attn_map is not None and int(t[0]) < guidance['guide_start']:
            # p_mean_variance() does not return 'eps'; derive it from the
            # x_0 prediction unless an 'eps' entry happens to be present.
            cond_eps = out["eps"] if "eps" in out else self._predict_eps_from_xstart(
                x, t, out["pred_xstart"]
            )
            mask_blurred = self.attention_masking(
                out['pred_xstart'],
                t,
                attn_map,
                prev_noise=cond_eps,
                blur_sigma=guidance['blur_sigma'],
                model_kwargs=model_kwargs,
            )
            mask_out = self.p_mean_variance(
                model,
                mask_blurred,
                t,
                clip_denoised=clip_denoised,
                denoised_fn=denoised_fn,
                model_kwargs=model_kwargs,
            )
            uncond_eps = (
                mask_out["eps"]
                if "eps" in mask_out
                else self._predict_eps_from_xstart(
                    mask_blurred, t, mask_out["pred_xstart"]
                )
            )
            guided_eps = uncond_eps + guidance['guide_scale'] * (cond_eps - uncond_eps)

            guided_xstart = self._predict_xstart_from_eps(x_t=x, t=t, eps=guided_eps)
            if denoised_fn is not None:
                guided_xstart = denoised_fn(guided_xstart)
            if clip_denoised:
                guided_xstart = guided_xstart.clamp(-1, 1)
            mean, _, _ = self.q_posterior_mean_variance(
                x_start=guided_xstart, x_t=x, t=t
            )

        noise = th.randn_like(x)
        nonzero_mask = (
            (t != 0).float().view(-1, *([1] * (len(x.shape) - 1)))
        )  # no noise when t == 0
        if cond_fn is not None:
            mean = self.condition_mean(
                cond_fn,
                {"mean": mean, "variance": out["variance"]},
                x,
                t,
                model_kwargs=model_kwargs,
            )
        sample = mean + nonzero_mask * th.exp(0.5 * out["log_variance"]) * noise
        if (self.triplane_loss_type == 'residual_plus_decoder') or (
            self.triplane_loss_type == 'residual'
        ):
            sample = sample + model_kwargs['y'].to(sample.device)
        return {"sample": sample, "pred_xstart": out["pred_xstart"]}

    def p_sample_loop(
        self,
        model,
        shape,
        noise=None,
        clip_denoised=True,
        denoised_fn=None,
        cond_fn=None,
        model_kwargs=None,
        device=None,
        progress=False,
        save_timestep_interval=None,
    ):
        """
        Generate samples from the model.

        :param model: the model module.
        :param shape: the shape of the samples, (N, C, H, W).
        :param noise: if specified, the noise from the encoder to sample.
                      Should be of the same shape as `shape`.
        :param clip_denoised: if True, clip x_start predictions to [-1, 1].
        :param denoised_fn: if not None, a function which applies to the
            x_start prediction before it is used to sample.
        :param cond_fn: if not None, this is a gradient function that acts
                        similarly to the model.
        :param model_kwargs: if not None, a dict of extra keyword arguments to
            pass to the model. This can be used for conditioning.
        :param device: if specified, the device to create the samples on.
                       If not specified, use a model parameter's device.
        :param progress: if True, show a tqdm progress bar.
        :param save_timestep_interval: if not None, intermediate samples are
            collected and a dict is returned instead of a tensor.
        :return: a non-differentiable batch of samples, or, when
                 save_timestep_interval is given, a dict mapping the loop
                 index (as a string) to the sample at that step, plus the
                 final sample under the key "1000".
        """
        final = None
        if save_timestep_interval is not None:
            prev_steps = dict()
        for idx, sample in enumerate(
            self.p_sample_loop_progressive(
                model,
                shape,
                noise=noise,
                clip_denoised=clip_denoised,
                denoised_fn=denoised_fn,
                cond_fn=cond_fn,
                model_kwargs=model_kwargs,
                device=device,
                progress=progress,
            )
        ):
            final = sample
            # idx counts loop iterations, so 0 is the first (noisiest) step.
            if (save_timestep_interval is not None) and (
                idx % save_timestep_interval == 0
            ):  # save every save_timestep_interval steps
                prev_steps[str(idx)] = final["sample"]
            # NOTE(review): 960 and the "1000" key below are hard-coded and
            # look like they assume a 1000-step schedule -- confirm.
            if (save_timestep_interval is not None) and (
                idx > 960
            ):  # save every step after iteration 960
                prev_steps[str(idx)] = final["sample"]
        if save_timestep_interval is not None:
            prev_steps[str(1000)] = final["sample"]
            return prev_steps
        else:
            return final["sample"]

    def p_sample_loop_progressive(
        self,
        model,
        shape,
        noise=None,
        clip_denoised=True,
        denoised_fn=None,
        cond_fn=None,
        model_kwargs=None,
        device=None,
        progress=False,
    ):
        """
        Generate samples from the model and yield intermediate samples from
        each timestep of diffusion.

        Arguments are the same as p_sample_loop().
        Returns a generator over dicts, where each dict is the return value of
        p_sample().
        """
        if device is None:
            device = next(model.parameters()).device
        assert isinstance(shape, (tuple, list))
        if noise is not None:
            img = noise
        else:
            img = th.randn(*shape, device=device)
        indices = list(range(self.num_timesteps))[::-1]

        if progress:
            # Lazy import so that we don't depend on tqdm.
            from tqdm.auto import tqdm

            indices = tqdm(indices)

        for i in indices:
            t = th.tensor([i] * shape[0], device=device)
            with th.no_grad():
                out = self.p_sample(
                    model,
                    img,
                    t,
                    clip_denoised=clip_denoised,
                    denoised_fn=denoised_fn,
                    cond_fn=cond_fn,
                    model_kwargs=model_kwargs,
                )
                yield out
                img = out["sample"]

    def p_sample_loop_scene_repaint(
        self,
        model,
        shape,
        cond,
        mode='down',
        overlap=64,
        clip_denoised=True,
        denoised_fn=None,
        cond_fn=None,
        model_kwargs=None,
        device=None,
    ):
        """
        Sample conditioned on `cond`, following the jump schedule produced by
        get_schedule_jump(): denoise with p_sample() when the schedule moves
        to a smaller timestep, re-noise with undo() otherwise.

        :param model: the model module.
        :param shape: the shape of the samples.
        :param cond: the conditioning tensor; a noised copy of it is combined
            with the current sample by mask_img() before every denoising step.
        :param mode: forwarded to mask_img().
        :param overlap: forwarded to mask_img().
        :param model_kwargs: a dict of extra arguments for the model; 'H' is
            also forwarded to mask_img().
        :param device: if not specified, use a model parameter's device.
        :return: the sample after the last scheduled step.
        """
        if device is None:
            device = next(model.parameters()).device
        assert isinstance(shape, (tuple, list))

        image_after_step = th.randn(*shape, device=device)
        mask_cond = cond.detach().clone()

        times = get_schedule_jump(
            t_T=self.num_timesteps, jump_length=20, jump_n_sample=5
        )
        time_pairs = list(zip(times[:-1], times[1:]))

        with th.no_grad():
            for t_last, t_cur in time_pairs:
                t_last_t = th.tensor([t_last] * shape[0], device=device)
                if t_cur < t_last:  # reverse
                    t_cond = self.q_sample(mask_cond, t_last_t)
                    image_after_step = mask_img(
                        image_after_step, t_cond, mode, overlap, H=model_kwargs['H']
                    )
                    out = self.p_sample(
                        model,
                        image_after_step,
                        t_last_t,
                        clip_denoised=clip_denoised,
                        denoised_fn=denoised_fn,
                        cond_fn=cond_fn,
                        model_kwargs=model_kwargs,
                    )
                    image_after_step = out["sample"]
                else:
                    t_shift = 1
                    image_after_step = self.undo(
                        image_after_step, t=t_last_t + t_shift, debug=False
                    )
        return image_after_step

    def p_sample_loop_scene(
        self,
        model,
        shape,
        cond,
        mode='down',
        overlap=64,
        clip_denoised=True,
        denoised_fn=None,
        cond_fn=None,
        model_kwargs=None,
        device=None,
    ):
        """
        Sample conditioned on `cond` over every timestep from T-1 down to 0.

        Before each p_sample() call, `cond` is diffused to the current
        timestep and combined with the running sample by mask_img().

        Arguments are the same as p_sample_loop_scene_repaint().
        :return: the final sample.
        """
        if device is None:
            device = next(model.parameters()).device
        assert isinstance(shape, (tuple, list))
        img = th.randn(*shape, device=device)
        indices = list(range(self.num_timesteps))[::-1]
        mask_cond = cond.detach().clone()

        for i in indices:
            t = th.tensor([i] * shape[0], device=device)
            with th.no_grad():
                m_cond = self.q_sample(mask_cond, t)
                img = mask_img(img, m_cond, mode, overlap, H=model_kwargs['H'])
                out = self.p_sample(
                    model,
                    img,
                    t,
                    clip_denoised=clip_denoised,
                    denoised_fn=denoised_fn,
                    cond_fn=cond_fn,
                    model_kwargs=model_kwargs,
                )
                img = out["sample"]
        return img

    def ddim_sample(
        self,
        model,
        x,
        t,
        clip_denoised=True,
        denoised_fn=None,
        cond_fn=None,
        model_kwargs=None,
        eta=0.0,
        y0=None,
        mask=None,
        is_mask_t0=False,
    ):
        """
        Sample x_{t-1} from the model using DDIM.

        Same usage as p_sample().

        :param eta: scales the stochastic part of the DDIM update.
        :param y0: optional known content, same shape as x.
        :param mask: optional tensor, same shape as x; where it is 1 the x_0
            prediction is replaced by y0.
        :param is_mask_t0: if False, the replacement is skipped at t == 0.
        """
        out = self.p_mean_variance(
            model,
            x,
            t,
            clip_denoised=clip_denoised,
            denoised_fn=denoised_fn,
            model_kwargs=model_kwargs,
        )
        if cond_fn is not None:
            out = self.condition_score(cond_fn, out, x, t, model_kwargs=model_kwargs)

        # masked generation
        if y0 is not None and mask is not None:
            assert y0.shape == x.shape
            assert mask.shape == x.shape
            if is_mask_t0:
                out["pred_xstart"] = mask * y0 + (1 - mask) * out["pred_xstart"]
            else:
                nonzero_mask = (
                    (t != 0).float().view(-1, *([1] * (len(x.shape) - 1)))
                )  # no noise when t == 0
                out["pred_xstart"] = (
                    mask * y0 + (1 - mask) * out["pred_xstart"]
                ) * nonzero_mask + out["pred_xstart"] * (1 - nonzero_mask)

        # Usually our model outputs epsilon, but we re-derive it
        # in case we used x_start or x_prev prediction.
        eps = self._predict_eps_from_xstart(x, t, out["pred_xstart"])

        alpha_bar = _extract_into_tensor(self.alphas_cumprod, t, x.shape)
        alpha_bar_prev = _extract_into_tensor(self.alphas_cumprod_prev, t, x.shape)
        sigma = (
            eta
            * th.sqrt((1 - alpha_bar_prev) / (1 - alpha_bar))
            * th.sqrt(1 - alpha_bar / alpha_bar_prev)
        )
        # Equation 12.
        noise = th.randn_like(x)
        mean_pred = (
            out["pred_xstart"] * th.sqrt(alpha_bar_prev)
            + th.sqrt(1 - alpha_bar_prev - sigma ** 2) * eps
        )
        nonzero_mask = (
            (t != 0).float().view(-1, *([1] * (len(x.shape) - 1)))
        )  # no noise when t == 0
        sample = mean_pred + nonzero_mask * sigma * noise
        return {"sample": sample, "pred_xstart": out["pred_xstart"]}

    def ddim_reverse_sample(
        self,
        model,
        x,
        t,
        clip_denoised=True,
        denoised_fn=None,
        model_kwargs=None,
        eta=0.0,
    ):
        """
        Sample x_{t+1} from the model using DDIM reverse ODE.
        """
        assert eta == 0.0, "Reverse ODE only for deterministic path"
        out = self.p_mean_variance(
            model,
            x,
            t,
            clip_denoised=clip_denoised,
            denoised_fn=denoised_fn,
            model_kwargs=model_kwargs,
        )
        # Usually our model outputs epsilon, but we re-derive it
        # in case we used x_start or x_prev prediction.
        eps = (
            _extract_into_tensor(self.sqrt_recip_alphas_cumprod, t, x.shape) * x
            - out["pred_xstart"]
        ) / _extract_into_tensor(self.sqrt_recipm1_alphas_cumprod, t, x.shape)
        alpha_bar_next = _extract_into_tensor(self.alphas_cumprod_next, t, x.shape)

        # Equation 12. reversed
        mean_pred = (
            out["pred_xstart"] * th.sqrt(alpha_bar_next)
            + th.sqrt(1 - alpha_bar_next) * eps
        )

        return {"sample": mean_pred, "pred_xstart": out["pred_xstart"]}

    def ddim_sample_loop(
        self,
        model,
        shape,
        noise=None,
        clip_denoised=True,
        denoised_fn=None,
        cond_fn=None,
        model_kwargs=None,
        device=None,
        progress=False,
        eta=0.0,
        y0=None,
        mask=None,
        is_mask_t0=False,
    ):
        """
        Generate samples from the model using DDIM.

        Same usage as p_sample_loop(); y0, mask and is_mask_t0 are forwarded
        to ddim_sample().
        """
        final = None
        for sample in self.ddim_sample_loop_progressive(
            model,
            shape,
            noise=noise,
            clip_denoised=clip_denoised,
            denoised_fn=denoised_fn,
            cond_fn=cond_fn,
            model_kwargs=model_kwargs,
            device=device,
            progress=progress,
            eta=eta,
            y0=y0,
            mask=mask,
            is_mask_t0=is_mask_t0,
        ):
            final = sample
        return final["sample"]

    def ddim_sample_loop_progressive(
        self,
        model,
        shape,
        noise=None,
        clip_denoised=True,
        denoised_fn=None,
        cond_fn=None,
        model_kwargs=None,
        device=None,
        progress=False,
        eta=0.0,
        y0=None,
        mask=None,
        is_mask_t0=False,
    ):
        """
        Use DDIM to sample from the model and yield intermediate samples from
        each timestep of DDIM.

        Same usage as p_sample_loop_progressive().
        """
        if device is None:
            device = next(model.parameters()).device
        assert isinstance(shape, (tuple, list))
        if noise is not None:
            img = noise
        else:
            img = th.randn(*shape, device=device)
        indices = list(range(self.num_timesteps))[::-1]

        if progress:
            # Lazy import so that we don't depend on tqdm.
            from tqdm.auto import tqdm

            indices = tqdm(indices)

        for i in indices:
            t = th.tensor([i] * shape[0], device=device)
            with th.no_grad():
                out = self.ddim_sample(
                    model,
                    img,
                    t,
                    clip_denoised=clip_denoised,
                    denoised_fn=denoised_fn,
                    cond_fn=cond_fn,
                    model_kwargs=model_kwargs,
                    eta=eta,
                    y0=y0,
                    mask=mask,
                    is_mask_t0=is_mask_t0,
                )
                yield out
                img = out["sample"]

    def _vb_terms_bpd(
        self, model, x_start, x_t, t, clip_denoised=True, model_kwargs=None
    ):
        """
        Get a term for the variational lower-bound.

        The resulting units are bits (rather than nats, as one might expect).
        This allows for comparison to other papers.

        :return: a dict with the following keys:
                 - 'output': a shape [N] tensor of NLLs or KLs.
                 - 'pred_xstart': the x_0 predictions.
        """
        true_mean, _, true_log_variance_clipped = self.q_posterior_mean_variance(
            x_start=x_start, x_t=x_t, t=t
        )
        out = self.p_mean_variance(
            model, x_t, t, clip_denoised=clip_denoised, model_kwargs=model_kwargs
        )
        kl = normal_kl(
            true_mean, true_log_variance_clipped, out["mean"], out["log_variance"]
        )
        kl = mean_flat(kl) / np.log(2.0)

        decoder_nll = -discretized_gaussian_log_likelihood(
            x_start, means=out["mean"], log_scales=0.5 * out["log_variance"]
        )
        assert decoder_nll.shape == x_start.shape
        decoder_nll = mean_flat(decoder_nll) / np.log(2.0)

        # At the first timestep return the decoder NLL,
        # otherwise return KL(q(x_{t-1}|x_t,x_0) || p(x_{t-1}|x_t))
        output = th.where((t == 0), decoder_nll, kl)
        return {"output": output, "pred_xstart": out["pred_xstart"]}

    def merge_features(self, xy_feat, xz_feat, yz_feat):
        """
        Sum three plane feature maps into one 5-D tensor by broadcasting.

        A singleton axis is inserted into each input (last axis for xy_feat,
        axis 3 for xz_feat, axis 2 for yz_feat) and each is expanded to the
        common [B, C, H, W, D] shape before being added.

        :return: a [B, C, H, W, D] tensor on xy_feat's device.
        """
        # Expand dimensions
        xy_feat_exp = xy_feat.unsqueeze(4)  # Add z dimension
        xz_feat_exp = xz_feat.unsqueeze(3)  # Add y dimension
        yz_feat_exp = yz_feat.unsqueeze(2)  # Add x dimension

        # Calculate the size of the new 3D tensor
        B, C, H, W, D = (
            xy_feat_exp.size(0),
            xy_feat_exp.size(1),
            xy_feat_exp.size(2),
            xy_feat_exp.size(3),
            yz_feat_exp.size(4),
        )

        # Initialize a 3D tensor with zeros
        merged_tensor = th.zeros((B, C, H, W, D), device=xy_feat.device)

        # Fill the tensor with the expanded feature maps
        merged_tensor += xy_feat_exp.expand_as(merged_tensor)
        merged_tensor += xz_feat_exp.expand_as(merged_tensor)
        merged_tensor += yz_feat_exp.expand_as(merged_tensor)

        return merged_tensor

    def training_losses(self, model, x_start, t, model_kwargs=None, noise=None):
        """
        Compute training losses for a single timestep.

        :param model: the model to evaluate loss on.
        :param x_start: the [N x C x ...] tensor of inputs.
        :param t: a batch of timestep indices.
        :param model_kwargs: a dict of extra arguments for the model; the keys
            'H', 'W', 'D' and 'y' are read unconditionally.
        :param noise: if specified, the specific Gaussian noise to try to remove.
        :return: a dict with the key "loss" containing a tensor of shape [N].
                 Some mean or variance settings may also have other keys.
        :raises ValueError: for an unknown triplane_loss_type when
                 args.voxel_fea is falsy.
        """
        if model_kwargs is None:
            model_kwargs = {}
        if noise is None:
            noise = th.randn_like(x_start)

        terms = {}

        if self.ssc_refine:
            # Diffuse from the model's own output on a fully-noised input
            # instead of from x_start; no gradients flow through this pass.
            with th.no_grad():
                large_T = th.tensor(
                    [self.num_timesteps - 1] * x_start.shape[0], device=x_start.device
                )
                m_t = self.q_sample(x_start, large_T)
                # NOTE(review): unlike the call below, large_T is not passed
                # through _scale_timesteps() here -- confirm this is intended.
                m_1 = model(
                    m_t,
                    large_T,
                    model_kwargs['H'],
                    model_kwargs['W'],
                    model_kwargs['D'],
                    model_kwargs['y'],
                )
                x_t = self.q_sample(m_1, t, noise=noise)
        else:
            x_t = self.q_sample(x_start, t, noise=noise)

        model_output = model(
            x_t,
            self._scale_timesteps(t),
            model_kwargs['H'],
            model_kwargs['W'],
            model_kwargs['D'],
            model_kwargs['y'],
        )

        if self.model_var_type in [ModelVarType.LEARNED, ModelVarType.LEARNED_RANGE]:
            B, C = x_t.shape[:2]
            assert model_output.shape == (B, C * 2, *x_t.shape[2:])
            model_output, model_var_values = th.split(model_output, C, dim=1)
            # Learn the variance using the variational bound, but don't let
            # it affect our mean prediction.
            frozen_out = th.cat([model_output.detach(), model_var_values], dim=1)
            terms["vb"] = self._vb_terms_bpd(
                model=lambda *args, r=frozen_out: r,
                x_start=x_start,
                x_t=x_t,
                t=t,
                clip_denoised=False,
                # p_mean_variance() indexes model_kwargs['H'] etc. before
                # calling the (frozen) model, so the kwargs must be forwarded.
                model_kwargs=model_kwargs,
            )["output"]
            if self.loss_type == LossType.RESCALED_MSE:
                # Divide by 1000 for equivalence with initial implementation.
                # Without a factor of 1/1000, the VB term hurts the MSE term.
                terms["vb"] *= self.num_timesteps / 1000.0

        target = {
            ModelMeanType.PREVIOUS_X: self.q_posterior_mean_variance(
                x_start=x_start, x_t=x_t, t=t
            )[0],
            ModelMeanType.START_X: x_start,
            ModelMeanType.EPSILON: noise,
        }[self.model_mean_type]
        assert model_output.shape == target.shape == x_start.shape

        if self.args.voxel_fea:
            # NOTE(review): this branch never adds terms["vb"] to the loss and
            # leaves "loss" unset for other triplane_loss_type values, unlike
            # the triplane branch below -- confirm whether that is intended.
            if self.triplane_loss_type == 'l1':
                terms["loss"] = mean_flat(th.abs(target - model_output))
            elif self.triplane_loss_type == 'l2':
                terms["loss"] = mean_flat((target - model_output) ** 2)
        else:
            H, W, D = model_kwargs["H"], model_kwargs["W"], model_kwargs["D"]
            trisize = (H[0], W[0], D[0])
            target_xy, target_xz, target_yz = decompose_featmaps(target, trisize)
            model_output_xy, model_output_xz, model_output_yz = decompose_featmaps(
                model_output, trisize
            )

            if self.triplane_loss_type == 'l1':
                terms["l1_xy"] = mean_flat(th.abs(target_xy - model_output_xy))
                terms["l1_xz"] = mean_flat(th.abs(target_xz - model_output_xz))
                terms["l1_yz"] = mean_flat(th.abs(target_yz - model_output_yz))
                loss = terms["l1_xy"] + terms["l1_xz"] + terms["l1_yz"]
            elif self.triplane_loss_type == 'l2':
                terms["l2_xy"] = mean_flat((target_xy - model_output_xy) ** 2)
                terms["l2_xz"] = mean_flat((target_xz - model_output_xz) ** 2)
                terms["l2_yz"] = mean_flat((target_yz - model_output_yz) ** 2)
                loss = terms["l2_xy"] + terms["l2_xz"] + terms["l2_yz"]
            else:
                raise ValueError(
                    "Unknown loss type: {}".format(self.triplane_loss_type)
                )

            if "vb" in terms:
                loss = loss + terms["vb"]
            terms["loss"] = loss

        return terms

    def _prior_bpd(self, x_start):
        """
        Get the prior KL term for the variational lower-bound, measured in
        bits-per-dim.

        This term can't be optimized, as it only depends on the encoder.

        :param x_start: the [N x C x ...] tensor of inputs.
        :return: a batch of [N] KL values (in bits), one per batch element.
        """
        batch_size = x_start.shape[0]
        t = th.tensor([self.num_timesteps - 1] * batch_size, device=x_start.device)
        qt_mean, _, qt_log_variance = self.q_mean_variance(x_start, t)
        kl_prior = normal_kl(
            mean1=qt_mean, logvar1=qt_log_variance, mean2=0.0, logvar2=0.0
        )
        return mean_flat(kl_prior) / np.log(2.0)

    def calc_bpd_loop(self, model, x_start, clip_denoised=True, model_kwargs=None):
        """
        Compute the entire variational lower-bound, measured in bits-per-dim,
        as well as other related quantities.

        :param model: the model to evaluate loss on.
        :param x_start: the [N x C x ...] tensor of inputs.
        :param clip_denoised: if True, clip denoised samples.
        :param model_kwargs: if not None, a dict of extra keyword arguments to
            pass to the model. This can be used for conditioning.

        :return: a dict containing the following keys:
                 - total_bpd: the total variational lower-bound, per batch element.
                 - prior_bpd: the prior term in the lower-bound.
                 - vb: an [N x T] tensor of terms in the lower-bound.
                 - xstart_mse: an [N x T] tensor of x_0 MSEs for each timestep.
                 - mse: an [N x T] tensor of epsilon MSEs for each timestep.
        """
        device = x_start.device
        batch_size = x_start.shape[0]

        vb = []
        xstart_mse = []
        mse = []
        for t in list(range(self.num_timesteps))[::-1]:
            t_batch = th.tensor([t] * batch_size, device=device)
            noise = th.randn_like(x_start)
            x_t = self.q_sample(x_start=x_start, t=t_batch, noise=noise)
            # Calculate VLB term at the current timestep
            with th.no_grad():
                out = self._vb_terms_bpd(
                    model,
                    x_start=x_start,
                    x_t=x_t,
                    t=t_batch,
                    clip_denoised=clip_denoised,
                    model_kwargs=model_kwargs,
                )
            vb.append(out["output"])
            xstart_mse.append(mean_flat((out["pred_xstart"] - x_start) ** 2))
            eps = self._predict_eps_from_xstart(x_t, t_batch, out["pred_xstart"])
            mse.append(mean_flat((eps - noise) ** 2))

        vb = th.stack(vb, dim=1)
        xstart_mse = th.stack(xstart_mse, dim=1)
        mse = th.stack(mse, dim=1)

        prior_bpd = self._prior_bpd(x_start)
        total_bpd = vb.sum(dim=1) + prior_bpd
        return {
            "total_bpd": total_bpd,
            "prior_bpd": prior_bpd,
            "vb": vb,
            "xstart_mse": xstart_mse,
            "mse": mse,
        }


def _gaussian_blur2d(img, kernel_size, sigma):
    """
    Blur a [B x C x H x W] float tensor with a Gaussian kernel, channel by channel.

    Uses only torch: a normalised 2-D kernel is applied as a depthwise
    convolution after reflect padding, so the output has the input's shape.
    Reflect padding requires H and W to be larger than kernel_size // 2.

    :param img: the [B x C x H x W] floating-point tensor to blur.
    :param kernel_size: odd side length of the square kernel.
    :param sigma: standard deviation of the Gaussian.
    :return: the blurred tensor.
    """
    half = (kernel_size - 1) * 0.5
    coords = th.linspace(
        -half, half, steps=kernel_size, dtype=img.dtype, device=img.device
    )
    kernel_1d = th.exp(-0.5 * (coords / sigma) ** 2)
    kernel_1d = kernel_1d / kernel_1d.sum()
    kernel_2d = kernel_1d[:, None] * kernel_1d[None, :]

    channels = img.shape[1]
    # One copy of the kernel per channel; groups=channels keeps channels apart.
    weight = kernel_2d.expand(channels, 1, kernel_size, kernel_size).contiguous()
    pad = kernel_size // 2
    padded = th.nn.functional.pad(img, [pad, pad, pad, pad], mode="reflect")
    return th.nn.functional.conv2d(padded, weight, groups=channels)


def _extract_into_tensor(arr, timesteps, broadcast_shape):
    """
    Extract values from a 1-D numpy array for a batch of indices.

    :param arr: the 1-D numpy array.
    :param timesteps: a tensor of indices into the array to extract.
    :param broadcast_shape: a larger shape of K dimensions with the batch
                            dimension equal to the length of timesteps.
    :return: a tensor of shape [batch_size, 1, ...] where the shape has K dims.
    """
    res = th.from_numpy(arr).to(device=timesteps.device)[timesteps].float()
    while len(res.shape) < len(broadcast_shape):
        res = res[..., None]
    return res.expand(broadcast_shape)
关键字:泉州手机端建站模板_工程接单网_站长网_互联网广告营销是什么

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: