Spaces:
Sleeping
Sleeping
import json | |
import os | |
from collections import defaultdict | |
import pandas as pd | |
from src.assets.hardcoded_evals import baseline, gpt4_values, gpt35_values | |
from src.get_model_info.apply_metadata_to_df import apply_metadata | |
from src.plots.read_results import get_eval_results_dicts, make_clickable_model | |
from src.get_model_info.utils import AutoEvalColumn, EvalQueueColumn, has_no_nan_values | |
# Visibility flag read once at import time from the IS_PUBLIC environment variable.
# Environment values are always strings and bool() of any non-empty string is True,
# so "False" or "0" could never switch the flag off. The value is therefore compared
# against explicit "off" spellings: unset still means True, empty still means False.
IS_PUBLIC = os.environ.get("IS_PUBLIC", "true").strip().lower() not in {"", "0", "false", "no", "off"}
def get_all_requested_models(requested_models_dir: str) -> tuple[set[str], dict[str, list]]:
    """Collect the requests already stored under ``requested_models_dir``.

    Only ``*.json`` files located exactly one directory below
    ``requested_models_dir`` are read; files at the top level or nested deeper
    are ignored.

    Args:
        requested_models_dir: Root directory to scan. A trailing path
            separator is accepted.

    Returns:
        A pair ``(request_ids, users_to_submission_dates)``:

        * ``request_ids`` is the set of ``"{model}_{revision}_{precision}"``
          strings, one per request file.
        * ``users_to_submission_dates`` maps the part of the model id before
          the first ``/`` to the list of ``submitted_time`` values found for
          it. Requests whose model id has no ``/`` or that lack
          ``submitted_time`` are not recorded here.

    Raises:
        KeyError: If a request file lacks ``model``, ``revision`` or ``precision``.
        json.JSONDecodeError: If a ``*.json`` file is not valid JSON.
    """
    request_depth = 1
    request_ids = set()
    users_to_submission_dates = defaultdict(list)

    for root, _, files in os.walk(requested_models_dir):
        # Depth is measured on the path relative to the scan root, so a
        # trailing separator on `requested_models_dir` cannot skew the count.
        relative_root = os.path.relpath(root, requested_models_dir)
        current_depth = 0 if relative_root == os.curdir else relative_root.count(os.sep) + 1
        if current_depth != request_depth:
            continue

        for file_name in files:
            if not file_name.endswith(".json"):
                continue
            with open(os.path.join(root, file_name), "r", encoding="utf-8") as f:
                info = json.load(f)

            request_ids.add(f"{info['model']}_{info['revision']}_{info['precision']}")

            # Select organisation
            model_id = info["model"]
            if "/" not in model_id or "submitted_time" not in info:
                continue
            # Split once only: an id with several slashes must not raise.
            organisation = model_id.split("/", 1)[0]
            users_to_submission_dates[organisation].append(info["submitted_time"])

    return request_ids, users_to_submission_dates
def get_leaderboard_df(results_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame:
    """Build the leaderboard table from the evaluation results under ``results_path``.

    The records returned by ``get_eval_results_dicts`` are extended with the
    hardcoded reference rows, passed through ``apply_metadata``, sorted by the
    average column in descending order, restricted to ``cols`` and rounded to
    two decimals. Rows rejected by ``has_no_nan_values`` for ``benchmark_cols``
    are dropped.

    Args:
        results_path: Location handed to ``get_eval_results_dicts``.
        cols: Columns to keep, in the order they should appear.
        benchmark_cols: Columns passed to ``has_no_nan_values`` to decide
            which rows are kept.

    Returns:
        The filtered, sorted and rounded leaderboard frame.
    """
    records = get_eval_results_dicts(results_path)

    # NOTE(review): indentation was lost in the copy this was restored from; the
    # two GPT reference rows are assumed to be the only statements guarded by
    # the visibility check, with the baseline row always appended -- confirm.
    if not IS_PUBLIC:
        for reference_row in (gpt4_values, gpt35_values):
            records.append(reference_row)
    records.append(baseline)

    # Populate model type based on known hardcoded values in `metadata.py`
    apply_metadata(records)

    ranked = pd.DataFrame.from_records(records).sort_values(
        by=[AutoEvalColumn.average.name],
        ascending=False,
    )
    leaderboard = ranked[cols].round(decimals=2)

    # filter out if any of the benchmarks have not been produced
    complete_rows = has_no_nan_values(leaderboard, benchmark_cols)
    return leaderboard[complete_rows]
def _load_queue_entry(file_path: str) -> dict:
    """Read one request JSON file and fill in the model and revision display columns."""
    with open(file_path, encoding="utf-8") as fp:
        data = json.load(fp)
    data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
    # Requests without an explicit revision are shown as "main".
    data[EvalQueueColumn.revision.name] = data.get("revision", "main")
    return data


def get_evaluation_queue_df(save_path: str, cols: list) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Load the evaluation queue stored under ``save_path`` and split it by status.

    Request files are ``*.json`` files found either directly in ``save_path``
    or inside one of its immediate sub-directories. Hidden entries (names
    starting with ``.``) and any other file type, such as a README, are
    skipped rather than parsed.

    Args:
        save_path: Directory holding the queue.
        cols: Columns of the returned frames, in order.

    Returns:
        ``(finished, running, pending)`` data frames, each restricted to ``cols``:

        * finished: status starts with ``FINISHED`` or equals ``PENDING_NEW_EVAL``
        * running: status equals ``RUNNING``
        * pending: status is ``PENDING`` or ``RERUN``

    Raises:
        KeyError: If a request file lacks ``model`` or ``status``.
        json.JSONDecodeError: If a ``*.json`` file is not valid JSON.
    """
    all_evals = []
    for entry in os.listdir(save_path):
        if entry.startswith("."):
            continue
        entry_path = os.path.join(save_path, entry)

        if os.path.isdir(entry_path):
            # Check the file system instead of guessing from the name, so that
            # stray non-JSON files are never treated as folders.
            for sub_entry in os.listdir(entry_path):
                if sub_entry.startswith(".") or not sub_entry.endswith(".json"):
                    continue
                all_evals.append(_load_queue_entry(os.path.join(entry_path, sub_entry)))
        elif entry.endswith(".json"):
            all_evals.append(_load_queue_entry(entry_path))

    pending_list = [e for e in all_evals if e["status"] in ("PENDING", "RERUN")]
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [
        e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"
    ]

    df_pending = pd.DataFrame.from_records(pending_list, columns=cols)
    df_running = pd.DataFrame.from_records(running_list, columns=cols)
    df_finished = pd.DataFrame.from_records(finished_list, columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]