"""Abstract math word problems into parameterised questions and synthesise programs.

Pipeline, per question:

1. Ask a chat model to replace concrete numbers with named parameters
   (``process_abstraction_single``).
2. Ask a chat model to write an ``answer(...)`` function for the abstract
   question (``process_program_generation_single``).
3. Execute every candidate program with the extracted parameters
   (``execute_programs_from_original``) and keep those whose printed output
   matches the reference answer.
"""

import ast
import json
import os
import re
import subprocess
import sys
import time
from sys import dont_write_bytecode

import codegen
from datasets import load_dataset
from openai import OpenAI
from tqdm import tqdm

# SECURITY: API keys used to be hard-coded in this file. Any key that was ever
# committed must be considered leaked and revoked. The key is now taken from
# the OPENAI_API_KEY environment variable.
client_local = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

use_model = "gpt-4o-2024-08-06"
# use_model_program = "o1-preview"
use_model_program = "gpt-4o-2024-08-06"
program_header_file = "program_header_math.py"
program_generation_times = 2

# Scratch files used by execute().
_EXEC_SCRIPT = "execution_gen_file_math.py"
_EXEC_OUTPUT = "execution_gen_file_math_output.txt"
_EXEC_TIMEOUT_SECONDS = 30

# Number of times the abstraction prompt is re-sampled before giving up.
_MAX_ABSTRACTION_ATTEMPTS = 10

_BOXED_PATTERN = re.compile(r'\\boxed\{([^\}]*)\}')


def format_abstraction(question):
    """Build the few-shot chat prompt asking the model to parameterise *question*.

    Returns a list of chat messages (system prompt, four worked examples, then
    the user question). The prompt text is runtime data and is kept verbatim.
    """
    messages = [
        {"role": "system", "content": "Identify numerical values in the given question, then replace some of them with Python parameters that are either int or float, so that the resulting abstract question is still answerable with the same general solution as the original question. Follow the the provided examples."},
        {"role": "user", "content": "Jack has a stack of books that is 12 inches thick. He knows from experience that 80 pages is one inch thick. If he has 6 books, how many pages is each one on average?"},
        {"role": "assistant", "content": "Becuase this is a math question, we identify all numerical values. We identify: 12 inches, 80 pages, one inch, 6\nAs a result, we can replace\n\"12 inches\" to \"Number of Inches X\" (num_inches_x: int)\n\"80 pages\" to \"Number of Pages Y\" (num_pages_y: int)\n\"one inch\" to \"Number of Inches Z\" (num_inches_z: int)\n\"6\" to \"Number W\" (num_w: int)\nSo the question becomes\nJack has a stack of books that is Number of Inches X thick. He knows from experience that Number of Pages Y is Number of Inches Z thick. If he has Number W books, how many pages is each one on average?\nWith parameters\nnum_inches_x=12, num_pages_y=80, num_inches_z=1, num_w=6"},
        {"role": "user", "content": "Benny bought 2 soft drinks for$ 4 each and 5 candy bars. He spent a total of 28 dollars. How much did each candy bar cost?"},
        {"role": "assistant", "content": "Becuase this is a math question, we identify all numerical values. We identify: 2, $4, 5, 28 dollars\nAs a result, we can replace\n\"2\" to \"Number X\" (num_x: int)\n\"$4\" to \"Dollar Amount Y\" (dollar_y: int)\n\"5\" to \"Number Z\" (num_z: int)\n\"28 dollars\" to \"Dollar Amount W\" (dollar_w: int)\nSo the question becomes\nBenny bought Number X soft drinks for Dollar Amount Y each and Number Z candy bars. He spent a total of Dollar Amount W dollars. How much did each candy bar cost?\nWith parameters\nnum_x=2, dollar_y=4, num_z=5, dollar_w=28"},
        {"role": "user", "content": "Wickham is throwing a huge Christmas party. He invites 30 people. Everyone attends the party, and half of the guests bring a plus one (one other person). He plans to serve a 3-course meal for the guests. If he uses a new plate for every course, how many plates does he need in total for his guests?"},
        {"role": "assistant", "content": "Becuase this is a math question, we identify all numerical values. We identify: 30, half, plus one (one other person), 3-course\nAs a result, we can replace\n\"30\" to \"Number X\" (num_x: int)\n\"half\" to \"Fraction Y\" (fraction_y: float)\n\"plus one (one other person)\" to \"Number of Additional People Z\" (num_additional_z: int)\n\"3-course\" to \"Number of Course W\" (num_courses_w: int)\nSo the question becomes\nWickham is throwing a huge Christmas party. He invites Number X people. Everyone attends the party, and Fraction Y of the guests bring Number of Additional People Z. He plans to serve a Number of Courses W meal for the guests. If he uses a new plate for every course, how many plates does he need in total for his guests?\nWith parameters\nnum_x=30, fraction_y=0.5, num_additional_z=1, num_courses_w=3"},
        {"role": "user", "content": "John volunteers at a shelter twice a month for 3 hours at a time. How many hours does he volunteer per year?"},
        {"role": "assistant", "content": "Becuase this is a math question, we identify all numerical values. We identify: twice, 3\nAs a result, we can replace\n\"twice\" to \"Number of Occurrences X\" (num_occurrence_x: int)\n\"3\" to \"Number Y\" (num_y: int)\nJohn volunteers at a shelter Number of Occurrences X per month for Number Y hours at a time. How many hours does he volunteer per year?\nWith parameters\nnum_occurrence_x=2, num_y=3"},
    ]
    messages.append({"role": "user", "content": question})
    return messages


