"""Helpers for turning function docs into tool schemas, shaping prompts, and
decoding model output into function calls."""

import ast
import builtins
import copy
import json
import os
import re

from constant import (
    DEFAULT_SYSTEM_PROMPT,
    GORILLA_TO_OPENAPI,
)
from model_style import ModelStyle

# Exceptions `ast.literal_eval` is documented to raise on malformed input.
_LITERAL_EVAL_ERRORS = (ValueError, TypeError, SyntaxError, MemoryError, RecursionError)

# Per-parameter schema fields removed for the Google style, paired with the
# label used when the removed value is folded into the description. The order
# determines the order of the appended sentences.
_GOOGLE_UNSUPPORTED_PARAM_FIELDS = (
    ("default", "Default is"),
    ("optional", "Optional"),
    ("maximum", "Maximum value"),
    ("minItems", "Minimum number of items"),
    ("maxItems", "Maximum number of items"),
    ("additionalProperties", "Additional properties"),
)

# Per-parameter schema fields folded into the description for the Cohere style
# when USE_COHERE_OPTIMIZATION is not "True", paired with their text prefix.
_COHERE_INLINED_PARAM_FIELDS = (
    ("default", " The default value is: "),
    ("additionalProperties", " Additional properties: "),
    ("items", " List Items type: "),
    ("properties", " Dictionary properties: "),
)


def _cast_to_openai_type(properties, mapping):
    """Rewrite the ``type`` of every parameter in ``properties`` via ``mapping``.

    The dict is modified in place and also returned. A missing or unmapped
    type becomes ``"string"``; the same fallback is applied to nested
    ``items`` types so that one unknown inner type does not abort the whole
    conversion.

    Args:
        properties: Dict of parameter name -> parameter schema dict.
        mapping: Dict of source type name -> target type name.

    Returns:
        The same ``properties`` dict, after modification.
    """
    for schema in properties.values():
        source_type = schema.get("type")
        if mapping == GORILLA_TO_OPENAPI and source_type == "float":
            schema["format"] = "float"
            # A float parameter may come without a description; create one
            # rather than failing on the in-place append.
            schema["description"] = (
                schema.get("description", "") + " This is a float type value."
            )
        schema["type"] = mapping.get(source_type, "string")

        # Currently support:
        # - list of any
        # - list of list of any
        # - list of dict
        # - list of list of dict
        # - dict of any
        if schema["type"] not in ("array", "object"):
            continue

        if "properties" in schema:
            schema["properties"] = _cast_to_openai_type(schema["properties"], mapping)
        elif "items" in schema:
            items = schema["items"]
            items["type"] = mapping.get(items.get("type"), "string")
            if items["type"] == "array" and "items" in items:
                inner_items = items["items"]
                inner_items["type"] = mapping.get(inner_items.get("type"), "string")
            elif items["type"] == "object" and "properties" in items:
                items["properties"] = _cast_to_openai_type(items["properties"], mapping)
    return properties


def _append_description(params, text):
    """Append ``text`` to ``params["description"]``, creating it if absent."""
    params["description"] = params.get("description", "") + text


def _strip_google_unsupported_fields(parameters):
    """Remove schema fields dropped for the Google style, keeping their values
    as sentences appended to the parameter description."""
    # No `optional` field in function schema.
    parameters.pop("optional", None)
    for params in parameters["properties"].values():
        for field, label in _GOOGLE_UNSUPPORTED_PARAM_FIELDS:
            if field in params:
                _append_description(params, f" {label}: {str(params.pop(field))}.")
        # Only `enum` field when the type is `string`.
        if "enum" in params and params["type"] != "string":
            _append_description(params, f" Enum values: {str(params.pop('enum'))}.")


