# test-lumimaid-pixtral / joycaption.py
# Uploaded by: mrcuddle
# Commit message: Update joycaption.py
# Commit: 640ca87 (verified)
import os
if os.environ.get("SPACES_ZERO_GPU") is not None:
    # SPACES_ZERO_GPU is set: use the real `spaces` package.
    import spaces
else:
    import functools

    class spaces:
        """Local stand-in for the `spaces` package when SPACES_ZERO_GPU is unset.

        Only the ``GPU`` decorator is provided; it simply calls the wrapped
        function in-process.
        """

        @staticmethod
        def GPU(func=None, **_options):
            """Return a pass-through wrapper around *func*.

            Usable bare (``@spaces.GPU``) or with keyword options
            (``@spaces.GPU(duration=60)``). Options are accepted for
            call-compatibility and ignored.
            """
            def decorate(target):
                # functools.wraps keeps the original name, docstring and
                # signature visible to anything that introspects the wrapper.
                @functools.wraps(target)
                def wrapper(*args, **kwargs):
                    return target(*args, **kwargs)
                return wrapper

            if func is None:
                # Called with options only: hand back the actual decorator.
                return decorate
            return decorate(func)
import gradio as gr
from huggingface_hub import InferenceClient, HfApi
from torch import nn
from transformers import AutoModel, AutoProcessor, AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerFast, AutoModelForCausalLM, LlavaForConditionalGeneration
from pathlib import Path
import torch
import torch.amp.autocast_mode
from PIL import Image
import torchvision.transforms.functional as TVF
import gc
from peft import PeftModel
from typing import Union
# Whether the startup model load requests 4-bit NF4 quantization
# (passed as `is_nf4` to load_text_model at import time).
LOAD_IN_NF4 = True
if os.environ.get("SPACES_ZERO_GPU") is not None:
    import subprocess
    LOAD_IN_NF4 = False  # If true, Custom VLM LoRA doesn't work initially. The rest are fine.
    # `env=` REPLACES the child's environment, so extend os.environ instead of
    # passing a one-key dict; otherwise PATH, HOME, etc. are lost for pip.
    subprocess.run(
        'pip install flash-attn --no-build-isolation',
        env={**os.environ, 'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"},
        shell=True,
    )

BASE_DIR = Path(__file__).resolve().parent  # Define the base directory
device = "cuda" if torch.cuda.is_available() else "cpu"
HF_TOKEN = os.environ.get("HF_TOKEN", None)
use_inference_client = False

# Repos loaded through the Pixtral (LlavaForConditionalGeneration) path.
PIXTRAL_PATHS = ["mrcuddle/lumimaid-v0.2-8b-pixtral", "mrcuddle/lumimaid-v0.2-12b-pixtral"]

# Known text models: repo id -> GGUF file name (None when no GGUF file is used).
llm_models = {
    "mrcuddle/Lumimaid-v0.2-Llama-3.2-11B-Vision": None,
    "mrcuddle/Lumimaid-v0.2-8B": None,
    "mrcuddle/lumimaid-8b-StoryGen": None,
    "mrcuddle/Lumimaid-NSFW-Reddit_Merge": None,
}

CLIP_PATH = "google/siglip-so400m-patch14-384"
MODEL_PATH = list(llm_models.keys())[0]  # default model: first entry of llm_models
CHECKPOINT_PATH = BASE_DIR / Path("cgrkzexw-599808")
LORA_PATH = CHECKPOINT_PATH / "text_model"
TITLE = "<h1><center>JoyCaption Alpha Two (2024-09-26a)</center></h1>"
# Prompt templates, keyed by caption type. Each value holds three variants,
# selected by how the caption length was requested:
#   [0] no length given ("any")
#   [1] numeric length      -> uses the {word_count} placeholder
#   [2] non-numeric length  -> uses the {length} placeholder
CAPTION_TYPE_MAP = {
    "Descriptive": [
        "Write a descriptive caption for this image in a formal tone.",
        "Write a descriptive caption for this image in a formal tone within {word_count} words.",
        "Write a {length} descriptive caption for this image in a formal tone.",
    ],
    "Descriptive (Informal)": [
        "Write a descriptive caption for this image in a casual tone.",
        "Write a descriptive caption for this image in a casual tone within {word_count} words.",
        "Write a {length} descriptive caption for this image in a casual tone.",
    ],
    "Training Prompt": [
        "Write a stable diffusion prompt for this image.",
        "Write a stable diffusion prompt for this image within {word_count} words.",
        "Write a {length} stable diffusion prompt for this image.",
    ],
    "MidJourney": [
        "Write a MidJourney prompt for this image.",
        "Write a MidJourney prompt for this image within {word_count} words.",
        "Write a {length} MidJourney prompt for this image.",
    ],
    "Booru tag list": [
        "Write a list of Booru tags for this image.",
        "Write a list of Booru tags for this image within {word_count} words.",
        "Write a {length} list of Booru tags for this image.",
    ],
    "Booru-like tag list": [
        "Write a list of Booru-like tags for this image.",
        "Write a list of Booru-like tags for this image within {word_count} words.",
        "Write a {length} list of Booru-like tags for this image.",
    ],
    "Art Critic": [
        "Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc.",
        "Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc. Keep it within {word_count} words.",
        "Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc. Keep it {length}.",
    ],
    "Product Listing": [
        "Write a caption for this image as though it were a product listing.",
        "Write a caption for this image as though it were a product listing. Keep it under {word_count} words.",
        "Write a {length} caption for this image as though it were a product listing.",
    ],
    "Social Media Post": [
        "Write a caption for this image as if it were being used for a social media post.",
        "Write a caption for this image as if it were being used for a social media post. Limit the caption to {word_count} words.",
        "Write a {length} caption for this image as if it were being used for a social media post.",
    ],
}
class ImageAdapter(nn.Module):
    """Map vision hidden states to text-embedding-sized image tokens.

    The adapter is a two-layer MLP (Linear -> GELU -> Linear) with an optional
    LayerNorm and optional learned positional embedding applied first. It also
    owns three learned embeddings used as <|image_start|>, <|image_end|> and
    <|eot_id|>.
    """

    def __init__(self, input_features: int, output_features: int, ln1: bool, pos_emb: bool, num_image_tokens: int, deep_extract: bool):
        """Build the adapter layers.

        Args:
            input_features: Feature width of one vision hidden state.
            output_features: Feature width of the produced embeddings.
            ln1: Apply LayerNorm to the input when True.
            pos_emb: Add a learned positional embedding when True.
            num_image_tokens: Number of image tokens the positional embedding covers.
            deep_extract: Concatenate five hidden states instead of using one.
        """
        super().__init__()
        self.deep_extract = deep_extract

        # Deep extraction concatenates five hidden states along the feature axis.
        in_dim = input_features * 5 if self.deep_extract else input_features

        self.linear1 = nn.Linear(in_dim, output_features)
        self.activation = nn.GELU()
        self.linear2 = nn.Linear(output_features, output_features)
        self.ln1 = nn.LayerNorm(in_dim) if ln1 else nn.Identity()
        self.pos_emb = nn.Parameter(torch.zeros(num_image_tokens, in_dim)) if pos_emb else None

        # Other tokens (<|image_start|>, <|image_end|>, <|eot_id|>)
        self.other_tokens = nn.Embedding(3, output_features)
        self.other_tokens.weight.data.normal_(mean=0.0, std=0.02)  # Matches HF's implementation of llama3

    def forward(self, vision_outputs: torch.Tensor):
        """Return ``[start, image tokens..., end]`` embeddings.

        *vision_outputs* is indexed per hidden state; the second-to-last entry
        is always used, plus entries 3, 7, 13 and 20 when deep extraction is on.
        """
        penultimate = vision_outputs[-2]
        if self.deep_extract:
            picked = (
                penultimate,
                vision_outputs[3],
                vision_outputs[7],
                vision_outputs[13],
                vision_outputs[20],
            )
            x = torch.cat(picked, dim=-1)
            rank = x.dim()
            assert rank == 3, f"Expected 3, got {rank}"  # batch, tokens, features
            wanted_width = penultimate.shape[-1] * 5
            assert x.shape[-1] == wanted_width, f"Expected {wanted_width}, got {x.shape[-1]}"
        else:
            x = penultimate

        x = self.ln1(x)

        if self.pos_emb is not None:
            assert x.shape[-2:] == self.pos_emb.shape, f"Expected {self.pos_emb.shape}, got {x.shape[-2:]}"
            x = x + self.pos_emb

        x = self.linear2(self.activation(self.linear1(x)))

        # <|image_start|>, IMAGE, <|image_end|>
        batch = x.shape[0]
        marker_ids = torch.tensor([0, 1], device=self.other_tokens.weight.device).expand(batch, -1)
        markers = self.other_tokens(marker_ids)
        wanted_shape = (batch, 2, x.shape[2])
        assert markers.shape == wanted_shape, f"Expected {wanted_shape}, got {markers.shape}"
        start_marker = markers[:, 0:1]
        end_marker = markers[:, 1:2]
        return torch.cat((start_marker, x, end_marker), dim=1)

    def get_eot_embedding(self):
        """Return the learned <|eot_id|> embedding (index 2 of ``other_tokens``)."""
        eot_index = torch.tensor([2], device=self.other_tokens.weight.device)
        return self.other_tokens(eot_index).squeeze(0)
# https://huggingface.co/docs/transformers/v4.44.2/gguf
# https://github.com/city96/ComfyUI-GGUF/issues/7
# https://github.com/THUDM/ChatGLM-6B/issues/18
# https://github.com/meta-llama/llama/issues/394
# https://huggingface.co/meta-llama/Meta-Llama-3-8B-Instruct/discussions/109
# https://huggingface.co/docs/transformers/main/en/main_classes/quantization#offload-between-cpu-and-gpu
# https://huggingface.co/google/flan-ul2/discussions/8
# https://huggingface.co/blog/4bit-transformers-bitsandbytes
# https://huggingface.co/docs/transformers/main/en/peft
# https://huggingface.co/docs/transformers/main/en/peft#enable-and-disable-adapters
# https://huggingface.co/docs/transformers/main/quantization/bitsandbytes?bnb=4-bit
# https://huggingface.co/lllyasviel/flux1-dev-bnb-nf4
# https://github.com/huggingface/transformers/issues/28515
# https://gist.github.com/ChrisHayduk/1a53463331f52dca205e55982baf9930
# Module-level model state. load_text_model() resets all of these to None and
# then fills in the ones relevant to the requested model.
tokenizer = None  # tokenizer for the text model (non-Pixtral path)
text_model_client = None  # only ever reset to None in the code visible here
text_model = None  # causal LM used for caption generation (non-Pixtral path)
image_adapter = None  # ImageAdapter instance (non-Pixtral path)
pixtral_model = None  # LlavaForConditionalGeneration model (Pixtral path)
pixtral_processor = None  # processor paired with pixtral_model
def load_text_model(model_name: str=MODEL_PATH, gguf_file: Union[str, None]=None, is_nf4: bool=True, is_lora: bool=True):
    """Load the requested model into the module-level globals.

    All model globals are cleared first. For a repo listed in PIXTRAL_PATHS
    only ``pixtral_model`` / ``pixtral_processor`` are populated; otherwise
    ``tokenizer``, ``text_model`` and ``image_adapter`` are.

    Args:
        model_name: Hub repo id of the model to load.
        gguf_file: GGUF file name inside the repo, or a falsy value for none.
        is_nf4: Request 4-bit NF4 quantization (ignored for the text model on CPU).
        is_lora: Merge the LoRA at LORA_PATH into the text model. Only done
            when that path exists and ``is_nf4`` is False.

    Raises:
        Exception: Any failure, re-raised with an "LLM load error" message and
            the original exception chained as its cause.
    """
    global tokenizer, text_model, image_adapter, pixtral_model, pixtral_processor, text_model_client, use_inference_client
    try:
        # Drop references to whatever was loaded before, then collect garbage
        # first so the CUDA cache release can actually return that memory.
        tokenizer = None
        text_model_client = None
        text_model = None
        image_adapter = None
        pixtral_model = None
        pixtral_processor = None
        gc.collect()
        torch.cuda.empty_cache()

        lora_device = "auto"
        from transformers import BitsAndBytesConfig
        nf4_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4",
                                        bnb_4bit_use_double_quant=True, bnb_4bit_compute_dtype=torch.bfloat16)

        if model_name in PIXTRAL_PATHS:  # Pixtral
            print(f"Loading LLM: {model_name}")
            pixtral_kwargs = {"device_map": device, "torch_dtype": torch.bfloat16}
            if is_nf4:
                pixtral_kwargs["quantization_config"] = nf4_config
            pixtral_model = LlavaForConditionalGeneration.from_pretrained(model_name, **pixtral_kwargs).eval()
            pixtral_processor = AutoProcessor.from_pretrained(model_name)
            print(f"pixtral_model: {type(pixtral_model)}")
            print(f"pixtral_processor: {type(pixtral_processor)}")
            return

        print("Loading tokenizer")
        tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT_PATH / "text_model", use_fast=True)
        assert isinstance(tokenizer, PreTrainedTokenizer) or isinstance(tokenizer, PreTrainedTokenizerFast), f"Tokenizer is of type {type(tokenizer)}"

        print(f"Loading LLM: {model_name}")
        # One call with assembled keyword arguments replaces six near-identical
        # branches. gguf_file is only forwarded when one was actually given.
        load_kwargs = {"torch_dtype": torch.bfloat16}
        if gguf_file:
            load_kwargs["gguf_file"] = gguf_file
        if device == "cpu":
            load_kwargs["device_map"] = device
        elif is_nf4:
            load_kwargs["quantization_config"] = nf4_config
            load_kwargs["device_map"] = device
        else:
            load_kwargs["device_map"] = lora_device
        text_model = AutoModelForCausalLM.from_pretrained(model_name, **load_kwargs).eval()

        # The LoRA is only merged into a non-quantized model.
        if is_lora and LORA_PATH.exists() and not is_nf4:
            print("Loading VLM's custom text model")
            text_model = PeftModel.from_pretrained(model=text_model, model_id=LORA_PATH, device_map=device)
            text_model = text_model.merge_and_unload(safe_merge=True)  # to avoid PEFT bug https://github.com/huggingface/transformers/issues/28515
        else:
            print("VLM's custom text model is not loaded")

        print("Loading image adapter")
        image_adapter = ImageAdapter(
            clip_model.config.hidden_size,
            text_model.config.hidden_size,
            ln1=False,
            pos_emb=False,
            num_image_tokens=38,
            deep_extract=False,
        ).eval().to("cpu")
        # NOTE(review): weights_only=False unpickles arbitrary objects; this is
        # only acceptable because the checkpoint is a local, trusted file.
        image_adapter.load_state_dict(torch.load(CHECKPOINT_PATH / "image_adapter.pt", map_location="cpu", weights_only=False))
        image_adapter.eval().to(device)
    except Exception as e:
        print(f"LLM load error: {e}")
        raise Exception(f"LLM load error: {e}") from e
    finally:
        gc.collect()
        torch.cuda.empty_cache()


