"""Chainlit app: upload text/PDF/CSV files, then ask questions answered via retrieval."""

import io
import os
import tempfile
from typing import List

import chainlit as cl
import pandas as pd
import pdfplumber
from chainlit.types import AskFileResponse

from aimakerspace.openai_utils.chatmodel import ChatOpenAI
from aimakerspace.openai_utils.embedding import EmbeddingModel
from aimakerspace.openai_utils.prompts import (
    AssistantRolePrompt,
    SystemRolePrompt,
    UserRolePrompt,
)
from aimakerspace.text_utils import CharacterTextSplitter, TextFileLoader
from aimakerspace.vectordatabase import VectorDatabase

# System instructions sent with every question.
system_template = (
    "Use the following context to answer the user's question. "
    "If you cannot find the answer in the context, say you don't know the answer. "
    "Additionally, if the user requests a summary or context overview, "
    "generate an engaging and concise summary that captures the main ideas "
    "with an interesting and appealing tone.\n"
)

system_role_prompt = SystemRolePrompt(system_template)

# Per-question template; ``context`` and ``question`` are filled in by
# ``RetrievalAugmentedQAPipeline.arun_pipeline``.
user_prompt_template = """\
Context:
{context}

Question:
{question}
"""

user_role_prompt = UserRolePrompt(user_prompt_template)


class RetrievalAugmentedQAPipeline:
    """Answer a query by retrieving context from a vector store and streaming an LLM reply."""

    # NOTE: the annotation is the class itself; writing ``ChatOpenAI()`` here
    # would construct a client as a side effect of defining this class.
    def __init__(self, llm: ChatOpenAI, vector_db_retriever: VectorDatabase) -> None:
        self.llm = llm
        self.vector_db_retriever = vector_db_retriever

    async def arun_pipeline(self, user_query: str):
        """Retrieve the 4 best-matching chunks and start a streamed answer.

        Args:
            user_query: The question typed by the user.

        Returns:
            A dict with ``"response"`` (an async generator yielding the chunks
            produced by ``llm.astream``) and ``"context"`` (the raw search
            results from the vector store).
        """
        context_list = self.vector_db_retriever.search_by_text(user_query, k=4)

        # Each search hit is indexable and its first element is the chunk text.
        context_prompt = "".join(context[0] + "\n" for context in context_list)

        formatted_system_prompt = system_role_prompt.create_message()
        formatted_user_prompt = user_role_prompt.create_message(
            question=user_query, context=context_prompt
        )

        async def generate_response():
            async for chunk in self.llm.astream(
                [formatted_system_prompt, formatted_user_prompt]
            ):
                yield chunk

        return {"response": generate_response(), "context": context_list}


text_splitter = CharacterTextSplitter()


def _read_upload_bytes(file: AskFileResponse) -> bytes:
    """Return the raw bytes of an uploaded file.

    Uses the in-memory ``content`` attribute when the upload object carries
    one, and otherwise reads the file at ``file.path``.
    NOTE(review): which of these attributes exists presumably depends on the
    installed Chainlit version -- confirm against the pinned release.
    """
    content = getattr(file, "content", None)
    if content is not None:
        return content
    with open(file.path, "rb") as source:
        return source.read()


def process_text_file(file: AskFileResponse) -> List[str]:
    """Split an uploaded plain-text file into chunks.

    The upload is copied to a temporary ``.txt`` file, loaded through
    ``TextFileLoader``, and the temporary copy is removed afterwards.
    """
    # presumably TextFileLoader relies on the ``.txt`` suffix, hence the copy
    # instead of loading ``file.path`` directly -- verify against the loader.
    with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".txt") as temp_file:
        temp_file_path = temp_file.name
        temp_file.write(_read_upload_bytes(file))

    try:
        documents = TextFileLoader(temp_file_path).load_documents()
    finally:
        # delete=False means nobody else cleans this file up.
        os.remove(temp_file_path)

    return text_splitter.split_texts(documents)


def process_pdf_file(file: AskFileResponse) -> List[str]:
    """Extract the text of every page of an uploaded PDF and split it into chunks."""
    with pdfplumber.open(file.path) as pdf:
        # extract_text() may yield nothing for image-only pages; treat as empty.
        page_texts = [page.extract_text() or "" for page in pdf.pages]

    # Newline between pages so the last word of one page does not fuse with
    # the first word of the next.
    extracted_text = "\n".join(page_texts)
    return text_splitter.split_texts([extracted_text])


def process_csv_file(file: AskFileResponse) -> List[str]:
    """Turn each row of an uploaded CSV into one space-joined string and split into chunks."""
    df = pd.read_csv(io.BytesIO(_read_upload_bytes(file)))
    if df.empty:
        # A header-only CSV has no rows to index (and a row-wise apply on it
        # would not return a Series).
        return []

    texts = df.apply(lambda row: " ".join(row.astype(str)), axis=1).tolist()
    return text_splitter.split_texts(texts)


# Maps a lower-cased file extension to the function that extracts its text.
_FILE_PROCESSORS = {
    "txt": process_text_file,
    "pdf": process_pdf_file,
    "csv": process_csv_file,
}


@cl.on_chat_start
async def on_chat_start():
    """Ask the user for files, extract their text, and store it in the session."""
    cl.user_session.set("all_texts", [])

    files = await cl.AskFileMessage(
        content="Please upload one or more Text, PDF, or CSV files to begin!",
        accept=["text/plain", "application/pdf", "text/csv"],
        max_size_mb=20,
        timeout=180,
    ).send()

    if not files:
        await cl.Message(
            content="No files were uploaded. Please upload at least one file to proceed."
        ).send()
        return

    all_texts = cl.user_session.get("all_texts", [])

    for file in files:
        file_type = file.name.split(".")[-1].lower()

        msg = cl.Message(content=f"Processing `{file.name}`...")
        await msg.send()

        # Pick the extractor from the file extension.
        processor = _FILE_PROCESSORS.get(file_type)
        if processor is None:
            await cl.Message(
                content=f"Unsupported file type: `{file.name}`. Please upload text, PDF, or CSV files."
            ).send()
            continue

        # Combine texts from all uploaded files.
        all_texts.extend(processor(file))

    cl.user_session.set("all_texts", all_texts)

    if not all_texts:
        # Every file was unsupported or empty; do not claim success.
        await cl.Message(
            content="No text could be extracted from the uploaded files. Please upload text, PDF, or CSV files."
        ).send()
        return

    await cl.Message(content="Files processed! You can now start asking questions.").send()


@cl.on_message
async def main(message):
    """Answer a user message, building the retrieval pipeline on first use."""
    chain = cl.user_session.get("chain")

    if not chain:
        all_texts = cl.user_session.get("all_texts")
        if not all_texts:
            await cl.Message(
                content="Please upload at least one file before asking questions."
            ).send()
            return

        # Create a dict vector store
        vector_db = VectorDatabase()
        vector_db = await vector_db.abuild_from_list(all_texts)

        chat_openai = ChatOpenAI()

        # Create a chain and cache it in the session so later messages reuse it.
        chain = RetrievalAugmentedQAPipeline(
            vector_db_retriever=vector_db,
            llm=chat_openai,
        )
        cl.user_session.set("chain", chain)

    msg = cl.Message(content="")
    result = await chain.arun_pipeline(message.content)

    async for stream_resp in result["response"]:
        await msg.stream_token(stream_resp)

    await msg.send()