tldw / App_Function_Libraries /DB /Character_Chat_DB.py
oceansweep's picture
Update App_Function_Libraries/DB/Character_Chat_DB.py
babbdf2 verified
raw
history blame
26.5 kB
# character_chat_db.py
# Database functions for managing character cards and chat histories.
# #
# Imports
import configparser
import sqlite3
import json
import logging
import os
import sys
from typing import List, Dict, Optional, Tuple, Any, Union
from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path
#from Tests.Chat_APIs.Chat_APIs_Integration_test import logging
#
#######################################################################################################################
#
#
def ensure_database_directory():
os.makedirs(get_database_dir(), exist_ok=True)
ensure_database_directory()
# Construct the path to the config file
config_path = get_project_relative_path('Config_Files/config.txt')
# Read the config file
config = configparser.ConfigParser()
config.read(config_path)
# Get the chat db path from the config, or use the default if not specified
chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db'))
print(f"Chat Database path: {chat_DB_PATH}")
########################################################################################################
#
# Functions
# FIXME - Setup properly and test/add documentation for its existence...
def initialize_database():
"""Initialize the SQLite database with required tables and FTS5 virtual tables."""
conn = None
try:
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
# Enable foreign key constraints
cursor.execute("PRAGMA foreign_keys = ON;")
# Create CharacterCards table with V2 fields
cursor.execute("""
CREATE TABLE IF NOT EXISTS CharacterCards (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
description TEXT,
personality TEXT,
scenario TEXT,
image BLOB,
post_history_instructions TEXT,
first_mes TEXT,
mes_example TEXT,
creator_notes TEXT,
system_prompt TEXT,
alternate_greetings TEXT,
tags TEXT,
creator TEXT,
character_version TEXT,
extensions TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
# Create CharacterChats table
cursor.execute("""
CREATE TABLE IF NOT EXISTS CharacterChats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
character_id INTEGER NOT NULL,
conversation_name TEXT,
chat_history TEXT,
is_snapshot BOOLEAN DEFAULT FALSE,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE
);
""")
# Create FTS5 virtual table for CharacterChats
cursor.execute("""
CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5(
conversation_name,
chat_history,
content='CharacterChats',
content_rowid='id'
);
""")
# Create triggers to keep FTS5 table in sync with CharacterChats
cursor.executescript("""
CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN
INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history)
VALUES (new.id, new.conversation_name, new.chat_history);
END;
CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN
DELETE FROM CharacterChats_fts WHERE rowid = old.id;
END;
CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN
UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history
WHERE rowid = new.id;
END;
""")
# Create ChatKeywords table
cursor.execute("""
CREATE TABLE IF NOT EXISTS ChatKeywords (
chat_id INTEGER NOT NULL,
keyword TEXT NOT NULL,
FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE
);
""")
# Create indexes for faster searches
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword);
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id);
""")
conn.commit()
logging.info("Database initialized successfully.")
except sqlite3.Error as e:
logging.error(f"SQLite error occurred during database initialization: {e}")
if conn:
conn.rollback()
raise
except Exception as e:
logging.error(f"Unexpected error occurred during database initialization: {e}")
if conn:
conn.rollback()
raise
finally:
if conn:
conn.close()
# Call initialize_database() at the start of your application
def setup_chat_database():
try:
initialize_database()
except Exception as e:
logging.critical(f"Failed to initialize database: {e}")
sys.exit(1)
setup_chat_database()
########################################################################################################
#
# Character Card handling
def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse and validate a character card according to V2 specification."""
v2_data = {
'name': card_data.get('name', ''),
'description': card_data.get('description', ''),
'personality': card_data.get('personality', ''),
'scenario': card_data.get('scenario', ''),
'first_mes': card_data.get('first_mes', ''),
'mes_example': card_data.get('mes_example', ''),
'creator_notes': card_data.get('creator_notes', ''),
'system_prompt': card_data.get('system_prompt', ''),
'post_history_instructions': card_data.get('post_history_instructions', ''),
'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])),
'tags': json.dumps(card_data.get('tags', [])),
'creator': card_data.get('creator', ''),
'character_version': card_data.get('character_version', ''),
'extensions': json.dumps(card_data.get('extensions', {}))
}
# Handle 'image' separately as it might be binary data
if 'image' in card_data:
v2_data['image'] = card_data['image']
return v2_data
def add_character_card(card_data: Dict[str, Any]) -> Optional[int]:
"""Add or update a character card in the database."""
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
try:
parsed_card = parse_character_card(card_data)
# Check if character already exists
cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],))
row = cursor.fetchone()
if row:
# Update existing character
character_id = row[0]
update_query = """
UPDATE CharacterCards
SET description = ?, personality = ?, scenario = ?, image = ?,
post_history_instructions = ?, first_mes = ?, mes_example = ?,
creator_notes = ?, system_prompt = ?, alternate_greetings = ?,
tags = ?, creator = ?, character_version = ?, extensions = ?
WHERE id = ?
"""
cursor.execute(update_query, (
parsed_card['description'], parsed_card['personality'], parsed_card['scenario'],
parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'],
parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'],
parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'],
parsed_card['character_version'], parsed_card['extensions'], character_id
))
else:
# Insert new character
insert_query = """
INSERT INTO CharacterCards (name, description, personality, scenario, image,
post_history_instructions, first_mes, mes_example, creator_notes, system_prompt,
alternate_greetings, tags, creator, character_version, extensions)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
cursor.execute(insert_query, (
parsed_card['name'], parsed_card['description'], parsed_card['personality'],
parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'],
parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'],
parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'],
parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions']
))
character_id = cursor.lastrowid
conn.commit()
return character_id
except sqlite3.IntegrityError as e:
logging.error(f"Error adding character card: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error adding character card: {e}")
return None
finally:
conn.close()
# def add_character_card(card_data: Dict) -> Optional[int]:
# """Add or update a character card in the database.
#
# Returns the ID of the inserted character or None if failed.
# """
# conn = sqlite3.connect(chat_DB_PATH)
# cursor = conn.cursor()
# try:
# # Ensure all required fields are present
# required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message']
# for field in required_fields:
# if field not in card_data:
# card_data[field] = '' # Assign empty string if field is missing
#
# # Check if character already exists
# cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],))
# row = cursor.fetchone()
#
# if row:
# # Update existing character
# character_id = row[0]
# cursor.execute("""
# UPDATE CharacterCards
# SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
# WHERE id = ?
# """, (
# card_data['description'],
# card_data['personality'],
# card_data['scenario'],
# card_data['image'],
# card_data['post_history_instructions'],
# card_data['first_message'],
# character_id
# ))
# else:
# # Insert new character
# cursor.execute("""
# INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message)
# VALUES (?, ?, ?, ?, ?, ?, ?)
# """, (
# card_data['name'],
# card_data['description'],
# card_data['personality'],
# card_data['scenario'],
# card_data['image'],
# card_data['post_history_instructions'],
# card_data['first_message']
# ))
# character_id = cursor.lastrowid
#
# conn.commit()
# return cursor.lastrowid
# except sqlite3.IntegrityError as e:
# logging.error(f"Error adding character card: {e}")
# return None
# except Exception as e:
# logging.error(f"Unexpected error adding character card: {e}")
# return None
# finally:
# conn.close()
def get_character_cards() -> List[Dict]:
"""Retrieve all character cards from the database."""
logging.debug(f"Fetching characters from DB: {chat_DB_PATH}")
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM CharacterCards")
rows = cursor.fetchall()
columns = [description[0] for description in cursor.description]
conn.close()
characters = [dict(zip(columns, row)) for row in rows]
#logging.debug(f"Characters fetched from DB: {characters}")
return characters
def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""
Retrieve a single character card by its ID.
Args:
character_id: Can be either an integer ID or a dictionary containing character data.
Returns:
A dictionary containing the character card data, or None if not found.
"""
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
try:
if isinstance(character_id, dict):
# If a dictionary is passed, assume it's already a character card
return character_id
elif isinstance(character_id, int):
# If an integer is passed, fetch the character from the database
cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,))
row = cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
return dict(zip(columns, row))
else:
logging.warning(f"Invalid type for character_id: {type(character_id)}")
return None
except Exception as e:
logging.error(f"Error in get_character_card_by_id: {e}")
return None
finally:
conn.close()
def update_character_card(character_id: int, card_data: Dict) -> bool:
"""Update an existing character card."""
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
try:
cursor.execute("""
UPDATE CharacterCards
SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
WHERE id = ?
""", (
card_data.get('name'),
card_data.get('description'),
card_data.get('personality'),
card_data.get('scenario'),
card_data.get('image'),
card_data.get('post_history_instructions', ''),
card_data.get('first_message', "Hello! I'm ready to chat."),
character_id
))
conn.commit()
return cursor.rowcount > 0
except sqlite3.IntegrityError as e:
logging.error(f"Error updating character card: {e}")
return False
finally:
conn.close()
def delete_character_card(character_id: int) -> bool:
"""Delete a character card and its associated chats."""
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
try:
# Delete associated chats first due to foreign key constraint
cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,))
cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,))
conn.commit()
return cursor.rowcount > 0
except sqlite3.Error as e:
logging.error(f"Error deleting character card: {e}")
return False
finally:
conn.close()
def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]:
"""
Add a new chat history for a character, optionally associating keywords.
Args:
character_id (int): The ID of the character.
conversation_name (str): Name of the conversation.
chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples.
keywords (Optional[List[str]]): List of keywords to associate with this chat.
is_snapshot (bool, optional): Whether this chat is a snapshot.
Returns:
Optional[int]: The ID of the inserted chat or None if failed.
"""
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
try:
chat_history_json = json.dumps(chat_history)
cursor.execute("""
INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot)
VALUES (?, ?, ?, ?)
""", (
character_id,
conversation_name,
chat_history_json,
is_snapshot
))
chat_id = cursor.lastrowid
if keywords:
# Insert keywords into ChatKeywords table
keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords]
cursor.executemany("""
INSERT INTO ChatKeywords (chat_id, keyword)
VALUES (?, ?)
""", keyword_records)
conn.commit()
return chat_id
except sqlite3.Error as e:
logging.error(f"Error adding character chat: {e}")
return None
finally:
conn.close()
def get_character_chats(character_id: Optional[int] = None) -> List[Dict]:
"""Retrieve all chats, or chats for a specific character if character_id is provided."""
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
if character_id is not None:
cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,))
else:
cursor.execute("SELECT * FROM CharacterChats")
rows = cursor.fetchall()
columns = [description[0] for description in cursor.description]
conn.close()
return [dict(zip(columns, row)) for row in rows]
def get_character_chat_by_id(chat_id: int) -> Optional[Dict]:
"""Retrieve a single chat by its ID."""
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,))
row = cursor.fetchone()
conn.close()
if row:
columns = [description[0] for description in cursor.description]
chat = dict(zip(columns, row))
chat['chat_history'] = json.loads(chat['chat_history'])
return chat
return None
def search_character_chats(query: str, character_id: Optional[int] = None) -> Tuple[List[Dict], str]:
"""
Search for character chats using FTS5, optionally filtered by character_id.
Args:
query (str): The search query.
character_id (Optional[int]): The ID of the character to filter chats by.
Returns:
Tuple[List[Dict], str]: A list of matching chats and a status message.
"""
if not query.strip():
return [], "Please enter a search query."
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
try:
if character_id is not None:
# Search with character_id filter
cursor.execute("""
SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
FROM CharacterChats_fts
JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
WHERE CharacterChats_fts MATCH ? AND CharacterChats.character_id = ?
ORDER BY rank
""", (query, character_id))
else:
# Search without character_id filter
cursor.execute("""
SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
FROM CharacterChats_fts
JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
WHERE CharacterChats_fts MATCH ?
ORDER BY rank
""", (query,))
rows = cursor.fetchall()
columns = [description[0] for description in cursor.description]
results = [dict(zip(columns, row)) for row in rows]
if character_id is not None:
status_message = f"Found {len(results)} chat(s) matching '{query}' for the selected character."
else:
status_message = f"Found {len(results)} chat(s) matching '{query}' across all characters."
return results, status_message
except Exception as e:
logging.error(f"Error searching chats with FTS5: {e}")
return [], f"Error occurred during search: {e}"
finally:
conn.close()
def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool:
"""Update an existing chat history."""
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
try:
chat_history_json = json.dumps(chat_history)
cursor.execute("""
UPDATE CharacterChats
SET chat_history = ?
WHERE id = ?
""", (
chat_history_json,
chat_id
))
conn.commit()
return cursor.rowcount > 0
except sqlite3.Error as e:
logging.error(f"Error updating character chat: {e}")
return False
finally:
conn.close()
def delete_character_chat(chat_id: int) -> bool:
"""Delete a specific chat."""
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
try:
cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,))
conn.commit()
return cursor.rowcount > 0
except sqlite3.Error as e:
logging.error(f"Error deleting character chat: {e}")
return False
finally:
conn.close()
def fetch_keywords_for_chats(keywords: List[str]) -> List[int]:
"""
Fetch chat IDs associated with any of the specified keywords.
Args:
keywords (List[str]): List of keywords to search for.
Returns:
List[int]: List of chat IDs associated with the keywords.
"""
if not keywords:
return []
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
try:
# Construct the WHERE clause to search for each keyword
keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords))
sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}"
cursor.execute(sql_query, keywords)
rows = cursor.fetchall()
chat_ids = [row[0] for row in rows]
return chat_ids
except Exception as e:
logging.error(f"Error in fetch_keywords_for_chats: {e}")
return []
finally:
conn.close()
def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]:
"""Save chat history to the CharacterChats table.
Returns the ID of the inserted chat or None if failed.
"""
return add_character_chat(character_id, conversation_name, chat_history)
def migrate_chat_to_media_db():
pass
def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]:
"""
Perform a full-text search on specified fields with optional filtering and pagination.
Args:
query (str): The search query.
fields (List[str]): List of fields to search in.
where_clause (str, optional): Additional SQL WHERE clause to filter results.
page (int, optional): Page number for pagination.
results_per_page (int, optional): Number of results per page.
Returns:
List[Dict[str, Any]]: List of matching chat records with content and metadata.
"""
if not query.strip():
return []
conn = sqlite3.connect(chat_DB_PATH)
cursor = conn.cursor()
try:
# Construct the MATCH query for FTS5
match_query = " AND ".join(fields) + f" MATCH ?"
# Adjust the query with the fields
fts_query = f"""
SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
FROM CharacterChats_fts
JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
WHERE {match_query}
"""
if where_clause:
fts_query += f" AND ({where_clause})"
fts_query += " ORDER BY rank LIMIT ? OFFSET ?"
offset = (page - 1) * results_per_page
cursor.execute(fts_query, (query, results_per_page, offset))
rows = cursor.fetchall()
columns = [description[0] for description in cursor.description]
results = [dict(zip(columns, row)) for row in rows]
return results
except Exception as e:
logging.error(f"Error in search_db: {e}")
return []
finally:
conn.close()
def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \
List[Dict[str, Any]]:
"""
Perform a full-text search within the specified chat IDs using FTS5.
Args:
query (str): The user's query.
relevant_chat_ids (List[int]): List of chat IDs to search within.
page (int): Pagination page number.
results_per_page (int): Number of results per page.
Returns:
List[Dict[str, Any]]: List of search results with content and metadata.
"""
try:
# Construct a WHERE clause to limit the search to relevant chat IDs
where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids])
if not where_clause:
where_clause = "1" # No restriction if no chat IDs
# Perform full-text search using FTS5
fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page)
filtered_fts_results = [
{
"content": result['content'],
"metadata": {"media_id": result['id']}
}
for result in fts_results
if result['id'] in relevant_chat_ids
]
return filtered_fts_results
except Exception as e:
logging.error(f"Error in perform_full_text_search_chat: {str(e)}")
return []
def fetch_all_chats() -> List[Dict[str, Any]]:
"""
Fetch all chat messages from the database.
Returns:
List[Dict[str, Any]]: List of chat messages with relevant metadata.
"""
try:
chats = get_character_chats() # Modify this function to retrieve all chats
return chats
except Exception as e:
logging.error(f"Error fetching all chats: {str(e)}")
return []
#
# End of Character_Chat_DB.py
#######################################################################################################################