# NOTE(review): presumably a marker read by the ZeroGPU runtime — confirm.
load_text_model.zerogpu = True
# Load CLIP
print("Loading CLIP")
clip_processor = AutoProcessor.from_pretrained(CLIP_PATH)
clip_model = AutoModel.from_pretrained(CLIP_PATH).vision_model

# The custom vision weights are required. Raise explicitly instead of using
# `assert`, which is stripped when Python runs with -O.
_clip_checkpoint_file = CHECKPOINT_PATH / "clip_model.pt"
if not _clip_checkpoint_file.exists():
    raise FileNotFoundError(f"Custom vision checkpoint not found: {_clip_checkpoint_file}")
print("Loading VLM's custom vision model")
# NOTE(review): weights_only=False unpickles arbitrary objects; acceptable only
# because this is a local, trusted checkpoint.
checkpoint = torch.load(_clip_checkpoint_file, map_location='cpu', weights_only=False)
# Strip the wrapper prefix from the saved parameter names before loading.
checkpoint = {k.replace("_orig_mod.module.", ""): v for k, v in checkpoint.items()}
clip_model.load_state_dict(checkpoint)
del checkpoint
clip_model.eval().requires_grad_(False).to(device)

# Tokenizer, LLM and image adapter are loaded together here at import time.
# To start with a Pixtral model instead: load_text_model(PIXTRAL_PATHS[0])
load_text_model(MODEL_PATH, None, LOAD_IN_NF4, True)
def _build_caption_prompt(caption_type: str, caption_length: Union[str, int], extra_options: list[str],
                          name_input: str, custom_prompt: str) -> str:
    """Return the instruction text for the captioning request.

    A non-blank *custom_prompt* replaces the templated prompt entirely.
    """
    # 'any' means no length specified
    length = None if caption_length == "any" else caption_length
    if isinstance(length, str):
        try:
            length = int(length)
        except ValueError:
            pass  # Not numeric: keep it as a descriptive length string.

    # Pick the template variant: 0 = no length, 1 = word count, 2 = named length.
    if length is None:
        map_idx = 0
    elif isinstance(length, int):
        map_idx = 1
    elif isinstance(length, str):
        map_idx = 2
    else:
        raise ValueError(f"Invalid caption length: {length}")
    prompt_str = CAPTION_TYPE_MAP[caption_type][map_idx]

    # Add extra options
    if extra_options:
        prompt_str += " " + " ".join(extra_options)

    # Add name, length, word_count
    prompt_str = prompt_str.format(name=name_input, length=caption_length, word_count=caption_length)

    if custom_prompt.strip() != "":
        prompt_str = custom_prompt.strip()
    return prompt_str


