# Hugging Face Spaces page status when this file was captured: Runtime error
from chromadb.config import Settings | |
from langchain.chains import RetrievalQA | |
from langchain.embeddings import HuggingFaceEmbeddings | |
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler | |
from langchain.vectorstores import Chroma | |
import os | |
import requests | |
from fastapi import FastAPI, UploadFile, File | |
from typing import List, Optional | |
import urllib.parse | |
from langchain.llms import HuggingFacePipeline | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
import transformers | |
from torch import cuda, bfloat16 | |
import gradio as gr | |
import gc | |
import torch | |
import glob | |
from multiprocessing import Pool | |
from tqdm import tqdm | |
# --- Runtime configuration ---------------------------------------------------
# Folder in which the Chroma vector store is persisted.
persist_directory = "db"
# Folder scanned for documents to ingest.
source_directory = "source_documents"
# Name of the embedding model handed to HuggingFaceEmbeddings.
embeddings_model_name = "all-MiniLM-L6-v2"
# Hugging Face model id of the text-generation LLM.
model = "tiiuae/falcon-7b-instruct"
# Chunking parameters for RecursiveCharacterTextSplitter.
chunk_size = 500
chunk_overlap = 50
# Intended number of chunks to retrieve per query.
target_source_chunks = 4

embeddings = HuggingFaceEmbeddings(model_name=embeddings_model_name)

# Use the first CUDA device when one is present and fall back to CPU (-1)
# otherwise: requesting device 0 on a host without a GPU makes the pipeline
# construction fail before the app can start.
llm = HuggingFacePipeline.from_model_id(
    model_id=model,
    task="text-generation",
    device=0 if cuda.is_available() else -1,
    model_kwargs={
        "temperature": 0.1,
        "trust_remote_code": True,
        "max_length": 100000,
        "top_p": 0.15,
        "top_k": 0,
        "repetition_penalty": 1.1,
        "num_return_sequences": 1,
        "torch_dtype": bfloat16,
    },
)

# Define the Chroma settings
CHROMA_SETTINGS = Settings(
    chroma_db_impl="duckdb+parquet",
    persist_directory=persist_directory,
    anonymized_telemetry=False,
)
import os | |
import glob | |
from typing import List | |
import argparse | |
from langchain.document_loaders import ( | |
CSVLoader, | |
EverNoteLoader, | |
PDFMinerLoader, | |
TextLoader, | |
UnstructuredEmailLoader, | |
UnstructuredEPubLoader, | |
UnstructuredHTMLLoader, | |
UnstructuredMarkdownLoader, | |
UnstructuredODTLoader, | |
UnstructuredPowerPointLoader, | |
UnstructuredWordDocumentLoader, | |
) | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.vectorstores import Chroma | |
from langchain.embeddings import HuggingFaceEmbeddings | |
from langchain.docstore.document import Document | |
# from constants import CHROMA_SETTINGS | |
# from PyPDF2 import PdfReader | |
import requests | |
# Registry of supported file types: extension -> (loader class, loader kwargs).
# Iteration order of this mapping is the order in which extensions are globbed.
# NOTE: ".eml" is currently not mapped to any loader.
LOADER_MAPPING = {
    extension: (loader_cls, loader_kwargs)
    for extension, loader_cls, loader_kwargs in (
        (".csv", CSVLoader, {}),
        (".doc", UnstructuredWordDocumentLoader, {}),
        (".docx", UnstructuredWordDocumentLoader, {}),
        (".enex", EverNoteLoader, {}),
        (".epub", UnstructuredEPubLoader, {}),
        (".html", UnstructuredHTMLLoader, {}),
        (".md", UnstructuredMarkdownLoader, {}),
        (".odt", UnstructuredODTLoader, {}),
        (".pdf", PDFMinerLoader, {}),
        (".ppt", UnstructuredPowerPointLoader, {}),
        (".pptx", UnstructuredPowerPointLoader, {}),
        # Plain-text files are decoded as Windows-1252.
        (".txt", TextLoader, {"encoding": "cp1252"}),
        # Add more (extension, loader, kwargs) rows here as needed.
    )
}
def load_single_document(file_path: str) -> List[Document]:
    """Load one file with the loader registered for its extension.

    Args:
        file_path: Path of the file to load.

    Returns:
        The documents produced by the matching loader's ``load()`` call.

    Raises:
        ValueError: If the file's extension has no entry in LOADER_MAPPING.
    """
    # splitext only looks at the last path component, so dots in directory
    # names and extension-less files no longer yield a bogus "extension".
    # Lower-casing lets "REPORT.PDF" use the same loader as "report.pdf".
    ext = os.path.splitext(file_path)[1].lower()
    try:
        loader_class, loader_args = LOADER_MAPPING[ext]
    except KeyError:
        raise ValueError(f"Unsupported file extension '{ext}'") from None
    return loader_class(file_path, **loader_args).load()
