oceansweep's picture
Upload 155 files
43cd37c verified
# Character_Chat_Lib.py
# Description: Functions for character chat cards.
#
# Imports
import json
import logging
import io
import base64
import time
from typing import Dict, Any, Optional, List, Tuple
#
# External Imports
from PIL import Image
#
# Local imports
from App_Function_Libraries.DB.DB_Manager import get_character_card_by_id, get_character_chat_by_id
from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram
#
# Constants
####################################################################################################
#
# Functions
# Using https://github.com/malfoyslastname/character-card-spec-v2 as the standard for v2 character cards
#################################################################################
#
# Placeholder functions:
def replace_placeholders(text: str, char_name: str, user_name: str) -> str:
"""
Replace placeholders in the given text with appropriate values.
Args:
text (str): The text containing placeholders.
char_name (str): The name of the character.
user_name (str): The name of the user.
Returns:
str: The text with placeholders replaced.
"""
replacements = {
'{{char}}': char_name,
'{{user}}': user_name,
'{{random_user}}': user_name # Assuming random_user is the same as user for simplicity
}
for placeholder, value in replacements.items():
text = text.replace(placeholder, value)
return text
def replace_user_placeholder(history, user_name):
"""
Replaces all instances of '{{user}}' in the chat history with the actual user name.
Args:
history (list): The current chat history as a list of tuples (user_message, bot_message).
user_name (str): The name entered by the user.
Returns:
list: Updated chat history with placeholders replaced.
"""
if not user_name:
user_name = "User" # Default name if none provided
updated_history = []
for user_msg, bot_msg in history:
# Replace in user message
if user_msg:
user_msg = user_msg.replace("{{user}}", user_name)
# Replace in bot message
if bot_msg:
bot_msg = bot_msg.replace("{{user}}", user_name)
updated_history.append((user_msg, bot_msg))
return updated_history
#
# End of Placeholder functions
#################################################################################
#################################################################################
#
# Functions for character card processing:
def extract_character_id(choice: str) -> int:
"""Extract the character ID from the dropdown selection string."""
log_counter("extract_character_id_attempt")
try:
character_id = int(choice.split('(ID: ')[1].rstrip(')'))
log_counter("extract_character_id_success")
return character_id
except Exception as e:
log_counter("extract_character_id_error", labels={"error": str(e)})
raise
def load_character_wrapper(character_id: int, user_name: str) -> Tuple[Dict[str, Any], List[Tuple[Optional[str], str]], Optional[Image.Image]]:
"""Wrapper function to load character and image using the extracted ID."""
log_counter("load_character_wrapper_attempt")
start_time = time.time()
try:
char_data, chat_history, img = load_character_and_image(character_id, user_name)
load_duration = time.time() - start_time
log_histogram("load_character_wrapper_duration", load_duration)
log_counter("load_character_wrapper_success")
return char_data, chat_history, img
except Exception as e:
log_counter("load_character_wrapper_error", labels={"error": str(e)})
raise
def parse_character_book(book_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse the character book data from a V2 character card.
Args:
book_data (Dict[str, Any]): The raw character book data from the character card.
Returns:
Dict[str, Any]: The parsed and structured character book data.
"""
parsed_book = {
'name': book_data.get('name', ''),
'description': book_data.get('description', ''),
'scan_depth': book_data.get('scan_depth'),
'token_budget': book_data.get('token_budget'),
'recursive_scanning': book_data.get('recursive_scanning', False),
'extensions': book_data.get('extensions', {}),
'entries': []
}
for entry in book_data.get('entries', []):
parsed_entry = {
'keys': entry['keys'],
'content': entry['content'],
'extensions': entry.get('extensions', {}),
'enabled': entry['enabled'],
'insertion_order': entry['insertion_order'],
'case_sensitive': entry.get('case_sensitive', False),
'name': entry.get('name', ''),
'priority': entry.get('priority'),
'id': entry.get('id'),
'comment': entry.get('comment', ''),
'selective': entry.get('selective', False),
'secondary_keys': entry.get('secondary_keys', []),
'constant': entry.get('constant', False),
'position': entry.get('position')
}
parsed_book['entries'].append(parsed_entry)
return parsed_book
def load_character_and_image(character_id: int, user_name: str) -> Tuple[Optional[Dict[str, Any]], List[Tuple[Optional[str], str]], Optional[Image.Image]]:
"""
Load a character and its associated image based on the character ID.
Args:
character_id (int): The ID of the character to load.
user_name (str): The name of the user, used for placeholder replacement.
Returns:
Tuple[Optional[Dict[str, Any]], List[Tuple[Optional[str], str]], Optional[Image.Image]]:
A tuple containing the character data, chat history, and character image (if available).
"""
log_counter("load_character_and_image_attempt")
start_time = time.time()
try:
char_data = get_character_card_by_id(character_id)
if not char_data:
log_counter("load_character_and_image_no_data")
logging.warning(f"No character data found for ID: {character_id}")
return None, [], None
# Replace placeholders in character data
for field in ['first_mes', 'mes_example', 'scenario', 'description', 'personality']:
if field in char_data:
char_data[field] = replace_placeholders(char_data[field], char_data['name'], user_name)
# Replace placeholders in first_mes
first_mes = char_data.get('first_mes', "Hello! I'm ready to chat.")
first_mes = replace_placeholders(first_mes, char_data['name'], user_name)
chat_history = [(None, first_mes)] if first_mes else []
img = None
if char_data.get('image'):
try:
image_data = base64.b64decode(char_data['image'])
img = Image.open(io.BytesIO(image_data)).convert("RGBA")
log_counter("load_character_image_success")
except Exception as e:
log_counter("load_character_image_error", labels={"error": str(e)})
logging.error(f"Error processing image for character '{char_data['name']}': {e}")
load_duration = time.time() - start_time
log_histogram("load_character_and_image_duration", load_duration)
log_counter("load_character_and_image_success")
return char_data, chat_history, img
except Exception as e:
log_counter("load_character_and_image_error", labels={"error": str(e)})
logging.error(f"Error in load_character_and_image: {e}")
return None, [], None
def load_chat_and_character(chat_id: int, user_name: str) -> Tuple[Optional[Dict[str, Any]], List[Tuple[str, str]], Optional[Image.Image]]:
"""
Load a chat and its associated character, including the character image and process templates.
Args:
chat_id (int): The ID of the chat to load.
user_name (str): The name of the user.
Returns:
Tuple[Optional[Dict[str, Any]], List[Tuple[str, str]], Optional[Image.Image]]:
A tuple containing the character data, processed chat history, and character image (if available).
"""
log_counter("load_chat_and_character_attempt")
start_time = time.time()
try:
# Load the chat
chat = get_character_chat_by_id(chat_id)
if not chat:
log_counter("load_chat_and_character_no_chat")
logging.warning(f"No chat found with ID: {chat_id}")
return None, [], None
# Load the associated character
character_id = chat['character_id']
char_data = get_character_card_by_id(character_id)
if not char_data:
log_counter("load_chat_and_character_no_character")
logging.warning(f"No character found for chat ID: {chat_id}")
return None, chat['chat_history'], None
# Process the chat history
processed_history = process_chat_history(chat['chat_history'], char_data['name'], user_name)
# Load the character image
img = None
if char_data.get('image'):
try:
image_data = base64.b64decode(char_data['image'])
img = Image.open(io.BytesIO(image_data)).convert("RGBA")
log_counter("load_chat_character_image_success")
except Exception as e:
log_counter("load_chat_character_image_error", labels={"error": str(e)})
logging.error(f"Error processing image for character '{char_data['name']}': {e}")
# Process character data templates
for field in ['first_mes', 'mes_example', 'scenario', 'description', 'personality']:
if field in char_data:
char_data[field] = replace_placeholders(char_data[field], char_data['name'], user_name)
load_duration = time.time() - start_time
log_histogram("load_chat_and_character_duration", load_duration)
log_counter("load_chat_and_character_success")
return char_data, processed_history, img
except Exception as e:
log_counter("load_chat_and_character_error", labels={"error": str(e)})
logging.error(f"Error in load_chat_and_character: {e}")
return None, [], None
def extract_json_from_image(image_file):
logging.debug(f"Attempting to extract JSON from image: {image_file.name}")
log_counter("extract_json_from_image_attempt")
start_time = time.time()
try:
with Image.open(image_file) as img:
logging.debug("Image opened successfully")
metadata = img.info
if 'chara' in metadata:
logging.debug("Found 'chara' in image metadata")
chara_content = metadata['chara']
logging.debug(f"Content of 'chara' metadata (first 100 chars): {chara_content[:100]}...")
try:
decoded_content = base64.b64decode(chara_content).decode('utf-8')
logging.debug(f"Decoded content (first 100 chars): {decoded_content[:100]}...")
log_counter("extract_json_from_image_metadata_success")
return decoded_content
except Exception as e:
logging.error(f"Error decoding base64 content: {e}")
log_counter("extract_json_from_image_decode_error", labels={"error": str(e)})
logging.warning("'chara' not found in metadata, attempting to find JSON data in image bytes")
# Alternative method to extract embedded JSON from image bytes if metadata is not available
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format='PNG')
img_bytes = img_byte_arr.getvalue()
img_str = img_bytes.decode('latin1')
# Search for JSON-like structures in the image bytes
json_start = img_str.find('{')
json_end = img_str.rfind('}')
if json_start != -1 and json_end != -1 and json_end > json_start:
possible_json = img_str[json_start:json_end+1]
try:
json.loads(possible_json)
logging.debug("Found JSON data in image bytes")
log_counter("extract_json_from_image_bytes_success")
return possible_json
except json.JSONDecodeError:
logging.debug("No valid JSON found in image bytes")
log_counter("extract_json_from_image_invalid_json")
logging.warning("No JSON data found in the image")
log_counter("extract_json_from_image_no_json_found")
except Exception as e:
log_counter("extract_json_from_image_error", labels={"error": str(e)})
logging.error(f"Error extracting JSON from image: {e}")
extract_duration = time.time() - start_time
log_histogram("extract_json_from_image_duration", extract_duration)
return None
def load_chat_history(file):
log_counter("load_chat_history_attempt")
start_time = time.time()
try:
content = file.read().decode('utf-8')
chat_data = json.loads(content)
# Extract history and character name from the loaded data
history = chat_data.get('history') or chat_data.get('messages')
character_name = chat_data.get('character') or chat_data.get('character_name')
if not history or not character_name:
log_counter("load_chat_history_incomplete_data")
logging.error("Chat history or character name missing in the imported file.")
return None, None
load_duration = time.time() - start_time
log_histogram("load_chat_history_duration", load_duration)
log_counter("load_chat_history_success")
return history, character_name
except Exception as e:
log_counter("load_chat_history_error", labels={"error": str(e)})
logging.error(f"Error loading chat history: {e}")
return None, None
def process_chat_history(chat_history: List[Tuple[str, str]], char_name: str, user_name: str) -> List[Tuple[str, str]]:
"""
Process the chat history to replace placeholders in both user and character messages.
Args:
chat_history (List[Tuple[str, str]]): The chat history.
char_name (str): The name of the character.
user_name (str): The name of the user.
Returns:
List[Tuple[str, str]]: The processed chat history.
"""
log_counter("process_chat_history_attempt")
start_time = time.time()
try:
processed_history = []
for user_msg, char_msg in chat_history:
if user_msg:
user_msg = replace_placeholders(user_msg, char_name, user_name)
if char_msg:
char_msg = replace_placeholders(char_msg, char_name, user_name)
processed_history.append((user_msg, char_msg))
process_duration = time.time() - start_time
log_histogram("process_chat_history_duration", process_duration)
log_counter("process_chat_history_success", labels={"message_count": len(chat_history)})
return processed_history
except Exception as e:
log_counter("process_chat_history_error", labels={"error": str(e)})
logging.error(f"Error processing chat history: {e}")
raise
def validate_character_book(book_data):
"""
Validate the 'character_book' field in the character card.
Args:
book_data (dict): The character book data.
Returns:
Tuple[bool, List[str]]: A tuple containing a boolean indicating validity and a list of validation messages.
"""
validation_messages = []
# Optional fields with expected types
optional_fields = {
'name': str,
'description': str,
'scan_depth': (int, float),
'token_budget': (int, float),
'recursive_scanning': bool,
'extensions': dict,
'entries': list
}
for field, expected_type in optional_fields.items():
if field in book_data:
if not isinstance(book_data[field], expected_type):
validation_messages.append(f"Field 'character_book.{field}' must be of type '{expected_type}'.")
# 'entries' is required
if 'entries' not in book_data or not isinstance(book_data['entries'], list):
validation_messages.append("Field 'character_book.entries' is required and must be a list.")
return False, validation_messages
# Validate each entry in 'entries'
entries = book_data.get('entries', [])
entry_ids = set()
for idx, entry in enumerate(entries):
is_valid_entry, entry_messages = validate_character_book_entry(entry, idx, entry_ids)
if not is_valid_entry:
validation_messages.extend(entry_messages)
is_valid = len(validation_messages) == 0
return is_valid, validation_messages
def validate_character_book_entry(entry, idx, entry_ids):
"""
Validate an entry in the 'character_book.entries' list.
Args:
entry (dict): The entry data.
idx (int): The index of the entry in the list.
entry_ids (set): A set of existing entry IDs for uniqueness checking.
Returns:
Tuple[bool, List[str]]: A tuple containing a boolean indicating validity and a list of validation messages.
"""
validation_messages = []
required_fields = {
'keys': list,
'content': str,
'extensions': dict,
'enabled': bool,
'insertion_order': (int, float)
}
for field, expected_type in required_fields.items():
if field not in entry:
validation_messages.append(f"Entry {idx}: Missing required field '{field}'.")
elif not isinstance(entry[field], expected_type):
validation_messages.append(f"Entry {idx}: Field '{field}' must be of type '{expected_type}'.")
elif field == 'content' and not entry[field].strip():
validation_messages.append(f"Entry {idx}: Field 'content' cannot be empty.")
elif field == 'keys' and not entry[field]:
validation_messages.append(f"Entry {idx}: Field 'keys' cannot be empty.")
# Optional fields
optional_fields = {
'case_sensitive': bool,
'name': str,
'priority': (int, float),
'id': (int, float),
'comment': str,
'selective': bool,
'secondary_keys': list,
'constant': bool,
'position': str # Should be 'before_char' or 'after_char'
}
for field, expected_type in optional_fields.items():
if field in entry and not isinstance(entry[field], expected_type):
validation_messages.append(f"Entry {idx}: Field '{field}' must be of type '{expected_type}'.")
# Validate 'position' value if present
if 'position' in entry:
if entry['position'] not in ['before_char', 'after_char']:
validation_messages.append(f"Entry {idx}: Field 'position' must be 'before_char' or 'after_char'.")
# Validate 'secondary_keys' if 'selective' is True
if entry.get('selective', False):
if 'secondary_keys' not in entry or not isinstance(entry['secondary_keys'], list):
validation_messages.append(f"Entry {idx}: 'secondary_keys' must be a list when 'selective' is True.")
elif not entry['secondary_keys']:
validation_messages.append(f"Entry {idx}: 'secondary_keys' cannot be empty when 'selective' is True.")
# Validate 'keys' list elements
if 'keys' in entry and isinstance(entry['keys'], list):
for i, key in enumerate(entry['keys']):
if not isinstance(key, str) or not key.strip():
validation_messages.append(f"Entry {idx}: Element {i} in 'keys' must be a non-empty string.")
# Validate 'secondary_keys' list elements
if 'secondary_keys' in entry and isinstance(entry['secondary_keys'], list):
for i, key in enumerate(entry['secondary_keys']):
if not isinstance(key, str) or not key.strip():
validation_messages.append(f"Entry {idx}: Element {i} in 'secondary_keys' must be a non-empty string.")
# Validate 'id' uniqueness
if 'id' in entry:
entry_id = entry['id']
if entry_id in entry_ids:
validation_messages.append \
(f"Entry {idx}: Duplicate 'id' value '{entry_id}'. Each entry 'id' must be unique.")
else:
entry_ids.add(entry_id)
# Validate 'extensions' keys are namespaced
if 'extensions' in entry and isinstance(entry['extensions'], dict):
for key in entry['extensions'].keys():
if '/' not in key and '_' not in key:
validation_messages.append \
(f"Entry {idx}: Extension key '{key}' in 'extensions' should be namespaced to prevent conflicts.")
is_valid = len(validation_messages) == 0
return is_valid, validation_messages
def validate_v2_card(card_data):
"""
Validate a character card according to the V2 specification.
Args:
card_data (dict): The parsed character card data.
Returns:
Tuple[bool, List[str]]: A tuple containing a boolean indicating validity and a list of validation messages.
"""
validation_messages = []
# Check top-level fields
if 'spec' not in card_data:
validation_messages.append("Missing 'spec' field.")
elif card_data['spec'] != 'chara_card_v2':
validation_messages.append(f"Invalid 'spec' value: {card_data['spec']}. Expected 'chara_card_v2'.")
if 'spec_version' not in card_data:
validation_messages.append("Missing 'spec_version' field.")
else:
# Ensure 'spec_version' is '2.0' or higher
try:
spec_version = float(card_data['spec_version'])
if spec_version < 2.0:
validation_messages.append \
(f"'spec_version' must be '2.0' or higher. Found '{card_data['spec_version']}'.")
except ValueError:
validation_messages.append \
(f"Invalid 'spec_version' format: {card_data['spec_version']}. Must be a number as a string.")
if 'data' not in card_data:
validation_messages.append("Missing 'data' field.")
return False, validation_messages # Cannot proceed without 'data' field
data = card_data['data']
# Required fields in 'data'
required_fields = ['name', 'description', 'personality', 'scenario', 'first_mes', 'mes_example']
for field in required_fields:
if field not in data:
validation_messages.append(f"Missing required field in 'data': '{field}'.")
elif not isinstance(data[field], str):
validation_messages.append(f"Field '{field}' must be a string.")
elif not data[field].strip():
validation_messages.append(f"Field '{field}' cannot be empty.")
# Optional fields with expected types
optional_fields = {
'creator_notes': str,
'system_prompt': str,
'post_history_instructions': str,
'alternate_greetings': list,
'tags': list,
'creator': str,
'character_version': str,
'extensions': dict,
'character_book': dict # If present, should be a dict
}
for field, expected_type in optional_fields.items():
if field in data:
if not isinstance(data[field], expected_type):
validation_messages.append(f"Field '{field}' must be of type '{expected_type.__name__}'.")
elif field == 'extensions':
# Validate that extensions keys are properly namespaced
for key in data[field].keys():
if '/' not in key and '_' not in key:
validation_messages.append \
(f"Extension key '{key}' in 'extensions' should be namespaced to prevent conflicts.")
# If 'alternate_greetings' is present, check that it's a list of non-empty strings
if 'alternate_greetings' in data and isinstance(data['alternate_greetings'], list):
for idx, greeting in enumerate(data['alternate_greetings']):
if not isinstance(greeting, str) or not greeting.strip():
validation_messages.append(f"Element {idx} in 'alternate_greetings' must be a non-empty string.")
# If 'tags' is present, check that it's a list of non-empty strings
if 'tags' in data and isinstance(data['tags'], list):
for idx, tag in enumerate(data['tags']):
if not isinstance(tag, str) or not tag.strip():
validation_messages.append(f"Element {idx} in 'tags' must be a non-empty string.")
# Validate 'extensions' field
if 'extensions' in data and not isinstance(data['extensions'], dict):
validation_messages.append("Field 'extensions' must be a dictionary.")
# Validate 'character_book' if present
if 'character_book' in data:
is_valid_book, book_messages = validate_character_book(data['character_book'])
if not is_valid_book:
validation_messages.extend(book_messages)
is_valid = len(validation_messages) == 0
return is_valid, validation_messages
#
# End of File
####################################################################################################