# NOTE: the three lines below are Hugging Face Spaces page chrome (author
# avatar caption, commit message, commit hash) accidentally captured with
# the file; kept as comments so the module remains valid Python.
# Corvius's picture
# SO BACK YEEEEEEEEEEEEEEEEEEEEEEEEEEE i love open router
# 09d5baa verified
import gradio as gr
import requests
import json
import os
import datetime
from requests.exceptions import RequestException
# --- Configuration (read once at import time; fail fast if missing) ---
API_URL = os.environ.get('API_URL')
if API_URL is None:
    raise ValueError("API_URL not set in env.")

API_KEYS = os.environ.get('API_KEYS')
if API_KEYS is None:
    raise ValueError("no keys in env")

# API_KEYS holds one key per line; ignore blank lines and surrounding whitespace.
api_keys_list = []
for raw_key in API_KEYS.strip().splitlines():
    cleaned = raw_key.strip()
    if cleaned:
        api_keys_list.append(cleaned)
if not api_keys_list:
    raise ValueError("no valid keys in env")

# Debug/logging switches.
USER_LOGGING_ENABLED = False
RESPONSE_LOGGING_ENABLED = True

# Sampling defaults; used to detect which sliders a user actually moved.
DEFAULT_PARAMS = {
    "temperature": 0.8,
    "top_p": 0.95,
    "frequency_penalty": 0,
    "presence_penalty": 0,
    "max_tokens": 512
}
def get_timestamp():
    """Return the current local time formatted as ``HH:MM:SS``."""
    now = datetime.datetime.now()
    return now.strftime("%H:%M:%S")
def predict(message, history, system_prompt, temperature, top_p, frequency_penalty, presence_penalty, max_tokens, stop_flag):
    """Stream a chat completion for `message`, yielding cumulative partial text.

    Builds an OpenAI-style message list from `history` (list of
    [user, assistant] pairs) plus the optional `system_prompt`, then tries
    each key in `api_keys_list` in order, falling through to the next key
    on HTTP 429. Yields the growing assistant reply as chunks arrive; a
    final error string is yielded on failure.

    `stop_flag` is a one-element list; setting stop_flag[0] = True from
    another callback aborts the stream at the next chunk boundary.
    """
    # Assemble the conversation in OpenAI chat format.
    history_format = [{"role": "system", "content": system_prompt}] if system_prompt else []
    for human, assistant in history:
        history_format.append({"role": "user", "content": human})
        if assistant:
            history_format.append({"role": "assistant", "content": assistant})
    history_format.append({"role": "user", "content": message})

    # Optional console logging; messages starting with * or " are skipped.
    if USER_LOGGING_ENABLED and not message.startswith(('*', '"')):
        print(f"<|system|> {system_prompt}")
        print(f"{get_timestamp()} <|user|> {message}")

    current_params = {
        "temperature": temperature,
        "top_p": top_p,
        "frequency_penalty": frequency_penalty,
        "presence_penalty": presence_penalty,
        "max_tokens": int(max_tokens)
    }
    # Log only the sliders the user actually moved off their defaults.
    non_default_params = {k: v for k, v in current_params.items() if v != DEFAULT_PARAMS[k]}
    if USER_LOGGING_ENABLED and non_default_params and not message.startswith(('*', '"')):
        for param, value in non_default_params.items():
            print(f"{param}={value}")

    data = {
        "model": "meta/llama-3.1-405b-instruct",
        "messages": history_format,
        "stream": True,
        **current_params
    }

    partial_message = ""
    for api_key in api_keys_list:
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        try:
            # FIX: timeout added so a dead connection cannot hang the worker
            # forever (10s connect, 300s per-read for streaming); `json=data`
            # replaces manual json.dumps; `with` guarantees the connection is
            # released on every exit path (429, error, early return).
            with requests.post(API_URL, headers=headers, json=data,
                               stream=True, timeout=(10, 300)) as response:
                if response.status_code == 200:
                    for line in response.iter_lines():
                        if stop_flag[0]:
                            break  # user pressed Stop; context manager closes the stream
                        if not line:
                            continue  # SSE keep-alive blank lines
                        line = line.decode('utf-8')
                        if RESPONSE_LOGGING_ENABLED:
                            print(f"API Response: {line}")
                        if not line.startswith("data: "):
                            continue
                        if line.strip() == "data: [DONE]":
                            break  # end-of-stream sentinel
                        try:
                            json_data = json.loads(line[6:])
                        except json.JSONDecodeError:
                            continue  # tolerate malformed/partial events
                        choices = json_data.get('choices') or []
                        if choices:
                            # FIX: .get() chain guards providers that omit
                            # 'delta' (previously an uncaught KeyError).
                            content = choices[0].get('delta', {}).get('content', '')
                            if content:
                                partial_message += content
                                yield partial_message
                    if partial_message:
                        yield partial_message
                    return
                elif response.status_code == 429:
                    # Rate-limited: rotate to the next key.
                    print(f"API key {api_key} hit rate limit, trying next key.")
                    continue
                else:
                    response_text = response.text
                    print(f"Request error with API key {api_key}: {response.status_code} {response_text}")
                    yield f"An error occurred: {response_text}"
                    return
        except RequestException as e:
            print(f"Request exception with API key {api_key}: {e}")
            yield f"An error occurred: {str(e)}"
            return

    # Every key was rate-limited (or the list was exhausted without success).
    print("All keys rate limited or ded.")
    yield "All keys rate limited or ded."
