"""This file contains the ecg processing pipelines."""
import pandas as pd
import neurokit2 as nk

from .ecg_feature_extraction import get_hrv_features, normalize_features
from .logger import setup_logger
from .utils import cut_out_window, create_windows

pd.set_option('display.float_format', '{:.6f}'.format)

logger = setup_logger(__name__)


def process_window(window, window_id, frequency):
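    """Extract HRV features for one ECG window and return them as a single-row DataFrame.

    The window DataFrame is expected to provide 'ecg', 'subject_id', 'sample_id', and
    'timestamp_idx' columns; a window_id of 0 marks the baseline window.
    """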
    features = get_hrv_features(window['ecg'].values, frequency)
    tmp = pd.DataFrame(features, index=[0])
    # Attach identifying metadata for the window
    tmp['subject_id'] = window['subject_id'].unique()[0]
    tmp['sample_id'] = str(window['sample_id'].unique()[0])
    tmp['window_id'] = window_id
    tmp['w_start_time'] = window['timestamp_idx'].min().strftime('%Y-%m-%d %H:%M:%S')
    tmp['w_end_time'] = window['timestamp_idx'].max().strftime('%Y-%m-%d %H:%M:%S')
    # Window 0 is the baseline window by convention
    tmp['baseline'] = window_id == 0
    tmp['frequency'] = frequency
    return tmp


def process_batch(samples, configs):
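    """Run the full processing pipeline for a batch of samples.

    For each sample the ECG signal is cleaned, an optional baseline window is cut out and
    processed first (window 0), the remaining signal is sliced into windows, and HRV
    features are computed per window. If a baseline is configured, the resulting feature
    DataFrame is normalized against it before being returned.
    """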
    features_list = []
    for i, sample in enumerate(samples):
        logger.info(f"Processing sample ({i + 1}/{len(samples)})...")
        sample_df = pd.DataFrame.from_dict(sample.dict())

        # Preprocess the ECG signal
        logger.info("Preprocessing ECG signal...")
        sample_df['ecg'] = nk.ecg_clean(sample_df['ecg'], sampling_rate=sample.frequency,
                                        method="pantompkins1985")

        # Cut out the baseline window (if configured) and process it as window 0
        if configs.baseline_start:
            logger.info("Cutting out baseline window...")
            baseline_window = cut_out_window(sample_df, 'timestamp_idx', start=configs.baseline_start,
                                             end=configs.baseline_end)
            # Drop the baseline period from the sample before slicing the remaining windows
            sample_df = sample_df[sample_df['timestamp_idx'] > baseline_window['timestamp_idx'].max()]
            logger.info("Processing baseline window...")
            features_list.append(process_window(baseline_window, 0, sample.frequency))

        # Slice the remaining signal into windows and process them
        logger.info("Cutting out windows...")
        windows = create_windows(df=sample_df, time_column='timestamp_idx', window_size=configs.window_size,
                                 window_slicing_method=configs.window_slicing_method)
        logger.info(f"Processing windows (Total: {len(windows)})...")
        features_list.extend(process_window(window, window_id, sample.frequency)
                             for window_id, window in enumerate(windows, start=1))

    features_df = pd.concat(features_list, ignore_index=True)

    # Normalize the features via baseline subtraction
    if configs.baseline_start:
        features_df = normalize_features(features_df, configs.normalization_method)
    return features_df
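

# Minimal usage sketch (illustrative only): the concrete sample and configs objects are
# defined elsewhere in the project; this assumes only that each sample exposes `.dict()`
# and `.frequency`, and that `configs` provides the `baseline_start`, `baseline_end`,
# `window_size`, `window_slicing_method`, and `normalization_method` attributes used above.
#
#     from src.ecg_processing import process_batch
#
#     features_df = process_batch(samples=[sample], configs=configs)
#     features_df.to_csv("hrv_features.csv", index=False)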