Eric Michael Martinez
update
9787302
raw
history blame
14.7 kB
import os

import gradio as gr
import requests
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from httpx import AsyncClient
from tenacity import retry, stop_after_attempt, wait_exponential

from app.db import User, create_db_and_tables
from app.schemas import UserCreate, UserRead, UserUpdate
from app.users import auth_backend, current_active_user, fastapi_users
import examples as chatbot_examples
# Pick the deployment environment from APP_ENV; "dev" is used when it is unset.
current_environment = os.getenv("APP_ENV", "dev")

# Only the three known environments have a .env file; anything else is a
# configuration error and aborts module import.
if current_environment not in ("dev", "test", "prod"):
    raise ValueError("Invalid environment specified")

# Load the .env file that belongs to the selected environment.
load_dotenv(
    {
        "dev": ".env.dev",
        "test": ".env.test",
        "prod": ".env.prod",
    }[current_environment]
)
import openai
def api_login(email, password):
    """Log in against this service's own ``/auth/jwt/login`` endpoint.

    The target URL is built from the ``APP_SCHEME``, ``APP_HOST`` and
    ``APP_PORT`` environment variables, and the credentials are sent as a
    form-encoded body with ``email`` in the ``username`` field.

    Args:
        email: Account e-mail address.
        password: Account password.

    Returns:
        A ``(successful, message)`` tuple. On success ``message`` is the
        ``access_token`` from the response; on failure it is the ``detail``
        field of the response, or a short description when the request
        itself failed or the body could not be interpreted.
    """
    scheme = os.getenv("APP_SCHEME")
    host = os.getenv("APP_HOST")
    port = os.getenv("APP_PORT")
    url = f"{scheme}://{host}:{port}/auth/jwt/login"
    payload = {"username": email, "password": password}
    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    # Without a timeout, requests can wait indefinitely on an unresponsive
    # server; report transport failures as a failed login instead of raising.
    try:
        response = requests.post(url, data=payload, headers=headers, timeout=30)
    except requests.RequestException as exc:
        return False, f"Login request failed: {exc}"

    # The body is not guaranteed to be JSON (e.g. a proxy error page).
    try:
        body = response.json()
    except ValueError:
        return False, f"Login failed (HTTP {response.status_code})"

    if not isinstance(body, dict):
        return False, f"Unexpected login response (HTTP {response.status_code})"

    if response.status_code == 200 and "access_token" in body:
        return True, body["access_token"]

    return False, body.get("detail", f"Login failed (HTTP {response.status_code})")
def get_api_key(email, password):
    """Exchange credentials for the API base URL and an access token.

    Args:
        email: Account e-mail address.
        password: Account password.

    Returns:
        A ``(api_base, api_key)`` tuple: the value of the ``APP_API_BASE``
        environment variable and the token returned by ``api_login``.

    Raises:
        gr.Error: If ``api_login`` reports a failed login; the failure
            message is passed through to the Gradio UI.
    """
    successful, message = api_login(email, password)
    if not successful:
        raise gr.Error(message)
    return os.getenv("APP_API_BASE"), message
# Define a function to get the AI's reply using the OpenAI API
def get_ai_reply(
    message,
    model="gpt-3.5-turbo",
    system_message=None,
    temperature=0,
    message_history=None,
):
    """Send a chat message to the OpenAI ChatCompletion endpoint.

    Args:
        message: The user's new message.
        model: Name of the chat model to use.
        system_message: Optional system prompt placed first in the
            conversation; omitted when ``None``.
        temperature: Sampling temperature forwarded to the API.
        message_history: Optional earlier messages, already in the
            ``{"role": ..., "content": ...}`` format, inserted between the
            system message and the new user message. ``None`` means no
            history (the default is ``None`` rather than a shared list).

    Returns:
        The content of the first choice in the completion, stripped of
        leading and trailing whitespace.
    """
    messages = []

    # Order matters: system prompt, then prior turns, then the new message.
    if system_message is not None:
        messages.append({"role": "system", "content": system_message})
    if message_history is not None:
        messages.extend(message_history)
    messages.append({"role": "user", "content": message})

    completion = openai.ChatCompletion.create(
        model=model, messages=messages, temperature=temperature
    )

    return completion.choices[0].message.content.strip()