def query_model(messages, do_sample=True, max_length=256, use_temp=None, gen_program=False):
    """Send *messages* to the chat-completions endpoint and return the reply text.

    Args:
        messages: Chat messages in OpenAI format.
        do_sample: When false, temperature is forced to 0.0.
        max_length: ``max_tokens`` for the request (ignored when *gen_program*).
        use_temp: Optional temperature override (default 0.7).
        gen_program: Use ``use_model_program`` and send no sampling parameters.

    Returns:
        The reply content as a string; an empty string when the API returns
        no text content.

    The call is retried indefinitely on any exception, as before, but now with
    a capped exponential backoff instead of a fixed one-second sleep.
    """
    model_name = use_model_program if gen_program else use_model

    temp = 0.7
    if use_temp is not None:
        temp = use_temp
    if not do_sample:
        temp = 0.0

    attempt = 0
    while True:
        try:
            if gen_program:
                r = client_local.chat.completions.create(
                    model=model_name,
                    messages=messages,
                )
            else:
                r = client_local.chat.completions.create(
                    model=model_name,
                    messages=messages,
                    max_tokens=max_length,
                    temperature=temp,
                )
            # `content` is optional in the API response; callers call
            # .strip()/.split() on the result, so never hand back None.
            return r.choices[0].message.content or ""
        except Exception as e:
            print(e)
            time.sleep(min(2 ** attempt, 30))
            attempt += 1


def find_parameters_math(function_call):
    """Extract ``name=value`` keyword arguments from a Python call expression.

    Args:
        function_call: Source text of a call such as ``"answer(x=1, y=2/3)"``.

    Returns:
        Dict mapping each keyword name to its value. Binary expressions are
        evaluated; any other value is read from the node's ``.value``
        attribute (i.e. it is expected to be a constant).
    """
    tree = ast.parse(function_call)
    rets = {}
    for node in ast.walk(tree):
        if not isinstance(node, ast.keyword):
            continue
        if isinstance(node.value, ast.BinOp):
            # SECURITY NOTE(review): eval() on text that originates from model
            # output. Left as-is to preserve behaviour; consider a restricted
            # arithmetic evaluator if this input is not trusted.
            rets[node.arg] = eval(codegen.to_source(node.value))
        else:
            rets[node.arg] = node.value.value
    return rets


def _apply_replacements(question, replacement_map):
    """Substitute each original phrase in *question* with ``"Label (codename)"``."""
    for original, entry in replacement_map.items():
        question = question.replace(original, entry[0] + " ({})".format(entry[1]))
    return question


