Spaces:
Running
Running
# Character_Chat_Lib.py | |
# Description: Functions for character chat cards. | |
# | |
# Imports | |
import json | |
import logging | |
import io | |
import base64 | |
import time | |
from typing import Dict, Any, Optional, List, Tuple | |
# | |
# External Imports | |
from PIL import Image | |
# | |
# Local imports | |
from App_Function_Libraries.DB.DB_Manager import get_character_card_by_id, get_character_chat_by_id | |
from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram | |
# | |
# Constants | |
#################################################################################################### | |
# | |
# Functions | |
# Using https://github.com/malfoyslastname/character-card-spec-v2 as the standard for v2 character cards | |
################################################################################# | |
# | |
# Placeholder functions: | |
def replace_placeholders(text: str, char_name: str, user_name: str) -> str:
    """
    Replace template placeholders in the given text with concrete names.

    The supported placeholders are '{{char}}', '{{user}}' and '{{random_user}}'
    (the latter is treated as an alias of '{{user}}').

    Args:
        text (str): The text containing placeholders. Non-string values
            (for example None) are returned unchanged instead of raising.
        char_name (str): The name of the character.
        user_name (str): The name of the user.

    Returns:
        str: The text with placeholders replaced.
    """
    # Nothing to substitute in a non-string value; hand it back untouched.
    if not isinstance(text, str):
        return text

    replacements = {
        '{{char}}': char_name,
        '{{user}}': user_name,
        '{{random_user}}': user_name,  # random_user is treated the same as user for simplicity
    }
    for placeholder, value in replacements.items():
        if value is None:
            # A missing name leaves its placeholder intact rather than raising TypeError.
            continue
        text = text.replace(placeholder, str(value))
    return text
def replace_user_placeholder(history, user_name):
    """
    Substitute the literal '{{user}}' token throughout a chat history.

    Args:
        history (list): Chat history as a list of (user_message, bot_message) pairs.
        user_name (str): The name entered by the user; any falsy value falls back to "User".

    Returns:
        list: A new list of (user_message, bot_message) tuples with the token replaced.
            Falsy messages (None, empty string) are passed through unchanged.
    """
    display_name = user_name or "User"  # Default name if none provided

    def _fill(message):
        # Only non-empty messages are touched.
        return message.replace("{{user}}", display_name) if message else message

    return [(_fill(sent), _fill(reply)) for sent, reply in history]
# | |
# End of Placeholder functions | |
################################################################################# | |
################################################################################# | |
# | |
# Functions for character card processing: | |
def extract_character_id(choice: str) -> int:
    """
    Extract the character ID from the dropdown selection string.

    The selection is expected to end with '(ID: <number>)'. Everything before
    the last '(ID: ' marker is ignored.

    Args:
        choice (str): The dropdown selection string.

    Returns:
        int: The parsed character ID.

    Raises:
        IndexError: If the string contains no '(ID: ' marker.
        ValueError: If the text after the marker is not an integer.
        Any exception is counted via the error metric and then re-raised.
    """
    log_counter("extract_character_id_attempt")
    try:
        # Split on the LAST marker so that a label which itself contains '(ID: '
        # cannot shadow the real ID; strip whitespace so "…5) " still parses.
        id_text = choice.rsplit('(ID: ', 1)[1].strip().rstrip(')')
        character_id = int(id_text)
        log_counter("extract_character_id_success")
        return character_id
    except Exception as e:
        log_counter("extract_character_id_error", labels={"error": str(e)})
        raise
