|
import argparse |
|
import inspect |
|
|
|
from . import gaussian_diffusion as gd |
|
from .respace import SpacedDiffusion, space_timesteps |
|
from .unet import SuperResModel, UNetModel, EncoderUNetModel |
|
|
|
# Number of class labels; passed as `num_classes` to the models built in this
# module when `class_cond` is true.
NUM_CLASSES = 1000
|
|
|
|
|
def diffusion_defaults():
    """
    Build the default diffusion settings used for both image and classifier
    training.

    :return: a newly created dict mapping option names to default values.
    """
    defaults = {
        "learn_sigma": False,
        "diffusion_steps": 1000,
        "noise_schedule": "linear",
        "timestep_respacing": "",
        "use_kl": False,
        "predict_xstart": False,
        "rescale_timesteps": False,
        "rescale_learned_sigmas": False,
    }
    return defaults
|
|
|
|
|
def classifier_defaults():
    """
    Build the default settings for classifier models.

    :return: a newly created dict mapping option names to default values.
    """
    defaults = {
        "image_size": 64,
        "classifier_use_fp16": False,
        "classifier_width": 128,
        "classifier_depth": 2,
        "classifier_attention_resolutions": "32,16,8",
        "classifier_use_scale_shift_norm": True,
        "classifier_resblock_updown": True,
        "classifier_pool": "attention",
    }
    return defaults
|
|
|
|
|
def model_and_diffusion_defaults():
    """
    Build the default settings for image training.

    The model options come first, followed by everything returned by
    diffusion_defaults().

    :return: a newly created dict mapping option names to default values.
    """
    model_options = {
        "image_size": 64,
        "num_channels": 128,
        "num_res_blocks": 2,
        "num_heads": 4,
        "num_heads_upsample": -1,
        "num_head_channels": -1,
        "attention_resolutions": "16,8",
        "channel_mult": "",
        "dropout": 0.0,
        "class_cond": False,
        "use_checkpoint": False,
        "use_scale_shift_norm": True,
        "resblock_updown": False,
        "use_fp16": False,
        "use_new_attention_order": False,
    }
    # Diffusion entries are layered on top of the model entries.
    return {**model_options, **diffusion_defaults()}
|
|
|
|
|
def classifier_and_diffusion_defaults():
    """
    Build the combined defaults for classifier training.

    :return: a new dict holding classifier_defaults() followed by
             diffusion_defaults().
    """
    return {**classifier_defaults(), **diffusion_defaults()}
|
|
|
|
|
def create_model_and_diffusion(
    image_size,
    class_cond,
    learn_sigma,
    num_channels,
    num_res_blocks,
    channel_mult,
    num_heads,
    num_head_channels,
    num_heads_upsample,
    attention_resolutions,
    dropout,
    diffusion_steps,
    noise_schedule,
    timestep_respacing,
    use_kl,
    predict_xstart,
    rescale_timesteps,
    rescale_learned_sigmas,
    use_checkpoint,
    use_scale_shift_norm,
    resblock_updown,
    use_fp16,
    use_new_attention_order,
):
    """
    Create a model via create_model() and a diffusion process via
    create_gaussian_diffusion().

    Every argument is forwarded unchanged to one (or, for learn_sigma, both)
    of those two factories; diffusion_steps is passed on as ``steps``.

    :return: a (model, diffusion) tuple.
    """
    model_options = dict(
        channel_mult=channel_mult,
        learn_sigma=learn_sigma,
        class_cond=class_cond,
        use_checkpoint=use_checkpoint,
        attention_resolutions=attention_resolutions,
        num_heads=num_heads,
        num_head_channels=num_head_channels,
        num_heads_upsample=num_heads_upsample,
        use_scale_shift_norm=use_scale_shift_norm,
        dropout=dropout,
        resblock_updown=resblock_updown,
        use_fp16=use_fp16,
        use_new_attention_order=use_new_attention_order,
    )
    diffusion_options = dict(
        steps=diffusion_steps,
        learn_sigma=learn_sigma,
        noise_schedule=noise_schedule,
        use_kl=use_kl,
        predict_xstart=predict_xstart,
        rescale_timesteps=rescale_timesteps,
        rescale_learned_sigmas=rescale_learned_sigmas,
        timestep_respacing=timestep_respacing,
    )

    # The model is built before the diffusion object, as before.
    model = create_model(image_size, num_channels, num_res_blocks, **model_options)
    diffusion = create_gaussian_diffusion(**diffusion_options)
    return model, diffusion