def _flatten_cohere_parameters_optimized(parameters, mapping):
    """Fold nested schema details into descriptions (Cohere, optimized path).

    Used when the USE_COHERE_OPTIMIZATION environment variable is "True".
    """
    required = parameters.setdefault("required", [])
    for param_name, params in parameters["properties"].items():
        params.setdefault("description", "")

        if "default" in params:
            params["description"] += " The default value is: " + str(params["default"])
            # Parameters that carry a default are added to `required` here.
            if param_name not in required:
                required.append(param_name)
            del params["default"]

        if "additionalProperties" in params:
            params["description"] += " Additional properties: " + str(
                params.pop("additionalProperties")
            )

        if "items" in params:
            items = params.pop("items")
            if "items" in items and "type" in items["items"]:
                # 2D list
                params["type"] = f"list[list[{items['items']['type']}]]"
            elif "type" in items:
                # 1D list
                params["type"] = f"list[{items['type']}]"
            if "enum" in items and items["enum"]:
                # str() each value so non-string enum members do not break join.
                params["description"] += (
                    " Possible enum values: "
                    + ", ".join(str(choice) for choice in items["enum"])
                    + "."
                )

        if "properties" in params:
            params["description"] += " Dictionary properties:"
            for name, property_ in params.pop("properties").items():
                property_type = property_.get("type", mapping["string"])
                property_description = property_.get("description", "")
                params["description"] += f" {name} ({property_type}): {property_description}"

        if "enum" in params:
            params["description"] += " Possible enum values: " + str(params.pop("enum"))

        # add ranges to description
        description = params["description"]
        if "percentage" not in description:
            description = description.replace("rate ", "rate (from 0.0 to 1.0) ")
        description = description.replace("percentage ", "percentage (from 0 to 100) ")
        description = description.replace("currency ", "currency (3 letter ISO code) ")
        params["description"] = description


def _flatten_cohere_parameters(parameters):
    """Fold nested schema details into descriptions (Cohere, default path)."""
    for params in parameters["properties"].values():
        params.setdefault("description", "")
        for field, prefix in _COHERE_INLINED_PARAM_FIELDS:
            if field in params:
                params["description"] += prefix + str(params.pop(field))


def convert_to_tool(functions, mapping, model_style):
    """Convert function docs into the tool format used for ``model_style``.

    The input is deep-copied first, so the caller's ``functions`` are left
    untouched.

    Args:
        functions: List of function-doc dicts; each is read for ``name``,
            ``description``, ``parameters`` and optionally ``response``.
        mapping: Dict of source type name -> target type name.
        model_style: A ``ModelStyle`` member selecting the output shape.

    Returns:
        A list of tool definitions. Anthropic/Google/OSSMODEL entries are the
        converted docs themselves, COHERE entries are
        ``{"name", "description", "parameter_definitions"}`` dicts, and
        OpenAI/Mistral/FIREWORK_AI/WRITER entries are
        ``{"type": "function", "function": doc}`` wrappers. Any other style
        contributes nothing to the list.
    """
    functions = copy.deepcopy(functions)
    oai_tool = []
    for item in functions:
        if "." in item["name"] and model_style in (
            ModelStyle.OpenAI,
            ModelStyle.Mistral,
            ModelStyle.Google,
            ModelStyle.OSSMODEL,
            ModelStyle.Anthropic,
            ModelStyle.COHERE,
        ):
            # OAI does not support "." in the function name so we replace it with "_".
            # ^[a-zA-Z0-9_-]{1,64}$ is the regex for the name.
            item["name"] = item["name"].replace(".", "_")

        item["parameters"]["type"] = "object"
        item["parameters"]["properties"] = _cast_to_openai_type(
            item["parameters"]["properties"], mapping
        )

        if model_style == ModelStyle.Anthropic:
            item["input_schema"] = item.pop("parameters")

        if model_style == ModelStyle.Google:
            # Remove fields that are not supported by Gemini.
            _strip_google_unsupported_fields(item["parameters"])

        if model_style == ModelStyle.COHERE:
            if os.getenv("USE_COHERE_OPTIMIZATION") == "True":
                _flatten_cohere_parameters_optimized(item["parameters"], mapping)
            else:
                _flatten_cohere_parameters(item["parameters"])

        # Process the return field
        if "response" in item:
            if model_style in [
                ModelStyle.Anthropic,
                ModelStyle.Google,
                ModelStyle.FIREWORK_AI,
                ModelStyle.WRITER,
            ]:
                item["description"] += (
                    " The response field has the following schema: "
                    f"{json.dumps(item['response'])}"
                )
            # The `response` key is dropped for every style.
            del item["response"]

        if model_style in [
            ModelStyle.Anthropic,
            ModelStyle.Google,
            ModelStyle.OSSMODEL,
        ]:
            oai_tool.append(item)
        elif model_style == ModelStyle.COHERE:
            required = item["parameters"].get("required", [])
            parameter_definitions = {}
            for key, value in item["parameters"]["properties"].items():
                value["required"] = key in required
                parameter_definitions[key] = value
            oai_tool.append(
                {
                    "name": item["name"],
                    "description": item["description"],
                    "parameter_definitions": parameter_definitions,
                }
            )
        elif model_style in [
            ModelStyle.OpenAI,
            ModelStyle.Mistral,
            ModelStyle.FIREWORK_AI,
            ModelStyle.WRITER,
        ]:
            oai_tool.append({"type": "function", "function": item})
    return oai_tool


