# ECG2HRV — src/deprecated/pipeline.py
# Author: nina-m-m — "Refactor deprecated code" (commit 9e4b4a3), ~2.4 kB
from src.deprecated.conversion import csv_to_pandas
from src.deprecated.ecg_processing import process_batch
from src.deprecated.pydantic_models import ECGConfig, ECGSample
class PreTrainedPipeline:
    """Deprecated ECG processing pipeline.

    Loads a CSV of per-timestamp ECG rows, implodes it into per-sample
    records, builds the pydantic models, and runs batch feature
    extraction via ``process_batch``.
    """

    def __init__(self):
        # Preload all the elements you are going to need at inference
        # (model, processors, tokenizer, ...). This function is only
        # called once, so do all the heavy processing I/O here.
        # Nothing to preload for this pipeline; state is filled lazily.
        self.path = None  # Path to the CSV file (set in __call__)
        self.df = None    # Loaded DataFrame (set in load_data)

    def load_data(self):
        """Load the CSV file at ``self.path`` into ``self.df``."""
        self.df = csv_to_pandas(self.path)

    def process_data(self):
        """Implode the flat DataFrame, build models, and process the batch.

        Returns:
            The features DataFrame produced by ``process_batch``.
        """
        df = self.df
        # Collapse per-timestamp rows into list-valued columns, one row
        # per unique combination of the remaining (metadata) columns.
        cols_to_implode = ['timestamp_idx', 'ecg', 'label']
        df_imploded = df.groupby(list(set(df.columns) - set(cols_to_implode))) \
            .agg({'timestamp_idx': list,
                  'ecg': list,
                  'label': list}) \
            .reset_index()
        # Extract config metadata ('configs.*' columns) into an ECGConfig.
        config_cols = [col for col in df.columns if col.startswith('configs.')]
        configs = df_imploded[config_cols].iloc[0].to_dict()
        configs = {key.removeprefix('configs.'): value for key, value in configs.items()}
        configs = ECGConfig(**configs)
        # Extract batch metadata ('batch.*' columns).
        # NOTE(review): `batch` is built but never used downstream —
        # confirm whether it should be passed to process_batch.
        batch_cols = [col for col in df.columns if col.startswith('batch.')]
        batch = df_imploded[batch_cols].iloc[0].to_dict()
        batch = {key.removeprefix('batch.'): value for key, value in batch.items()}
        # Build one ECGSample per imploded row and process the batch.
        samples = df_imploded.to_dict(orient='records')
        samples = [ECGSample(**sample) for sample in samples]
        features_df = process_batch(samples, configs)
        # BUG FIX: the original computed features_df but never returned
        # it, so __call__ always returned None.
        return features_df

    def __call__(self, inputs):
        """
        Args:
            inputs (:obj:`dict`):
                a dictionary containing a key 'data' mapping to a dict in which
                the values represent each column.
        Return:
            A :obj:`list` of floats or strings: The classification output for each row.
        """
        # Guard clause: reject empty/missing input early.
        if not inputs:
            raise ValueError("No input provided")
        self.path = inputs
        # Load data
        self.load_data()
        # Process and return the extracted features.
        return self.process_data()