|
|
|
|
|
def create_model(
    image_size,
    num_channels,
    num_res_blocks,
    channel_mult="",
    learn_sigma=False,
    class_cond=False,
    use_checkpoint=False,
    attention_resolutions="16",
    num_heads=1,
    num_head_channels=-1,
    num_heads_upsample=-1,
    use_scale_shift_norm=False,
    dropout=0,
    resblock_updown=False,
    use_fp16=False,
    use_new_attention_order=False,
):
    """
    Construct a UNetModel from string/flag style options.

    :param image_size: image resolution; also selects the preset channel
                       multipliers when channel_mult is "".
    :param channel_mult: "" to use the preset for image_size, otherwise a
                         comma-separated string of integers.
    :param learn_sigma: if true the model gets 6 output channels instead of 3.
    :param class_cond: if true, NUM_CLASSES is passed as num_classes,
                       otherwise None.
    :param attention_resolutions: comma-separated integers; each value r is
                                  passed to the model as image_size // r.
    :return: the constructed UNetModel.
    :raises ValueError: if channel_mult is "" and image_size has no preset.

    The remaining arguments are handed to UNetModel unchanged.
    """
    if channel_mult != "":
        channel_mult = tuple(int(piece) for piece in channel_mult.split(","))
    else:
        presets = (
            (512, (0.5, 1, 1, 2, 2, 4, 4)),
            (256, (1, 1, 2, 2, 4, 4)),
            (128, (1, 1, 2, 3, 4)),
            (64, (1, 2, 3, 4)),
        )
        # Scan with == (not a dict lookup) to keep the original comparison
        # semantics for any image_size type.
        for size, multipliers in presets:
            if image_size == size:
                channel_mult = multipliers
                break
        else:
            raise ValueError(f"unsupported image size: {image_size}")

    attention_ds = tuple(
        image_size // int(resolution)
        for resolution in attention_resolutions.split(",")
    )
    output_channels = 6 if learn_sigma else 3
    label_count = NUM_CLASSES if class_cond else None

    return UNetModel(
        image_size=image_size,
        in_channels=3,
        model_channels=num_channels,
        out_channels=output_channels,
        num_res_blocks=num_res_blocks,
        attention_resolutions=attention_ds,
        dropout=dropout,
        channel_mult=channel_mult,
        num_classes=label_count,
        use_checkpoint=use_checkpoint,
        use_fp16=use_fp16,
        num_heads=num_heads,
        num_head_channels=num_head_channels,
        num_heads_upsample=num_heads_upsample,
        use_scale_shift_norm=use_scale_shift_norm,
        resblock_updown=resblock_updown,
        use_new_attention_order=use_new_attention_order,
    )
|
|
|
|
|
def create_classifier_and_diffusion(
    image_size,
    classifier_use_fp16,
    classifier_width,
    classifier_depth,
    classifier_attention_resolutions,
    classifier_use_scale_shift_norm,
    classifier_resblock_updown,
    classifier_pool,
    learn_sigma,
    diffusion_steps,
    noise_schedule,
    timestep_respacing,
    use_kl,
    predict_xstart,
    rescale_timesteps,
    rescale_learned_sigmas,
):
    """
    Create a classifier via create_classifier() and a diffusion process via
    create_gaussian_diffusion().

    The image_size and classifier_* arguments go to create_classifier(); the
    rest go to create_gaussian_diffusion(), with diffusion_steps passed as
    ``steps``.

    :return: a (classifier, diffusion) tuple.
    """
    classifier_args = (
        image_size,
        classifier_use_fp16,
        classifier_width,
        classifier_depth,
        classifier_attention_resolutions,
        classifier_use_scale_shift_norm,
        classifier_resblock_updown,
        classifier_pool,
    )
    diffusion_options = dict(
        steps=diffusion_steps,
        learn_sigma=learn_sigma,
        noise_schedule=noise_schedule,
        use_kl=use_kl,
        predict_xstart=predict_xstart,
        rescale_timesteps=rescale_timesteps,
        rescale_learned_sigmas=rescale_learned_sigmas,
        timestep_respacing=timestep_respacing,
    )

    # The classifier is built before the diffusion object, as before.
    classifier = create_classifier(*classifier_args)
    diffusion = create_gaussian_diffusion(**diffusion_options)
    return classifier, diffusion