@spaces.GPU
@torch.inference_mode()
def stream_chat_mod(input_image: Image.Image, caption_type: str, caption_length: Union[str, int], extra_options: list[str], name_input: str, custom_prompt: str,
                    max_new_tokens: int=300, top_p: float=0.9, temperature: float=0.6, model_name: str=MODEL_PATH, progress=gr.Progress(track_tqdm=True)) -> tuple[str, str]:
    """Caption *input_image* with the currently loaded model.

    Args:
        input_image: Image to caption.
        caption_type: Key into CAPTION_TYPE_MAP.
        caption_length: "any", a word count (int or numeric string), or a
            descriptive length string.
        extra_options: Extra instruction sentences appended to the prompt.
        name_input: Value substituted for ``{name}`` in the prompt.
        custom_prompt: When non-blank, replaces the templated prompt.
        max_new_tokens: Generation length limit.
        top_p: Nucleus sampling parameter (non-Pixtral path only).
        temperature: Sampling temperature (non-Pixtral path only).
        model_name: Repo id; selects the Pixtral path when in PIXTRAL_PATHS.
        progress: Gradio progress tracker.

    Returns:
        ``(prompt, caption)``. On the Pixtral path the prompt is the fixed
        Pixtral instruction rather than the templated one.

    Raises:
        ValueError: If *caption_length* is neither a string nor an int.
    """
    global tokenizer, text_model, image_adapter, pixtral_model, pixtral_processor, text_model_client, use_inference_client
    # Collect first so the cache release can return the freed memory.
    gc.collect()
    torch.cuda.empty_cache()

    prompt_str = _build_caption_prompt(caption_type, caption_length, extra_options, name_input, custom_prompt)

    # For debugging
    print(f"Prompt: {prompt_str}")

    # Pixtral
    if model_name in PIXTRAL_PATHS:
        print(f"pixtral_model: {type(pixtral_model)}")
        print(f"pixtral_processor: {type(pixtral_processor)}")
        input_images = [input_image.convert("RGB")]
        input_prompt = "[INST]Caption this image:\n[IMG][/INST]"
        inputs = pixtral_processor(images=input_images, text=input_prompt, return_tensors="pt").to(device)
        generate_ids = pixtral_model.generate(**inputs, max_new_tokens=max_new_tokens)
        output = pixtral_processor.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
        return input_prompt, output.strip()

    # Preprocess image
    # NOTE: I found the default processor for so400M to have worse results than just using PIL directly
    # Convert to RGB first (as the Pixtral path does) so RGBA / grayscale /
    # palette images yield the three channels the vision model is fed.
    image = input_image.convert("RGB").resize((384, 384), Image.LANCZOS)
    pixel_values = TVF.pil_to_tensor(image).unsqueeze(0) / 255.0
    pixel_values = TVF.normalize(pixel_values, [0.5], [0.5])
    pixel_values = pixel_values.to(device)

    # Embed image
    # This results in Batch x Image Tokens x Features
    with torch.amp.autocast_mode.autocast(device, enabled=True):
        vision_outputs = clip_model(pixel_values=pixel_values, output_hidden_states=True)
        image_features = vision_outputs.hidden_states
        embedded_images = image_adapter(image_features)
        embedded_images = embedded_images.to(device)

    # Build the conversation
    convo = [
        {
            "role": "system",
            "content": "You are a helpful image captioner.",
        },
        {
            "role": "user",
            "content": prompt_str,
        },
    ]

    # Format the conversation
    convo_string = tokenizer.apply_chat_template(convo, tokenize=False, add_generation_prompt=True)
    assert isinstance(convo_string, str)

    # Tokenize the conversation
    # prompt_str is tokenized separately so we can do the calculations below
    convo_tokens = tokenizer.encode(convo_string, return_tensors="pt", add_special_tokens=False, truncation=False)
    prompt_tokens = tokenizer.encode(prompt_str, return_tensors="pt", add_special_tokens=False, truncation=False)
    assert isinstance(convo_tokens, torch.Tensor) and isinstance(prompt_tokens, torch.Tensor)
    convo_tokens = convo_tokens.squeeze(0)  # Squeeze just to make the following easier
    prompt_tokens = prompt_tokens.squeeze(0)

    # Calculate where to inject the image
    eot_id = tokenizer.convert_tokens_to_ids("<|eot_id|>")
    eot_id_indices = (convo_tokens == eot_id).nonzero(as_tuple=True)[0].tolist()
    assert len(eot_id_indices) == 2, f"Expected 2 <|eot_id|> tokens, got {len(eot_id_indices)}"
    preamble_len = eot_id_indices[1] - prompt_tokens.shape[0]  # Number of tokens before the prompt

    # Embed the tokens
    convo_embeds = text_model.model.embed_tokens(convo_tokens.unsqueeze(0).to(device))

    # Construct the input
    input_embeds = torch.cat([
        convo_embeds[:, :preamble_len],  # Part before the prompt
        embedded_images.to(dtype=convo_embeds.dtype),  # Image
        convo_embeds[:, preamble_len:],  # The prompt and anything after it
    ], dim=1).to(device)

    input_ids = torch.cat([
        convo_tokens[:preamble_len].unsqueeze(0),
        torch.zeros((1, embedded_images.shape[1]), dtype=torch.long),  # Dummy tokens for the image (TODO: Should probably use a special token here so as not to confuse any generation algorithms that might be inspecting the input)
        convo_tokens[preamble_len:].unsqueeze(0),
    ], dim=1).to(device)
    attention_mask = torch.ones_like(input_ids)

    text_model.to(device)
    generate_ids = text_model.generate(input_ids, inputs_embeds=input_embeds, attention_mask=attention_mask, max_new_tokens=max_new_tokens,
                                       do_sample=True, suppress_tokens=None, top_p=top_p, temperature=temperature)

    # Trim off the prompt
    generate_ids = generate_ids[:, input_ids.shape[1]:]
    # Drop one trailing end-of-sequence token. The length guard avoids an
    # IndexError when nothing was generated.
    if generate_ids.shape[1] > 0:
        last_id = generate_ids[0][-1].item()
        if last_id == tokenizer.eos_token_id or last_id == eot_id:
            generate_ids = generate_ids[:, :-1]

    caption = tokenizer.batch_decode(generate_ids, skip_special_tokens=False, clean_up_tokenization_spaces=False)[0]

    return prompt_str, caption.strip()
