|
import torch
|
|
import torch.nn as nn
|
|
from packaging import version
|
|
|
|
|
|
|
|
|
|
# Dotted path string naming an ``OpenAIWrapper`` class (presumably the one
# defined in this module -- TODO confirm this file is
# sgm/modules/diffusionmodules/wrappers.py).
OPENAIUNETWRAPPER = "sgm.modules.diffusionmodules.wrappers.OpenAIWrapper"
|
|
|
|
|
|
class IdentityWrapper(nn.Module):
    """Thin ``nn.Module`` shell that forwards every call to a wrapped model.

    The wrapped model is optionally passed through ``torch.compile`` when the
    wrapper is constructed.
    """

    def __init__(self, diffusion_model, compile_model: bool = False):
        """Store ``diffusion_model``, compiling it first when requested.

        Args:
            diffusion_model: The callable/module that ``forward`` delegates to.
            compile_model: When true and the installed torch is at least
                2.0.0, the model is wrapped with ``torch.compile``; otherwise
                it is stored unchanged.
        """
        super().__init__()

        # torch.compile is only looked up when the version gate passes.
        has_torch_compile = version.parse(torch.__version__) >= version.parse(
            "2.0.0"
        )
        if has_torch_compile and compile_model:
            wrapped = torch.compile(diffusion_model)
        else:
            wrapped = diffusion_model
        self.diffusion_model = wrapped

    def forward(self, *args, **kwargs):
        """Call the wrapped model with the arguments exactly as received."""
        return self.diffusion_model(*args, **kwargs)
|
|
|
|
|
|
class OpenAIWrapper(IdentityWrapper):
    """Adapter that unpacks a conditioning dict into the wrapped model's kwargs."""

    def forward(
        self, x: torch.Tensor, t: torch.Tensor, c: dict, **kwargs
    ) -> torch.Tensor:
        """Run the wrapped model on ``x`` with conditioning taken from ``c``.

        Args:
            x: Input tensor handed to the wrapped model as its first positional
                argument.
            t: Passed to the wrapped model as ``timesteps``.
            c: Conditioning dict. Recognised keys:
                ``"concat"``    - tensor concatenated to ``x`` along dim 1
                                  (skipped when missing or ``None``);
                ``"crossattn"`` - forwarded as ``context`` (``None`` if absent);
                ``"vector"``    - forwarded as ``y`` (``None`` if absent).
            **kwargs: Forwarded unchanged to the wrapped model.

        Returns:
            Whatever the wrapped model returns.
        """
        # Only concatenate when something was actually supplied. This avoids
        # building (and dtype/device-converting) an empty placeholder tensor
        # on every call, and avoids a redundant copy of ``x``.
        concat = c.get("concat")
        if concat is not None:
            x = torch.cat((x, concat), dim=1)

        return self.diffusion_model(
            x,
            timesteps=t,
            context=c.get("crossattn"),
            y=c.get("vector"),
            **kwargs,
        )
|
|
|
|
|
|
class OpenAIHalfWrapper(IdentityWrapper):
    """Variant of the conditioning-dict adapter that runs the model in half precision.

    The wrapped model is converted with ``.half()`` at construction; tensor
    inputs are cast with ``.half()`` before the call and the result is cast
    back with ``.float()``.
    """

    def __init__(self, *args, **kwargs):
        """Build the base wrapper, then convert the wrapped model with ``.half()``."""
        super().__init__(*args, **kwargs)
        self.diffusion_model = self.diffusion_model.half()

    def forward(
        self, x: torch.Tensor, t: torch.Tensor, c: dict, **kwargs
    ) -> torch.Tensor:
        """Run the half-precision model and return its output via ``.float()``.

        Args:
            x: Input tensor; cast with ``.half()`` before the model call.
            t: Passed as ``timesteps`` after ``.half()``.
            c: Conditioning dict. Recognised keys:
                ``"concat"``    - tensor concatenated to ``x`` along dim 1
                                  (skipped when missing or ``None``);
                ``"crossattn"`` - forwarded as ``context`` after ``.half()``;
                ``"vector"``    - forwarded as ``y`` after ``.half()``.
                Missing ``"crossattn"``/``"vector"`` are forwarded as ``None``.
            **kwargs: Forwarded unchanged to the wrapped model.

        Returns:
            The wrapped model's output converted with ``.float()``.
        """
        # Only concatenate when something was actually supplied. This avoids
        # building (and dtype/device-converting) an empty placeholder tensor
        # on every call, and avoids a redundant copy of ``x``.
        concat = c.get("concat")
        if concat is not None:
            x = torch.cat((x, concat), dim=1)

        context = c.get("crossattn")
        y = c.get("vector")

        out = self.diffusion_model(
            x.half(),
            timesteps=t.half(),
            context=None if context is None else context.half(),
            y=None if y is None else y.half(),
            **kwargs,
        )
        return out.float()
