from src.deprecated.conversion import csv_to_pandas
from src.deprecated.ecg_processing import process_batch
from src.deprecated.pydantic_models import ECGConfig, ECGSample


class PreTrainedPipeline:
    """Inference pipeline: load an ECG CSV, regroup rows into samples, process them.

    Usage: ``pipeline(path_to_csv)`` loads the CSV, regroups the flat
    per-timestamp rows into per-sample records, and runs batch processing.
    """

    def __init__(self):
        # Preload heavy resources here (model, processors, tokenizer);
        # this is only called once, so do all the heavy processing/IO here.
        self.path = None  # path to the input CSV file; set in __call__
        self.df = None    # DataFrame loaded from the CSV; set in load_data

    def load_data(self):
        """Load the CSV file at ``self.path`` into ``self.df``."""
        self.df = csv_to_pandas(self.path)

    def process_data(self):
        """Regroup the flat CSV rows into samples and run batch processing.

        Returns:
            The features DataFrame produced by ``process_batch``.
        """
        df = self.df

        # Implode: collapse per-timestamp rows into one row per sample by
        # grouping on every non-time-series column and aggregating the
        # time-series columns into lists.
        cols_to_implode = ['timestamp_idx', 'ecg', 'label']
        df_imploded = df.groupby(list(set(df.columns) - set(cols_to_implode))) \
            .agg({'timestamp_idx': list, 'ecg': list, 'label': list}) \
            .reset_index()

        # Get metadata: 'configs.*' columns describe one shared config,
        # taken from the first imploded row.
        config_cols = [col for col in df.columns if col.startswith('configs.')]
        configs = df_imploded[config_cols].iloc[0].to_dict()
        configs = {key.removeprefix('configs.'): value for key, value in configs.items()}
        configs = ECGConfig(**configs)

        # NOTE(review): 'batch.*' metadata is extracted but never used —
        # confirm whether process_batch was meant to receive it, or drop this.
        batch_cols = [col for col in df.columns if col.startswith('batch.')]
        batch = df_imploded[batch_cols].iloc[0].to_dict()
        batch = {key.removeprefix('batch.'): value for key, value in batch.items()}

        # Get samples: one ECGSample per imploded row.
        samples = df_imploded.to_dict(orient='records')
        samples = [ECGSample(**sample) for sample in samples]

        features_df = process_batch(samples, configs)
        # BUGFIX: the result was computed but never returned, so __call__
        # always returned None to the caller.
        return features_df

    def __call__(self, inputs):
        """Run the full pipeline on one CSV file.

        Args:
            inputs: path to the CSV file to process (passed straight to
                ``csv_to_pandas`` via ``load_data``). Must be truthy.

        Returns:
            The features DataFrame produced by ``process_data``.

        Raises:
            ValueError: if ``inputs`` is falsy (no path provided).
        """
        if inputs:
            self.path = inputs
        else:
            raise ValueError("No input provided")

        # Load data, then run the processing logic.
        self.load_data()
        return self.process_data()