|
import re |
|
import os |
|
import ast |
|
import time |
|
import json |
|
import argparse |
|
from tqdm import tqdm |
|
from multiprocessing.pool import Pool |
|
|
|
import openai |
|
from openai import AzureOpenAI |
|
|
|
|
|
def init():
    """Create the Azure OpenAI client from environment configuration.

    The endpoint is read from ``AZURE_OPENAI_ENDPOINT`` and the key from
    ``AZURE_OPENAI_KEY``; the API version is pinned to
    ``2024-02-15-preview``.

    Returns:
        The constructed ``AzureOpenAI`` client.
    """
    settings = {
        "azure_endpoint": os.getenv("AZURE_OPENAI_ENDPOINT"),
        "api_key": os.getenv("AZURE_OPENAI_KEY"),
        "api_version": "2024-02-15-preview",
    }
    return AzureOpenAI(**settings)
|
|
|
|
|
def interaction(client, message_text):
    """Request one chat completion for ``message_text`` and return it.

    Args:
        client: Object exposing ``chat.completions.create(**kwargs)``.
        message_text: The chat messages forwarded as ``messages``.

    Returns:
        Whatever ``client.chat.completions.create`` returns. The deployment
        name is read from ``AZURE_OPENAI_DEPLOYNAME`` at call time; the
        sampling parameters are fixed.
    """
    request = dict(
        model=os.getenv("AZURE_OPENAI_DEPLOYNAME"),
        messages=message_text,
        temperature=0.7,
        max_tokens=800,
        top_p=0.95,
        frequency_penalty=0,
        presence_penalty=0,
        stop=None,
    )
    return client.chat.completions.create(**request)
|
|
|
|
|
def annotate(prediction_set, caption_files, output_dir):
    """Score predicted answers against reference answers with the chat model.

    For every entry of ``caption_files`` the matching question / answer /
    prediction triple is looked up in ``prediction_set``, sent to the model
    through ``interaction``, and the parsed reply is written to
    ``<output_dir>/<key>.json`` as the two-element list
    ``[response_dict, qa_set]``.

    Args:
        prediction_set: Mapping from sample key to a dict with the keys
            ``'q'``, ``'a'`` and ``'pred'``.
        caption_files: File names of the form ``<key>.json``; the last five
            characters are stripped to obtain the key.
        output_dir: Directory that receives one JSON file per scored sample.

    Returns:
        None. Results are only persisted to disk. Any error raised while
        querying, parsing or writing is printed and that sample is skipped
        (no file is written, so a caller can detect it as incomplete).

    Raises:
        KeyError: If a key derived from ``caption_files`` is missing from
            ``prediction_set`` (the lookup happens outside the ``try``).

    Note:
        The function uses a module-level ``client`` object; it is not passed
        in as a parameter.
    """
    for file in tqdm(caption_files):
        key = file[:-5]  # drop the trailing ".json"
        qa_set = prediction_set[key]
        question = qa_set["q"]
        answer = str(qa_set["a"])
        pred = qa_set["pred"]
        try:
            message = [
                {
                    "role": "system",
                    "content":
                        "You are an intelligent chatbot designed for evaluating the factual accuracy of generative outputs for video-based question-answer pairs. "
                        "Your task is to compare the predicted answer with these correct answers and determine if they are factually consistent. Here's how you can accomplish the task:"
                        "------"
                        "##INSTRUCTIONS: "
                        "- Focus on the factual consistency between the predicted answer and the correct answer. The predicted answer should not contain any misinterpretations or misinformation.\n"
                        "- The predicted answer must be factually accurate and align with the video content.\n"
                        "- Consider synonyms or paraphrases as valid matches.\n"
                        "- Evaluate the factual accuracy of the prediction compared to the answer."
                },
                {
                    "role": "user",
                    "content":
                        "Please evaluate the following video-based question-answer pair:\n\n"
                        f"Question: {question}\n"
                        f"Correct Answers: {answer}\n"
                        f"Predicted Answer: {pred}\n\n"
                        "Provide your evaluation only as a factual accuracy score where the factual accuracy score is an integer value between 0 and 5, with 5 indicating the highest level of factual consistency. "
                        "Please generate the response in the form of a Python dictionary string with keys 'score', where its value is the factual accuracy score in INTEGER, not STRING."
                        "DO NOT PROVIDE ANY OTHER OUTPUT TEXT OR EXPLANATION. Only provide the Python dictionary string. "
                        # The example used to read {''score': 4.8}: not a valid
                        # dict literal and a float despite the INTEGER request.
                        "For example, your response should look like this: {'score': 4}."
                }
            ]
            completion = interaction(client, message)

            response_message = completion.choices[0].message.content
            # literal_eval only evaluates Python literals, so the model reply
            # cannot execute code.
            response_dict = ast.literal_eval(response_message)
            # A bare number/string/list is a valid literal but not a usable
            # result; reject it here so no unusable file is persisted.
            if not isinstance(response_dict, dict) or not response_dict:
                raise ValueError(
                    f"expected a non-empty dict, got: {response_message!r}"
                )
            result_qa_pair = [response_dict, qa_set]

            # Serialise before opening the file: a failure then cannot leave
            # a truncated JSON file behind that looks like a finished sample.
            payload = json.dumps(result_qa_pair)
            with open(os.path.join(output_dir, f"{key}.json"), "w") as f:
                f.write(payload)

        except Exception as e:
            print(f"Error processing file '{key}': {e}")

        # Pause between samples (presumably to stay under API rate limits).
        time.sleep(1)