def import_chat(custom_format_string):
    """Parse a ``<|role|> text`` transcript into chat state.

    Returns ``(history, system_prompt)`` where history is a list of
    ``[user, assistant]`` pairs. An assistant turn with no preceding user
    turn is stored as ``["", reply]``. On any parse failure the error is
    printed and ``(None, None)`` is returned.
    """
    try:
        parsed_history = []
        parsed_system = ""
        # Splitting on '<|' leaves each chunk starting with 'role|> ...'.
        for chunk in custom_format_string.split('<|'):
            if chunk.startswith('system|>'):
                parsed_system = chunk.replace('system|>', '').strip()
            elif chunk.startswith('user|>'):
                parsed_history.append([chunk.replace('user|>', '').strip(), None])
            elif chunk.startswith('assistant|>'):
                reply = chunk.replace('assistant|>', '').strip()
                if parsed_history:
                    # Attach the reply to the most recent user turn.
                    parsed_history[-1][1] = reply
                else:
                    parsed_history.append(["", reply])
        return parsed_history, parsed_system
    except Exception as e:
        print(f"Error importing chat: {e}")
        return None, None
def export_chat(history, system_prompt):
    """Serialize chat state into the ``<|role|> text`` transcript format.

    Inverse of `import_chat`. Pairs with a missing assistant reply emit
    only the user turn. Accepts ``history=None`` (treated as empty).
    """
    parts = []
    if system_prompt:
        parts.append(f"<|system|> {system_prompt}\n\n")
    for user_msg, assistant_msg in (history or []):
        parts.append(f"<|user|> {user_msg}\n\n")
        if assistant_msg:
            parts.append(f"<|assistant|> {assistant_msg}\n\n")
    return "".join(parts)
def stop_generation_func(stop_flag):
    """Request cancellation of an in-flight generation.

    Flips the shared one-element flag list to ``[True]`` in place, so any
    generator holding a reference to the same list sees the change on its
    next check, then returns the list for the gr.State output binding.
    """
    stop_flag[0] = True
    return stop_flag