def convert_to_function_call(function_call_list):
    """Render function-call dicts as ``name(arg=value,...)`` strings.

    Args:
        function_call_list: A dict or list of dicts mapping a function name
            to its arguments, given either as a dict or as a JSON string.

    Returns:
        A list with one call string per (name, arguments) pair; argument
        values are rendered with ``repr``.
    """
    if isinstance(function_call_list, dict):
        function_call_list = [function_call_list]
    # function_call_list is of type list[dict[str, str]] or list[dict[str, dict]]
    execution_list = []
    for function_call in function_call_list:
        for func_name, arguments in function_call.items():
            if isinstance(arguments, str):
                arguments = json.loads(arguments)
            rendered_args = ",".join(f"{k}={repr(v)}" for k, v in arguments.items())
            execution_list.append(f"{func_name}({rendered_args})")
    return execution_list


def _coerce_to_builtin(value, type_str):
    """Shared implementation of ``convert_value`` and ``_convert_value``."""
    if type_str in ("list", "dict"):
        try:
            return ast.literal_eval(value)
        except _LITERAL_EVAL_ERRORS:
            return value
    if type_str == "bool" and isinstance(value, str):
        # bool("false") is True because any non-empty string is truthy, so the
        # textual booleans are recognised explicitly.
        lowered = value.strip().lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False
    # NOTE(review): `type_str` is resolved against `builtins` without a
    # whitelist, so any builtin callable name is accepted and called on
    # `value`. Confirm `type_str` only ever comes from trusted function docs.
    type_class = getattr(builtins, type_str)
    try:
        return type_class(value)
    except (ValueError, TypeError):
        # TypeError covers inputs such as int(None) that the constructor
        # rejects outright rather than failing to parse.
        return value


def convert_value(value, type_str):
    """Convert a string value into its appropriate Python data type based on the provided type string.

    Arg:
        value: the value to convert
        type_str: the type to convert the value to; the name of a builtin
            such as "int", "float", "str", "bool", "list" or "dict"

    Returns:
        The value converted into the requested type or the original value
        if the conversion failed.

    Raises:
        AttributeError: if ``type_str`` does not name anything in ``builtins``.
    """
    return _coerce_to_builtin(value, type_str)


