Spaces:
Runtime error
Runtime error
from pydantic import BaseModel | |
from llama_cpp import Llama | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import re | |
import os | |
from dotenv import load_dotenv | |
import spaces | |
import requests | |
import random | |
from faker import Faker | |
from fastapi import FastAPI, Request | |
from fastapi.responses import JSONResponse | |
from fastapi.middleware.cors import CORSMiddleware | |
from threading import Thread | |
from time import sleep | |
from fastapi.staticfiles import StaticFiles | |
import gradio as gr | |
from typing import Dict, Any, Tuple | |
from urllib.parse import urlparse | |
load_dotenv() | |
HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN") | |
global_data = { | |
'models': {}, | |
} | |
model_configs = [ | |
{"repo_id": "Ffftdtd5dtft/gpt2-xl-Q2_K-GGUF", "filename": "gpt2-xl-q2_k.gguf", "name": "GPT-2 XL"}, | |
{"repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-8B-Instruct-Q2_K-GGUF", "filename": "meta-llama-3.1-8b-instruct-q2_k.gguf", "name": "Meta Llama 3.1-8B Instruct"}, | |
{"repo_id": "Ffftdtd5dtft/gemma-2-9b-it-Q2_K-GGUF", "filename": "gemma-2-9b-it-q2_k.gguf", "name": "Gemma 2-9B IT"}, | |
{"repo_id": "Ffftdtd5dtft/gemma-2-27b-Q2_K-GGUF", "filename": "gemma-2-27b-q2_k.gguf", "name": "Gemma 2-27B"}, | |
{"repo_id": "Ffftdtd5dtft/Phi-3-mini-128k-instruct-Q2_K-GGUF", "filename": "phi-3-mini-128k-instruct-q2_k.gguf", "name": "Phi-3 Mini 128K Instruct"}, | |
{"repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-8B-Q2_K-GGUF", "filename": "meta-llama-3.1-8b-q2_k.gguf", "name": "Meta Llama 3.1-8B"}, | |
{"repo_id": "Ffftdtd5dtft/Qwen2-7B-Instruct-Q2_K-GGUF", "filename": "qwen2-7b-instruct-q2_k.gguf", "name": "Qwen2 7B Instruct"}, | |
{"repo_id": "Ffftdtd5dtft/starcoder2-3b-Q2_K-GGUF", "filename": "starcoder2-3b-q2_k.gguf", "name": "Starcoder2 3B"}, | |
{"repo_id": "Ffftdtd5dtft/Qwen2-1.5B-Instruct-Q2_K-GGUF", "filename": "qwen2-1.5b-instruct-q2_k.gguf", "name": "Qwen2 1.5B Instruct"}, | |
{"repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-70B-Q2_K-GGUF", "filename": "meta-llama-3.1-70b-q2_k.gguf", "name": "Meta Llama 3.1-70B"}, | |
{"repo_id": "Ffftdtd5dtft/Mistral-Nemo-Instruct-2407-Q2_K-GGUF", "filename": "mistral-nemo-instruct-2407-q2_k.gguf", "name": "Mistral Nemo Instruct 2407"}, | |
{"repo_id": "Ffftdtd5dtft/Hermes-3-Llama-3.1-8B-IQ1_S-GGUF", "filename": "hermes-3-llama-3.1-8b-iq1_s-imat.gguf", "name": "Hermes 3 Llama 3.1-8B"}, | |
{"repo_id": "Ffftdtd5dtft/Phi-3.5-mini-instruct-Q2_K-GGUF", "filename": "phi-3.5-mini-instruct-q2_k.gguf", "name": "Phi 3.5 Mini Instruct"}, | |
{"repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-70B-Instruct-Q2_K-GGUF", "filename": "meta-llama-3.1-70b-instruct-q2_k.gguf", "name": "Meta Llama 3.1-70B Instruct"}, | |
{"repo_id": "Ffftdtd5dtft/codegemma-2b-IQ1_S-GGUF", "filename": "codegemma-2b-iq1_s-imat.gguf", "name": "Codegemma 2B"}, | |
{"repo_id": "Ffftdtd5dtft/Phi-3-mini-128k-instruct-IQ2_XXS-GGUF", "filename": "phi-3-mini-128k-instruct-iq2_xxs-imat.gguf", "name": "Phi 3 Mini 128K Instruct XXS"}, | |
{"repo_id": "Ffftdtd5dtft/TinyLlama-1.1B-Chat-v1.0-IQ1_S-GGUF", "filename": "tinyllama-1.1b-chat-v1.0-iq1_s-imat.gguf", "name": "TinyLlama 1.1B Chat"}, | |
{"repo_id": "Ffftdtd5dtft/Mistral-NeMo-Minitron-8B-Base-IQ1_S-GGUF", "filename": "mistral-nemo-minitron-8b-base-iq1_s-imat.gguf", "name": "Mistral NeMo Minitron 8B Base"}, | |
{"repo_id": "Ffftdtd5dtft/Mistral-Nemo-Instruct-2407-Q2_K-GGUF", "filename": "mistral-nemo-instruct-2407-q2_k.gguf", "name": "Mistral Nemo Instruct 2407"} | |
] | |
class ModelManager: | |
def __init__(self): | |
self.models = {} | |
def load_model(self, model_config): | |
if model_config['name'] not in self.models: | |
try: | |
print(f"Loading model {model_config['name']}...") | |
self.models[model_config['name']] = Llama.from_pretrained( | |
repo_id=model_config['repo_id'], | |
filename=model_config['filename'], | |
use_auth_token=HUGGINGFACE_TOKEN | |
) | |
print(f"Model {model_config['name']} loaded successfully.") | |
except Exception as e: | |
print(f"Error loading model {model_config['name']}: {e}") | |
def load_all_models(self): | |
with ThreadPoolExecutor() as executor: | |
for config in model_configs: | |
executor.submit(self.load_model, config) | |
return self.models | |
model_manager = ModelManager() | |
global_data['models'] = model_manager.load_all_models() | |
class ChatRequest(BaseModel): | |
message: str | |
def normalize_input(input_text): | |
return input_text.strip() | |
def remove_duplicates(text): | |
text = re.sub(r'(Hello there, how are you\? \[/INST\]){2,}', 'Hello there, how are you? [/INST]', text) | |
text = re.sub(r'(How are you\? \[/INST\]){2,}', 'How are you? [/INST]', text) | |
text = text.replace('[/INST]', '') | |
lines = text.split('\n') | |
unique_lines = [] | |
seen_lines = set() | |
for line in lines: | |
if line not in seen_lines: | |
unique_lines.append(line) | |
seen_lines.add(line) | |
return '\n'.join(unique_lines) | |
PROXY_URL = "https://uhhy-fsfsfs.hf.space/valid" | |
def get_random_proxy(): | |
try: | |
response = requests.get(PROXY_URL) | |
proxies = response.text.splitlines() | |
return random.choice(proxies) | |
except Exception as e: | |
print(f"Error fetching proxy: {e}") | |
return None | |
fake = Faker() | |
def generate_fake_ip(): | |
return fake.ipv4() | |
def get_random_user_agent(): | |
user_agents = [ | |
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", | |
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", | |
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", | |
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0", | |
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7; rv:89.0) Gecko/20100101 Firefox/89.0", | |
"Mozilla/5.0 (X11; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0", | |
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Mobile/15E148 Safari/604.1", | |
"Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Mobile/15E148 Safari/604.1", | |
"Mozilla/5.0 (Android 11; Mobile; rv:89.0) Gecko/89.0 Firefox/89.0" | |
] | |
return random.choice(user_agents) | |
def get_model_name_from_url(url: str) -> str: | |
"""Extracts the model name from a Hugging Face model URL.""" | |
parsed_url = urlparse(url) | |
path_parts = parsed_url.path.split('/') | |
if len(path_parts) >= 2: | |
return path_parts[-2] | |
else: | |
return "Unknown Model" | |
def get_model_config_by_name(model_name: str) -> Dict[str, Any]: | |
"""Finds the model configuration based on the model name.""" | |
for config in model_configs: | |
if config['name'] == model_name: | |
return config | |
return {} # Return an empty dictionary if not found | |
def load_model_from_url(url: str) -> Llama: | |
"""Loads a Llama model from a Hugging Face model URL.""" | |
model_name = get_model_name_from_url(url) | |
model_config = get_model_config_by_name(model_name) | |
if model_config: | |
try: | |
print(f"Loading model {model_name}...") | |
model = Llama.from_pretrained( | |
repo_id=model_config['repo_id'], | |
filename=model_config['filename'], | |
use_auth_token=HUGGINGFACE_TOKEN | |
) | |
print(f"Model {model_name} loaded successfully.") | |
return model | |
except Exception as e: | |
print(f"Error loading model {model_name}: {e}") | |
else: | |
print(f"Model configuration not found for {model_name}") | |
return None | |
async def generate_model_response(model: Llama, inputs: str) -> str: | |
"""Generates a response from the model.""" | |
try: | |
print(f"Generating response for model: {model}") | |
response = model(inputs) | |
print(f"Response from {model}: {response}") | |
return remove_duplicates(response['choices'][0]['text']) | |
except Exception as e: | |
print(f"Error with model: {e}") | |
return "Error generating response. Please try again later." | |
def remove_repetitive_responses(responses: Dict[str, str]) -> Dict[str, str]: | |
"""Removes duplicate responses from a dictionary of model responses.""" | |
unique_responses = {} | |
for model, response in responses.items(): | |
if response not in unique_responses: | |
unique_responses[model] = response | |
return unique_responses | |
async def process_message(message: str) -> Tuple[str, str]: | |
"""Processes a user message and generates responses from multiple LLMs.""" | |
inputs = normalize_input(message) | |
# Retrieve models from global_data and process responses | |
responses = {} | |
for model_name, model in global_data['models'].items(): | |
responses[model_name] = await generate_model_response(model, inputs) | |
unique_responses = remove_repetitive_responses(responses) | |
formatted_response = "" | |
for model, response in unique_responses.items(): | |
formatted_response += f"**{model}:**\n{response}\n\n" | |
curl_command = f""" | |
curl -X POST -H "Content-Type: application/json" \\ | |
-d '{{"message": "{message}"}}' \\ | |
http://localhost:7860/generate | |
""" | |
return formatted_response, curl_command | |
app = FastAPI() | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
app.mount("/", StaticFiles(directory="public", html=True), name="static") | |
async def generate_response(request: Request): | |
"""Handles API requests to generate responses.""" | |
data = await request.json() | |
message = data.get("message") | |
if not message: | |
return JSONResponse(status_code=400, content={"error": "Message is required."}) | |
response, _ = await process_message(message) | |
return JSONResponse(content={"response": response}) | |
iface = gr.Interface( | |
fn=process_message, | |
inputs=gr.Textbox(lines=2, placeholder="Enter your message here..."), | |
outputs=[gr.Markdown(), gr.Textbox(label="cURL command")], | |
title="Multi-Model LLM API", | |
description="Enter a message and get responses from multiple LLMs.", | |
) | |
def anonymize_ip(): | |
"""Continuously updates IP addresses to anonymize requests.""" | |
while True: | |
sleep(0) | |
os.environ['HTTP_X_FORWARDED_FOR'] = generate_fake_ip() | |
os.environ['REMOTE_ADDR'] = generate_fake_ip() | |
Thread(target=anonymize_ip).start() | |
if __name__ == "__main__": | |
iface.launch(share=True) |