import os
import re
import json
import pickle
import tempfile
import uuid
from pathlib import Path
from typing import List, Dict, Any, Optional

import pypdf
import pinecone
import streamlit as st
from bs4 import BeautifulSoup
from dotenv import load_dotenv

import langchain
from langchain.agents import Tool, AgentExecutor
from langchain.agents.openai_functions_agent.agent_token_buffer_memory import AgentTokenBufferMemory
from langchain.agents.openai_functions_agent.base import OpenAIFunctionsAgent
from langchain.callbacks import StreamlitCallbackHandler, get_openai_callback
from langchain.chains import LLMMathChain, LLMChain, RetrievalQA
from langchain.chains.question_answering import load_qa_chain
from langchain.chains.summarize import load_summarize_chain
from langchain.chat_models import ChatOpenAI
from langchain.docstore.document import Document
from langchain.document_loaders import TextLoader, SeleniumURLLoader
from langchain.document_transformers import EmbeddingsRedundantFilter
from langchain.embeddings import OpenAIEmbeddings
from langchain.memory.chat_message_histories import StreamlitChatMessageHistory
from langchain.prompts import PromptTemplate, ChatPromptTemplate, MessagesPlaceholder
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import (
    DocumentCompressorPipeline,
    EmbeddingsFilter,
    LLMChainFilter,
)
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.schema.messages import SystemMessage
from langchain.schema.output_parser import StrOutputParser
from langchain.storage import InMemoryStore
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    MarkdownHeaderTextSplitter,
    CharacterTextSplitter,
)
from langchain.utilities import GoogleSearchAPIWrapper
from langchain.vectorstores import FAISS, Pinecone

langchain.debug = True
langchain.verbose = True
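
# Overview (descriptive comment, added for orientation): the app below wires
# together DBStore (PDF ingestion into FAISS), DatabaseTool (contextual-
# compression retrieval over the uploaded PDF), BR18_DB (parent/child retrieval
# over BR18 markdown via Pinecone), SummarizationTool (map_reduce summaries),
# CustomGoogleSearchAPIWrapper (search plus Selenium scraping), and MRKL
# (the OpenAI-functions agent that exposes all of these as tools).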


def on_selectbox_change():
    st.session_state.show_info = True


def reset_chat():
    st.session_state.messages = [{"roles": "assistant", "content": "Hi, I am Miracle. How can I help you?"}]
    st.session_state.history = []
    st.session_state.search_keywords = []
    st.session_state.doc_sources = []
    st.session_state.summary = None
    st.session_state.agent.clear_conversation()
    st.session_state.primed_document_response = None


def display_messages(messages):
    # Display all messages
    for msg in messages:
        st.chat_message(msg["roles"]).write(msg["content"])


class DBStore:
    def __init__(self, file_path, file_name):
        self.file_path = file_path
        self.file_name = os.path.splitext(file_name)[0]
        st.session_state.document_filename = self.file_name

        self.reader = pypdf.PdfReader(file_path)
        self.metadata = self.extract_metadata_from_pdf()
        self.embeddings = OpenAIEmbeddings()
        self.vector_store = None

    def extract_metadata_from_pdf(self):
        """Extract metadata from the PDF."""
        metadata = self.reader.metadata
        st.session_state.document_metadata = metadata
        return {
            "title": metadata.get("/Title", "").strip(),
            "author": metadata.get("/Author", "").strip(),
            "creation_date": metadata.get("/CreationDate", "").strip(),
        }

    def extract_pages_from_pdf(self):
        pages = []
        for page_num, page in enumerate(self.reader.pages):
            text = page.extract_text()
            if text.strip():  # Check if extracted text is not empty
                pages.append((page_num + 1, text))
        return pages

    def parse_pdf(self):
        """
        Extract the metadata and the text of each page of the PDF.

        :return: A tuple of (list of (page number, extracted text) pairs, metadata dict).
        """
        metadata = self.extract_metadata_from_pdf()
        pages = self.extract_pages_from_pdf()
        #st.write(pages)
        #st.write(metadata)
        return pages, metadata
    @staticmethod
    def merge_hyphenated_words(text):
        return re.sub(r"(\w)-\n(\w)", r"\1\2", text)

    @staticmethod
    def fix_newlines(text):
        return re.sub(r"(?<!\n)\n(?!\n)", " ", text)

    @staticmethod
    def remove_multiple_newlines(text):
        return re.sub(r"\n{2,}", "\n", text)

    @staticmethod
    def remove_dots(text):
        # Replace sequences of four or more dots with a single space.
        return re.sub(r'\.{4,}', ' ', text)

    def clean_text(self, pages):
        cleaning_functions = [
            self.merge_hyphenated_words,
            self.fix_newlines,
            self.remove_multiple_newlines,
            self.remove_dots,
        ]
        cleaned_pages = []
        for page_num, text in pages:
            for cleaning_function in cleaning_functions:
                text = cleaning_function(text)
            cleaned_pages.append((page_num, text))
        return cleaned_pages
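
    # Illustrative example of the cleaning pipeline on raw PDF text:
    #   "fire regu-\nlation ..... applies\n\nto stairs"
    # becomes, roughly:
    #   "fire regulation   applies\nto stairs"
    # (hyphenation merged, lone newlines flattened, dot leaders removed).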
    def text_to_docs(self, text):
        doc_chunks = []
        for page_num, page in text:
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=2000,
                separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
                chunk_overlap=200,
            )
            chunks = text_splitter.split_text(page)
            for i, chunk in enumerate(chunks):
                doc = Document(
                    page_content=chunk,
                    metadata={
                        "page_number": page_num,
                        "chunk": i,
                        "source": f"p{page_num}-{i}",
                        "file_name": self.file_name,
                        **self.metadata,
                    },
                )
                doc_chunks.append(doc)
        #st.write(doc_chunks)
        return doc_chunks

    def get_pdf_text(self):
        pages, metadata = self.parse_pdf()  # Only the pages are needed here
        cleaned_text_pdf = self.clean_text(pages)
        document_chunks = self.text_to_docs(cleaned_text_pdf)
        return document_chunks

    def get_vectorstore(self):
        document_chunks = self.get_pdf_text()
        #st.write(document_chunks)
        vector_store = FAISS.from_documents(documents=document_chunks, embedding=self.embeddings)
        #st.write(vector_store)
        return vector_store
    def get_document_info(self):
        """
        Generate a short information snippet from the beginning of the document.

        Returns:
            str: A snippet built from the first characters of the first chunks.
        """
        # Get the first chunks of the document
        pdf_text = self.get_pdf_text()
        if pdf_text:
            first_chunk = pdf_text[0].page_content if len(pdf_text) > 0 else ""
            second_chunk = pdf_text[1].page_content if len(pdf_text) > 1 else ""
            third_chunk = pdf_text[2].page_content if len(pdf_text) > 2 else ""
            # Extract the first 300 characters from each chunk to form an information snippet
            info_document = first_chunk[:300] + second_chunk[:300] + third_chunk[:300]
        else:
            info_document = ""
        #st.write(info_document)
        return info_document

    def get_info_response(self):
        llm = ChatOpenAI(
            temperature=0,
            streaming=True,
            model_name="gpt-3.5-turbo"
        )
        document_filename = self.file_name
        document_title = self.metadata.get("title", None)
        document_snippet = self.get_document_info()

        document_info = {
            "document_filename": document_filename,
            "document_title": document_title,
            "document_snippet": document_snippet,
        }

        if document_title:
            info_response_prompt = """The user has uploaded a document titled '{document_title}' to the Document Database """
        else:
            info_response_prompt = """The user has uploaded a document named '{document_filename}' to the Document Database """

        info_response_prompt += """
        with the following information: {document_snippet}.

        In one sentence, inform the user about the document, prioritizing its name or title.
        Also, prompt the user to ask a general question about the document in an assistive manner.
        Begin your response with 'It appears you've uploaded a document that contains information on...'.

        Example:
        It appears you've uploaded a document that contains information on "COWI Policies and Guideline".
        Please feel free to ask any question about this document such as "What are the COWI Policies and Guideline?"
        """
        #st.write(info_response_prompt)

        # Create the LLMChain
        llm_chain = LLMChain(
            llm=llm,
            prompt=PromptTemplate.from_template(info_response_prompt)
        )

        # Generate the primed document message
        llm_response = llm_chain(document_info)
        info_response = llm_response.get('text', '')
        #st.write(info_response)
        return info_response


class DatabaseTool:
    def __init__(self, llm, vector_store, metadata=None, filename=None):
        self.llm = llm
        self.vector_store = vector_store
        self.metadata = metadata
        self.filename = filename
        self.embedding = OpenAIEmbeddings()

    def get_description(self):
        base_description = "Always useful for finding the exact written answer to the question by looking into a collection of documents."
        filename = self.filename
        title = self.metadata.get("/Title") if self.metadata else None
        author = self.metadata.get("/Author") if self.metadata else None
        subject = self.metadata.get("/Subject") if self.metadata else None
        footer_description = "Input should be a standalone query, not referencing any obscure pronouns from the conversation before, that will pull out relevant information from the database. Use this more than the normal search tool."

        if title:
            main_description = f"This tool is currently loaded with '{title}'"
            if author:
                main_description += f" by '{author}'"
            if subject:
                main_description += f", and has a topic of '{subject}'"
            return f"{base_description} {main_description}. {footer_description}"
        else:
            no_title_description = f"This tool is currently loaded with the document '{filename}'"
            return f"{base_description} {no_title_description}. {footer_description}"

    def get_base_retriever(self):
        base_retriever = self.vector_store.as_retriever(search_kwargs={'k': 5})
        return base_retriever
    def get_contextual_retriever(self):
        # Embeddings shared by the filters below
        embeddings = self.embedding

        # Initialize Redundant Filter
        redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings)

        # Initialize Relevant Filter
        relevant_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.76, k=25)
        #st.write(relevant_filter)

        # Initialize Text Splitter
        splitter = CharacterTextSplitter(chunk_size=300, chunk_overlap=0, separator=". ")

        # Create Compressor Pipeline
        pipeline_compressor = DocumentCompressorPipeline(
            transformers=[splitter, redundant_filter, relevant_filter]
        )

        # Initialize Contextual Compression Retriever
        contextual_retriever = ContextualCompressionRetriever(
            base_compressor=pipeline_compressor,
            base_retriever=self.get_base_retriever()
        )
        return contextual_retriever
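
    # Note on the pipeline order above: the splitter first breaks retrieved
    # chunks into sentence-sized pieces, the redundant filter then drops
    # near-duplicate pieces, and the relevance filter keeps only pieces whose
    # embedding similarity to the query clears the 0.76 threshold.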
    def run(self, query: str):
        contextual_retriever = self.get_contextual_retriever()

        # DEBUGGING & EVALUATING ANSWERS:
        compressed_docs = contextual_retriever.get_relevant_documents(query)
        compressed_docs_list = []
        for doc in compressed_docs:
            doc_info = {
                "Page Content": doc.page_content,
            }
            compressed_docs_list.append(doc_info)
        #st.write(compressed_docs_list)

        base_retriever = self.get_base_retriever()
        initial_retrieved = base_retriever.get_relevant_documents(query)

        retrieval = RetrievalQA.from_chain_type(
            llm=self.llm, chain_type="stuff",
            retriever=contextual_retriever,
            return_source_documents=True,
        )
        output = retrieval(query)
        st.session_state.doc_sources = initial_retrieved
        return output['result']


class BR18_DB:
    def __init__(self, llm, folder_path: str):
        self.llm = llm
        self.folder_path = folder_path
        self.md_documents = self.load_documents()  # Markdown files loaded as Document objects
        self.embeddings = OpenAIEmbeddings()
        self.pinecone_index_name = "br18"
        self.id_key = "doc_id"

        self.br18_parent_store = InMemoryStore()
        current_directory = os.getcwd()
        store_path = os.path.join(current_directory, "inmemorystore", "br18_parent_store.pkl")

        if self.pinecone_index_name not in pinecone.list_indexes():
            pinecone.create_index(self.pinecone_index_name, dimension=1536)
            self.vectorstore = self.create_vectorstore()
            self.serialize_inmemorystore(store_path)
        else:
            self.vectorstore = Pinecone.from_existing_index(self.pinecone_index_name, self.embeddings)
            with open(store_path, "rb") as f:
                self.br18_parent_store = pickle.load(f)

        self.retriever = None

    def serialize_inmemorystore(self, store_path):
        # Ensure the target directory exists before pickling
        os.makedirs(os.path.dirname(store_path), exist_ok=True)
        with open(store_path, "wb") as f:
            pickle.dump(self.br18_parent_store, f)
    def load_documents(self):
        md_paths = list(Path(self.folder_path).rglob("*.md"))
        documents = []
        for path in md_paths:
            loader = TextLoader(str(path))
            #st.write(loader)
            data = loader.load()
            documents.extend(data)  # data is a list of Document objects
        #st.text(documents)
        return documents
    def split_and_chunk_text(self, markdown_document: Document):
        markdown_text = markdown_document.page_content

        # Define headers to split on
        headers_to_split_on = [
            ("#", "Header 1"),
            ("##", "Header 2"),
            ("###", "Header 3"),
            ("####", "Header 4"),
        ]
        markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
        #st.write(markdown_splitter)
        md_header_splits = markdown_splitter.split_text(markdown_text)
        #st.write(md_header_splits)
        #st.write(type(md_header_splits[0]))

        parent_chunk_size = 5000
        parent_chunk_overlap = 0
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=parent_chunk_size, chunk_overlap=parent_chunk_overlap
        )

        # Split the header-split documents into chunks
        all_parent_splits = text_splitter.split_documents(md_header_splits)

        for split in all_parent_splits:
            header_3 = split.metadata.get('Header 3', '')
            header_4 = split.metadata.get('Header 4', '')

            # Prepend "Section:" to Header 4 if it exists
            if header_4:
                header_4 = f"Section: {header_4}"

            metadata_str = f"{header_3}\n\n{header_4}"
            split.page_content = f"{metadata_str}\n\n{split.page_content}"
            split.metadata['type'] = 'parents'

        return all_parent_splits
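
    # The methods above and below implement a parent/child ("multi-vector")
    # layout: large header-aware parent chunks carry the full regulation text,
    # while short child chunks (each prefixed with an LLM-generated summary)
    # are the ones actually embedded and searched.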
    def save_summaries(self, summaries: List[str]):
        """Save the generated summaries to a JSON file."""
        current_directory = os.getcwd()
        save_path = os.path.join(current_directory, 'savesummary', 'br18_summaries.json')
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        with open(save_path, 'w') as f:
            json.dump(summaries, f)

    def load_summaries(self) -> Optional[List[str]]:
        """Load summaries from a JSON file if it exists, else return None."""
        current_directory = os.getcwd()
        load_path = os.path.join(current_directory, 'savesummary', 'br18_summaries.json')
        if os.path.exists(load_path):
            with open(load_path, 'r') as f:
                summaries = json.load(f)
            return summaries
        else:
            return None  # Caller falls back to generating new summaries
    def generate_summaries(self, parent_splits: List[Document]) -> List[str]:
        loaded_summaries = self.load_summaries()
        if loaded_summaries is not None:
            return loaded_summaries

        chain = (
            {"doc": lambda x: x.page_content}
            | ChatPromptTemplate.from_template("Summarize the following document:\n\n{doc}")
            | ChatOpenAI(max_retries=3)
            | StrOutputParser()
        )
        summaries = chain.batch(parent_splits, {"max_concurrency": 4})

        self.save_summaries(summaries)
        return summaries
    def generate_child_splits(self, parent_splits: List[Document], summaries: List[str]) -> List[Document]:
        child_chunk_size = 300
        child_text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=child_chunk_size, chunk_overlap=0
        )

        all_child_splits = []
        for i, parent_split in enumerate(parent_splits):
            child_splits = child_text_splitter.split_text(parent_split.page_content)

            new_metadata = dict(parent_split.metadata)
            new_metadata['type'] = 'children'

            summary_with_prefix = f"Summary: {summaries[i]}"
            first_child_content = f"{child_splits[0]}\n\n{summary_with_prefix}"
            first_child_split = Document(
                page_content=first_child_content,
                metadata=new_metadata
            )
            all_child_splits.append(first_child_split)  # Append only the first child split (assuming it contains the metadata)

        return all_child_splits
    def process_all_documents(self):
        all_parent_splits = []  # Local variable to store all parent splits
        for markdown_document in self.md_documents:
            parent_splits = self.split_and_chunk_text(markdown_document)
            all_parent_splits.extend(parent_splits)

        # Generate (or load cached) summaries once, after all parents are collected
        summaries = self.generate_summaries(all_parent_splits)
        all_child_splits = self.generate_child_splits(all_parent_splits, summaries)

        #st.write(all_parent_splits)
        #st.write(all_child_splits)

        return all_parent_splits, all_child_splits  # Return both lists
    def create_vectorstore(self):
        all_parent_splits, all_child_splits = self.process_all_documents()

        # Assign a shared doc_id to each parent and its child split
        parent_doc_ids = [str(uuid.uuid4()) for _ in all_parent_splits]
        for parent_id, parent_split in zip(parent_doc_ids, all_parent_splits):
            parent_split.metadata[self.id_key] = parent_id
        for parent_id, child_split in zip(parent_doc_ids, all_child_splits):
            child_split.metadata[self.id_key] = parent_id

        # Store the parents in the docstore, keyed by doc_id
        self.br18_parent_store.mset(list(zip(parent_doc_ids, all_parent_splits)))

        # Create the vector store from the child splits
        br18_vectorstore = Pinecone.from_documents(documents=all_child_splits, embedding=self.embeddings, index_name=self.pinecone_index_name)
        #st.write(br18_vectorstore)

        # Store the vector store in the session state
        st.session_state.br18_vectorstore = br18_vectorstore

        return br18_vectorstore
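
    # Resulting layout:
    #   Pinecone index "br18"   -> embedded child splits, metadata carries doc_id
    #   InMemoryStore (pickled) -> full parent splits, keyed by the same doc_id
    # The MultiVectorRetriever below uses doc_id to map a matching child back
    # to its full parent chunk.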
    def create_retriever(self, query: str):
        search_type = st.session_state.search_type

        if search_type == "By Context":
            # "By Context": retrieve k=5 parents via the MultiVectorRetriever,
            # then compress them through a splitter/redundancy/relevance pipeline
            general_retriever = MultiVectorRetriever(
                vectorstore=self.vectorstore,
                docstore=self.br18_parent_store,
                id_key=self.id_key,
                search_kwargs={"k": 5}
            )

            parent_docs = general_retriever.vectorstore.similarity_search(query, k=5)
            #st.write(parent_docs)
            st.session_state.doc_sources = parent_docs

            embeddings = self.embeddings

            # Initialize Redundant Filter
            redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings)

            # Initialize Relevant Filter
            relevant_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.75, k=15)
            #st.write(relevant_filter)

            # Initialize Text Splitter
            splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50, separator=". ")

            # Create Compressor Pipeline
            pipeline_compressor = DocumentCompressorPipeline(
                transformers=[splitter, redundant_filter, relevant_filter]
            )

            # Initialize Contextual Compression Retriever
            contextual_general_retriever = ContextualCompressionRetriever(
                base_compressor=pipeline_compressor,
                base_retriever=general_retriever
            )

            # Retrieve parent documents that match the query
            retrieved_parent_docs = contextual_general_retriever.get_relevant_documents(query)

            # Collect retrieved parent documents for display
            display_list = []
            for doc in retrieved_parent_docs:
                display_dict = {
                    "Page Content": doc.page_content,
                    "Doc ID": doc.metadata.get('doc_id', 'N/A'),
                    "Header 3": doc.metadata.get('Header 3', 'N/A'),
                    "Header 4": doc.metadata.get('Header 4', 'N/A'),
                }
                display_list.append(display_dict)
            #st.write(display_list)

            return retrieved_parent_docs

        elif search_type == "By Headers":
            # "By Headers": retrieve k=3 results via the MultiVectorRetriever,
            # filtered by embedding similarity only
            specific_retriever = MultiVectorRetriever(
                vectorstore=self.vectorstore,
                docstore=self.br18_parent_store,
                id_key=self.id_key,
                search_kwargs={"k": 3}
            )

            child_docs = specific_retriever.vectorstore.similarity_search(query, k=3)
            #st.write(child_docs)

            # Retrieve documents that match the query
            embeddings = self.embeddings
            embedding_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.75)
            #llm_filter = LLMChainFilter.from_llm(self.llm)

            compression_retriever = ContextualCompressionRetriever(base_compressor=embedding_filter, base_retriever=specific_retriever)
            retrieved_child_docs = compression_retriever.get_relevant_documents(query)
            st.session_state.doc_sources = retrieved_child_docs

            # Collect retrieved documents for display
            display_list = []
            for doc in retrieved_child_docs:
                display_dict = {
                    "Page Content": doc.page_content,
                    "Doc ID": doc.metadata.get('doc_id', 'N/A'),
                    "Header 3": doc.metadata.get('Header 3', 'N/A'),
                    "Header 4": doc.metadata.get('Header 4', 'N/A'),
                }
                display_list.append(display_dict)
            #st.write(display_list)

            return retrieved_child_docs
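
    # Both branches embed-search the child splits and map hits back to parents
    # via doc_id; "By Context" post-processes with the full compressor pipeline
    # (k=5), while "By Headers" applies a single similarity filter (k=3).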
    def run(self, query: str):
        prompt_template = """Use the following pieces of context to answer the question at the end.
        The answer should be as specific as possible to a chapter and section where clause numbers and their respective subclauses are referenced.
        Make sure to mention requirement numbers and specific integer values where relevant.
        If you don't know the answer, just say that you don't know, don't try to make up an answer.

        {context}

        Question: {question}

        EXAMPLE:
        The building regulation regarding stairs is outlined in Chapter 2 - Access, specifically in Section - Stairs:

        Width: Stairs in shared access routes must have a minimum free width of 1.0 meter. (clause 57.1)
        Headroom: Stairs must have a minimum free headroom of 2.10 meters. (clause 57.1)
        Gradient: The gradient of the stairs must not exceed 0.18 meters. (clause 57.2)
        """

        PROMPT = PromptTemplate(
            template=prompt_template, input_variables=["context", "question"]
        )

        # Retrieve the filtered documents
        retrieved_docs = self.create_retriever(query)

        qa_chain = load_qa_chain(self.llm, chain_type="stuff", verbose=True, prompt=PROMPT)
        output = qa_chain({"input_documents": retrieved_docs, "question": query}, return_only_outputs=True)
        return output


class SummarizationTool:
    def __init__(self, document_chunks):
        self.llm = ChatOpenAI(
            temperature=0,
            streaming=True,
            model_name="gpt-3.5-turbo"
        )
        self.document_chunks = document_chunks
        self.map_prompt_template, self.combine_prompt_template = self.load_prompts()
        self.chain = self.load_summarize_chain()

    def load_prompts(self):
        map_prompt = '''
        Summarize the following text in a clear and concise way:
        TEXT: `{text}`
        Brief Summary:
        '''
        combine_prompt = '''
        Generate a summary of the following text that includes the following elements:
        * A title that accurately reflects the content of the text.
        * An introduction paragraph that provides an overview of the topic.
        * Bullet points that list the key points of the text.
        Text: `{text}`
        '''
        map_prompt_template = PromptTemplate(template=map_prompt, input_variables=["text"])
        combine_prompt_template = PromptTemplate(template=combine_prompt, input_variables=["text"])
        return map_prompt_template, combine_prompt_template

    def load_summarize_chain(self):
        return load_summarize_chain(
            llm=self.llm,
            chain_type='map_reduce',
            map_prompt=self.map_prompt_template,
            combine_prompt=self.combine_prompt_template,
            verbose=True
        )

    def run(self, query=None):
        return self.run_chain()

    def run_chain(self):
        return self.chain.run(self.document_chunks)
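
# Note: map_reduce summarization applies the map prompt to every chunk
# independently, then feeds the per-chunk summaries into the combine prompt
# to produce the final titled, bulleted summary.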


class CustomGoogleSearchAPIWrapper(GoogleSearchAPIWrapper):

    def clean_text(self, text: str) -> str:
        # Collapse extra whitespace and line breaks
        text = ' '.join(text.split())
        return text

    def scrape_content(self, url: str) -> dict:
        loader = SeleniumURLLoader(urls=[url])
        data = loader.load()

        if data is not None and len(data) > 0:
            soup = BeautifulSoup(data[0].page_content, "html.parser")
            text = soup.get_text()
            cleaned_text = self.clean_text(text)
            return {'url': url, 'content': cleaned_text[:1000]}  # Return the first 1000 characters of the cleaned text

        return {'url': url, 'content': ''}

    def fetch_and_scrape(self, query: str, num_results: int = 3) -> str:
        # Step 1: Fetch search results metadata
        metadata_results = self.results(query, num_results)
        if len(metadata_results) == 0:
            return '[URL: None, Content: No good Google Search Result was found]'

        # Step 2: Extract URLs
        urls = [result.get("link", "") for result in metadata_results if "link" in result]

        # Step 3: Scrape content from URLs
        texts = []
        for url in urls:
            scraped_content = self.scrape_content(url)
            formatted_text = f"[URL: {scraped_content['url']}, Content: {scraped_content['content']}]"
            texts.append(formatted_text)

        return " ".join(texts)[:3000]
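
# Illustrative usage (assumes GOOGLE_API_KEY and GOOGLE_CSE_ID are set in the
# environment, as GoogleSearchAPIWrapper requires):
#   search = CustomGoogleSearchAPIWrapper()
#   search.fetch_and_scrape("BR18 fire safety requirements", num_results=3)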


class MRKL:
    def __init__(self):
        self.tools = self.load_tools()
        self.agent_executor, self.memory = self.load_agent()

    def load_tools(self):
        # Load tools
        llm = ChatOpenAI(
            temperature=0,
            streaming=True,
            model_name="gpt-3.5-turbo"
        )
        llm_math = LLMMathChain.from_llm(llm=llm)
        llm_search = CustomGoogleSearchAPIWrapper()

        current_directory = os.getcwd()

        tools = [
            Tool(
                name="Google_Search",
                func=llm_search.fetch_and_scrape,
                description="Useful when you cannot find a clear answer by looking up the database and need to search the internet for information. Input should be a fully formed question based on the context of what you couldn't find, not referencing any obscure pronouns from the conversation before."
            ),
            Tool(
                name='Calculator',
                func=llm_math.run,
                description='Useful for when you need to answer questions about math.'
            ),
        ]

        if st.session_state.vector_store is not None:
            metadata = st.session_state.document_metadata
            file_name = st.session_state.document_filename
            llm_database = DatabaseTool(llm=llm, vector_store=st.session_state.vector_store, metadata=metadata, filename=file_name)
            #st.write(llm_database.get_description())
            tools.append(
                Tool(
                    name='Document_Database',
                    func=llm_database.run,
                    description=llm_database.get_description(),
                ),
            )

        if st.session_state.br18_exp is True:
            br18_folder_path = os.path.join(current_directory, "BR18_DB")
            llm_br18 = BR18_DB(llm=llm, folder_path=br18_folder_path)

            tools.extend([
                Tool(
                    name='BR18_Database',
                    func=llm_br18.run,
                    description="""
                    Always useful for when you need to answer questions about the Danish Building Regulation 18 (BR18).
                    Input should be the specific keywords from the user query. Exclude common terms and their variations or synonyms, especially words such as "building" and "regulation".
                    Use this tool more often than the normal search tool.
                    """
                ),
            ])

        return tools
    def load_agent(self):
        llm = ChatOpenAI(
            temperature=0,
            streaming=True,
            model_name="gpt-3.5-turbo",
        )

        # Memory
        chat_msg = StreamlitChatMessageHistory(key="mrkl_chat_history")
        memory_key = "history"
        memory = AgentTokenBufferMemory(memory_key=memory_key, llm=llm, input_key='input', output_key="output", max_token_limit=8000, chat_memory=chat_msg)
        st.session_state.history = memory

        system_message_content = """
        You are MRKL, an expert in construction, legal frameworks, and regulatory matters.
        You are designed to be an AI Chatbot for the engineering firm COWI, and you have the following tools to answer user queries, but only use them if necessary.
        Unless otherwise explicitly stated, the user queries are about the context given.
        Your primary objective is to provide responses that:
        1. Offer an overview of the topic, referencing the chapter and the section if relevant
        2. List key points in bullet-point or numbered-list format, referencing the clauses and their respective subclauses if relevant.
        3. Always match or exceed the detail of the tool's output text in your answers.
        4. Reflect back on the user's question and give a concise conclusion.
        You must maintain a professional and helpful demeanor in all interactions.
        """

        # System Message
        system_message = SystemMessage(content=system_message_content)

        reflection_message_content = """
        Reminder:
        Always try all your tools to find the right answer, with the search tool as your last resort.
        Always self-reflect on your answer based on the user's query and follow the list of response objectives.
        """
        reflection_message = SystemMessage(content=reflection_message_content)

        # Prompt
        prompt = OpenAIFunctionsAgent.create_prompt(
            system_message=system_message,
            extra_prompt_messages=[MessagesPlaceholder(variable_name=memory_key), reflection_message]
        )

        # Agent
        agent = OpenAIFunctionsAgent(llm=llm, tools=self.tools, prompt=prompt)

        # Agent Executor
        agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=self.tools, memory=memory, verbose=True, return_intermediate_steps=True)
        return agent_executor, memory
    def clear_conversation(self):
        self.memory.clear()

    def run_agent(self, input, callbacks=None):
        # Avoid a mutable default argument for callbacks
        callbacks = callbacks or []
        with get_openai_callback() as cb:
            result = self.agent_executor({"input": input}, callbacks=callbacks)
            st.session_state.token_count = cb
            print(cb)
        return result


def main():
    load_dotenv()
    pinecone.init(
        api_key=os.environ["PINECONE_API_KEY"], environment=os.environ["PINECONE_ENV"]
    )
    st.set_page_config(page_title="MRKL AGENT", page_icon="🦜️", layout="wide")
    st.title("🦜️ MRKL AGENT")

    if 'openai' not in st.session_state:
        st.session_state.openai = None
    if "messages" not in st.session_state:
        st.session_state.messages = [{"roles": "assistant", "content": "Hi, I am Miracle. How can I help you?"}]
    if "user_input" not in st.session_state:
        st.session_state.user_input = None
    if "vector_store" not in st.session_state:
        st.session_state.vector_store = None
    if "summary" not in st.session_state:
        st.session_state.summary = None
    if "doc_sources" not in st.session_state:
        st.session_state.doc_sources = []
    if "br18_vectorstore" not in st.session_state:
        st.session_state.br18_vectorstore = None
    if "history" not in st.session_state:
        st.session_state.history = None
    if 'br18_exp' not in st.session_state:
        st.session_state.br18_exp = False
    if "token_count" not in st.session_state:
        st.session_state.token_count = 0
    if "agent" not in st.session_state:
        st.session_state.agent = MRKL()
    if 'show_info' not in st.session_state:
        st.session_state.show_info = False
    with st.expander("Configuration", expanded=False):
        openai_api_key = st.text_input("Enter OpenAI API Key", value="", placeholder="Enter the OpenAI API key which begins with sk-", type="password")
        if openai_api_key:
            st.session_state.openai = openai_api_key
            os.environ["OPENAI_API_KEY"] = openai_api_key
            st.write("API key has been entered")

    with st.sidebar:
        br18_experiment = st.checkbox("Experimental Feature: Enable BR18", value=False)
        if br18_experiment != st.session_state.br18_exp:
            st.session_state.br18_exp = br18_experiment
            st.session_state.agent = MRKL()

        if br18_experiment:  # If BR18 is enabled
            search_type = st.radio(
                "Select Search Type:",
                options=["By Headers", "By Context"],
                index=0, horizontal=True  # Default to "By Headers"
            )
            st.session_state.search_type = search_type
    st.sidebar.title("Upload Document to Database")
    uploaded_files = st.sidebar.file_uploader("Choose a file", accept_multiple_files=True)  # You can specify the types of files you want to accept

    if uploaded_files:
        file_details = {"FileName": [], "FileType": [], "FileSize": []}

        # Populate file_details using traditional loops
        for file in uploaded_files:
            file_details["FileName"].append(file.name)
            file_details["FileType"].append(file.type)
            file_details["FileSize"].append(file.size)

        # Use selectbox to choose a file
        selected_file_name = st.sidebar.selectbox('Choose a file:', file_details["FileName"], on_change=on_selectbox_change)

        # Get the index of the file selected
        file_index = file_details["FileName"].index(selected_file_name)

        # Display details of the selected file
        st.sidebar.write("You selected:")
        st.sidebar.write("FileName : ", file_details["FileName"][file_index])
        st.sidebar.write("FileType : ", file_details["FileType"][file_index])
        st.sidebar.write("FileSize : ", file_details["FileSize"][file_index])

        # Add a note to remind the user to press the "Process" button
        if st.session_state.show_info:
            st.sidebar.info("**Note:** Remember to press the 'Process' button for the current selection.")
            st.session_state.show_info = False
        with st.sidebar:
            if st.sidebar.button("Process"):
                with st.spinner("Processing"):
                    selected_file = uploaded_files[file_index]
                    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmpfile:
                        tmpfile.write(selected_file.getvalue())
                        temp_path = tmpfile.name

                    db_store = DBStore(temp_path, selected_file.name)
                    document_chunks = db_store.get_pdf_text()
                    st.session_state.document_chunks = document_chunks
                    #st.write(document_chunks)

                    vector_store = db_store.get_vectorstore()
                    st.session_state.vector_store = vector_store

                    st.session_state.agent = MRKL()

                    primed_info_response = db_store.get_info_response()
                    #st.write(primed_info_response)
                    st.session_state.history.chat_memory.add_ai_message(primed_info_response)
                    st.session_state.messages.append({"roles": "assistant", "content": primed_info_response})

                    st.success("PDF uploaded successfully!")

            if "document_chunks" in st.session_state:
                if st.sidebar.button("Create Detailed Summary"):
                    with st.spinner("Summarizing"):
                        summarization_tool = SummarizationTool(document_chunks=st.session_state.document_chunks)
                        st.session_state.summary = summarization_tool.run()
                        # Append the summary to the chat messages
                        st.session_state.messages.append({"roles": "assistant", "content": st.session_state.summary})
    else:
        st.session_state.vector_store = None
    display_messages(st.session_state.messages)

    if user_input := st.chat_input("Type something here..."):
        st.session_state.user_input = user_input
        st.session_state.messages.append({"roles": "user", "content": st.session_state.user_input})
        st.chat_message("user").write(st.session_state.user_input)
        current_user_message = {"input": st.session_state.user_input}

        with st.chat_message("assistant"):
            st_callback = StreamlitCallbackHandler(st.container(), expand_new_thoughts=True)
            result = st.session_state.agent.run_agent(input=st.session_state.user_input, callbacks=[st_callback])
            st.session_state.result = result
            response = result.get('output', '')
            st.session_state.messages.append({"roles": "assistant", "content": response})
            st.write(response)
            current_assistant_response = {"output": response}

        current_messages = [current_user_message, current_assistant_response]
    with st.expander("View Document Sources"):
        if len(st.session_state.doc_sources) != 0:
            for document in st.session_state.doc_sources:
                st.divider()
                st.subheader("Source Content:")
                st.write(document.page_content)
                st.subheader("Metadata:")

                # Display only relevant metadata keys
                relevant_keys = ["Header 2", "Header 3", "Header 4", "page_number", "source", "file_name", "title", "author"]
                for key in relevant_keys:
                    value = document.metadata.get(key, 'N/A')
                    if value != 'N/A':
                        st.write(f"{key}: {value}")
        else:
            st.write("No document sources found")
    if st.session_state.summary is not None:
        with st.expander("Show Summary"):
            st.subheader("Summarization")
            result_summary = st.session_state.summary
            st.write(result_summary)

    #with st.expander("Cost Tracking", expanded=True):
        #total_token = st.session_state.token_count
        #st.write(total_token)
    buttons_placeholder = st.container()
    with buttons_placeholder:
        #st.button("Regenerate Response", key="regenerate", on_click=st.session_state.agent.regenerate_response)
        st.button("Clear Chat", key="clear", on_click=reset_chat)

    #st.write(st.session_state.history)
    #st.write(st.session_state.messages)
    #st.write(st.session_state.br18_vectorstore)
    #st.write(st.session_state.br18_appendix_child_vectorstore)
    #st.write(st.session_state.usc_vectorstore)
    #st.write(st.session_state.agent)
    #st.write(st.session_state.result)


if __name__ == '__main__':
    main()
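
# To run this app (assuming this file is app.py): `streamlit run app.py`.
# Expected environment variables, per load_dotenv/pinecone.init above and the
# wrappers in use: OPENAI_API_KEY (or entered in the Configuration expander),
# PINECONE_API_KEY, PINECONE_ENV, and GOOGLE_API_KEY / GOOGLE_CSE_ID for
# GoogleSearchAPIWrapper.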