# Each embedding model has a different output dimension.
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.vector_stores.pinecone import PineconeVectorStore
from pinecone.grpc import PineconeGRPC
from pinecone import ServerlessSpec

import gc
import re


class DocumentEmbedder:
    """
    Takes documents and embeds them directly into a Pinecone data store.
    The process retrieves, cleans, embeds, and sends the documents to the
    vector store. Currently supports HuggingFace embeddings only. Gotta
    keep things cheap.
    """

    def __init__(self, api_keys, files, embedding, index_name, interactive=False):
        # api keys
        self.pinecone_api_key = api_keys['pinecone']
        self.openai_api_key = api_keys['openai']
        self.huggingface_api_key = api_keys['huggingface']

        # pinecone
        self.embedding = embedding
        self.vector_db = index_name

        # basic items
        self.files = files
        self.interactive = interactive

    def clean_text(self, content: str) -> str:
        """
        Remove unwanted characters and patterns in text input.

        :param content: Text input.
        :return: Cleaned version of original text input.
        """
        # Fix hyphenated words broken by newline
        content = re.sub(r'(\w+)-\n(\w+)', r'\1\2', content)

        # Remove specific unwanted patterns and characters
        unwanted_patterns = [
            "\\n", " —", "——————————", "—————————", "—————",
            r'\\u[\dA-Fa-f]{4}', r'\uf075', r'\uf0b7'
        ]
        for pattern in unwanted_patterns:
            content = re.sub(pattern, "", content)

        # Fix improperly spaced hyphenated words and normalize whitespace
        content = re.sub(r'(\w)\s*-\s*(\w)', r'\1-\2', content)
        content = re.sub(r'\s+', ' ', content)

        return content

    def create_embedder(self):
        """Load the embedding model and infer its output dimension."""
        embedding = HuggingFaceEmbedding(model_name=self.embedding)
        # Infer the vector dimension by embedding a probe string, since
        # each model produces vectors of a different size.
        dimensions = len(embedding.get_text_embedding("dimension probe"))
        return embedding, dimensions

    def pinecone_pipeline(self, embedding, dimensions):
        """Initialize the Pinecone connection and vector store."""
        # connect
        pc = PineconeGRPC(api_key=self.pinecone_api_key)

        # Create the index if it does not already exist
        indexes = [i.name for i in pc.list_indexes()]
        if self.vector_db in indexes:
            print("Index already exists")
        else:
            print("Creating index")
            pc.create_index(
                self.vector_db,
                dimension=dimensions,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )

        # Initialize the index
        pinecone_index = pc.Index(self.vector_db)

        # Initialize the vector store
        vector_store = PineconeVectorStore(pinecone_index=pinecone_index)

        # Create the ingestion pipeline (abstracts away the need to
        # adaptively process and batch documents)
        pipeline = IngestionPipeline(
            transformations=[
                # Split text into semantically coherent chunks; the
                # cutoffs still need tuning.
                SemanticSplitterNodeParser(
                    buffer_size=10,  # 1 = each sentence is its own node
                    breakpoint_percentile_threshold=95,
                    embed_model=embedding,
                ),
                embedding,
            ],
            vector_store=vector_store
        )

        return pipeline

    def embed(self):
        """String the steps above together to embed and upsert directly to Pinecone."""
        # read files
        print("reading files")
        results = self.files

        # clean text
        print("cleaning files")
        for d in range(len(results)):
            results[d].text = self.clean_text(results[d].text)

        # set up embedder
        print("retrieving embedder")
        embedder, dimensions = self.create_embedder()

        # set up pinecone pipeline
        print("initializing pinecone db")
        pipeline = self.pinecone_pipeline(embedder, dimensions)

        # run pinecone in batches (of 1) to preserve memory
print("reading into pinecone db") batchsize = 1 for i in range(0, len(results), batchsize): gc.collect() batch = pipeline.run(documents=results[i:i+batchsize]) print("completed batch %s" % ((i+batchsize)/batchsize))