import numpy as np
import neurokit2 as nk

from src.feature_extractor import FeatureExtractor


class ECG2HRV(FeatureExtractor):
    """Extract time-domain HRV features from a raw ECG signal.

    The signal is cleaned with NeuroKit2, R-peaks are detected, and the
    features returned by ``nk.hrv_time`` are reported. The features can
    optionally be normalized against those of a baseline recording.
    """

    # Method name passed to both ``nk.ecg_clean`` and ``nk.ecg_peaks``.
    _ECG_METHOD = "pantompkins1985"
    # Values accepted for ``normalization_method``.
    _NORMALIZATION_METHODS = ("difference", "relative")

    def __init__(self):
        super().__init__()

    def extract_features(self, ecg, frequency, baseline=None, normalization_method=None):
        """Compute HRV features for ``ecg``, optionally baseline-normalized.

        Args:
            ecg: ECG samples as an array-like with at least one dimension.
            frequency: Sampling rate handed to NeuroKit2 as ``sampling_rate``.
            baseline: Optional baseline ECG recording. It is processed with
                the same ``frequency`` as ``ecg``.
                NOTE(review): assumes the baseline shares the sampling rate
                of ``ecg`` and is not already cleaned -- confirm with callers.
            normalization_method: ``"difference"`` or ``"relative"``.
                Normalization only happens when both ``baseline`` and
                ``normalization_method`` are given.

        Returns:
            The feature records produced by :meth:`get_hrv_features`
            (a list of ``{feature_name: value}`` dicts), normalized if
            requested.

        Raises:
            ValueError: If ``ecg`` is a scalar, or if normalization is
                requested with an unsupported ``normalization_method``.
        """
        # Ensure the input has at least one dimension (i.e. is not a scalar).
        # np.ndim also accepts plain sequences, not only ndarrays.
        if np.ndim(ecg) < 1:
            raise ValueError("Array must have at least one dimension")

        normalize = baseline is not None and normalization_method is not None
        # Fail fast on a bad method before doing any signal processing.
        if normalize and normalization_method not in self._NORMALIZATION_METHODS:
            raise ValueError(f"Normalization method {normalization_method} not supported")

        features = self.get_hrv_features(self._clean(ecg, frequency), frequency)
        if not normalize:
            return features

        # The baseline goes through the exact same pipeline as the signal so
        # that both feature sets are comparable.
        baseline_features = self.get_hrv_features(self._clean(baseline, frequency), frequency)
        return self.normalize_features(features, baseline_features, normalization_method)

    def get_hrv_features(self, ecg, frequency):
        """Detect R-peaks in a cleaned ECG and compute time-domain HRV features.

        Args:
            ecg: Cleaned ECG signal.
            frequency: Sampling rate handed to NeuroKit2 as ``sampling_rate``.

        Returns:
            The ``nk.hrv_time`` result converted with
            ``to_dict(orient="records")``, i.e. a list of dicts.
        """
        peaks, _info = nk.ecg_peaks(ecg, sampling_rate=frequency, method=self._ECG_METHOD)
        hrv_time_features = nk.hrv_time(peaks, sampling_rate=frequency)
        # TODO: frequency-domain features (nk.hrv_frequency) are not included yet.
        return hrv_time_features.to_dict(orient="records")

    def normalize_features(self, features, baseline_features, normalization_method=None):
        """Normalize ``features`` against ``baseline_features``.

        Feature records (a dict, or a list of dicts as returned by
        :meth:`get_hrv_features`) are normalized value by value. Any other
        operands (e.g. arrays) are combined directly with ``-`` or ``/``.

        Args:
            features: Features to normalize.
            baseline_features: Baseline features of the same structure.
            normalization_method: ``"difference"`` (features - baseline) or
                ``"relative"`` (features / baseline).

        Returns:
            The normalized features, in the same structure as ``features``.
            For records, a zero baseline value under ``"relative"`` yields
            ``inf``/``nan`` instead of raising.

        Raises:
            ValueError: If ``normalization_method`` is not supported, or if
                the record lists differ in length.
            KeyError: If a feature is missing from a baseline record.
        """
        if normalization_method not in self._NORMALIZATION_METHODS:
            raise ValueError(f"Normalization method {normalization_method} not supported")

        if isinstance(features, dict):
            return self._normalize_record(features, baseline_features, normalization_method)

        if isinstance(features, list) and all(isinstance(row, dict) for row in features):
            if len(features) != len(baseline_features):
                raise ValueError(
                    "features and baseline_features must contain the same number of records"
                )
            return [
                self._normalize_record(row, baseline_row, normalization_method)
                for row, baseline_row in zip(features, baseline_features)
            ]

        # Non-record operands keep the plain operator semantics.
        if normalization_method == "difference":
            return features - baseline_features
        return features / baseline_features

    def _clean(self, signal, frequency):
        """Clean a raw ECG signal with ``nk.ecg_clean``."""
        return nk.ecg_clean(ecg_signal=signal, sampling_rate=frequency, method=self._ECG_METHOD)

    @staticmethod
    def _normalize_record(record, baseline_record, normalization_method):
        """Normalize one ``{feature_name: value}`` record against its baseline."""
        normalized = {}
        for name, value in record.items():
            reference = baseline_record[name]
            if normalization_method == "difference":
                normalized[name] = float(np.subtract(value, reference))
            else:
                # IEEE semantics: x / 0 -> inf, 0 / 0 -> nan, without warnings.
                with np.errstate(divide="ignore", invalid="ignore"):
                    normalized[name] = float(np.true_divide(value, reference))
        return normalized