def get_ai_image(prompt, size="512x512"):
    """Generate one image with the OpenAI Image API and return its URL.

    Args:
        prompt: Text description of the image to generate.
        size: Image size string forwarded to the API.

    Returns:
        The ``url`` entry of the first item in the response data.
    """
    result = openai.Image.create(prompt=prompt, n=1, size=size)
    first_image = result.data[0]
    return first_image["url"]
def get_ai_transcript(path_to_audio, language=None):
    """Transcribe an audio file with the OpenAI ``whisper-1`` model.

    Args:
        path_to_audio: Path of the audio file to transcribe.
        language: Optional language value forwarded to the API.

    Returns:
        The ``text`` attribute of the transcription result.
    """
    # The context manager closes the file even if the API call raises.
    with open(path_to_audio, "rb") as audio_file:
        transcript = openai.Audio.transcribe(
            "whisper-1", audio_file, language=language
        )
    return transcript.text
def generate_transcription(path_to_audio_file):
    """Gradio handler: transcribe the recorded audio file.

    Args:
        path_to_audio_file: Path of the audio file to transcribe.

    Returns:
        The transcript text from ``get_ai_transcript``.

    Raises:
        gr.Error: If transcription fails for any reason, so the failure is
            shown in the Gradio UI.
    """
    try:
        return get_ai_transcript(path_to_audio_file)
    except Exception as e:
        # Broad catch is deliberate: this is the UI boundary.
        raise gr.Error(e) from e
def generate_image(prompt):
    """Gradio handler: generate an image for ``prompt``.

    Args:
        prompt: Text description of the image to generate.

    Returns:
        The image URL from ``get_ai_image``.

    Raises:
        gr.Error: If image generation fails for any reason, so the failure
            is shown in the Gradio UI.
    """
    try:
        return get_ai_image(prompt)
    except Exception as e:
        # Broad catch is deliberate: this is the UI boundary.
        raise gr.Error(e) from e
# Chat callback handed to gr.ChatInterface
def chat(message, chatbot_messages, model, temperature, system_message):
    """Answer ``message`` using the prior conversation as context.

    Args:
        message: The user's new message.
        chatbot_messages: Prior conversation as an iterable of
            ``(human, assistant)`` pairs.
        model: Chat model name forwarded to ``get_ai_reply``.
        temperature: Sampling temperature forwarded to ``get_ai_reply``.
        system_message: System prompt forwarded to ``get_ai_reply``.

    Returns:
        The AI's reply text.

    Raises:
        gr.Error: If getting the reply fails for any reason.
    """
    # Flatten each (human, assistant) pair into two role-tagged messages.
    openai_history = []
    for human, assistant in chatbot_messages:
        openai_history.extend(
            (
                {"role": "user", "content": human},
                {"role": "assistant", "content": assistant},
            )
        )

    try:
        reply = get_ai_reply(
            message,
            model=model,
            system_message=system_message,
            message_history=openai_history,
            temperature=temperature,
        )
    except Exception as e:
        # Surface any failure in the Gradio UI
        raise gr.Error(e)

    return reply