def _parse_abstraction_response(text):
    """Parse the model's abstraction reply into ``(replace_map, parameters)``.

    ``replace_map`` maps an original phrase to ``(label, codename)`` for every
    line of the form ``"phrase" to "Label" (codename: type)``. ``parameters``
    is the line following ``With parameters`` (empty string if absent).
    """
    replace_map = {}
    parameters = ""
    lines = text.strip().split("\n")
    for i, line in enumerate(lines):
        if line.startswith('"'):
            quoted = re.findall(r'"([^"]*)"', line)
            codenames = re.findall(r'\([^"]*\)', line)
            if len(quoted) == 2 and codenames:
                # Last parenthesised group is the "(codename: type)" suffix.
                replace_map[quoted[0].strip()] = (quoted[1].strip(), codenames[-1][1:-1])
        if line.startswith("With parameters") and i < len(lines) - 1:
            parameters = lines[i + 1]
    return replace_map, parameters


def process_abstraction_single(question_obj):
    """Abstract ``question_obj["question"]`` and record the result on the dict.

    Sets ``"masked_question"``, ``"replacement"`` and ``"parameters"`` on
    *question_obj* and returns it.

    The model is re-queried up to ``_MAX_ABSTRACTION_ATTEMPTS`` times until
    both a non-empty replacement map and a parameter line are parsed. The
    previous guard compared against ``None`` and so was always true, which
    meant the retry loop never retried. If every attempt fails, the dict is
    still returned with the last (possibly empty) values so that callers can
    test ``parameters == ""`` / ``len(replacement) == 0``.
    """
    question = question_obj["question"]
    messages = format_abstraction(question)

    replace_map, parameters = {}, ""
    for _ in range(_MAX_ABSTRACTION_ATTEMPTS):
        replace_map, parameters = _parse_abstraction_response(query_model(messages))
        if replace_map and parameters:
            break

    question_obj["masked_question"] = _apply_replacements(question, replace_map)
    question_obj["replacement"] = replace_map
    question_obj["parameters"] = parameters
    return question_obj


