Spaces:
Runtime error
Runtime error
import argparse | |
import gc | |
import os | |
import re | |
import time | |
from datetime import datetime | |
from pathlib import Path | |
from typing import Tuple | |
from diffusers import OnnxStableDiffusionPipeline, OnnxStableDiffusionImg2ImgPipeline | |
from diffusers import OnnxStableDiffusionInpaintPipeline, OnnxStableDiffusionInpaintPipelineLegacy | |
from diffusers import DDIMScheduler, LMSDiscreteScheduler, PNDMScheduler, EulerDiscreteScheduler | |
from diffusers import __version__ as _df_version | |
import gradio as gr | |
import numpy as np | |
from packaging import version | |
import PIL | |
def get_latents_from_seed(seed: int, batch_size: int, height: int, width: int) -> np.ndarray: | |
latents_shape = (batch_size, 4, height // 8, width // 8) | |
rng = np.random.default_rng(seed) | |
image_latents = rng.standard_normal(latents_shape).astype(np.float32) | |
return image_latents | |
def run_diffusers( | |
prompt: str, | |
neg_prompt: str, | |
init_image: PIL.Image.Image, | |
iteration_count: int, | |
batch_size: int, | |
steps: int, | |
guidance_scale: float, | |
height: int, | |
width: int, | |
eta: float, | |
denoise_strength: float, | |
seed: str, | |
mask_image: PIL.Image.Image | |
) -> Tuple[list, str]: | |
global model_name | |
global current_pipe | |
global pipe | |
prompt.strip("\n") | |
neg_prompt.strip("\n") | |
if seed == "": | |
rng = np.random.default_rng() | |
seed = rng.integers(np.iinfo(np.uint32).max) | |
else: | |
try: | |
seed = int(seed) & np.iinfo(np.uint32).max | |
except ValueError: | |
seed = hash(seed) & np.iinfo(np.uint32).max | |
seeds = np.array([seed], dtype=np.uint32) | |
if iteration_count > 1: | |
seed_seq = np.random.SeedSequence(seed) | |
seeds = np.concatenate((seeds, seed_seq.generate_state(iteration_count - 1))) | |
output_path = "output" | |
os.makedirs(output_path, exist_ok=True) | |
sched_name = str(pipe.scheduler._class_name) | |
prompts = [prompt]*batch_size | |
neg_prompts = [neg_prompt]*batch_size if neg_prompt != "" else None | |
images = [] | |
time_taken = 0 | |
output_base_path = Path("./output") | |
for i in range(iteration_count): | |
dt_obj = datetime.now() | |
dt_cust = dt_obj.strftime("%Y-%m-%d_%H-%M-%S") | |
image_name = dt_cust + "_" + str(seed) + ".png" | |
text_name = dt_cust + "_" + str(seed) + "_info.txt" | |
image_path = output_base_path / image_name | |
text_path = output_base_path / text_name | |
info = f"Prompt: {prompt}\nNegative prompt: {neg_prompt}\nSeed: {seeds[i]}\n" + \ | |
f"Scheduler: {sched_name}\nScale: {guidance_scale}\nHeight: {height}\nWidth: {width}\nETA: {eta}\n" + \ | |
f"Model: {model_name}\nIteration size: {iteration_count}\nBatch size: {batch_size}\nSteps: {steps}" | |
if (current_pipe == "img2img" or current_pipe == "inpaint" ): | |
info = info + f" denoise: {denoise_strength}" | |
with open(text_path, 'w', encoding='utf-8') as f: | |
f.write(info) | |
if current_pipe == "txt2img": | |
# Generate our own latents so that we can provide a seed. | |
latents = get_latents_from_seed(seeds[i], batch_size, height, width) | |
start = time.time() | |
batch_images = pipe( | |
prompts, negative_prompt=neg_prompts, height=height, width=width, num_inference_steps=steps, | |
guidance_scale=guidance_scale, eta=eta, latents=latents).images | |
finish = time.time() | |
elif current_pipe == "img2img": | |
# NOTE: at this time there's no good way of setting the seed for the random noise added by the scheduler | |
# np.random.seed(seeds[i]) | |
start = time.time() | |
batch_images = pipe( | |
prompts, negative_prompt=neg_prompts, init_image=init_image, height=height, width=width, | |
num_inference_steps=steps, guidance_scale=guidance_scale, eta=eta, strength=denoise_strength, | |
num_images_per_prompt=batch_size).images | |
finish = time.time() | |
elif current_pipe == "inpaint": | |
start = time.time() | |
# NOTE: legacy require init_image but inpainting use image | |
batch_images = pipe( | |
prompts, negative_prompt=neg_prompts, image=init_image, mask_image=mask_image, height=height, width=width, | |
num_inference_steps=steps, guidance_scale=guidance_scale, eta=eta, strength=denoise_strength, | |
num_images_per_prompt=batch_size, init_image=init_image).images | |
finish = time.time() | |
short_prompt = prompt.strip("<>:\"/\\|?*\n\t") | |
short_prompt = short_prompt[:99] if len(short_prompt) > 100 else short_prompt | |
for j in range(batch_size): | |
batch_images[j].save(image_path) | |
images.extend(batch_images) | |
time_taken = time_taken + (finish - start) | |
time_taken = time_taken / 60.0 | |
if iteration_count > 1: | |
status = f"Run took {time_taken:.1f} minutes " + \ | |
f"to generate {iteration_count} iterations with batch size of {batch_size}. seeds: " + \ | |
np.array2string(seeds, separator=",") | |
else: | |
status = f"Run took {time_taken:.1f} minutes to generate a batch size of " + \ | |
f"{batch_size}. seed: {seeds[0]}" | |
return images, status | |
def clear_click(): | |
return { | |
prompt: "", neg_prompt: "", image: None, mask: None, mask_mode: "Draw mask", sch: "Euler", iter: 1, batch: 1, | |
drawn_mask: None, steps: 25, guid: 11, height: 512, width: 512, eta: 0.0, denoise: 0.8, seed: ""} | |
def generate_click( | |
model_drop, prompt, neg_prompt, sch, iter, batch, steps, guid, height, width, eta, | |
seed, image, denoise, mask, pipeline, mask_mode, drawn_mask | |
): | |
global model_name | |
global provider | |
global current_pipe | |
global scheduler | |
global pipe | |
# reset scheduler and pipeline if model is different | |
if model_name != model_drop: | |
model_name = model_drop | |
scheduler = None | |
pipe = None | |
model_path = os.path.join("model", model_name) | |
if sch == "PNDM" and type(scheduler) is not PNDMScheduler: | |
scheduler = PNDMScheduler.from_pretrained(model_path, subfolder="scheduler") | |
elif sch == "LMS" and type(scheduler) is not LMSDiscreteScheduler: | |
scheduler = LMSDiscreteScheduler.from_pretrained(model_path, subfolder="scheduler") | |
elif sch == "DDIM" and type(scheduler) is not DDIMScheduler: | |
scheduler = DDIMScheduler.from_pretrained(model_path, subfolder="scheduler") | |
elif sch == "Euler" and type(scheduler) is not EulerDiscreteScheduler: | |
scheduler = EulerDiscreteScheduler.from_pretrained(model_path, subfolder="scheduler") | |
# select which pipeline depending on current tab | |
if pipeline == "TEXT2IMG": | |
if current_pipe != "txt2img" or pipe is None: | |
pipe = OnnxStableDiffusionPipeline.from_pretrained( | |
model_path, provider=provider, scheduler=scheduler) | |
gc.collect() | |
current_pipe = "txt2img" | |
if type(pipe.scheduler) is not type(scheduler): | |
pipe.scheduler = scheduler | |
return run_diffusers( | |
prompt, neg_prompt, None, iter, batch, steps, guid, height, width, eta, 0, | |
seed, None) | |
elif pipeline == "IMG2IMG": | |
if current_pipe != "img2img" or pipe is None: | |
pipe = OnnxStableDiffusionImg2ImgPipeline.from_pretrained( | |
model_path, provider=provider, scheduler=scheduler) | |
gc.collect() | |
current_pipe = "img2img" | |
if type(pipe.scheduler) is not type(scheduler): | |
pipe.scheduler = scheduler | |
# input image resizing | |
input_image = image.convert("RGB") | |
input_width, input_height = input_image.size | |
if height / width > input_height / input_width: | |
adjust_width = int(input_width * height / input_height) | |
input_image = input_image.resize((adjust_width, height)) | |
left = (adjust_width - width) // 2 | |
right = left + width | |
input_image = input_image.crop((left, 0, right, height)) | |
else: | |
adjust_height = int(input_height * width / input_width) | |
input_image = input_image.resize((width, adjust_height)) | |
top = (adjust_height - height) // 2 | |
bottom = top + height | |
input_image = input_image.crop((0, top, width, bottom)) | |
return run_diffusers( | |
prompt, neg_prompt, input_image, iter, batch, steps, guid, height, width, eta, | |
denoise, seed, None) | |
elif pipeline == "Inpainting": | |
if current_pipe != "inpaint" or pipe is None: | |
# >=0.8.0: Model name must ends with "inpainting" to use the proper pipeline | |
# This allows usage of Legacy pipeline for models not finetuned for inpainting | |
# see huggingface/diffusers!51 | |
if not model_name.endswith("inpainting"): | |
pipe = OnnxStableDiffusionInpaintPipelineLegacy.from_pretrained( | |
model_path, provider=provider, scheduler=scheduler) | |
else: | |
# on >=0.7.0 & <0.8.0 or model finetuned for inpainting | |
pipe = OnnxStableDiffusionInpaintPipeline.from_pretrained( | |
model_path, provider=provider, scheduler=scheduler) | |
gc.collect() | |
current_pipe = "inpaint" | |
if type(pipe.scheduler) is not type(scheduler): | |
pipe.scheduler = scheduler | |
if mask_mode == "Upload mask": | |
input_image = image.convert("RGB") | |
# input mask resizing | |
input_mask = mask.convert("RGB") | |
input_mask_width, input_mask_height = input_mask.size | |
if height / width > input_mask_height / input_mask_width: | |
adjust_mask_width = int(input_mask_width * height / input_mask_height) | |
input_mask = input_mask.resize((adjust_mask_width, height)) | |
mask_left = (adjust_mask_width - width) // 2 | |
mask_right = mask_left + width | |
input_mask = input_mask.crop((mask_left, 0, mask_right, height)) | |
else: | |
adjust_height = int(input_mask_height * width / input_mask_width) | |
input_mask = input_mask.resize((width, adjust_height)) | |
top = (adjust_height - height) // 2 | |
bottom = top + height | |
input_mask = input_mask.crop((0, top, width, bottom)) | |
else: | |
input_image = drawn_mask['image'].convert('RGB') | |
input_mask = drawn_mask['mask'] | |
# input image resizing | |
input_width, input_height = input_image.size | |
if height / width > input_height / input_width: | |
adjust_width = int(input_width * height / input_height) | |
input_image = input_image.resize((adjust_width, height)) | |
left = (adjust_width - width) // 2 | |
right = left + width | |
input_image = input_image.crop((left, 0, right, height)) | |
else: | |
adjust_height = int(input_height * width / input_width) | |
input_image = input_image.resize((width, adjust_height)) | |
top = (adjust_height - height) // 2 | |
bottom = top + height | |
input_image = input_image.crop((0, top, width, bottom)) | |
return run_diffusers( | |
prompt, neg_prompt, input_image, iter, batch, steps, guid, height, width, eta, | |
denoise, seed, input_mask) | |
def choose_sch(sched_name: str): | |
if sched_name == "DDIM": | |
return gr.update(visible=True) | |
else: | |
return gr.update(visible=False) | |
def choose_pipeline(pipeline: str, mask_mode: str): | |
if(pipeline == "TEXT2IMG"): | |
return (gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)) | |
if(pipeline == "IMG2IMG"): | |
return (gr.update(visible=True), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)) | |
if(pipeline == "Inpainting"): | |
if mask_mode == "Draw mask": | |
return (gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True)) | |
else: | |
return (gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=False)) | |
def choose_mask_mode(mask_mode): | |
if mask_mode == "Draw mask": | |
return [gr.update(visible=False), gr.update(visible=False), gr.update(visible=True)] | |
else: | |
return [gr.update(visible=True), gr.update(visible=True), gr.update(visible=False)] | |
def size_512_lock(size): | |
if size != 512: | |
return gr.update(interactive=False) | |
return gr.update(interactive=True) | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="Gradio interface for ONNX based Stable Diffusion") | |
parser.add_argument("--cpu-only", action="store_true", default=False, help="Run ONNX with CPU") | |
parser.add_argument("--local", action="store_true", default=False, help="Open to local network") | |
parser.add_argument("--public", action="store_true", default=False, help="Create a publicly shareable link for the interface") | |
args = parser.parse_args() | |
# variables for ONNX pipelines | |
model_name = None | |
provider = "CPUExecutionProvider" if args.cpu_only else "DmlExecutionProvider" | |
current_pipe = "txt2img" | |
# diffusers objects | |
scheduler = None | |
pipe = None | |
# search the model folder | |
model_dir = "model" | |
model_list = [] | |
with os.scandir(model_dir) as scan_it: | |
for entry in scan_it: | |
if entry.is_dir(): | |
model_list.append(entry.name) | |
default_model = model_list[0] if len(model_list) > 0 else None | |
# create gradio block | |
title = "Stable Diffusion " + str(version.parse(_df_version)) | |
possibilities = ['TEXT2IMG', 'IMG2IMG', 'Inpainting'] | |
with gr.Blocks(title=title) as app: | |
with gr.Row(): | |
with gr.Column(scale=1, min_width=600): | |
with gr.Column(variant='panel'): | |
with gr.Row(): | |
model_drop = gr.Dropdown(model_list, value=default_model, label="Model", interactive=True) | |
pipeline = gr.Radio(possibilities, value="TEXT2IMG", label="Pipeline") | |
sch = gr.Radio(["DDIM", "LMS", "PNDM", "Euler"], value="Euler", label="Scheduler") | |
eta = gr.Slider(0, 1, value=0.0, step=0.01, label="DDIM eta", visible=False) | |
seed = gr.Textbox(value="", max_lines=1, label="Seed") | |
with gr.Column(): | |
mask_mode = gr.Radio(label="Mask mode", show_label=False, choices=["Draw mask", "Upload mask"], value="Draw mask", visible=False) | |
image = gr.Image(label="Input image", type="pil", visible=False) | |
mask = gr.Image(label="Input mask", type="pil", visible=False) | |
drawn_mask = gr.Image(label="Input image and mask", source="upload", tool="sketch", type="pil", visible=False) | |
prompt = gr.Textbox(value="", lines=2, label="Prompt") | |
neg_prompt = gr.Textbox(value="", lines=2, label="Negative prompt") | |
steps = gr.Slider(1, 150, value=25, step=1, label="Steps") | |
guid = gr.Slider(0, 20, value=11, step=0.5, label="Guidance") | |
with gr.Column(): | |
height = gr.Slider(16, 1920, value=512, step=8, label="Height") | |
width = gr.Slider(16, 1920, value=512, step=8, label="Width") | |
denoise = gr.Slider(0, 1, value=0.8, step=0.01, label="Denoise strength", visible=False) | |
with gr.Column(): | |
iter = gr.Slider(1, 24, value=1, step=1, label="Iteration count") | |
batch = gr.Slider(1, 4, value=1, step=1, label="Batch size") | |
with gr.Column(scale=1, min_width=600): | |
with gr.Row(): | |
gen_btn = gr.Button("Generate", variant="primary") | |
clear_btn = gr.Button("Clear", variant="secondary") | |
with gr.Row(variant='panel'): | |
image_out = gr.Gallery(value=None, label="Images") | |
status_out = gr.Textbox(value="", label="Status") | |
# config components | |
all_inputs = [ | |
model_drop, prompt, neg_prompt, sch, iter, batch, steps, guid, height, width, | |
eta, seed, image, denoise, mask, pipeline, mask_mode, drawn_mask] | |
clear_btn.click(fn=clear_click, inputs=None, outputs=all_inputs, queue=False) | |
gen_btn.click(fn=generate_click, inputs=all_inputs, outputs=[image_out, status_out]) | |
height.change(fn=size_512_lock, inputs=height, outputs=width) | |
width.change(fn=size_512_lock, inputs=width, outputs=height) | |
mask_mode.change(fn=choose_mask_mode, inputs=mask_mode, outputs=[image, mask, drawn_mask]) | |
pipeline.change(fn=choose_pipeline, inputs=[pipeline, mask_mode], outputs=[image, mask, denoise, mask_mode, drawn_mask]) | |
sch.change(fn=choose_sch, inputs=sch, outputs=eta) | |
image_out.style(grid=2) | |
app.queue(concurrency_count=1, api_open=True) | |
app.launch(inbrowser=True, server_name="0.0.0.0" if args.local else "127.0.0.1", show_api=True, quiet=True, share=args.public) # open to local network: server_name="0.0.0.0" | |