App_Simulator / chatbot_simulator.py
jjz5463's picture
reduce element in conversation history
e1f0dec
raw
history blame
8.63 kB
from openai import OpenAI
import json_repair
from transformers import AutoTokenizer
from openai import RateLimitError
import time
class ChatbotSimulation:
def __init__(self, site_map, page_details, user_state, task,
app_name, log_location, openai_api_key, agent='human',
max_steps=50, max_tokens=8192, buffer_tokens=500):
self.sitemap = site_map
self.page_details = page_details
self.user_state = user_state
self.user_state['current_page'] = 'Home' # Initialize current page
self.user_state['last_page'] = 'Home'
self.user_state['task_completed'] = 'False'
self.task = task
self.app_name = app_name
self.log_location = log_location
self.agent = agent.lower()
if self.agent not in ['human', 'llm']:
raise ValueError("Invalid agent type. Expected 'Human' or 'llm'.")
self.max_steps = max_steps
self.max_tokens = max_tokens
self.buffer_tokens = buffer_tokens
self.conversation = [] # Stores recent conversation snippets
self.prompt_count = 0
self.client = OpenAI(api_key=openai_api_key)
self.actions = []
self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
def _get_page_uid(self, page_name):
"""Retrieve the UID of the given page from the sitemap."""
return self.sitemap['pages'].get(page_name, {}).get('uid')
def _get_page_details(self, page_name):
"""Retrieve the page details using its UID."""
uid = self._get_page_uid(page_name)
return self.page_details.get(uid, {})
def _generate_system_prompt(self):
"""Create a dynamic system prompt based on the current state."""
current_page = self.user_state['current_page']
last_page = self.user_state['last_page']
page_info = self._get_page_details(current_page)
return f"""
You are a text-based simulator of {self.app_name} app.
You are interacting with a user. User's task is: {self.task}.
User's last page was {last_page} and the user have taken actions: {self.actions}.
After action, user is currently on the {current_page} page.
Current user state: {self.user_state}.
Page Information:
- **If the user requests page you do not possess** (such as a list of restaurants, menus, or similar details),
you are permitted to create plausible and relevant information to fulfill the request.
Present this fabricated information convincingly as if it were real data.
{page_info}
- **Features**: Represent available options the user can select on this page.
- **User Data**: Represents user-specific data accessible on this page.
Provide instructions or request input from the user. If the user provides an invalid action, respond with:
"Invalid action. Please select a valid option."
### Instruction Format:
<if actions is non-empty: You have successfully done actions[-1]> You are at the {current_page} page. You have the following options:
1. Feature 1
2. Feature 2
3. Feature 3
4. Feature 4
Please enter your choice as 'Number. Description'. If you have a query, enter as 'Number. Description: query'
Rules:
- Be sure to display all options that is available in features.
- Be robotic and emotionless. Avoid offering any advice to the user.
"""
def _get_openai_response(self, prompt):
"""Fetch response from OpenAI API."""
self._trim_conversation()
while True:
try:
response = self.client.chat.completions.create(
model="gpt-4",
messages=prompt,
max_tokens=self.buffer_tokens, # Adjusted max_tokens if needed
temperature=0.7,
)
return response.choices[0].message.content
except RateLimitError as e:
# Parse the suggested retry time from the error message, default to 5s if not available
wait_time = 5
try:
# Attempt to get the time from the error message
wait_time = float(e.response['error']['message'].split("in ")[1].split("s")[0])
except (KeyError, IndexError, ValueError):
print("Could not parse wait time from error message. Defaulting to 5 seconds.")
print(f"Rate limit reached. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
def _calculate_token_count(self, conversation):
"""Accurately calculate the token count in the conversation using a tokenizer."""
total_tokens = 0
for entry in conversation:
# Tokenize each entry content and count tokens
tokens = self.tokenizer.encode(entry['content'], truncation=False, add_special_tokens=False)
total_tokens += len(tokens)
return total_tokens
def _trim_conversation(self):
"""Trim the conversation to keep it within the token limit."""
while self._calculate_token_count(self.conversation) >= (self.max_tokens - self.buffer_tokens * 2):
self.conversation.pop(0)
def one_conversation_round(self, user_input):
"""Conduct one round of conversation between the user and the assistant."""
# User provides input
self.actions.append(user_input + f'on {self.user_state["current_page"]} page')
self.conversation.append({"role": "user", "content": user_input})
self.prompt_count += 1
# Update user state using GPT's response
update_prompt = f"""
If user takes action '{user_input}' on {self.user_state['current_page']} page, which page will they move to?
Recall user's task: {self.task}
Update the user_state dictionary based on user's last action:
Current user_state: {self.user_state}
Sitemap: {self.sitemap}
Instructions:
1. If the 'current_page' has changed, update it to a page from the sitemap.
2. If the task is finished, update 'task_completed' to True. Otherwise, leave it as False.
3. If no updates are needed, return the user state exactly as provided, without modification.
Important:
- Ensure 'current_page' and 'task_completed' are keys in the returned dictionary.
- Return only the dictionary without additional output or wrapping.
Example Output Format:
{{
'current_page': 'Home',
'last_page': 'Home',
'task_completed': 'False',
}}
"""
self.conversation.append({"role": "assistant", "content": update_prompt})
updated_state = self._get_openai_response(self.conversation)
self.conversation.pop(-1)
# Parse and update the user state
updated_state = json_repair.loads(updated_state)
required_keys = {'current_page', 'last_page', 'task_completed'}
while not isinstance(updated_state, dict) or not required_keys.issubset(updated_state.keys()):
transform_prompt = f""""
Transform {updated_state} to a properly formate JSON file.
Example Output Format:
{{
'current_page': 'Home',
'last_page': 'Home',
'task_completed': 'False',
}}
"""
updated_state = self._get_openai_response([{"role": "system", "content": transform_prompt}])
updated_state = json_repair.loads(updated_state)
try:
if updated_state['task_completed'].lower() == 'true':
return f"Task completed! You took {self.prompt_count} steps."
except:
updated_state['task_completed'] = 'False'
self.user_state = updated_state
#self.conversation.clear()
system_prompt = self._generate_system_prompt()
# GPT generates the page instructions
self.conversation = [entry for entry in self.conversation if entry["role"] != "system"]
self.conversation.append({"role": "system", "content": system_prompt})
gpt_instruction = self._get_openai_response(self.conversation)
self.conversation.append({"role": "assistant", "content": gpt_instruction})
return gpt_instruction
def start_conversation(self):
greeting = f'\n Welcome to {self.app_name} simulator! Your task is: {self.task}. \n'
system_prompt = self._generate_system_prompt()
# GPT generates the page instructions
self.conversation.append({"role": "system", "content": system_prompt})
gpt_instruction = self._get_openai_response(self.conversation)
self.conversation.append({"role": "assistant", "content": gpt_instruction})
return greeting + gpt_instruction