def load_documents(source_dir: str, ignored_files: Optional[List[str]] = None) -> List[Document]:
    """
    Loads all documents from the source documents directory, ignoring specified files

    Args:
        source_dir: Directory searched recursively for every extension in
            LOADER_MAPPING.
        ignored_files: Paths (as returned by glob) to skip. ``None`` means
            nothing is ignored.

    Returns:
        All documents from all loaded files. Files are loaded in parallel
        with ``imap_unordered``, so the order across files is not defined.
    """
    # A set gives O(1) membership tests; None and [] both mean "ignore nothing".
    ignored = set(ignored_files or ())

    all_files = []
    for ext in LOADER_MAPPING:
        all_files.extend(
            glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True)
        )
    filtered_files = [file_path for file_path in all_files if file_path not in ignored]

    # Nothing to load: do not spawn worker processes for an empty job list.
    if not filtered_files:
        return []

    results = []
    # Never start more workers than there are files to load.
    worker_count = min(os.cpu_count() or 1, len(filtered_files))
    with Pool(processes=worker_count) as pool:
        with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
            for docs in pool.imap_unordered(load_single_document, filtered_files):
                results.extend(docs)
                pbar.update()
    return results
def process_documents(ignored_files: Optional[List[str]] = None) -> List[Document]:
    """
    Load documents and split in chunks

    Args:
        ignored_files: Paths to skip while loading. ``None`` means nothing
            is ignored.

    Returns:
        The chunks produced by splitting every loaded document.

    Raises:
        SystemExit: With status 0 when no documents were loaded.
    """
    # Always hand a real list to load_documents, whatever default it declares.
    ignored = [] if ignored_files is None else ignored_files

    print(f"Loading documents from {source_directory}")
    documents = load_documents(source_directory, ignored)
    if not documents:
        print("No new documents to load")
        # Same effect as exit(0), without relying on the site-provided builtin.
        raise SystemExit(0)
    print(f"Loaded {len(documents)} new documents from {source_directory}")

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    texts = text_splitter.split_documents(documents)
    # The splitter measures chunk length in characters, not tokens.
    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} characters each)")
    return texts
def does_vectorstore_exist(persist_directory: str) -> bool:
    """
    Report whether ``persist_directory`` holds a usable vectorstore.

    The store counts as present only when all of the following hold:
    the ``index`` sub-folder exists, both parquet files exist, and the
    ``index`` folder contains more than three ``.bin``/``.pkl`` files.
    """
    # Guard 1: the index folder must be there.
    if not os.path.exists(os.path.join(persist_directory, 'index')):
        return False

    # Guard 2: both parquet files must be there.
    parquet_names = ('chroma-collections.parquet', 'chroma-embeddings.parquet')
    for parquet_name in parquet_names:
        if not os.path.exists(os.path.join(persist_directory, parquet_name)):
            return False

    # Final check: count the index files of both kinds.
    index_files = []
    for pattern in ('index/*.bin', 'index/*.pkl'):
        index_files.extend(glob.glob(os.path.join(persist_directory, pattern)))
    return len(index_files) > 3
