# oceansweep's picture
# Upload 169 files
# c5b0bb7 verified
# Plaintext_Files.py
# Description: This file contains functions for importing plaintext files (.md, .txt, .rtf, .docx), individually or from zip archives.
#
# Import necessary libraries
import logging
import os
import tempfile
import zipfile
from datetime import datetime
#
# External Imports
from docx2txt import docx2txt
from pypandoc import convert_file
#
# Local Imports
from App_Function_Libraries.Gradio_UI.Import_Functionality import import_data
from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram
#
#######################################################################################################################
#
# Function Definitions
def import_plain_text_file(file_path, author, keywords, system_prompt, user_prompt, auto_summarize, api_name, api_key):
    """Import a single plain text file.

    The file's text is obtained according to its extension (``.rtf`` is
    converted to Markdown with pandoc, ``.docx`` is extracted with docx2txt,
    anything else is read as UTF-8 text) and then passed to ``import_data``
    with the filename stem as the title.

    Args:
        file_path: Path of the file to import.
        author: Forwarded to ``import_data``.
        keywords: Forwarded to ``import_data``.
        system_prompt: Accepted for signature compatibility; not used here.
        user_prompt: Forwarded to ``import_data`` as the custom prompt.
        auto_summarize: Forwarded to ``import_data``.
        api_name: Forwarded to ``import_data``.
        api_key: Forwarded to ``import_data``.

    Returns:
        The value returned by ``import_data`` on success, or an error message
        string if reading or importing the file fails. Exceptions are logged
        and converted to strings rather than raised.
    """
    try:
        log_counter("file_processing_attempt", labels={"file_path": file_path})

        # Extract title from filename
        title = os.path.splitext(os.path.basename(file_path))[0]

        # Determine the file type and convert if necessary
        file_extension = os.path.splitext(file_path)[1].lower()

        # Get the content based on file type
        try:
            if file_extension == '.rtf':
                # Reserve a unique output path and close our own handle before
                # pandoc writes to it: a temp file that is still open cannot be
                # reopened by name on Windows.
                temp_fd, temp_path = tempfile.mkstemp(suffix='.md')
                os.close(temp_fd)
                try:
                    convert_file(file_path, 'md', outputfile=temp_path)
                    with open(temp_path, 'r', encoding='utf-8') as file:
                        content = file.read()
                finally:
                    # mkstemp never cleans up after itself; remove the converted
                    # copy whether or not the conversion succeeded.
                    try:
                        os.remove(temp_path)
                    except OSError:
                        logging.warning("Could not remove temporary file %s", temp_path)
                # file_path is deliberately left untouched so that metrics and
                # error messages below refer to the user's file, not the temp copy.
                log_counter("rtf_conversion_success", labels={"file_path": file_path})
            elif file_extension == '.docx':
                content = docx2txt.process(file_path)
                log_counter("docx_conversion_success", labels={"file_path": file_path})
            else:
                with open(file_path, 'r', encoding='utf-8') as file:
                    content = file.read()
        except Exception as e:
            logging.error(f"Error reading file content: {str(e)}")
            return f"Error reading file content: {str(e)}"

        # Import the content
        result = import_data(
            content,  # Pass the content directly
            title,
            author,
            keywords,
            user_prompt,  # This is the custom_prompt parameter
            None,  # No summary - let auto_summarize handle it
            auto_summarize,
            api_name,
            api_key
        )
        log_counter("file_processing_success", labels={"file_path": file_path})
        return result
    except Exception as e:
        logging.exception(f"Error processing file {file_path}")
        log_counter("file_processing_error", labels={"file_path": file_path, "error": str(e)})
        return f"Error processing file {os.path.basename(file_path)}: {str(e)}"
def process_plain_text_zip_file(zip_file, author, keywords, system_prompt, user_prompt, auto_summarize, api_name, api_key):
    """Process multiple text files from a zip archive.

    The archive is extracted into a temporary directory and every ``.md``,
    ``.txt``, ``.rtf`` or ``.docx`` file found in it, at any folder depth, is
    passed to ``import_plain_text_file``.

    Args:
        zip_file: Object whose ``name`` attribute is the path of the zip archive.
        author: Forwarded to ``import_plain_text_file``.
        keywords: Forwarded to ``import_plain_text_file``.
        system_prompt: Forwarded to ``import_plain_text_file``.
        user_prompt: Forwarded to ``import_plain_text_file``.
        auto_summarize: Forwarded to ``import_plain_text_file``.
        api_name: Forwarded to ``import_plain_text_file``.
        api_key: Forwarded to ``import_plain_text_file``.

    Returns:
        One "<icon> <path inside archive>: <result>" entry per imported file,
        joined by blank lines (an empty string when the archive holds no
        supported files), or an error message string if the archive cannot be
        processed.
    """
    supported_extensions = ('.md', '.txt', '.rtf', '.docx')
    results = []
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            with zipfile.ZipFile(zip_file.name, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)

            # Walk the whole extracted tree: archives commonly wrap their files
            # in a folder, which a flat listing of temp_dir would skip entirely.
            for root, dirs, filenames in os.walk(temp_dir):
                # Visit folders in a stable order and skip the "__MACOSX"
                # metadata folder that macOS adds to archives; its "._name"
                # entries carry the real files' extensions but are not documents.
                dirs[:] = sorted(d for d in dirs if d != '__MACOSX')
                for filename in sorted(filenames):
                    if not filename.lower().endswith(supported_extensions):
                        continue
                    file_path = os.path.join(root, filename)
                    # Path relative to the archive root; equals the bare
                    # filename for top-level entries.
                    display_name = os.path.relpath(file_path, temp_dir)
                    result = import_plain_text_file(
                        file_path=file_path,
                        author=author,
                        keywords=keywords,
                        system_prompt=system_prompt,
                        user_prompt=user_prompt,
                        auto_summarize=auto_summarize,
                        api_name=api_name,
                        api_key=api_key
                    )
                    # "\U0001F4C4" is the page emoji; written as an escape so it
                    # survives encoding mishaps (it had been garbled into mojibake).
                    results.append(f"\U0001F4C4 {display_name}: {result}")
        return "\n\n".join(results)
    except Exception as e:
        logging.exception(f"Error processing zip file: {str(e)}")
        return f"Error processing zip file: {str(e)}"