with gr.Blocks(theme='gradio/monochrome') as demo:
    # Shared per-session stop flag: a one-element list inside gr.State so
    # callbacks can mutate it in place and running generators observe it.
    stop_flag = gr.State([False])
    with gr.Row():
        with gr.Column(scale=2):
            # Left pane: conversation view, input box, transport controls.
            chatbot = gr.Chatbot(value=[])
            msg = gr.Textbox(label="Message")
            with gr.Row():
                clear = gr.Button("Clear")
                regenerate = gr.Button("Regenerate")
                stop_btn = gr.Button("Stop")
            with gr.Row():
                with gr.Column(scale=4):
                    # One textbox doubles as import source and export target.
                    import_textbox = gr.Textbox(label="Import textbox", lines=5)
                with gr.Column(scale=1):
                    export_button = gr.Button("Export Chat")
                    import_button = gr.Button("Import Chat")
        with gr.Column(scale=1):
            # Right pane: system prompt and sampling controls; slider
            # defaults match DEFAULT_PARAMS at the top of the file.
            system_prompt = gr.Textbox("", label="System Prompt", lines=5)
            temperature = gr.Slider(0, 2, value=0.8, step=0.01, label="Temperature")
            top_p = gr.Slider(0, 1, value=0.95, step=0.01, label="Top P")
            frequency_penalty = gr.Slider(-2, 2, value=0, step=0.1, label="Frequency Penalty")
            presence_penalty = gr.Slider(-2, 2, value=0, step=0.1, label="Presence Penalty")
            max_tokens = gr.Slider(1, 4096, value=512, step=1, label="Max Output (max_tokens)")

    def user(user_message, history):
        # Stage the user's turn with a None assistant placeholder and clear
        # the input textbox (first return value becomes msg's new content).
        history = history or []
        return "", history + [[user_message, None]]

    def bot(history, system_prompt, temperature, top_p, frequency_penalty, presence_penalty, max_tokens, stop_flag):
        # Stream the assistant reply for the last user turn into history[-1][1].
        stop_flag[0] = False  # clear any stop request left over from a previous turn
        history = history or []
        if not history:
            # NOTE(review): bare return inside a generator — the value is
            # carried on StopIteration, not yielded; confirm Gradio handles it.
            return history
        user_message = history[-1][0]
        bot_message = predict(user_message, history[:-1], system_prompt, temperature, top_p, frequency_penalty, presence_penalty, max_tokens, stop_flag)
        history[-1][1] = ""
        for chunk in bot_message:
            if stop_flag[0]:
                # NOTE(review): this suffix is appended after the final yield,
                # so the UI may never render it — verify against Gradio's
                # handling of post-yield mutation.
                history[-1][1] += " [Generation stopped]"
                break
            # predict() yields cumulative text, so replace rather than append.
            history[-1][1] = chunk
            yield history

    def regenerate_response(history, system_prompt, temperature, top_p, frequency_penalty, presence_penalty, max_tokens, stop_flag):
        # Discard the last assistant reply and re-run generation for it.
        if history and len(history) > 0:
            last_user_message = history[-1][0]  # assigned but unused in this block
            history[-1][1] = None
            for new_history in bot(history, system_prompt, temperature, top_p, frequency_penalty, presence_penalty, max_tokens, stop_flag):
                yield new_history
        else:
            yield []

    def import_chat_wrapper(custom_format_string):
        # Thin adapter: unpack import_chat()'s tuple into the two outputs.
        imported_history, imported_system_prompt = import_chat(custom_format_string)
        return imported_history, imported_system_prompt

    # Enter key: stage the user turn immediately (unqueued), then stream the reply.
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot, [chatbot, system_prompt, temperature, top_p, frequency_penalty, presence_penalty, max_tokens, stop_flag], chatbot
    )
    clear.click(lambda: None, None, chatbot, queue=False)
    regenerate.click(
        regenerate_response,
        [chatbot, system_prompt, temperature, top_p, frequency_penalty, presence_penalty, max_tokens, stop_flag],
        chatbot
    )
    import_button.click(import_chat_wrapper, inputs=[import_textbox], outputs=[chatbot, system_prompt])
    export_button.click(
        export_chat,
        inputs=[chatbot, system_prompt],
        outputs=[import_textbox]
    )
    stop_btn.click(stop_generation_func, inputs=[stop_flag], outputs=[stop_flag])
if __name__ == "__main__":
    # Cap the request queue and concurrency; debug=True prints tracebacks
    # to the console while the app runs.
    demo.queue(max_size=3, default_concurrency_limit=3).launch(debug=True)