def format_masked_question(question, replacement_map):
    """Build the few-shot chat prompt asking the model to write ``answer(...)``.

    Args:
        question: The original (concrete) question text.
        replacement_map: Mapping ``phrase -> (label, codename)`` as produced by
            ``process_abstraction_single``; applied to *question* before it is
            appended as the final user message.

    Returns:
        List of chat messages. The prompt text is runtime data and is kept
        verbatim.
    """
    system_msg = "Write a Python program to solve the given abstract math question. Your program must contain a function called 'answer' that accepts the input parameters as specified in the question."
    example_questions = [
        "Benny bought Number of Soft Drinks X (num_soft_drinks_x: int) for Cost per Soft Drink Y (cost_per_soft_drink_y: int) each and Number of Candy Bars Z (num_candy_bars_z: int). He spent a total of Total Amount Spent W (total_spent_w: int) dollars. How much did each candy bar cost?",
        "Jack has a stack of books that is Total Thickness X (total_thickness_x: int) inches thick. He knows from experience that Pages per Inch Y (pages_per_inch_y: int) is one inch thick. If he has Number of Books Z (num_books_z: int), how many pages is each one on average?",
        "Wickham is throwing a huge Christmas party. He invites Number of Guests X (num_guests_x: int) people. Everyone attends the party, and Fraction Y (fraction_y: float) of the guests bring a plus one (one other person). He plans to serve a Number of Courses Z (num_courses_z: int) meal for the guests. If he uses a new plate for every course, how many plates does he need in total for his guests?",
        "A church has Total Members X (total_members_x: int). Percentage Y (percentage_y: float) are adults. The rest are children. How many children more are there than adults?",
    ]
    # NOTE(review): the last example indents its body with spaces while the
    # others use tabs; reproduced as found.
    example_responses = [
        "'''\nTo solve this question, we need to calculate the total cost of the soft drinks and subtract it from the total amount spent to find the total cost spent on candy bars. Then, we divide the total cost spent on candy bars by the number of candy bars to find the cost per candy bar.\n'''\n\ndef answer(num_soft_drinks_x: int, cost_per_soft_drink_y: int, num_candy_bars_z: int, total_spent_w: int) -> float:\n\ttotal_cost_soft_drinks = num_soft_drinks_x * cost_per_soft_drink_y\n\ttotal_cost_candy_bars = total_spent_w - total_cost_soft_drinks\n\tcost_candy_bar = total_cost_candy_bars / num_candy_bars_z\n\treturn cost_candy_bar\n#The program ends here.",
        "'''\nTo solve this question, we first need to calculate the total number of pages in the stack of books by multiplying the total thickness by the number of pages per inch. Then, we divide this total number of pages by the number of books to find the average number of pages per book.\n'''\n\ndef answer(total_thickness_x: int, pages_per_inch_y: int, num_books_z: int) -> float:\n\ttotal_pages = total_thickness_x * pages_per_inch_y\n\taverage_pages_per_book = total_pages / num_books_z\n\treturn average_pages_per_book",
        "'''\nTo solve this question, we need to calculate the total number of guests including those who bring a plus one. Then, we multiply this total number of guests by the number of courses to find out how many plates are needed in total.\n'''\n\ndef answer(num_guests_x: int, fraction_y: float, num_courses_z: int) -> int:\n\ttotal_guests = num_guests_x + int(num_guests_x * fraction_y)\n\ttotal_plates_needed = total_guests * num_courses_z\n\treturn total_plates_needed",
        "'''\nTo find the number of children more than adults, we first calculate the number of adults using the percentage given. The rest of the members are children. The difference between the number of children and adults will give us the desired answer.\n'''\n\ndef answer(total_members_x: int, percentage_y: float) -> int:\n number_of_adults = int((percentage_y / 100) * total_members_x)\n number_of_children = total_members_x - number_of_adults\n difference = number_of_children - number_of_adults\n return difference",
    ]

    # Changed for o1
    messages = [
        {"role": "system", "content": system_msg}
        # {"role": "user", "content": system_msg}
    ]
    for example_question, example_response in zip(example_questions, example_responses):
        messages.append({"role": "user", "content": example_question})
        messages.append({"role": "assistant", "content": example_response})

    messages.append({"role": "user", "content": _apply_replacements(question, replacement_map)})
    return messages


def clean_runnable_program_simple(program):
    """Trim a model reply down to the program text.

    Keeps lines starting from the first one that begins with ``def`` or
    ``'''`` and stops (exclusive) at the first line that contains
    "program ends" (case-insensitive) or a Markdown code fence.
    """
    kept = []
    started = False
    for line in program.split("\n"):
        if line.startswith(("def", "'''")):
            started = True
        if not started:
            continue
        if "program ends" in line.lower() or "```" in line:
            break
        kept.append(line)
    return "\n".join(kept)


def process_program_generation_single(question_obj):
    """Sample ``program_generation_times`` candidate programs for a question.

    Stores the cleaned programs under ``"candidate_programs"`` and returns
    *question_obj*.
    """
    messages = format_masked_question(question_obj["question"], question_obj["replacement"])
    programs = []
    for _ in range(program_generation_times):
        ret = query_model(messages, max_length=640, gen_program=True)
        programs.append(clean_runnable_program_simple(ret))
    question_obj["candidate_programs"] = programs
    return question_obj


def execute(program, parameters):
    """Run *program* with ``answer(<parameters>)`` and return its printed output.

    The header file, the program, and a call/print epilogue are written to
    ``execution_gen_file_math.py`` and executed in a child interpreter with a
    30-second timeout. Combined stdout/stderr is also saved to
    ``execution_gen_file_math_output.txt`` and returned stripped.

    SECURITY NOTE(review): this executes model-generated code with the
    privileges of the current process; run it only in a sandboxed environment.
    """
    with open(program_header_file, encoding="utf-8") as header:
        program_header = header.read()

    function_call = "predicted_answer = answer({})".format(parameters)
    run_program = program_header + "\n" + program + "\n" + function_call + "\nprint(predicted_answer)\n"
    with open(_EXEC_SCRIPT, "w", encoding="utf-8") as script:
        script.write(run_program)

    # The previous implementation used os.system with the Bash-only `&>`
    # redirection. Under a POSIX /bin/sh (e.g. dash) that backgrounds the
    # command, so the output file was read before the program had finished.
    try:
        completed = subprocess.run(
            # -u: unbuffered, so partial output survives a timeout kill.
            [sys.executable, "-u", "-W", "ignore", _EXEC_SCRIPT],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            timeout=_EXEC_TIMEOUT_SECONDS,
        )
        output = completed.stdout or ""
    except subprocess.TimeoutExpired as exc:
        output = exc.stdout or ""
        if isinstance(output, bytes):
            output = output.decode("utf-8", errors="replace")

    with open(_EXEC_OUTPUT, "w", encoding="utf-8") as out_file:
        out_file.write(output)
    return output.strip()