def ingest():
    """Load everything in the source folder, chunk it and store it in Chroma.

    Reads the module-level configuration (``source_directory``,
    ``persist_directory``, ``chunk_size``, ``chunk_overlap``) and the shared
    ``embeddings`` object, so this function and the query path cannot drift
    apart. Returns ``None``; the result is the persisted vector store.
    """
    os.makedirs(source_directory, exist_ok=True)

    # Load documents and split in chunks
    print(f"Loading documents from {source_directory}")
    documents = load_documents(source_directory)
    if not documents:
        # Building a vector store from zero chunks is pointless, so stop here.
        print(f"No documents found in {source_directory}; nothing to ingest")
        return

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    texts = text_splitter.split_documents(documents)
    print(f"Loaded {len(documents)} documents from {source_directory}")
    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} characters each)")

    # Create and store locally vectorstore
    db = Chroma.from_documents(
        texts,
        embeddings,
        persist_directory=persist_directory,
        client_settings=CHROMA_SETTINGS,
    )
    db.persist()
def embed_documents(files):
    """Save uploaded files to the source folder, ingest them, then clean up.

    Args:
        files: Iterable of bytes-like upload contents (each item is written
            to disk as-is). ``None`` or an empty iterable is accepted.

    Returns:
        A dict with a single ``"message"`` key describing the outcome.
    """
    if not files:
        return {"message": "No files provided"}

    source_directory = "source_documents"
    os.makedirs(source_directory, exist_ok=True)

    saved_files = []
    try:
        # Save the files to the specified folder. Only raw contents are
        # received, so original names are unknown; each upload gets its own
        # numbered name so that several uploads no longer overwrite each other.
        # NOTE(review): the ".pdf" suffix is kept from the original code, so
        # every upload is still treated as a PDF by the loader.
        for index, file_ in enumerate(files):
            file_path = os.path.join(source_directory, f"file_{index}.pdf")
            with open(file_path, "wb") as f:
                f.write(file_)
            saved_files.append(file_path)
            print(file_path)

        ingest()
    finally:
        # Delete the saved uploads exactly once each, even if ingestion failed.
        for file_path in saved_files:
            try:
                os.remove(file_path)
            except FileNotFoundError:
                # Already gone; nothing left to clean up for this path.
                pass

    return {"message": "Files embedded successfully"}
def retrieve_documents(query: str):
    """Answer ``query`` with a RetrievalQA chain over the persisted store.

    Opens the Chroma store in ``persist_directory``, retrieves
    ``target_source_chunks`` chunks for the query, and runs the "stuff"
    chain with the module-level ``llm``.

    Args:
        query: The question to answer.

    Returns:
        The ``'result'`` entry of the chain output.
    """
    db = Chroma(
        persist_directory=persist_directory,
        embedding_function=embeddings,
        client_settings=CHROMA_SETTINGS,
    )
    retriever = db.as_retriever(search_kwargs={"k": target_source_chunks})

    qa = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=False,
    )

    # Get the answer from the chain
    res = qa(query)
    print(res)
    return res['result']
# Two-column UI: document upload and embedding on the left, question
# answering on the right.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            file_input = gr.File(
                file_count="multiple",
                file_types=["text", ".json", ".csv", ".pdf"],
                type="binary",
            )
            initiate_btn = gr.Button(value="Generate Embedding")
        with gr.Column():
            question = gr.Textbox(label="Question")
            question_btn = gr.Button(value="Question_btn")
            answer = gr.Textbox(label="answer")

    # Event wiring; api_name exposes each handler as a named API endpoint.
    initiate_btn.click(
        embed_documents,
        inputs=file_input,
        api_name="embed-file",
    )
    question_btn.click(
        retrieve_documents,
        inputs=question,
        outputs=answer,
        api_name="llm",
    )

demo.launch(debug=True)