# Book_Ingestion_Lib.py ######################################### # Library to hold functions for ingesting book files.# # #################### # Function List # # 1. ingest_text_file(file_path, title=None, author=None, keywords=None): # 2. # # #################### # # Imports import os import re import tempfile import zipfile from datetime import datetime import logging import xml.etree.ElementTree as ET import html2text import csv # # External Imports import ebooklib from bs4 import BeautifulSoup from ebooklib import epub # # Import Local from App_Function_Libraries.DB.DB_Manager import add_media_with_keywords, add_media_to_database from App_Function_Libraries.Summarization.Summarization_General_Lib import perform_summarization from App_Function_Libraries.Chunk_Lib import chunk_ebook_by_chapters from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram # ####################################################################################################################### # Function Definitions # def import_epub(file_path, title=None, author=None, keywords=None, custom_prompt=None, system_prompt=None, summary=None, auto_summarize=False, api_name=None, api_key=None, chunk_options=None, custom_chapter_pattern=None ): """ Imports an EPUB file, extracts its content, chunks it, optionally summarizes it, and adds it to the database. Parameters: - file_path (str): Path to the EPUB file. - title (str, optional): Title of the book. - author (str, optional): Author of the book. - keywords (str, optional): Comma-separated keywords for the book. - custom_prompt (str, optional): Custom user prompt for summarization. - summary (str, optional): Predefined summary of the book. - auto_summarize (bool, optional): Whether to auto-summarize the chunks. - api_name (str, optional): API name for summarization. - api_key (str, optional): API key for summarization. - chunk_options (dict, optional): Options for chunking. - custom_chapter_pattern (str, optional): Custom regex pattern for chapter detection. Returns: - str: Status message indicating success or failure. """ try: logging.info(f"Importing EPUB file from {file_path}") log_counter("epub_import_attempt", labels={"file_path": file_path}) start_time = datetime.now() # Convert EPUB to Markdown markdown_content = epub_to_markdown(file_path) logging.debug("Converted EPUB to Markdown.") # Extract metadata if not provided if not title or not author: extracted_title, extracted_author = extract_epub_metadata(markdown_content) title = title or extracted_title or os.path.splitext(os.path.basename(file_path))[0] author = author or extracted_author or "Unknown" logging.debug(f"Extracted metadata - Title: {title}, Author: {author}") # Process keywords keyword_list = [kw.strip() for kw in keywords.split(',')] if keywords else [] logging.debug(f"Keywords: {keyword_list}") # Set default chunk options if not provided if chunk_options is None: chunk_options = { 'method': 'chapter', 'max_size': 500, 'overlap': 200, 'custom_chapter_pattern': custom_chapter_pattern } else: # Ensure 'method' is set to 'chapter' when using chapter chunking chunk_options.setdefault('method', 'chapter') chunk_options.setdefault('custom_chapter_pattern', custom_chapter_pattern) # Chunk the content by chapters chunks = chunk_ebook_by_chapters(markdown_content, chunk_options) logging.info(f"Total chunks created: {len(chunks)}") log_histogram("epub_chunks_created", len(chunks), labels={"file_path": file_path}) if chunks: logging.debug(f"Structure of first chunk: {chunks[0].keys()}") # Handle summarization if enabled if auto_summarize and api_name and api_key: logging.info("Auto-summarization is enabled.") summarized_chunks = [] for chunk in chunks: chunk_text = chunk.get('text', '') if chunk_text: summary_text = perform_summarization(api_name, chunk_text, custom_prompt, api_key, recursive_summarization=False, temp=None, system_message=system_prompt ) chunk['metadata']['summary'] = summary_text summarized_chunks.append(chunk) chunks = summarized_chunks logging.info("Summarization of chunks completed.") log_counter("epub_chunks_summarized", value=len(chunks), labels={"file_path": file_path}) else: # If not summarizing, set a default summary or use provided summary if summary: logging.debug("Using provided summary.") else: summary = "No summary provided." # Create info_dict info_dict = { 'title': title, 'uploader': author, 'ingestion_date': datetime.now().strftime('%Y-%m-%d') } # Prepare segments for database segments = [{'Text': chunk.get('text', chunk.get('content', ''))} for chunk in chunks] logging.debug(f"Prepared segments for database. Number of segments: {len(segments)}") # Add to database result = add_media_to_database( url=file_path, info_dict=info_dict, segments=segments, summary=summary, keywords=keyword_list, custom_prompt_input=custom_prompt, whisper_model="Imported", media_type="ebook", overwrite=False ) end_time = datetime.now() processing_time = (end_time - start_time).total_seconds() log_histogram("epub_import_duration", processing_time, labels={"file_path": file_path}) logging.info(f"Ebook '{title}' by {author} imported successfully. Database result: {result}") log_counter("epub ingested into the DB successfully", labels={"file_path": file_path}) return f"Ebook '{title}' by {author} imported successfully. Database result: {result}" except Exception as e: logging.exception(f"Error importing ebook: {str(e)}") log_counter("epub_import_error", labels={"file_path": file_path, "error": str(e)}) return f"Error importing ebook: {str(e)}" # FIXME def process_zip_file(zip_file, title, author, keywords, custom_prompt, system_prompt, summary, auto_summarize, api_name, api_key, chunk_options ): """ Processes a ZIP file containing multiple EPUB files and imports each one. Parameters: - zip_file (file-like object): The ZIP file to process. - title (str): Title prefix for the books. - author (str): Author name for the books. - keywords (str): Comma-separated keywords. - custom_prompt (str): Custom user prompt for summarization. - summary (str): Predefined summary (not used in this context). - auto_summarize (bool): Whether to auto-summarize the chunks. - api_name (str): API name for summarization. - api_key (str): API key for summarization. - chunk_options (dict): Options for chunking. Returns: - str: Combined status messages for all EPUB files in the ZIP. """ results = [] try: with tempfile.TemporaryDirectory() as temp_dir: zip_path = zip_file.name if hasattr(zip_file, 'name') else zip_file.path logging.info(f"Extracting ZIP file {zip_path} to temporary directory {temp_dir}") log_counter("zip_processing_attempt", labels={"zip_path": zip_path}) with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(temp_dir) epub_files = [f for f in os.listdir(temp_dir) if f.lower().endswith('.epub')] log_histogram("epub_files_in_zip", len(epub_files), labels={"zip_path": zip_path}) for filename in epub_files: file_path = os.path.join(temp_dir, filename) logging.info(f"Processing EPUB file {filename} from ZIP.") result = import_epub( file_path=file_path, title=title, author=author, keywords=keywords, custom_prompt=custom_prompt, summary=summary, auto_summarize=auto_summarize, api_name=api_name, api_key=api_key, chunk_options=chunk_options, custom_chapter_pattern=chunk_options.get('custom_chapter_pattern') if chunk_options else None ) results.append(f"File: {filename} - {result}") logging.info("Completed processing all EPUB files in the ZIP.") log_counter("zip_processing_success", labels={"zip_path": zip_path}) except Exception as e: logging.exception(f"Error processing ZIP file: {str(e)}") log_counter("zip_processing_error", labels={"zip_path": zip_path, "error": str(e)}) return f"Error processing ZIP file: {str(e)}" return "\n".join(results) def import_html(file_path, title=None, author=None, keywords=None, **kwargs): """ Imports an HTML file and converts it to markdown format. """ try: logging.info(f"Importing HTML file from {file_path}") h = html2text.HTML2Text() h.ignore_links = False with open(file_path, 'r', encoding='utf-8') as file: html_content = file.read() markdown_content = h.handle(html_content) # Extract title from HTML if not provided if not title: soup = BeautifulSoup(html_content, 'html.parser') title_tag = soup.find('title') title = title_tag.string if title_tag else os.path.basename(file_path) return process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs) except Exception as e: logging.exception(f"Error importing HTML file: {str(e)}") raise def import_xml(file_path, title=None, author=None, keywords=None, **kwargs): """ Imports an XML file and converts it to markdown format. """ try: logging.info(f"Importing XML file from {file_path}") tree = ET.parse(file_path) root = tree.getroot() # Convert XML to markdown markdown_content = xml_to_markdown(root) return process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs) except Exception as e: logging.exception(f"Error importing XML file: {str(e)}") raise def import_opml(file_path, title=None, author=None, keywords=None, **kwargs): """ Imports an OPML file and converts it to markdown format. """ try: logging.info(f"Importing OPML file from {file_path}") tree = ET.parse(file_path) root = tree.getroot() # Extract title from OPML if not provided if not title: title_elem = root.find(".//title") title = title_elem.text if title_elem is not None else os.path.basename(file_path) # Convert OPML to markdown markdown_content = opml_to_markdown(root) return process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs) except Exception as e: logging.exception(f"Error importing OPML file: {str(e)}") raise def xml_to_markdown(element, level=0): """ Recursively converts XML elements to markdown format. """ markdown = "" # Add element name as heading if level > 0: markdown += f"{'#' * min(level, 6)} {element.tag}\n\n" # Add element text if it exists if element.text and element.text.strip(): markdown += f"{element.text.strip()}\n\n" # Process child elements for child in element: markdown += xml_to_markdown(child, level + 1) return markdown def opml_to_markdown(root): """ Converts OPML structure to markdown format. """ markdown = "# Table of Contents\n\n" def process_outline(outline, level=0): result = "" for item in outline.findall("outline"): text = item.get("text", "") result += f"{' ' * level}- {text}\n" result += process_outline(item, level + 1) return result body = root.find(".//body") if body is not None: markdown += process_outline(body) return markdown def process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs): """ Processes markdown content and adds it to the database. """ info_dict = { 'title': title or os.path.basename(file_path), 'uploader': author or "Unknown", 'ingestion_date': datetime.now().strftime('%Y-%m-%d') } # Create segments (you may want to adjust the chunking method) segments = [{'Text': markdown_content}] # Add to database result = add_media_to_database( url=file_path, info_dict=info_dict, segments=segments, summary=kwargs.get('summary', "No summary provided"), keywords=keywords.split(',') if keywords else [], custom_prompt_input=kwargs.get('custom_prompt'), whisper_model="Imported", media_type="document", overwrite=False ) return f"Document '{title}' imported successfully. Database result: {result}" def import_file_handler(files, author, keywords, system_prompt, custom_prompt, auto_summarize, api_name, api_key, max_chunk_size, chunk_overlap, custom_chapter_pattern): try: if not files: return "No files uploaded." # Convert single file to list for consistent processing if not isinstance(files, list): files = [files] results = [] for file in files: log_counter("file_import_attempt", labels={"file_name": file.name}) # Handle max_chunk_size and chunk_overlap chunk_size = int(max_chunk_size) if isinstance(max_chunk_size, (str, int)) else 4000 overlap = int(chunk_overlap) if isinstance(chunk_overlap, (str, int)) else 0 chunk_options = { 'method': 'chapter', 'max_size': chunk_size, 'overlap': overlap, 'custom_chapter_pattern': custom_chapter_pattern if custom_chapter_pattern else None } file_path = file.name if not os.path.exists(file_path): results.append(f"❌ File not found: {file.name}") continue start_time = datetime.now() # Extract title from filename title = os.path.splitext(os.path.basename(file_path))[0] if file_path.lower().endswith('.epub'): status = import_epub( file_path, title=title, # Use filename as title author=author, keywords=keywords, custom_prompt=custom_prompt, system_prompt=system_prompt, summary=None, auto_summarize=auto_summarize, api_name=api_name, api_key=api_key, chunk_options=chunk_options, custom_chapter_pattern=custom_chapter_pattern ) log_counter("epub_import_success", labels={"file_name": file.name}) results.append(f"📚 {file.name}: {status}") elif file_path.lower().endswith('.zip'): status = process_zip_file( zip_file=file, title=None, # Let each file use its own name author=author, keywords=keywords, custom_prompt=custom_prompt, system_prompt=system_prompt, summary=None, auto_summarize=auto_summarize, api_name=api_name, api_key=api_key, chunk_options=chunk_options ) log_counter("zip_import_success", labels={"file_name": file.name}) results.append(f"📦 {file.name}: {status}") else: results.append(f"❌ Unsupported file type: {file.name}") continue end_time = datetime.now() processing_time = (end_time - start_time).total_seconds() log_histogram("file_import_duration", processing_time, labels={"file_name": file.name}) return "\n\n".join(results) except ValueError as ve: logging.exception(f"Error parsing input values: {str(ve)}") return f"❌ Error: Invalid input for chunk size or overlap. Please enter valid numbers." except Exception as e: logging.exception(f"Error during file import: {str(e)}") return f"❌ Error during import: {str(e)}" def read_epub(file_path): """ Reads and extracts text from an EPUB file. Parameters: - file_path (str): Path to the EPUB file. Returns: - str: Extracted text content from the EPUB. """ try: logging.info(f"Reading EPUB file from {file_path}") book = epub.read_epub(file_path) chapters = [] for item in book.get_items(): if item.get_type() == ebooklib.ITEM_DOCUMENT: chapters.append(item.get_content()) text = "" for html_content in chapters: soup = BeautifulSoup(html_content, 'html.parser') text += soup.get_text(separator='\n\n') + "\n\n" logging.debug("EPUB content extraction completed.") return text except Exception as e: logging.exception(f"Error reading EPUB file: {str(e)}") raise # Ingest a text file into the database with Title/Author/Keywords def extract_epub_metadata(content): title_match = re.search(r'Title:\s*(.*?)\n', content) author_match = re.search(r'Author:\s*(.*?)\n', content) title = title_match.group(1) if title_match else None author = author_match.group(1) if author_match else None return title, author def ingest_text_file(file_path, title=None, author=None, keywords=None): """ Ingests a plain text file into the database with optional metadata. Parameters: - file_path (str): Path to the text file. - title (str, optional): Title of the document. - author (str, optional): Author of the document. - keywords (str, optional): Comma-separated keywords. Returns: - str: Status message indicating success or failure. """ try: with open(file_path, 'r', encoding='utf-8') as file: content = file.read() # Check if it's a converted epub and extract metadata if so if 'epub_converted' in (keywords or '').lower(): extracted_title, extracted_author = extract_epub_metadata(content) title = title or extracted_title author = author or extracted_author logging.debug(f"Extracted metadata for converted EPUB - Title: {title}, Author: {author}") # If title is still not provided, use the filename without extension if not title: title = os.path.splitext(os.path.basename(file_path))[0] # If author is still not provided, set it to 'Unknown' if not author: author = 'Unknown' # If keywords are not provided, use a default keyword if not keywords: keywords = 'text_file,epub_converted' else: keywords = f'text_file,epub_converted,{keywords}' # Add the text file to the database add_media_with_keywords( url="its_a_book", title=title, media_type='book', content=content, keywords=keywords, prompt='No prompt for text files', summary='No summary for text files', transcription_model='None', author=author, ingestion_date=datetime.now().strftime('%Y-%m-%d') ) logging.info(f"Text file '{title}' by {author} ingested successfully.") return f"Text file '{title}' by {author} ingested successfully." except Exception as e: logging.error(f"Error ingesting text file: {str(e)}") return f"Error ingesting text file: {str(e)}" def ingest_folder(folder_path, keywords=None): """ Ingests all text files within a specified folder. Parameters: - folder_path (str): Path to the folder containing text files. - keywords (str, optional): Comma-separated keywords to add to each file. Returns: - str: Combined status messages for all ingested text files. """ results = [] try: logging.info(f"Ingesting all text files from folder {folder_path}") for filename in os.listdir(folder_path): if filename.lower().endswith('.txt'): file_path = os.path.join(folder_path, filename) result = ingest_text_file(file_path, keywords=keywords) results.append(result) logging.info("Completed ingestion of all text files in the folder.") except Exception as e: logging.exception(f"Error ingesting folder: {str(e)}") return f"Error ingesting folder: {str(e)}" return "\n".join(results) def epub_to_markdown(epub_path): """ Converts an EPUB file to Markdown format, including the table of contents and chapter contents. Parameters: - epub_path (str): Path to the EPUB file. Returns: - str: Markdown-formatted content of the EPUB. """ try: logging.info(f"Converting EPUB to Markdown from {epub_path}") book = epub.read_epub(epub_path) markdown_content = "# Table of Contents\n\n" chapters = [] # Extract and format the table of contents toc = book.toc for item in toc: if isinstance(item, tuple): section, children = item level = 1 markdown_content += format_toc_item(section, level) for child in children: markdown_content += format_toc_item(child, level + 1) else: markdown_content += format_toc_item(item, 1) markdown_content += "\n---\n\n" # Process each chapter for item in book.get_items(): if item.get_type() == ebooklib.ITEM_DOCUMENT: chapter_content = item.get_content().decode('utf-8') soup = BeautifulSoup(chapter_content, 'html.parser') # Extract chapter title title = soup.find(['h1', 'h2', 'h3']) if title: chapter_title = title.get_text() markdown_content += f"# {chapter_title}\n\n" # Process chapter content for elem in soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'ul', 'ol']): if elem.name.startswith('h'): level = int(elem.name[1]) markdown_content += f"{'#' * level} {elem.get_text()}\n\n" elif elem.name == 'p': markdown_content += f"{elem.get_text()}\n\n" elif elem.name in ['ul', 'ol']: for li in elem.find_all('li'): prefix = '-' if elem.name == 'ul' else '1.' markdown_content += f"{prefix} {li.get_text()}\n" markdown_content += "\n" markdown_content += "---\n\n" logging.debug("EPUB to Markdown conversion completed.") return markdown_content except Exception as e: logging.exception(f"Error converting EPUB to Markdown: {str(e)}") raise def format_toc_item(item, level): """ Formats a table of contents item into Markdown list format. Parameters: - item (epub.Link or epub.Section): TOC item. - level (int): Heading level for indentation. Returns: - str: Markdown-formatted TOC item. """ try: if isinstance(item, epub.Link): title = item.title elif isinstance(item, epub.Section): title = item.title else: title = str(item) return f"{' ' * (level - 1)}- [{title}](#{slugify(title)})\n" except Exception as e: logging.exception(f"Error formatting TOC item: {str(e)}") return "" def slugify(text): """ Converts a string into a slug suitable for Markdown links. Parameters: - text (str): The text to slugify. Returns: - str: Slugified text. """ return re.sub(r'[\W_]+', '-', text.lower()).strip('-') # # End of Function Definitions #######################################################################################################################