from openai import OpenAI import json_repair from transformers import AutoTokenizer from openai import RateLimitError import time class ChatbotSimulation: def __init__(self, site_map, page_details, user_state, task, app_name, log_location, openai_api_key, agent='human', max_steps=50, max_tokens=8192, buffer_tokens=500): self.sitemap = site_map self.page_details = page_details self.user_state = user_state self.user_state['current_page'] = 'Home' # Initialize current page self.user_state['last_page'] = 'Home' self.user_state['task_completed'] = 'False' self.task = task self.app_name = app_name self.log_location = log_location self.agent = agent.lower() if self.agent not in ['human', 'llm']: raise ValueError("Invalid agent type. Expected 'Human' or 'llm'.") self.max_steps = max_steps self.max_tokens = max_tokens self.buffer_tokens = buffer_tokens self.conversation = [] # Stores recent conversation snippets self.prompt_count = 0 self.client = OpenAI(api_key=openai_api_key) self.actions = [] self.tokenizer = AutoTokenizer.from_pretrained("gpt2") def _get_page_uid(self, page_name): """Retrieve the UID of the given page from the sitemap.""" return self.sitemap['pages'].get(page_name, {}).get('uid') def _get_page_details(self, page_name): """Retrieve the page details using its UID.""" uid = self._get_page_uid(page_name) return self.page_details.get(uid, {}) def _generate_system_prompt(self): """Create a dynamic system prompt based on the current state.""" current_page = self.user_state['current_page'] last_page = self.user_state['last_page'] page_info = self._get_page_details(current_page) return f""" You are a text-based simulator of {self.app_name} app. You are interacting with a user. User's task is: {self.task}. User's last page was {last_page} and the user have taken actions: {self.actions}. After action, user is currently on the {current_page} page. Current user state: {self.user_state}. Page Information: - **If the user requests page you do not possess** (such as a list of restaurants, menus, or similar details), you are permitted to create plausible and relevant information to fulfill the request. Present this fabricated information convincingly as if it were real data. {page_info} - **Features**: Represent available options the user can select on this page. - **User Data**: Represents user-specific data accessible on this page. Provide instructions or request input from the user. If the user provides an invalid action, respond with: "Invalid action. Please select a valid option." ### Instruction Format: You are at the {current_page} page. You have the following options: 1. Feature 1 2. Feature 2 3. Feature 3 4. Feature 4 Please enter your choice as 'Number. Description'. If you have a query, enter as 'Number. Description: query' Rules: - Be sure to display all options that is available in features. - Be robotic and emotionless. Avoid offering any advice to the user. """ def _get_openai_response(self, prompt): """Fetch response from OpenAI API.""" self._trim_conversation() while True: try: response = self.client.chat.completions.create( model="gpt-4", messages=prompt, max_tokens=self.buffer_tokens, # Adjusted max_tokens if needed temperature=0.7, ) return response.choices[0].message.content except RateLimitError as e: # Parse the suggested retry time from the error message, default to 5s if not available wait_time = 5 try: # Attempt to get the time from the error message wait_time = float(e.response['error']['message'].split("in ")[1].split("s")[0]) except (KeyError, IndexError, ValueError): print("Could not parse wait time from error message. Defaulting to 5 seconds.") print(f"Rate limit reached. Retrying in {wait_time} seconds...") time.sleep(wait_time) def _calculate_token_count(self, conversation): """Accurately calculate the token count in the conversation using a tokenizer.""" total_tokens = 0 for entry in conversation: # Tokenize each entry content and count tokens tokens = self.tokenizer.encode(entry['content'], truncation=False, add_special_tokens=False) total_tokens += len(tokens) return total_tokens def _trim_conversation(self): """Trim the conversation to keep it within the token limit.""" while self._calculate_token_count(self.conversation) >= (self.max_tokens - self.buffer_tokens * 2): self.conversation.pop(0) def one_conversation_round(self, user_input): """Conduct one round of conversation between the user and the assistant.""" # User provides input self.actions.append(user_input + f'on {self.user_state["current_page"]} page') self.conversation.append({"role": "user", "content": user_input}) self.prompt_count += 1 # Update user state using GPT's response update_prompt = f""" If user takes action '{user_input}' on {self.user_state['current_page']} page, which page will they move to? Recall user's task: {self.task} Update the user_state dictionary based on user's last action: Current user_state: {self.user_state} Sitemap: {self.sitemap} Instructions: 1. If the 'current_page' has changed, update it to a page from the sitemap. 2. If the task is finished, update 'task_completed' to True. Otherwise, leave it as False. 3. If no updates are needed, return the user state exactly as provided, without modification. Important: - Ensure 'current_page' and 'task_completed' are keys in the returned dictionary. - Return only the dictionary without additional output or wrapping. Example Output Format: {{ 'current_page': 'Home', 'last_page': 'Home', 'task_completed': 'False', }} """ self.conversation.append({"role": "assistant", "content": update_prompt}) updated_state = self._get_openai_response(self.conversation) self.conversation.pop(-1) # Parse and update the user state updated_state = json_repair.loads(updated_state) required_keys = {'current_page', 'last_page', 'task_completed'} while not isinstance(updated_state, dict) or not required_keys.issubset(updated_state.keys()): transform_prompt = f"""" Transform {updated_state} to a properly formate JSON file. Example Output Format: {{ 'current_page': 'Home', 'last_page': 'Home', 'task_completed': 'False', }} """ updated_state = self._get_openai_response([{"role": "system", "content": transform_prompt}]) updated_state = json_repair.loads(updated_state) try: if updated_state['task_completed'].lower() == 'true': return f"Task completed! You took {self.prompt_count} steps." except: updated_state['task_completed'] = 'False' self.user_state = updated_state #self.conversation.clear() system_prompt = self._generate_system_prompt() # GPT generates the page instructions self.conversation = [entry for entry in self.conversation if entry["role"] != "system"] self.conversation.append({"role": "system", "content": system_prompt}) gpt_instruction = self._get_openai_response(self.conversation) self.conversation.append({"role": "assistant", "content": gpt_instruction}) return gpt_instruction def start_conversation(self): greeting = f'\n Welcome to {self.app_name} simulator! Your task is: {self.task}. \n' system_prompt = self._generate_system_prompt() # GPT generates the page instructions self.conversation.append({"role": "system", "content": system_prompt}) gpt_instruction = self._get_openai_response(self.conversation) self.conversation.append({"role": "assistant", "content": gpt_instruction}) return greeting + gpt_instruction