import os import re from dotenv import load_dotenv from langchain.chat_models import ChatOpenAI from langchain.agents import Tool, AgentExecutor from langchain.callbacks import StreamlitCallbackHandler from langchain.chains import LLMMathChain from langchain.chains import LLMChain import streamlit as st import langchain from langchain.utilities import SerpAPIWrapper, GoogleSearchAPIWrapper from langchain.chains import RetrievalQA from langchain.text_splitter import RecursiveCharacterTextSplitter, MarkdownHeaderTextSplitter, CharacterTextSplitter from langchain.vectorstores import FAISS, Chroma, Pinecone from langchain.embeddings import OpenAIEmbeddings import tempfile import pypdf from pathlib import Path from langchain.docstore.document import Document from langchain.document_loaders import TextLoader from langchain.chains.summarize import load_summarize_chain from langchain import PromptTemplate import lark from langchain.schema import Document import langchain import pinecone from langchain.chains.question_answering import load_qa_chain from typing import List, Dict, Any from langchain.prompts.prompt import PromptTemplate from langchain.agents.openai_functions_agent.agent_token_buffer_memory import AgentTokenBufferMemory from langchain.agents.openai_functions_agent.base import OpenAIFunctionsAgent from langchain.schema.messages import SystemMessage, BaseMessage from langchain.prompts import MessagesPlaceholder from langchain.agents import AgentExecutor from langchain.schema import HumanMessage, SystemMessage from langchain.retrievers.multi_vector import MultiVectorRetriever from langchain.storage import InMemoryStore import uuid from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers.document_compressors import DocumentCompressorPipeline, LLMChainFilter from langchain.document_transformers import EmbeddingsRedundantFilter from langchain.retrievers.document_compressors import EmbeddingsFilter import json from bs4 import BeautifulSoup from langchain.document_loaders import SeleniumURLLoader from langchain.memory.chat_message_histories import StreamlitChatMessageHistory from langchain.callbacks import get_openai_callback import pickle from langchain.prompts import ChatPromptTemplate from langchain.schema.output_parser import StrOutputParser langchain.debug = True langchain.verbose = True def on_selectbox_change(): st.session_state.show_info = True def reset_chat(): st.session_state.messages = [{"roles": "assistant", "content": "Hi, I am Miracle. How can I help you?"}] st.session_state.history = [] st.session_state.search_keywords = [] st.session_state.doc_sources = [] st.session_state.summary = None st.session_state.agent.clear_conversation() st.session_state.primed_document_response = None def display_messages(messages): # Display all messages for msg in messages: st.chat_message(msg["roles"]).write(msg["content"]) class DBStore: def __init__(self, file_path, file_name): self.file_path = file_path self.file_name = os.path.splitext(file_name)[0] st.session_state.document_filename = self.file_name self.reader = pypdf.PdfReader(file_path) self.metadata = self.extract_metadata_from_pdf() self.embeddings = OpenAIEmbeddings() self.vector_store = None def extract_metadata_from_pdf(self): """Extract metadata from the PDF.""" metadata = self.reader.metadata st.session_state.document_metadata = metadata return { "title": metadata.get("/Title", "").strip(), "author": metadata.get("/Author", "").strip(), "creation_date": metadata.get("/CreationDate", "").strip(), } def extract_pages_from_pdf(self): pages = [] for page_num, page in enumerate(self.reader.pages): text = page.extract_text() if text.strip(): # Check if extracted text is not empty pages.append((page_num + 1, text)) return pages def parse_pdf(self): """ Extracts the title and text from each page of the PDF. :return: A tuple containing the title and a list of tuples with page numbers and extracted text. """ metadata = self.extract_metadata_from_pdf() pages = self.extract_pages_from_pdf() #st.write(pages) #st.write(metadata) return pages, metadata @staticmethod def merge_hyphenated_words(text): return re.sub(r"(\w)-\n(\w)", r"\1\2", text) @staticmethod def fix_newlines(text): return re.sub(r"(? 0 else "" second_chunk = pdf_text[1].page_content if len(pdf_text) > 1 else "" third_chunk = pdf_text[2].page_content if len(pdf_text) > 2 else "" # Extract the first 300 characters from each chunk to form an information snippet info_document = first_chunk[:300] + second_chunk[:300] + third_chunk[:300] else: info_document = "" #st.write(info_document) return info_document def get_info_response(self): llm = ChatOpenAI( temperature=0, streaming=True, model_name="gpt-3.5-turbo" ) document_filename = self.file_name document_title = self.metadata.get("title", None) document_snippet = self.get_document_info() document_info = { "document_filename": document_filename, "document_title": document_title, "document_snippet": document_snippet, } if document_title: info_response_prompt = """The user has uploaded a document titled '{document_title}' to the Document Database """ else: info_response_prompt = """The user has uploaded a document named '{document_filename}' to the Document Database """ info_response_prompt += """ with the following information: {document_snippet}. In one sentence, inform the user about the document, prioritizing its name or title. Also, prompt the user to ask a general question about the document in an assistive manner. Begin your response with 'It appears you've uploaded a document that contains information on...'. Example: It appears you've uploaded a document that contains information on "COWI Policies and Guideline". Please feel free to ask any question about this document such as "What are the COWI Policies and Guideline?" """ #st.write(info_response_prompt) # Create the LLMChain llm_chain = LLMChain( llm=llm, prompt=PromptTemplate.from_template(info_response_prompt) ) # Generate the primed document message llm_response = llm_chain(document_info) info_response = llm_response.get('text', '') #st.write(info_response) return info_response class DatabaseTool: def __init__(self, llm, vector_store, metadata=None, filename=None): self.llm = llm self.vector_store = vector_store self.metadata = metadata self.filename = filename self.embedding = OpenAIEmbeddings() def get_description(self): base_description = "Always useful for finding the exactly written answer to the question by looking into a collection of documents." filename = self.filename title = self.metadata.get("/Title") if self.metadata else None author = self.metadata.get("/Author") if self.metadata else None subject = self.metadata.get("/Subject") if self.metadata else None footer_description = "Input should be a query, not referencing any obscure pronouns from the conversation before that will pull out relevant information from the database. Use this more than the normal search tool" if title: main_description = f"This tool is currently loaded with '{title}'" if author: main_description += f" by '{author}'" if subject: main_description += f", and has a topic of '{subject}'" return f"{base_description} {main_description}. {footer_description}" else: no_title_description = f"This tool is currently loaded with the document '{filename}'" return f"{base_description} {no_title_description}. {footer_description}" def get_base_retriever(self): base_retriever = self.vector_store.as_retriever(search_kwargs={'k': 5}) return base_retriever def get_contextual_retriever(self): # Initialize embeddings (assuming embeddings is already defined elsewhere) embeddings = self.embedding # Initialize Redundant Filter redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings) # Initialize Relevant Filter relevant_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.76, k = 25) #st.write(relevant_filter) # Initialize Text Splitter splitter = CharacterTextSplitter(chunk_size=300, chunk_overlap=0, separator=". ") # Create Compressor Pipeline pipeline_compressor = DocumentCompressorPipeline( transformers=[splitter, redundant_filter, relevant_filter] ) # Initialize Contextual Compression Retriever contextual_retriever = ContextualCompressionRetriever( base_compressor=pipeline_compressor, base_retriever=self.get_base_retriever() ) return contextual_retriever def run(self, query: str): contextual_retriever = self.get_contextual_retriever() #DEBUGGING & EVALUTING ANSWERS: compressed_docs = contextual_retriever.get_relevant_documents(query) compressed_docs_list = [] for doc in compressed_docs: doc_info = { "Page Content": doc.page_content, } compressed_docs_list.append(doc_info) #st.write(compressed_docs_list) base_retriever=self.get_base_retriever() initial_retrieved = base_retriever.get_relevant_documents(query) retrieval = RetrievalQA.from_chain_type( llm=self.llm, chain_type="stuff", retriever=contextual_retriever, return_source_documents=True, ) output = retrieval(query) st.session_state.doc_sources = initial_retrieved return output['result'] class BR18_DB: def __init__(self, llm, folder_path: str): self.llm = llm self.folder_path = folder_path self.md_paths = self.load_documents() # Renamed from pdf_paths to md_paths self.embeddings = OpenAIEmbeddings() self.pinecone_index_name = "br18" self.id_key = "doc_id" self.br18_parent_store = InMemoryStore() current_directory = os.getcwd() store_path = os.path.join(current_directory, "inmemorystore", "br18_parent_store.pkl") if self.pinecone_index_name not in pinecone.list_indexes(): pinecone.create_index(self.pinecone_index_name, dimension=1536) self.vectorstore = self.create_vectorstore() self.serialize_inmemorystore(store_path) else: self.vectorstore = Pinecone.from_existing_index(self.pinecone_index_name, self.embeddings) with open(store_path, "rb") as f: self.br18_parent_store = pickle.load(f) self.retriever = None def serialize_inmemorystore(self, store_path): with open(store_path, "wb") as f: pickle.dump(self.br18_parent_store, f) def load_documents(self): md_paths = list(Path(self.folder_path).rglob("*.md")) documents = [] for path in md_paths: loader = TextLoader(str(path)) #st.write(loader) data = loader.load() documents.extend(data) # Assuming data is a list of Document objects #st.text(documents) return documents def split_and_chunk_text(self, markdown_document: Document): markdown_text = markdown_document.page_content # Define headers to split on headers_to_split_on = [ ("#", "Header 1"), ("##", "Header 2"), ("###", "Header 3"), ("####", "Header 4"), ] markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on) #st.write(markdown_splitter) md_header_splits = markdown_splitter.split_text(markdown_text) #st.write(md_header_splits) #st.write(type(md_header_splits[0])) parent_chunk_size = 5000 parent_chunk_overlap = 0 text_splitter = RecursiveCharacterTextSplitter( chunk_size=parent_chunk_size, chunk_overlap=parent_chunk_overlap ) # Split the header-split documents into chunks all_parent_splits = text_splitter.split_documents(md_header_splits) for split in all_parent_splits: header_3 = split.metadata.get('Header 3', '') header_4 = split.metadata.get('Header 4', '') # Prepend "Section:" to Header 4 if it exists if header_4: header_4 = f"Section: {header_4}" metadata_str = f"{header_3}\n\n{header_4}" split.page_content = f"{metadata_str}\n\n{split.page_content}" split.metadata['type'] = 'parents' return all_parent_splits def save_summaries(self, summaries: List[str]): """Save the generated summaries to a JSON file.""" current_directory = os.getcwd() save_path = os.path.join(current_directory, 'savesummary', 'br18_summaries.json') with open(save_path, 'w') as f: json.dump(summaries, f) def load_summaries(self) -> List[str]: """Load summaries from a JSON file if it exists.""" current_directory = os.getcwd() load_path = os.path.join(current_directory, 'savesummary', 'br18_summaries.json') if os.path.exists(load_path): with open(load_path, 'r') as f: summaries = json.load(f) return summaries else: return None # or raise an exception, or generate new summaries def generate_summaries(self, parent_splits: List[Document]) -> List[str]: loaded_summaries = self.load_summaries() if loaded_summaries is not None: return loaded_summaries chain = ( {"doc": lambda x: x.page_content} | ChatPromptTemplate.from_template("Summarize the following document:\n\n{doc}") | ChatOpenAI(max_retries=3) | StrOutputParser() ) summaries = chain.batch(parent_splits, {"max_concurrency": 4}) self.save_summaries(summaries) return summaries def generate_child_splits(self, parent_splits: List[Document], summaries: List[str]) -> List[Document]: child_chunk_size = 300 child_text_splitter = RecursiveCharacterTextSplitter( chunk_size=child_chunk_size, chunk_overlap=0 ) all_child_splits = [] for i, parent_split in enumerate(parent_splits): child_splits = child_text_splitter.split_text(parent_split.page_content) new_metadata = dict(parent_split.metadata) new_metadata['type'] = 'children' summary_with_prefix = f"Summary: {summaries[i]}" first_child_content = f"{child_splits[0]}\n\n{summary_with_prefix}" first_child_split = Document( page_content=first_child_content, metadata=new_metadata ) all_child_splits.append(first_child_split) # Append only the first child split (assuming it contains the metadata) return all_child_splits def process_all_documents(self): all_parent_splits = [] # Local variable to store all parent splits all_child_splits = [] # Local variable to store all child splits for markdown_document in self.md_paths: parent_splits = self.split_and_chunk_text(markdown_document) all_parent_splits.extend(parent_splits) summaries = self.generate_summaries(all_parent_splits) all_child_splits = self.generate_child_splits(all_parent_splits, summaries) st.write(all_parent_splits) st.write(all_child_splits) return all_parent_splits, all_child_splits # Return both lists def create_vectorstore(self): all_parent_splits, all_child_splits = self.process_all_documents() parent_doc_ids = [str(uuid.uuid4()) for _ in all_parent_splits] self.br18_parent_store.mset(list(zip(parent_doc_ids, all_parent_splits))) for parent_id, child_split in zip(parent_doc_ids, all_child_splits): child_split.metadata[self.id_key] = parent_id # Create and save the vector store to disk br18_vectorstore = Pinecone.from_documents(documents=all_child_splits, embedding=self.embeddings, index_name=self.pinecone_index_name) #st.write(br18_appendix_child_vectorstore) for i, doc in enumerate(all_parent_splits): doc.metadata[self.id_key] = parent_doc_ids[i] # Store the vector store in the session state st.session_state.br18_vectorstore = br18_vectorstore return br18_vectorstore def create_retriever(self, query: str): search_type = st.session_state.search_type if search_type == "By Context": # Initialize retriever for By Context, filtering by the presence of the "text" metadata general_retriever = MultiVectorRetriever( vectorstore=self.vectorstore, docstore=self.br18_parent_store, id_key=self.id_key, search_kwargs={"k": 5} ) parent_docs = general_retriever.vectorstore.similarity_search(query, k = 5) #st.write(parent_docs) st.session_state.doc_sources = parent_docs embeddings = self.embeddings # Initialize Redundant Filter redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings) # Initialize Relevant Filter relevant_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.75, k = 15) #st.write(relevant_filter) # Initialize Text Splitter splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50, separator=". ") # Create Compressor Pipeline pipeline_compressor = DocumentCompressorPipeline( transformers=[splitter, redundant_filter, relevant_filter] ) # Initialize Contextual Compression Retriever contextual_general_retriever = ContextualCompressionRetriever( base_compressor=pipeline_compressor, base_retriever=general_retriever ) # Retrieve parent documents that match the query retrieved_parent_docs = contextual_general_retriever.get_relevant_documents(query) # Display retrieved parent documents display_list = [] for doc in retrieved_parent_docs: display_dict = { "Page Content": doc.page_content, "Doc ID": doc.metadata.get('doc_id', 'N/A'), "Header 3": doc.metadata.get('Header 3', 'N/A'), "Header 4": doc.metadata.get('Header 4', 'N/A'), } display_list.append(display_dict) #st.write(display_list) return retrieved_parent_docs elif search_type == "By Headers": # Initialize retriever for By Headers, filtering by the absence of the "text" metadata specific_retriever = MultiVectorRetriever( vectorstore=self.vectorstore, docstore=self.br18_parent_store, id_key=self.id_key, search_kwargs={"k": 3} ) child_docs = specific_retriever.vectorstore.similarity_search(query, k = 3) #st.write(child_docs) # Retrieve child documents that match the query embeddings = self.embeddings embedding_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.75) #llm_filter = LLMChainFilter.from_llm(self.llm) compression_retriever = ContextualCompressionRetriever(base_compressor=embedding_filter, base_retriever=specific_retriever) retrieved_child_docs = compression_retriever.get_relevant_documents(query) st.session_state.doc_sources = retrieved_child_docs # Display retrieved child documents display_list = [] for doc in retrieved_child_docs: display_dict = { "Page Content": doc.page_content, "Doc ID": doc.metadata.get('doc_id', 'N/A'), "Header 3": doc.metadata.get('Header 3', 'N/A'), "Header 4": doc.metadata.get('Header 4', 'N/A'), } display_list.append(display_dict) #st.write(display_list) return retrieved_child_docs def run(self, query: str): prompt_template = """Use the following pieces of context to answer the question at the end. The answer should be as specific as possible to a chapter and section where clause numbers and their respective subclause are referenced. Make sure to mention requirement numbers and specific integer values where relevant. If you don't know the answer, just say that you don't know, don't try to make up an answer. {context} Question: {question} EXAMPLE: The building regulation regarding stairs is outlined in Chapter 2 - Access, specifically in Section - Stairs: Width: Stairs in shared access routes must have a minimum free width of 1.0 meter. (clause 57.1) Headroom: Stairs must have a minimum free headroom of 2.10 meters. (clause 57.1) Gradient: The gradient of the stairs must not exceed 0.18 meters. (clause 57.2) """ PROMPT = PromptTemplate( template=prompt_template, input_variables=["context", "question"] ) # Retrieve the filtered documents retrieved_docs = self.create_retriever(query) #st.write(type(filtered_docs[0])) #st.write(filtered_docs) qa_chain = load_qa_chain(self.llm, chain_type="stuff", verbose=True, prompt=PROMPT) output = qa_chain({"input_documents": retrieved_docs, "question": query}, return_only_outputs=True) return output class SummarizationTool(): def __init__(self, document_chunks): self.llm = ChatOpenAI( temperature=0, streaming=True, model_name="gpt-3.5-turbo" ) self.document_chunks = document_chunks self.map_prompt_template, self.combine_prompt_template = self.load_prompts() self.chain = self.load_summarize_chain() def load_prompts(self): map_prompt = ''' Summarize the following text in a clear and concise way: TEXT:`{text}` Brief Summary: ''' combine_prompt = ''' Generate a summary of the following text that includes the following elements: * A title that accurately reflects the content of the text. * An introduction paragraph that provides an overview of the topic. * Bullet points that list the key points of the text. Text:`{text}` ''' map_prompt_template = PromptTemplate(template=map_prompt, input_variables=["text"]) combine_prompt_template = PromptTemplate(template=combine_prompt, input_variables=["text"]) return map_prompt_template, combine_prompt_template def load_summarize_chain(self): return load_summarize_chain( llm=self.llm, chain_type='map_reduce', map_prompt=self.map_prompt_template, combine_prompt=self.combine_prompt_template, verbose=True ) def run(self, query=None): return self.run_chain() def run_chain(self): return self.chain.run(self.document_chunks) class CustomGoogleSearchAPIWrapper(GoogleSearchAPIWrapper): def clean_text(self, text: str) -> str: # Remove extra whitespaces and line breaks text = ' '.join(text.split()) return text def scrape_content(self, url: str) -> dict: loader = SeleniumURLLoader(urls=[url]) data = loader.load() if data is not None and len(data) > 0: soup = BeautifulSoup(data[0].page_content, "html.parser") text = soup.get_text() cleaned_text = self.clean_text(text) return {'url': url, 'content': cleaned_text[:1000]} # Return first 1000 non-space characters return {'url': url, 'content': ''} def fetch_and_scrape(self, query: str, num_results: int = 3) -> str: # Step 1: Fetch search results metadata metadata_results = self.results(query, num_results) if len(metadata_results) == 0: return '[URL: None, Content: No good Google Search Result was found]' # Step 2: Extract URLs urls = [result.get("link", "") for result in metadata_results if "link" in result] # Step 3: Scrape content from URLs texts = [] for url in urls: scraped_content = self.scrape_content(url) formatted_text = f"[URL: {scraped_content['url']}, Content: {scraped_content['content']}]" texts.append(formatted_text) return " ".join(texts)[:3000] class MRKL: def __init__(self): self.tools = self.load_tools() self.agent_executor, self.memory = self.load_agent() def load_tools(self): # Load tools llm = ChatOpenAI( temperature=0, streaming=True, model_name="gpt-3.5-turbo" ) llm_math = LLMMathChain(llm=llm) llm_search = CustomGoogleSearchAPIWrapper() current_directory = os.getcwd() tools = [ Tool( name="Google_Search", func=llm_search.fetch_and_scrape, description="Useful when you cannot find a clear answer after looking up the database and that you need to search the internet for information. Input should be a fully formed question based on the context of what you couldn't find and not referencing any obscure pronouns from the conversation before" ), Tool( name='Calculator', func=llm_math.run, description='Useful for when you need to answer questions about math.' ), ] if st.session_state.vector_store is not None: metadata = st.session_state.document_metadata file_name = st.session_state.document_filename llm_database = DatabaseTool(llm=llm, vector_store=st.session_state.vector_store, metadata=metadata, filename=file_name) #st.write(llm_database.get_description()) tools.append( Tool( name='Document_Database', func=llm_database.run, description=llm_database.get_description(), ), ) if st.session_state.br18_exp is True: br18_folder_path = os.path.join(current_directory, "BR18_DB") llm_br18 = BR18_DB(llm=llm, folder_path=br18_folder_path) tools.extend([ Tool( name='BR18_Database', func=llm_br18.run, description=""" Always useful for when you need to answer questions about the Danish Building Regulation 18 (BR18). Input should be the specific keywords from the user query. Exclude the following common terms and their variations or synonyms especially words such as "building" and "regulation". Use this tool more often than the normal search tool. """ ), ]) return tools def load_agent(self): llm = ChatOpenAI( temperature=0, streaming=True, model_name="gpt-3.5-turbo", ) # Memory chat_msg = StreamlitChatMessageHistory(key="mrkl_chat_history") memory_key = "history" memory = AgentTokenBufferMemory(memory_key=memory_key, llm=llm, input_key='input', output_key="output", max_token_limit=8000, chat_memory=chat_msg) st.session_state.history = memory system_message_content = """ You are MRKL, an expert in construction, legal frameworks, and regulatory matters. You are designed to be an AI Chatbot for the engineering firm COWI, and you have the following tools to answer user queries, but only use them if necessary. Unless otherwise explicitly stated, the user queries are about the context given. Your primary objective is to provide responses that: 1. Offer an overview of the topic, referencing the chapter and the section if relevant 2. List key points in bullet-points or numbered list format, referencing the clauses and their respective subclauses if relevant. 3. Always match or exceed the details of the tool's output text in your answers. 4. Reflect back to the user's question and give a concise conclusion. You must maintain a professional and helpful demeanor in all interactions. """ # System Message system_message = SystemMessage(content=system_message_content) reflection_message_content = """ Reminder: Always try all your tools to find the right answer with the search tool as your last resort. Always self-reflect your answer based on the user's query and follows the list of response objective. """ reflection_message = SystemMessage(content=reflection_message_content) # Prompt prompt = OpenAIFunctionsAgent.create_prompt( system_message=system_message, extra_prompt_messages=[MessagesPlaceholder(variable_name=memory_key), reflection_message] ) # Agent agent = OpenAIFunctionsAgent(llm=llm, tools=self.tools, prompt=prompt) # Agent Executor agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=self.tools, memory=memory, verbose=True, return_intermediate_steps=True) return agent_executor, memory def clear_conversation(self): self.memory.clear() def run_agent(self, input, callbacks=[]): with get_openai_callback() as cb: result = self.agent_executor({"input": input}, callbacks=callbacks) st.session_state.token_count = cb print(cb) return result def main(): load_dotenv() pinecone.init( api_key=os.environ["PINECONE_API_KEY"], environment=os.environ["PINECONE_ENV"] ) st.set_page_config(page_title="MRKL AGENT", page_icon="🦜️", layout="wide") st.title("🦜️ MRKL AGENT") if 'openai' not in st.session_state: st.session_state.openai = None if "messages" not in st.session_state: st.session_state.messages = [{"roles": "assistant", "content": "Hi, I am Miracle. How can I help you?"}] if "user_input" not in st.session_state: st.session_state.user_input = None if "vector_store" not in st.session_state: st.session_state.vector_store = None if "summary" not in st.session_state: st.session_state.summary = None if "doc_sources" not in st.session_state: st.session_state.doc_sources = [] if "br18_vectorstore" not in st.session_state: st.session_state.br18_vectorstore = None if "history" not in st.session_state: st.session_state.history = None if 'br18_exp' not in st.session_state: st.session_state.br18_exp = False if "token_count" not in st.session_state: st.session_state.token_count = 0 if "agent" not in st.session_state: st.session_state.agent = MRKL() if 'show_info' not in st.session_state: st.session_state.show_info = False with st.expander("Configuration", expanded = False): openai_api_key = st.text_input("Enter OpenAI API Key", value="", placeholder="Enter the OpenAI API key which begins with sk-", type="password") if openai_api_key: st.session_state.openai = openai_api_key os.environ["OPENAI_API_KEY"] = openai_api_key st.write("API key has entered") with st.sidebar: br18_experiment = st.checkbox("Experimental Feature: Enable BR18", value=False) if br18_experiment != st.session_state.br18_exp: st.session_state.br18_exp = br18_experiment st.session_state.agent = MRKL() if br18_experiment: # If BR18 is enabled search_type = st.radio( "Select Search Type:", options=["By Headers", "By Context"], index=0, horizontal=True # Default to "By Context" ) st.session_state.search_type = search_type st.sidebar.title("Upload Document to Database") uploaded_files = st.sidebar.file_uploader("Choose a file", accept_multiple_files=True) # You can specify the types of files you want to accept if uploaded_files: file_details = {"FileName": [], "FileType": [], "FileSize": []} # Populate file_details using traditional loops for file in uploaded_files: file_details["FileName"].append(file.name) file_details["FileType"].append(file.type) file_details["FileSize"].append(file.size) # Use selectbox to choose a file selected_file_name = st.sidebar.selectbox('Choose a file:', file_details["FileName"], on_change=on_selectbox_change) # Get the index of the file selected file_index = file_details["FileName"].index(selected_file_name) # Display details of the selected file st.sidebar.write("You selected:") st.sidebar.write("FileName : ", file_details["FileName"][file_index]) st.sidebar.write("FileType : ", file_details["FileType"][file_index]) st.sidebar.write("FileSize : ", file_details["FileSize"][file_index]) # Add a note to remind the user to press the "Process" button if st.session_state.show_info: st.sidebar.info("**Note:** Remember to press the 'Process' button for the current selection.") st.session_state.show_info = False with st.sidebar: if st.sidebar.button("Process"): with st.spinner("Processing"): selected_file = uploaded_files[file_index] with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmpfile: tmpfile.write(selected_file.getvalue()) temp_path = tmpfile.name db_store = DBStore(temp_path, selected_file.name) document_chunks = db_store.get_pdf_text() st.session_state.document_chunks = document_chunks #st.write(document_chunks) vector_store = db_store.get_vectorstore() st.session_state.vector_store = vector_store st.session_state.agent = MRKL() primed_info_response = db_store.get_info_response() #st.write(primed_info_response) st.session_state.history.chat_memory.add_ai_message(primed_info_response) st.session_state.messages.append({"roles": "assistant", "content": primed_info_response}) st.success("PDF uploaded successfully!") if "document_chunks" in st.session_state: if st.sidebar.button("Create Detailed Summary"): with st.spinner("Summarizing"): summarization_tool = SummarizationTool(document_chunks=st.session_state.document_chunks) st.session_state.summary = summarization_tool.run() # Append the summary to the chat messages st.session_state.messages.append({"roles": "assistant", "content": st.session_state.summary}) else: st.session_state.vector_store = None display_messages(st.session_state.messages) if user_input := st.chat_input("Type something here..."): st.session_state.user_input = user_input st.session_state.messages.append({"roles": "user", "content": st.session_state.user_input}) st.chat_message("user").write(st.session_state.user_input) current_user_message = {"input": st.session_state.user_input} with st.chat_message("assistant"): st_callback = StreamlitCallbackHandler(st.container(), expand_new_thoughts=True) result = st.session_state.agent.run_agent(input=st.session_state.user_input, callbacks=[st_callback]) st.session_state.result = result response = result.get('output', '') st.session_state.messages.append({"roles": "assistant", "content": response}) st.write(response) current_assistant_response = {"output": response} current_messages = [current_user_message, current_assistant_response] with st.expander("View Document Sources"): if len(st.session_state.doc_sources) != 0: for document in st.session_state.doc_sources: st.divider() st.subheader("Source Content:") st.write(document.page_content) st.subheader("Metadata:") # Display only relevant metadata keys relevant_keys = ["Header ", "Header 3", "Header 4", "page_number", "source", "file_name", "title", "author"] for key in relevant_keys: value = document.metadata.get(key, 'N/A') if value != 'N/A': st.write(f"{key}: {value}") else: st.write("No document sources found") if st.session_state.summary is not None: with st.expander("Show Summary"): st.subheader("Summarization") result_summary = st.session_state.summary st.write(result_summary) #with st.expander("Cost Tracking", expanded=True): #total_token = st.session_state.token_count #st.write(total_token) buttons_placeholder = st.container() with buttons_placeholder: #st.button("Regenerate Response", key="regenerate", on_click=st.session_state.agent.regenerate_response) st.button("Clear Chat", key="clear", on_click=reset_chat) #st.write(st.session_state.history) #st.write(st.session_state.messages) #st.write(st.session_state.br18_vectorstore) #st.write(st.session_state.br18_appendix_child_vectorstore) #st.write(st.session_state.usc_vectorstore) #st.write(st.session_state.agent) #st.write(st.session_state.result) if __name__== '__main__': main()