|
|
|
|
|
def create_classifier(
    image_size,
    classifier_use_fp16,
    classifier_width,
    classifier_depth,
    classifier_attention_resolutions,
    classifier_use_scale_shift_norm,
    classifier_resblock_updown,
    classifier_pool,
):
    """
    Construct an EncoderUNetModel classifier.

    :param image_size: image resolution; selects the preset channel
                       multipliers (512, 256, 128 and 64 are supported).
    :param classifier_use_fp16: passed to the model as use_fp16.
    :param classifier_width: passed to the model as model_channels.
    :param classifier_depth: passed to the model as num_res_blocks.
    :param classifier_attention_resolutions: comma-separated integers; each
        value r is passed to the model as image_size // r.
    :param classifier_use_scale_shift_norm: passed as use_scale_shift_norm.
    :param classifier_resblock_updown: passed as resblock_updown.
    :param classifier_pool: passed to the model as pool.
    :return: the constructed EncoderUNetModel.
    :raises ValueError: if image_size has no preset channel multipliers.
    """
    presets = (
        (512, (0.5, 1, 1, 2, 2, 4, 4)),
        (256, (1, 1, 2, 2, 4, 4)),
        (128, (1, 1, 2, 3, 4)),
        (64, (1, 2, 3, 4)),
    )
    for size, multipliers in presets:
        if image_size == size:
            channel_mult = multipliers
            break
    else:
        raise ValueError(f"unsupported image size: {image_size}")

    attention_ds = tuple(
        image_size // int(resolution)
        for resolution in classifier_attention_resolutions.split(",")
    )

    return EncoderUNetModel(
        image_size=image_size,
        in_channels=3,
        model_channels=classifier_width,
        # Use the shared module constant rather than a duplicated literal so
        # the classifier output size stays in sync with class-conditional
        # models built elsewhere in this module.
        out_channels=NUM_CLASSES,
        num_res_blocks=classifier_depth,
        attention_resolutions=attention_ds,
        channel_mult=channel_mult,
        use_fp16=classifier_use_fp16,
        num_head_channels=64,
        use_scale_shift_norm=classifier_use_scale_shift_norm,
        resblock_updown=classifier_resblock_updown,
        pool=classifier_pool,
    )
|
|
|
|
|
def sr_model_and_diffusion_defaults():
    """
    Build the default settings for super-resolution training.

    Starts from model_and_diffusion_defaults(), adds large_size and
    small_size, then keeps only the entries whose names are parameters of
    sr_create_model_and_diffusion().

    :return: a dict mapping option names to default values.
    """
    candidates = model_and_diffusion_defaults()
    candidates["large_size"] = 256
    candidates["small_size"] = 64
    accepted = inspect.getfullargspec(sr_create_model_and_diffusion).args
    return {
        name: value for name, value in candidates.items() if name in accepted
    }
