rag_milvus / milvus_singleton.py
januarevan's picture
init
da05b08
raw
history blame contribute delete
782 Bytes
from pymilvus import connections
from pymilvus.exceptions import ConnectionConfigException
class MilvusClientSingleton:
_instance = None
@staticmethod
def get_instance(uri):
if MilvusClientSingleton._instance is None:
MilvusClientSingleton()
# Initialize the client here
try:
MilvusClientSingleton._instance = connections.connect(uri=uri)
except ConnectionConfigException as e:
print(f"Error connecting to Milvus: {e}")
# Handle error appropriately
return MilvusClientSingleton._instance
def __init__(self):
if MilvusClientSingleton._instance is not None:
raise Exception("This class is a singleton!")
self._instance = None