|
|
|
|
|
def longest_repeating_substring(s):
    """Return the substring of ``s`` selected by a diagonal run-length scan.

    For every index pair ``i < j`` (1-based) a run length is computed from the
    run at ``(i - 1, j - 1)``: the run grows by one when the previous run is
    positive and shorter than ``j - i``, or when ``s[i-1] == s[j-1]``;
    otherwise it resets to zero. The characters covered by the first run that
    reaches the maximum length (ending at position ``i``) are returned, or
    ``""`` when no run is found.

    NOTE(review): because the two conditions are joined with ``or``, a run can
    be extended across a mismatching character and two occurrences may
    overlap, e.g. ``"aXbaYb"`` yields ``"aXb"`` and ``"aaaa"`` yields
    ``"aaa"``. Confirm this is intended before relying on the result being a
    true repeated substring.
    """
    size = len(s)
    best_len = 0
    best_end = 0

    # Each row only depends on the previous one, so two rows are enough.
    prev_row = [0] * (size + 1)
    for i in range(1, size + 1):
        row = [0] * (size + 1)
        for j in range(i + 1, size + 1):
            diag = prev_row[j - 1]
            extends_run = 0 < diag < (j - i)
            if extends_run or s[i - 1] == s[j - 1]:
                row[j] = diag + 1
                if row[j] > best_len:
                    best_len = row[j]
                    # i never decreases, so this is the running maximum of i.
                    best_end = i
        prev_row = row

    return "".join(s[k] for k in range(best_end - best_len, best_end))
|
|
|
|
|
def _load_predictions(args):
    """Read the prediction records (one JSON object per line) named by ``args``."""
    if args.num_chunks > 1:
        # Chunked results live in <pred_path>/<num_chunks>_<idx>.json.
        paths = [
            os.path.join(args.pred_path, f"{args.num_chunks}_{_idx}.json")
            for _idx in range(args.num_chunks)
        ]
    else:
        paths = [args.pred_path]

    records = []
    for path in paths:
        # Context manager so the handle is closed (it used to be leaked).
        with open(path) as handle:
            records.extend(json.loads(line) for line in handle)
    return records


def _average_score(combined_contents):
    """Average the first value of each result's response dict, read as int."""
    score_sum = 0
    count = 0
    for key, result in combined_contents.items():
        # Every result counts towards the denominator, even if its score
        # cannot be read (it then contributes 0 to the sum).
        count += 1
        try:
            for value in result[0].values():
                score_sum += int(value)
                break  # only the first entry of the response dict is used
        except Exception as e:
            print(f"Error processing file '{key}': {e}")
    # Guard against an empty result set instead of raising ZeroDivisionError.
    return score_sum / count if count else 0.0


