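"""Score generated outputs with a sequence-classification reward model.

Loads one or more JSON files of generations (each record containing at least
an 'output' field, and optionally 'prompt' or 'instruction'/'input'), computes
a scalar reward per example with Skywork/Skywork-Reward-Llama-3.1-8B-v0.2, and
writes the reward-annotated records back out as JSON.
"""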
import sys
import os
from typing import Any, Dict, List

import torch
import transformers
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from accelerate import Accelerator
from tqdm import tqdm

# Make the repository root importable so the `eval.*` modules resolve when
# this script is run directly.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(SCRIPT_DIR))

from eval.utils import jload, jdump
from eval.evaluate_arguments import EvalArguments

device = "cuda" if torch.cuda.is_available() else "cpu"


def create_model():
    # Load the reward model as a single-label sequence classifier; its scalar
    # logit is used as the reward score.
    model = AutoModelForSequenceClassification.from_pretrained(
        "Skywork/Skywork-Reward-Llama-3.1-8B-v0.2",
        torch_dtype=torch.bfloat16,
        attn_implementation="flash_attention_2",
        num_labels=1,
    ).to(device)
    return model


def create_tokenizer():
    tokenizer = AutoTokenizer.from_pretrained("Skywork/Skywork-Reward-Llama-3.1-8B-v0.2", token=True)
    # Batched scoring uses padding; fall back to the EOS token if the
    # tokenizer does not define a pad token.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    return tokenizer


def create_accelerator(mixed_precision):
    accelerator = Accelerator(mixed_precision=mixed_precision)
    return accelerator

def main():
    parser = transformers.HfArgumentParser((EvalArguments,))
    args, = parser.parse_args_into_dataclasses()

    # Match the accelerator's mixed-precision mode to the requested dtype.
    mixed_precision = 'bf16' if args.bfloat16 else 'fp16'
    args.mixed_precision = mixed_precision

    accelerator = create_accelerator(mixed_precision=mixed_precision)

    model = create_model()
    # Only non-T5 models are wrapped by the accelerator before evaluation.
    if 't5' not in args.model_name_or_path:
        model = accelerator.prepare(model)
    model.eval()

    tokenizer = create_tokenizer()

    print("Output file path:", args.output_filepath)

    filenames = []
    eval_data_list_dict = []
    if os.path.isfile(args.output_filepath):
        print(f'Loading data from {args.output_filepath}...')
        eval_data_list_dict.append(jload(args.output_filepath))
        filenames.append(args.output_filepath)
    elif os.path.isdir(args.output_filepath):
        print(f'Loading data from {args.output_filepath}...')
        for filename in os.listdir(args.output_filepath):
            if filename.endswith('.json'):
                print(f'Loaded file {filename}')
                eval_data_list_dict.append(jload(os.path.join(args.output_filepath, filename)))
                filenames.append(os.path.join(args.output_filepath, filename))
    else:
        raise FileNotFoundError(f'Output file(s) not found at {args.output_filepath}!')

    for filename, eval_data_dict in zip(filenames, eval_data_list_dict):
        eval_data = evaluate_data(args, model, tokenizer, eval_data_dict)

        if args.result_filename is None:
            path_to_result = os.path.basename(filename).split('.json')[0] + f"_reward_{args.model_name_or_path.replace('/', '')}.json"
        else:
            path_to_result = args.result_filename

        print(f'Saving results to file {path_to_result}...')
        jdump(eval_data, path_to_result)

def get_reward_output_fn(reward_output_fmt: str, apply_sigmoid_to_reward: bool):
    """Build a function that maps reward-model logits to scalar reward scores.

    reward_output_fmt selects the raw logit (None), a softmax probability
    ('0' or '1'), or the difference between the two probabilities ('1-0').
    """
    if reward_output_fmt is None:
        reward_output_fn = lambda x: x.squeeze().cpu().detach().numpy().tolist()
    elif reward_output_fmt == '0':
        reward_output_fn = lambda x: x.squeeze().cpu().detach().softmax(dim=-1).numpy()[0].tolist()
    elif reward_output_fmt == '1':
        reward_output_fn = lambda x: x.squeeze().cpu().detach().softmax(dim=-1).numpy()[1].tolist()
    elif reward_output_fmt == '1-0':
        reward_output_fn = lambda x: (x.squeeze().cpu().detach().softmax(dim=-1).numpy()[1] - x.squeeze().cpu().detach().softmax(dim=-1).numpy()[0]).tolist()
    else:
        raise NotImplementedError(f'Unsupported reward output format: {reward_output_fmt}')

    if apply_sigmoid_to_reward:
        # Apply the sigmoid on top of the selected format rather than
        # replacing it, so the requested output format is still honored.
        base_fn = reward_output_fn
        reward_output_fn = lambda x: torch.sigmoid(torch.tensor(base_fn(x))).numpy().tolist()

    return reward_output_fn

@torch.inference_mode()
def evaluate_data(args: EvalArguments, model, tokenizer, eval_data_list_dict) -> List[Dict[str, Any]]:
    """Given a generated dataset, evaluate it using the reward model.

    args: EvalArguments, the evaluation arguments to use
    model: the reward model used for scoring
    tokenizer: the tokenizer paired with the reward model
    eval_data_list_dict: List[Dict[str, Any]], the generated data to evaluate
    """
    pbar = tqdm(total=len(eval_data_list_dict), desc="eval")
    rewards_list = []
    reward_output_fn = get_reward_output_fn(args.reward_output_fmt, args.apply_sigmoid_to_reward)

    print('Evaluating reward scores...')

    for idx in range(0, len(eval_data_list_dict), args.per_device_batch_size):
        # Python slicing clamps at the end of the list, so the final (possibly
        # smaller) batch is handled automatically.
        batch_list_dict = eval_data_list_dict[idx:idx + args.per_device_batch_size]

        if 'prompt' in batch_list_dict[0]:
            batch_full_outputs = [l['prompt'] + ' ' + l['output'] for l in batch_list_dict]
        else:
            print('Overriding with custom prompt format')
            prompt_fmt = "Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.\n\n### Instruction:\n{instruction}\n\n### Input:\n{input}\n\n### Response: {output}"
            for l in batch_list_dict:
                # Keep only the first sentence of the output before formatting.
                l['output'] = l['output'].split('.')[0] + '.'
            batch_full_outputs = [prompt_fmt.format_map(l) for l in batch_list_dict]

        encoded_full_responses = tokenizer(batch_full_outputs, return_tensors="pt", padding=True, truncation=True)
        encoded_full_responses = encoded_full_responses.to(model.device)

        # The reward is derived from the classifier logits over the full
        # prompt + response text.
        reward_outputs = model(**encoded_full_responses)
        rewards = reward_output_fn(reward_outputs.logits)
        rewards_list.extend(rewards if isinstance(rewards, list) else [rewards])

        pbar.update(len(batch_list_dict))

    pbar.close()
    print('Combining reward outputs into outputs...')

    for j in range(len(eval_data_list_dict)):
        eval_data_list_dict[j]['reward'] = rewards_list[j]
        eval_data_list_dict[j]['reward_model'] = (
            args.model_name_or_path + args.model_pretrained_lora_weights
            if args.model_pretrained_lora_weights is not None
            else args.model_name_or_path
        )

    print('Finished evaluating reward scores!')

    print('Mean reward score: ', sum(rewards_list) / len(rewards_list))
    print('Std reward score: ', torch.tensor(rewards_list).std().item())

    return eval_data_list_dict

if __name__ == '__main__':
    main()
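# Example invocation (illustrative only; the exact flag names are defined by
# EvalArguments in eval/evaluate_arguments.py, and the script filename below
# is a placeholder):
#   python evaluate_reward.py --output_filepath outputs/ --per_device_batch_size 8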