def ast_parse(input_str, language="Python"):
    """Parse a string of Python call expressions into function-call dicts.

    Args:
        input_str: Source text holding one call or a bracketed sequence of
            calls. Leading/trailing ``[``, ``]`` and ``'`` characters are
            stripped before parsing.
        language: "Python", "Java" or "JavaScript".

    Returns:
        For "Python", a list of ``{function_name: {arg_name: value}}`` dicts.
        For "Java" and "JavaScript" nothing is parsed and ``None`` is returned.

    Raises:
        NotImplementedError: for any other ``language``.
        SyntaxError: if the cleaned input is not a valid Python expression.
        AssertionError: if an element of the sequence is not a call.
    """
    if language == "Python":
        cleaned_input = input_str.strip("[]'")
        parsed = ast.parse(cleaned_input, mode="eval")
        if isinstance(parsed.body, ast.Call):
            return [resolve_ast_call(parsed.body)]
        extracted = []
        for elem in parsed.body.elts:
            assert isinstance(elem, ast.Call)
            extracted.append(resolve_ast_call(elem))
        return extracted
    elif language == "Java":
        pass
    elif language == "JavaScript":
        pass
    else:
        raise NotImplementedError(f"Unsupported language: {language}")


def resolve_ast_call(elem):
    """Convert an ``ast.Call`` node into ``{dotted_name: {keyword: value}}``.

    Only keyword arguments are collected; positional arguments of the call
    are not read.
    """
    # Handle nested attributes for deeply nested module paths
    name_parts = []
    node = elem.func
    while isinstance(node, ast.Attribute):
        name_parts.append(node.attr)
        node = node.value
    if isinstance(node, ast.Name):
        name_parts.append(node.id)
    func_name = ".".join(reversed(name_parts))
    args_dict = {arg.arg: resolve_ast_by_type(arg.value) for arg in elem.keywords}
    return {func_name: args_dict}


def _apply_unary_operator(op, operand):
    """Apply the unary operator represented by AST node ``op`` to ``operand``."""
    if isinstance(op, ast.USub):
        return -operand
    if isinstance(op, ast.UAdd):
        return +operand
    if isinstance(op, ast.Not):
        return not operand
    if isinstance(op, ast.Invert):
        return ~operand
    raise Exception(f"Unsupported AST type: {type(op)}")


def resolve_ast_by_type(value):
    """Convert an AST expression node into a plain Python value.

    Constants, lists, tuples and dicts become the corresponding Python
    objects; names, keyword-less calls, lambdas and subscripts become their
    source text; calls with keywords become nested function-call dicts.

    Raises:
        Exception: if the node type is not handled.
    """
    if isinstance(value, ast.Constant):
        # Booleans, None and `...` are all ast.Constant, so the deprecated
        # ast.NameConstant / ast.Ellipsis classes (removed in Python 3.14)
        # are not needed.
        output = "..." if value.value is Ellipsis else value.value
    elif isinstance(value, ast.UnaryOp):
        # Resolve the operand first so nested expressions work, then apply
        # the actual operator instead of always negating.
        output = _apply_unary_operator(value.op, resolve_ast_by_type(value.operand))
    elif isinstance(value, ast.List):
        output = [resolve_ast_by_type(v) for v in value.elts]
    elif isinstance(value, ast.Dict):
        output = {
            resolve_ast_by_type(k): resolve_ast_by_type(v)
            for k, v in zip(value.keys, value.values)
        }
    elif isinstance(value, ast.BinOp):
        # NOTE(security): this evaluates source text rebuilt from the parsed
        # input. If the input is untrusted (e.g. raw model output), operands
        # containing calls will be executed - review before widening use.
        output = eval(ast.unparse(value))
    elif isinstance(value, ast.Name):
        output = value.id
    elif isinstance(value, ast.Call):
        if len(value.keywords) == 0:
            output = ast.unparse(value)
        else:
            output = resolve_ast_call(value)
    elif isinstance(value, ast.Tuple):
        output = tuple(resolve_ast_by_type(v) for v in value.elts)
    elif isinstance(value, ast.Lambda):
        # A lambda's body is a single node, not a list; keep the lambda as
        # source text, like keyword-less calls above.
        output = ast.unparse(value)
    elif isinstance(value, ast.Subscript):
        output = ast.unparse(value.value) + "[" + ast.unparse(value.slice) + "]"
    else:
        raise Exception(f"Unsupported AST type: {type(value)}")
    return output