def main(args):
    """Annotate all predictions, then combine and average the scores.

    Steps:
      1. Load prediction records from ``args.pred_path`` (a single JSON-lines
         file, or ``args.num_chunks`` chunk files inside that directory).
      2. Give every record a unique id ``<video file stem>_<n>`` where ``n``
         counts repeated occurrences of the same ``video_name`` from 0.
      3. Repeatedly call ``annotate`` on the ids that have no result file in
         ``args.output_dir`` yet, until none are missing.
      4. Merge every ``*.json`` file found in ``args.output_dir`` into one
         dict, add ``"average_score"`` and write it to ``args.output_json``.

    Args:
        args: Namespace providing ``pred_path``, ``num_chunks``,
            ``output_dir``, ``output_json`` and ``num_tasks``.

    Returns:
        None. The average is printed and stored in the combined JSON file.
    """
    pred_contents = _load_predictions(args)

    video_id_counts = {}
    caption_files = []
    prediction_set = {}
    for sample in pred_contents:
        video_id = sample["video_name"]
        # First occurrence gets index 0, later ones 1, 2, ...
        video_id_counts[video_id] = video_id_counts.get(video_id, -1) + 1
        sample_id = f"{video_id.split('/')[-1].split('.')[0]}_{video_id_counts[video_id]}"
        caption_files.append(f"{sample_id}.json")
        prediction_set[sample_id] = {
            "q": sample["question"],
            "a": sample["answer"],
            "pred": sample["pred"],
        }

    output_dir = args.output_dir
    os.makedirs(output_dir, exist_ok=True)

    num_tasks = args.num_tasks

    # Retry loop: samples whose annotation failed leave no file behind and
    # are picked up again on the next pass. There is no retry limit.
    while True:
        try:
            # A set makes the membership test below O(1) per file.
            completed_files = set(os.listdir(output_dir))
            print(f"completed_files: {len(completed_files)}")

            incomplete_files = [f for f in caption_files if f not in completed_files]
            print(f"incomplete_files: {len(incomplete_files)}")

            if len(incomplete_files) == 0:
                break
            if len(incomplete_files) <= num_tasks:
                num_tasks = 1

            part_len = len(incomplete_files) // num_tasks
            all_parts = [
                incomplete_files[i : i + part_len]
                for i in range(0, len(incomplete_files), part_len)
            ]
            task_args = [(prediction_set, part, args.output_dir) for part in all_parts]
            print("Generate", len(all_parts), "subprocess.")

            # Only the first part is processed per pass (no worker pool is
            # used); the remaining parts are handled by later passes.
            annotate(*task_args[0])

        except Exception as e:
            print(f"Error: {e}")

    combined_contents = {}
    json_path = args.output_json

    for file_name in os.listdir(output_dir):
        if not file_name.endswith(".json"):
            continue
        file_path = os.path.join(output_dir, file_name)
        with open(file_path, "r") as json_file:
            try:
                content = json.load(json_file)
                combined_contents[file_name[:-5]] = content
            except Exception as e:
                # Unreadable result files are reported and left out.
                print(f"Error: {e}")

    average_score = _average_score(combined_contents)
    combined_contents["average_score"] = average_score
    with open(json_path, "w") as json_file:
        json.dump(combined_contents, json_file, indent=4)
    print("Average score for correctness:", average_score)
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: parse the command line, export the Azure OpenAI
    # settings as environment variables (init() and interaction() read them
    # from there), build the shared client and run the evaluation.
    parser = argparse.ArgumentParser(description="question-answer-generation-using-gpt-3")
    parser.add_argument("--pred-path", required=True, help="The path to file containing prediction.")
    parser.add_argument("--output-dir", required=True, help="The path to save annotation json files.")
    parser.add_argument("--output-json", required=True, help="The path to save annotation final combined json file.")
    parser.add_argument("--num-tasks", required=True, type=int, help="Number of splits.")
    # "--num_chunks" was the only underscore-style flag; the dash spelling is
    # accepted as an alias so both forms work.
    parser.add_argument("--num_chunks", "--num-chunks", dest="num_chunks", default=1, type=int, help="Result splits")
    parser.add_argument("--api-key", required=True, type=str, help="Azure Openai API key.")
    parser.add_argument("--api-endpoint", required=True, type=str, help="Azure Openai API endpoint.")
    parser.add_argument("--api-deployname", required=True, type=str, help="Azure Openai API deployname.")
    args = parser.parse_args()

    os.environ["AZURE_OPENAI_KEY"] = args.api_key
    os.environ["AZURE_OPENAI_ENDPOINT"] = args.api_endpoint
    os.environ["AZURE_OPENAI_DEPLOYNAME"] = args.api_deployname

    # Module-level on purpose: annotate() looks up this global `client`.
    client = init()

    main(args)
|
|