|
from src.deprecated.conversion import csv_to_pandas |
|
from src.deprecated.ecg_processing import process_batch |
|
from src.deprecated.pydantic_models import ECGConfig, ECGSample |
|
|
|
|
|
class PreTrainedPipeline():
    """Callable pipeline that loads ECG data and runs batch processing on it.

    Calling an instance stores the input as ``self.path``, loads it with
    ``csv_to_pandas`` and hands the regrouped samples to ``process_batch``.
    """

    def __init__(self):
        """Create an empty pipeline; nothing is loaded until it is called."""
        # Source handed to csv_to_pandas; assigned by __call__.
        self.path = None
        # Result of csv_to_pandas(self.path); assigned by load_data.
        self.df = None

    def load_data(self):
        """Load ``self.path`` through ``csv_to_pandas`` into ``self.df``."""
        self.df = csv_to_pandas(self.path)

    def process_data(self):
        """Regroup the loaded frame into samples and run ``process_batch``.

        Rows are grouped by every column except ``timestamp_idx``, ``ecg``
        and ``label``; those three columns are collected into lists, giving
        one row per group. The ``configs.*`` columns of the first grouped row
        (prefix stripped) build the ``ECGConfig``, and every grouped row
        becomes an ``ECGSample``.

        Returns:
            Whatever ``process_batch(samples, configs)`` returns.

        Raises:
            ValueError: If no data has been loaded yet (``self.df`` is None).
        """
        df = self.df
        if df is None:
            raise ValueError("No data loaded; call load_data() first")

        cols_to_implode = ['timestamp_idx', 'ecg', 'label']
        # Keep the frame's own column order for the group keys: building the
        # list from a set difference made the key order (and therefore the
        # group sort order and which row comes first) vary between runs.
        group_cols = [col for col in df.columns if col not in cols_to_implode]
        df_imploded = (
            df.groupby(group_cols)
            .agg({'timestamp_idx': list,
                  'ecg': list,
                  'label': list})
            .reset_index()
        )

        # The config is read from the first grouped row only.
        # NOTE(review): presumably the configs.* values are identical across
        # all rows of one input — confirm.
        config_cols = [col for col in df.columns if col.startswith('configs.')]
        raw_configs = df_imploded[config_cols].iloc[0].to_dict()
        configs = ECGConfig(**{
            key.removeprefix('configs.'): value
            for key, value in raw_configs.items()
        })

        records = df_imploded.to_dict(orient='records')
        samples = [ECGSample(**record) for record in records]

        # Return the processed batch so __call__ can pass it to the caller.
        return process_batch(samples, configs)

    def __call__(self, inputs):
        """Run the full pipeline on ``inputs``.

        Args:
            inputs: Stored as ``self.path`` and passed to ``csv_to_pandas``.
                NOTE(review): presumably a CSV path or other source accepted
                by ``csv_to_pandas`` — confirm against that helper.

        Returns:
            The result of ``process_data`` (the output of ``process_batch``).

        Raises:
            ValueError: If ``inputs`` is falsy (e.g. ``None`` or ``""``).
        """
        if not inputs:
            raise ValueError("No input provided")

        self.path = inputs
        self.load_data()
        return self.process_data()
|
|