# Embeddings_Create.py
# Description: Functions for creating and managing embeddings in ChromaDB with Llama.cpp/OpenAI/Transformers
#
# Imports:
import logging
import os
import time
from functools import wraps
from threading import Lock, Timer
from typing import List
#
# 3rd-Party Imports:
import numpy as np
import onnxruntime as ort
import requests
from transformers import AutoTokenizer, AutoModel
import torch
#
# Local Imports:
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram
#
#######################################################################################################################
#
# Functions:

# Load configuration
loaded_config = load_comprehensive_config()
embedding_provider = loaded_config['Embeddings']['embedding_provider']
embedding_model = loaded_config['Embeddings']['embedding_model']
embedding_api_url = loaded_config['Embeddings']['embedding_api_url']
embedding_api_key = loaded_config['Embeddings']['embedding_api_key']
model_dir = loaded_config['Embeddings'].get('model_dir', './App_Function_Libraries/models/embedding_models/')

# Embedding Chunking Settings
chunk_size = loaded_config['Embeddings']['chunk_size']
overlap = loaded_config['Embeddings']['overlap']
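
# A hypothetical sketch of the matching `[Embeddings]` section of the config file
# (key names mirror the lookups above; the values shown are illustrative only):
#
#   [Embeddings]
#   embedding_provider = huggingface
#   embedding_model = jinaai/jina-embeddings-v3
#   embedding_api_url = http://localhost:8080/v1/embeddings
#   embedding_api_key = your-api-key-here
#   model_dir = ./App_Function_Libraries/models/embedding_models/
#   chunk_size = 400
#   overlap = 200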

# Global cache for embedding models
embedding_models = {}

# Commit hashes, pinned so model downloads are reproducible
commit_hashes = {
    "jinaai/jina-embeddings-v3": "4be32c2f5d65b95e4bcce473545b7883ec8d2edd",
    "Alibaba-NLP/gte-large-en-v1.5": "104333d6af6f97649377c2afbde10a7704870c7b",
    "dunzhang/stella_en_400M_v5": "2aa5579fcae1c579de199a3866b6e514bbbf5d10"
}


class HuggingFaceEmbedder:
    def __init__(self, model_name, cache_dir, timeout_seconds=30):
        self.model_name = model_name
        self.cache_dir = cache_dir  # Store cache_dir
        self.tokenizer = None
        self.model = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.timeout_seconds = timeout_seconds
        self.last_used_time = 0
        self.unload_timer = None
        log_counter("huggingface_embedder_init", labels={"model_name": model_name})

    def load_model(self):
        log_counter("huggingface_model_load_attempt", labels={"model_name": self.model_name})
        start_time = time.time()
        # https://huggingface.co/docs/transformers/custom_models
        if self.model is None:
            # Pass cache_dir to from_pretrained to specify the download directory
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True,
                cache_dir=self.cache_dir,  # Specify cache directory
                revision=commit_hashes.get(self.model_name, None)  # Pass commit hash
            )
            self.model = AutoModel.from_pretrained(
                self.model_name,
                trust_remote_code=True,
                cache_dir=self.cache_dir,  # Specify cache directory
                revision=commit_hashes.get(self.model_name, None)  # Pass commit hash
            )
            self.model.to(self.device)
        self.last_used_time = time.time()
        self.reset_timer()
        load_time = time.time() - start_time
        log_histogram("huggingface_model_load_duration", load_time, labels={"model_name": self.model_name})
        log_counter("huggingface_model_load_success", labels={"model_name": self.model_name})

    def unload_model(self):
        log_counter("huggingface_model_unload", labels={"model_name": self.model_name})
        if self.model is not None:
            del self.model
            del self.tokenizer
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            self.model = None
            self.tokenizer = None
        if self.unload_timer:
            self.unload_timer.cancel()

    def reset_timer(self):
        if self.unload_timer:
            self.unload_timer.cancel()
        self.unload_timer = Timer(self.timeout_seconds, self.unload_model)
        self.unload_timer.start()

    def create_embeddings(self, texts):
        log_counter("huggingface_create_embeddings_attempt", labels={"model_name": self.model_name})
        start_time = time.time()
        self.load_model()
        # https://huggingface.co/docs/transformers/custom_models
        inputs = self.tokenizer(
            texts,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512
        )
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        try:
            with torch.no_grad():
                outputs = self.model(**inputs)
            embeddings = outputs.last_hidden_state.mean(dim=1)
        except RuntimeError as e:
            if "Got unsupported ScalarType BFloat16" in str(e):
                logging.warning("BFloat16 not supported. Falling back to float32.")
                # Convert model to float32 and retry once
                self.model = self.model.float()
                with torch.no_grad():
                    outputs = self.model(**inputs)
                embeddings = outputs.last_hidden_state.mean(dim=1)
            else:
                log_counter("huggingface_create_embeddings_failure", labels={"model_name": self.model_name})
                raise
        # Log duration/success on both the normal and the BFloat16-fallback path
        embedding_time = time.time() - start_time
        log_histogram("huggingface_create_embeddings_duration", embedding_time,
                      labels={"model_name": self.model_name})
        log_counter("huggingface_create_embeddings_success", labels={"model_name": self.model_name})
        return embeddings.cpu().float().numpy()  # Convert to float32 before returning
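
# A minimal usage sketch for HuggingFaceEmbedder (the model name and texts are
# illustrative; any model pinned in `commit_hashes` above works the same way):
#
#   embedder = HuggingFaceEmbedder("jinaai/jina-embeddings-v3", model_dir)
#   vectors = embedder.create_embeddings(["first chunk", "second chunk"])
#   # vectors is a float32 numpy array of shape (2, hidden_size); the model
#   # unloads itself after `timeout_seconds` of inactivity via reset_timer().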


class ONNXEmbedder:
    def __init__(self, model_name, onnx_model_dir, timeout_seconds=30):
        self.model_name = model_name
        self.model_path = os.path.join(onnx_model_dir, f"{model_name}.onnx")
        # https://huggingface.co/docs/transformers/custom_models
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=True,
            cache_dir=onnx_model_dir,  # Ensure the tokenizer uses the same directory
            revision=commit_hashes.get(model_name, None)  # Pass commit hash
        )
        self.session = None
        self.timeout_seconds = timeout_seconds
        self.last_used_time = 0
        self.unload_timer = None
        self.device = "cpu"  # ONNX Runtime will default to CPU unless GPU is configured
        log_counter("onnx_embedder_init", labels={"model_name": model_name})

    def load_model(self):
        log_counter("onnx_model_load_attempt", labels={"model_name": self.model_name})
        start_time = time.time()
        if self.session is None:
            if not os.path.exists(self.model_path):
                raise FileNotFoundError(f"ONNX model not found at {self.model_path}")
            logging.info(f"Loading ONNX model from {self.model_path}")
            self.session = ort.InferenceSession(self.model_path)
        self.last_used_time = time.time()
        self.reset_timer()
        load_time = time.time() - start_time
        log_histogram("onnx_model_load_duration", load_time, labels={"model_name": self.model_name})
        log_counter("onnx_model_load_success", labels={"model_name": self.model_name})

    def unload_model(self):
        log_counter("onnx_model_unload", labels={"model_name": self.model_name})
        if self.session is not None:
            logging.info("Unloading ONNX model to free resources.")
            self.session = None
        if self.unload_timer:
            self.unload_timer.cancel()

    def reset_timer(self):
        if self.unload_timer:
            self.unload_timer.cancel()
        self.unload_timer = Timer(self.timeout_seconds, self.unload_model)
        self.unload_timer.start()

    def create_embeddings(self, texts: List[str]) -> List[List[float]]:
        log_counter("onnx_create_embeddings_attempt", labels={"model_name": self.model_name})
        start_time = time.time()
        self.load_model()
        try:
            inputs = self.tokenizer(
                texts,
                return_tensors="np",
                padding=True,
                truncation=True,
                max_length=512
            )
            input_ids = inputs["input_ids"].astype(np.int64)
            attention_mask = inputs["attention_mask"].astype(np.int64)
            ort_inputs = {
                "input_ids": input_ids,
                "attention_mask": attention_mask
            }
            ort_outputs = self.session.run(None, ort_inputs)
            last_hidden_state = ort_outputs[0]
            # Mean-pool the token dimension to get one vector per text
            embeddings = np.mean(last_hidden_state, axis=1)
            embedding_time = time.time() - start_time
            log_histogram("onnx_create_embeddings_duration", embedding_time, labels={"model_name": self.model_name})
            log_counter("onnx_create_embeddings_success", labels={"model_name": self.model_name})
            return embeddings.tolist()
        except Exception as e:
            log_counter("onnx_create_embeddings_failure", labels={"model_name": self.model_name})
            logging.error(f"Error creating embeddings with ONNX model: {str(e)}")
            raise
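
# A minimal usage sketch for ONNXEmbedder (paths illustrative). It assumes a
# pre-exported ONNX graph already exists at f"{onnx_model_dir}/{model_name}.onnx"
# and that the graph's first output is the last hidden state:
#
#   onnx_embedder = ONNXEmbedder("dunzhang/stella_en_400M_v5", model_dir)
#   vectors = onnx_embedder.create_embeddings(["first chunk", "second chunk"])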


class RateLimiter:
    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = []
        self.lock = Lock()

    def __call__(self, func):
        @wraps(func)  # Preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            with self.lock:
                now = time.time()
                # Keep only the timestamps still inside the sliding window
                self.calls = [call for call in self.calls if call > now - self.period]
                if len(self.calls) >= self.max_calls:
                    # Sleep until the oldest call ages out of the window
                    sleep_time = self.calls[0] - (now - self.period)
                    time.sleep(sleep_time)
                self.calls.append(time.time())
            return func(*args, **kwargs)
        return wrapper
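
# Example (the decorated function is hypothetical): allow at most 10 calls per
# 60-second sliding window; excess calls block until the oldest timestamp expires.
#
#   @RateLimiter(max_calls=10, period=60)
#   def call_embedding_api(payload):
#       ...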


def exponential_backoff(max_retries=5, base_delay=1):
    def decorator(func):
        @wraps(func)  # Preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logging.warning(f"Attempt {attempt + 1} failed. Retrying in {delay} seconds. Error: {str(e)}")
                    time.sleep(delay)
        return wrapper
    return decorator
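
# Example (the decorated function is hypothetical): with the defaults, retries
# wait 1, 2, 4, then 8 seconds between the five attempts before re-raising.
# The two decorators compose, e.g. backoff wrapped around the rate limiter:
#
#   @exponential_backoff(max_retries=5, base_delay=1)
#   @RateLimiter(max_calls=10, period=60)
#   def fetch_embeddings(payload):
#       ...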


def create_embeddings_batch(texts: List[str],
                            provider: str,
                            model: str,
                            api_url: str,
                            timeout_seconds: int = 300
                            ) -> List[List[float]]:
    global embedding_models
    log_counter("create_embeddings_batch_attempt", labels={"provider": provider, "model": model})
    start_time = time.time()
    try:
        if provider.lower() == 'huggingface':
            if model not in embedding_models:
                if model == "dunzhang/stella_en_400M_v5":
                    embedding_models[model] = ONNXEmbedder(model, model_dir, timeout_seconds)
                else:
                    # Pass model_dir to HuggingFaceEmbedder
                    embedding_models[model] = HuggingFaceEmbedder(model, model_dir, timeout_seconds)
            embedder = embedding_models[model]
            embeddings = embedder.create_embeddings(texts)
        elif provider.lower() == 'openai':
            logging.debug(f"Creating embeddings for {len(texts)} texts using OpenAI API")
            embeddings = [create_openai_embedding(text, model) for text in texts]
        elif provider.lower() == 'local':
            response = requests.post(
                api_url,
                json={"texts": texts, "model": model},
                headers={"Authorization": f"Bearer {embedding_api_key}"}
            )
            if response.status_code != 200:
                raise Exception(f"Error from local API: {response.text}")
            embeddings = response.json()['embeddings']
        else:
            raise ValueError(f"Unsupported embedding provider: {provider}")
        # Log duration/success only after the embeddings have actually been
        # created, so the histogram reflects the whole batch, not just setup
        embedding_time = time.time() - start_time
        log_histogram("create_embeddings_batch_duration", embedding_time,
                      labels={"provider": provider, "model": model})
        log_counter("create_embeddings_batch_success", labels={"provider": provider, "model": model})
        return embeddings
    except Exception as e:
        log_counter("create_embeddings_batch_error", labels={"provider": provider, "model": model, "error": str(e)})
        logging.error(f"Error in create_embeddings_batch: {str(e)}")
        raise
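
# Usage sketch (provider/model values illustrative): one entry point covers all
# three providers; "local" POSTs to `api_url`, the other two ignore it.
#
#   vectors = create_embeddings_batch(["chunk one", "chunk two"],
#                                     provider="huggingface",
#                                     model="jinaai/jina-embeddings-v3",
#                                     api_url=embedding_api_url)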


def create_embedding(text: str, provider: str, model: str, api_url: str) -> List[float]:
    log_counter("create_embedding_attempt", labels={"provider": provider, "model": model})
    start_time = time.time()
    embedding = create_embeddings_batch([text], provider, model, api_url)[0]
    if isinstance(embedding, np.ndarray):
        embedding = embedding.tolist()
    embedding_time = time.time() - start_time
    log_histogram("create_embedding_duration", embedding_time, labels={"provider": provider, "model": model})
    log_counter("create_embedding_success", labels={"provider": provider, "model": model})
    return embedding
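
# Single-text convenience sketch, using the provider/model/URL loaded from the
# config at import time; the result is a plain List[float], ready for ChromaDB:
#
#   vec = create_embedding("some chunk of text", embedding_provider, embedding_model, embedding_api_url)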


def create_openai_embedding(text: str, model: str) -> List[float]:
    log_counter("create_openai_embedding_attempt", labels={"model": model})
    start_time = time.time()
    embedding = get_openai_embeddings(text, model)
    embedding_time = time.time() - start_time
    log_histogram("create_openai_embedding_duration", embedding_time, labels={"model": model})
    log_counter("create_openai_embedding_success", labels={"model": model})
    return embedding

#
# ##############################################################
# #
# # ONNX Embeddings Functions
#
# # FIXME - UPDATE
# # Define the model path
# model_dir = "/tldw/App_Function_Libraries/models/embedding_models/"
# model_name = "your-huggingface-model-name"
# onnx_model_path = os.path.join(model_dir, model_name, "model.onnx")
#
# # Tokenizer download (if applicable)
# #tokenizer = AutoTokenizer.from_pretrained(model_name)
#
# # Ensure the model directory exists
# #if not os.path.exists(onnx_model_path):
# #    You can add logic to download the ONNX model from a remote source
# #    if it's not already available in the folder.
# #    Example: huggingface_hub.download (if model is hosted on Hugging Face Hub)
# #    raise Exception(f"ONNX model not found at {onnx_model_path}")
#
# class ONNXEmbedder:
#     def __init__(self, model_name, model_dir, timeout_seconds=120):
#         self.model_name = model_name
#         self.model_path = os.path.join(model_dir, f"{model_name}.onnx")
#         self.tokenizer = AutoTokenizer.from_pretrained(model_name)
#         self.session = None
#         self.timeout_seconds = timeout_seconds
#         self.last_used_time = 0
#         self.unload_timer = None
#         self.device = "cpu"  # ONNX Runtime will default to CPU unless GPU is configured
#
#     def load_model(self):
#         if self.session is None:
#             if not os.path.exists(self.model_path):
#                 raise FileNotFoundError(f"ONNX model not found at {self.model_path}")
#             logging.info(f"Loading ONNX model from {self.model_path}")
#             self.session = ort.InferenceSession(self.model_path)
#         self.last_used_time = time.time()
#         self.reset_timer()
#
#     def unload_model(self):
#         if self.session is not None:
#             logging.info("Unloading ONNX model to free resources.")
#             self.session = None
#         if self.unload_timer:
#             self.unload_timer.cancel()
#
#     def reset_timer(self):
#         if self.unload_timer:
#             self.unload_timer.cancel()
#         self.unload_timer = Timer(self.timeout_seconds, self.unload_model)
#         self.unload_timer.start()
#
#     def create_embeddings(self, texts: List[str]) -> List[List[float]]:
#         self.load_model()
#
#         try:
#             inputs = self.tokenizer(texts, return_tensors="np", padding=True, truncation=True, max_length=512)
#             input_ids = inputs["input_ids"].astype(np.int64)
#             attention_mask = inputs["attention_mask"].astype(np.int64)
#
#             ort_inputs = {
#                 "input_ids": input_ids,
#                 "attention_mask": attention_mask
#             }
#
#             ort_outputs = self.session.run(None, ort_inputs)
#
#             last_hidden_state = ort_outputs[0]
#             embeddings = np.mean(last_hidden_state, axis=1)
#
#             return embeddings.tolist()
#         except Exception as e:
#             logging.error(f"Error creating embeddings with ONNX model: {str(e)}")
#             raise
#
# # Global cache for the ONNX embedder instance
# onnx_embedder = None
#
# # Global cache for embedding models
# embedding_models = {}
#
# def create_onnx_embeddings(texts: List[str]) -> List[List[float]]:
#     global onnx_embedder
#     model_dir = "/tldw/App_Function_Libraries/models/embedding_models/"
#     model_name = "your-huggingface-model-name"  # This can be pulled from config
#
#     if onnx_embedder is None:
#         onnx_embedder = ONNXEmbedder(model_name=model_name, model_dir=model_dir)
#
#     # Generate embeddings
#     embeddings = onnx_embedder.create_embeddings(texts)
#     return embeddings
#
# #
# # End of ONNX Embeddings Functions
# ##############################################################

#
# End of File.
#######################################################################################################################