"""FastAPI front-end that proxies chat prompts to a g4f GPT-3.5 provider.

Conversations are kept in memory, keyed by a caller-supplied session id, and
are evicted by a background task once they have been idle for SESSION_TTL.
"""

import asyncio
from datetime import datetime, timedelta
from typing import List, Optional

import g4f
from fastapi import FastAPI, HTTPException, Request

app = FastAPI()

# ---------------- GPT 3.5 TURBO NORMAL ----------------------------------------

# Providers queried by run_all().
_providers = [g4f.Provider.Chatgpt4Online]

# session id -> {'last_chat': datetime, 'message': [chat message dicts]}
chat_history = {}

# Storage written by the /key endpoint.
key = {}

# System prompt used when a new session is opened without a custom one.
default_characteristic = {
    "role": "system",
    "content": (
        "you are AI chat gpt-3.5-turbo who will help me and will always answer "
        "with cute kaomoji like this (⁠≧⁠▽⁠≦⁠), always answer me cutely like loli "
        "in anime"
    ),
}

# A session is dropped once it has been idle for longer than this.
SESSION_TTL = timedelta(minutes=10)
# Pause between two eviction sweeps.
CLEANUP_INTERVAL_SECONDS = 60
# Text found in the provider's reply when it rejected the request with a 403.
BLOCKED_MARKER = 'Response 403: {"detail":"Unusual activity'
# Upper bound on provider calls for one request, and the pause between them.
MAX_PROVIDER_ATTEMPTS = 5
RETRY_DELAY_SECONDS = 1.0

# Strong reference to the eviction task so it is not garbage-collected.
_cleanup_task = None


async def run_provider(messages, provider=None):
    """Send *messages* to a g4f provider and return the reply text.

    Args:
        messages: Chat messages, each a dict with "role" and "content".
        provider: g4f provider to call. Defaults to Chatgpt4Online.

    Returns:
        The reply text. If the call raises, the exception text is returned
        instead of propagating; chat() relies on this to detect the provider's
        403 "Unusual activity" reply.
    """
    if provider is None:
        provider = g4f.Provider.Chatgpt4Online
    try:
        response = await g4f.ChatCompletion.create_async(
            model=g4f.models.gpt_35_turbo,
            messages=messages,
            provider=provider,
        )
        if isinstance(response, str):
            return response
        # NOTE(review): assumes non-string responses expose an awaitable
        # choices() - confirm against the g4f version in use.
        result = await response.choices()
        return result[0].message.content
    except Exception as e:
        # Deliberate best-effort: the error text becomes the reply.
        return str(e)


async def run_all(messages):
    """Query every provider in _providers concurrently.

    Returns:
        A list with one reply (or error text) per provider, in the order of
        _providers.
    """
    calls = [run_provider(messages, provider) for provider in _providers]
    return await asyncio.gather(*calls)


async def clean_chat_history():
    """Evict idle sessions from chat_history forever, one sweep per interval."""
    while True:
        current_time = datetime.now()
        expired = [
            session_id
            for session_id, history in chat_history.items()
            if history
            and history.get('last_chat')
            and history['last_chat'] + SESSION_TTL < current_time
        ]
        for session_id in expired:
            chat_history.pop(session_id, None)
        await asyncio.sleep(CLEANUP_INTERVAL_SECONDS)


@app.on_event("startup")
async def _start_cleanup_task():
    """Start the eviction task once the server's event loop is running.

    asyncio.create_task() needs a running loop, so it cannot be called safely
    at import time.
    """
    global _cleanup_task
    if _cleanup_task is None or _cleanup_task.done():
        _cleanup_task = asyncio.create_task(clean_chat_history())

# ------------------------------------------------------------------------------


@app.get("/gpt")
async def chat(
    request: Request,
    prompt: str,
    characteristic: Optional[str] = None,
    id: Optional[str] = None,  # public query-parameter name; must stay "id"
):
    """Append *prompt* to session *id* and return the provider's reply.

    Args:
        request: Incoming request (unused; kept for interface compatibility).
        prompt: The user message.
        characteristic: Optional system prompt. Opens a new session with it,
            or is appended to an existing session.
        id: Session identifier; required.

    Returns:
        {"response": <reply text>}

    Raises:
        HTTPException: 400 when *id* is missing, 503 when the provider keeps
            answering with the 403 marker, 500 on any unexpected error.
    """
    if not id:
        raise HTTPException(status_code=400, detail="ID parameter is required")

    try:
        session = chat_history.get(id)
        if session is None:
            if characteristic:
                system_message = {"role": "system", "content": characteristic}
            else:
                system_message = dict(default_characteristic)
            session = {'last_chat': datetime.now(), 'message': [system_message]}
            chat_history[id] = session
        else:
            # Refresh now so the eviction task cannot drop the session while
            # the provider call is still in flight.
            session['last_chat'] = datetime.now()
            if characteristic:
                session['message'].append(
                    {"role": "system", "content": characteristic}
                )

        messages = session['message']
        messages.append({"role": "user", "content": prompt})

        for attempt in range(MAX_PROVIDER_ATTEMPTS):
            bot_response = await run_provider(messages)
            if isinstance(bot_response, str):
                response = bot_response
            else:
                response = bot_response.content

            if BLOCKED_MARKER not in response:
                session['last_chat'] = datetime.now()
                messages.append({"role": "assistant", "content": response})
                return {"response": response}

            print("Handling 'accessToken' case")
            if attempt + 1 < MAX_PROVIDER_ATTEMPTS:
                await asyncio.sleep(RETRY_DELAY_SECONDS)

        raise HTTPException(
            status_code=503,
            detail="Provider rejected the request; please retry later",
        )
    except HTTPException:
        # Keep the status code of errors raised on purpose.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Internal Error: {str(e)}"
        ) from e


@app.get("/gpt/listsessions")
async def list_sessions():
    """Return the ids of all sessions currently held in memory."""
    return {"sessions": list(chat_history.keys())}


@app.get("/gpt/historychat")
async def history_chat(id: str):
    """Return the stored messages of session *id*.

    Raises:
        HTTPException: 404 when the session does not exist.
    """
    if id not in chat_history:
        raise HTTPException(status_code=404, detail="Session ID not found")
    return {"history": chat_history[id]['message']}


@app.get("/key")
async def change_key(id_key: str):
    """Store *id_key* under key["token"] and echo it back.

    NOTE(review): this endpoint is unauthenticated and nothing in this file
    reads key["token"] - confirm whether it is still needed.
    """
    key["token"] = id_key
    return {"key": id_key}