project_index = 0 # Initialize project_index with a default value import gradio as gr,os,shutil,numpy as np,hashlib,subprocess,pandas as pd from PIL import Image from datetime import datetime import base64 import requests import io import json import logging import re """ Gradio App Interface Specification: Inputs: - prompt: Text Input. Description: 'Enter a prompt for image generation.' - steps: Slider. Range: [1, 32]. value: 16. Description: 'Number of steps for image generation.' - model: Dropdown. Options: ['TurboAnime.saftensors', 'OtherModel']. value: 'TurboAnime.saftensors'. Description: 'Select the model for image generation.' - styles: CheckboxGroup. Options: ['RayORender', 'OtherStyle']. value: ['RayORender']. Description: 'Select styles for image generation.' Outputs: - image: Image. Description: 'Generated image based on the prompt.' - log: Text. Description: 'Log of the image generation process.' """ import gradio as gr import subprocess import json import base64 from PIL import Image import io import time # πŸ•’ Start time for the entire script start_time = time.time() def analyze_all_images(custom_prompt): global file_array descriptions = [] for file_info in file_array: file_path = find_file_by_hash(file_info["hash"]) if file_path: with Image.open(file_path) as img: description = get_image_description(img, custom_prompt) save_text(file_info["hash"], description, "", "", True) # Save the description to the text file descriptions.append(description) return descriptions def generate_status_message(index, total, file_name): return f"Processing {index + 1}/{total}: {file_name}" def analyze_thumbs_up_images(custom_prompt): thumbs_up_images = load_thumbs_up_gallery() # Assuming this function returns full paths total_images = len(thumbs_up_images) descriptions = [] status_messages = [] for index, image_path in enumerate(thumbs_up_images): with Image.open(image_path) as img: description = get_image_description(img, custom_prompt) descriptions.append((os.path.basename(image_path), description)) status_messages.append(generate_status_message(index, total_images, os.path.basename(image_path))) save_text(file_info["hash"], description, "", "", True) # Save the description to the text file return descriptions, status_messages # πŸ“’ Log Status def log_status(progress, message): elapsed_time = time.time() - start_time log_message = {"status": message, "progress": f"{progress}%", "time_elapsed": f"{elapsed_time:.2f} seconds"} return json.dumps(log_message) def process_files(file_info): try: if file_info: return [_process_file(file_path) for file_path in file_info] if isinstance(file_info, list) else [_process_file(file_info)] return [] except Exception as e: print(f"Error processing files: {e}") raise e def clear_uploads_folder(): os.makedirs(recycle_bin_folder, exist_ok=True) for file_name in os.listdir(_get_project_folder()): dest_file_path = os.path.join(recycle_bin_folder, file_name) if os.path.exists(dest_file_path): base, extension = os.path.splitext(file_name) i = 1 new_file_name = f"{base}_{i}{extension}" new_dest_file_path = os.path.join(recycle_bin_folder, new_file_name) while os.path.exists(new_dest_file_path): i += 1 new_file_name = f"{base}_{i}{extension}" new_dest_file_path = os.path.join(recycle_bin_folder, new_file_name) dest_file_path = new_dest_file_path shutil.move(os.path.join(_get_project_folder(), file_name), dest_file_path) return "Uploads folder cleared!" def undo_last_deletion(): file_list = os.listdir(recycle_bin_folder) if file_list: last_deleted_file = file_list[-1] shutil.move(os.path.join(recycle_bin_folder, last_deleted_file), _get_project_folder()) return f"Restored: {last_deleted_file}" return "No files to restore" def next_session(): global project_index project_index += 1 _init_project_directory() return f"Switched to the next session! Current session index: {project_index}" def previous_session(): global project_index if project_index > 0: project_index -= 1 return f"Switched to the previous session! Current session index: {project_index}" def get_next_image(): global current_image images = get_images() if images: current_image = images[0] if current_image is None else images[(images.index(current_image) + 1) % len(images)] else: current_image = default_image return current_image def get_previous_image(): global current_image images = get_images() if images: current_image = images[-1] if current_image is None else images[(images.index(current_image) - 1) % len(images)] else: current_image = default_image return current_image def process_all_zips(): for file_name in os.listdir(_get_project_folder()): file_path = os.path.join(_get_project_folder(), file_name) if zipfile.is_zipfile(file_path): _process_file(file_path) return "All zip files processed!" # 🌐 API Call def make_api_call(url, payload): try: response = subprocess.run(["curl", "-X", "POST", url, "-H", "Content-Type: application/json", "--data", json.dumps(payload)], capture_output=True, text=True) if response.returncode == 0: return json.loads(response.stdout), None else: return None, log_status(progress, f"API call failed with return code: {response.returncode}") except Exception as e: return None, log_status(progress, f"API call failed with exception: {str(e)}") # πŸ–ΌοΈ Save Image def save_image(image_data): image = Image.open(io.BytesIO(base64.b64decode(image_data))) return image # πŸš€ Main Execution Function def generate_image(prompt, steps, model, styles): base_url = "http://73.255.78.150:7909" base_payload = { "prompt": prompt, "steps": steps, "model": model, "styles": styles, "negative_prompt": "album, duplicate, crowded, multiple, stuff, messy, photo, collage, doll, caricature, render. mannequin.", "seed": -1, "height": 768, "width": 1280, "sampler_index": "DPM++ 2M SDE Karras", "restore_faces": False, "tiling": False, "n_iter": 1, "batch_size": 1, "cfg_scale": 2.0, "subseed": -1, "subseed_strength": 0.0, "seed_resize_from_h": -1, "seed_resize_from_w": -1, "seed_enable_extras": False, "enable_hr": False, "denoising_strength": 0.0, "hr_scale": 2.0, "hr_upscaler": "Latent", "hr_second_pass_steps": 0, "hr_resize_x": 0, "hr_resize_y": 0, "hr_sampler_index": "", "hr_prompt": "", "hr_negative_prompt": "", "override_settings_texts": "" } progress = 0 log = log_status(progress, "Starting image generation...") response, error_log = make_api_call(f"{base_url}/sdapi/v1/txt2img", base_payload) if error_log: return None, error_log if response and 'images' in response: image_data = response['images'][0] image = save_image(image_data) log = log_status(100, "Image generated successfully.") return image, log else: log = log_status(progress, "Failed to generate image.") return None, log max_image_size = 768 image_folder = "/Users/gev7418/Library/CloudStorage/OneDrive-Personal/Gradio Script Building Blocks/HatchDuo-Gradio/Images" thumbs_up_folder = os.path.join(image_folder, "Thumbs_Up") thumbs_down_folder = os.path.join(image_folder, "Thumbs_Down") allowed_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp'] folders_cycle = [image_folder, thumbs_up_folder, thumbs_down_folder] # Set up logging to print to the terminal logging.basicConfig(level=logging.INFO) # OpenAI API Key api_key = "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" # Replace with your OpenAI API Key import subprocess def convert_and_rename_files_in_directory(directory, original_extension, new_extension, new_base_name): """ Converts all files in the specified directory from the original extension to PNG, and renames them to a new base name followed by a number. """ files = [f for f in os.listdir(directory) if f.endswith(original_extension)] for index, file in enumerate(sorted(files)): new_name = f"{new_base_name}_{index}{new_extension}" original_file_path = os.path.join(directory, file) new_file_path = os.path.join(directory, new_name) # Convert to PNG using a subprocess call to a command line tool like ImageMagick subprocess.run(['convert', original_file_path, new_file_path]) os.rename(new_file_path, os.path.join(directory, new_name)) logging.info(f"All {original_extension} files in {directory} have been converted to PNG and renamed to {new_base_name} format.") def encode_image(image): """ This function converts the image to bytes and returns the base64 encoding of the image file """ logging.info("Encoding image to base64") image_bytes = io.BytesIO() if image.mode == 'RGBA': # Convert RGBA to RGB image = image.convert('RGB') image.save(image_bytes, format='JPEG') image_bytes = image_bytes.getvalue() base64_image = base64.b64encode(image_bytes).decode('utf-8') return base64_image def get_image_description(image, custom_prompt): """ This function sends the image to the OpenAI API and returns the response. It now checks if there's custom prompt content before sending it to the API. """ logging.info("Getting image description from OpenAI API") base64_image = encode_image(image) headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"} # Check if custom_prompt is not empty, otherwise use a default prompt if not custom_prompt.strip(): custom_prompt = "Describe this image" payload = { "model": "gpt-4-vision-preview", "messages": [ { "role": "user", "content": [ {"type": "text", "text": custom_prompt}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}} ] } ], "max_tokens": 300 } response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload) response_payload = json.loads(response.text) content = response_payload['choices'][0]['message']['content'] logging.info("Received response from OpenAI API") return content def create_directories(): print("Creating directories...") os.makedirs(thumbs_up_folder, exist_ok=True) os.makedirs(thumbs_down_folder, exist_ok=True) create_directories() def get_files(folder): print(f"Getting files from {folder}...") return [os.path.join(dp, f) for dp, dn, filenames in os.walk(folder) for f in filenames if os.path.splitext(f)[1].lower() in allowed_extensions] def hash_image_pixels(file_path): """ Generates a hash for an image based on its pixel content. """ if os.path.splitext(file_path)[1].lower() in allowed_extensions: with Image.open(file_path) as img: # Convert the image to RGBA (to standardize if images are in different modes) img_rgba = img.convert('RGBA') # Calculate the new height and width to maintain aspect ratio aspect_ratio = img_rgba.width / img_rgba.height new_height = min(max_image_size, img_rgba.height) new_width = int(aspect_ratio * new_height) if img_rgba.height > max_image_size: new_height = max_image_size new_width = int(new_height * aspect_ratio) # Use Image.Resampling.LANCZOS for better quality resizing img_resized = img_rgba.resize((new_width, new_height), Image.Resampling.LANCZOS) # Get the bytes of the resized image data img_bytes = io.BytesIO() img_resized.save(img_bytes, format='PNG') # PNG format to ensure consistency across platforms img_bytes = img_bytes.getvalue() # Generate a hash of the resized image bytes hash_obj = hashlib.sha256(img_bytes) return hash_obj.hexdigest() else: return None # Global cache for hashes to file paths hash_to_path_cache = {} def update_hash_to_path_cache(): global hash_to_path_cache hash_to_path_cache.clear() for folder in folders_cycle: for file_path in get_files(folder): file_hash = hash_image_pixels(file_path) hash_to_path_cache[file_hash] = file_path def find_file_by_hash(file_hash): # Use the cache to find the file path file_path = hash_to_path_cache.get(file_hash, None) if file_path and os.path.exists(file_path): return file_path # If the file was not found or doesn't exist at the cached location, # search in the thumbs up and thumbs down folders for folder in [thumbs_up_folder, thumbs_down_folder]: for dp, dn, filenames in os.walk(folder): for f in filenames: potential_path = os.path.join(dp, f) if os.path.splitext(f)[1].lower() in allowed_extensions: if hash_image_pixels(potential_path) == file_hash: # Update the cache with the new location hash_to_path_cache[file_hash] = potential_path return potential_path # If the file is not found in any of the locations, return None return None def build_file_array(): print("Building file array...") files = [] for folder in folders_cycle: for file_path in get_files(folder): file_hash = hash_image_pixels(file_path) status = "πŸ‘ Rated Up" if folder == thumbs_up_folder else "πŸ‘Ž Rated Down" if folder == thumbs_down_folder else "⏳ Pending" files.append({"path": file_path, "hash": file_hash, "status": status, "date_modified": os.path.getmtime(file_path)}) # Create text files for any images missing their text file counterpart text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt') if not os.path.exists(text_file_path): with open(text_file_path, 'w') as text_file: text_file.write("") # Create an empty text file files = sorted(files, key=lambda x: x["date_modified"], reverse=True) return files def refresh_file_array_and_hashes(): global file_array, hash_list file_array = build_file_array() hash_list = [file['hash'] for file in file_array] print("File array and hash list updated.") update_hash_to_path_cache() refresh_file_array_and_hashes() current_index = 0 def save_text(file_hash, text, prepend_text="", append_text="", save_and_overwrite_changes=True): if not save_and_overwrite_changes: print("Skipping saving due to user preference.") return file_path = find_file_by_hash(file_hash) if file_path: text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt') final_text = "" if not save_and_overwrite_changes and os.path.exists(text_file_path): with open(text_file_path, 'r') as existing_file: existing_content = existing_file.read() final_text = (prepend_text + ", " if prepend_text else "") + existing_content + (", " + append_text if append_text else "") else: final_text = (prepend_text + ", " if prepend_text else "") + text + (", " + append_text if append_text else "") print(f"Saving text for file hash {file_hash} to {text_file_path}...") with open(text_file_path, 'w') as text_file: text_file.write(final_text) def get_text(file_hash): file_path = find_file_by_hash(file_hash) if file_path: text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt') if os.path.exists(text_file_path): print(f"Getting text for file hash {file_hash} from {text_file_path}...") with open(text_file_path) as text_file: return text_file.read() return "" def update_index_for_navigation(current_text, prepend_text, append_text, navigate_forward=True, save_and_overwrite_changes=False): global current_index, file_array, hash_list if current_text or not save_and_overwrite_changes: # Save only if there's something to save or if saving changes is not required file_hash = hash_list[current_index] save_text(file_hash, current_text, prepend_text, append_text, save_and_overwrite_changes) if navigate_forward: current_index = (current_index + 1) % len(hash_list) # Cycle to the first item if at the end else: current_index = (current_index - 1) % len(hash_list) # Cycle to the last item if at the beginning def get_file(navigate_forward, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt): global current_index, file_array, hash_list print(f"Getting file, navigate_forward: {navigate_forward}...") update_index_for_navigation(current_text, prepend_text, append_text, navigate_forward, save_and_overwrite_changes) if current_index < len(hash_list): current_hash = hash_list[current_index] file_info = next((item for item in file_array if item["hash"] == current_hash), None) if file_info: file_path = find_file_by_hash(current_hash) if file_path: with Image.open(file_path) as img: # Calculate the new height and width to maintain aspect ratio, with a max height of 768 for display in gradio aspect_ratio = img.width / img.height new_height = min(768, img.height) # Set max height to 768 for gradio display new_width = int(aspect_ratio * new_height) # Use Image.Resampling.LANCZOS for better quality resizing img_resized = img.resize((new_width, new_height), Image.Resampling.LANCZOS) img_resized_bytes = io.BytesIO() img_resized.save(img_resized_bytes, format='PNG') img_resized_bytes = img_resized_bytes.getvalue() text = get_text(current_hash) # Load text file contents into textbox if not pause_api_call: text = get_image_description(img, custom_prompt) # Get new description from OpenAI save_text(current_hash, text, prepend_text, append_text, save_and_overwrite_changes) # Optionally save the new description # Convert the resized image to a numpy array for display img_resized_np = np.array(Image.open(io.BytesIO(img_resized_bytes))) return img_resized_np, current_hash, file_info["status"], text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), os.path.basename(file_path).replace(os.path.splitext(os.path.basename(file_path))[1], '.txt'), find_file_by_hash(current_hash).replace(os.path.splitext(find_file_by_hash(current_hash))[1], '.txt'), os.path.relpath(find_file_by_hash(current_hash).replace(os.path.splitext(find_file_by_hash(current_hash))[1], '.txt'), start=image_folder) return None, "File not found", "⏳ Pending", "", "", "", "", "", "", "" def move_file(direction, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt): global current_index, file_array, hash_list print(f"Moving file in direction {direction}...") if current_index < len(hash_list): current_hash = hash_list[current_index] file_info = next((item for item in file_array if item["hash"] == current_hash), None) if file_info: source_file = find_file_by_hash(current_hash) if source_file: destination = thumbs_up_folder if direction == "up" else thumbs_down_folder if direction == "down" else None new_status = "πŸ‘ Rated Up" if direction == "up" else "πŸ‘Ž Rated Down" if direction == "down" else None if destination: shutil.move(source_file, os.path.join(destination, os.path.basename(source_file))) file_info["status"] = new_status # Get new description and save it with Image.open(source_file) as img: description = get_image_description(img, custom_prompt) save_text(current_hash, description, prepend_text, append_text, save_and_overwrite_changes) return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt) return None, "Invalid direction or file not found", "⏳ Pending", "", "", "", "", "", "", "" def reset_files(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt): global file_array, current_index, hash_list print("Resetting files and clearing text...") for file_info in file_array: source_file = find_file_by_hash(file_info["hash"]) if source_file and os.path.exists(source_file): source_text = source_file.replace(os.path.splitext(source_file)[1], '.txt') if os.path.exists(source_text): shutil.move(source_text, os.path.join(image_folder, os.path.basename(source_text))) shutil.move(source_file, os.path.join(image_folder, os.path.basename(source_file))) with open(os.path.join(image_folder, os.path.basename(source_text)), 'w') as text_file: text_file.write("") file_info["status"] = "⏳ Pending" file_array = build_file_array() hash_list = [file['hash'] for file in file_array] update_hash_to_path_cache() return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt) def delete_file(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt): global current_index, file_array, hash_list print("Deleting file...") if current_index < len(hash_list): current_hash = hash_list[current_index] source_file = find_file_by_hash(current_hash) if source_file: source_text = source_file.replace(os.path.splitext(source_file)[1], '.txt') os.remove(source_file) os.remove(source_text) hash_list.remove(current_hash) # Remove the hash from the hash_list file_array = [file for file in file_array if file["hash"] != current_hash] # Rebuild file_array without the deleted file if current_index >= len(hash_list): current_index = len(hash_list) - 1 update_hash_to_path_cache() return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt) def load_image_details(): print("Loading image details...") image_details = [] for folder in folders_cycle: for file_path in get_files(folder): file_hash = hash_image_pixels(file_path) status = "πŸ‘ Rated Up" if folder == thumbs_up_folder else "πŸ‘Ž Rated Down" if folder == thumbs_down_folder else "⏳ Pending" text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt') text_content = "" if os.path.exists(text_file_path): with open(text_file_path, 'r') as text_file: text_content = text_file.read() image_details.append({ "Text Content": text_content, "Rating Status": status, "File Hash": file_hash, "Image Path": file_path, "Text Path": text_file_path if os.path.exists(text_file_path) else "N/A" }) return pd.DataFrame(image_details) def load_text_files_as_df(): print("Loading text files into dataframe...") text_files = [] for file in os.listdir(image_folder): if file.endswith('.txt'): file_path = os.path.join(image_folder, file) with open(file_path, 'r') as text_file: text_content = text_file.read() text_files.append({ "File Name": file.replace('.txt', ''), "Content": text_content, "Date Modified": datetime.fromtimestamp(os.path.getmtime(file_path)).strftime('%Y-%m-%d %H:%M:%S') }) return pd.DataFrame(text_files) # Function to merge text_files_df and image_details_df into a single dataframe def load_combined_details(): print("Loading text file details...") text_df = load_text_files_as_df() image_details_df = load_image_details() # Ensure the 'File Name' column exists in image_details_df by extracting it from 'Image Path' image_details_df['File Name'] = image_details_df['Image Path'].apply(lambda x: os.path.basename(x).replace(os.path.splitext(os.path.basename(x))[1], '')) combined_df = pd.merge(text_df, image_details_df, on="File Name", how="outer") # Reorder dataframe fields according to specified order: Date Modified, Rating Status, File Name, Text Content, File Hash, Text Path, Image Path combined_df = combined_df[['Date Modified', 'Rating Status', 'File Name', 'Text Content', 'File Hash', 'Text Path', 'Image Path']] return combined_df def load_gallery(): print("Loading gallery...") return sorted([os.path.join(image_folder, f) for f in os.listdir(image_folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower()) def load_thumbs_up_gallery(): print("Loading Thumbs Up gallery...") return sorted([os.path.join(thumbs_up_folder, f) for f in os.listdir(thumbs_up_folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower()) def load_thumbs_down_gallery(): print("Loading Thumbs Down gallery...") return sorted([os.path.join(thumbs_down_folder, f) for f in os.listdir(thumbs_down_folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower()) def load_all_folders_gallery(): print("Loading all folders gallery...") all_images = [] for folder in folders_cycle: all_images.extend(sorted([os.path.join(folder, f) for f in os.listdir(folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower())) return all_images def load_text_files(): print("Loading text files...") return sorted([[file, open(os.path.join(image_folder, file),'r').read()] for file in os.listdir(image_folder) if file.endswith('.txt')], key=lambda x: x[0].lower()) def update_text_content(df): global text_files_df print("Updating text content...") for row in df: if len(row) == 3: file_name, new_content, date_added = row file_path = os.path.join(image_folder, file_name) if os.path.exists(file_path): file_hash = hash_file(file_path) save_text(file_hash, new_content) else: print("Error: Unexpected number of values in row. Expected 3 values per row.") # Step 1: Define a function to update the dataframe def refresh_text_files(): print("Refreshing text files...") updated_text_files = sorted([[file, open(os.path.join(image_folder, file),'r').read()] for file in os.listdir(image_folder) if file.endswith('.txt')], key=lambda x: x[0].lower()) return updated_text_files def save_uploaded_image(image_file): """ Attempts to save the uploaded image to the designated image folder. Returns the path of the saved image or None if an error occurs. """ try: os.makedirs(image_folder, exist_ok=True) # Ensure the image folder exists image_path = os.path.join(image_folder, image_file.name) with open(image_path, "wb") as file: file.write(image_file.content) # Write the byte content of the file print(f"Image saved successfully: {image_path}") return image_path except Exception as e: print(f"Error saving image: {e}") return None def create_text_file_for_image(image_path): """ Generates an empty text file corresponding to an uploaded image, using the same base name. """ text_file_path = f"{os.path.splitext(image_path)[0]}.txt" open(text_file_path, 'w').close() # Efficiently create an empty text file return text_file_path def handle_image_upload(uploaded_files): status_messages = [] for uploaded_file in uploaded_files: file_extension = os.path.splitext(uploaded_file.name)[1].lower() if file_extension in allowed_extensions: # Ensure the image folder exists os.makedirs(image_folder, exist_ok=True) # Save the file with a unique name based on its hash unique_name = hashlib.sha256(uploaded_file.name.encode()).hexdigest() + file_extension save_path = os.path.join(image_folder, unique_name) with open(save_path, "wb") as file: file.write(uploaded_file.content) # Optionally, create a corresponding text file text_file_path = save_path + ".txt" with open(text_file_path, "w") as text_file: text_file.write(f"Image file: {unique_name} uploaded successfully.") status_messages.append(f"Uploaded and saved {uploaded_file.name} successfully.") else: status_messages.append(f"File {uploaded_file.name} has an unsupported extension ({file_extension}) and was not uploaded.") return "\n".join(status_messages) def refresh_image_description(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt): global current_index, file_array if not pause_api_call: file_info = file_array[current_index] file_path = find_file_by_hash(file_info["hash"]) if file_path: with Image.open(file_path) as img: # Use the custom_prompt parameter when calling get_image_description text = get_image_description(img, custom_prompt) save_text(file_info["hash"], text, prepend_text, append_text, save_and_overwrite_changes) return np.array(img), file_info["hash"], file_info["status"], text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), os.path.basename(file_path).replace(os.path.splitext(os.path.basename(file_path))[1], '.txt'), find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), os.path.relpath(find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), start=image_folder) # If API call is paused or file not found, return current state without changes return None, file_info["hash"], file_info["status"], current_text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), os.path.basename(file_path).replace(os.path.splitext(os.path.basename(file_path))[1], '.txt'), find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), os.path.relpath(find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), start=image_folder) # Backup functionality for successful app launches backup_folder = "Backup_Scripts" os.makedirs(backup_folder, exist_ok=True) def backup_script(): """ This function creates a backup of the current script in the designated backup folder. """ current_script_path = os.path.realpath(__file__) backup_script_path = os.path.join(backup_folder, f"backup_{os.path.basename(current_script_path)}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.py") shutil.copy2(current_script_path, backup_script_path) print(f"Backup of the script created at {backup_script_path}") def app_launch_success(): """ This function is called when the app successfully launches. It triggers the backup of the current script. """ print("App successfully launched. Creating a backup of the script...") backup_script() def refresh_all(): combined_details = load_combined_details() image_gallery_content = load_gallery() thumbs_up_gallery_content = load_thumbs_up_gallery() # Reinitialize the hash array and the index for new images update_hash_to_path_cache() refresh_file_array_and_hashes() current_index = 0 # Reset the index to start from the first image return combined_details, image_gallery_content, thumbs_up_gallery_content def combined_action(prompt, steps, model, styles, custom_prompt): # Trigger the OpenAI API for description and wait for the response description, status = trigger_openai_api(custom_prompt) if status == "success": # Once the response is received, generate the image based on the prompt and other parameters generated_image, log = generate_image(prompt, steps, model, styles) else: generated_image, log = None, "Failed to generate image due to API call failure." return generated_image, log, description # πŸ“‚ Setup Environment πŸ“‚ import gradio as gr import os from pathlib import Path import subprocess # Define the directory to store uploaded images image_folder = "./Images" os.makedirs(image_folder, exist_ok=True) # Create the directory if it doesn't exist # πŸ”„ Function Definitions πŸ”„ def save_and_show_images(uploaded_files): images_to_display = [] # Process each uploaded file for index, uploaded_file in enumerate(uploaded_files): # Define a base file name with index base_file_name = f"Img-{index}" # Check for existing files and increment index to avoid overwriting existing_files = [f for f in Path(image_folder).rglob('*') if f.is_file() and f.suffix in ['.jpg', '.png', '.txt']] while any(base_file_name in file.name for file in existing_files): index += 1 base_file_name = f"Img-{index}" # Determine the file extension (jpg or png) file_extension = "jpg" if uploaded_file[:3] == b'\xff\xd8\xff' else "png" final_file_name = f"{base_file_name}.{file_extension}" image_path = os.path.join(image_folder, final_file_name) # Write the bytes to a new file in the specified directory with open(image_path, "wb") as file: file.write(uploaded_file) # Create a corresponding text file for the image text_file_path = os.path.join(image_folder, f"{base_file_name}.txt") with open(text_file_path, "w") as text_file: text_file.write(f"Image file: {final_file_name}") # Add the path (or ideally, a URL to the file) to the list to display images_to_display.append(image_path) # Ensure all images have a corresponding text file for image_file_path in Path(image_folder).rglob('*'): if image_file_path.suffix in ['.jpg', '.png']: base_name = os.path.splitext(image_file_path.name)[0] text_file_path = os.path.join(image_folder, f"{base_name}.txt") if not os.path.exists(text_file_path): with open(text_file_path, "w") as text_file: text_file.write(f"Image file: {image_file_path.name}") return images_to_display with gr.Blocks() as app: with gr.Row(): upload_btn = gr.File(label="Upload Images", type="binary", file_count='multiple') gallery = gr.Gallery(label="Uploaded Images Gallery") upload_btn.change(fn=save_and_show_images, inputs=upload_btn, outputs=gallery) with gr.Row(): with gr.Column(): gr.Markdown("## Image Viewer") with gr.Column(): # Add an upload button for uploading files to the images folder image = gr.Image(height=768, label="Current Image", image_mode="RGBA", width="max", elem_id="current_image", type="numpy") with gr.Row(): thumbs_down_btn = gr.Button("πŸ‘Ž Rate Down") thumbs_up_btn = gr.Button("πŸ‘ Rate Up") with gr.Row(): prev_btn = gr.Button("β¬… Previous") next_btn = gr.Button("Next ➑") with gr.Row(): status_display = gr.Textbox(label="Rating Status", interactive=False) image_path_display = gr.Textbox(label="Image Path", interactive=False) with gr.Column(): # Existing setup for checkboxes with gr.Row(): pause_api_call_checkbox = gr.Checkbox(label="Pause OpenAI API Calls", value=True) save_and_overwrite_changes_checkbox = gr.Checkbox(label="Save and Overwrite Changes", value=True) # New textbox for custom OpenAI API instructions custom_instructions = gr.Textbox(label="Custom Instructions", value="Use the art of deduction and creativity to generate a persona profile and 3 inspiration words to describe the image without describing the subject. Wrap it up with a concise 1 sentence caption of the image with the subject in high detail. JSON Format needed: Futuristic Persona Profile: The subject exudes a sense of readiness and authority, dressed in attire that hints at a future dominated by advanced technology and interstellar travel. This character could be envisioned as a commander or a pioneer in a futuristic saga, marked by their composed nature and the streamlined design of their gear. Futuristic Caption: A resolute figure of tomorrow, standing firm with a serene resolve, garbed in a sleek, technologically superior suit that narrates tales of distant realms and adventures. Poised Persona Profile: The individual appears as a beacon of confidence and tactical acumen. Their outfit suggests a world of progressive technology and potential space conquests. This persona might be a strategist or a guardian in a speculative fiction setting, distinguished by their steady presence and the modernistic cut of their uniform. Poised Caption: A visionary sentinel of the cosmos, poised with a calm yet assertive demeanor, dressed in an advanced, form-enhancing suit that hints at the mysteries of the universe yet to unfold. Advanced Persona Profile: The figure stands as a symbol of assurance and innovation, clad in a costume that forecasts an era of sophisticated technology and cosmic exploration. This character could represent an expert or a defender in a futuristic tale, highlighted by their tranquil posture and the elegant configuration of their attire. Advanced Caption: An unwavering pioneer of the future, the individual poses with a composed assurance, enveloped in a cutting-edge suit that speaks of advanced civilizations and uncharted frontiers.", placeholder="Enter custom instructions...", lines=2, max_lines=5) with gr.Accordion("πŸ‘ Thumbs Up Gallery", open=False): thumbs_up_gallery = gr.Gallery(label="Thumbs Up Image Gallery", value=load_thumbs_up_gallery(), every=5, columns=3, rows=1, object_fit="contain", height="auto") with gr.Row(): text_display = prompt = gr.Textbox(label="Image Analysis", lines=3, max_lines=10, interactive=True, placeholder="API call paused. Manually enter description.") output_image = gr.Image(type="pil", label="Visual Analysis Analog") with gr.Row(): trigger_api_btn = gr.Button("Analyze Aesthetic!") combined_btn = gr.Button("Analyze & Generate") # New combined action button generate_btn = gr.Button("Generate") analyze_all_btn = gr.Button("Analyze All Thumbs Up") descriptions_display = gr.Dataframe() status_display = gr.Textbox(label="Status", lines=10, interactive=False) analyze_all_btn.click(analyze_thumbs_up_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display]) with gr.Row(): analyze_everything_btn = gr.Button("Analyze All Images") analyze_all_btn.click(analyze_all_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display]) analyze_everything_btn.click(analyze_all_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display]) with gr.Row(): gr.Image(type="pil") gr.Image(type="pil") gr.Image(type="pil") with gr.Accordion("Append / Prepend", open=False): with gr.Row(): prepend_text = gr.Textbox(label="Prepend", lines=2, max_lines=5, placeholder="Enter text to prepend...") append_text = gr.Textbox(label="Append", lines=2, max_lines=5, placeholder="Enter text to append...") def trigger_openai_api(custom_prompt): global current_index, file_array if current_index < len(file_array): file_info = file_array[current_index] file_path = find_file_by_hash(file_info["hash"]) if file_path: with Image.open(file_path) as img: # Use the custom_prompt parameter when calling get_image_description text = get_image_description(img, custom_prompt) save_text(file_info["hash"], text, "", "", True) # Assuming you want to save and overwrite changes by default return text, "Triggered OpenAI API successfully." return "", "Failed to trigger OpenAI API." trigger_api_btn.click(trigger_openai_api, inputs=[custom_instructions], outputs=[text_display, status_display]) with gr.Row(): delete_btn = gr.Button("πŸ—‘οΈ Delete") reset_btn = gr.Button("πŸ”„ Reset All Ratings πŸ‘πŸ‘Ž") with gr.Accordion(label="Edit Project", open=False): with gr.Column(scale=0): file_input = gr.File(file_count="multiple", type="binary", label="Upload Images or Zip Files") with gr.Row(): previous_session_button = gr.Button("πŸ‘ˆ Previous Project") next_session_button = gr.Button("Next ProjectπŸ‘‰") with gr.Row(): next_img = gr.Button("πŸ–ΌοΈ Show Next Image") prev_img = gr.Button("πŸ–ΌοΈ Show Previous Image") with gr.Row(): with gr.Row(): clear_button = gr.Button("πŸ—‘οΈ Clear Uploads Folder") undo_button = gr.Button("↩️ Undo Last Deletion") process_zip_button = gr.Button("πŸ“‚ Process Zip Files") with gr.Column(): randomize_checkbox = gr.Checkbox(label="πŸ”€ Randomize Every 2 Seconds", value=True) refresh_time_input = gr.Number(label="⏱️ Refresh Time", value=2) index_display = gr.Textbox(value=str(project_index), label="πŸ”’ Session Index", every=5) file_input.change(process_files, inputs=[file_input], outputs=[]) clear_button.click(clear_uploads_folder, inputs=[], outputs=[]) undo_button.click(undo_last_deletion, inputs=[], outputs=[]) next_session_button.click(next_session, inputs=[], outputs=[index_display]) previous_session_button.click(previous_session, inputs=[], outputs=[index_display]) next_img.click(get_next_image, inputs=[], outputs=[gallery]) prev_img.click(get_previous_image, inputs=[], outputs=[gallery]) process_zip_button.click(process_all_zips, inputs=[], outputs=[]) with gr.Accordion("Advanced Generation Settings", open=False): with gr.Column(): steps = gr.Slider(minimum=1, maximum=32, value=16, label="Steps") model = gr.Dropdown(choices=['TurboAnime.saftensors', 'OtherModel'], value='TurboAnime.saftensors', label="Model") styles = gr.CheckboxGroup(choices=['RayORender', 'OtherStyle'], value=['RayORender'], label="Styles") output_log = gr.Textbox(label="Log") # Step 2: Add a button to trigger the update generate_btn.click(fn=generate_image, inputs=[prompt, steps, model, styles], outputs=[output_image, output_log]) # Set the click action for the combined button combined_btn.click(combined_action, inputs=[prompt, steps, model, styles, custom_instructions], outputs=[output_image, output_log, text_display]) with gr.Accordion("Training Data", open=True): refresh_btn = gr.Button("Refresh") with gr.Row(): combined_details_df = gr.Dataframe(label="Combined Text and Image Details", value=load_combined_details(), headers=["Date Modified", "Rating Status", "File Name", "Text Content", "File Hash", "Text Path", "Image Path"], datatype=["str", "str", "str", "str", "str", "str", "str"], every=5) gallery = gr.Gallery(label="Image Gallery", value=load_gallery(), every=5) with gr.Accordion("File Info", open=False): file_hash_display = gr.Textbox(label="File Hash", interactive=False) text_path_display = gr.Textbox(label="Text Path", interactive=False) def delete_all_text_files_content(): text_files = [os.path.join(image_folder, f) for f in os.listdir(image_folder) if f.endswith('.txt')] for file_path in text_files: with open(file_path, 'w') as f: f.write('') # Clear the content of the text file return "All text files' content deleted." with gr.Row(): # Add a button to manually trigger a backup of the script backup_btn = gr.Button("Backup Script") backup_btn.click(backup_script, inputs=[], outputs=[]) delete_all_text_btn = gr.Button("Delete All Text Files Content") delete_all_text_btn.click(delete_all_text_files_content, inputs=[], outputs=status_display) # Modify the button click actions to include the pause_api_call_checkbox state and the save_and_overwrite_changes_checkbox state as arguments reset_btn.click(lambda t, pt, at, p, s, ci: reset_files(t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) prev_btn.click(lambda t, pt, at, p, s, ci: get_file(False, t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) next_btn.click(lambda t, pt, at, p, s, ci: get_file(True, t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) thumbs_up_btn.click(lambda t, pt, at, p, s, ci: move_file("up", t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) thumbs_down_btn.click(lambda t, pt, at, p, s, ci: move_file("down", t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) delete_btn.click(lambda t, pt, at, p, s, ci: delete_file(t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) refresh_btn.click(refresh_all, inputs=[], outputs=[combined_details_df, gallery, thumbs_up_gallery]) # Check for successful app launch and backup the script try: app.launch(share=True, server_port=7866, server_name="0.0.0.0") app_launch_success() except Exception as e: print(f"Error launching the app: {e}")