|
|
|
|
|
def sr_create_model_and_diffusion(
    large_size,
    small_size,
    class_cond,
    learn_sigma,
    num_channels,
    num_res_blocks,
    num_heads,
    num_head_channels,
    num_heads_upsample,
    attention_resolutions,
    dropout,
    diffusion_steps,
    noise_schedule,
    timestep_respacing,
    use_kl,
    predict_xstart,
    rescale_timesteps,
    rescale_learned_sigmas,
    use_checkpoint,
    use_scale_shift_norm,
    resblock_updown,
    use_fp16,
):
    """
    Create a super-resolution model via sr_create_model() and a diffusion
    process via create_gaussian_diffusion().

    Every argument is forwarded unchanged to one (or, for learn_sigma, both)
    of those two factories; diffusion_steps is passed on as ``steps``.

    :return: a (model, diffusion) tuple.
    """
    model_options = dict(
        learn_sigma=learn_sigma,
        class_cond=class_cond,
        use_checkpoint=use_checkpoint,
        attention_resolutions=attention_resolutions,
        num_heads=num_heads,
        num_head_channels=num_head_channels,
        num_heads_upsample=num_heads_upsample,
        use_scale_shift_norm=use_scale_shift_norm,
        dropout=dropout,
        resblock_updown=resblock_updown,
        use_fp16=use_fp16,
    )
    diffusion_options = dict(
        steps=diffusion_steps,
        learn_sigma=learn_sigma,
        noise_schedule=noise_schedule,
        use_kl=use_kl,
        predict_xstart=predict_xstart,
        rescale_timesteps=rescale_timesteps,
        rescale_learned_sigmas=rescale_learned_sigmas,
        timestep_respacing=timestep_respacing,
    )

    # The model is built before the diffusion object, as before.
    model = sr_create_model(
        large_size, small_size, num_channels, num_res_blocks, **model_options
    )
    diffusion = create_gaussian_diffusion(**diffusion_options)
    return model, diffusion
|
|
|
|
|
def sr_create_model(
    large_size,
    small_size,
    num_channels,
    num_res_blocks,
    learn_sigma,
    class_cond,
    use_checkpoint,
    attention_resolutions,
    num_heads,
    num_head_channels,
    num_heads_upsample,
    use_scale_shift_norm,
    dropout,
    resblock_updown,
    use_fp16,
):
    """
    Construct a SuperResModel from string/flag style options.

    :param large_size: resolution passed to the model as image_size; selects
                       the preset channel multipliers (512, 256 and 64 are
                       supported).
    :param small_size: accepted but not used when building the model.
    :param learn_sigma: if true the model gets 6 output channels instead of 3.
    :param class_cond: if true, NUM_CLASSES is passed as num_classes,
                       otherwise None.
    :param attention_resolutions: comma-separated integers; each value r is
                                  passed to the model as large_size // r.
    :return: the constructed SuperResModel.
    :raises ValueError: if large_size has no preset channel multipliers.

    The remaining arguments are handed to SuperResModel unchanged.
    """
    del small_size  # not used when building the model

    presets = (
        (512, (1, 1, 2, 2, 4, 4)),
        (256, (1, 1, 2, 2, 4, 4)),
        (64, (1, 2, 3, 4)),
    )
    # Scan with == (not a dict lookup) to keep the original comparison
    # semantics for any large_size type.
    for size, multipliers in presets:
        if large_size == size:
            channel_mult = multipliers
            break
    else:
        raise ValueError(f"unsupported large size: {large_size}")

    attention_ds = tuple(
        large_size // int(resolution)
        for resolution in attention_resolutions.split(",")
    )
    output_channels = 6 if learn_sigma else 3
    label_count = NUM_CLASSES if class_cond else None

    return SuperResModel(
        image_size=large_size,
        in_channels=3,
        model_channels=num_channels,
        out_channels=output_channels,
        num_res_blocks=num_res_blocks,
        attention_resolutions=attention_ds,
        dropout=dropout,
        channel_mult=channel_mult,
        num_classes=label_count,
        use_checkpoint=use_checkpoint,
        num_heads=num_heads,
        num_head_channels=num_head_channels,
        num_heads_upsample=num_heads_upsample,
        use_scale_shift_norm=use_scale_shift_norm,
        resblock_updown=resblock_updown,
        use_fp16=use_fp16,
    )