def system_prompt_pre_processing_chat_model(prompts, function_docs, test_category):
    """
    Add a system prompt to the chat model to instruct the model on the available functions and the expected response format.
    If the prompts list already starts with a system prompt, the generated text is placed in front of its
    existing content, separated by a blank line. Otherwise (including for an empty list) a new system
    message is inserted at position 0.

    `prompts` is modified in place and returned. `test_category` is accepted but not used here.
    """
    assert isinstance(prompts, list)

    system_prompt = DEFAULT_SYSTEM_PROMPT.format(functions=function_docs)

    # System prompt must be in the first position
    if prompts and prompts[0]["role"] == "system":
        # The question comes with its own system prompt: keep it after the template text.
        prompts[0]["content"] = system_prompt + "\n\n" + prompts[0]["content"]
    else:
        # Otherwise, use the system prompt template to create a new system prompt.
        prompts.insert(
            0,
            {"role": "system", "content": system_prompt},
        )
    return prompts


def convert_system_prompt_into_user_prompt(prompts: list[dict]) -> list[dict]:
    """
    Some FC models don't support a system prompt in the message field, so we turn it into a user prompt.
    The message dicts are modified in place and the same list is returned.
    """
    for prompt in prompts:
        if prompt["role"] == "system":
            prompt["role"] = "user"
    return prompts


def combine_consecutive_user_prompts(prompts: list[dict]) -> list[dict]:
    """
    Some models require the prompt to be alternating between user and assistant.
    We combine consecutive user prompts into a single user prompt, joined by a blank line.

    A new list is returned, but it holds the original message dicts; the first message of each
    run of user prompts has its content extended in place.
    """
    combined_prompts = []
    for prompt in prompts:
        follows_user = bool(combined_prompts) and combined_prompts[-1]["role"] == "user"
        if prompt["role"] == "user" and follows_user:
            combined_prompts[-1]["content"] += "\n\n" + prompt["content"]
        else:
            combined_prompts.append(prompt)
    return combined_prompts


def _get_language_specific_hint(test_category):
    """Return the syntax hint sentence for ``test_category`` (Python by default)."""
    if test_category == "java":
        return " Note that the provided function is in Java 8 SDK syntax."
    elif test_category == "javascript":
        return " Note that the provided function is in JavaScript syntax."
    else:
        return " Note that the provided function is in Python 3 syntax."


def func_doc_language_specific_pre_processing(function, test_category):
    """Add language hints to function docs and flatten Java/JavaScript types.

    Every function description gets a syntax hint. For the "java" and
    "javascript" categories each parameter's original type is recorded in its
    description and the type itself is set to "string".

    Args:
        function: List of function-doc dicts; modified in place.
        test_category: Category name; "java" and "javascript" get the
            parameter rewriting, anything else only the description hint.

    Returns:
        The same ``function`` list.
    """
    if len(function) == 0:
        return function

    assert isinstance(function, list)
    for item in function:
        # Add language specific hints to the function description
        item["description"] = item["description"] + _get_language_specific_hint(
            test_category
        )
        # Process the parameters
        properties = item["parameters"]["properties"]
        if test_category == "java":
            for value in properties.values():
                if value["type"] == "any":
                    value[
                        "description"
                    ] += " This parameter can be of any type of Java object in string representation."
                else:
                    value[
                        "description"
                    ] += f" This is Java {value['type']} type parameter in string representation."
                if value["type"] == "ArrayList" or value["type"] == "Array":
                    value[
                        "description"
                    ] += f" The list elements are of type {value['items']['type']}; they are not in string representation."
                    del value["items"]

                value["type"] = "string"

        elif test_category == "javascript":
            for value in properties.values():
                if value["type"] == "any":
                    value[
                        "description"
                    ] += " This parameter can be of any type of JavaScript object in string representation."
                else:
                    value[
                        "description"
                    ] += f" This is JavaScript {value['type']} type parameter in string representation."
                if value["type"] == "array":
                    value[
                        "description"
                    ] += f" The list elements are of type {value['items']['type']}; they are not in string representation."
                    del value["items"]

                if value["type"] == "dict":
                    if "properties" in value:  # not every dict has properties
                        value[
                            "description"
                        ] += f" The dictionary entries have the following schema; they are not in string representation. {json.dumps(value['properties'])}"
                        del value["properties"]

                value["type"] = "string"

    return function


