Spaces:
Running
Running
# DB_Manager.py | |
# Description: This file contains the DatabaseManager class, which is responsible for managing the database connection, i.e. either SQLite or Elasticsearch. | |
# | |
# Imports | |
import configparser | |
import os | |
import logging | |
import threading | |
from contextlib import contextmanager | |
from typing import Tuple, List, Union, Dict | |
import sqlite3 | |
import time | |
# | |
# 3rd-Party Libraries | |
from elasticsearch import Elasticsearch | |
# | |
# Import your existing SQLite functions | |
from App_Function_Libraries.DB.SQLite_DB import ( | |
update_media_content as sqlite_update_media_content, | |
list_prompts as sqlite_list_prompts, | |
search_and_display as sqlite_search_and_display, | |
fetch_prompt_details as sqlite_fetch_prompt_details, | |
keywords_browser_interface as sqlite_keywords_browser_interface, | |
add_keyword as sqlite_add_keyword, | |
delete_keyword as sqlite_delete_keyword, | |
export_keywords_to_csv as sqlite_export_keywords_to_csv, | |
ingest_article_to_db as sqlite_ingest_article_to_db, | |
add_media_to_database as sqlite_add_media_to_database, | |
import_obsidian_note_to_db as sqlite_import_obsidian_note_to_db, | |
add_prompt as sqlite_add_prompt, | |
delete_chat_message as sqlite_delete_chat_message, | |
update_chat_message as sqlite_update_chat_message, | |
add_chat_message as sqlite_add_chat_message, | |
get_chat_messages as sqlite_get_chat_messages, | |
search_chat_conversations as sqlite_search_chat_conversations, | |
create_chat_conversation as sqlite_create_chat_conversation, | |
save_chat_history_to_database as sqlite_save_chat_history_to_database, | |
view_database as sqlite_view_database, | |
get_transcripts as sqlite_get_transcripts, | |
get_trashed_items as sqlite_get_trashed_items, | |
user_delete_item as sqlite_user_delete_item, | |
empty_trash as sqlite_empty_trash, | |
create_automated_backup as sqlite_create_automated_backup, | |
add_or_update_prompt as sqlite_add_or_update_prompt, | |
load_prompt_details as sqlite_load_prompt_details, | |
load_preset_prompts as sqlite_load_preset_prompts, | |
insert_prompt_to_db as sqlite_insert_prompt_to_db, | |
delete_prompt as sqlite_delete_prompt, | |
search_and_display_items as sqlite_search_and_display_items, | |
get_conversation_name as sqlite_get_conversation_name, | |
add_media_with_keywords as sqlite_add_media_with_keywords, | |
check_media_and_whisper_model as sqlite_check_media_and_whisper_model, | |
DatabaseError, create_document_version as sqlite_create_document_version, | |
get_document_version as sqlite_get_document_version, sqlite_search_db, sqlite_add_media_chunk, | |
sqlite_update_fts_for_media, sqlite_get_unprocessed_media, fetch_item_details as sqlite_fetch_item_details, \ | |
search_media_database as sqlite_search_media_database, mark_as_trash as sqlite_mark_as_trash, \ | |
get_media_transcripts as sqlite_get_media_transcripts, get_specific_transcript as sqlite_get_specific_transcript, \ | |
get_media_summaries as sqlite_get_media_summaries, get_specific_summary as sqlite_get_specific_summary, \ | |
get_media_prompts as sqlite_get_media_prompts, get_specific_prompt as sqlite_get_specific_prompt, \ | |
delete_specific_transcript as sqlite_delete_specific_transcript, delete_specific_summary as sqlite_delete_specific_summary, \ | |
delete_specific_prompt as sqlite_delete_specific_prompt, fetch_keywords_for_media as sqlite_fetch_keywords_for_media, \ | |
update_keywords_for_media as sqlite_update_keywords_for_media, check_media_exists as sqlite_check_media_exists, \ | |
search_prompts as sqlite_search_prompts, get_media_content as sqlite_get_media_content, \ | |
get_paginated_files as sqlite_get_paginated_files, get_media_title as sqlite_get_media_title, \ | |
get_all_content_from_database as sqlite_get_all_content_from_database, | |
) | |
# | |
# Local Imports | |
from App_Function_Libraries.Utils.Utils import load_comprehensive_config, get_database_path, get_project_relative_path | |
# | |
# End of imports | |
############################################################################################################ | |
# | |
# Globals | |
# Load configuration from config file | |
config_path = get_project_relative_path('Config_Files/config.txt') | |
config = configparser.ConfigParser() | |
config.read(config_path) | |
db_path: str = config.get('Database', 'sqlite_path', fallback='./Databases/media_summary.db') | |
backup_path: str = config.get('Database', 'backup_path', fallback='database_backups') | |
backup_dir: Union[str, bytes] = os.environ.get('DB_BACKUP_DIR', backup_path) | |
# | |
# End of Globals | |
############################################################################################################ | |
# | |
# Database Manager Class | |
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
class Database: | |
def __init__(self, db_name='media_summary.db'): | |
self.db_path = get_database_path(db_name) | |
self.pool = [] | |
self.pool_size = 10 | |
self.lock = threading.Lock() | |
self.timeout = 60.0 # 60 seconds timeout | |
def get_connection(self): | |
retry_count = 5 | |
retry_delay = 1 | |
while retry_count > 0: | |
try: | |
if self.pool: | |
conn = self.pool.pop() | |
else: | |
conn = sqlite3.connect(self.db_path, timeout=self.timeout, check_same_thread=False) | |
conn.execute("PRAGMA journal_mode=WAL;") # Enable WAL mode | |
yield conn | |
self.pool.append(conn) | |
return | |
except sqlite3.OperationalError as e: | |
if 'database is locked' in str(e): | |
logger.warning(f"Database is locked, retrying in {retry_delay} seconds...") | |
retry_count -= 1 | |
time.sleep(retry_delay) | |
retry_delay *= 2 # Exponential backoff | |
else: | |
raise DatabaseError(f"Database error: {e}") | |
except Exception as e: | |
raise DatabaseError(f"Unexpected error: {e}") | |
raise DatabaseError("Database is locked and retries have been exhausted") | |
def execute_query(self, query: str, params: Tuple = ()) -> None: | |
with self.lock: # Use a global lock for write operations | |
with self.get_connection() as conn: | |
try: | |
cursor = conn.cursor() | |
cursor.execute(query, params) | |
conn.commit() | |
except sqlite3.Error as e: | |
logger.error(f"Database error: {e}, Query: {query}") | |
raise DatabaseError(f"Database error: {e}, Query: {query}") | |
def execute_many(self, query: str, params_list: List[Tuple]) -> None: | |
with self.lock: # Use a global lock for write operations | |
with self.get_connection() as conn: | |
try: | |
cursor = conn.cursor() | |
cursor.executemany(query, params_list) | |
conn.commit() | |
except sqlite3.Error as e: | |
logger.error(f"Database error: {e}, Query: {query}") | |
raise DatabaseError(f"Database error: {e}, Query: {query}") | |
def close_all_connections(self): | |
for conn in self.pool: | |
conn.close() | |
self.pool.clear() | |
# | |
# class Database: | |
# def __init__(self, db_name='media_summary.db'): | |
# self.db_path = get_database_path(db_name) | |
# self.pool = [] | |
# self.pool_size = 10 | |
# | |
# @contextmanager | |
# def get_connection(self): | |
# retry_count = 5 | |
# retry_delay = 1 | |
# while retry_count > 0: | |
# try: | |
# if self.pool: | |
# conn = self.pool.pop() | |
# else: | |
# conn = sqlite3.connect(self.db_path, check_same_thread=False) | |
# yield conn | |
# self.pool.append(conn) | |
# return | |
# except sqlite3.OperationalError as e: | |
# if 'database is locked' in str(e): | |
# logger.warning(f"Database is locked, retrying in {retry_delay} seconds...") | |
# retry_count -= 1 | |
# time.sleep(retry_delay) | |
# retry_delay *= 2 # Exponential backoff | |
# else: | |
# raise DatabaseError(f"Database error: {e}") | |
# except Exception as e: | |
# raise DatabaseError(f"Unexpected error: {e}") | |
# raise DatabaseError("Database is locked and retries have been exhausted") | |
# | |
# def execute_query(self, query: str, params: Tuple = ()) -> None: | |
# with self.get_connection() as conn: | |
# try: | |
# cursor = conn.cursor() | |
# cursor.execute(query, params) | |
# conn.commit() | |
# except sqlite3.Error as e: | |
# logger.error(f"Database error: {e}, Query: {query}") | |
# raise DatabaseError(f"Database error: {e}, Query: {query}") | |
# | |
# def close_all_connections(self): | |
# for conn in self.pool: | |
# conn.close() | |
# self.pool.clear() | |
# | |
# | |
# End of Database Manager Class | |
############################################################################################################ | |
# | |
# Database Config loading | |
def get_db_config(): | |
try: | |
config = load_comprehensive_config() | |
if 'Database' not in config: | |
print("Warning: 'Database' section not found in config. Using default values.") | |
return default_db_config() | |
return { | |
'type': config.get('Database', 'type', fallback='sqlite'), | |
'sqlite_path': config.get('Database', 'sqlite_path', fallback='Databases/media_summary.db'), | |
'elasticsearch_host': config.get('Database', 'elasticsearch_host', fallback='localhost'), | |
'elasticsearch_port': config.getint('Database', 'elasticsearch_port', fallback=9200) | |
} | |
except FileNotFoundError: | |
print("Warning: Config file not found. Using default database configuration.") | |
return default_db_config() | |
except Exception as e: | |
print(f"Error reading config: {str(e)}. Using default database configuration.") | |
return default_db_config() | |
def default_db_config(): | |
"""Return the default database configuration with project-relative paths.""" | |
return { | |
'type': 'sqlite', | |
'sqlite_path': get_database_path('media_summary.db'), | |
'elasticsearch_host': 'localhost', | |
'elasticsearch_port': 9200 | |
} | |
def ensure_directory_exists(file_path): | |
directory = os.path.dirname(file_path) | |
if not os.path.exists(directory): | |
os.makedirs(directory) | |
print(f"Created directory: {directory}") | |
# Use the config to set up the database | |
db_config = get_db_config() | |
db_type = db_config['type'] | |
if db_type == 'sqlite': | |
db = Database(os.path.basename(db_config['sqlite_path'])) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch setup here if needed | |
raise NotImplementedError("Elasticsearch support not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
# Print database path for debugging | |
print(f"Database path: {db.db_path}") | |
# Sanity Check for SQLite DB | |
# FIXME - Remove this after testing / Writing Unit tests | |
# try: | |
# db.execute_query("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY)") | |
# logger.info("Successfully created test table") | |
# except DatabaseError as e: | |
# logger.error(f"Failed to create test table: {e}") | |
# | |
# End of Database Config loading | |
############################################################################################################ | |
# | |
# DB Search functions | |
def search_db(search_query: str, search_fields: List[str], keywords: str, page: int = 1, results_per_page: int = 10): | |
if db_type == 'sqlite': | |
return sqlite_search_db(search_query, search_fields, keywords, page, results_per_page) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version when available | |
raise NotImplementedError("Elasticsearch version of search_db not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def view_database(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_view_database(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def search_and_display_items(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_search_and_display_items(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def get_all_content_from_database(): | |
if db_type == 'sqlite': | |
return sqlite_get_all_content_from_database() | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def search_and_display(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_search_and_display(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def check_media_exists(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_check_media_exists(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def get_paginated_files(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_get_paginated_files(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def get_media_title(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_get_media_title(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
# | |
# End of DB-Searching functions | |
############################################################################################################ | |
############################################################################################################ | |
# | |
# Transcript-related Functions | |
def get_transcripts(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_get_transcripts(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
# | |
# End of Transcript-related Functions | |
############################################################################################################ | |
############################################################################################################ | |
# | |
# DB-Ingestion functions | |
def add_media_to_database(*args, **kwargs): | |
if db_type == 'sqlite': | |
result = sqlite_add_media_to_database(*args, **kwargs) | |
# Extract content | |
segments = args[2] | |
if isinstance(segments, list): | |
content = ' '.join([segment.get('Text', '') for segment in segments if 'Text' in segment]) | |
elif isinstance(segments, dict): | |
content = segments.get('text', '') or segments.get('content', '') | |
else: | |
content = str(segments) | |
# Extract media_id from the result | |
# Assuming the result is in the format "Media 'Title' added/updated successfully with ID: {media_id}" | |
import re | |
match = re.search(r"with ID: (\d+)", result) | |
if match: | |
media_id = int(match.group(1)) | |
# Create initial document version | |
sqlite_create_document_version(media_id, content) | |
return result | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_to_database not yet implemented") | |
def import_obsidian_note_to_db(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_import_obsidian_note_to_db(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def update_media_content(*args, **kwargs): | |
if db_type == 'sqlite': | |
result = sqlite_update_media_content(*args, **kwargs) | |
# Extract media_id and content | |
selected_item = args[0] | |
item_mapping = args[1] | |
content_input = args[2] | |
if selected_item and item_mapping and selected_item in item_mapping: | |
media_id = item_mapping[selected_item] | |
# Create new document version | |
sqlite_create_document_version(media_id, content_input) | |
return result | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of update_media_content not yet implemented") | |
def add_media_with_keywords(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_add_media_with_keywords(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def check_media_and_whisper_model(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_check_media_and_whisper_model(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of check_media_and_whisper_model not yet implemented") | |
def ingest_article_to_db(url, title, author, content, keywords, summary, ingestion_date, custom_prompt): | |
if db_type == 'sqlite': | |
return sqlite_ingest_article_to_db(url, title, author, content, keywords, summary, ingestion_date, custom_prompt) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of ingest_article_to_db not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def add_media_chunk(media_id: int, chunk_text: str, start_index: int, end_index: int, chunk_id: str): | |
if db_type == 'sqlite': | |
sqlite_add_media_chunk(db, media_id, chunk_text, start_index, end_index, chunk_id) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def update_fts_for_media(media_id: int): | |
if db_type == 'sqlite': | |
sqlite_update_fts_for_media(db, media_id) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def get_unprocessed_media(): | |
if db_type == 'sqlite': | |
return sqlite_get_unprocessed_media(db) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of get_unprocessed_media not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
# | |
# End of DB-Ingestion functions | |
############################################################################################################ | |
############################################################################################################ | |
# | |
# Prompt-related functions #FIXME rename /resort | |
def list_prompts(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_list_prompts(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def search_prompts(query): | |
if db_type == 'sqlite': | |
return sqlite_search_prompts(query) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def fetch_prompt_details(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_fetch_prompt_details(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def add_prompt(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_add_prompt(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def add_or_update_prompt(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_add_or_update_prompt(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def load_prompt_details(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_load_prompt_details(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def load_preset_prompts(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_load_preset_prompts() | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def insert_prompt_to_db(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_insert_prompt_to_db(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def delete_prompt(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_delete_prompt(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def search_media_database(query: str) -> List[Tuple[int, str, str]]: | |
if db_type == 'sqlite': | |
return sqlite_search_media_database(query) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version when available | |
raise NotImplementedError("Elasticsearch version of search_media_database not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def mark_as_trash(media_id: int) -> None: | |
if db_type == 'sqlite': | |
return sqlite_mark_as_trash(media_id) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version when available | |
raise NotImplementedError("Elasticsearch version of mark_as_trash not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def get_media_content(media_id: int) -> str: | |
if db_type == 'sqlite': | |
return sqlite_get_media_content(media_id) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of get_media_content not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def get_media_transcripts(media_id: int) -> List[Dict]: | |
if db_type == 'sqlite': | |
return sqlite_get_media_transcripts(media_id) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of get_media_transcripts not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def get_specific_transcript(transcript_id: int) -> Dict: | |
if db_type == 'sqlite': | |
return sqlite_get_specific_transcript(transcript_id) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of get_specific_transcript not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def get_media_summaries(media_id: int) -> List[Dict]: | |
if db_type == 'sqlite': | |
return sqlite_get_media_summaries(media_id) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of get_media_summaries not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def get_specific_summary(summary_id: int) -> Dict: | |
if db_type == 'sqlite': | |
return sqlite_get_specific_summary(summary_id) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of get_specific_summary not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def get_media_prompts(media_id: int) -> List[Dict]: | |
if db_type == 'sqlite': | |
return sqlite_get_media_prompts(media_id) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of get_media_prompts not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def get_specific_prompt(prompt_id: int) -> Dict: | |
if db_type == 'sqlite': | |
return sqlite_get_specific_prompt(prompt_id) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of get_specific_prompt not yet implemented") | |
else: | |
return {'error': f"Unsupported database type: {db_type}"} | |
def delete_specific_transcript(transcript_id: int) -> str: | |
if db_type == 'sqlite': | |
return sqlite_delete_specific_transcript(transcript_id) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of delete_specific_transcript not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def delete_specific_summary(summary_id: int) -> str: | |
if db_type == 'sqlite': | |
return sqlite_delete_specific_summary(summary_id) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of delete_specific_summary not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
def delete_specific_prompt(prompt_id: int) -> str: | |
if db_type == 'sqlite': | |
return sqlite_delete_specific_prompt(prompt_id) | |
elif db_type == 'elasticsearch': | |
raise NotImplementedError("Elasticsearch version of delete_specific_prompt not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
# | |
# End of Prompt-related functions | |
############################################################################################################ | |
############################################################################################################ | |
# | |
# Keywords-related Functions | |
def keywords_browser_interface(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_keywords_browser_interface() | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def add_keyword(*args, **kwargs): | |
if db_type == 'sqlite': | |
with db.get_connection() as conn: | |
cursor = conn.cursor() | |
return sqlite_add_keyword(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def delete_keyword(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_delete_keyword(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def export_keywords_to_csv(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_export_keywords_to_csv() | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def update_keywords_for_media(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_update_keywords_for_media(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def fetch_keywords_for_media(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_fetch_keywords_for_media(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
# | |
# End of Keywords-related Functions | |
############################################################################################################ | |
############################################################################################################ | |
# | |
# Chat-related Functions | |
def delete_chat_message(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_delete_chat_message(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def update_chat_message(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_update_chat_message(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def add_chat_message(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_add_chat_message(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def get_chat_messages(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_get_chat_messages(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def search_chat_conversations(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_search_chat_conversations(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def create_chat_conversation(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_create_chat_conversation(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def save_chat_history_to_database(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_save_chat_history_to_database(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def get_conversation_name(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_get_conversation_name(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
# | |
# End of Chat-related Functions | |
############################################################################################################ | |
############################################################################################################ | |
# | |
# Trash-related Functions | |
def get_trashed_items(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_get_trashed_items() | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def user_delete_item(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_user_delete_item(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def empty_trash(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_empty_trash(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
def fetch_item_details(media_id: int) -> Tuple[str, str, str]: | |
""" | |
Fetch the details of a media item including content, prompt, and summary. | |
Args: | |
media_id (int): The ID of the media item. | |
Returns: | |
Tuple[str, str, str]: A tuple containing (content, prompt, summary). | |
If an error occurs, it returns empty strings for each field. | |
""" | |
if db_type == 'sqlite': | |
return sqlite_fetch_item_details(media_id) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version when available | |
raise NotImplementedError("Elasticsearch version of fetch_item_details not yet implemented") | |
else: | |
raise ValueError(f"Unsupported database type: {db_type}") | |
# | |
# End of Trash-related Functions | |
############################################################################################################ | |
############################################################################################################ | |
# | |
# DB-Backup Functions | |
def create_automated_backup(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_create_automated_backup(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented") | |
# | |
# End of DB-Backup Functions | |
############################################################################################################ | |
############################################################################################################ | |
# | |
# Document Versioning Functions | |
def create_document_version(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_create_document_version(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of create_document_version not yet implemented") | |
def get_document_version(*args, **kwargs): | |
if db_type == 'sqlite': | |
return sqlite_get_document_version(*args, **kwargs) | |
elif db_type == 'elasticsearch': | |
# Implement Elasticsearch version | |
raise NotImplementedError("Elasticsearch version of get_document_version not yet implemented") | |
# | |
# End of Document Versioning Functions | |
############################################################################################################ | |
############################################################################################################ | |
# | |
# Function to close the database connection for SQLite | |
def close_connection(): | |
if db_type == 'sqlite': | |
db.close_all_connections() | |
# Elasticsearch doesn't need explicit closing | |
# | |
# End of file | |
############################################################################################################ | |