from typing import List

import numpy as np
import pandas as pd
from sklearn.metrics import f1_score

COLUMNS = ["observationID", "class_id"]

poisonous_lvl = pd.read_csv(
    "http://ptak.felk.cvut.cz/plants//DanishFungiDataset/poison_status_list.csv"
)
POISONOUS_SPECIES = poisonous_lvl[poisonous_lvl["poisonous"] == 1].class_id.unique()
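
# Note on the data layout (an assumption spelled out for readers, not enforced here):
# the poison status list is expected to provide at least a `class_id` column and a
# binary `poisonous` flag (1 = poisonous); only poisonous class ids are kept above.
# Throughout this script, class_id == -1 denotes the "unknown" class.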


def classification_error_with_unknown(
    merged_df, cost_unknown_misclassified=10, cost_misclassified_as_unknown=0.1
):
    # Unknown ground truth (-1) predicted as a known species.
    num_misclassified_unknown = sum(
        (merged_df.class_id_gt == -1) & (merged_df.class_id_pred != -1)
    )
    # Known species predicted as unknown (-1).
    num_misclassified_as_unknown = sum(
        (merged_df.class_id_gt != -1) & (merged_df.class_id_pred == -1)
    )
    # Confusions between two known species.
    num_misclassified_other = sum(
        (merged_df.class_id_gt != merged_df.class_id_pred)
        & (merged_df.class_id_pred != -1)
        & (merged_df.class_id_gt != -1)
    )
    return (
        num_misclassified_other
        + num_misclassified_unknown * cost_unknown_misclassified
        + num_misclassified_as_unknown * cost_misclassified_as_unknown
    ) / len(merged_df)
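
# Illustrative sanity check (a hedged sketch with toy class ids, not part of the
# evaluation pipeline): one unknown observation predicted as a known species costs 10,
# one known species predicted as unknown costs 0.1, and the correct row costs nothing.
#
#   _toy = pd.DataFrame({"class_id_gt": [7, -1, 3], "class_id_pred": [-1, 5, 3]})
#   classification_error_with_unknown(_toy)  # (0 + 1 * 10 + 1 * 0.1) / 3 ≈ 3.37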


def classification_error(merged_df):
    # Plain classification error: every confusion type carries the same unit cost.
    return classification_error_with_unknown(
        merged_df, cost_misclassified_as_unknown=1, cost_unknown_misclassified=1
    )


def num_psc_decisions(merged_df):
    # Poisonous species confusions (PSC): poisonous ground truth predicted as non-poisonous.
    num_psc = sum(
        merged_df.class_id_gt.isin(POISONOUS_SPECIES)
        & ~merged_df.class_id_pred.isin(POISONOUS_SPECIES)
    )
    return num_psc


def num_esc_decisions(merged_df):
    # Edible species confusions (ESC): non-poisonous ground truth predicted as poisonous.
    num_esc = sum(
        ~merged_df.class_id_gt.isin(POISONOUS_SPECIES)
        & merged_df.class_id_pred.isin(POISONOUS_SPECIES)
    )
    return num_esc


def psc_esc_cost_score(merged_df, cost_psc=100, cost_esc=1):
    # Mistaking a poisonous species for an edible one (PSC) is penalised far more
    # heavily than the opposite confusion (ESC).
    return (
        cost_psc * num_psc_decisions(merged_df) + cost_esc * num_esc_decisions(merged_df)
    ) / len(merged_df)
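
# Hedged sketch with hypothetical class ids (the real POISONOUS_SPECIES comes from the
# CSV loaded above): if only id 1 were poisonous, predicting 2 for ground truth 1 is one
# PSC decision and predicting 1 for ground truth 2 is one ESC decision.
#
#   _toy = pd.DataFrame({"class_id_gt": [1, 2], "class_id_pred": [2, 1]})
#   psc_esc_cost_score(_toy)  # (100 * 1 + 1 * 1) / 2 = 50.5 under that assumption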


def evaluate_csv(test_annotation_file: str, user_submission_file: str) -> List[dict]:
    # Load the ground-truth annotations and check that the required columns are present.
    gt_df = pd.read_csv(test_annotation_file, sep=",")
    for col in COLUMNS:
        assert col in gt_df, f"Test annotation file is missing column '{col}'."

    gt_df = gt_df.drop_duplicates("observationID")

    # The submission may be comma- or tab-separated; the delimiter is inferred from
    # the file extension.
    try:
        is_tsv = user_submission_file.endswith(".tsv")
        user_pred_df = pd.read_csv(user_submission_file, sep="\t" if is_tsv else ",")
    except Exception:
        print("Could not read file submitted by the user.")
        raise ValueError("Could not read file submitted by the user.")

    # Validate the submission: required columns, record count, and observation coverage.
    for col in COLUMNS:
        if col not in user_pred_df:
            print(f"File submitted by the user is missing column '{col}'.")
            raise ValueError(f"File submitted by the user is missing column '{col}'.")
    if len(gt_df) != len(user_pred_df):
        print(f"File submitted by the user should have {len(gt_df)} records.")
        raise ValueError(f"File submitted by the user should have {len(gt_df)} records.")
    missing_obs = gt_df.loc[
        ~gt_df["observationID"].isin(user_pred_df["observationID"]),
        "observationID",
    ]
    if len(missing_obs) > 0:
        # Report at most three missing observation ids to keep the error message short.
        if len(missing_obs) > 3:
            missing_obs_str = ", ".join(missing_obs.iloc[:3].astype(str)) + ", ..."
        else:
            missing_obs_str = ", ".join(missing_obs.astype(str))
        print(f"File submitted by the user is missing observations: {missing_obs_str}")
        raise ValueError(
            f"File submitted by the user is missing observations: {missing_obs_str}"
        )

    # Align predictions with ground truth; together with the checks above, the
    # one-to-one validation guarantees exactly one prediction per observation.
    merged_df = pd.merge(
        gt_df,
        user_pred_df,
        how="outer",
        on="observationID",
        validate="one_to_one",
        suffixes=("_gt", "_pred"),
    )

    cls_error = classification_error(merged_df)
    cls_error_with_unknown = classification_error_with_unknown(merged_df)
    psc_esc_cost = psc_esc_cost_score(merged_df)

    result = [
        {
            "test_split": {
                "F1 Score": np.round(
                    f1_score(
                        merged_df["class_id_gt"],
                        merged_df["class_id_pred"],
                        average="macro",
                    )
                    * 100,
                    2,
                ),
                "Track 1: Classification Error": np.round(cls_error, 4),
                "Track 2: Cost for Poisonousness Confusion": np.round(psc_esc_cost, 4),
                "Track 3: User-Focused Loss": np.round(cls_error + psc_esc_cost, 4),
                "Track 4: Classification Error with Special Cost for Unknown": np.round(
                    cls_error_with_unknown, 4
                ),
            }
        }
    ]

    print(f"Evaluated scores: {result[0]['test_split']}")

    return result
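
# For orientation, evaluate_csv returns a single-element list shaped like the following
# (numbers are purely illustrative):
#   [{"test_split": {"F1 Score": 41.27, "Track 1: Classification Error": 0.3821, ...}}]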


def evaluate(test_annotation_file, user_submission_file, phase_codename, **kwargs):
    """
    Evaluates the submission for a particular challenge phase and returns the scores.

    Arguments:

        `test_annotation_file`: Path to test_annotation_file on the server
        `user_submission_file`: Path to file submitted by the user
        `phase_codename`: Phase to which the submission is made

        `**kwargs`: keyword arguments that contain additional submission
        metadata that challenge hosts can use to send slack notifications.
        You can access the submission metadata
        with kwargs['submission_metadata']

        Example: A sample submission metadata can be accessed like this:
        >>> print(kwargs['submission_metadata'])
        {
            'status': u'running',
            'when_made_public': None,
            'participant_team': 5,
            'input_file': 'https://abc.xyz/path/to/submission/file.json',
            'execution_time': u'123',
            'publication_url': u'ABC',
            'challenge_phase': 1,
            'created_by': u'ABC',
            'stdout_file': 'https://abc.xyz/path/to/stdout/file.json',
            'method_name': u'Test',
            'stderr_file': 'https://abc.xyz/path/to/stderr/file.json',
            'participant_team_name': u'Test Team',
            'project_url': u'http://foo.bar',
            'method_description': u'ABC',
            'is_public': False,
            'submission_result_file': 'https://abc.xyz/path/result/file.json',
            'id': 123,
            'submitted_at': u'2017-03-20T19:22:03.880652Z'
        }
    """
    print("Starting Evaluation.....")
    out = {}
    if phase_codename == "prediction-based":
        print("Evaluating for Prediction-based Phase")
        out["result"] = evaluate_csv(test_annotation_file, user_submission_file)

        # Mirror the per-split scores so the challenge platform can display them directly.
        out["submission_result"] = out["result"][0]["test_split"]
        print("Completed evaluation")
    return out
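
# A hedged example of invoking this script directly (file names are placeholders):
#
#   python evaluation_script.py --test-annotation-file ground_truth.csv \
#       --user-submission-file predictions.csv
#
# The resulting scores are written to scores.json by the __main__ block below.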


if __name__ == "__main__":
    import argparse
    import json

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--test-annotation-file",
        help="Path to test_annotation_file on the server.",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--user-submission-file",
        help="Path to a file created by predict script.",
        type=str,
        required=True,
    )
    args = parser.parse_args()

    result = evaluate(
        test_annotation_file=args.test_annotation_file,
        user_submission_file=args.user_submission_file,
        phase_codename="prediction-based",
    )
    with open("scores.json", "w") as f:
        json.dump(result, f)