|
|
|
|
|
def create_gaussian_diffusion(
    *,
    steps=1000,
    learn_sigma=False,
    sigma_small=False,
    noise_schedule="linear",
    use_kl=False,
    predict_xstart=False,
    rescale_timesteps=False,
    rescale_learned_sigmas=False,
    timestep_respacing="",
):
    """
    Construct a SpacedDiffusion from keyword-only options.

    :param steps: number of steps used for the beta schedule and respacing.
    :param learn_sigma: if true, use ModelVarType.LEARNED_RANGE; this takes
                        priority over sigma_small.
    :param sigma_small: selects FIXED_SMALL over FIXED_LARGE when
                        learn_sigma is false.
    :param noise_schedule: name given to gd.get_named_beta_schedule().
    :param use_kl: if true, use LossType.RESCALED_KL; this takes priority
                   over rescale_learned_sigmas.
    :param predict_xstart: if true, use ModelMeanType.START_X, otherwise
                           EPSILON.
    :param rescale_timesteps: passed through to SpacedDiffusion.
    :param rescale_learned_sigmas: if true (and use_kl is false), use
                                   LossType.RESCALED_MSE, otherwise MSE.
    :param timestep_respacing: respacing spec given to space_timesteps();
                               a falsy value is replaced by [steps].
    :return: the constructed SpacedDiffusion.
    """
    betas = gd.get_named_beta_schedule(noise_schedule, steps)

    if use_kl:
        loss_type = gd.LossType.RESCALED_KL
    else:
        loss_type = (
            gd.LossType.RESCALED_MSE
            if rescale_learned_sigmas
            else gd.LossType.MSE
        )

    # Fall back to a single section covering all steps.
    respacing = timestep_respacing or [steps]

    if predict_xstart:
        mean_type = gd.ModelMeanType.START_X
    else:
        mean_type = gd.ModelMeanType.EPSILON

    if learn_sigma:
        var_type = gd.ModelVarType.LEARNED_RANGE
    elif sigma_small:
        var_type = gd.ModelVarType.FIXED_SMALL
    else:
        var_type = gd.ModelVarType.FIXED_LARGE

    return SpacedDiffusion(
        use_timesteps=space_timesteps(steps, respacing),
        betas=betas,
        model_mean_type=mean_type,
        model_var_type=var_type,
        loss_type=loss_type,
        rescale_timesteps=rescale_timesteps,
    )
|
|
|
|
|
def add_dict_to_argparser(parser, default_dict):
    """
    Register one ``--<key>`` option on the parser for every dict entry.

    The entry's value becomes the option default. The option's ``type`` is
    ``str`` for a None default, ``str2bool`` for a bool default, and the
    default's own type otherwise.

    :param parser: an object exposing argparse's add_argument().
    :param default_dict: mapping of option name to default value.
    """
    for name, default in default_dict.items():
        if default is None:
            converter = str
        elif isinstance(default, bool):
            converter = str2bool
        else:
            converter = type(default)
        parser.add_argument(f"--{name}", default=default, type=converter)
|
|
|
|
|
def args_to_dict(args, keys):
    """
    Collect the named attributes of ``args`` into a dict.

    :param args: object to read attributes from.
    :param keys: iterable of attribute names.
    :return: a dict mapping each name in keys to getattr(args, name).
    """
    picked = {}
    for key in keys:
        picked[key] = getattr(args, key)
    return picked
|
|
|
|
|
def str2bool(v):
    """
    Convert a command-line string to a bool.

    Booleans are returned as-is. Otherwise the value is compared
    case-insensitively against the accepted true/false spellings.

    https://stackoverflow.com/questions/15008758/parsing-boolean-values-with-argparse

    :raises argparse.ArgumentTypeError: if the text is not a recognised
                                        boolean spelling.
    """
    if isinstance(v, bool):
        return v

    text = v.lower()
    if text in ("yes", "true", "t", "y", "1"):
        return True
    if text in ("no", "false", "f", "n", "0"):
        return False
    raise argparse.ArgumentTypeError("boolean value expected")
|
|