# Define a function to build the chatbot interface using Gradio
def get_chatbot_app(additional_examples=None):
    """Build and return the Gradio Blocks UI for the service.

    The UI has four tabs: a chat conversation, image generation,
    speech-to-text, and a form that exchanges credentials for an API key.

    Args:
        additional_examples: Optional extra examples passed to
            ``chatbot_examples.load_examples``. ``None`` (the default) is
            treated as an empty list.

    Returns:
        The constructed ``gr.Blocks`` app (not launched).
    """
    # Use a fresh list per call instead of one shared mutable default.
    if additional_examples is None:
        additional_examples = []

    # Load chatbot examples and merge with any additional examples provided
    examples = chatbot_examples.load_examples(additional=additional_examples)

    # NOTE(review): get_examples and choose_example are not referenced by the
    # UI built below; they are kept as-is in case they are meant to be wired
    # to an examples selector.

    # Define a function to get the names of the examples
    def get_examples():
        return [example["name"] for example in examples]

    # Define a function to choose an example based on the index
    def choose_example(index):
        if index is not None:
            system_message = examples[index]["system_message"].strip()
            user_message = examples[index]["message"].strip()
            return system_message, user_message, [], []
        return "", "", [], []

    # Create the Gradio interface using the Blocks layout
    with gr.Blocks() as app:
        with gr.Tab("Conversation"):
            # NOTE(review): the source's indentation was lost; the chat
            # interface is assumed to sit inside this row next to the system
            # message box -- confirm against the intended layout.
            with gr.Row():
                # Create a textbox for the system message (prompt)
                system_message = gr.TextArea(
                    label="System Message (Prompt)",
                    value="You are a helpful assistant.",
                    lines=20,
                    max_lines=400,
                )
                # Create a chatbot interface for the conversation.
                # The additional inputs are listed in the same order as the
                # trailing parameters of chat(): model, temperature,
                # system_message.
                chatbot = gr.ChatInterface(
                    chat,
                    additional_inputs=[
                        gr.Dropdown(
                            ["gpt-3.5-turbo", "gpt-3.5-turbo-16k", "gpt-4"],
                            label="Model",
                            value="gpt-3.5-turbo",
                        ),
                        gr.Slider(
                            label="Temperature",
                            minimum=0,
                            maximum=2,
                            step=0.1,
                            value=0,
                        ),
                        system_message,
                    ],
                )
        with gr.Tab("Image Generation"):
            image_prompt = gr.Textbox(
                label="Prompt", placeholder="A cute puppy wearing sunglasses."
            )
            image_btn = gr.Button(value="Generate")
            image = gr.Image(label="Result", interactive=False, type="filepath")
            image_btn.click(generate_image, inputs=[image_prompt], outputs=[image])
        with gr.Tab("Speech-to-text"):
            audio_file = gr.Audio(label="Audio", source="microphone", type="filepath")
            transcribe = gr.Button(value="Transcribe")
            audio_transcript = gr.Textbox(label="Transcription", interactive=False)
            transcribe.click(
                generate_transcription, inputs=[audio_file], outputs=[audio_transcript]
            )
        with gr.Tab("Get API Key"):
            email_box = gr.Textbox(label="Email Address", placeholder="Student Email")
            password_box = gr.Textbox(
                label="Password", type="password", placeholder="Student ID"
            )
            btn = gr.Button(value="Generate")
            api_host_box = gr.Textbox(label="OpenAI API Base", interactive=False)
            api_key_box = gr.Textbox(label="OpenAI API Key", interactive=False)
            # get_api_key returns (api_base, api_key), matching the two outputs.
            btn.click(
                get_api_key,
                inputs=[email_box, password_box],
                outputs=[api_host_box, api_key_box],
            )

    # Return the app
    return app
# The FastAPI application; the fastapi-users routers are attached below.
app = FastAPI()

# Auth-backend routes under /auth/jwt (api_login posts to /auth/jwt/login).
app.include_router(
    router=fastapi_users.get_auth_router(auth_backend),
    prefix="/auth/jwt",
    tags=["auth"],
)

# Registration router, mounted under /auth.
app.include_router(
    router=fastapi_users.get_register_router(UserRead, UserCreate),
    prefix="/auth",
    tags=["auth"],
)

# Users router, mounted under /users.
app.include_router(
    router=fastapi_users.get_users_router(UserRead, UserUpdate),
    prefix="/users",
    tags=["users"],
)
@app.get("/authenticated-route")
async def authenticated_route(user: User = Depends(current_active_user)):
    """Return a greeting containing the e-mail of the current active user."""
    greeting = f"Hello {user.email}!"
    return {"message": greeting}
# NOTE: the handlers below used to carry @retry stacked above @app.post.
# app.post registers the undecorated function as the route handler, so the
# retry wrapper never ran for real requests. Retrying now lives on the
# upstream call itself, which is the part that can fail transiently.
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30))
async def _post_to_openai(path, payload, content_type):
    """POST ``payload`` as JSON to ``https://api.openai.com`` + ``path``.

    The request is authorised with the ``OPENAI_API_KEY`` environment
    variable. Any exception raised while sending is retried (up to 5
    attempts with exponential back-off between 2 and 30 seconds).

    Returns:
        The raw httpx response; HTTP error statuses are returned, not raised.
    """
    openai_api_key = os.getenv("OPENAI_API_KEY")
    async with AsyncClient() as client:
        return await client.post(
            f"https://api.openai.com{path}",
            json=payload,
            headers={
                "Content-Type": content_type,
                "Authorization": f"Bearer {openai_api_key}",
            },
            timeout=120.0,
        )


