# Page-scrape header (not part of the program): "Spaces: Sleeping"
import re
import time

import json_repair
from openai import OpenAI
from openai import RateLimitError
from transformers import AutoTokenizer
class ChatbotSimulation:
    """Text-based simulator of an app, driven by an OpenAI chat model.

    The simulator keeps a small ``user_state`` dictionary (``current_page``,
    ``last_page``, ``task_completed``), asks the model to update that state
    after every user action, and then asks the model to render the options
    of the page the user is now on.

    ``log_location`` and ``max_steps`` are stored on the instance but are not
    used by any method of this class.
    """

    # Keys every model-produced user-state dictionary must contain.
    _REQUIRED_STATE_KEYS = frozenset({'current_page', 'last_page', 'task_completed'})

    # Seconds to wait after a rate-limit error when no delay can be parsed.
    _DEFAULT_RETRY_SECONDS = 5

    # How many times the model is asked to re-format an unusable state reply
    # before the previous state is kept instead.
    _MAX_STATE_REPAIR_ATTEMPTS = 3

    # Matches the "try again in 1.5s" / "500ms" / "1m30s" hint that rate-limit
    # messages carry. ``(?!s)`` keeps the minutes group from eating "ms".
    _RETRY_DELAY_RE = re.compile(
        r"try again in\s+(?:(?P<minutes>\d+)m(?!s))?\s*"
        r"(?:(?P<value>\d+(?:\.\d+)?)(?P<unit>ms|s))?",
        re.IGNORECASE,
    )

    _SYSTEM_PROMPT_TEMPLATE = """
You are a text-based simulator of {app_name} app.
You are interacting with a user. User's task is: {task}.
User's last page was {last_page} and the user have taken actions: {actions}.
After action, user is currently on the {current_page} page.
Current user state: {user_state}.
Page Information:
- **If the user requests page you do not possess** (such as a list of restaurants, menus, or similar details),
you are permitted to create plausible and relevant information to fulfill the request.
Present this fabricated information convincingly as if it were real data.
{page_info}
- **Features**: Represent available options the user can select on this page.
- **User Data**: Represents user-specific data accessible on this page.
Provide instructions or request input from the user. If the user provides an invalid action, respond with:
"Invalid action. Please select a valid option."
### Instruction Format:
<if actions is non-empty: You have successfully done actions[-1]> You are at the {current_page} page. You have the following options:
1. Feature 1
2. Feature 2
3. Feature 3
4. Feature 4
Please enter your choice as 'Number. Description'. If you have a query, enter as 'Number. Description: query'
Rules:
- Be sure to display all options that is available in features.
- Be robotic and emotionless. Avoid offering any advice to the user.
"""

    _UPDATE_PROMPT_TEMPLATE = """
If user takes action '{user_input}' on {current_page} page, which page will they move to?
Recall user's task: {task}
Update the user_state dictionary based on user's last action:
Current user_state: {user_state}
Sitemap: {sitemap}
Instructions:
1. If the 'current_page' has changed, update it to a page from the sitemap.
2. If the task is finished, update 'task_completed' to True. Otherwise, leave it as False.
3. If no updates are needed, return the user state exactly as provided, without modification.
Important:
- Ensure 'current_page' and 'task_completed' are keys in the returned dictionary.
- Return only the dictionary without additional output or wrapping.
Example Output Format:
{{
'current_page': 'Home',
'last_page': 'Home',
'task_completed': 'False',
}}
"""

    _TRANSFORM_PROMPT_TEMPLATE = """
Transform {updated_state} to a properly formate JSON file.
Example Output Format:
{{
'current_page': 'Home',
'last_page': 'Home',
'task_completed': 'False',
}}
"""

    def __init__(self, site_map, page_details, user_state, task,
                 app_name, log_location, openai_api_key, agent='human',
                 max_steps=50, max_tokens=8192, buffer_tokens=500):
        """Set up the simulation state, the OpenAI client and the tokenizer.

        Args:
            site_map: Mapping with a ``'pages'`` entry of page name -> page
                record; each record may carry a ``'uid'``.
            page_details: Mapping of page uid -> details shown to the model.
            user_state: Initial user state. It is copied, so the caller's
                dictionary is not modified.
            task: Description of the task the user must complete.
            app_name: Name of the simulated app, used in prompts.
            log_location: Stored as-is; not used by this class.
            openai_api_key: API key handed to the OpenAI client.
            agent: ``'human'`` or ``'llm'`` (case-insensitive).
            max_steps: Stored as-is; not enforced by this class.
            max_tokens: Token budget used when trimming the conversation.
            buffer_tokens: Completion size requested from the model; twice
                this amount is reserved when trimming the conversation.

        Raises:
            ValueError: If ``agent`` is neither ``'human'`` nor ``'llm'``.
        """
        self.sitemap = site_map
        self.page_details = page_details
        # Work on a copy so the caller's dictionary is left untouched.
        self.user_state = dict(user_state)
        self.user_state['current_page'] = 'Home'  # Initialize current page
        self.user_state['last_page'] = 'Home'
        self.user_state['task_completed'] = 'False'
        self.task = task
        self.app_name = app_name
        self.log_location = log_location
        self.agent = agent.lower()
        if self.agent not in ['human', 'llm']:
            raise ValueError("Invalid agent type. Expected 'Human' or 'llm'.")
        self.max_steps = max_steps
        self.max_tokens = max_tokens
        self.buffer_tokens = buffer_tokens
        self.conversation = []  # Stores recent conversation snippets
        self.prompt_count = 0
        self.client = OpenAI(api_key=openai_api_key)
        self.actions = []
        self.tokenizer = AutoTokenizer.from_pretrained("gpt2")

    def _get_page_uid(self, page_name):
        """Return the UID of ``page_name`` from the sitemap, or ``None``."""
        return self.sitemap['pages'].get(page_name, {}).get('uid')

    def _get_page_details(self, page_name):
        """Return the details of ``page_name`` (looked up by UID), or ``{}``."""
        uid = self._get_page_uid(page_name)
        return self.page_details.get(uid, {})

    def _generate_system_prompt(self):
        """Build the system prompt describing the current page and state."""
        current_page = self.user_state['current_page']
        return self._SYSTEM_PROMPT_TEMPLATE.format(
            app_name=self.app_name,
            task=self.task,
            last_page=self.user_state['last_page'],
            actions=self.actions,
            current_page=current_page,
            user_state=self.user_state,
            page_info=self._get_page_details(current_page),
        )

    @classmethod
    def _parse_retry_delay(cls, message):
        """Extract the suggested retry delay, in seconds, from an error message.

        Understands hints such as ``"try again in 20s"``, ``"in 500ms"`` and
        ``"in 1m30s"``. Returns ``None`` when the message carries no hint.
        """
        match = cls._RETRY_DELAY_RE.search(message or "")
        if match is None:
            return None
        minutes, value, unit = match.group('minutes', 'value', 'unit')
        if minutes is None and value is None:
            return None
        seconds = 60.0 * int(minutes) if minutes else 0.0
        if value is not None:
            amount = float(value)
            seconds += amount / 1000.0 if unit.lower() == 'ms' else amount
        return seconds

    def _get_openai_response(self, prompt):
        """Send ``prompt`` (a list of chat messages) and return the reply text.

        The stored conversation is trimmed first. On a rate-limit error the
        call is retried after the delay suggested by the error message, or
        after a default delay when none can be parsed.
        NOTE(review): the retry loop is unbounded, as in the original code.
        """
        self._trim_conversation()
        while True:
            try:
                response = self.client.chat.completions.create(
                    model="gpt-4",
                    messages=prompt,
                    max_tokens=self.buffer_tokens,  # Adjusted max_tokens if needed
                    temperature=0.7,
                )
            except RateLimitError as e:
                # The message text is parsed rather than ``e.response``: in
                # the 1.x client that attribute is an HTTP response object,
                # not a subscriptable dictionary.
                wait_time = self._parse_retry_delay(str(e))
                if wait_time is None:
                    print("Could not parse wait time from error message. Defaulting to 5 seconds.")
                    wait_time = self._DEFAULT_RETRY_SECONDS
                print(f"Rate limit reached. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                return response.choices[0].message.content

    def _count_entry_tokens(self, entry):
        """Return the tokenizer's token count for one message's content."""
        # A reply with no text is stored as None; count it as empty.
        content = entry['content'] or ""
        return len(self.tokenizer.encode(content, truncation=False, add_special_tokens=False))

    def _calculate_token_count(self, conversation):
        """Return the total token count of all message contents."""
        return sum(self._count_entry_tokens(entry) for entry in conversation)

    def _trim_conversation(self):
        """Drop the oldest messages until the conversation fits the budget.

        The budget is ``max_tokens - 2 * buffer_tokens``. The newest message
        is always kept, and the list is modified in place because callers
        may already hold a reference to it as the outgoing prompt.
        """
        limit = self.max_tokens - self.buffer_tokens * 2
        counts = [self._count_entry_tokens(entry) for entry in self.conversation]
        total = sum(counts)
        drop = 0
        while total >= limit and drop < len(counts) - 1:
            total -= counts[drop]
            drop += 1
        if drop:
            del self.conversation[:drop]

    @staticmethod
    def _is_task_completed(value):
        """Return True when ``value`` spells "true" (string or boolean)."""
        return str(value).strip().lower() == 'true'

    def _is_valid_state(self, state):
        """Return True when ``state`` is a dict holding every required key."""
        return isinstance(state, dict) and self._REQUIRED_STATE_KEYS.issubset(state.keys())

    def _request_state_update(self, user_input):
        """Ask the model for the user state that follows ``user_input``.

        Returns the parsed state dictionary. If the model's reply cannot be
        turned into a valid state after a few re-formatting attempts, a copy
        of the current state is returned instead.
        """
        update_message = {
            "role": "assistant",
            "content": self._UPDATE_PROMPT_TEMPLATE.format(
                user_input=user_input,
                current_page=self.user_state['current_page'],
                task=self.task,
                user_state=self.user_state,
                sitemap=self.sitemap,
            ),
        }
        self.conversation.append(update_message)
        try:
            raw_state = self._get_openai_response(self.conversation)
        finally:
            # The update prompt is transient: remove it even if the call
            # failed so it never leaks into later rounds.
            if self.conversation and self.conversation[-1] is update_message:
                self.conversation.pop()

        state = json_repair.loads(raw_state or "")
        attempts = 0
        while not self._is_valid_state(state):
            if attempts >= self._MAX_STATE_REPAIR_ATTEMPTS:
                print("Could not obtain a valid user state from the model. Keeping the previous state.")
                return dict(self.user_state)
            attempts += 1
            transform_prompt = self._TRANSFORM_PROMPT_TEMPLATE.format(updated_state=state)
            raw_state = self._get_openai_response([{"role": "system", "content": transform_prompt}])
            state = json_repair.loads(raw_state or "")
        return state

    def _request_page_instruction(self):
        """Append a fresh system prompt, query the model and record its reply."""
        self.conversation.append({"role": "system", "content": self._generate_system_prompt()})
        gpt_instruction = self._get_openai_response(self.conversation)
        self.conversation.append({"role": "assistant", "content": gpt_instruction})
        return gpt_instruction

    def one_conversation_round(self, user_input):
        """Conduct one round of conversation between the user and the assistant.

        Records the action, asks the model for the updated user state and,
        unless the task is reported complete, returns the instructions for
        the page the user is now on.

        Returns:
            The page instructions, or a completion message when the model
            reports the task as finished.
        """
        # User provides input
        self.actions.append(f'{user_input} on {self.user_state["current_page"]} page')
        self.conversation.append({"role": "user", "content": user_input})
        self.prompt_count += 1

        # Update user state using GPT's response
        updated_state = self._request_state_update(user_input)
        if self._is_task_completed(updated_state['task_completed']):
            return f"Task completed! You took {self.prompt_count} steps."
        if not isinstance(updated_state['task_completed'], str):
            # Keep the flag in the string form used everywhere else.
            updated_state['task_completed'] = 'False'
        self.user_state = updated_state

        # GPT generates the page instructions; only the newest system prompt
        # is kept in the history.
        self.conversation = [entry for entry in self.conversation if entry["role"] != "system"]
        return self._request_page_instruction()

    def start_conversation(self):
        """Open the simulation and return the greeting plus first instructions."""
        greeting = f'\n Welcome to {self.app_name} simulator! Your task is: {self.task}. \n'
        # GPT generates the page instructions
        return greeting + self._request_page_instruction()