def execute_programs_from_original(question_obj):
    """Execute every candidate program and store outputs on *question_obj*.

    Results are stored under ``"candidate_program_results"`` in the same order
    as ``"candidate_programs"``. The dict is returned unchanged when it has no
    candidate programs.
    """
    entry_key = "candidate_programs"
    if entry_key not in question_obj:
        return question_obj
    question_obj["candidate_program_results"] = [
        execute(program, question_obj["parameters"])
        for program in question_obj[entry_key]
    ]
    return question_obj


def _select_matching_programs(obj):
    """Record on *obj* the candidate programs whose output equals the answer.

    Outputs that do not parse as a float (tracebacks, empty output, ...) are
    skipped.
    """
    selected_result = []
    selected_program = []
    for i, r in enumerate(obj.get("candidate_program_results", [])):
        try:
            matches = float(r) == float(obj["answer"])
        except (TypeError, ValueError):
            continue
        if matches:
            selected_result.append(r)
            selected_program.append(obj["candidate_programs"][i])
    obj["selected_executions"] = selected_result
    obj["selected_programs"] = selected_program
    return obj


def _run_question(obj):
    """Run abstraction, generation, execution and selection for one question.

    Returns the enriched dict, or ``None`` when abstraction produced no usable
    parameters/replacements and the question should be skipped.
    """
    obj = process_abstraction_single(obj)
    if obj is None or obj["parameters"] == "" or len(obj["replacement"]) == 0:
        return None
    obj = process_program_generation_single(obj)
    obj = execute_programs_from_original(obj)
    return _select_matching_programs(obj)


def pipeline():
    """Run the full pipeline over ``amazon_data/gsm_train.jsonl``.

    Writes the processed questions to ``test_dump_gsm8k_train.json``.
    """
    with open("amazon_data/gsm_train.jsonl", encoding="utf-8") as src:
        all_objs = [json.loads(line) for line in src]

    outs = []
    for obj in tqdm(all_objs):
        # Keep only the final answer after "#### " and drop thousands
        # separators ("1,000" -> "1000") so that float() can parse it.
        obj["answer"] = obj["answer"].split("\n#### ")[-1].replace(",", "")
        processed = _run_question(obj)
        if processed is not None:
            outs.append(processed)

    with open("test_dump_gsm8k_train.json", "w") as dst:
        json.dump(outs, dst, indent=4)


def pipeline_math():
    """Run the full pipeline over the first 1000 integer-answer MATH problems.

    Loads ``lighteval/MATH`` (train split), keeps problems whose first
    ``\\boxed{...}`` value parses as an int, and writes the processed
    questions to ``test_dump_math_train_4o.json``.
    """
    all_objs = []
    ds = load_dataset("lighteval/MATH", "all", split="train")
    for it in ds:
        question = str(it["problem"])
        match = _BOXED_PATTERN.search(str(it["solution"]))
        if not match:
            continue
        try:
            solution = int(match.group(1))
        except ValueError:
            continue
        all_objs.append({"question": question, "answer": solution})

    outs = []
    for obj in tqdm(all_objs[:1000]):
        processed = _run_question(obj)
        if processed is not None:
            outs.append(processed)

    with open("test_dump_math_train_4o.json", "w") as dst:
        json.dump(outs, dst, indent=4)


if __name__ == "__main__":
    # pipeline()
    pipeline_math()