import copy |
import json |
import logging |
import os |
import pathlib |
from typing import Dict, List |
import numpy as np |
from monai.apps.utils import tqdm |
from monai.utils import optional_import |
loadmat, _ = optional_import("scipy.io", name="loadmat") |
PILImage, _ = optional_import("PIL.Image") |
def consep_nuclei_dataset(datalist, output_dir, crop_size, min_area=80, min_distance=20, limit=0) -> List[Dict]: |
""" |
Utility to pre-process and create dataset list for Patches per Nuclei for training over ConSeP dataset. |
Args: |
datalist: A list of data dictionary. Each entry should at least contain 'image_key': <image filename>. |
For example, typical input data can be a list of dictionaries:: |
[{'image': <image filename>, 'label': <label filename>}] |
output_dir: target directory to store the training data after flattening |
crop_size: Crop Size for each patch |
min_area: Min Area for each nuclei to be included in dataset |
min_distance: Min Distance from boundary for each nuclei to be included in dataset |
limit: limit number of inputs for pre-processing. Defaults to 0 (no limit). |
Raises: |
ValueError: When ``datalist`` is Empty |
ValueError: When ``scipy.io.loadmat`` is Not available |
Returns: |
A new datalist that contains path to the images/labels after pre-processing. |
Example:: |
datalist = consep_nuclei_dataset( |
datalist=[{'image': 'img1.png', 'label': 'label1.mat'}], |
output_dir=output, |
crop_size=128, |
limit=1, |
) |
print(datalist[0]["image"], datalist[0]["label"]) |
""" |
if not len(datalist): |
raise ValueError("Input datalist is empty") |
if not loadmat: |
print("Please make sure scipy with loadmat function is correctly installed") |
raise ValueError("scipy.io.loadmat module/function not found") |
dataset_json: List[Dict] = [] |
for d in tqdm(datalist): |
logging.debug(f"Processing Image: {d['image']} => Label: {d['label']}") |
image = PILImage.open(d["image"]).convert("RGB") |
m = loadmat(d["label"]) |
instances = m["inst_map"] |
for nuclei_id, (class_id, (y, x)) in enumerate(zip(m["inst_type"], m["inst_centroid"]), start=1): |
x, y = (int(x), int(y)) |
class_id = int(class_id) |
class_id = 3 if class_id in (3, 4) else 4 if class_id in (5, 6, 7) else class_id |
if 0 < limit <= len(dataset_json): |
return dataset_json |
item = __prepare_patch( |
d=d, |
nuclei_id=nuclei_id, |
output_dir=output_dir, |
image=image, |
instances=instances, |
instance_idx=nuclei_id, |
crop_size=crop_size, |
class_id=class_id, |
centroid=(x, y), |
min_area=min_area, |
min_distance=min_distance, |
others_idx=255, |
) |
if item: |
dataset_json.append(item) |
return dataset_json |
def __prepare_patch( |
d, |
nuclei_id, |
output_dir, |
image, |
instances, |
instance_idx, |
crop_size, |
class_id, |
centroid, |
min_area, |
min_distance, |
others_idx=255, |
): |
image_np = np.array(image) |
image_size = image.size |
bbox = __compute_bbox(crop_size, centroid, image_size) |
cropped_label_np = instances[bbox[0] : bbox[2], bbox[1] : bbox[3]] |
cropped_label_np = np.array(cropped_label_np) |
this_label = np.where(cropped_label_np == instance_idx, class_id, 0) |
if np.count_nonzero(this_label) < min_area: |
return None |
x, y = centroid |
if x < min_distance or y < min_distance or (image_size[0] - x) < min_distance or (image_size[1] - y < min_distance): |
return None |
centroid = centroid[0] - bbox[0], centroid[1] - bbox[1] |
others = np.where(np.logical_and(cropped_label_np > 0, cropped_label_np != instance_idx), others_idx, 0) |
cropped_label_np = this_label + others |
cropped_label = PILImage.fromarray(cropped_label_np.astype(np.uint8), None) |
cropped_image_np = image_np[bbox[0] : bbox[2], bbox[1] : bbox[3], :] |
cropped_image = PILImage.fromarray(cropped_image_np, "RGB") |
images_dir = os.path.join(output_dir, "Images") if output_dir else "Images" |
labels_dir = os.path.join(output_dir, "Labels") if output_dir else "Labels" |
centroids_dir = os.path.join(output_dir, "Centroids") if output_dir else "Centroids" |
os.makedirs(images_dir, exist_ok=True) |
os.makedirs(labels_dir, exist_ok=True) |
os.makedirs(centroids_dir, exist_ok=True) |
image_id = pathlib.Path(d["image"]).stem |
file_prefix = f"{image_id}_{class_id}_{str(instance_idx).zfill(4)}" |
image_file = os.path.join(images_dir, f"{file_prefix}.png") |
label_file = os.path.join(labels_dir, f"{file_prefix}.png") |
centroid_file = os.path.join(centroids_dir, f"{file_prefix}.txt") |
cropped_image.save(image_file) |
cropped_label.save(label_file) |
with open(centroid_file, "w") as fp: |
json.dump([centroid], fp) |
item = copy.deepcopy(d) |
item["nuclei_id"] = nuclei_id |
item["mask_value"] = class_id |
item["image"] = image_file |
item["label"] = label_file |
item["centroid"] = centroid |
return item |
def __compute_bbox(patch_size, centroid, size): |
x, y = centroid |
m, n = size |
x_start = int(max(x - patch_size / 2, 0)) |
y_start = int(max(y - patch_size / 2, 0)) |
x_end = x_start + patch_size |
y_end = y_start + patch_size |
if x_end > m: |
x_end = m |
x_start = m - patch_size |
if y_end > n: |
y_end = n |
y_start = n - patch_size |
return x_start, y_start, x_end, y_end |