# %%
import nltk
from langchain.indexes import VectorstoreIndexCreator
from langchain.text_splitter import CharacterTextSplitter, NLTKTextSplitter
from langchain.document_loaders import OnlinePDFLoader, UnstructuredPDFLoader
from langchain.vectorstores import Chroma
from langchain.embeddings import LlamaCppEmbeddings, HuggingFaceInstructEmbeddings
from chromadb.config import Settings
import chromadb
from chromadb.utils import embedding_functions
from hashlib import sha256
import cloudpickle
import logging
import os
from load_model import load_embedding, load_vectorstore
import torch
import re
import pathlib
import tempfile

current_path = str(pathlib.Path(__file__).parent.resolve())
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
nltk.download('punkt')

persist_directory = current_path + "/VectorStore"
logger = logging.getLogger()


# %%
def create_collection(collection_name, model_name, client):
    """Not used atm"""
    if not torch.cuda.is_available():
        device = "cpu"
    else:
        device = "cuda"
    ef = embedding_functions.InstructorEmbeddingFunction(
        model_name=model_name, device=device)
    client.get_or_create_collection(collection_name, embedding_function=ef)
    return True


def create_and_add(collection_name, sub_docs, model_name, metadata):
    """Embed the given document chunks and persist them into the named collection."""
    logging.info(f"Adding documents to {collection_name}")
    embeddings = load_embedding(model_name)
    vectorstore = load_vectorstore(model_name, collection_name, metadata=metadata)
    vectorstore.add_documents(documents=sub_docs, embedding=embeddings)
    vectorstore.persist()

    # Test Vectorstore
    # vectorstore2 = Chroma(
    #     collection_name=collection_name,
    #     embedding_function=embeddings,
    #     client_settings=client_settings,
    #     persist_directory=persist_directory,
    # )
    # print(vectorstore2.similarity_search_with_score(query="What are AXAs green Goals?", k=4))

    return vectorstore


def load_from_file(files):
    """Write uploaded PDF files to a temporary directory and load them as documents."""
    saved_files = []
    docs = []
    with tempfile.TemporaryDirectory() as tmpdirname:
        temp_dir = pathlib.Path(tmpdirname)
        for file in files:
            file_name = os.path.join(temp_dir, file.name)
            saved_files.append(file_name)
            with open(file_name, mode='wb') as w:
                w.write(file.read())
        print(saved_files)

        # Load inside the "with" block, while the temporary files still exist.
        loaders = [UnstructuredPDFLoader(pdf) for pdf in saved_files]
        print(loaders)
        for loader in loaders:
            docs.extend(loader.load())
    return docs


def load_from_web(urls, cache=True):
    """Load PDFs from a list of URLs, using a local pickle cache keyed by the URL list."""
    docs_list = urls
    filename = f"{current_path}/.cache/{sha256(str(urls).encode('utf-8')).hexdigest()}.pkl"
    isFile = os.path.isfile(filename)
    if cache and isFile:
        logger.info("Using Cache")
        with open(filename, "rb") as pikd:
            docs = cloudpickle.load(pikd)
    else:
        loaders = [OnlinePDFLoader(pdf) for pdf in docs_list]
        docs = []
        for loader in loaders:
            docs.extend(loader.load())
        os.makedirs(f"{current_path}/.cache", exist_ok=True)  # make sure the cache directory exists
        with open(filename, 'wb') as output:
            cloudpickle.dump(docs, output)

    # Update metadata: record the source URL and owner for each document.
    for i, doc in enumerate(docs):
        doc.metadata = {'source': docs_list[i], 'url': docs_list[i], 'owner': 'Heiko Wagner'}
    return docs


def load_and_split(docs, chunk_size=700):
    """Split documents into chunks of roughly chunk_size characters using the NLTK splitter."""
    text_splitter = NLTKTextSplitter(chunk_size=chunk_size, chunk_overlap=0)
    sub_docs = text_splitter.split_documents(docs)
    return sub_docs
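

# %%
# Usage sketch (not part of the original module): shows how the helpers above are
# intended to chain together, web loading -> splitting -> embedding and persisting.
# The URL, collection name, embedding model name, and metadata below are placeholder
# assumptions, and the returned store is assumed to support similarity_search like a
# LangChain Chroma vectorstore.
if __name__ == "__main__":
    example_urls = ["https://example.com/some-report.pdf"]  # placeholder URL
    docs = load_from_web(example_urls, cache=True)
    sub_docs = load_and_split(docs, chunk_size=700)
    vectorstore = create_and_add(
        collection_name="example_collection",      # assumed collection name
        sub_docs=sub_docs,
        model_name="hkunlp/instructor-large",      # assumed embedding model
        metadata={"source": "example"},            # assumed metadata shape
    )
    print(vectorstore.similarity_search("What does the document say?", k=4))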