|
import csv |
|
import os |
|
from pathlib import Path |
|
|
|
from tqdm import tqdm |
|
|
|
try: |
|
from .section_parser import custom_mimic_cxr_rules, section_text |
|
except ImportError: |
|
from section_parser import custom_mimic_cxr_rules, section_text |
|
|
|
|
|
def list_rindex(l, s):
    """Return the index of the *last* element of ``l`` that equals ``s``.

    Source: https://github.com/MIT-LCP/mimic-cxr/blob/master/txt/create_section_files.py

    Args:
        l: Sequence to search (must support slicing, ``len`` and ``index``).
        s: Value to look for.

    Returns:
        The zero-based position of the last occurrence of ``s`` in ``l``.

    Raises:
        ValueError: If ``s`` does not occur in ``l``.
    """
    # Search a reversed copy for the first hit, then map that offset (counted
    # from the end) back to an index in the original orientation.
    offset_from_end = l[::-1].index(s)
    return len(l) - offset_from_end - 1
|
|
|
|
|
def _write_csv_rows(csv_path, rows, header=None):
    """Write ``rows`` (preceded by an optional ``header`` row) to ``csv_path``."""
    # The csv module expects its file object to be opened with newline='';
    # otherwise newlines embedded in quoted fields (report sections span
    # several lines) and the row terminators may be translated by the text
    # layer, e.g. producing "\r\r\n" on platforms that use CRLF.
    with open(csv_path, 'w', newline='') as fp:
        writer = csv.writer(fp)
        if header is not None:
            writer.writerow(header)
        writer.writerows(rows)


def create_section_files(reports_path, output_path, no_split):
    """Section every report under ``reports_path`` and write the results as CSV.

    Modification of: https://github.com/MIT-LCP/mimic-cxr/blob/master/txt/create_section_files.py

    The function walks ``reports_path/<pXX>/<p...>/<s...>.txt``, splits each
    report with ``section_text`` and collects two tables:

    * one row ``[study, text]`` per report, where ``text`` is the first
      section available in the priority order impression, findings,
      indication, history, technique, last_paragraph, comparison (or a
      manually specified slice/section for studies listed in
      ``custom_mimic_cxr_rules()``), or ``''`` when none is found;
    * one row ``[study, <one column per section name>]`` per report that did
      not go through a custom rule, with ``None`` for absent sections.

    Args:
        reports_path: Directory containing the three-character ``pXX`` group
            folders.
        output_path: Directory to write the CSV files to; created (including
            missing parents) if it does not exist.
        no_split: If truthy, write all ``[study, text]`` rows to
            ``mimic_cxr_sections.csv``; otherwise write them in chunks of
            10000 rows to ``mimic_cxr_00.csv``, ``mimic_cxr_01.csv``, ...

    Returns:
        None. Nothing is written when no report was found.
    """
    reports_path = Path(reports_path)
    output_path = Path(output_path)

    # parents/exist_ok: tolerate a nested, not-yet-existing output location
    # and avoid the check-then-create race of exists() + mkdir().
    output_path.mkdir(parents=True, exist_ok=True)

    # Manually curated exceptions, both keyed by study stem: a section name
    # to use, or a (start, end) character range into the raw report text.
    custom_section_names, custom_indices = custom_mimic_cxr_rules()

    # Single source of truth for the section priority order; it also defines
    # the column order of the sectioned CSV.
    section_order = (
        'impression', 'findings', 'indication', 'history',
        'technique', 'last_paragraph', 'comparison',
    )

    # Top-level group folders: names starting with 'p' and exactly 3 chars.
    p_grp_folders = sorted(
        name for name in os.listdir(reports_path)
        if name.startswith('p') and len(name) == 3
    )

    # [study, text] rows: the single chosen section per report.
    patient_studies = []
    # [study, section_1, ..., section_k] rows: every known section per report.
    study_sections = []

    for p_grp in p_grp_folders:
        cxr_path = reports_path / p_grp
        p_folders = sorted(
            name for name in os.listdir(cxr_path) if name.startswith('p')
        )

        print(p_grp)
        for p in tqdm(p_folders):
            patient_path = cxr_path / p

            # Report files are named s<...>.txt. Sorted so that the row order
            # of the output does not depend on filesystem listing order.
            studies = sorted(
                name for name in os.listdir(patient_path)
                if name.endswith('.txt') and name.startswith('s')
            )

            for s in studies:
                with open(patient_path / s, 'r') as fp:
                    text = fp.read()

                # Study identifier: file name without the '.txt' extension.
                s_stem = s[0:-4]

                # Custom character range: take the slice verbatim. These
                # studies get no row in the sectioned table.
                if s_stem in custom_indices:
                    idx = custom_indices[s_stem]
                    patient_studies.append([s_stem, text[idx[0]:idx[1]]])
                    continue

                # The third return value of section_text is not needed here.
                sections, section_names, _ = section_text(text)

                # Custom section name: use the last section with that name.
                # These studies get no row in the sectioned table either.
                if s_stem in custom_section_names:
                    sn = custom_section_names[s_stem]
                    idx = list_rindex(section_names, sn)
                    patient_studies.append([s_stem, sections[idx].strip()])
                    continue

                # Pick the highest-priority section that is present, using
                # its *last* occurrence in the report.
                chosen = next(
                    (sn for sn in section_order if sn in section_names), None
                )
                if chosen is None:
                    patient_studies.append([s_stem, ''])
                    print(f'no impression/findings: {patient_path / s}')
                else:
                    idx = list_rindex(section_names, chosen)
                    patient_studies.append([s_stem, sections[idx].strip()])

                # One column per known section, None where it is absent.
                study_sectioned = [s_stem]
                for sn in section_order:
                    if sn in section_names:
                        idx = list_rindex(section_names, sn)
                        study_sectioned.append(sections[idx].strip())
                    else:
                        study_sectioned.append(None)
                study_sections.append(study_sectioned)

    # Nothing collected: write no files at all.
    if not patient_studies:
        return

    _write_csv_rows(
        output_path / 'mimic_cxr_sectioned.csv',
        study_sections,
        header=['study', *section_order],
    )

    if no_split:
        _write_csv_rows(output_path / 'mimic_cxr_sections.csv', patient_studies)
    else:
        # Fixed-size chunks, numbered from 00 in the file name.
        jmp = 10000
        for start in range(0, len(patient_studies), jmp):
            n_fn = start // jmp
            _write_csv_rows(
                output_path / f'mimic_cxr_{n_fn:02d}.csv',
                patient_studies[start:start + jmp],
            )
|
|
|
|