# Copyright (c) 2024 Jaerin Lee # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import argparse import random import time import json import os import glob import pathlib import requests from functools import partial from pprint import pprint from multiprocessing.connection import Client, Listener import numpy as np from PIL import Image import torch import spaces import gradio as gr from huggingface_hub import snapshot_download # from model import StreamMultiDiffusionSDXL from model import StreamMultiDiffusion from util import seed_everything from prompt_util import preprocess_prompts, _quality_dict, _style_dict ### Utils def log_state(state): pprint(vars(opt)) if isinstance(state, gr.State): state = state.value pprint(vars(state)) def is_empty_image(im: Image.Image) -> bool: if im is None: return True im = np.array(im) has_alpha = (im.shape[2] == 4) if not has_alpha: return False elif im.sum() == 0: return True else: return False ### Argument passing # parser = argparse.ArgumentParser(description='Semantic Palette demo powered by StreamMultiDiffusion with SDXL support.') # parser.add_argument('-H', '--height', type=int, default=1024) # parser.add_argument('-W', '--width', type=int, default=1024) parser = argparse.ArgumentParser(description='Semantic Palette demo powered by StreamMultiDiffusion.') parser.add_argument('-H', '--height', type=int, default=768) parser.add_argument('-W', '--width', type=int, default=768) parser.add_argument('--model', type=str, default=None, help='Hugging face model repository or local path for a SD1.5 model checkpoint to run.') parser.add_argument('--bootstrap_steps', type=int, default=1) parser.add_argument('--bootstrap_mix_steps', type=int, default=1) parser.add_argument('--guidance_scale', type=float, default=1.0) # 1.2 parser.add_argument('--run_time', type=float, default=60) parser.add_argument('--seed', type=int, default=-1) parser.add_argument('--device', type=int, default=0) parser.add_argument('--port', type=int, default=8000) opt = parser.parse_args() ### Global variables and data structures device = f'cuda:{opt.device}' if opt.device >= 0 else 'cpu' def download_file(url, root): local_filename = url.split('/')[-1].split('?')[0] # NOTE the stream=True parameter below with requests.get(url, stream=True) as r: r.raise_for_status() with open(os.path.join(root, local_filename), 'wb') as f: for chunk in r.iter_content(chunk_size=8192): # If you have chunk encoded response uncomment if # and set chunk_size parameter to None. #if chunk: f.write(chunk) return os.path.join(root, local_filename) if opt.model is None: # opt.model = 'cagliostrolab/animagine-xl-3.1' # opt.model = 'ironjr/BlazingDriveV11m' # opt.model = 'KBlueLeaf/kohaku-v2.1' opt.model = 'https://huggingface.co/ironjr/BlazingDriveV13md/resolve/main/blazingDrive_V13md.safetensors?download=true' root = pathlib.Path(__file__).parent checkpoint_root = os.path.join(root, 'checkpoints') opt.model = download_file(opt.model, checkpoint_root) print(opt.model, os.path.getsize(opt.model)) else: if opt.model.endswith('.safetensors'): opt.model = os.path.abspath(os.path.join('checkpoints', opt.model)) # model = StreamMultiDiffusionSDXL( model = StreamMultiDiffusion( device, hf_key=opt.model, height=opt.height, width=opt.width, cfg_type="full", autoflush=False, use_tiny_vae=True, mask_type='continuous', bootstrap_steps=opt.bootstrap_steps, bootstrap_mix_steps=opt.bootstrap_mix_steps, guidance_scale=opt.guidance_scale, seed=opt.seed, ).cuda() print(f'[INFO] Parameters prepared!') prompt_suggestions = [ '1girl, souryuu asuka langley, neon genesis evangelion, solo, upper body, v, smile, looking at viewer', '1boy, solo, portrait, looking at viewer, white t-shirt, brown hair', '1girl, arima kana, oshi no ko, solo, upper body, from behind', ] opt.max_palettes = 3 opt.default_prompt_strength = 1.0 opt.default_mask_strength = 1.0 opt.default_mask_std = 8.0 opt.default_negative_prompt = ( 'nsfw, worst quality, bad quality, normal quality, cropped, framed' ) opt.verbose = True opt.colors = [ '#000000', '#2692F3', '#F89E12', '#16C232', # '#F92F6C', # '#AC6AEB', # '#92C62C', # '#92C6EC', # '#FECAC0', ] opt.excluded_keys = ['inpainting_mode', 'is_running', 'active_palettes', 'current_palette', 'model'] opt.prep_time = -3 ### Shared memory hack for ZeroGPU # opt.address = ('localhost', 6000) # opt.authkey = b'secret password' ### Load examples example_root = os.path.join(root, 'examples') example_images = glob.glob(os.path.join(example_root, '*.png')) example_images = [Image.open(i) for i in example_images] with open(os.path.join(example_root, 'prompt_background.txt')) as f: prompts_background = [l.strip() for l in f.readlines() if l.strip() != ''] with open(os.path.join(example_root, 'prompt_girl.txt')) as f: prompts_girl = [l.strip() for l in f.readlines() if l.strip() != ''] with open(os.path.join(example_root, 'prompt_boy.txt')) as f: prompts_boy = [l.strip() for l in f.readlines() if l.strip() != ''] with open(os.path.join(example_root, 'prompt_props.txt')) as f: prompts_props = [l.strip() for l in f.readlines() if l.strip() != ''] prompts_props = {l.split(',')[0].strip(): ','.join(l.split(',')[1:]).strip() for l in prompts_props} prompt_background = lambda: random.choice(prompts_background) prompt_girl = lambda: random.choice(prompts_girl) prompt_boy = lambda: random.choice(prompts_boy) prompt_props = lambda: np.random.choice(list(prompts_props.keys()), size=(opt.max_palettes - 1), replace=False).tolist() ### Event handlers def add_palette(state): old_actives = state.active_palettes state.active_palettes = min(state.active_palettes + 1, opt.max_palettes) if opt.verbose: log_state(state) if state.active_palettes != old_actives: return [state] + [ gr.update() if state.active_palettes != opt.max_palettes else gr.update(visible=False) ] + [ gr.update() if i != state.active_palettes - 1 else gr.update(value=state.prompt_names[i + 1], visible=True) for i in range(opt.max_palettes) ] else: return [state] + [gr.update() for i in range(opt.max_palettes + 1)] def select_palette(state, button, idx): if idx < 0 or idx > opt.max_palettes: idx = 0 old_idx = state.current_palette if old_idx == idx: return [state] + [gr.update() for _ in range(opt.max_palettes + 7)] state.current_palette = idx if opt.verbose: log_state(state) updates = [state] + [ gr.update() if i not in (idx, old_idx) else gr.update(variant='secondary') if i == old_idx else gr.update(variant='primary') for i in range(opt.max_palettes + 1) ] label = 'Background' if idx == 0 else f'Palette {idx}' updates.extend([ gr.update(value=button, interactive=(idx > 0)), gr.update(value=state.prompts[idx], label=f'Edit Prompt for {label}'), gr.update(value=state.neg_prompts[idx], label=f'Edit Negative Prompt for {label}'), ( gr.update(value=state.mask_strengths[idx - 1], interactive=True) if idx > 0 else gr.update(value=opt.default_mask_strength, interactive=False) ), ( gr.update(value=state.prompt_strengths[idx - 1], interactive=True) if idx > 0 else gr.update(value=opt.default_prompt_strength, interactive=False) ), ( gr.update(value=state.mask_stds[idx - 1], interactive=True) if idx > 0 else gr.update(value=opt.default_mask_std, interactive=False) ), ]) return updates def random_palette(state): # girlboy = random.choice(['A girl smiling at viewer', 'A boy smiling at viewer']) girlboy = random.choice(['girl', 'boy']) girlboy_name = '👧 Girl' if girlboy == 'girl' else '👦 Boy' girlboy_prompt = prompt_girl() if girlboy == 'girl' else prompt_boy() prompt_props_ = prompt_props() state.prompt_names = ['🌄 Background'] + [girlboy_name] + prompt_props_ state.prompts = [''] + [girlboy_prompt] + [prompts_props[k] for k in prompt_props_] state.neg_prompts = [ opt.default_negative_prompt + (', humans, humans, humans' if i == 0 else '') for i in range(opt.max_palettes + 1) ] state.prompt_strengths = [opt.default_prompt_strength for _ in range(opt.max_palettes)] state.mask_strengths = [opt.default_mask_strength for _ in range(opt.max_palettes)] state.mask_stds = [opt.default_mask_std for _ in range(opt.max_palettes)] updates = [state] + [ gr.update(value=state.prompt_names[state.current_palette]), gr.update(value=state.prompts[state.current_palette]), gr.update(value=state.neg_prompts[state.current_palette]), ( gr.update(value=state.mask_strengths[state.current_palette - 1], interactive=True) if state.current_palette > 0 else gr.update(value=opt.default_mask_strength, interactive=False) ), ( gr.update(value=state.prompt_strengths[state.current_palette - 1], interactive=True) if state.current_palette > 0 else gr.update(value=opt.default_prompt_strength, interactive=False) ), ( gr.update(value=state.mask_stds[state.current_palette - 1], interactive=True) if state.current_palette > 0 else gr.update(value=opt.default_mask_std, interactive=False) ), ] + [ gr.update(value=v) for v in state.prompt_names[1:] ] return updates def change_prompt_strength(state, strength): if state.current_palette == 0: return state state.prompt_strengths[state.current_palette - 1] = strength if opt.verbose: log_state(state) return state def change_std(state, std): if state.current_palette == 0: return state state.mask_stds[state.current_palette - 1] = std if opt.verbose: log_state(state) return state def change_mask_strength(state, strength): if state.current_palette == 0: return state state.mask_strengths[state.current_palette - 1] = strength if opt.verbose: log_state(state) return state def reset_seed(state, seed): state.seed = seed if opt.verbose: log_state(state) return state def rename_prompt(state, name): state.prompt_names[state.current_palette] = name if opt.verbose: log_state(state) return [state] + [ gr.update() if i != state.current_palette else gr.update(value=name) for i in range(opt.max_palettes + 1) ] def change_prompt(state, prompt): state.prompts[state.current_palette] = prompt if opt.verbose: log_state(state) if state.is_running: model.update_single_layer( idx=state.current_palette, prompt=prompt, ) return state def change_neg_prompt(state, neg_prompt): state.neg_prompts[state.current_palette] = neg_prompt if opt.verbose: log_state(state) if state.is_running: model.update_single_layer( idx=state.current_palette, neg_prompt=neg_prompt, ) return state # def select_style(state, style_name): # state.style_name = style_name # if opt.verbose: # log_state(state) # return state # def select_quality(state, quality_name): # state.quality_name = quality_name # if opt.verbose: # log_state(state) # return state def import_state(state, json_text): prev_state_dict = {k: v for k, v in vars(state).items() if k in opt.excluded_keys} state_dict = json.loads(json_text) for k in opt.excluded_keys: if k in state_dict: del state_dict[k] state_dict.update(prev_state_dict) state = argparse.Namespace(**state_dict) current_palette = state.current_palette state.active_palettes = opt.max_palettes return [state] + [ gr.update(value=v, visible=True) for v in state.prompt_names ] + [ # state.style_name, # state.quality_name, state.prompts[current_palette], state.prompt_names[current_palette], state.neg_prompts[current_palette], state.prompt_strengths[current_palette - 1], state.mask_strengths[current_palette - 1], state.mask_stds[current_palette - 1], state.seed, ] ### Main worker def register(state, drawpad, model): seed_everything(state.seed if state.seed >=0 else np.random.randint(2147483647)) print('Generate!') background = drawpad['background'].convert('RGBA') inpainting_mode = np.asarray(background).sum() != 0 if not inpainting_mode: background = Image.new(size=(opt.width, opt.height), mode='RGB', color=(255, 255, 255)) background_prompt = "Simple white background" else: background_prompt = None print('Inpainting mode: ', inpainting_mode) user_input = np.asarray(drawpad['layers'][0]) # (H, W, 4) foreground_mask = torch.tensor(user_input[..., -1], device=model.device)[None, None] # (1, 1, H, W) user_input = torch.tensor(user_input[..., :-1], device=model.device) # (H, W, 3) palette = torch.tensor([ tuple(int(s[i+1:i+3], 16) for i in (0, 2, 4)) for s in opt.colors[1:] ], device=model.device) # (N, 3) masks = (palette[:, None, None, :] == user_input[None]).all(dim=-1)[:, None, ...] # (N, 1, H, W) # has_masks = [i for i, m in enumerate(masks.sum(dim=(1, 2, 3)) == 0) if not m] has_masks = list(range(opt.max_palettes)) print('Has mask: ', has_masks) masks = masks * foreground_mask masks = masks[has_masks] # if inpainting_mode: prompts = [state.prompts[v + 1] for v in has_masks] negative_prompts = [state.neg_prompts[v + 1] for v in has_masks] mask_strengths = [state.mask_strengths[v] for v in has_masks] mask_stds = [state.mask_stds[v] for v in has_masks] prompt_strengths = [state.prompt_strengths[v] for v in has_masks] # else: # masks = torch.cat([torch.ones_like(foreground_mask), masks], dim=0) # prompts = [state.prompts[0]] + [state.prompts[v + 1] for v in has_masks] # negative_prompts = [state.neg_prompts[0]] + [state.neg_prompts[v + 1] for v in has_masks] # mask_strengths = [1] + [state.mask_strengths[v] for v in has_masks] # mask_stds = [0] + [state.mask_stds[v] for v in has_masks] # prompt_strengths = [1] + [state.prompt_strengths[v] for v in has_masks] # prompts, negative_prompts = preprocess_prompts( # prompts, negative_prompts, style_name=state.style_name, quality_name=state.quality_name) model.update_background( background.convert('RGB'), prompt=background_prompt, negative_prompt=None, ) state.prompts[0] = model.background.prompt state.neg_prompts[0] = model.background.negative_prompt model.update_layers( prompts=prompts, negative_prompts=negative_prompts, masks=masks.to(device), mask_strengths=mask_strengths, mask_stds=mask_stds, prompt_strengths=prompt_strengths, ) state.inpainting_mode = inpainting_mode return state # @spaces.GPU(duration=(opt.prep_time + opt.run_time + 5)) def run(state, drawpad): # ZeroGPU hack. # listener = Listener(opt.address, authkey=opt.authkey) # conn = listener.accept() # Reset model. model.device = torch.device('cuda') model.reset_seed(model.generator, opt.seed) model.reset_latent() model.prepare() state = register(state, drawpad, model) state.is_running = True tic = time.time() while True: # Receive real-time mask inputs from the main process. # data = conn.recv() # if data is not None: # print('Received data!!!') # for i in range(opt.max_palettes): # model.update_single_layer( # idx=i, # mask=data['masks'][i], # mask_strength=data['mask_strengths'][i], # mask_std=data['mask_stds'][i], # ) yield [state, model()] toc = time.time() tdelta = toc - tic if tdelta > opt.run_time: state.is_running = False state.model = None return [state, model()] def hide_element(): return gr.update(visible=False) def show_element(): return gr.update(visible=True) def draw(state, drawpad): if not state.is_running: print('[WARNING] Streaming is currently off, update ignored.') return # ZeroGPU hack. # conn = Client(opt.address, authkey=opt.authkey) user_input = np.asarray(drawpad['layers'][0]) # (H, W, 4) foreground_mask = torch.tensor(user_input[..., -1], device=model.device)[None, None] # (1, 1, H, W) user_input = torch.tensor(user_input[..., :-1], device=model.device) # (H, W, 3) palette = torch.tensor([ tuple(int(s[i+1:i+3], 16) for i in (0, 2, 4)) for s in opt.colors[1:] ], device=model.device) # (N, 3) masks = (palette[:, None, None, :] == user_input[None]).all(dim=-1)[:, None, ...] # (N, 1, H, W) # has_masks = [i for i, m in enumerate(masks.sum(dim=(1, 2, 3)) == 0) if not m] has_masks = list(range(opt.max_palettes)) print('Has mask: ', has_masks) masks = masks * foreground_mask masks = masks[has_masks] # if state.inpainting_mode: mask_strengths = [state.mask_strengths[v] for v in has_masks] mask_stds = [state.mask_stds[v] for v in has_masks] # else: # masks = torch.cat([torch.ones_like(foreground_mask), masks], dim=0) # mask_strengths = [1] + [state.mask_strengths[v] for v in has_masks] # mask_stds = [0] + [state.mask_stds[v] for v in has_masks] model.update_masks( masks=masks, mask_strengths=mask_strengths, mask_stds=mask_stds, ) # for i in range(len(has_masks)): # model.update( # idx=i, # mask=masks[i:i + 1], # mask_strength=mask_strengths[i], # mask_std=mask_stds[i], # ) # data = dict( # masks=masks, # mask_strengths=mask_strengths, # mask_stds=mask_stds, # ) # conn.send(data) # conn.close() ### Main application css = f""" #run-button {{ font-size: 18pt; background-image: linear-gradient(to right, #4338ca 0%, #26a0da 51%, #4338ca 100%); margin: 0; padding: 15px 45px; text-align: center; // text-transform: uppercase; transition: 0.5s; background-size: 200% auto; color: white; box-shadow: 0 0 20px #eee; border-radius: 10px; // display: block; background-position: right center; }} #run-button:hover {{ background-position: left center; color: #fff; text-decoration: none; }} #run-anim {{ padding: 40px 45px; }} #semantic-palette {{ border-style: solid; border-width: 0.2em; border-color: #eee; }} #semantic-palette:hover {{ box-shadow: 0 0 20px #eee; }} #output-screen {{ width: 100%; aspect-ratio: {opt.width} / {opt.height}; }} #share-btn {{ color: #ffffff;font-weight: 600; background-color: #000000; font-family: 'IBM Plex Sans', sans-serif; border-radius: 9999px !important; }} #share-btn:hover {{ color: #ffffff;font-weight: 600; background-color: #000000; font-family: 'IBM Plex Sans', sans-serif; border-radius: 9999px !important; box-shadow: 0 0 20px #eee; }} """ for i in range(opt.max_palettes + 1): css = css + f""" .secondary#semantic-palette-{i} {{ background-image: linear-gradient(to right, #374151 0%, #374151 71%, {opt.colors[i]} 100%); color: white; }} .primary#semantic-palette-{i} {{ background-image: linear-gradient(to right, #4338ca 0%, #4338ca 71%, {opt.colors[i]} 100%); color: white; }} """ css = css + f""" .mask-red {{ left: 0; width: 0; color: #BE002A; // -webkit-animation: text-red {opt.run_time + opt.prep_time:.1f}s ease infinite; // animation: text-red {opt.run_time + opt.prep_time:.1f}s ease infinite; z-index: 2; background: transparent; }} .mask-white {{ right: 0; }} /* Flames */ #red-flame {{ opacity: 0; -webkit-animation: idle-flames {opt.run_time + opt.prep_time:.1f}s ease infinite, red-flame 120ms ease infinite; animation: idle-flames {opt.run_time + opt.prep_time:.1f}s ease infinite, red-flame 120ms ease infinite; transform-origin: center bottom; }} #yellow-flame {{ opacity: 0; -webkit-animation: idle-flames {opt.run_time + opt.prep_time:.1f}s ease infinite, yellow-flame 120ms ease infinite; animation: idle-flames {opt.run_time + opt.prep_time:.1f}s ease infinite, yellow-flame 120ms ease infinite; transform-origin: center bottom; }} #white-flame {{ opacity: 0; -webkit-animation: idle-flames {opt.run_time + opt.prep_time:.1f}s ease infinite, red-flame 100ms ease infinite; animation: idle-flames {opt.run_time + opt.prep_time:.1f}s ease infinite, red-flame 100ms ease infinite; transform-origin: center bottom; }} """ with open(os.path.join(root, 'timer', 'style.css')) as f: added_css = ''.join(f.readlines()) css = css + added_css # js = '' # with open(os.path.join(root, 'timer', 'script.js')) as f: # added_js = ''.join(f.readlines()) # js = js + added_js head = f""" """ with gr.Blocks(theme=gr.themes.Soft(), css=css, head=head) as demo: iface = argparse.Namespace() def _define_state(): state = argparse.Namespace() # Cursor. state.is_running = False state.inpainting_mode = False state.current_palette = 0 # 0: Background; 1,2,3,...: Layers state.model_id = opt.model state.style_name = '(None)' state.quality_name = 'Standard v3.1' state.model = None # State variables (one-hot). state.active_palettes = 5 # Front-end initialized to the default values. state.prompt_names = [ '🌄 Background', '👧 Girl', '🐶 Dog', '💐 Garden', ] + ['🎨 New Palette' for _ in range(opt.max_palettes - 3)] state.prompts = [ '', 'A girl smiling at viewer', 'Doggy body part', 'Flower garden', ] + ['' for _ in range(opt.max_palettes - 3)] state.neg_prompts = [ opt.default_negative_prompt + (', humans, humans, humans' if i == 0 else '') for i in range(opt.max_palettes + 1) ] state.prompt_strengths = [opt.default_prompt_strength for _ in range(opt.max_palettes)] state.mask_strengths = [opt.default_mask_strength for _ in range(opt.max_palettes)] state.mask_stds = [opt.default_mask_std for _ in range(opt.max_palettes)] state.seed = opt.seed return state state = gr.State(value=_define_state) ### Demo user interface gr.HTML( """
1. (Optional) Uploading a background image. No background image means white background. It is not mandatory!
2. Modify semantic palette (prompt & settings) as you want by clicking Semantic Palette. Export, import, and share semantic palette for fast configuration.
3. Start drawing in the Semantic Drawpad tab. The brush color, not layer, is directly linked to the semantic brushes.
4. Click [Lemmy try!] button to grant 1 minute of streaming demo.
5. Continue drawing until your quota is over!