async def _openai_passthrough(request, user, path):
    """Forward an authenticated JSON request to OpenAI and relay the reply.

    Args:
        request: Incoming request whose JSON body is forwarded unchanged.
        user: The authenticated user resolved by the route's dependency.
        path: OpenAI API path to forward to (e.g. ``/v1/embeddings``).

    Returns:
        A ``Response`` carrying the upstream body, status code and content
        type, so upstream errors and non-JSON bodies (such as audio) reach
        the client intact instead of being re-encoded as a 200 JSON reply.

    Raises:
        HTTPException: 401 if no user is present.
    """
    if not user:
        raise HTTPException(status_code=401, detail="Unauthorized")

    request_data = await request.json()
    # httpx rejects a None header value, so fall back to JSON when the
    # client did not send a Content-Type.
    content_type = request.headers.get("Content-Type") or "application/json"

    upstream = await _post_to_openai(path, request_data, content_type)

    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        media_type=upstream.headers.get("content-type"),
    )


@app.post("/v1/embeddings")
async def openai_api_embeddings_passthrough(
    request: Request,
    user: User = Depends(fastapi_users.current_user()),
):
    """Proxy ``POST /v1/embeddings`` to the OpenAI API."""
    return await _openai_passthrough(request, user, "/v1/embeddings")


# Previously this handler reused the name of the /v1/embeddings handler and
# shadowed it at module level; it now has its own name.
@app.post("/v1/engines/text-embedding-ada-002/embeddings")
async def openai_api_engine_embeddings_passthrough(
    request: Request,
    user: User = Depends(fastapi_users.current_user()),
):
    """Proxy the ``text-embedding-ada-002`` engine embeddings endpoint."""
    return await _openai_passthrough(
        request, user, "/v1/engines/text-embedding-ada-002/embeddings"
    )


@app.post("/v1/completions")
async def openai_api_completions_passthrough(
    request: Request,
    user: User = Depends(fastapi_users.current_user()),
):
    """Proxy ``POST /v1/completions`` to the OpenAI API."""
    return await _openai_passthrough(request, user, "/v1/completions")


@app.post("/v1/chat/completions")
async def openai_api_chat_completions_passthrough(
    request: Request,
    user: User = Depends(fastapi_users.current_user()),
):
    """Proxy ``POST /v1/chat/completions`` to the OpenAI API."""
    return await _openai_passthrough(request, user, "/v1/chat/completions")


# Previously this handler reused the name of the chat-completions handler
# and shadowed it at module level; it now has its own name.
@app.post("/v1/images/generations")
async def openai_api_images_generations_passthrough(
    request: Request,
    user: User = Depends(fastapi_users.current_user()),
):
    """Proxy ``POST /v1/images/generations`` to the OpenAI API."""
    return await _openai_passthrough(request, user, "/v1/images/generations")


@app.post("/v1/audio/speech")
async def openai_api_audio_speech_passthrough(
    request: Request,
    user: User = Depends(fastapi_users.current_user()),
):
    """Proxy ``POST /v1/audio/speech`` to the OpenAI API.

    The upstream body is relayed as-is rather than parsed as JSON.
    """
    return await _openai_passthrough(request, user, "/v1/audio/speech")
@app.on_event("startup")
async def on_startup():
    """Run ``create_db_and_tables`` when the application starts.

    Not needed if you set up a migration system like Alembic.
    """
    await create_db_and_tables()
def _gradio_auth(email, password):
    """Gradio auth callback: return only the success flag of ``api_login``.

    ``api_login`` returns a ``(successful, message)`` tuple. A non-empty
    tuple is always truthy, so using ``api_login`` directly as the auth
    callable would accept every login attempt.
    """
    successful, _ = api_login(email, password)
    return bool(successful)


# Build the Gradio UI, protect it with the service's own login, and mount it
# at the root of the FastAPI application.
gradio_gui = get_chatbot_app()
gradio_gui.auth = _gradio_auth
gradio_gui.auth_message = "Welcome, to the 4341 OpenAI Service"
app = gr.mount_gradio_app(app, gradio_gui, path="/")