def construct_tool_use_system_prompt(tools):
    """Build the system prompt text that lists the available tools.

    Args:
        tools: Iterable of tool dicts read for ``name``, ``description`` and
            ``parameters["properties"]``.

    Returns:
        The assembled prompt string.
    """
    # NOTE(review): several literals below are bare "\n" lines and the
    # parameter example has an opening <$PARAMETER_NAME> with no closing
    # counterpart - it looks like markup tags were stripped from these
    # strings at some point. Left byte-for-byte; confirm against the
    # intended prompt format.
    tool_use_system_prompt = (
        "In this environment you have access to a set of tools you can use to answer the user's question.\n"
        "\n"
        "You may call them like this:\n"
        "\n"
        "\n"
        "$TOOL_NAME\n"
        "\n"
        "<$PARAMETER_NAME>$PARAMETER_VALUE\n"
        "...\n"
        "\n"
        "\n"
        "\n"
        "\n"
        "Here are the tools available:\n"
        "\n"
        + "\n".join(
            [
                construct_format_tool_for_claude_prompt(
                    tool["name"], tool["description"], tool["parameters"]["properties"]
                )
                for tool in tools
            ]
        )
        + "\n"
    )
    return tool_use_system_prompt


def construct_format_tool_for_claude_prompt(name, description, parameters):
    """Format one tool (name, description, parameters) as prompt text."""
    # NOTE(review): the bare "\n" pieces suggest stripped markup here too.
    constructed_prompt = (
        "\n"
        f"{name}\n"
        "\n"
        f"{description}\n"
        "\n"
        "\n"
        f"{construct_format_parameters_prompt(parameters)}\n"
        "\n"
        ""
    )
    return constructed_prompt


def construct_format_parameters_prompt(parameters):
    """Format a parameter-schema dict as prompt text.

    An entry named "required" is skipped. The final character of the
    assembled text is dropped before returning.
    """
    sections = []
    for parameter_name, parameter in parameters.items():
        if parameter_name == "required":
            continue

        description_string = parameter.get("description", "")
        if "default" in parameter:
            description_string += f"\nDefault value: {parameter['default']}"
        elif "items" in parameter:
            description_string += f"\n List element type: {str(parameter['items'])}"
        elif "properties" in parameter:
            description_string += (
                f"\n Dictionaries properties: {str(parameter['properties'])}"
            )

        if "description" in parameter:
            sections.append(
                f"\n{parameter_name}\n{parameter['type']}\n{description_string}\n\n"
            )
        else:
            # NOTE(review): without a description the default/items/properties
            # text built above is not emitted - confirm this is intended.
            sections.append(f"\n{parameter_name}\n{parameter['type']}\n\n")
    return "".join(sections)[:-1]