def load_character_wrapper(character_id: int, user_name: str) -> Tuple[Dict[str, Any], List[Tuple[Optional[str], str]], Optional[Image.Image]]:
    """
    Load a character and its image for an already-extracted ID, recording metrics.

    Delegates to load_character_and_image and returns its three results unchanged.
    Any exception raised along the way is counted and re-raised.
    """
    log_counter("load_character_wrapper_attempt")
    began = time.time()
    try:
        char_data, chat_history, img = load_character_and_image(character_id, user_name)
        # Duration is only recorded when the load (and unpacking) succeeded.
        log_histogram("load_character_wrapper_duration", time.time() - began)
        log_counter("load_character_wrapper_success")
    except Exception as err:
        log_counter("load_character_wrapper_error", labels={"error": str(err)})
        raise
    return char_data, chat_history, img
def parse_character_book(book_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Normalise the 'character_book' section of a V2 character card.

    Args:
        book_data (Dict[str, Any]): The raw character book data from the character card.

    Returns:
        Dict[str, Any]: A new dict with every book-level field filled in (defaults
            applied where absent) and an 'entries' list of normalised entry dicts.

    Raises:
        KeyError: If an entry lacks 'keys', 'content', 'enabled' or 'insertion_order'.
    """
    def _normalise_entry(raw: Dict[str, Any]) -> Dict[str, Any]:
        # Subscripted fields are mandatory; the rest fall back to defaults.
        return {
            'keys': raw['keys'],
            'content': raw['content'],
            'extensions': raw.get('extensions', {}),
            'enabled': raw['enabled'],
            'insertion_order': raw['insertion_order'],
            'case_sensitive': raw.get('case_sensitive', False),
            'name': raw.get('name', ''),
            'priority': raw.get('priority'),
            'id': raw.get('id'),
            'comment': raw.get('comment', ''),
            'selective': raw.get('selective', False),
            'secondary_keys': raw.get('secondary_keys', []),
            'constant': raw.get('constant', False),
            'position': raw.get('position'),
        }

    return {
        'name': book_data.get('name', ''),
        'description': book_data.get('description', ''),
        'scan_depth': book_data.get('scan_depth'),
        'token_budget': book_data.get('token_budget'),
        'recursive_scanning': book_data.get('recursive_scanning', False),
        'extensions': book_data.get('extensions', {}),
        'entries': [_normalise_entry(raw) for raw in book_data.get('entries', [])],
    }
def load_character_and_image(character_id: int, user_name: str) -> Tuple[Optional[Dict[str, Any]], List[Tuple[Optional[str], str]], Optional[Image.Image]]:
    """
    Load a character and its associated image based on the character ID.

    Placeholders in the character's text fields are expanded in place, and the
    initial chat history is seeded with the character's first message.

    Args:
        character_id (int): The ID of the character to load.
        user_name (str): The name of the user, used for placeholder replacement.
            A falsy value falls back to "User", matching replace_user_placeholder.

    Returns:
        Tuple[Optional[Dict[str, Any]], List[Tuple[Optional[str], str]], Optional[Image.Image]]:
            A tuple containing the character data, chat history, and character image (if available).
            (None, [], None) is returned when the character is not found or loading fails.
    """
    log_counter("load_character_and_image_attempt")
    start_time = time.time()
    if not user_name:
        user_name = "User"  # Default name if none provided
    try:
        char_data = get_character_card_by_id(character_id)
        if not char_data:
            log_counter("load_character_and_image_no_data")
            logging.warning(f"No character data found for ID: {character_id}")
            return None, [], None

        # Replace placeholders in character data. Only string values are processed so
        # that a field stored as None does not abort the whole load.
        for field in ['first_mes', 'mes_example', 'scenario', 'description', 'personality']:
            value = char_data.get(field)
            if isinstance(value, str):
                char_data[field] = replace_placeholders(value, char_data['name'], user_name)

        # 'first_mes' was already expanded above when present; the default greeting
        # (used only when the key is absent) contains no placeholders.
        first_mes = char_data.get('first_mes', "Hello! I'm ready to chat.")
        chat_history = [(None, first_mes)] if first_mes else []

        # The image is stored base64-encoded; a broken image is logged but does not fail the load.
        img = None
        if char_data.get('image'):
            try:
                image_data = base64.b64decode(char_data['image'])
                img = Image.open(io.BytesIO(image_data)).convert("RGBA")
                log_counter("load_character_image_success")
            except Exception as e:
                log_counter("load_character_image_error", labels={"error": str(e)})
                logging.error(f"Error processing image for character '{char_data['name']}': {e}")

        load_duration = time.time() - start_time
        log_histogram("load_character_and_image_duration", load_duration)
        log_counter("load_character_and_image_success")
        return char_data, chat_history, img
    except Exception as e:
        log_counter("load_character_and_image_error", labels={"error": str(e)})
        logging.error(f"Error in load_character_and_image: {e}")
        return None, [], None
def load_chat_and_character(chat_id: int, user_name: str) -> Tuple[Optional[Dict[str, Any]], List[Tuple[str, str]], Optional[Image.Image]]:
    """
    Load a chat and its associated character, including the character image, and process templates.

    The chat record is read through its 'character_id' and 'chat_history' keys.

    Args:
        chat_id (int): The ID of the chat to load.
        user_name (str): The name of the user. A falsy value falls back to "User",
            matching replace_user_placeholder.

    Returns:
        Tuple[Optional[Dict[str, Any]], List[Tuple[str, str]], Optional[Image.Image]]:
            A tuple containing the character data, processed chat history, and character image (if available).
            When the chat exists but its character does not, the raw (unprocessed) history is
            returned with None for the other two items. (None, [], None) is returned when the
            chat is missing or loading fails.
    """
    log_counter("load_chat_and_character_attempt")
    start_time = time.time()
    if not user_name:
        user_name = "User"  # Default name if none provided
    try:
        # Load the chat
        chat = get_character_chat_by_id(chat_id)
        if not chat:
            log_counter("load_chat_and_character_no_chat")
            logging.warning(f"No chat found with ID: {chat_id}")
            return None, [], None

        # Load the associated character
        character_id = chat['character_id']
        char_data = get_character_card_by_id(character_id)
        if not char_data:
            log_counter("load_chat_and_character_no_character")
            logging.warning(f"No character found for chat ID: {chat_id}")
            return None, chat['chat_history'], None

        # Process the chat history
        processed_history = process_chat_history(chat['chat_history'], char_data['name'], user_name)

        # Load the character image; a broken image is logged but does not fail the load.
        img = None
        if char_data.get('image'):
            try:
                image_data = base64.b64decode(char_data['image'])
                img = Image.open(io.BytesIO(image_data)).convert("RGBA")
                log_counter("load_chat_character_image_success")
            except Exception as e:
                log_counter("load_chat_character_image_error", labels={"error": str(e)})
                logging.error(f"Error processing image for character '{char_data['name']}': {e}")

        # Process character data templates. Only string values are processed so that
        # a field stored as None does not abort the whole load.
        for field in ['first_mes', 'mes_example', 'scenario', 'description', 'personality']:
            value = char_data.get(field)
            if isinstance(value, str):
                char_data[field] = replace_placeholders(value, char_data['name'], user_name)

        load_duration = time.time() - start_time
        log_histogram("load_chat_and_character_duration", load_duration)
        log_counter("load_chat_and_character_success")
        return char_data, processed_history, img
    except Exception as e:
        log_counter("load_chat_and_character_error", labels={"error": str(e)})
        logging.error(f"Error in load_chat_and_character: {e}")
        return None, [], None
def extract_json_from_image(image_file):
    """
    Extract embedded character-card JSON text from an image.

    The image metadata is checked first for a base64-encoded 'chara' entry. If that is
    absent or cannot be decoded, the image is re-encoded as PNG and the resulting bytes
    are scanned for the outermost '{' ... '}' span that parses as JSON.

    Args:
        image_file: A path or file-like object accepted by PIL's Image.open.

    Returns:
        Optional[str]: The JSON text if found, otherwise None. Errors are logged and
            counted, never raised. The duration histogram is recorded on every path.
    """
    # Not every file-like object exposes .name; fall back to the object itself for logging.
    logging.debug(f"Attempting to extract JSON from image: {getattr(image_file, 'name', image_file)}")
    log_counter("extract_json_from_image_attempt")
    start_time = time.time()
    try:
        with Image.open(image_file) as img:
            logging.debug("Image opened successfully")
            metadata = img.info
            if 'chara' in metadata:
                logging.debug("Found 'chara' in image metadata")
                chara_content = metadata['chara']
                logging.debug(f"Content of 'chara' metadata (first 100 chars): {chara_content[:100]}...")
                try:
                    decoded_content = base64.b64decode(chara_content).decode('utf-8')
                    logging.debug(f"Decoded content (first 100 chars): {decoded_content[:100]}...")
                    log_counter("extract_json_from_image_metadata_success")
                    return decoded_content
                except Exception as e:
                    logging.error(f"Error decoding base64 content: {e}")
                    log_counter("extract_json_from_image_decode_error", labels={"error": str(e)})
                    logging.warning("Could not decode 'chara' metadata, attempting to find JSON data in image bytes")
            else:
                logging.warning("'chara' not found in metadata, attempting to find JSON data in image bytes")

            # Alternative method to extract embedded JSON from image bytes if metadata is not available
            # NOTE(review): re-saving through Pillow presumably drops text chunks that are not passed
            # via pnginfo, so this scan may rarely succeed — confirm whether the raw file bytes
            # should be scanned instead.
            img_byte_arr = io.BytesIO()
            img.save(img_byte_arr, format='PNG')
            img_bytes = img_byte_arr.getvalue()
            img_str = img_bytes.decode('latin1')  # latin1 maps every byte 1:1, so decoding cannot fail

            # Search for JSON-like structures in the image bytes
            json_start = img_str.find('{')
            json_end = img_str.rfind('}')
            if json_start != -1 and json_end != -1 and json_end > json_start:
                possible_json = img_str[json_start:json_end + 1]
                try:
                    json.loads(possible_json)
                    logging.debug("Found JSON data in image bytes")
                    log_counter("extract_json_from_image_bytes_success")
                    return possible_json
                except json.JSONDecodeError:
                    logging.debug("No valid JSON found in image bytes")
                    log_counter("extract_json_from_image_invalid_json")

            logging.warning("No JSON data found in the image")
            log_counter("extract_json_from_image_no_json_found")
    except Exception as e:
        log_counter("extract_json_from_image_error", labels={"error": str(e)})
        logging.error(f"Error extracting JSON from image: {e}")
    finally:
        # Record the duration for successful extractions too, not only for failures.
        extract_duration = time.time() - start_time
        log_histogram("extract_json_from_image_duration", extract_duration)
    return None
def load_chat_history(file):
    """
    Load an exported chat history from a JSON file object.

    The JSON document must be an object providing the history under 'history' or
    'messages' and the character name under 'character' or 'character_name'.

    Args:
        file: A readable file-like object. Both binary handles (read() returns bytes,
            decoded as UTF-8) and text handles (read() returns str) are accepted.

    Returns:
        Tuple: (history, character_name) on success, or (None, None) when either value
            is missing/empty or the file cannot be read or parsed.
    """
    log_counter("load_chat_history_attempt")
    start_time = time.time()
    try:
        raw = file.read()
        # Binary uploads yield bytes; text-mode handles already yield str.
        content = raw.decode('utf-8') if isinstance(raw, (bytes, bytearray)) else raw
        chat_data = json.loads(content)

        # Extract history and character name from the loaded data
        history = chat_data.get('history') or chat_data.get('messages')
        character_name = chat_data.get('character') or chat_data.get('character_name')
        if not history or not character_name:
            log_counter("load_chat_history_incomplete_data")
            logging.error("Chat history or character name missing in the imported file.")
            return None, None

        load_duration = time.time() - start_time
        log_histogram("load_chat_history_duration", load_duration)
        log_counter("load_chat_history_success")
        return history, character_name
    except Exception as e:
        log_counter("load_chat_history_error", labels={"error": str(e)})
        logging.error(f"Error loading chat history: {e}")
        return None, None
def process_chat_history(chat_history: List[Tuple[str, str]], char_name: str, user_name: str) -> List[Tuple[str, str]]:
    """
    Expand placeholders in every message of a chat history.

    Args:
        chat_history (List[Tuple[str, str]]): Pairs of (user message, character message).
        char_name (str): The name of the character.
        user_name (str): The name of the user.

    Returns:
        List[Tuple[str, str]]: A new list of pairs with placeholders replaced; falsy
            messages are carried over untouched.

    Raises:
        Exception: Any failure is counted, logged and re-raised.
    """
    log_counter("process_chat_history_attempt")
    began = time.time()
    try:
        def _expand(message):
            # Empty/None messages have nothing to substitute.
            return replace_placeholders(message, char_name, user_name) if message else message

        processed_history = [
            (_expand(user_msg), _expand(char_msg))
            for user_msg, char_msg in chat_history
        ]
        log_histogram("process_chat_history_duration", time.time() - began)
        log_counter("process_chat_history_success", labels={"message_count": len(chat_history)})
        return processed_history
    except Exception as err:
        log_counter("process_chat_history_error", labels={"error": str(err)})
        logging.error(f"Error processing chat history: {err}")
        raise
def validate_character_book(book_data):
    """
    Validate the 'character_book' field in the character card.

    Args:
        book_data (dict): The character book data. A non-dict value is reported as
            invalid instead of raising.

    Returns:
        Tuple[bool, List[str]]: A tuple containing a boolean indicating validity and a list of validation messages.
    """
    # A non-dict book cannot be inspected field by field.
    if not isinstance(book_data, dict):
        return False, ["Field 'character_book' must be a dictionary."]

    validation_messages = []

    # Optional fields with expected types
    optional_fields = {
        'name': str,
        'description': str,
        'scan_depth': (int, float),
        'token_budget': (int, float),
        'recursive_scanning': bool,
        'extensions': dict,
        'entries': list
    }
    for field, expected_type in optional_fields.items():
        if field in book_data and not isinstance(book_data[field], expected_type):
            # Render readable type names ("int or float") rather than the raw class/tuple repr.
            accepted = expected_type if isinstance(expected_type, tuple) else (expected_type,)
            type_label = ' or '.join(t.__name__ for t in accepted)
            validation_messages.append(f"Field 'character_book.{field}' must be of type '{type_label}'.")

    # 'entries' is required
    if 'entries' not in book_data or not isinstance(book_data['entries'], list):
        validation_messages.append("Field 'character_book.entries' is required and must be a list.")
        return False, validation_messages

    # Validate each entry in 'entries'; entry_ids is shared so duplicate ids are detected across entries
    entry_ids = set()
    for idx, entry in enumerate(book_data['entries']):
        is_valid_entry, entry_messages = validate_character_book_entry(entry, idx, entry_ids)
        if not is_valid_entry:
            validation_messages.extend(entry_messages)

    is_valid = len(validation_messages) == 0
    return is_valid, validation_messages
def validate_character_book_entry(entry, idx, entry_ids):
    """
    Validate an entry in the 'character_book.entries' list.

    Args:
        entry (dict): The entry data. A non-dict value is reported as invalid instead of raising.
        idx (int): The index of the entry in the list (used in messages).
        entry_ids (set): A set of existing entry IDs for uniqueness checking. It is
            updated in place with this entry's 'id' when that id is new.

    Returns:
        Tuple[bool, List[str]]: A tuple containing a boolean indicating validity and a list of validation messages.
    """
    # A non-dict entry cannot be inspected field by field.
    if not isinstance(entry, dict):
        return False, [f"Entry {idx}: Entry must be a dictionary."]

    def _type_label(expected_type):
        # Render readable type names ("int or float") rather than the raw class/tuple repr.
        accepted = expected_type if isinstance(expected_type, tuple) else (expected_type,)
        return ' or '.join(t.__name__ for t in accepted)

    validation_messages = []

    required_fields = {
        'keys': list,
        'content': str,
        'extensions': dict,
        'enabled': bool,
        'insertion_order': (int, float)
    }
    for field, expected_type in required_fields.items():
        if field not in entry:
            validation_messages.append(f"Entry {idx}: Missing required field '{field}'.")
        elif not isinstance(entry[field], expected_type):
            validation_messages.append(f"Entry {idx}: Field '{field}' must be of type '{_type_label(expected_type)}'.")
        elif field == 'content' and not entry[field].strip():
            validation_messages.append(f"Entry {idx}: Field 'content' cannot be empty.")
        elif field == 'keys' and not entry[field]:
            validation_messages.append(f"Entry {idx}: Field 'keys' cannot be empty.")

    # Optional fields
    optional_fields = {
        'case_sensitive': bool,
        'name': str,
        'priority': (int, float),
        'id': (int, float),
        'comment': str,
        'selective': bool,
        'secondary_keys': list,
        'constant': bool,
        'position': str  # Should be 'before_char' or 'after_char'
    }
    for field, expected_type in optional_fields.items():
        if field in entry and not isinstance(entry[field], expected_type):
            validation_messages.append(f"Entry {idx}: Field '{field}' must be of type '{_type_label(expected_type)}'.")

    # Validate 'position' value if present
    if 'position' in entry and entry['position'] not in ['before_char', 'after_char']:
        validation_messages.append(f"Entry {idx}: Field 'position' must be 'before_char' or 'after_char'.")

    # Validate 'secondary_keys' if 'selective' is True
    if entry.get('selective', False):
        if 'secondary_keys' not in entry or not isinstance(entry['secondary_keys'], list):
            validation_messages.append(f"Entry {idx}: 'secondary_keys' must be a list when 'selective' is True.")
        elif not entry['secondary_keys']:
            validation_messages.append(f"Entry {idx}: 'secondary_keys' cannot be empty when 'selective' is True.")

    # Validate 'keys' and 'secondary_keys' list elements
    for list_field in ('keys', 'secondary_keys'):
        if isinstance(entry.get(list_field), list):
            for i, key in enumerate(entry[list_field]):
                if not isinstance(key, str) or not key.strip():
                    validation_messages.append(
                        f"Entry {idx}: Element {i} in '{list_field}' must be a non-empty string.")

    # Validate 'id' uniqueness
    if 'id' in entry:
        entry_id = entry['id']
        try:
            if entry_id in entry_ids:
                validation_messages.append(
                    f"Entry {idx}: Duplicate 'id' value '{entry_id}'. Each entry 'id' must be unique.")
            else:
                entry_ids.add(entry_id)
        except TypeError:
            # Unhashable ids (e.g. a list) cannot be tracked in the set; their wrong
            # type has already been reported by the optional-field check above.
            pass

    # Validate 'extensions' keys are namespaced
    if isinstance(entry.get('extensions'), dict):
        for key in entry['extensions']:
            if not isinstance(key, str) or ('/' not in key and '_' not in key):
                validation_messages.append(
                    f"Entry {idx}: Extension key '{key}' in 'extensions' should be namespaced to prevent conflicts.")

    is_valid = len(validation_messages) == 0
    return is_valid, validation_messages
def validate_v2_card(card_data):
    """
    Validate a character card according to the V2 specification.

    Args:
        card_data (dict): The parsed character card data. Non-dict input (or a non-dict
            'data' field) is reported as invalid instead of raising.

    Returns:
        Tuple[bool, List[str]]: A tuple containing a boolean indicating validity and a list of validation messages.
    """
    if not isinstance(card_data, dict):
        return False, ["Character card must be a dictionary."]

    validation_messages = []

    # Check top-level fields
    if 'spec' not in card_data:
        validation_messages.append("Missing 'spec' field.")
    elif card_data['spec'] != 'chara_card_v2':
        validation_messages.append(f"Invalid 'spec' value: {card_data['spec']}. Expected 'chara_card_v2'.")

    if 'spec_version' not in card_data:
        validation_messages.append("Missing 'spec_version' field.")
    else:
        # Ensure 'spec_version' is '2.0' or higher
        try:
            spec_version = float(card_data['spec_version'])
            if spec_version < 2.0:
                validation_messages.append(
                    f"'spec_version' must be '2.0' or higher. Found '{card_data['spec_version']}'.")
        except (TypeError, ValueError):
            # TypeError covers non-string/non-numeric values such as None or a list.
            validation_messages.append(
                f"Invalid 'spec_version' format: {card_data['spec_version']}. Must be a number as a string.")

    if 'data' not in card_data:
        validation_messages.append("Missing 'data' field.")
        return False, validation_messages  # Cannot proceed without 'data' field

    data = card_data['data']
    if not isinstance(data, dict):
        validation_messages.append("Field 'data' must be a dictionary.")
        return False, validation_messages  # Cannot inspect the fields of a non-dict 'data'

    # Required fields in 'data'
    required_fields = ['name', 'description', 'personality', 'scenario', 'first_mes', 'mes_example']
    for field in required_fields:
        if field not in data:
            validation_messages.append(f"Missing required field in 'data': '{field}'.")
        elif not isinstance(data[field], str):
            validation_messages.append(f"Field '{field}' must be a string.")
        elif not data[field].strip():
            validation_messages.append(f"Field '{field}' cannot be empty.")

    # Optional fields with expected types
    optional_fields = {
        'creator_notes': str,
        'system_prompt': str,
        'post_history_instructions': str,
        'alternate_greetings': list,
        'tags': list,
        'creator': str,
        'character_version': str,
        'extensions': dict,
        'character_book': dict  # If present, should be a dict
    }
    for field, expected_type in optional_fields.items():
        if field not in data:
            continue
        if not isinstance(data[field], expected_type):
            # This is the single place a wrongly-typed optional field (including
            # 'extensions') is reported, so each problem yields one message.
            validation_messages.append(f"Field '{field}' must be of type '{expected_type.__name__}'.")
        elif field == 'extensions':
            # Validate that extensions keys are properly namespaced
            for key in data[field]:
                if not isinstance(key, str) or ('/' not in key and '_' not in key):
                    validation_messages.append(
                        f"Extension key '{key}' in 'extensions' should be namespaced to prevent conflicts.")

    # If 'alternate_greetings' is present, check that it's a list of non-empty strings
    if isinstance(data.get('alternate_greetings'), list):
        for idx, greeting in enumerate(data['alternate_greetings']):
            if not isinstance(greeting, str) or not greeting.strip():
                validation_messages.append(f"Element {idx} in 'alternate_greetings' must be a non-empty string.")

    # If 'tags' is present, check that it's a list of non-empty strings
    if isinstance(data.get('tags'), list):
        for idx, tag in enumerate(data['tags']):
            if not isinstance(tag, str) or not tag.strip():
                validation_messages.append(f"Element {idx} in 'tags' must be a non-empty string.")

    # Validate 'character_book' if present; a non-dict book was already reported above
    if isinstance(data.get('character_book'), dict):
        is_valid_book, book_messages = validate_character_book(data['character_book'])
        if not is_valid_book:
            validation_messages.extend(book_messages)

    is_valid = len(validation_messages) == 0
    return is_valid, validation_messages
# | |
# End of File | |
#################################################################################################### | |