# https://huggingface.co/docs/transformers/v4.44.2/main_classes/text_generation#transformers.FlaxGenerationMixin.generate
# https://github.com/huggingface/transformers/issues/6535
# https://zenn.dev/hijikix/articles/8c445f4373fdcc ja
# https://github.com/ggerganov/llama.cpp/discussions/7712
# https://huggingface.co/docs/huggingface_hub/guides/inference#openai-compatibility
# https://huggingface.co/docs/huggingface_hub/v0.24.6/en/package_reference/inference_client#huggingface_hub.InferenceClient.text_generation
def is_repo_name(s) -> bool:
    """Return True if *s* has the shape of a repo id: ``owner/name``.

    Both parts must be non-empty and contain no slash, comma, whitespace or
    quote character. Non-string input (e.g. ``None``) yields False instead of
    raising TypeError.
    """
    import re
    if not isinstance(s, str):
        return False
    return re.fullmatch(r'^[^/,\s\"\']+/[^/,\s\"\']+$', s) is not None
def is_repo_exists(repo_id):
    """Report whether *repo_id* exists on the Hub, using HF_TOKEN.

    If the lookup itself fails, the error is printed and True is returned, so
    a failed lookup does not reject the repo.
    """
    try:
        found = HfApi(token=HF_TOKEN).repo_exists(repo_id=repo_id)
    except Exception as e:
        print(f"Error: Failed to connect {repo_id}. {e}")
        return True  # for safe
    return True if found else False