def _function_calls_valid_format_and_invoke_extraction(last_completion):
    """Check if the function call follows a valid format and extract the attempted function calls if so.
    Does not check if the tools actually exist or if they are called with the requisite params.

    Returns:
        ``{"status": False, "reason": ...}`` when a format check fails, otherwise
        ``{"status": True, "invokes": [...]}``, with a ``"prefix_content"`` entry
        added when the checks run to completion.
    """
    # NOTE(review): the patterns and messages in this function look as if their
    # XML tag literals were stripped (e.g. r"|||||||" is seven alternation bars
    # with nothing between them, and `.replace("", "")` is a no-op). They are
    # reproduced byte-for-byte; restore the intended tags from the upstream
    # source before relying on this parser.

    # Check if there are any of the relevant XML tags present that would indicate an attempted function call.
    function_call_tags = re.findall(
        r"|||||||",
        last_completion,
        re.DOTALL,
    )
    if not function_call_tags:
        return {"status": True, "invokes": []}

    # Extract content between tags. If there are multiple we will only parse the first and ignore the rest, regardless of their correctness.
    match = re.search(r"(.*)", last_completion, re.DOTALL)
    if not match:
        return {
            "status": False,
            "reason": "No valid tags present in your query.",
        }

    func_calls = match.group(1)

    # Default to an empty prefix so the final return cannot hit an unbound name.
    func_call_prefix_content = ""
    prefix_match = re.search(r"^(.*?)", last_completion, re.DOTALL)
    if prefix_match:
        func_call_prefix_content = prefix_match.group(1)

    # Check for invoke tags
    invoke_regex = r".*?"
    if not re.search(invoke_regex, func_calls, re.DOTALL):
        return {
            "status": False,
            "reason": "Missing tags inside of tags.",
        }

    # Check each invoke contains tool name and parameters
    invoke_strings = re.findall(invoke_regex, func_calls, re.DOTALL)
    invokes = []
    for invoke_string in invoke_strings:
        tool_name = re.findall(r".*?", invoke_string, re.DOTALL)
        if not tool_name:
            return {
                "status": False,
                "reason": "Missing tags inside of tags.",
            }

        if len(tool_name) > 1:
            return {
                "status": False,
                "reason": "More than one tool_name specified inside single set of tags.",
            }

        parameters = re.findall(r".*?", invoke_string, re.DOTALL)
        if not parameters:
            return {
                "status": False,
                "reason": "Missing tags inside of tags.",
            }

        if len(parameters) > 1:
            return {
                "status": False,
                "reason": "More than one set of tags specified inside single set of tags.",
            }

        # Check for balanced tags inside parameters
        tags = re.findall(
            r"<.*?>",
            parameters[0].replace("", "").replace("", ""),
            re.DOTALL,
        )
        if len(tags) % 2 != 0:
            return {
                "status": False,
                "reason": "Imbalanced tags inside tags.",
            }

        # Loop through the tags and check if each even-indexed tag matches the tag in the position after it
        # (with the / of course). If valid store their content for later use.
        parameters_with_values = []
        for i in range(0, len(tags), 2):
            opening_tag = tags[i]
            closing_tag = tags[i + 1]
            closing_tag_without_second_char = closing_tag[:1] + closing_tag[2:]
            if closing_tag[1] != "/" or opening_tag != closing_tag_without_second_char:
                return {
                    "status": False,
                    "reason": "Non-matching opening and closing tags inside tags.",
                }

            # Escape the tag text: it is matched literally, and a parameter
            # name containing regex metacharacters must not alter the pattern.
            value_pattern = re.escape(opening_tag) + "(.*?)" + re.escape(closing_tag)
            parameters_with_values.append(
                (
                    opening_tag[1:-1],
                    re.search(value_pattern, parameters[0], re.DOTALL).group(1),
                )
            )

        # Parse out the full function call
        invokes.append(
            {
                "tool_name": tool_name[0].replace("", "").replace("", ""),
                "parameters_with_values": parameters_with_values,
            }
        )

    return {
        "status": True,
        "invokes": invokes,
        "prefix_content": func_call_prefix_content,
    }


def _convert_value(value, type_str):
    """Convert a string value into its appropriate Python data type based on the provided type string.

    Same as ``convert_value`` except that "string" is accepted as an alias
    for "str".

    Arg:
        value: the value to convert
        type_str: the type to convert the value to

    Returns:
        The value converted into the requested type or the original value
        if the conversion failed.

    Raises:
        AttributeError: if ``type_str`` does not name anything in ``builtins``.
    """
    if type_str == "string":
        type_str = "str"
    return _coerce_to_builtin(value, type_str)


