from pinecone.grpc import PineconeGRPC
from pinecone import ServerlessSpec
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.vector_stores import PineconeVectorStore
from llama_index.node_parser import SemanticSplitterNodeParser
from llama_index.ingestion import IngestionPipeline


class PineconeConnector:
    """Bundle a Pinecone client, a HuggingFace embedder and an ingestion pipeline.

    Constructing an instance immediately creates the Pinecone gRPC client and
    loads the embedding model; the index itself is only looked up (and created
    if missing) when ``create_pipeline`` or ``run`` is called.
    """

    def __init__(
        self,
        api_key: str,
        index_name: str,
        embedding: str,
        dimension: int = 768,
    ) -> None:
        """Connect to Pinecone and load the embedding model.

        Args:
            api_key: Pinecone API key passed to the gRPC client.
            index_name: Name of the Pinecone index to use (stored as
                ``self.vector_db``).
            embedding: Model name handed to ``HuggingFaceEmbedding``.
            dimension: Vector dimension used only when the index has to be
                created. It should match the output size of ``embedding``;
                the default of 768 preserves the previous hard-coded value.
        """
        self.pinecone_api_key = api_key
        self.vector_db = index_name
        self.embedding = embedding
        self.dimension = dimension

        print("Connecting to Pinecone DB")
        self.pc = self.connect()

        print("Retrieving Embedder")
        self.embedder = HuggingFaceEmbedding(model_name=embedding)

    def connect(self) -> PineconeGRPC:
        """Return a new Pinecone gRPC client built from the stored API key."""
        return PineconeGRPC(api_key=self.pinecone_api_key)

    def create_pipeline(self) -> IngestionPipeline:
        """Build an ingestion pipeline that writes into ``self.vector_db``.

        The index is created as a serverless index (AWS, us-east-1, cosine
        metric) when no index with exactly that name exists yet.

        Returns:
            An ``IngestionPipeline`` that splits documents semantically,
            embeds the resulting nodes and targets the Pinecone vector store.
        """
        # Compare names exactly. A substring test would treat a request for
        # "docs" as satisfied by an existing "docs-archive" index, skip the
        # creation step, and then fail when opening the missing index below.
        existing_names = [index.name for index in self.pc.list_indexes()]
        if self.vector_db in existing_names:
            print("Index already exists")
        else:
            print("Creating index")
            self.pc.create_index(
                self.vector_db,
                dimension=self.dimension,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )

        # Open the index and wrap it for LlamaIndex.
        pinecone_index = self.pc.Index(self.vector_db)
        vector_store = PineconeVectorStore(pinecone_index=pinecone_index)

        # The pipeline abstracts away the need to adaptively process and
        # batch documents.
        return IngestionPipeline(
            transformations=[
                # Chunking parameters are provisional and still need tuning.
                SemanticSplitterNodeParser(
                    buffer_size=10,  # 1 = each sentence is a node
                    breakpoint_percentile_threshold=95,
                    embed_model=self.embedder,
                ),
                # The embedder runs last, on the nodes produced by the splitter.
                self.embedder,
            ],
            vector_store=vector_store,
        )

    def run(self) -> dict:
        """Create the pipeline and return it with the client and embedder.

        Returns:
            A dict with the keys ``"connection"`` (the Pinecone client),
            ``"pipeline"`` (the ingestion pipeline) and ``"embedder"`` (the
            HuggingFace embedding model).
        """
        pipeline = self.create_pipeline()
        return {
            "connection": self.pc,
            "pipeline": pipeline,
            "embedder": self.embedder,
        }