def is_valid_repo(repo_id):
    """Return True only if *repo_id* is well-formed and exists on the Hub.

    Unlike is_repo_exists(), any failure during the lookup counts as invalid
    (False). The lookup is authenticated with HF_TOKEN, like the other Hub
    helpers in this file.
    """
    try:
        # Reuse the shared name check rather than a second copy of the regex.
        if not is_repo_name(repo_id):
            return False
        return bool(HfApi(token=HF_TOKEN).repo_exists(repo_id=repo_id))
    except Exception as e:
        print(f"Failed to connect {repo_id}. {e}")
        return False
def get_text_model():
    """Return the repo ids in ``llm_models`` as a new list."""
    return [repo_id for repo_id in llm_models]
def is_gguf_repo(repo_id: str):
    """Return True if the repo lists at least one ``.gguf`` file.

    Returns False for a malformed or missing repo. If listing the files
    fails, the error is printed, shown as a Gradio warning, and False is
    returned.
    """
    try:
        hub = HfApi(token=HF_TOKEN)
        if not (is_repo_name(repo_id) and is_repo_exists(repo_id)):
            return False
        repo_files = hub.list_repo_files(repo_id=repo_id)
    except Exception as e:
        print(f"Error: Failed to get {repo_id}'s info. {e}")
        gr.Warning(f"Error: Failed to get {repo_id}'s info. {e}")
        return False
    return any(file_name.endswith(".gguf") for file_name in repo_files)
