File size: 7,279 Bytes
43cd37c
 
 
 
 
c5b0bb7
43cd37c
 
c5b0bb7
 
43cd37c
c5b0bb7
 
 
43cd37c
 
c5b0bb7
 
 
 
43cd37c
 
 
 
c5b0bb7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# Plaintext_Files.py
# Description: This file contains functions for importing plaintext files (.txt, .md, .rtf, .docx) and zip archives of them.
#
# Import necessary libraries
import logging
import os
import tempfile
import zipfile
from datetime import datetime

#
# External Imports
from docx2txt import docx2txt
from pypandoc import convert_file
#
# Local Imports
from App_Function_Libraries.Gradio_UI.Import_Functionality import import_data
from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram


#
#######################################################################################################################
#
# Function Definitions

def _convert_rtf_to_markdown(rtf_path):
    """Convert an RTF file to Markdown with pandoc and return the text.

    The conversion goes through a scratch ``.md`` file that is always removed
    afterwards, whether or not the conversion succeeds.
    """
    fd, md_path = tempfile.mkstemp(suffix='.md')
    # Close our handle right away: pandoc writes to the path itself, and an
    # open handle prevents a second open of the same file on some platforms.
    os.close(fd)
    try:
        convert_file(rtf_path, 'md', outputfile=md_path)
        with open(md_path, 'r', encoding='utf-8') as md_file:
            return md_file.read()
    finally:
        try:
            os.remove(md_path)
        except OSError as cleanup_error:
            logging.warning(f"Could not remove temporary file {md_path}: {cleanup_error}")


def import_plain_text_file(file_path, author, keywords, system_prompt, user_prompt, auto_summarize, api_name, api_key):
    """Import a single plain text file.

    The title is taken from the file name without its extension. ``.rtf``
    files are converted to Markdown through pandoc, ``.docx`` files are read
    with docx2txt, and every other extension is read as UTF-8 text. The
    resulting text is handed to ``import_data``.

    Args:
        file_path: Path of the file to import.
        author: Forwarded to ``import_data``.
        keywords: Forwarded to ``import_data``.
        system_prompt: Accepted for interface compatibility; not used here.
        user_prompt: Forwarded to ``import_data`` as the custom prompt.
        auto_summarize: Forwarded to ``import_data``.
        api_name: Forwarded to ``import_data``.
        api_key: Forwarded to ``import_data``.

    Returns:
        Whatever ``import_data`` returns on success, otherwise a string
        describing the error. Exceptions are logged and not propagated.
    """
    try:
        log_counter("file_processing_attempt", labels={"file_path": file_path})

        # Extract title from filename
        title = os.path.splitext(os.path.basename(file_path))[0]

        # Determine the file type and convert if necessary
        file_extension = os.path.splitext(file_path)[1].lower()

        # Get the content based on file type
        try:
            if file_extension == '.rtf':
                # file_path is deliberately left untouched so that later
                # metrics and error messages refer to the original file.
                content = _convert_rtf_to_markdown(file_path)
                log_counter("rtf_conversion_success", labels={"file_path": file_path})
            elif file_extension == '.docx':
                content = docx2txt.process(file_path)
                log_counter("docx_conversion_success", labels={"file_path": file_path})
            else:
                with open(file_path, 'r', encoding='utf-8') as file:
                    content = file.read()
        except Exception as e:
            logging.error(f"Error reading file content: {str(e)}")
            return f"Error reading file content: {str(e)}"

        # Import the content
        result = import_data(
            content,  # Pass the content directly
            title,
            author,
            keywords,
            user_prompt,  # This is the custom_prompt parameter
            None,        # No summary - let auto_summarize handle it
            auto_summarize,
            api_name,
            api_key
        )

        log_counter("file_processing_success", labels={"file_path": file_path})
        return result

    except Exception as e:
        logging.exception(f"Error processing file {file_path}")
        log_counter("file_processing_error", labels={"file_path": file_path, "error": str(e)})
        return f"Error processing file {os.path.basename(file_path)}: {str(e)}"


