|
import numpy as np |
|
import neurokit2 as nk |
|
|
|
from src.feature_extractor import FeatureExtractor |
|
|
|
|
|
class ECG2HRV(FeatureExtractor): |
|
def __init__(self): |
|
super().__init__() |
|
|
|
def extract_features(self, ecg, frequency, baseline=None, normalization_method=None): |
|
|
|
if ecg.ndim < 1: |
|
raise ValueError("Array must have at least one dimension") |
|
|
|
|
|
ecg = nk.ecg_clean(ecg_signal=ecg, sampling_rate=frequency, method="pantompkins1985") |
|
|
|
|
|
features = self.get_hrv_features(ecg, frequency) |
|
|
|
|
|
if baseline is not None and normalization_method is not None: |
|
baseline_features = self.get_hrv_features(baseline) |
|
features = self.normalize_features(features, baseline_features) |
|
|
|
return features |
|
|
|
def get_hrv_features(self, ecg, frequency): |
|
|
|
peaks, info = nk.ecg_peaks(ecg, sampling_rate=frequency, method="pantompkins1985") |
|
|
|
|
|
hrv_time_features = nk.hrv_time(peaks, sampling_rate=frequency) |
|
|
|
|
|
|
|
|
|
|
|
hrv_features = hrv_time_features |
|
|
|
return hrv_features.to_dict(orient="records") |
|
|
|
def normalize_features(self, features, baseline_features, normalization_method=None): |
|
if normalization_method == "difference": |
|
features = features - baseline_features |
|
elif normalization_method == "relative": |
|
features = features / baseline_features |
|
else: |
|
raise ValueError(f"Normalization method {normalization_method} not supported") |
|
|
|
return features |
|
|