|
"""This file contains the ecg processing pipelines.""" |
|
import pandas as pd |
|
|
|
import neurokit2 as nk |
|
|
|
from .ecg_feature_extraction import get_hrv_features, normalize_features |
|
from .utils import cut_out_window, create_windows |
|
|
|
# Format floats with six decimal places whenever pandas renders them.
# This changes a global pandas display option as a side effect of importing
# this module.
pd.set_option('display.float_format', '{:.6f}'.format)

# NOTE(review): this import sits below the option call instead of with the
# other imports at the top of the file - confirm whether the order matters.
from .logger import setup_logger

# Logger for this module, obtained from the project's setup_logger helper
# using this module's name.
logger = setup_logger(__name__)
|
|
|
|
|
def process_window(window, window_id, frequency):
    """Build a one-row feature table for a single ECG window.

    The ``ecg`` values of ``window`` are handed to ``get_hrv_features``
    together with ``frequency``; the result becomes a single-row DataFrame
    (index ``0``) that is then tagged with identifying metadata.

    Args:
        window: Table-like object indexed by column name; the columns
            ``ecg``, ``subject_id``, ``sample_id`` and ``timestamp_idx`` are
            read. Presumably a pandas DataFrame whose ``timestamp_idx``
            holds datetime-like values (``strftime`` is called on its
            min/max) - confirm against callers.
        window_id: Identifier stored in the ``window_id`` column; the value
            ``0`` marks the row as the baseline window.
        frequency: Passed to ``get_hrv_features`` and stored in the
            ``frequency`` column.

    Returns:
        A one-row ``pandas.DataFrame`` holding the extracted features plus
        the columns ``subject_id``, ``sample_id``, ``window_id``,
        ``w_start_time``, ``w_end_time``, ``baseline`` and ``frequency``.
    """
    time_format = '%Y-%m-%d %H:%M:%S'

    hrv_features = get_hrv_features(window['ecg'].values, frequency)
    row = pd.DataFrame(hrv_features, index=[0])

    # Only the first distinct subject/sample id of the window is kept.
    metadata = {
        'subject_id': window['subject_id'].unique()[0],
        'sample_id': str(window['sample_id'].unique()[0]),
        'window_id': window_id,
        'w_start_time': window['timestamp_idx'].min().strftime(time_format),
        'w_end_time': window['timestamp_idx'].max().strftime(time_format),
        'baseline': window_id == 0,
        'frequency': frequency,
    }
    for column, value in metadata.items():
        row[column] = value

    return row
|
|
|
|
|
def process_batch(samples, configs):
    """Extract windowed HRV features for every sample in a batch.

    Each sample is converted to a DataFrame and its ``ecg`` column is
    cleaned with ``nk.ecg_clean`` (method ``pantompkins1985``). When
    ``configs.baseline_start`` is truthy, a baseline window is cut out first
    and processed with window id ``0``; only rows whose ``timestamp_idx`` is
    later than the baseline's last timestamp are then split into regular
    windows, which are numbered from ``1``. After all samples are processed
    the rows are concatenated and, if a baseline is configured, passed
    through ``normalize_features``.

    Args:
        samples: Sized sequence of sample objects; each must provide a
            ``dict()`` method and a ``frequency`` attribute.
        configs: Object providing ``baseline_start``, ``baseline_end``,
            ``window_size``, ``window_slicing_method`` and
            ``normalization_method``.

    Returns:
        A ``pandas.DataFrame`` with one row per processed window across all
        samples (freshly numbered index).

    Raises:
        ValueError: If no window was processed at all (for example when
            ``samples`` is empty), so there is nothing to concatenate.
    """
    features_list = []
    total_samples = len(samples)

    for sample_number, sample in enumerate(samples, start=1):
        logger.info(f"Processing sample ({sample_number}/{total_samples})...")
        sample_df = pd.DataFrame.from_dict(sample.dict())

        logger.info("Preprocess ECG signals...")
        sample_df['ecg'] = nk.ecg_clean(
            sample_df['ecg'],
            sampling_rate=sample.frequency,
            method="pantompkins1985",
        )

        # NOTE(review): this is a truthiness test, so a baseline_start of 0
        # would disable baseline handling - confirm the expected value type.
        if configs.baseline_start:
            logger.info("Cut out baseline window...")
            baseline_window = cut_out_window(
                sample_df,
                'timestamp_idx',
                start=configs.baseline_start,
                end=configs.baseline_end,
            )
            # Keep only the data recorded after the baseline window so the
            # regular windows do not overlap it.
            baseline_end_time = baseline_window['timestamp_idx'].max()
            sample_df = sample_df[sample_df['timestamp_idx'] > baseline_end_time]
            logger.info("Processing baseline window...")
            features_list.append(process_window(baseline_window, 0, sample.frequency))

        logger.info("Cut out windows...")
        windows = create_windows(
            df=sample_df,
            time_column='timestamp_idx',
            window_size=configs.window_size,
            window_slicing_method=configs.window_slicing_method,
        )
        logger.info(f"Processing windows (Total: {len(windows)})...")
        # Window ids start at 1 because id 0 is reserved for the baseline.
        features_list.extend(
            process_window(window, window_id, sample.frequency)
            for window_id, window in enumerate(windows, start=1)
        )

    if not features_list:
        # pd.concat would raise the same exception type with a less helpful
        # "No objects to concatenate" message.
        raise ValueError("No windows were processed; cannot build a feature table.")

    features_df = pd.concat(features_list, ignore_index=True)

    if configs.baseline_start:
        features_df = normalize_features(features_df, configs.normalization_method)

    return features_df
|
|