from openai import OpenAI import json_repair from transformers import AutoTokenizer from openai import RateLimitError import time from prompts import * import re class ChatbotSimulation: def __init__(self, app_name, site_map, page_details, user_state, system_data, user_data, task, solution, log_location, openai_api_key, agent='human', max_steps=50, max_tokens=8192, buffer_tokens=500): self.app_name = app_name self.sitemap = site_map self.page_details = page_details self.user_state = user_state self.user_state['current_page'] = 'Home' # Initialize current page self.user_state['task_completed'] = 'False' self.user_state['back'] = 'False' self.system_data = system_data self.user_data = user_data self.task = task self.solution = solution self.log_location = log_location self.agent = agent.lower() if self.agent not in ['human', 'llm']: raise ValueError("Invalid agent type. Expected 'Human' or 'llm'.") self.max_steps = max_steps self.max_tokens = max_tokens self.buffer_tokens = buffer_tokens self.conversation = [] # Stores recent conversation snippets self.prompt_count = 0 self.client = OpenAI(api_key=openai_api_key) self.actions = [] self.tokenizer = AutoTokenizer.from_pretrained("gpt2", clean_up_tokenization_spaces=True) # back button self.page_history = ['Home'] def _get_page_uid(self, page_name): """Retrieve the UID of the given page from the sitemap.""" return self.sitemap.get(page_name, {}).get('uid') def _get_page_details(self, page_name): """Retrieve the page details using its UID.""" uid = self._get_page_uid(page_name) return self.page_details.get(uid, {}) def _generate_system_prompt(self): """Create a dynamic system prompt based on the current state.""" #current_page = self.user_state['current_page'] #last_page = self.user_state['last_page'] current_page = self.page_history[-1] if len(self.page_history) >= 1 else "Home" last_page = self.page_history[-2] if len(self.page_history) > 1 else "Home" page_info = self._get_page_details(current_page) return get_system_prompt(app_name=self.app_name, system_data=self.system_data, task=self.task, user_data=self.user_data, current_page=current_page, last_page=last_page, actions=self.actions, user_state=self.user_state, page_info=page_info ) def _get_openai_response(self, prompt): """Fetch response from OpenAI API.""" self._trim_conversation() while True: try: response = self.client.chat.completions.create( model="gpt-4", messages=prompt, max_tokens=self.buffer_tokens, # Adjusted max_tokens if needed temperature=0.7, ) return response.choices[0].message.content except RateLimitError as e: # Parse the suggested retry time from the error message, default to 5s if not available wait_time = 5 try: # Attempt to get the time from the error message wait_time = float(e.response['error']['message'].split("in ")[1].split("s")[0]) except (KeyError, IndexError, ValueError): print("Could not parse wait time from error message. Defaulting to 5 seconds.") print(f"Rate limit reached. Retrying in {wait_time} seconds...") time.sleep(wait_time) def _calculate_token_count(self, conversation): """Accurately calculate the token count in the conversation using a tokenizer.""" total_tokens = 0 for entry in conversation: # Tokenize each entry content and count tokens tokens = self.tokenizer.encode(entry['content'], truncation=False, add_special_tokens=False) total_tokens += len(tokens) return total_tokens def _trim_conversation(self): """Trim the conversation to keep it within the token limit.""" while self._calculate_token_count(self.conversation) >= (self.max_tokens - self.buffer_tokens * 2): self.conversation.pop(0) def one_conversation_round(self, user_input): """Conduct one round of conversation between the user and the assistant.""" # User provides input valid_input = self._is_valid_input(user_input) if valid_input[0]: pass else: return f"\n{self.app_name}: Invalid input. {valid_input[1]}" self.actions.append(user_input + f'on {self.user_state["current_page"]} page') self.conversation.append({"role": "user", "content": user_input}) self.prompt_count += 1 # Update user state using GPT's response current_page = self.page_history[-1] if len(self.page_history) >= 1 else "Home" update_prompt = get_user_state_update_prompt(user_input=user_input, current_page=current_page, task=self.task, solution=self.solution, user_state=self.user_state, sitemap=self.sitemap) self.conversation.append({"role": "assistant", "content": update_prompt}) updated_state = self._get_openai_response(self.conversation).split("UPDATED", 1)[1].strip() self.conversation.pop(-1) ## update prompt don't have to stay in conversation history # Parse and update the user state updated_state = json_repair.loads(updated_state) # format forcing of updated state required_keys = {'current_page', 'task_completed', 'back'} # Ensure `updated_state` is a dictionary while not isinstance(updated_state, dict): transform_prompt = f""" Transform {updated_state} to a properly formatted JSON file. Example Output Format: {{ 'current_page': 'Home', 'task_completed': False, 'back': False }} """ updated_state = self._get_openai_response([{"role": "system", "content": transform_prompt}]) updated_state = json_repair.loads(updated_state) # Manually add missing required keys for key in required_keys: if key not in updated_state: if key == 'current_page': updated_state[key] = self.page_history[-1] if len(self.page_history) >= 1 else "Home" else: updated_state[key] = False try: if str(updated_state['task_completed']).lower() == 'true': return f"Task completed! You took {self.prompt_count} steps." except: updated_state['task_completed'] = 'False' self.user_state = updated_state if str(updated_state['back']).lower() == 'false': self.page_history.append(updated_state['current_page']) elif self.page_history: self.page_history.pop() ## no need to store old system prompt while we get a new one self.conversation = [entry for entry in self.conversation if entry["role"] != "system"] system_prompt = self._generate_system_prompt() # GPT generates the page instructions self.conversation.append({"role": "system", "content": system_prompt}) gpt_instruction = self._get_openai_response(self.conversation) self.conversation.append({"role": "assistant", "content": gpt_instruction}) return gpt_instruction def start_conversation(self): greeting = f'\nWelcome to {self.app_name} simulator! Your task is: {self.task} \n' system_prompt = self._generate_system_prompt() # GPT generates the page instructions self.conversation.append({"role": "system", "content": system_prompt}) gpt_instruction = self._get_openai_response(self.conversation) self.conversation.append({"role": "assistant", "content": gpt_instruction}) return greeting + gpt_instruction def _extract_buttons(self): """Extract buttons and their action types from the latest conversation if role is 'assistant'.""" # Get the last message last_message = self.conversation[-1] # Ensure the role of the last message is 'assistant' if last_message.get("role") != "assistant": return {} # Extract the content of the last message message_content = last_message.get("content", "") # Make the split case-insensitive by searching for the phrase with re.IGNORECASE options_split = re.split(r"you have the following options:", message_content, flags=re.IGNORECASE) # If the split doesn't produce at least two parts, return an empty dictionary if len(options_split) < 2: return {} # Extract button definitions from the second part of the split content button_section = options_split[1] pattern = r"\d+\.\s+(.*?):\s+([a-zA-Z_]+)" buttons = re.findall(pattern, button_section) # Construct the dictionary with button names as keys and action types as values return {name.strip().lower(): action_type.strip().lower() for name, action_type in buttons} def _is_valid_input(self, user_input): """Validate user input format.""" valid_buttons = self._extract_buttons() if valid_buttons == {}: return [True, "Enter Anything is empty"] # Validate input format pattern = r"^(?P\w+)\((?P[^,]+)(?:,\s*(?P.+))?\)$" match = re.match(pattern, user_input) if not match: return [False, "Your input doesn't match the format: action_type(button name), OR if type, use type(button name, query)"] # Extract parsed components action_type = match.group("action_type").lower() button_name = match.group("button_name").strip().lower() query = match.group("query") # Optional query for `type` # Validate button name and action type if button_name not in valid_buttons: return [False, "Invalid Button name! Recall: Each button is in the format: `number. button name: action_type`"] # Button name must match exactly (case insensitive) if action_type != valid_buttons[button_name]: return [False, "Invalid action type! Recall: Each button is in the format: `number. button name: action_type`"] # Action type must match the button's specified type if action_type == "type" and query is None: return [False, "Missing Query for action type 'type'! Recall: use the format: `type(button name, query)`"] # `type` action requires a query if action_type != "type" and query is not None: return [False, "Non-`type` action_type cannot take query!"] # Non-`type` actions must not have a query return [True, 'Pass']