def get_repo_gguf(repo_id: str):
    """Return a Gradio update listing the repo's ``.gguf`` files.

    The first file becomes the selected value. An empty update (value "" and
    no choices) is returned for a malformed or missing repo, for a repo with
    no ``.gguf`` files, or if listing the files fails.
    """
    try:
        hub = HfApi(token=HF_TOKEN)
        if not (is_repo_name(repo_id) and is_repo_exists(repo_id)):
            return gr.update(value="", choices=[])
        listing = hub.list_repo_files(repo_id=repo_id)
    except Exception as e:
        print(f"Error: Failed to get {repo_id}'s info. {e}")
        gr.Warning(f"Error: Failed to get {repo_id}'s info. {e}")
        return gr.update(value="", choices=[])

    gguf_names = [file_name for file_name in listing if file_name.endswith(".gguf")]
    if gguf_names:
        return gr.update(value=gguf_names[0], choices=gguf_names)
    return gr.update(value="", choices=[])
@spaces.GPU
def change_text_model(model_name: str=MODEL_PATH, use_client: bool=False, gguf_file: Union[str, None]=None,
                      is_nf4: bool=True, is_lora: bool=True, progress=gr.Progress(track_tqdm=True)):
    """Switch the active text model and return a Gradio update.

    Args:
        model_name: Hub repo id of the model to load.
        use_client: Stored in the ``use_inference_client`` global; when True
            the local model load is skipped.
        gguf_file: GGUF file name to load from the repo, if any.
        is_nf4: Passed to load_text_model.
        is_lora: Passed to load_text_model.
        progress: Gradio progress tracker.

    Returns:
        ``gr.update(visible=True)`` when the repo contains GGUF files but none
        was chosen yet; otherwise ``gr.update(choices=...)`` with the known
        model list, after adding *model_name* to it.

    Raises:
        gr.Error: If the repo does not exist or loading fails.
    """
    global use_inference_client, llm_models
    use_inference_client = use_client
    try:
        if not is_repo_name(model_name) or not is_repo_exists(model_name):
            raise gr.Error(f"Repo doesn't exist: {model_name}")
        if not gguf_file and is_gguf_repo(model_name):
            gr.Info("Please select a gguf file.")
            return gr.update(visible=True)
        if not use_inference_client:
            load_text_model(model_name, gguf_file, is_nf4, is_lora)
        if model_name not in llm_models:
            llm_models[model_name] = gguf_file if gguf_file else None
        return gr.update(choices=get_text_model())
    except gr.Error:
        # Already a user-facing error: do not wrap it in a second message.
        raise
    except Exception as e:
        raise gr.Error(f"Model load error: {model_name}, {e}") from e