tldw / App_Function_Libraries /RAG /Embeddings_Create.py
oceansweep's picture
Upload 169 files
c5b0bb7 verified
raw
history blame
19.1 kB
# Embeddings_Create.py
# Description: Functions for Creating and managing Embeddings in ChromaDB with LLama.cpp/OpenAI/Transformers
#
# Imports:
import logging
import os
import time
from functools import wraps
from threading import Lock, Timer
from typing import List
#
# 3rd-Party Imports:
import numpy as np
import onnxruntime as ort
import requests
from transformers import AutoTokenizer, AutoModel
import torch
#
# Local Imports:
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram
#
#######################################################################################################################
#
# Functions:
# Load configuration
loaded_config = load_comprehensive_config()
embedding_provider = loaded_config['Embeddings']['embedding_provider']
embedding_model = loaded_config['Embeddings']['embedding_model']
embedding_api_url = loaded_config['Embeddings']['embedding_api_url']
embedding_api_key = loaded_config['Embeddings']['embedding_api_key']
model_dir = loaded_config['Embeddings'].get('model_dir', './App_Function_Libraries/models/embedding_models/')
# Embedding Chunking Settings
chunk_size = loaded_config['Embeddings']['chunk_size']
overlap = loaded_config['Embeddings']['overlap']
# Global cache for embedding models
embedding_models = {}
# Commit hashes
commit_hashes = {
"jinaai/jina-embeddings-v3": "4be32c2f5d65b95e4bcce473545b7883ec8d2edd",
"Alibaba-NLP/gte-large-en-v1.5": "104333d6af6f97649377c2afbde10a7704870c7b",
"dunzhang/setll_en_400M_v5": "2aa5579fcae1c579de199a3866b6e514bbbf5d10"
}
class HuggingFaceEmbedder:
def __init__(self, model_name, cache_dir, timeout_seconds=30):
self.model_name = model_name
self.cache_dir = cache_dir # Store cache_dir
self.tokenizer = None
self.model = None
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.timeout_seconds = timeout_seconds
self.last_used_time = 0
self.unload_timer = None
log_counter("huggingface_embedder_init", labels={"model_name": model_name})
def load_model(self):
log_counter("huggingface_model_load_attempt", labels={"model_name": self.model_name})
start_time = time.time()
# https://huggingface.co/docs/transformers/custom_models
if self.model is None:
# Pass cache_dir to from_pretrained to specify download directory
self.tokenizer = AutoTokenizer.from_pretrained(
self.model_name,
trust_remote_code=True,
cache_dir=self.cache_dir, # Specify cache directory
revision=commit_hashes.get(self.model_name, None) # Pass commit hash
)
self.model = AutoModel.from_pretrained(
self.model_name,
trust_remote_code=True,
cache_dir=self.cache_dir, # Specify cache directory
revision=commit_hashes.get(self.model_name, None) # Pass commit hash
)
self.model.to(self.device)
self.last_used_time = time.time()
self.reset_timer()
load_time = time.time() - start_time
log_histogram("huggingface_model_load_duration", load_time, labels={"model_name": self.model_name})
log_counter("huggingface_model_load_success", labels={"model_name": self.model_name})
def unload_model(self):
log_counter("huggingface_model_unload", labels={"model_name": self.model_name})
if self.model is not None:
del self.model
del self.tokenizer
if torch.cuda.is_available():
torch.cuda.empty_cache()
self.model = None
self.tokenizer = None
if self.unload_timer:
self.unload_timer.cancel()
def reset_timer(self):
if self.unload_timer:
self.unload_timer.cancel()
self.unload_timer = Timer(self.timeout_seconds, self.unload_model)
self.unload_timer.start()
def create_embeddings(self, texts):
log_counter("huggingface_create_embeddings_attempt", labels={"model_name": self.model_name})
start_time = time.time()
self.load_model()
# https://huggingface.co/docs/transformers/custom_models
inputs = self.tokenizer(
texts,
return_tensors="pt",
padding=True,
truncation=True,
max_length=512
)
inputs = {k: v.to(self.device) for k, v in inputs.items()}
try:
with torch.no_grad():
outputs = self.model(**inputs)
embeddings = outputs.last_hidden_state.mean(dim=1)
return embeddings.cpu().float().numpy() # Convert to float32 before returning
except RuntimeError as e:
if "Got unsupported ScalarType BFloat16" in str(e):
logging.warning("BFloat16 not supported. Falling back to float32.")
# Convert model to float32
self.model = self.model.float()
with torch.no_grad():
outputs = self.model(**inputs)
embeddings = outputs.last_hidden_state.mean(dim=1)
embedding_time = time.time() - start_time
log_histogram("huggingface_create_embeddings_duration", embedding_time,
labels={"model_name": self.model_name})
log_counter("huggingface_create_embeddings_success", labels={"model_name": self.model_name})
return embeddings.cpu().float().numpy()
else:
log_counter("huggingface_create_embeddings_failure", labels={"model_name": self.model_name})
raise
class ONNXEmbedder:
def __init__(self, model_name, onnx_model_dir, timeout_seconds=30):
self.model_name = model_name
self.model_path = os.path.join(onnx_model_dir, f"{model_name}.onnx")
# https://huggingface.co/docs/transformers/custom_models
self.tokenizer = AutoTokenizer.from_pretrained(
model_name,
trust_remote_code=True,
cache_dir=onnx_model_dir, # Ensure tokenizer uses the same directory
revision=commit_hashes.get(model_name, None) # Pass commit hash
)
self.session = None
self.timeout_seconds = timeout_seconds
self.last_used_time = 0
self.unload_timer = None
self.device = "cpu" # ONNX Runtime will default to CPU unless GPU is configured
log_counter("onnx_embedder_init", labels={"model_name": model_name})
def load_model(self):
log_counter("onnx_model_load_attempt", labels={"model_name": self.model_name})
start_time = time.time()
if self.session is None:
if not os.path.exists(self.model_path):
raise FileNotFoundError(f"ONNX model not found at {self.model_path}")
logging.info(f"Loading ONNX model from {self.model_path}")
self.session = ort.InferenceSession(self.model_path)
self.last_used_time = time.time()
self.reset_timer()
load_time = time.time() - start_time
log_histogram("onnx_model_load_duration", load_time, labels={"model_name": self.model_name})
log_counter("onnx_model_load_success", labels={"model_name": self.model_name})
def unload_model(self):
log_counter("onnx_model_unload", labels={"model_name": self.model_name})
if self.session is not None:
logging.info("Unloading ONNX model to free resources.")
self.session = None
if self.unload_timer:
self.unload_timer.cancel()
def reset_timer(self):
if self.unload_timer:
self.unload_timer.cancel()
self.unload_timer = Timer(self.timeout_seconds, self.unload_model)
self.unload_timer.start()
def create_embeddings(self, texts: List[str]) -> List[List[float]]:
log_counter("onnx_create_embeddings_attempt", labels={"model_name": self.model_name})
start_time = time.time()
self.load_model()
try:
inputs = self.tokenizer(
texts,
return_tensors="np",
padding=True,
truncation=True,
max_length=512
)
input_ids = inputs["input_ids"].astype(np.int64)
attention_mask = inputs["attention_mask"].astype(np.int64)
ort_inputs = {
"input_ids": input_ids,
"attention_mask": attention_mask
}
ort_outputs = self.session.run(None, ort_inputs)
last_hidden_state = ort_outputs[0]
embeddings = np.mean(last_hidden_state, axis=1)
embedding_time = time.time() - start_time
log_histogram("onnx_create_embeddings_duration", embedding_time, labels={"model_name": self.model_name})
log_counter("onnx_create_embeddings_success", labels={"model_name": self.model_name})
return embeddings.tolist()
except Exception as e:
log_counter("onnx_create_embeddings_failure", labels={"model_name": self.model_name})
logging.error(f"Error creating embeddings with ONNX model: {str(e)}")
raise
class RateLimiter:
def __init__(self, max_calls, period):
self.max_calls = max_calls
self.period = period
self.calls = []
self.lock = Lock()
def __call__(self, func):
def wrapper(*args, **kwargs):
with self.lock:
now = time.time()
self.calls = [call for call in self.calls if call > now - self.period]
if len(self.calls) >= self.max_calls:
sleep_time = self.calls[0] - (now - self.period)
time.sleep(sleep_time)
self.calls.append(time.time())
return func(*args, **kwargs)
return wrapper
def exponential_backoff(max_retries=5, base_delay=1):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt)
logging.warning(f"Attempt {attempt + 1} failed. Retrying in {delay} seconds. Error: {str(e)}")
time.sleep(delay)
return wrapper
return decorator
@exponential_backoff()
@RateLimiter(max_calls=50, period=60)
def create_embeddings_batch(texts: List[str],
provider: str,
model: str,
api_url: str,
timeout_seconds: int = 300
) -> List[List[float]]:
global embedding_models
log_counter("create_embeddings_batch_attempt", labels={"provider": provider, "model": model})
start_time = time.time()
try:
if provider.lower() == 'huggingface':
if model not in embedding_models:
if model == "dunzhang/stella_en_400M_v5":
embedding_models[model] = ONNXEmbedder(model, model_dir, timeout_seconds)
else:
# Pass model_dir to HuggingFaceEmbedder
embedding_models[model] = HuggingFaceEmbedder(model, model_dir, timeout_seconds)
embedder = embedding_models[model]
embedding_time = time.time() - start_time
log_histogram("create_embeddings_batch_duration", embedding_time,
labels={"provider": provider, "model": model})
log_counter("create_embeddings_batch_success", labels={"provider": provider, "model": model})
return embedder.create_embeddings(texts)
elif provider.lower() == 'openai':
logging.debug(f"Creating embeddings for {len(texts)} texts using OpenAI API")
embedding_time = time.time() - start_time
log_histogram("create_embeddings_batch_duration", embedding_time,
labels={"provider": provider, "model": model})
log_counter("create_embeddings_batch_success", labels={"provider": provider, "model": model})
return [create_openai_embedding(text, model) for text in texts]
elif provider.lower() == 'local':
response = requests.post(
api_url,
json={"texts": texts, "model": model},
headers={"Authorization": f"Bearer {embedding_api_key}"}
)
if response.status_code == 200:
embedding_time = time.time() - start_time
log_histogram("create_embeddings_batch_duration", embedding_time,
labels={"provider": provider, "model": model})
log_counter("create_embeddings_batch_success", labels={"provider": provider, "model": model})
return response.json()['embeddings']
else:
raise Exception(f"Error from local API: {response.text}")
else:
raise ValueError(f"Unsupported embedding provider: {provider}")
except Exception as e:
log_counter("create_embeddings_batch_error", labels={"provider": provider, "model": model, "error": str(e)})
logging.error(f"Error in create_embeddings_batch: {str(e)}")
raise
def create_embedding(text: str, provider: str, model: str, api_url: str) -> List[float]:
log_counter("create_embedding_attempt", labels={"provider": provider, "model": model})
start_time = time.time()
embedding = create_embeddings_batch([text], provider, model, api_url)[0]
if isinstance(embedding, np.ndarray):
embedding = embedding.tolist()
embedding_time = time.time() - start_time
log_histogram("create_embedding_duration", embedding_time, labels={"provider": provider, "model": model})
log_counter("create_embedding_success", labels={"provider": provider, "model": model})
return embedding
def create_openai_embedding(text: str, model: str) -> List[float]:
log_counter("create_openai_embedding_attempt", labels={"model": model})
start_time = time.time()
embedding = get_openai_embeddings(text, model)
embedding_time = time.time() - start_time
log_histogram("create_openai_embedding_duration", embedding_time, labels={"model": model})
log_counter("create_openai_embedding_success", labels={"model": model})
return embedding
#
# ##############################################################
# #
# # ONNX Embeddings Functions
#
# # FIXME - UPDATE
# # Define the model path
# model_dir = "/tldw/App_Function_Libraries/models/embedding_models/"
# model_name = "your-huggingface-model-name"
# onnx_model_path = os.path.join(model_dir, model_name, "model.onnx")
#
# # Tokenizer download (if applicable)
# #tokenizer = AutoTokenizer.from_pretrained(model_name)
#
# # Ensure the model directory exists
# #if not os.path.exists(onnx_model_path):
# # You can add logic to download the ONNX model from a remote source
# # if it's not already available in the folder.
# # Example: huggingface_hub.download (if model is hosted on Hugging Face Hub)
# # raise Exception(f"ONNX model not found at {onnx_model_path}")
#
# class ONNXEmbedder:
# def __init__(self, model_name, model_dir, timeout_seconds=120):
# self.model_name = model_name
# self.model_path = os.path.join(model_dir, f"{model_name}.onnx")
# self.tokenizer = AutoTokenizer.from_pretrained(model_name)
# self.session = None
# self.timeout_seconds = timeout_seconds
# self.last_used_time = 0
# self.unload_timer = None
# self.device = "cpu" # ONNX Runtime will default to CPU unless GPU is configured
#
# def load_model(self):
# if self.session is None:
# if not os.path.exists(self.model_path):
# raise FileNotFoundError(f"ONNX model not found at {self.model_path}")
# logging.info(f"Loading ONNX model from {self.model_path}")
# self.session = ort.InferenceSession(self.model_path)
# self.last_used_time = time.time()
# self.reset_timer()
#
# def unload_model(self):
# if self.session is not None:
# logging.info("Unloading ONNX model to free resources.")
# self.session = None
# if self.unload_timer:
# self.unload_timer.cancel()
#
# def reset_timer(self):
# if self.unload_timer:
# self.unload_timer.cancel()
# self.unload_timer = Timer(self.timeout_seconds, self.unload_model)
# self.unload_timer.start()
#
# def create_embeddings(self, texts: List[str]) -> List[List[float]]:
# self.load_model()
#
# try:
# inputs = self.tokenizer(texts, return_tensors="np", padding=True, truncation=True, max_length=512)
# input_ids = inputs["input_ids"].astype(np.int64)
# attention_mask = inputs["attention_mask"].astype(np.int64)
#
# ort_inputs = {
# "input_ids": input_ids,
# "attention_mask": attention_mask
# }
#
# ort_outputs = self.session.run(None, ort_inputs)
#
# last_hidden_state = ort_outputs[0]
# embeddings = np.mean(last_hidden_state, axis=1)
#
# return embeddings.tolist()
# except Exception as e:
# logging.error(f"Error creating embeddings with ONNX model: {str(e)}")
# raise
#
# # Global cache for the ONNX embedder instance
# onnx_embedder = None
#
# # Global cache for embedding models
# embedding_models = {}
#
# def create_onnx_embeddings(texts: List[str]) -> List[List[float]]:
# global onnx_embedder
# model_dir = "/tldw/App_Function_Libraries/models/embedding_models/"
# model_name = "your-huggingface-model-name" # This can be pulled from config
#
# if onnx_embedder is None:
# onnx_embedder = ONNXEmbedder(model_name=model_name, model_dir=model_dir)
#
# # Generate embeddings
# embeddings = onnx_embedder.create_embeddings(texts)
# return embeddings
#
# #
# # End of ONNX Embeddings Functions
# ##############################################################
#
# End of File.
#######################################################################################################################