Spaces:
Running
Running
import ast | |
import builtins | |
import copy | |
import json | |
import os | |
import re | |
from constant import ( | |
DEFAULT_SYSTEM_PROMPT, | |
GORILLA_TO_OPENAPI, | |
) | |
from model_style import ModelStyle | |
def _cast_to_openai_type(properties, mapping): | |
for key, value in properties.items(): | |
if "type" not in value: | |
properties[key]["type"] = "string" | |
else: | |
var_type = value["type"] | |
if mapping == GORILLA_TO_OPENAPI and var_type == "float": | |
properties[key]["format"] = "float" | |
properties[key]["description"] += " This is a float type value." | |
if var_type in mapping: | |
properties[key]["type"] = mapping[var_type] | |
else: | |
properties[key]["type"] = "string" | |
# Currently support: | |
# - list of any | |
# - list of list of any | |
# - list of dict | |
# - list of list of dict | |
# - dict of any | |
if properties[key]["type"] == "array" or properties[key]["type"] == "object": | |
if "properties" in properties[key]: | |
properties[key]["properties"] = _cast_to_openai_type( | |
properties[key]["properties"], mapping | |
) | |
elif "items" in properties[key]: | |
properties[key]["items"]["type"] = mapping[properties[key]["items"]["type"]] | |
if ( | |
properties[key]["items"]["type"] == "array" | |
and "items" in properties[key]["items"] | |
): | |
properties[key]["items"]["items"]["type"] = mapping[ | |
properties[key]["items"]["items"]["type"] | |
] | |
elif ( | |
properties[key]["items"]["type"] == "object" | |
and "properties" in properties[key]["items"] | |
): | |
properties[key]["items"]["properties"] = _cast_to_openai_type( | |
properties[key]["items"]["properties"], mapping | |
) | |
return properties | |
def convert_to_tool(functions, mapping, model_style): | |
functions = copy.deepcopy(functions) | |
oai_tool = [] | |
for item in functions: | |
if "." in item["name"] and ( | |
model_style == ModelStyle.OpenAI | |
or model_style == ModelStyle.Mistral | |
or model_style == ModelStyle.Google | |
or model_style == ModelStyle.OSSMODEL | |
or model_style == ModelStyle.Anthropic | |
or model_style == ModelStyle.COHERE | |
): | |
# OAI does not support "." in the function name so we replace it with "_". ^[a-zA-Z0-9_-]{1,64}$ is the regex for the name. | |
item["name"] = re.sub(r"\.", "_", item["name"]) | |
item["parameters"]["type"] = "object" | |
item["parameters"]["properties"] = _cast_to_openai_type( | |
item["parameters"]["properties"], mapping | |
) | |
if model_style == ModelStyle.Anthropic: | |
item["input_schema"] = item["parameters"] | |
del item["parameters"] | |
if model_style == ModelStyle.Google: | |
# Remove fields that are not supported by Gemini. | |
# No `optional` field in function schema. | |
if "optional" in item["parameters"]: | |
del item["parameters"]["optional"] | |
for params in item["parameters"]["properties"].values(): | |
# No `default` field in Google's schema. | |
if "default" in params: | |
params["description"] += f" Default is: {str(params['default'])}." | |
del params["default"] | |
# No `optional` field in parameter schema as well. | |
if "optional" in params: | |
params["description"] += f" Optional: {str(params['optional'])}." | |
del params["optional"] | |
# No `maximum` field. | |
if "maximum" in params: | |
params["description"] += f" Maximum value: {str(params['maximum'])}." | |
del params["maximum"] | |
# No `minItems` field. | |
if "minItems" in params: | |
params[ | |
"description" | |
] += f" Minimum number of items: {str(params['minItems'])}." | |
del params["minItems"] | |
# No `maxItems` field. | |
if "maxItems" in params: | |
params[ | |
"description" | |
] += f" Maximum number of items: {str(params['maxItems'])}." | |
del params["maxItems"] | |
# No `additionalProperties` field. | |
if "additionalProperties" in params: | |
params[ | |
"description" | |
] += f" Additional properties: {str(params['additionalProperties'])}." | |
del params["additionalProperties"] | |
# Only `enum` field when the type is `string`. | |
if "enum" in params and params["type"] != "string": | |
params["description"] += f" Enum values: {str(params['enum'])}." | |
del params["enum"] | |
if model_style == ModelStyle.COHERE: | |
if os.getenv("USE_COHERE_OPTIMIZATION") == "True": | |
if "required" not in item["parameters"]: | |
item["parameters"]["required"] = [] | |
for param_name, params in item["parameters"]["properties"].items(): | |
if "description" not in params: | |
params["description"] = "" | |
if "default" in params: | |
params["description"] += " The default value is: " + str( | |
params["default"] | |
) | |
if param_name not in item["parameters"]["required"]: | |
item["parameters"]["required"].append(param_name) | |
del params["default"] | |
if "additionalProperties" in params: | |
params["description"] += " Additional properties: " + str( | |
params["additionalProperties"] | |
) | |
del params["additionalProperties"] | |
if "items" in params: | |
inner_type = "" | |
if ( | |
"items" in params["items"] | |
and "type" in params["items"]["items"] | |
): | |
# 2D list | |
inner_type = params["items"]["items"]["type"] | |
params["type"] = f"list[list[{inner_type}]]" | |
elif "type" in params["items"]: | |
# 1D list | |
inner_type = params["items"]["type"] | |
params["type"] = f"list[{inner_type}]" | |
if ( | |
"items" in params | |
and "enum" in params["items"] | |
and params["items"]["enum"] | |
): | |
params["description"] += " Possible enum values: " | |
params["description"] += ", ".join(params["items"]["enum"]) | |
params["description"] += "." | |
del params["items"] | |
if "properties" in params: | |
params["description"] += " Dictionary properties:" | |
for name, property_ in params["properties"].items(): | |
property_type = property_.get("type", mapping["string"]) | |
property_description = property_.get("description", "") | |
params[ | |
"description" | |
] += f" {name} ({property_type}): {property_description}" | |
del params["properties"] | |
if "enum" in params: | |
params["description"] += " Possible enum values: " + str( | |
params["enum"] | |
) | |
del params["enum"] | |
# add ranges to description | |
if "percentage" not in params["description"]: | |
params["description"] = params["description"].replace( | |
"rate ", "rate (from 0.0 to 1.0) " | |
) | |
params["description"] = params["description"].replace( | |
"percentage ", "percentage (from 0 to 100) " | |
) | |
params["description"] = params["description"].replace( | |
"currency ", "currency (3 letter ISO code) " | |
) | |
else: | |
for params in item["parameters"]["properties"].values(): | |
if "description" not in params: | |
params["description"] = "" | |
if "default" in params: | |
params["description"] += " The default value is: " + str( | |
params["default"] | |
) | |
del params["default"] | |
if "additionalProperties" in params: | |
params["description"] += " Additional properties: " + str( | |
params["additionalProperties"] | |
) | |
del params["additionalProperties"] | |
if "items" in params: | |
params["description"] += " List Items type: " + str(params["items"]) | |
del params["items"] | |
if "properties" in params: | |
params["description"] += " Dictionary properties: " + str( | |
params["properties"] | |
) | |
del params["properties"] | |
# Process the return field | |
if "response" in item: | |
if model_style in [ | |
ModelStyle.Anthropic, | |
ModelStyle.Google, | |
ModelStyle.FIREWORK_AI, | |
ModelStyle.WRITER, | |
]: | |
item[ | |
"description" | |
] += f" The response field has the following schema: {json.dumps(item['response'])}" | |
del item["response"] | |
if model_style in [ | |
ModelStyle.Anthropic, | |
ModelStyle.Google, | |
ModelStyle.OSSMODEL, | |
]: | |
oai_tool.append(item) | |
elif model_style == ModelStyle.COHERE: | |
parameter = item["parameters"]["properties"] | |
if "required" in item["parameters"]: | |
required = item["parameters"]["required"] | |
else: | |
required = [] | |
parameter_definitions = {} | |
for key, value in parameter.items(): | |
value["required"] = key in required | |
parameter_definitions[key] = value | |
oai_tool.append( | |
{ | |
"name": item["name"], | |
"description": item["description"], | |
"parameter_definitions": parameter_definitions, | |
} | |
) | |
elif model_style in [ | |
ModelStyle.OpenAI, | |
ModelStyle.Mistral, | |
ModelStyle.FIREWORK_AI, | |
ModelStyle.WRITER, | |
]: | |
oai_tool.append({"type": "function", "function": item}) | |
return oai_tool | |
def convert_to_function_call(function_call_list): | |
if type(function_call_list) == dict: | |
function_call_list = [function_call_list] | |
# function_call_list is of type list[dict[str, str]] or list[dict[str, dict]] | |
execution_list = [] | |
for function_call in function_call_list: | |
for key, value in function_call.items(): | |
if type(value) == str: | |
value = json.loads(value) | |
execution_list.append( | |
f"{key}({','.join([f'{k}={repr(v)}' for k,v in value.items()])})" | |
) | |
return execution_list | |
def convert_value(value, type_str): | |
"""Convert a string value into its appropriate Python data type based on the provided type string. | |
Arg: | |
value: the value to convert | |
type_str: the type to convert the value to | |
Returns: | |
The value converted into the requested type or the original value | |
if the conversion failed. | |
""" | |
if type_str in ("list", "dict"): | |
try: | |
return ast.literal_eval(value) | |
except: | |
return value | |
type_class = getattr(builtins, type_str) | |
try: | |
return type_class(value) | |
except ValueError: | |
return value | |
def ast_parse(input_str, language="Python"): | |
if language == "Python": | |
cleaned_input = input_str.strip("[]'") | |
parsed = ast.parse(cleaned_input, mode="eval") | |
extracted = [] | |
if isinstance(parsed.body, ast.Call): | |
extracted.append(resolve_ast_call(parsed.body)) | |
else: | |
for elem in parsed.body.elts: | |
assert isinstance(elem, ast.Call) | |
extracted.append(resolve_ast_call(elem)) | |
return extracted | |
elif language == "Java": | |
pass | |
elif language == "JavaScript": | |
pass | |
else: | |
raise NotImplementedError(f"Unsupported language: {language}") | |
def resolve_ast_call(elem): | |
# Handle nested attributes for deeply nested module paths | |
func_parts = [] | |
func_part = elem.func | |
while isinstance(func_part, ast.Attribute): | |
func_parts.append(func_part.attr) | |
func_part = func_part.value | |
if isinstance(func_part, ast.Name): | |
func_parts.append(func_part.id) | |
func_name = ".".join(reversed(func_parts)) | |
args_dict = {} | |
for arg in elem.keywords: | |
output = resolve_ast_by_type(arg.value) | |
args_dict[arg.arg] = output | |
return {func_name: args_dict} | |
def resolve_ast_by_type(value): | |
if isinstance(value, ast.Constant): | |
if value.value is Ellipsis: | |
output = "..." | |
else: | |
output = value.value | |
elif isinstance(value, ast.UnaryOp): | |
output = -value.operand.value | |
elif isinstance(value, ast.List): | |
output = [resolve_ast_by_type(v) for v in value.elts] | |
elif isinstance(value, ast.Dict): | |
output = { | |
resolve_ast_by_type(k): resolve_ast_by_type(v) | |
for k, v in zip(value.keys, value.values) | |
} | |
elif isinstance( | |
value, ast.NameConstant | |
): # Added this condition to handle boolean values | |
output = value.value | |
elif isinstance( | |
value, ast.BinOp | |
): # Added this condition to handle function calls as arguments | |
output = eval(ast.unparse(value)) | |
elif isinstance(value, ast.Name): | |
output = value.id | |
elif isinstance(value, ast.Call): | |
if len(value.keywords) == 0: | |
output = ast.unparse(value) | |
else: | |
output = resolve_ast_call(value) | |
elif isinstance(value, ast.Tuple): | |
output = tuple(resolve_ast_by_type(v) for v in value.elts) | |
elif isinstance(value, ast.Lambda): | |
output = eval(ast.unparse(value.body[0].value)) | |
elif isinstance(value, ast.Ellipsis): | |
output = "..." | |
elif isinstance(value, ast.Subscript): | |
try: | |
output = ast.unparse(value.body[0].value) | |
except: | |
output = ast.unparse(value.value) + "[" + ast.unparse(value.slice) + "]" | |
else: | |
raise Exception(f"Unsupported AST type: {type(value)}") | |
return output | |
def system_prompt_pre_processing_chat_model(prompts, function_docs, test_category): | |
""" | |
Add a system prompt to the chat model to instruct the model on the available functions and the expected response format. | |
If the prompts list already contains a system prompt, append the additional system prompt content to the existing system prompt. | |
""" | |
assert type(prompts) == list | |
system_prompt_template = DEFAULT_SYSTEM_PROMPT | |
system_prompt = system_prompt_template.format(functions=function_docs) | |
# System prompt must be in the first position | |
# If the question comes with a system prompt, append its content at the end of the chat template. | |
if prompts[0]["role"] == "system": | |
prompts[0]["content"] = system_prompt + "\n\n" + prompts[0]["content"] | |
# Otherwise, use the system prompt template to create a new system prompt. | |
else: | |
prompts.insert( | |
0, | |
{"role": "system", "content": system_prompt}, | |
) | |
return prompts | |
def convert_system_prompt_into_user_prompt(prompts: list[dict]) -> list[dict]: | |
""" | |
Some FC models doesn't support system prompt in the message field, so we turn it into user prompt | |
""" | |
for prompt in prompts: | |
if prompt["role"] == "system": | |
prompt["role"] = "user" | |
return prompts | |
def combine_consecutive_user_prompts(prompts: list[dict]) -> list[dict]: | |
""" | |
Some models require the prompt to be alternating between user and assistant. | |
We combine consecutive user prompts into a single user prompt. | |
""" | |
combined_prompts = [] | |
for prompt in prompts: | |
if ( | |
prompt["role"] == "user" | |
and combined_prompts | |
and combined_prompts[-1]["role"] == "user" | |
): | |
combined_prompts[-1]["content"] += "\n\n" + prompt["content"] | |
else: | |
combined_prompts.append(prompt) | |
return combined_prompts | |
def _get_language_specific_hint(test_category): | |
if test_category == "java": | |
return " Note that the provided function is in Java 8 SDK syntax." | |
elif test_category == "javascript": | |
return " Note that the provided function is in JavaScript syntax." | |
else: | |
return " Note that the provided function is in Python 3 syntax." | |
def func_doc_language_specific_pre_processing(function, test_category): | |
if len(function) == 0: | |
return function | |
assert type(function) == list | |
for item in function: | |
# Add language specific hints to the function description | |
func_description = item["description"] | |
item["description"] = item["description"] + _get_language_specific_hint( | |
test_category | |
) | |
# Process the parameters | |
properties = item["parameters"]["properties"] | |
if test_category == "java": | |
for key, value in properties.items(): | |
if value["type"] == "any": | |
properties[key][ | |
"description" | |
] += " This parameter can be of any type of Java object in string representation." | |
else: | |
value[ | |
"description" | |
] += f" This is Java {value['type']} type parameter in string representation." | |
if value["type"] == "ArrayList" or value["type"] == "Array": | |
value[ | |
"description" | |
] += f" The list elements are of type {value['items']['type']}; they are not in string representation." | |
del value["items"] | |
value["type"] = "string" | |
elif test_category == "javascript": | |
for key, value in properties.items(): | |
if value["type"] == "any": | |
properties[key][ | |
"description" | |
] += " This parameter can be of any type of JavaScript object in string representation." | |
else: | |
value[ | |
"description" | |
] += f" This is JavaScript {value['type']} type parameter in string representation." | |
if value["type"] == "array": | |
value[ | |
"description" | |
] += f" The list elements are of type {value['items']['type']}; they are not in string representation." | |
del value["items"] | |
if value["type"] == "dict": | |
if "properties" in value: # not every dict has properties | |
value[ | |
"description" | |
] += f" The dictionary entries have the following schema; they are not in string representation. {json.dumps(value['properties'])}" | |
del value["properties"] | |
value["type"] = "string" | |
return function | |
def construct_tool_use_system_prompt(tools): | |
tool_use_system_prompt = ( | |
"In this environment you have access to a set of tools you can use to answer the user's question.\n" | |
"\n" | |
"You may call them like this:\n" | |
"<function_calls>\n" | |
"<invoke>\n" | |
"<tool_name>$TOOL_NAME</tool_name>\n" | |
"<parameters>\n" | |
"<$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>\n" | |
"...\n" | |
"</parameters>\n" | |
"</invoke>\n" | |
"</function_calls>\n" | |
"\n" | |
"Here are the tools available:\n" | |
"<tools>\n" | |
+ "\n".join( | |
[ | |
construct_format_tool_for_claude_prompt( | |
tool["name"], tool["description"], tool["parameters"]["properties"] | |
) | |
for tool in tools | |
] | |
) | |
+ "\n</tools>" | |
) | |
return tool_use_system_prompt | |
def construct_format_tool_for_claude_prompt(name, description, parameters): | |
constructed_prompt = ( | |
"<tool_description>\n" | |
f"<tool_name>{name}</tool_name>\n" | |
"<description>\n" | |
f"{description}\n" | |
"</description>\n" | |
"<parameters>\n" | |
f"{construct_format_parameters_prompt(parameters)}\n" | |
"</parameters>\n" | |
"</tool_description>" | |
) | |
return constructed_prompt | |
def construct_format_parameters_prompt(parameters): | |
constructed_prompt = "" | |
for parameter_name, parameter in parameters.items(): | |
if parameter_name == "required": | |
continue | |
if "description" in parameter: | |
description_string = parameter["description"] | |
else: | |
description_string = "" | |
if "default" in parameter: | |
description_string += f"\nDefault value: {parameter['default']}" | |
elif "items" in parameter: | |
description_string += f"\n List element type: {str(parameter['items'])}" | |
elif "properties" in parameter: | |
description_string += ( | |
f"\n Dictionaries properties: {str(parameter['properties'])}" | |
) | |
if "description" in parameter: | |
constructed_prompt += f"<parameter>\n<name>{parameter_name}</name>\n<type>{parameter['type']}</type>\n<description>{description_string}</description>\n</parameter>\n" | |
else: | |
constructed_prompt += f"<parameter>\n<name>{parameter_name}</name>\n<type>{parameter['type']}</type>\n</parameter>\n" | |
constructed_prompt = constructed_prompt[:-1] | |
return constructed_prompt | |
def _function_calls_valid_format_and_invoke_extraction(last_completion): | |
"""Check if the function call follows a valid format and extract the attempted function calls if so. Does not check if the tools actually exist or if they are called with the requisite params.""" | |
# Check if there are any of the relevant XML tags present that would indicate an attempted function call. | |
function_call_tags = re.findall( | |
r"<function_calls>|</function_calls>|<invoke>|</invoke>|<tool_name>|</tool_name>|<parameters>|</parameters>", | |
last_completion, | |
re.DOTALL, | |
) | |
if not function_call_tags: | |
return {"status": True, "invokes": []} | |
# Extract content between <function_calls> tags. If there are multiple we will only parse the first and ignore the rest, regardless of their correctness. | |
match = re.search(r"<function_calls>(.*)</function_calls>", last_completion, re.DOTALL) | |
if not match: | |
return { | |
"status": False, | |
"reason": "No valid <function_calls></function_calls> tags present in your query.", | |
} | |
func_calls = match.group(1) | |
prefix_match = re.search(r"^(.*?)<function_calls>", last_completion, re.DOTALL) | |
if prefix_match: | |
func_call_prefix_content = prefix_match.group(1) | |
# Check for invoke tags | |
invoke_regex = r"<invoke>.*?</invoke>" | |
if not re.search(invoke_regex, func_calls, re.DOTALL): | |
return { | |
"status": False, | |
"reason": "Missing <invoke></invoke> tags inside of <function_calls></function_calls> tags.", | |
} | |
# Check each invoke contains tool name and parameters | |
invoke_strings = re.findall(invoke_regex, func_calls, re.DOTALL) | |
invokes = [] | |
for invoke_string in invoke_strings: | |
tool_name = re.findall(r"<tool_name>.*?</tool_name>", invoke_string, re.DOTALL) | |
if not tool_name: | |
return { | |
"status": False, | |
"reason": "Missing <tool_name></tool_name> tags inside of <invoke></invoke> tags.", | |
} | |
if len(tool_name) > 1: | |
return { | |
"status": False, | |
"reason": "More than one tool_name specified inside single set of <invoke></invoke> tags.", | |
} | |
parameters = re.findall(r"<parameters>.*?</parameters>", invoke_string, re.DOTALL) | |
if not parameters: | |
return { | |
"status": False, | |
"reason": "Missing <parameters></paraeters> tags inside of <invoke></invoke> tags.", | |
} | |
if len(parameters) > 1: | |
return { | |
"status": False, | |
"reason": "More than one set of <parameters></parameters> tags specified inside single set of <invoke></invoke> tags.", | |
} | |
# Check for balanced tags inside parameters | |
tags = re.findall( | |
r"<.*?>", | |
parameters[0].replace("<parameters>", "").replace("</parameters>", ""), | |
re.DOTALL, | |
) | |
if len(tags) % 2 != 0: | |
return { | |
"status": False, | |
"reason": "Imbalanced tags inside <parameters></parameters> tags.", | |
} | |
# Loop through the tags and check if each even-indexed tag matches the tag in the position after it (with the / of course). If valid store their content for later use. | |
parameters_with_values = [] | |
for i in range(0, len(tags), 2): | |
opening_tag = tags[i] | |
closing_tag = tags[i + 1] | |
closing_tag_without_second_char = closing_tag[:1] + closing_tag[2:] | |
if closing_tag[1] != "/" or opening_tag != closing_tag_without_second_char: | |
return { | |
"status": False, | |
"reason": "Non-matching opening and closing tags inside <parameters></parameters> tags.", | |
} | |
parameters_with_values.append( | |
( | |
opening_tag[1:-1], | |
re.search( | |
rf"{opening_tag}(.*?){closing_tag}", parameters[0], re.DOTALL | |
).group(1), | |
) | |
) | |
# Parse out the full function call | |
invokes.append( | |
{ | |
"tool_name": tool_name[0] | |
.replace("<tool_name>", "") | |
.replace("</tool_name>", ""), | |
"parameters_with_values": parameters_with_values, | |
} | |
) | |
return { | |
"status": True, | |
"invokes": invokes, | |
"prefix_content": func_call_prefix_content, | |
} | |
def _convert_value(value, type_str): | |
"""Convert a string value into its appropriate Python data type based on the provided type string. | |
Arg: | |
value: the value to convert | |
type_str: the type to convert the value to | |
Returns: | |
The value converted into the requested type or the original value | |
if the conversion failed. | |
""" | |
if type_str in ("list", "dict"): | |
try: | |
return ast.literal_eval(value) | |
except: | |
return value | |
if type_str == "string": | |
type_str = "str" | |
type_class = getattr(builtins, type_str) | |
try: | |
return type_class(value) | |
except ValueError: | |
return value | |
# TODO: Re-organize this file to make it more readable and maintainable | |
def extract_system_prompt(prompts: list[dict]) -> str: | |
for i, prompt in enumerate(prompts): | |
if prompt["role"] == "system": | |
system_prompt = prompt["content"] | |
del prompts[i] | |
return system_prompt | |
return None | |
def extract_last_user_message(prompts: list[dict], user_role_name: str = "user") -> dict: | |
for i in range(len(prompts) - 1, -1, -1): | |
if prompts[i]["role"] == user_role_name: | |
last_user_message = prompts[i] | |
del prompts[i] | |
return last_user_message | |
return "User did not specify a query." | |
#### utils for multi-turn #### | |
def format_execution_results_prompting( | |
inference_data: dict, execution_results: list[str], model_response_data: dict | |
) -> str: | |
# Add the execution results to one single user message | |
tool_results = [] | |
for execution_result, decoded_model_response in zip( | |
execution_results, model_response_data["model_responses_decoded"] | |
): | |
tool_results.append( | |
{"role": "tool", "name": decoded_model_response, "content": execution_result} | |
) | |
return repr(tool_results) | |
def default_decode_ast_prompting(result, language="Python"): | |
result = result.strip("`\n ") | |
if not result.startswith("["): | |
result = "[" + result | |
if not result.endswith("]"): | |
result = result + "]" | |
decoded_output = ast_parse(result, language) | |
return decoded_output | |
def default_decode_execute_prompting(result): | |
result = result.strip("`\n ") | |
if not result.startswith("["): | |
result = "[" + result | |
if not result.endswith("]"): | |
result = result + "]" | |
decoded_output = ast_parse(result) | |
return decoded_output_to_execution_list(decoded_output) | |
def parse_nested_value(value): | |
""" | |
Parse a potentially nested value from the AST output. | |
Args: | |
value: The value to parse, which could be a nested dictionary, which includes another function call, or a simple value. | |
Returns: | |
str: A string representation of the value, handling nested function calls and nested dictionary function arguments. | |
""" | |
if isinstance(value, dict): | |
# Check if the dictionary represents a function call (i.e., the value is another dictionary or complex structure) | |
if all(isinstance(v, dict) for v in value.values()): | |
func_name = list(value.keys())[0] | |
args = value[func_name] | |
args_str = ", ".join(f"{k}={parse_nested_value(v)}" for k, v in args.items()) | |
return f"{func_name}({args_str})" | |
else: | |
# If it's a simple dictionary, treat it as key-value pairs | |
return ( | |
"{" | |
+ ", ".join(f"'{k}': {parse_nested_value(v)}" for k, v in value.items()) | |
+ "}" | |
) | |
return repr(value) | |
def decoded_output_to_execution_list(decoded_output): | |
""" | |
Convert decoded output to a list of executable function calls. | |
Args: | |
decoded_output (list): A list of dictionaries representing function calls. | |
Returns: | |
list: A list of strings, each representing an executable function call. | |
""" | |
execution_list = [] | |
for function_call in decoded_output: | |
for key, value in function_call.items(): | |
args_str = ", ".join(f"{k}={parse_nested_value(v)}" for k, v in value.items()) | |
execution_list.append(f"{key}({args_str})") | |
return execution_list | |