# TODO: Re-organize this file to make it more readable and maintainable
def extract_system_prompt(prompts: list[dict]) -> str:
    """Remove the first system message from ``prompts`` and return its content.

    Returns ``None`` (and leaves ``prompts`` unchanged) when there is no
    system message.
    """
    for i, prompt in enumerate(prompts):
        if prompt["role"] == "system":
            system_prompt = prompt["content"]
            # Safe to delete while enumerating: we return immediately after.
            del prompts[i]
            return system_prompt
    return None


def extract_last_user_message(prompts: list[dict], user_role_name: str = "user") -> dict:
    """Remove the last message with role ``user_role_name`` and return it.

    When no such message exists, ``prompts`` is left unchanged and the
    placeholder string "User did not specify a query." is returned instead
    of a dict.
    """
    for i in range(len(prompts) - 1, -1, -1):
        if prompts[i]["role"] == user_role_name:
            last_user_message = prompts[i]
            del prompts[i]
            return last_user_message
    return "User did not specify a query."


#### utils for multi-turn ####


def format_execution_results_prompting(
    inference_data: dict, execution_results: list[str], model_response_data: dict
) -> str:
    """Pair execution results with decoded model responses as tool messages.

    ``inference_data`` is accepted but not used. Pairs are formed with
    ``zip``, so surplus entries on either side are ignored.

    Returns:
        The ``repr`` of the list of ``{"role": "tool", ...}`` dicts.
    """
    # Add the execution results to one single user message
    tool_results = [
        {"role": "tool", "name": decoded_model_response, "content": execution_result}
        for execution_result, decoded_model_response in zip(
            execution_results, model_response_data["model_responses_decoded"]
        )
    ]
    return repr(tool_results)


def _wrap_in_brackets(result):
    """Strip backticks/newlines/spaces and ensure the text is ``[...]``-wrapped."""
    result = result.strip("`\n ")
    if not result.startswith("["):
        result = "[" + result
    if not result.endswith("]"):
        result = result + "]"
    return result


def default_decode_ast_prompting(result, language="Python"):
    """Decode raw model text into function-call dicts via ``ast_parse``."""
    return ast_parse(_wrap_in_brackets(result), language)


def default_decode_execute_prompting(result):
    """Decode raw model text into executable call strings."""
    decoded_output = ast_parse(_wrap_in_brackets(result))
    return decoded_output_to_execution_list(decoded_output)


def parse_nested_value(value):
    """
    Parse a potentially nested value from the AST output.

    Args:
        value: The value to parse, which could be a nested dictionary, which includes another function call, or a simple value.

    Returns:
        str: A string representation of the value, handling nested function calls and nested dictionary function arguments.
    """
    if not isinstance(value, dict):
        return repr(value)

    # A function call is represented as a single-entry dict whose value is the
    # dict of its arguments. Requiring exactly one entry keeps an empty dict
    # from being indexed and keeps multi-key dicts from losing entries.
    # NOTE(review): a literal one-key dict whose value is a dict is
    # indistinguishable from a call and is rendered as one.
    if len(value) == 1:
        ((func_name, args),) = value.items()
        if isinstance(args, dict):
            args_str = ", ".join(f"{k}={parse_nested_value(v)}" for k, v in args.items())
            return f"{func_name}({args_str})"

    # Otherwise it is a plain dictionary: render it as a dict literal.
    entries = ", ".join(f"{k!r}: {parse_nested_value(v)}" for k, v in value.items())
    return "{" + entries + "}"


def decoded_output_to_execution_list(decoded_output):
    """
    Convert decoded output to a list of executable function calls.

    Args:
        decoded_output (list): A list of dictionaries representing function calls.

    Returns:
        list: A list of strings, each representing an executable function call.
    """
    execution_list = []
    for function_call in decoded_output:
        for func_name, arguments in function_call.items():
            args_str = ", ".join(
                f"{k}={parse_nested_value(v)}" for k, v in arguments.items()
            )
            execution_list.append(f"{func_name}({args_str})")
    return execution_list