def import_file_handler(files, author, keywords, system_prompt, user_prompt, auto_summarize, api_name, api_key):
    """Handle the import of one or more files, including zip files.

    Each uploaded item is dispatched by extension: ``.md``/``.txt``/``.rtf``/
    ``.docx`` go to ``import_plain_text_file``, ``.zip`` goes to
    ``process_plain_text_zip_file``, anything else is reported as unsupported.

    Args:
        files: A single uploaded file object, or a list/tuple of them. Each
            object must expose the file's path as its ``name`` attribute.
        author: Forwarded to the import helpers.
        keywords: Forwarded to the import helpers.
        system_prompt: Forwarded to the import helpers.
        user_prompt: Forwarded to the import helpers.
        auto_summarize: Forwarded to the import helpers.
        api_name: Forwarded to the import helpers.
        api_key: Forwarded to the import helpers.

    Returns:
        A string with one status entry per file, separated by blank lines;
        "No files uploaded." when nothing was provided; or an error message
        string if an unexpected exception occurs.
    """
    text_extensions = ('.md', '.txt', '.rtf', '.docx')
    try:
        if not files:
            log_counter("plaintext_import_error", labels={"error": "No files uploaded"})
            return "No files uploaded."

        # Convert single file to list for consistent processing (a tuple of
        # files is already a sequence and must not be wrapped as one item).
        if not isinstance(files, (list, tuple)):
            files = [files]

        results = []
        for file in files:
            file_name = file.name
            log_counter("plaintext_import_attempt", labels={"file_name": file_name})
            start_time = datetime.now()

            if not os.path.exists(file_name):
                log_counter("plaintext_import_error", labels={"error": "File not found", "file_name": file_name})
                results.append(f"❌ File not found: {file_name}")
                continue

            lowered_name = file_name.lower()
            if lowered_name.endswith(text_extensions):
                result = import_plain_text_file(
                    file_path=file_name,
                    author=author,
                    keywords=keywords,
                    system_prompt=system_prompt,
                    user_prompt=user_prompt,
                    auto_summarize=auto_summarize,
                    api_name=api_name,
                    api_key=api_key
                )
                # NOTE(review): the helper reports failures by returning an
                # error string, so this counter also fires for failed imports.
                log_counter("plaintext_import_success", labels={"file_name": file_name})
                # "\U0001F4C4" is the page emoji, written as an escape so it
                # survives encoding mishaps (it had been garbled into mojibake).
                results.append(f"\U0001F4C4 {file_name}: {result}")
            elif lowered_name.endswith('.zip'):
                result = process_plain_text_zip_file(
                    zip_file=file,
                    author=author,
                    keywords=keywords,
                    system_prompt=system_prompt,
                    user_prompt=user_prompt,
                    auto_summarize=auto_summarize,
                    api_name=api_name,
                    api_key=api_key
                )
                log_counter("zip_import_success", labels={"file_name": file_name})
                # "\U0001F4E6" is the package emoji (same escape rationale).
                results.append(f"\U0001F4E6 {file_name}:\n{result}")
            else:
                # splitext only looks at the final path component, so a name
                # without an extension yields "" instead of the whole path
                # (which split('.')[-1] would have reported as the "type").
                file_type = os.path.splitext(file_name)[1].lstrip('.') or "unknown"
                log_counter("unsupported_file_type", labels={"file_type": file_type})
                results.append(f"❌ Unsupported file type: {file_name}")
                continue

            # Only files that were actually dispatched reach this point.
            end_time = datetime.now()
            processing_time = (end_time - start_time).total_seconds()
            log_histogram("plaintext_import_duration", processing_time, labels={"file_name": file_name})

        return "\n\n".join(results)
    except Exception as e:
        logging.exception("Error in import_file_handler")
        log_counter("plaintext_import_error", labels={"error": str(e)})
        return f"❌ Error during import: {str(e)}"
#
# End of Plaintext_Files.py
#######################################################################################################################