# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. # Copyright 2022 The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import gradio as gr from env import BASE_MODEL_NAME, LORA_WEIGHTS_PATH, PROMPTS examples = [ [ PROMPTS, 'low quality', 7.5, 512, 512, 25, "DPMSolver" ], ] import inspect import os import random import re import time from typing import Callable, List, Optional, Union import numpy as np import paddle import PIL import PIL.Image from packaging import version from paddlenlp.transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer from ppdiffusers.configuration_utils import FrozenDict from ppdiffusers.models import AutoencoderKL, UNet2DConditionModel from ppdiffusers.pipeline_utils import DiffusionPipeline from ppdiffusers.schedulers import ( DDIMScheduler, DPMSolverMultistepScheduler, EulerAncestralDiscreteScheduler, EulerDiscreteScheduler, LMSDiscreteScheduler, PNDMScheduler, HeunDiscreteScheduler, KDPM2AncestralDiscreteScheduler, KDPM2DiscreteScheduler, ) from ppdiffusers.utils import PIL_INTERPOLATION, deprecate, logging from ppdiffusers.utils.testing_utils import load_image from ppdiffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput from ppdiffusers.pipelines.stable_diffusion.safety_checker import StableDiffusionSafetyChecker logger = logging.get_logger(__name__) # pylint: disable=invalid-name def save_all(images, FORMAT="jpg", OUTDIR="./outputs/"): if not isinstance(images, (list, tuple)): images = [images] for image in images: PRECISION = "fp32" argument = image.argument os.makedirs(OUTDIR, exist_ok=True) epoch_time = argument["epoch_time"] PROMPT = argument["prompt"] NEGPROMPT = argument["negative_prompt"] HEIGHT = argument["height"] WIDTH = argument["width"] SEED = argument["seed"] STRENGTH = argument.get("strength", 1) INFERENCE_STEPS = argument["num_inference_steps"] GUIDANCE_SCALE = argument["guidance_scale"] filename = f"{str(epoch_time)}_scale_{GUIDANCE_SCALE}_steps_{INFERENCE_STEPS}_seed_{SEED}.{FORMAT}" filedir = f"{OUTDIR}/{filename}" image.save(filedir) with open(f"{OUTDIR}/{epoch_time}_prompt.txt", "w") as file: file.write( f"PROMPT: {PROMPT}\nNEG_PROMPT: {NEGPROMPT}\n\nINFERENCE_STEPS: {INFERENCE_STEPS}\nHeight: {HEIGHT}\nWidth: {WIDTH}\nSeed: {SEED}\n\nPrecision: {PRECISION}\nSTRENGTH: {STRENGTH}\nGUIDANCE_SCALE: {GUIDANCE_SCALE}" ) re_attention = re.compile( r""" \\\(| \\\)| \\\[| \\]| \\\\| \\| \(| \[| :([+-]?[.\d]+)\)| \)| ]| [^\\()\[\]:]+| : """, re.X, ) def parse_prompt_attention(text): """ Parses a string with attention tokens and returns a list of pairs: text and its associated weight. Accepted tokens are: (abc) - increases attention to abc by a multiplier of 1.1 (abc:3.12) - increases attention to abc by a multiplier of 3.12 [abc] - decreases attention to abc by a multiplier of 1.1 \( - literal character '(' \[ - literal character '[' \) - literal character ')' \] - literal character ']' \\ - literal character '\' anything else - just text >>> parse_prompt_attention('normal text') [['normal text', 1.0]] >>> parse_prompt_attention('an (important) word') [['an ', 1.0], ['important', 1.1], [' word', 1.0]] >>> parse_prompt_attention('(unbalanced') [['unbalanced', 1.1]] >>> parse_prompt_attention('\(literal\]') [['(literal]', 1.0]] >>> parse_prompt_attention('(unnecessary)(parens)') [['unnecessaryparens', 1.1]] >>> parse_prompt_attention('a (((house:1.3)) [on] a (hill:0.5), sun, (((sky))).') [['a ', 1.0], ['house', 1.5730000000000004], [' ', 1.1], ['on', 1.0], [' a ', 1.1], ['hill', 0.55], [', sun, ', 1.1], ['sky', 1.4641000000000006], ['.', 1.1]] """ res = [] round_brackets = [] square_brackets = [] round_bracket_multiplier = 1.1 square_bracket_multiplier = 1 / 1.1 def multiply_range(start_position, multiplier): for p in range(start_position, len(res)): res[p][1] *= multiplier for m in re_attention.finditer(text): text = m.group(0) weight = m.group(1) if text.startswith("\\"): res.append([text[1:], 1.0]) elif text == "(": round_brackets.append(len(res)) elif text == "[": square_brackets.append(len(res)) elif weight is not None and len(round_brackets) > 0: multiply_range(round_brackets.pop(), float(weight)) elif text == ")" and len(round_brackets) > 0: multiply_range(round_brackets.pop(), round_bracket_multiplier) elif text == "]" and len(square_brackets) > 0: multiply_range(square_brackets.pop(), square_bracket_multiplier) else: res.append([text, 1.0]) for pos in round_brackets: multiply_range(pos, round_bracket_multiplier) for pos in square_brackets: multiply_range(pos, square_bracket_multiplier) if len(res) == 0: res = [["", 1.0]] # merge runs of identical weights i = 0 while i + 1 < len(res): if res[i][1] == res[i + 1][1]: res[i][0] += res[i + 1][0] res.pop(i + 1) else: i += 1 return res def get_prompts_with_weights(pipe: DiffusionPipeline, prompt: List[str], max_length: int): r""" Tokenize a list of prompts and return its tokens with weights of each token. No padding, starting or ending token is included. """ tokens = [] weights = [] for text in prompt: texts_and_weights = parse_prompt_attention(text) text_token = [] text_weight = [] for word, weight in texts_and_weights: # tokenize and discard the starting and the ending token token = pipe.tokenizer(word).input_ids[1:-1] text_token += token # copy the weight by length of token text_weight += [weight] * len(token) # stop if the text is too long (longer than truncation limit) if len(text_token) > max_length: break # truncate if len(text_token) > max_length: text_token = text_token[:max_length] text_weight = text_weight[:max_length] tokens.append(text_token) weights.append(text_weight) return tokens, weights def pad_tokens_and_weights(tokens, weights, max_length, bos, eos, pad, no_boseos_middle=True, chunk_length=77): r""" Pad the tokens (with starting and ending tokens) and weights (with 1.0) to max_length. """ max_embeddings_multiples = (max_length - 2) // (chunk_length - 2) weights_length = max_length if no_boseos_middle else max_embeddings_multiples * chunk_length for i in range(len(tokens)): tokens[i] = [bos] + tokens[i] + [eos] + [pad] * (max_length - 2 - len(tokens[i])) if no_boseos_middle: weights[i] = [1.0] + weights[i] + [1.0] * (max_length - 1 - len(weights[i])) else: w = [] if len(weights[i]) == 0: w = [1.0] * weights_length else: for j in range((len(weights[i]) - 1) // chunk_length + 1): w.append(1.0) # weight for starting token in this chunk w += weights[i][j * chunk_length : min(len(weights[i]), (j + 1) * chunk_length)] w.append(1.0) # weight for ending token in this chunk w += [1.0] * (weights_length - len(w)) weights[i] = w[:] return tokens, weights def get_unweighted_text_embeddings( pipe: DiffusionPipeline, text_input: paddle.Tensor, chunk_length: int, no_boseos_middle: Optional[bool] = True ): """ When the length of tokens is a multiple of the capacity of the text encoder, it should be split into chunks and sent to the text encoder individually. """ max_embeddings_multiples = (text_input.shape[1] - 2) // (chunk_length - 2) if max_embeddings_multiples > 1: text_embeddings = [] for i in range(max_embeddings_multiples): # extract the i-th chunk text_input_chunk = text_input[:, i * (chunk_length - 2) : (i + 1) * (chunk_length - 2) + 2].clone() # cover the head and the tail by the starting and the ending tokens text_input_chunk[:, 0] = text_input[0, 0] text_input_chunk[:, -1] = text_input[0, -1] text_embedding = pipe.text_encoder(text_input_chunk)[0] if no_boseos_middle: if i == 0: # discard the ending token text_embedding = text_embedding[:, :-1] elif i == max_embeddings_multiples - 1: # discard the starting token text_embedding = text_embedding[:, 1:] else: # discard both starting and ending tokens text_embedding = text_embedding[:, 1:-1] text_embeddings.append(text_embedding) text_embeddings = paddle.concat(text_embeddings, axis=1) else: text_embeddings = pipe.text_encoder(text_input)[0] return text_embeddings def get_weighted_text_embeddings( pipe: DiffusionPipeline, prompt: Union[str, List[str]], uncond_prompt: Optional[Union[str, List[str]]] = None, max_embeddings_multiples: Optional[int] = 1, no_boseos_middle: Optional[bool] = False, skip_parsing: Optional[bool] = False, skip_weighting: Optional[bool] = False, **kwargs ): r""" Prompts can be assigned with local weights using brackets. For example, prompt 'A (very beautiful) masterpiece' highlights the words 'very beautiful', and the embedding tokens corresponding to the words get multiplied by a constant, 1.1. Also, to regularize of the embedding, the weighted embedding would be scaled to preserve the original mean. Args: pipe (`DiffusionPipeline`): Pipe to provide access to the tokenizer and the text encoder. prompt (`str` or `List[str]`): The prompt or prompts to guide the image generation. uncond_prompt (`str` or `List[str]`): The unconditional prompt or prompts for guide the image generation. If unconditional prompt is provided, the embeddings of prompt and uncond_prompt are concatenated. max_embeddings_multiples (`int`, *optional*, defaults to `1`): The max multiple length of prompt embeddings compared to the max output length of text encoder. no_boseos_middle (`bool`, *optional*, defaults to `False`): If the length of text token is multiples of the capacity of text encoder, whether reserve the starting and ending token in each of the chunk in the middle. skip_parsing (`bool`, *optional*, defaults to `False`): Skip the parsing of brackets. skip_weighting (`bool`, *optional*, defaults to `False`): Skip the weighting. When the parsing is skipped, it is forced True. """ max_length = (pipe.tokenizer.model_max_length - 2) * max_embeddings_multiples + 2 if isinstance(prompt, str): prompt = [prompt] if not skip_parsing: prompt_tokens, prompt_weights = get_prompts_with_weights(pipe, prompt, max_length - 2) if uncond_prompt is not None: if isinstance(uncond_prompt, str): uncond_prompt = [uncond_prompt] uncond_tokens, uncond_weights = get_prompts_with_weights(pipe, uncond_prompt, max_length - 2) else: prompt_tokens = [ token[1:-1] for token in pipe.tokenizer(prompt, max_length=max_length, truncation=True).input_ids ] prompt_weights = [[1.0] * len(token) for token in prompt_tokens] if uncond_prompt is not None: if isinstance(uncond_prompt, str): uncond_prompt = [uncond_prompt] uncond_tokens = [ token[1:-1] for token in pipe.tokenizer(uncond_prompt, max_length=max_length, truncation=True).input_ids ] uncond_weights = [[1.0] * len(token) for token in uncond_tokens] # round up the longest length of tokens to a multiple of (model_max_length - 2) max_length = max([len(token) for token in prompt_tokens]) if uncond_prompt is not None: max_length = max(max_length, max([len(token) for token in uncond_tokens])) max_embeddings_multiples = min( max_embeddings_multiples, (max_length - 1) // (pipe.tokenizer.model_max_length - 2) + 1 ) max_embeddings_multiples = max(1, max_embeddings_multiples) max_length = (pipe.tokenizer.model_max_length - 2) * max_embeddings_multiples + 2 # pad the length of tokens and weights # support bert tokenizer bos = pipe.tokenizer.bos_token_id if pipe.tokenizer.bos_token_id is not None else pipe.tokenizer.cls_token_id eos = pipe.tokenizer.eos_token_id if pipe.tokenizer.eos_token_id is not None else pipe.tokenizer.sep_token_id pad = pipe.tokenizer.pad_token_id prompt_tokens, prompt_weights = pad_tokens_and_weights( prompt_tokens, prompt_weights, max_length, bos, eos, pad, no_boseos_middle=no_boseos_middle, chunk_length=pipe.tokenizer.model_max_length, ) prompt_tokens = paddle.to_tensor(prompt_tokens) if uncond_prompt is not None: uncond_tokens, uncond_weights = pad_tokens_and_weights( uncond_tokens, uncond_weights, max_length, bos, eos, pad, no_boseos_middle=no_boseos_middle, chunk_length=pipe.tokenizer.model_max_length, ) uncond_tokens = paddle.to_tensor(uncond_tokens) # get the embeddings text_embeddings = get_unweighted_text_embeddings( pipe, prompt_tokens, pipe.tokenizer.model_max_length, no_boseos_middle=no_boseos_middle ) prompt_weights = paddle.to_tensor(prompt_weights, dtype=text_embeddings.dtype) if uncond_prompt is not None: uncond_embeddings = get_unweighted_text_embeddings( pipe, uncond_tokens, pipe.tokenizer.model_max_length, no_boseos_middle=no_boseos_middle ) uncond_weights = paddle.to_tensor(uncond_weights, dtype=uncond_embeddings.dtype) # assign weights to the prompts and normalize in the sense of mean # TODO: should we normalize by chunk or in a whole (current implementation)? if (not skip_parsing) and (not skip_weighting): previous_mean = text_embeddings.mean(axis=[-2, -1]) text_embeddings *= prompt_weights.unsqueeze(-1) text_embeddings *= previous_mean / text_embeddings.mean(axis=[-2, -1]) if uncond_prompt is not None: previous_mean = uncond_embeddings.mean(axis=[-2, -1]) uncond_embeddings *= uncond_weights.unsqueeze(-1) uncond_embeddings *= previous_mean / uncond_embeddings.mean(axis=[-2, -1]) # For classifier free guidance, we need to do two forward passes. # Here we concatenate the unconditional and text embeddings into a single batch # to avoid doing two forward passes if uncond_prompt is not None: text_embeddings = paddle.concat([uncond_embeddings, text_embeddings]) return text_embeddings def preprocess_image(image): w, h = image.size w, h = map(lambda x: x - x % 32, (w, h)) # resize to integer multiple of 32 image = image.resize((w, h), resample=PIL_INTERPOLATION["lanczos"]) image = np.array(image).astype(np.float32) / 255.0 image = image[None].transpose(0, 3, 1, 2) image = paddle.to_tensor(image) return 2.0 * image - 1.0 def preprocess_mask(mask): mask = mask.convert("L") w, h = mask.size w, h = map(lambda x: x - x % 32, (w, h)) # resize to integer multiple of 32 mask = mask.resize((w // 8, h // 8), resample=PIL_INTERPOLATION["nearest"]) mask = np.array(mask).astype(np.float32) / 255.0 mask = np.tile(mask, (4, 1, 1)) mask = mask[None].transpose(0, 1, 2, 3) # what does this step do? mask = 1 - mask # repaint white, keep black mask = paddle.to_tensor(mask) return mask class StableDiffusionPipelineAllinOne(DiffusionPipeline): r""" Pipeline for text-to-image image-to-image inpainting generation using Stable Diffusion. This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the library implements for all the pipelines (such as downloading or saving, running on a particular xxxx, etc.) Args: vae ([`AutoencoderKL`]): Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations. text_encoder ([`CLIPTextModel`]): Frozen text-encoder. Stable Diffusion uses the text portion of [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant. tokenizer (`CLIPTokenizer`): Tokenizer of class [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer). unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents. scheduler ([`SchedulerMixin`]): A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of [`DDIMScheduler`], [`LMSDiscreteScheduler`], [`PNDMScheduler`], [`EulerDiscreteScheduler`], [`EulerAncestralDiscreteScheduler`] or [`DPMSolverMultistepScheduler`]. safety_checker ([`StableDiffusionSafetyChecker`]): Classification module that estimates whether generated images could be considered offensive or harmful. Please, refer to the [model card](https://huggingface.co/junnyu/stable-diffusion-v1-4-paddle) for details. feature_extractor ([`CLIPFeatureExtractor`]): Model that extracts features from generated images to be used as inputs for the `safety_checker`. """ _optional_components = ["safety_checker", "feature_extractor"] def __init__( self, vae: AutoencoderKL, text_encoder: CLIPTextModel, tokenizer: CLIPTokenizer, unet: UNet2DConditionModel, scheduler: Union[ DDIMScheduler, PNDMScheduler, LMSDiscreteScheduler, EulerDiscreteScheduler, EulerAncestralDiscreteScheduler, DPMSolverMultistepScheduler, ], safety_checker: StableDiffusionSafetyChecker, feature_extractor: CLIPFeatureExtractor, requires_safety_checker: bool = False, ): if hasattr(scheduler.config, "steps_offset") and scheduler.config.steps_offset != 1: deprecation_message = ( f"The configuration file of this scheduler: {scheduler} is outdated. `steps_offset`" f" should be set to 1 instead of {scheduler.config.steps_offset}. Please make sure " "to update the config accordingly as leaving `steps_offset` might led to incorrect results" " in future versions. If you have downloaded this checkpoint from the Hugging Face Hub," " it would be very nice if you could open a Pull request for the `scheduler/scheduler_config.json`" " file" ) deprecate("steps_offset!=1", "1.0.0", deprecation_message, standard_warn=False) new_config = dict(scheduler.config) new_config["steps_offset"] = 1 scheduler._internal_dict = FrozenDict(new_config) if hasattr(scheduler.config, "clip_sample") and scheduler.config.clip_sample is True: deprecation_message = ( f"The configuration file of this scheduler: {scheduler} has not set the configuration `clip_sample`." " `clip_sample` should be set to False in the configuration file. Please make sure to update the" " config accordingly as not setting `clip_sample` in the config might lead to incorrect results in" " future versions. If you have downloaded this checkpoint from the Hugging Face Hub, it would be very" " nice if you could open a Pull request for the `scheduler/scheduler_config.json` file" ) deprecate("clip_sample not set", "1.0.0", deprecation_message, standard_warn=False) new_config = dict(scheduler.config) new_config["clip_sample"] = False scheduler._internal_dict = FrozenDict(new_config) if safety_checker is None and requires_safety_checker: logger.warning( f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure" " that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered" " results in services or applications open to the public. PaddleNLP team, diffusers team and Hugging Face" " strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling" " it only for use-cases that involve analyzing network behavior or auditing its results. For more" " information, please have a look at https://github.com/huggingface/diffusers/pull/254 ." ) if safety_checker is not None and feature_extractor is None: raise ValueError( "Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety" " checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead." ) is_unet_version_less_0_9_0 = hasattr(unet.config, "_ppdiffusers_version") and version.parse( version.parse(unet.config._ppdiffusers_version).base_version ) < version.parse("0.9.0.dev0") is_unet_sample_size_less_64 = hasattr(unet.config, "sample_size") and unet.config.sample_size < 64 if is_unet_version_less_0_9_0 and is_unet_sample_size_less_64: deprecation_message = ( "The configuration file of the unet has set the default `sample_size` to smaller than" " 64 which seems highly unlikely .If you're checkpoint is a fine-tuned version of any of the" " following: \n- CompVis/stable-diffusion-v1-4 \n- CompVis/stable-diffusion-v1-3 \n-" " CompVis/stable-diffusion-v1-2 \n- CompVis/stable-diffusion-v1-1 \n- runwayml/stable-diffusion-v1-5" " \n- runwayml/stable-diffusion-inpainting \n you should change 'sample_size' to 64 in the" " configuration file. Please make sure to update the config accordingly as leaving `sample_size=32`" " in the config might lead to incorrect results in future versions. If you have downloaded this" " checkpoint from the Hugging Face Hub, it would be very nice if you could open a Pull request for" " the `unet/config.json` file" ) deprecate("sample_size<64", "1.0.0", deprecation_message, standard_warn=False) new_config = dict(unet.config) new_config["sample_size"] = 64 unet._internal_dict = FrozenDict(new_config) self.register_modules( vae=vae, text_encoder=text_encoder, tokenizer=tokenizer, unet=unet, scheduler=scheduler, safety_checker=safety_checker, feature_extractor=feature_extractor, ) self.register_to_config(requires_safety_checker=requires_safety_checker) def create_scheduler(self, name="DPMSolver"): config = self.scheduler.config if name == "DPMSolver": return DPMSolverMultistepScheduler.from_config( config, thresholding=False, algorithm_type="dpmsolver++", solver_type="midpoint", lower_order_final=True, ) if name == "EulerDiscrete": return EulerDiscreteScheduler.from_config(config) elif name == "EulerAncestralDiscrete": return EulerAncestralDiscreteScheduler.from_config(config) elif name == "PNDM": return PNDMScheduler.from_config(config) elif name == "DDIM": return DDIMScheduler.from_config(config) elif name == "LMSDiscrete": return LMSDiscreteScheduler.from_config(config) elif name == "HeunDiscrete": return HeunDiscreteScheduler.from_config(config) elif name == "KDPM2AncestralDiscrete": return KDPM2AncestralDiscreteScheduler.from_config(config) elif name == "KDPM2Discrete": return KDPM2DiscreteScheduler.from_config(config) else: raise NotImplementedError def enable_attention_slicing(self, slice_size: Optional[Union[str, int]] = "auto"): r""" Enable sliced attention computation. When this option is enabled, the attention module will split the input tensor in slices, to compute attention in several steps. This is useful to save some memory in exchange for a small speed decrease. Args: slice_size (`str` or `int`, *optional*, defaults to `"auto"`): When `"auto"`, halves the input to the attention heads, so attention will be computed in two steps. If a number is provided, uses as many slices as `attention_head_dim // slice_size`. In this case, `attention_head_dim` must be a multiple of `slice_size`. """ if slice_size == "auto": if isinstance(self.unet.config.attention_head_dim, int): # half the attention head size is usually a good trade-off between # speed and memory slice_size = self.unet.config.attention_head_dim // 2 else: # if `attention_head_dim` is a list, take the smallest head size slice_size = min(self.unet.config.attention_head_dim) self.unet.set_attention_slice(slice_size) def disable_attention_slicing(self): r""" Disable sliced attention computation. If `enable_attention_slicing` was previously invoked, this method will go back to computing attention in one step. """ # set slice_size = `None` to disable `attention slicing` self.enable_attention_slicing(None) def __call__(self, *args, **kwargs): return self.text2image(*args, **kwargs) def text2img(self, *args, **kwargs): return self.text2image(*args, **kwargs) def _encode_prompt( self, prompt, negative_prompt, max_embeddings_multiples, no_boseos_middle, skip_parsing, skip_weighting, do_classifier_free_guidance, num_images_per_prompt, ): if do_classifier_free_guidance and negative_prompt is None: negative_prompt = "" text_embeddings = get_weighted_text_embeddings( self, prompt, negative_prompt, max_embeddings_multiples, no_boseos_middle, skip_parsing, skip_weighting ) bs_embed, seq_len, _ = text_embeddings.shape text_embeddings = text_embeddings.tile([1, num_images_per_prompt, 1]) text_embeddings = text_embeddings.reshape([bs_embed * num_images_per_prompt, seq_len, -1]) return text_embeddings def run_safety_checker(self, image, dtype): if self.safety_checker is not None: safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pd") image, has_nsfw_concept = self.safety_checker( images=image, clip_input=safety_checker_input.pixel_values.cast(dtype) ) else: has_nsfw_concept = None return image, has_nsfw_concept def decode_latents(self, latents): latents = 1 / 0.18215 * latents image = self.vae.decode(latents).sample image = (image / 2 + 0.5).clip(0, 1) # we always cast to float32 as this does not cause significant overhead and is compatible with bfloa16 image = image.transpose([0, 2, 3, 1]).cast("float32").numpy() return image def prepare_extra_step_kwargs(self, eta, scheduler): # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers. # eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502 # and should be between [0, 1] accepts_eta = "eta" in set(inspect.signature(scheduler.step).parameters.keys()) extra_step_kwargs = {} if accepts_eta: extra_step_kwargs["eta"] = eta return extra_step_kwargs def check_inputs_text2img(self, prompt, height, width, callback_steps): if not isinstance(prompt, str) and not isinstance(prompt, list): raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}") if height % 8 != 0 or width % 8 != 0: raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.") if (callback_steps is None) or ( callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0) ): raise ValueError( f"`callback_steps` has to be a positive integer but is {callback_steps} of type" f" {type(callback_steps)}." ) def check_inputs_img2img_inpaint(self, prompt, strength, callback_steps): if not isinstance(prompt, str) and not isinstance(prompt, list): raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}") if strength < 0 or strength > 1: raise ValueError(f"The value of strength should in [1.0, 1.0] but is {strength}") if (callback_steps is None) or ( callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0) ): raise ValueError( f"`callback_steps` has to be a positive integer but is {callback_steps} of type" f" {type(callback_steps)}." ) def prepare_latents_text2img(self, batch_size, num_channels_latents, height, width, dtype, latents=None, scheduler=None): shape = [batch_size, num_channels_latents, height // 8, width // 8] if latents is None: latents = paddle.randn(shape, dtype=dtype) else: if latents.shape != shape: raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}") # scale the initial noise by the standard deviation required by the scheduler latents = latents * scheduler.init_noise_sigma return latents def prepare_latents_img2img(self, image, timestep, num_images_per_prompt, dtype, scheduler): image = image.cast(dtype=dtype) init_latent_dist = self.vae.encode(image).latent_dist init_latents = init_latent_dist.sample() init_latents = 0.18215 * init_latents b, c, h, w = init_latents.shape init_latents = init_latents.tile([1, num_images_per_prompt, 1, 1]) init_latents = init_latents.reshape([b * num_images_per_prompt, c, h, w]) # add noise to latents using the timesteps noise = paddle.randn(init_latents.shape, dtype=dtype) # get latents init_latents = scheduler.add_noise(init_latents, noise, timestep) latents = init_latents return latents def get_timesteps(self, num_inference_steps, strength, scheduler): # get the original timestep using init_timestep offset = scheduler.config.get("steps_offset", 0) init_timestep = int(num_inference_steps * strength) + offset init_timestep = min(init_timestep, num_inference_steps) t_start = max(num_inference_steps - init_timestep + offset, 0) timesteps = scheduler.timesteps[t_start:] return timesteps, num_inference_steps - t_start def prepare_latents_inpaint(self, image, timestep, num_images_per_prompt, dtype, scheduler): image = image.cast(dtype) init_latent_dist = self.vae.encode(image).latent_dist init_latents = init_latent_dist.sample() init_latents = 0.18215 * init_latents b, c, h, w = init_latents.shape init_latents = init_latents.tile([1, num_images_per_prompt, 1, 1]) init_latents = init_latents.reshape([b * num_images_per_prompt, c, h, w]) init_latents_orig = init_latents # add noise to latents using the timesteps noise = paddle.randn(init_latents.shape, dtype=dtype) init_latents = scheduler.add_noise(init_latents, noise, timestep) latents = init_latents return latents, init_latents_orig, noise @paddle.no_grad() def text2image( self, prompt: Union[str, List[str]], height: int = 512, width: int = 512, num_inference_steps: int = 50, guidance_scale: float = 7.5, negative_prompt: Optional[Union[str, List[str]]] = None, num_images_per_prompt: Optional[int] = 1, eta: float = 0.0, seed: Optional[int] = None, latents: Optional[paddle.Tensor] = None, output_type: Optional[str] = "pil", return_dict: bool = True, callback: Optional[Callable[[int, int, paddle.Tensor], None]] = None, callback_steps: Optional[int] = 1, # new add max_embeddings_multiples: Optional[int] = 1, no_boseos_middle: Optional[bool] = False, skip_parsing: Optional[bool] = False, skip_weighting: Optional[bool] = False, scheduler=None, **kwargs, ): r""" Function invoked when calling the pipeline for generation. Args: prompt (`str` or `List[str]`): The prompt or prompts to guide the image generation. height (`int`, *optional*, defaults to 512): The height in pixels of the generated image. width (`int`, *optional*, defaults to 512): The width in pixels of the generated image. num_inference_steps (`int`, *optional*, defaults to 50): The number of denoising steps. More denoising steps usually lead to a higher quality image at the expense of slower inference. guidance_scale (`float`, *optional*, defaults to 7.5): Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598). `guidance_scale` is defined as `w` of equation 2. of [Imagen Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale > 1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`, usually at the expense of lower image quality. negative_prompt (`str` or `List[str]`, *optional*): The prompt or prompts not to guide the image generation. Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`). num_images_per_prompt (`int`, *optional*, defaults to 1): The number of images to generate per prompt. eta (`float`, *optional*, defaults to 0.0): Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to [`schedulers.DDIMScheduler`], will be ignored for others. seed (`int`, *optional*): Random number seed. latents (`paddle.Tensor`, *optional*): Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image generation. Can be used to tweak the same generation with different prompts. If not provided, a latents tensor will ge generated by sampling using the supplied random `seed`. output_type (`str`, *optional*, defaults to `"pil"`): The output format of the generate image. Choose between [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`. return_dict (`bool`, *optional*, defaults to `True`): Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a plain tuple. callback (`Callable`, *optional*): A function that will be called every `callback_steps` steps during inference. The function will be called with the following arguments: `callback(step: int, timestep: int, latents: paddle.Tensor)`. callback_steps (`int`, *optional*, defaults to 1): The frequency at which the `callback` function will be called. If not specified, the callback will be called at every step. Returns: [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`: [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple. When returning a tuple, the first element is a list with the generated images, and the second element is a list of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work" (nsfw) content, according to the `safety_checker`. """ if scheduler is None: scheduler = self.scheduler seed = random.randint(0, 2**32) if seed is None else seed argument = dict( prompt=prompt, negative_prompt=negative_prompt, height=height, width=width, num_inference_steps=num_inference_steps, guidance_scale=guidance_scale, num_images_per_prompt=num_images_per_prompt, eta=eta, seed=seed, latents=latents, max_embeddings_multiples=max_embeddings_multiples, no_boseos_middle=no_boseos_middle, skip_parsing=skip_parsing, skip_weighting=skip_weighting, epoch_time=time.time(), ) paddle.seed(seed) # 1. Check inputs. Raise error if not correct self.check_inputs_text2img(prompt, height, width, callback_steps) # 2. Define call parameters batch_size = 1 if isinstance(prompt, str) else len(prompt) # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2) # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1` # corresponds to doing no classifier free guidance. do_classifier_free_guidance = guidance_scale > 1.0 # 3. Encode input prompt text_embeddings = self._encode_prompt( prompt, negative_prompt, max_embeddings_multiples, no_boseos_middle, skip_parsing, skip_weighting, do_classifier_free_guidance, num_images_per_prompt, ) # 4. Prepare timesteps scheduler.set_timesteps(num_inference_steps) timesteps = scheduler.timesteps # 5. Prepare latent variables num_channels_latents = self.unet.in_channels latents = self.prepare_latents_text2img( batch_size * num_images_per_prompt, num_channels_latents, height, width, text_embeddings.dtype, latents, scheduler=scheduler, ) # 6. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline extra_step_kwargs = self.prepare_extra_step_kwargs(eta, scheduler) # 7. Denoising loop num_warmup_steps = len(timesteps) - num_inference_steps * scheduler.order with self.progress_bar(total=num_inference_steps) as progress_bar: for i, t in enumerate(timesteps): # expand the latents if we are doing classifier free guidance latent_model_input = paddle.concat([latents] * 2) if do_classifier_free_guidance else latents latent_model_input = scheduler.scale_model_input(latent_model_input, t) # predict the noise residual noise_pred = self.unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample # perform guidance if do_classifier_free_guidance: noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) # compute the previous noisy sample x_t -> x_t-1 latents = scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample # call the callback, if provided if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % scheduler.order == 0): progress_bar.update() if callback is not None and i % callback_steps == 0: callback(progress_bar.n, progress_bar.total, progress_bar) # 8. Post-processing image = self.decode_latents(latents) # 9. Run safety checker image, has_nsfw_concept = self.run_safety_checker(image, text_embeddings.dtype) # 10. Convert to PIL if output_type == "pil": image = self.numpy_to_pil(image, argument=argument) if not return_dict: return (image, has_nsfw_concept) return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept) @paddle.no_grad() def img2img( self, prompt: Union[str, List[str]], image: Union[paddle.Tensor, PIL.Image.Image], strength: float = 0.8, height=None, width=None, num_inference_steps: Optional[int] = 50, guidance_scale: Optional[float] = 7.5, negative_prompt: Optional[Union[str, List[str]]] = None, num_images_per_prompt: Optional[int] = 1, eta: Optional[float] = 0.0, seed: Optional[int] = None, output_type: Optional[str] = "pil", return_dict: bool = True, callback: Optional[Callable[[int, int, paddle.Tensor], None]] = None, callback_steps: Optional[int] = 1, # new add max_embeddings_multiples: Optional[int] = 1, no_boseos_middle: Optional[bool] = False, skip_parsing: Optional[bool] = False, skip_weighting: Optional[bool] = False, scheduler=None, **kwargs, ): r""" Function invoked when calling the pipeline for generation. Args: prompt (`str` or `List[str]`): The prompt or prompts to guide the image generation. image (`paddle.Tensor` or `PIL.Image.Image`): `Image`, or tensor representing an image batch, that will be used as the starting point for the process. strength (`float`, *optional*, defaults to 0.8): Conceptually, indicates how much to transform the reference `image`. Must be between 0 and 1. `image` will be used as a starting point, adding more noise to it the larger the `strength`. The number of denoising steps depends on the amount of noise initially added. When `strength` is 1, added noise will be maximum and the denoising process will run for the full number of iterations specified in `num_inference_steps`. A value of 1, therefore, essentially ignores `image`. num_inference_steps (`int`, *optional*, defaults to 50): The number of denoising steps. More denoising steps usually lead to a higher quality image at the expense of slower inference. This parameter will be modulated by `strength`. guidance_scale (`float`, *optional*, defaults to 7.5): Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598). `guidance_scale` is defined as `w` of equation 2. of [Imagen Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale > 1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`, usually at the expense of lower image quality. negative_prompt (`str` or `List[str]`, *optional*): The prompt or prompts not to guide the image generation. Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`). num_images_per_prompt (`int`, *optional*, defaults to 1): The number of images to generate per prompt. eta (`float`, *optional*, defaults to 0.0): Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to [`schedulers.DDIMScheduler`], will be ignored for others. seed (`int`, *optional*): A random seed. output_type (`str`, *optional*, defaults to `"pil"`): The output format of the generate image. Choose between [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`. return_dict (`bool`, *optional*, defaults to `True`): Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a plain tuple. callback (`Callable`, *optional*): A function that will be called every `callback_steps` steps during inference. The function will be called with the following arguments: `callback(step: int, timestep: int, latents: paddle.Tensor)`. callback_steps (`int`, *optional*, defaults to 1): The frequency at which the `callback` function will be called. If not specified, the callback will be called at every step. Returns: [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`: [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple. When returning a tuple, the first element is a list with the generated images, and the second element is a list of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work" (nsfw) content, according to the `safety_checker`. """ if scheduler is None: scheduler = self.scheduler seed = random.randint(0, 2**32) if seed is None else seed image_str = image if isinstance(image_str, str): image = load_image(image_str) if height is None and width is None: width = (image.size[0] // 8) * 8 height = (image.size[1] // 8) * 8 elif height is None and width is not None: height = (image.size[1] // 8) * 8 elif width is None and height is not None: width = (image.size[0] // 8) * 8 else: height = height width = width argument = dict( prompt=prompt, image=image_str, negative_prompt=negative_prompt, height=height, width=width, strength=strength, num_inference_steps=num_inference_steps, guidance_scale=guidance_scale, num_images_per_prompt=num_images_per_prompt, eta=eta, seed=seed, max_embeddings_multiples=max_embeddings_multiples, no_boseos_middle=no_boseos_middle, skip_parsing=skip_parsing, skip_weighting=skip_weighting, epoch_time=time.time(), ) paddle.seed(seed) # 1. Check inputs self.check_inputs_img2img_inpaint(prompt, strength, callback_steps) # 2. Define call parameters batch_size = 1 if isinstance(prompt, str) else len(prompt) # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2) # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1` # corresponds to doing no classifier free guidance. do_classifier_free_guidance = guidance_scale > 1.0 # 3. Encode input prompt text_embeddings = self._encode_prompt( prompt, negative_prompt, max_embeddings_multiples, no_boseos_middle, skip_parsing, skip_weighting, do_classifier_free_guidance, num_images_per_prompt, ) # 4. Preprocess image if isinstance(image, PIL.Image.Image): image = image.resize((width, height)) image = preprocess_image(image) # 5. set timesteps scheduler.set_timesteps(num_inference_steps) timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, scheduler) latent_timestep = timesteps[:1].tile([batch_size * num_images_per_prompt]) # 6. Prepare latent variables latents = self.prepare_latents_img2img(image, latent_timestep, num_images_per_prompt, text_embeddings.dtype, scheduler) # 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline extra_step_kwargs = self.prepare_extra_step_kwargs(eta, scheduler) # 8. Denoising loop num_warmup_steps = len(timesteps) - num_inference_steps * scheduler.order with self.progress_bar(total=num_inference_steps) as progress_bar: for i, t in enumerate(timesteps): # expand the latents if we are doing classifier free guidance latent_model_input = paddle.concat([latents] * 2) if do_classifier_free_guidance else latents latent_model_input = scheduler.scale_model_input(latent_model_input, t) # predict the noise residual noise_pred = self.unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample # perform guidance if do_classifier_free_guidance: noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) # compute the previous noisy sample x_t -> x_t-1 latents = scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample # call the callback, if provided if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % scheduler.order == 0): progress_bar.update() if callback is not None and i % callback_steps == 0: callback(progress_bar.n, progress_bar.total, progress_bar) # 9. Post-processing image = self.decode_latents(latents) # 10. Run safety checker image, has_nsfw_concept = self.run_safety_checker(image, text_embeddings.dtype) # 11. Convert to PIL if output_type == "pil": image = self.numpy_to_pil(image, argument=argument) if not return_dict: return (image, has_nsfw_concept) return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept) @paddle.no_grad() def inpaint( self, prompt: Union[str, List[str]], image: Union[paddle.Tensor, PIL.Image.Image], mask_image: Union[paddle.Tensor, PIL.Image.Image], height=None, width=None, strength: float = 0.8, num_inference_steps: Optional[int] = 50, guidance_scale: Optional[float] = 7.5, negative_prompt: Optional[Union[str, List[str]]] = None, num_images_per_prompt: Optional[int] = 1, eta: Optional[float] = 0.0, seed: Optional[int] = None, output_type: Optional[str] = "pil", return_dict: bool = True, callback: Optional[Callable[[int, int, paddle.Tensor], None]] = None, callback_steps: Optional[int] = 1, # new add max_embeddings_multiples: Optional[int] = 1, no_boseos_middle: Optional[bool] = False, skip_parsing: Optional[bool] = False, skip_weighting: Optional[bool] = False, scheduler=None, **kwargs, ): r""" Function invoked when calling the pipeline for generation. Args: prompt (`str` or `List[str]`): The prompt or prompts to guide the image generation. image (`paddle.Tensor` or `PIL.Image.Image`): `Image`, or tensor representing an image batch, that will be used as the starting point for the process. This is the image whose masked region will be inpainted. mask_image (`paddle.Tensor` or `PIL.Image.Image`): `Image`, or tensor representing an image batch, to mask `image`. White pixels in the mask will be replaced by noise and therefore repainted, while black pixels will be preserved. If `mask_image` is a PIL image, it will be converted to a single channel (luminance) before use. If it's a tensor, it should contain one color channel (L) instead of 3, so the expected shape would be `(B, H, W, 1)`. strength (`float`, *optional*, defaults to 0.8): Conceptually, indicates how much to inpaint the masked area. Must be between 0 and 1. When `strength` is 1, the denoising process will be run on the masked area for the full number of iterations specified in `num_inference_steps`. `image` will be used as a reference for the masked area, adding more noise to that region the larger the `strength`. If `strength` is 0, no inpainting will occur. num_inference_steps (`int`, *optional*, defaults to 50): The reference number of denoising steps. More denoising steps usually lead to a higher quality image at the expense of slower inference. This parameter will be modulated by `strength`, as explained above. guidance_scale (`float`, *optional*, defaults to 7.5): Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598). `guidance_scale` is defined as `w` of equation 2. of [Imagen Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale > 1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`, usually at the expense of lower image quality. negative_prompt (`str` or `List[str]`, *optional*): The prompt or prompts not to guide the image generation. Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`). num_images_per_prompt (`int`, *optional*, defaults to 1): The number of images to generate per prompt. eta (`float`, *optional*, defaults to 0.0): Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to [`schedulers.DDIMScheduler`], will be ignored for others. seed (`int`, *optional*): A random seed. output_type (`str`, *optional*, defaults to `"pil"`): The output format of the generate image. Choose between [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`. return_dict (`bool`, *optional*, defaults to `True`): Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a plain tuple. callback (`Callable`, *optional*): A function that will be called every `callback_steps` steps during inference. The function will be called with the following arguments: `callback(step: int, timestep: int, latents: paddle.Tensor)`. callback_steps (`int`, *optional*, defaults to 1): The frequency at which the `callback` function will be called. If not specified, the callback will be called at every step. Returns: [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`: [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple. When returning a tuple, the first element is a list with the generated images, and the second element is a list of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work" (nsfw) content, according to the `safety_checker`. """ if scheduler is None: scheduler = self.scheduler seed = random.randint(0, 2**32) if seed is None else seed image_str = image mask_image_str = mask_image if isinstance(image_str, str): image = load_image(image_str) if isinstance(mask_image_str, str): mask_image = load_image(mask_image_str) if height is None and width is None: width = (image.size[0] // 8) * 8 height = (image.size[1] // 8) * 8 elif height is None and width is not None: height = (image.size[1] // 8) * 8 elif width is None and height is not None: width = (image.size[0] // 8) * 8 else: height = height width = width argument = dict( prompt=prompt, image=image_str, mask_image=mask_image_str, negative_prompt=negative_prompt, height=height, width=width, strength=strength, num_inference_steps=num_inference_steps, guidance_scale=guidance_scale, num_images_per_prompt=num_images_per_prompt, eta=eta, seed=seed, max_embeddings_multiples=max_embeddings_multiples, no_boseos_middle=no_boseos_middle, skip_parsing=skip_parsing, skip_weighting=skip_weighting, epoch_time=time.time(), ) paddle.seed(seed) # 1. Check inputs self.check_inputs_img2img_inpaint(prompt, strength, callback_steps) # 2. Define call parameters batch_size = 1 if isinstance(prompt, str) else len(prompt) # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2) # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1` # corresponds to doing no classifier free guidance. do_classifier_free_guidance = guidance_scale > 1.0 # 3. Encode input prompt text_embeddings = self._encode_prompt( prompt, negative_prompt, max_embeddings_multiples, no_boseos_middle, skip_parsing, skip_weighting, do_classifier_free_guidance, num_images_per_prompt, ) if not isinstance(image, paddle.Tensor): image = image.resize((width, height)) image = preprocess_image(image) if not isinstance(mask_image, paddle.Tensor): mask_image = mask_image.resize((width, height)) mask_image = preprocess_mask(mask_image) # 5. set timesteps scheduler.set_timesteps(num_inference_steps) timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, scheduler) latent_timestep = timesteps[:1].tile([batch_size * num_images_per_prompt]) # 6. Prepare latent variables # encode the init image into latents and scale the latents latents, init_latents_orig, noise = self.prepare_latents_inpaint( image, latent_timestep, num_images_per_prompt, text_embeddings.dtype, scheduler ) # 7. Prepare mask latent mask = mask_image.cast(latents.dtype) mask = paddle.concat([mask] * batch_size * num_images_per_prompt) # 8. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline extra_step_kwargs = self.prepare_extra_step_kwargs(eta, scheduler) # 9. Denoising loop num_warmup_steps = len(timesteps) - num_inference_steps * scheduler.order with self.progress_bar(total=num_inference_steps) as progress_bar: for i, t in enumerate(timesteps): # expand the latents if we are doing classifier free guidance latent_model_input = paddle.concat([latents] * 2) if do_classifier_free_guidance else latents latent_model_input = scheduler.scale_model_input(latent_model_input, t) # predict the noise residual noise_pred = self.unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample # perform guidance if do_classifier_free_guidance: noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) # compute the previous noisy sample x_t -> x_t-1 latents = scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample # masking init_latents_proper = scheduler.add_noise(init_latents_orig, noise, t) latents = (init_latents_proper * mask) + (latents * (1 - mask)) # call the callback, if provided if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % scheduler.order == 0): progress_bar.update() if callback is not None and i % callback_steps == 0: callback(progress_bar.n, progress_bar.total, progress_bar) # 10. Post-processing image = self.decode_latents(latents) # 11. Run safety checker image, has_nsfw_concept = self.run_safety_checker(image, text_embeddings.dtype) # 12. Convert to PIL if output_type == "pil": image = self.numpy_to_pil(image, argument=argument) if not return_dict: return (image, has_nsfw_concept) return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept) @staticmethod def numpy_to_pil(images, **kwargs): """ Convert a numpy image or a batch of images to a PIL image. """ if images.ndim == 3: images = images[None, ...] images = (images * 255).round().astype("uint8") pil_images = [] argument = kwargs.pop("argument", None) for image in images: image = PIL.Image.fromarray(image) if argument is not None: image.argument = argument pil_images.append(image) return pil_images pipeline = StableDiffusionPipelineAllinOne.from_pretrained(BASE_MODEL_NAME, safety_checker=None) if LORA_WEIGHTS_PATH is not None: pipeline.unet.load_attn_procs(LORA_WEIGHTS_PATH, from_hf_hub=True) support_scheduler = [ "DPMSolver", "EulerDiscrete", "EulerAncestralDiscrete", "PNDM", "DDIM", "LMSDiscrete", "HeunDiscrete", "KDPM2AncestralDiscrete", "KDPM2Discrete" ] # generate images def infer(prompt, negative, scale, height, width, num_inference_steps, scheduler_name): scheduler = pipeline.create_scheduler(scheduler_name) images = pipeline( prompt=prompt, negative_prompt=negative, guidance_scale=scale, height=height, width=width, num_inference_steps=num_inference_steps, scheduler=scheduler, ).images return images css = """ .gradio-container { font-family: 'IBM Plex Sans', sans-serif; } .gr-button { color: white; border-color: black; background: black; } input[type='range'] { accent-color: black; } .dark input[type='range'] { accent-color: #dfdfdf; } .container { max-width: 730px; margin: auto; padding-top: 1.5rem; } #gallery { min-height: 22rem; margin-bottom: 15px; margin-left: auto; margin-right: auto; border-bottom-right-radius: .5rem !important; border-bottom-left-radius: .5rem !important; } #gallery>div>.h-full { min-height: 20rem; } .details:hover { text-decoration: underline; } .gr-button { white-space: nowrap; } .gr-button:focus { border-color: rgb(147 197 253 / var(--tw-border-opacity)); outline: none; box-shadow: var(--tw-ring-offset-shadow), var(--tw-ring-shadow), var(--tw-shadow, 0 0 #0000); --tw-border-opacity: 1; --tw-ring-offset-shadow: var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color); --tw-ring-shadow: var(--tw-ring-inset) 0 0 0 calc(3px var(--tw-ring-offset-width)) var(--tw-ring-color); --tw-ring-color: rgb(191 219 254 / var(--tw-ring-opacity)); --tw-ring-opacity: .5; } #advanced-btn { font-size: .7rem !important; line-height: 19px; margin-top: 12px; margin-bottom: 12px; padding: 2px 8px; border-radius: 14px !important; } #advanced-options { display: none; margin-bottom: 20px; } .footer { margin-bottom: 45px; margin-top: 35px; text-align: center; border-bottom: 1px solid #e5e5e5; } .footer>p { font-size: .8rem; display: inline-block; padding: 0 10px; transform: translateY(10px); background: white; } .dark .footer { border-color: #303030; } .dark .footer>p { background: #0b0f19; } .acknowledgments h4{ margin: 1.25em 0 .25em 0; font-weight: bold; font-size: 115%; } .animate-spin { animation: spin 1s linear infinite; } @keyframes spin { from { transform: rotate(0deg); } to { transform: rotate(360deg); } } #share-btn-container { display: flex; padding-left: 0.5rem !important; padding-right: 0.5rem !important; background-color: #000000; justify-content: center; align-items: center; border-radius: 9999px !important; width: 13rem; margin-top: 10px; margin-left: auto; } #share-btn { all: initial; color: #ffffff;font-weight: 600; cursor:pointer; font-family: 'IBM Plex Sans', sans-serif; margin-left: 0.5rem !important; padding-top: 0.25rem !important; padding-bottom: 0.25rem !important;right:0; } #share-btn * { all: unset; } #share-btn-container div:nth-child(-n+2){ width: auto !important; min-height: 0px !important; } #share-btn-container .wrap { display: none !important; } .gr-form{ flex: 1 1 50%; border-top-right-radius: 0; border-bottom-right-radius: 0; } #prompt-container{ gap: 0; } #prompt-text-input, #negative-prompt-text-input{padding: .45rem 0.625rem} #component-16{border-top-width: 1px!important;margin-top: 1em} .image_duplication{position: absolute; width: 100px; left: 50px} """ block = gr.Blocks(css=css) with block: gr.HTML( """