"""Final RestAPI File""" from io import BytesIO from datetime import datetime import pandas as pd from typing import List import fastapi from fastapi import FastAPI, File, UploadFile, Body, Form from fastapi.responses import StreamingResponse, Response import app.src from app.src.conversion import h5_to_pandas, csv_to_pandas from app.src.ecg_processing import process_batch from app.src.pydantic_models import ECGBatch, ECGSample, ECGConfig from app.src.configs import OutputFormats from app.src.logger import setup_logger logger = setup_logger(__name__) # Set metadata with open("app/docs/description.md", "r", encoding="utf-8") as f: description = f.read() tags_metadata = [ { "name": "💾conversion", "description": "Convert and create data files without HRV feature processing.", }, { "name": "🚀feature processing", "description": "Run HRV feature processing.", "externalDocs": { "description": "Input Data Form external docs", "url": "https://github.com/hubii-world/pipeline_hrv-02#input-data-form", }, }, ] # Initialize an instance of FastAPI app = FastAPI( default_response_class=fastapi.responses.ORJSONResponse, openapi_tags=tags_metadata, title="hrv-pipeline-02 💓", description=description, version="0.0.1", contact={ "name": "The Open HUman BIosignal Intelligence Platform (HUBII)", "url": "https://hubii.world/hrv-pipeline-02/", }) @app.post("/raw_json_input/", tags=["🚀feature processing"], summary="📥 Run feature processing given a raw json input.") def process_features_by_raw_json_input(data: ECGBatch = Body(...)): try: samples = data.samples configs = data.configs features_df = process_batch(samples, configs) features_dict = features_df.to_dict(orient='records') return { "supervisor": data.supervisor, "record_date": data.record_date, "configs": configs, "features": features_dict} except Exception as e: error_message = str(e) return {"error": error_message} @app.post("/h5_input/", tags=["🚀feature processing"], summary="📂 Run feature processing given multiple h5 files.") def process_features_by_h5_file_input( output_format: OutputFormats = Form(..., alias="Output Format", description="Output file format ('csv' or 'json' or 'excel_spreadsheet')."), supervisor: str = Form(..., alias="Supervisor", description="Name of the supervisor doing the analysis."), configs: ECGConfig = Form(None, alias="Additional Configurations", description="Additional configurations that should be included."), subject_ids: List[str] = Form(..., alias="Subject ID", description="Id of the subject of the sample data"), ecg_files: List[UploadFile] = File(..., alias="ECG Data", description="HDF5 file with the ecg data."), labels: List[str] = Form(None, alias="Labels", description="List with the label data."), ): try: logger.info(f"Received {len(ecg_files)} ECG file(s)...") logger.info("Validating inputs...") assert len(labels) in [0, len(ecg_files)], "Not enough labels defined, none or one for each sample." assert len(subject_ids) <= len(ecg_files), "Too many subject IDs defined, maximal one for each sample." 
        if len(subject_ids) == 1:
            subject_ids = [subject_ids[0]] * len(ecg_files)
        if len(subject_ids) != len(ecg_files):
            subject_ids += ["unknown"] * (len(ecg_files) - len(subject_ids))

        logger.info("Extracting samples from files...")
        samples = []
        for i, file in enumerate(ecg_files):
            sample_df = h5_to_pandas(file.file)
            freq = int(sample_df["frequency"].iloc[0])
            device_name = str(sample_df["device_name"].iloc[0])
            samples.append(
                ECGSample(
                    subject_id=subject_ids[i],
                    frequency=freq,
                    device_name=device_name,
                    timestamp_idx=sample_df["timestamp_idx"].tolist(),
                    ecg=sample_df["ecg"].tolist(),
                    label=labels[i] if labels else None,
                )
            )

        logger.info("Processing batch of samples...")
        features_df = process_batch(samples, configs)

        if output_format == "json":
            # Return JSON response
            features_dict = features_df.to_dict(orient='records')
            return {
                "supervisor": supervisor,
                "record_date": datetime.now(),
                "configs": configs,
                "features": features_dict,
            }
        elif output_format == "csv":
            # Return CSV file
            csv_data = features_df.to_csv(index=False)
            filename = "features_output.csv"
            return StreamingResponse(
                iter([csv_data]),
                media_type='text/csv',
                headers={'Content-Disposition': f'attachment; filename="{filename}"'})
        elif output_format == "excel_spreadsheet":
            # Return Excel file
            output_buffer = BytesIO()
            with pd.ExcelWriter(output_buffer, engine='xlsxwriter') as writer:
                features_df.to_excel(writer, index=False, sheet_name='Sheet1')
            output_buffer.seek(0)
            response = Response(
                content=output_buffer.getvalue(),
                media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
            response.headers['Content-Disposition'] = 'attachment; filename="features_output.xlsx"'
            return response
        else:
            raise ValueError(f"Output format '{output_format}' not supported.")
    except Exception as e:
        return {"error": str(e)}
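# Example request against /h5_input/ — an illustrative sketch only; the host,
# port, and file name ("sample.h5") are assumptions, and the multipart field
# names must match the Form/File aliases declared above:
#
#   curl -X POST http://localhost:8000/h5_input/ \
#        -F "Output Format=csv" \
#        -F "Supervisor=Jane Doe" \
#        -F "Subject ID=subject_01" \
#        -F "ECG Data=@sample.h5;type=application/x-hdf5"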
@app.post("/csv_input/", tags=["🚀feature processing"],
          summary="📂 Run feature processing given multiple csv files.")
def process_features_by_csv_file_input(
        output_format: OutputFormats = Form(
            ..., alias="Output Format",
            description="Output file format ('csv', 'json' or 'excel_spreadsheet')."),
        csv_file: UploadFile = File(
            ..., alias="CSV Data",
            description="CSV file with the ECG data."),
):
    try:
        # Read csv file
        df = csv_to_pandas(csv_file.file)

        # Implode: collapse the exploded per-row representation back into
        # one row per sample, with list-valued signal columns.
        cols_to_implode = ['timestamp_idx', 'ecg', 'label']
        df_imploded = df.groupby(list(set(df.columns) - set(cols_to_implode))) \
            .agg({'timestamp_idx': list, 'ecg': list, 'label': list}) \
            .reset_index()

        # Get metadata from the flattened 'configs.*' and 'batch.*' columns
        config_cols = [col for col in df.columns if col.startswith('configs.')]
        configs = df_imploded[config_cols].iloc[0].to_dict()
        configs = {key.removeprefix('configs.'): value for key, value in configs.items()}
        configs = ECGConfig(**configs)
        batch_cols = [col for col in df.columns if col.startswith('batch.')]
        batch = df_imploded[batch_cols].iloc[0].to_dict()
        batch = {key.removeprefix('batch.'): value for key, value in batch.items()}

        # Get samples; drop the metadata columns so only sample fields reach ECGSample
        sample_cols = [col for col in df_imploded.columns
                       if not col.startswith(('configs.', 'batch.'))]
        samples = df_imploded[sample_cols].to_dict(orient='records')
        samples = [ECGSample(**sample) for sample in samples]

        logger.info("Processing batch of samples...")
        features_df = process_batch(samples, configs)

        if output_format == "json":
            # Return JSON response
            features_dict = features_df.to_dict(orient='records')
            return {
                "supervisor": batch['supervisor'],
                "record_date": batch['record_date'],
                "configs": configs,
                "features": features_dict,
            }
        elif output_format == "csv":
            # Return CSV file
            csv_data = features_df.to_csv(index=False)
            filename = "features_output.csv"
            return StreamingResponse(
                iter([csv_data]),
                media_type='text/csv',
                headers={'Content-Disposition': f'attachment; filename="{filename}"'})
        elif output_format == "excel_spreadsheet":
            # Return Excel file
            output_buffer = BytesIO()
            with pd.ExcelWriter(output_buffer, engine='xlsxwriter') as writer:
                features_df.to_excel(writer, index=False, sheet_name='Sheet1')
            output_buffer.seek(0)
            response = Response(
                content=output_buffer.getvalue(),
                media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
            response.headers['Content-Disposition'] = 'attachment; filename="features_output.xlsx"'
            return response
        else:
            raise ValueError(f"Output format '{output_format}' not supported.")
    except Exception as e:
        return {"error": str(e)}
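# Local dev entry point — a minimal sketch, assuming `uvicorn` is installed
# and this module is importable as `app.main` (adjust the import string to
# the actual module path).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)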