def process_plain_text_zip_file(zip_file, author, keywords, system_prompt, user_prompt, auto_summarize, api_name, api_key):
    """Process multiple text files from a zip archive.

    The archive is extracted into a temporary directory, and every file whose
    name ends in ``.md``, ``.txt``, ``.rtf`` or ``.docx`` (case-insensitive)
    is passed to ``import_plain_text_file``. Files in sub-folders of the
    archive are included. The temporary directory is removed when processing
    finishes.

    Args:
        zip_file: Object whose ``name`` attribute is the path of the zip
            archive on disk.
        author, keywords, system_prompt, user_prompt, auto_summarize,
        api_name, api_key: Forwarded unchanged to ``import_plain_text_file``
            for each extracted file.

    Returns:
        One result line per imported file, joined by blank lines (an empty
        string when the archive holds no supported files), or an error
        message string if the archive could not be processed.
    """
    supported_extensions = ('.md', '.txt', '.rtf', '.docx')
    results = []
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            with zipfile.ZipFile(zip_file.name, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)

            # Walk the whole tree: archives commonly wrap their files in a
            # folder, which a flat directory listing would miss entirely.
            for root, dirs, filenames in os.walk(temp_dir):
                # Skip the metadata folder that macOS adds to zip archives
                # (its stub files reuse the real extensions), and sort for a
                # deterministic processing order.
                dirs[:] = sorted(d for d in dirs if d != '__MACOSX')
                for filename in sorted(filenames):
                    if not filename.lower().endswith(supported_extensions):
                        continue
                    file_path = os.path.join(root, filename)
                    # Path relative to the archive root; for top-level
                    # entries this is just the file name.
                    display_name = os.path.relpath(file_path, temp_dir)
                    result = import_plain_text_file(
                        file_path=file_path,
                        author=author,
                        keywords=keywords,
                        system_prompt=system_prompt,
                        user_prompt=user_prompt,
                        auto_summarize=auto_summarize,
                        api_name=api_name,
                        api_key=api_key
                    )
                    results.append(f"📄 {display_name}: {result}")

        return "\n\n".join(results)
    except Exception as e:
        logging.exception(f"Error processing zip file: {str(e)}")
        return f"Error processing zip file: {str(e)}"



def import_file_handler(files, author, keywords, system_prompt, user_prompt, auto_summarize, api_name, api_key):
    """Handle the import of one or more files, including zip files.

    Each entry is dispatched on its file extension: ``.md``, ``.txt``,
    ``.rtf`` and ``.docx`` go to ``import_plain_text_file``, ``.zip`` goes to
    ``process_plain_text_zip_file``, and anything else is reported as
    unsupported. Counters are logged for each outcome, and a duration
    histogram is logged for every file that was actually processed.

    Args:
        files: A single file object or a list/tuple of them. Each object must
            expose the file's path on disk as its ``name`` attribute.
        author, keywords, system_prompt, user_prompt, auto_summarize,
        api_name, api_key: Forwarded unchanged to the per-file importers.

    Returns:
        A string with one entry per input file, joined by blank lines;
        ``"No files uploaded."`` when ``files`` is empty; or an error message
        if an unexpected exception occurs.
    """
    try:
        if not files:
            log_counter("plaintext_import_error", labels={"error": "No files uploaded"})
            return "No files uploaded."

        # Convert single file to list for consistent processing. Tuples are
        # already sequences of files and must not be wrapped.
        if not isinstance(files, (list, tuple)):
            files = [files]

        results = []
        for file in files:
            log_counter("plaintext_import_attempt", labels={"file_name": file.name})

            start_time = datetime.now()

            if not os.path.exists(file.name):
                log_counter("plaintext_import_error", labels={"error": "File not found", "file_name": file.name})
                results.append(f"❌ File not found: {file.name}")
                continue

            lowered_name = file.name.lower()

            if lowered_name.endswith(('.md', '.txt', '.rtf', '.docx')):
                result = import_plain_text_file(
                    file_path=file.name,
                    author=author,
                    keywords=keywords,
                    system_prompt=system_prompt,
                    user_prompt=user_prompt,
                    auto_summarize=auto_summarize,
                    api_name=api_name,
                    api_key=api_key
                )
                log_counter("plaintext_import_success", labels={"file_name": file.name})
                results.append(f"📄 {file.name}: {result}")

            elif lowered_name.endswith('.zip'):
                result = process_plain_text_zip_file(
                    zip_file=file,
                    author=author,
                    keywords=keywords,
                    system_prompt=system_prompt,
                    user_prompt=user_prompt,
                    auto_summarize=auto_summarize,
                    api_name=api_name,
                    api_key=api_key
                )
                log_counter("zip_import_success", labels={"file_name": file.name})
                results.append(f"📦 {file.name}:\n{result}")

            else:
                # splitext only looks at the final path component, so a file
                # without an extension (or a dotted directory name) does not
                # leak a path fragment into the metric label.
                file_type = os.path.splitext(file.name)[1].lstrip('.') or "none"
                log_counter("unsupported_file_type", labels={"file_type": file_type})
                results.append(f"❌ Unsupported file type: {file.name}")
                continue

            # Only reached for files that were handed to an importer.
            end_time = datetime.now()
            processing_time = (end_time - start_time).total_seconds()
            log_histogram("plaintext_import_duration", processing_time, labels={"file_name": file.name})

        return "\n\n".join(results)

    except Exception as e:
        logging.exception("Error in import_file_handler")
        log_counter("plaintext_import_error", labels={"error": str(e)})
        return f"❌ Error during import: {str(e)}"

#
# End of Plaintext_Files.py
#######################################################################################################################