|
|
|
|
|
|
class ControlWrapper(nn.Module):
    """Wrapper that pairs a diffusion model with a separately loaded control model.

    ``forward`` first runs the control model, then feeds its output to the
    diffusion model as ``control``. Both calls run under
    ``torch.autocast("cuda", dtype=self.dtype)``.
    """

    def __init__(self, diffusion_model, compile_model: bool = False, dtype=torch.float32):
        """Store the diffusion model (optionally compiled) and the autocast dtype.

        Args:
            diffusion_model: Model called last in ``forward``.
            compile_model: When true and the installed torch is at least
                2.0.0, models are wrapped with ``torch.compile``; otherwise
                they are stored unchanged.
            dtype: dtype handed to ``torch.autocast`` in ``forward``.
        """
        super().__init__()
        # Kept on the instance so load_control_model() applies the same policy.
        # NOTE(review): recent torch releases also define an
        # ``nn.Module.compile`` method; this instance attribute takes
        # precedence over it -- confirm that is intended.
        self.compile = (
            torch.compile
            if (version.parse(torch.__version__) >= version.parse("2.0.0"))
            and compile_model
            else lambda x: x
        )
        self.diffusion_model = self.compile(diffusion_model)
        # Stays None until load_control_model() is called.
        self.control_model = None
        self.dtype = dtype

    def load_control_model(self, control_model):
        """Attach ``control_model``, applying the same compile policy as ``__init__``."""
        self.control_model = self.compile(control_model)

    def forward(
        self, x: torch.Tensor, t: torch.Tensor, c: dict, control_scale=1, **kwargs
    ) -> torch.Tensor:
        """Run the control model, then the diffusion model conditioned on its output.

        Args:
            x: Input tensor; passed to the control model as ``xt`` and to the
                diffusion model as its first positional argument.
            t: Passed to both models as ``timesteps``.
            c: Conditioning dict. Keys read (each ``None`` when absent):
                ``"control"`` (control model ``x``), ``"control_vector"``,
                ``"mask_x"``, ``"crossattn"`` (``context`` for both models),
                ``"vector"`` (``y`` for both models).
            control_scale: Forwarded to the diffusion model as ``control_scale``.
            **kwargs: Forwarded unchanged to the diffusion model only.

        Returns:
            The diffusion model's output converted with ``.float()``.

        Raises:
            RuntimeError: If no control model has been attached via
                ``load_control_model``.
        """
        # Fail with an actionable message instead of the opaque
        # "'NoneType' object is not callable" the bare call would produce.
        if self.control_model is None:
            raise RuntimeError(
                "ControlWrapper.forward() called before load_control_model(); "
                "no control model is attached."
            )

        # Shared by both model calls.
        context = c.get("crossattn")
        y = c.get("vector")

        with torch.autocast("cuda", dtype=self.dtype):
            control = self.control_model(
                x=c.get("control"),
                timesteps=t,
                xt=x,
                control_vector=c.get("control_vector"),
                mask_x=c.get("mask_x"),
                context=context,
                y=y,
            )
            out = self.diffusion_model(
                x,
                timesteps=t,
                context=context,
                y=y,
                control=control,
                control_scale=control_scale,
                **kwargs,
            )
        return out.float()
|
|
|
|
|