Source code for MEDiml.wrangling.ProcessDICOM

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys
import warnings
from typing import List, Union

import numpy as np
import pydicom
import ray

from ..utils.imref import imref3d

warnings.simplefilter("ignore")

from pathlib import Path

from ..MEDscan import MEDscan
from ..processing.segmentation import get_roi
from ..utils.save_MEDscan import save_MEDscan


[docs] class ProcessDICOM(): """ Class to process dicom files and extract imaging volume and 3D masks from it in order to oganize the data in a MEDscan class object. """
[docs] def __init__( self, path_images: List[Path], path_rs: List[Path], path_save: Union[str, Path], save: bool) -> None: """ Args: path_images (List[Path]): List of paths to the dicom files of a single scan. path_rs (List[Path]): List of paths to the RT struct dicom files for the same scan. path_save (Union[str, Path]): Path to the folder where the MEDscan object will be saved. save (bool): Whether to save the MEDscan object or not. Returns: None. """ self.path_images = path_images self.path_rs = path_rs self.path_save = Path(path_save) if path_save is str else path_save self.save = save
def __get_dicom_scan_orientation(self, dicom_header: List[pydicom.dataset.FileDataset]) -> str: """ Get the orientation of the scan. Args: dicom_header (List[pydicom.dataset.FileDataset]): List of dicom headers. Returns: str: Orientation of the scan. """ n_slices = len(dicom_header) image_patient_positions_x = [dicom_header[i].ImagePositionPatient[0] for i in range(n_slices)] image_patient_positions_y = [dicom_header[i].ImagePositionPatient[1] for i in range(n_slices)] image_patient_positions_z = [dicom_header[i].ImagePositionPatient[2] for i in range(n_slices)] dist = [ np.median(np.abs(np.diff(image_patient_positions_x))), np.median(np.abs(np.diff(image_patient_positions_y))), np.median(np.abs(np.diff(image_patient_positions_z))) ] index = dist.index(max(dist)) if index == 0: orientation = 'Sagittal' elif index == 1: orientation = 'Coronal' else: orientation = 'Axial' return orientation def __get_minimal_suv_header(self, dcm: pydicom.Dataset) -> dict: """ Extracts only the tags required for SUV conversion and PET scaling. This dict is Ray-serializable and free of weakrefs. """ def find_philips_private_tags(ds): # Find which block Philips reserved offset = None for i in range(0x10, 0x100, 0x01): tag = (0x7053, i) if tag in ds and ds[tag].value.lower().startswith("philips"): # If (7053, 0011) is the creator, the offset is 0x1100 offset = i << 8 break if offset: suv_tag = (0x7053, offset + 0x00) act_tag = (0x7053, offset + 0x09) print(f"Philips Tags Found at: SUV={hex(suv_tag[1])}, Act={hex(act_tag[1])}") return ds.get(suv_tag), ds.get(act_tag) print("Creator 'Philips PET Private Group' not found in group 0x7053.") return None, None suv_elem, act_elem = find_philips_private_tags(dcm) # Map the tags to their values (storing as hex strings for keys) tags = [ 0x00101030, 0x00100040, 0x00080031, 0x00080032, 0x00080021, 0x00541102, 0x00181072, 0x00181078, 0x00281052, 0x00281053, 0x00080070, 0x00541001, 0x00541001, 0x00541006, 0x00101020, 0x00101040, 0x00280030, 0x00180050, ] suv_data = {} for t in tags: if t in dcm: suv_data[t] = dcm[t].value elif t == 0x00541006 and 0x00541001 in dcm and dcm[0x00541001].value == 'GML': suv_data[t] = 'BW' # If absent, and the Units are GML, then the type of SUV shall be assumed to be BW. # Handle the Radiopharmaceutical Sequence specially radio_tag = 0x00540016 if radio_tag in dcm and dcm[radio_tag]: item = dcm[radio_tag][0] sub_tags = [0x00181072, 0x00181074, 0x00181075, 0x00181078] radio_dict = {st: item[st].value for st in sub_tags if st in item} suv_data[radio_tag] = [radio_dict] # List of dicts # Extra tags depending on the unit type if 0x00541001 in dcm: unit = str(dcm[0x00541001].value).lower() if unit == 'cnts': # SD SUV scale factor if 0x70531000 in dcm: suv_data[0x70531000] = dcm[0x70531000].value # If not found, try DS Activity Concentration Scale Factor elif 0x70531009 in dcm: suv_data[0x70531000] = dcm[0x70531009].value # If still not found, try Frame Duration (for dynamic PET) elif 0x00181242 in dcm: suv_data[0x00181242] = dcm[0x00181242].value # Dose Calibration Factor (Needed to convert to CPS then to BQML) if 0x00541322 in dcm: suv_data[0x00541322] = dcm[0x00541322].value # Corrected image tag if 0x00280051 in dcm: suv_data[0x00280051] = dcm[0x00280051].value elif unit == 'cps': # Doe Calibration Factor if 0x00541322 in dcm: suv_data[0x00541322] = dcm[0x00541322].value # Corrected image tag if 0x00280051 in dcm: suv_data[0x00280051] = dcm[0x00280051].value return suv_data def __merge_slice_pixel_arrays(self, slice_datasets): first_dataset = slice_datasets[0] num_rows = first_dataset.Rows num_columns = first_dataset.Columns num_slices = len(slice_datasets) sorted_slice_datasets = self.__sort_by_slice_spacing(slice_datasets) if self.__requires_rescaling(sorted_slice_datasets): if not self.__rescaling_is_the_same(sorted_slice_datasets): if not self.__intercept_is_zero(sorted_slice_datasets): # The scan is skipped is the RescaleSlope attribute has multiple values across slices and the RescaleIntercept is not zero. return None voxels = np.empty((num_columns, num_rows, num_slices), dtype=np.float32) for k, dataset in enumerate(sorted_slice_datasets): slope = float(getattr(dataset, 'RescaleSlope', 1)) intercept = float(getattr(dataset, 'RescaleIntercept', 0)) voxels[:, :, k] = dataset.pixel_array.T.astype(np.float32) * slope + intercept else: dtype = first_dataset.pixel_array.dtype voxels = np.empty((num_columns, num_rows, num_slices), dtype=dtype) for k, dataset in enumerate(sorted_slice_datasets): voxels[:, :, k] = dataset.pixel_array.T return voxels def __requires_rescaling(self, slice_datasets): return any(hasattr(dataset, 'RescaleSlope') or hasattr(dataset, 'RescaleIntercept') for dataset in slice_datasets) def __rescaling_is_the_same(self, slice_datasets): first_slope = float(getattr(slice_datasets[0], 'RescaleSlope', 1)) for dataset in slice_datasets[1:]: slope = float(getattr(dataset, 'RescaleSlope', 1)) if slope != first_slope: return False return True def __intercept_is_zero(self, slice_datasets): for dataset in slice_datasets: intercept = float(getattr(dataset, 'RescaleIntercept', 0)) if intercept != 0: return False return True def __ijk_to_patient_xyz_transform_matrix(self, slice_datasets): first_dataset = self.__sort_by_slice_spacing(slice_datasets)[0] image_orientation = first_dataset.ImageOrientationPatient row_cosine, column_cosine, slice_cosine = self.__extract_cosines( image_orientation) row_spacing, column_spacing = first_dataset.PixelSpacing slice_spacing = self.__slice_spacing(slice_datasets) transform = np.identity(4, dtype=np.float32) rotation = np.identity(3, dtype=np.float32) scaling = np.identity(3, dtype=np.float32) transform[:3, 0] = row_cosine*column_spacing transform[:3, 1] = column_cosine*row_spacing transform[:3, 2] = slice_cosine*slice_spacing transform[:3, 3] = first_dataset.ImagePositionPatient rotation[:3, 0] = row_cosine rotation[:3, 1] = column_cosine rotation[:3, 2] = slice_cosine rotation = np.transpose(rotation) scaling[0, 0] = column_spacing scaling[1, 1] = row_spacing scaling[2, 2] = slice_spacing return transform, rotation, scaling def __validate_slices_form_uniform_grid(self, slice_datasets): """ Perform various data checks to ensure that the list of slices form a evenly-spaced grid of data. Some of these checks are probably not required if the data follows the DICOM specification, however it seems pertinent to check anyway. """ invariant_properties = [ 'Modality', 'SOPClassUID', 'SeriesInstanceUID', 'Rows', 'Columns', 'ImageOrientationPatient', 'PixelSpacing', 'PixelRepresentation', 'BitsAllocated', 'BitsStored', 'HighBit', ] for property_name in invariant_properties: self.__slice_attribute_equal(slice_datasets, property_name) self.__validate_image_orientation(slice_datasets[0].ImageOrientationPatient) slice_positions = self.__slice_positions(slice_datasets) self.__check_for_missing_slices(slice_positions) def __validate_image_orientation(self, image_orientation): """ Ensure that the image orientation is supported - The direction cosines have magnitudes of 1 (just in case) - The direction cosines are perpendicular """ row_cosine, column_cosine, slice_cosine = self.__extract_cosines( image_orientation) if not self.__almost_zero(np.dot(row_cosine, column_cosine), 1e-4): raise ValueError( "Non-orthogonal direction cosines: {}, {}".format(row_cosine, column_cosine)) elif not self.__almost_zero(np.dot(row_cosine, column_cosine), 1e-8): warnings.warn("Direction cosines aren't quite orthogonal: {}, {}".format( row_cosine, column_cosine)) if not self.__almost_one(np.linalg.norm(row_cosine), 1e-4): raise ValueError( "The row direction cosine's magnitude is not 1: {}".format(row_cosine)) elif not self.__almost_one(np.linalg.norm(row_cosine), 1e-8): warnings.warn( "The row direction cosine's magnitude is not quite 1: {}".format(row_cosine)) if not self.__almost_one(np.linalg.norm(column_cosine), 1e-4): raise ValueError( "The column direction cosine's magnitude is not 1: {}".format(column_cosine)) elif not self.__almost_one(np.linalg.norm(column_cosine), 1e-8): warnings.warn( "The column direction cosine's magnitude is not quite 1: {}".format(column_cosine)) sys.stderr.flush() def __is_close(self, a, b, rel_tol=1e-9, abs_tol=0.0): return abs(a-b) <= max(rel_tol*max(abs(a), abs(b)), abs_tol) def __almost_zero(self, value, abs_tol): return self.__is_close(value, 0.0, abs_tol=abs_tol) def __almost_one(self, value, abs_tol): return self.__is_close(value, 1.0, abs_tol=abs_tol) def __extract_cosines(self, image_orientation): row_cosine = np.array(image_orientation[:3]) column_cosine = np.array(image_orientation[3:]) slice_cosine = np.cross(row_cosine, column_cosine) return row_cosine, column_cosine, slice_cosine def __slice_attribute_equal(self, slice_datasets, property_name): initial_value = getattr(slice_datasets[0], property_name, None) for slice_idx, dataset in enumerate(slice_datasets[1:]): value = getattr(dataset, property_name, None) if value != initial_value: msg = f'Slice {slice_idx+1} have different value for {property_name}: {value} != {initial_value}' warnings.warn(msg) def __slice_positions(self, slice_datasets): image_orientation = slice_datasets[0].ImageOrientationPatient row_cosine, column_cosine, slice_cosine = self.__extract_cosines( image_orientation) return [np.dot(slice_cosine, d.ImagePositionPatient) for d in slice_datasets] def __check_for_missing_slices(self, slice_positions): slice_positions_diffs = np.diff(sorted(slice_positions)) if not np.allclose(slice_positions_diffs, slice_positions_diffs[0], atol=0, rtol=1e-5): msg = "The slice spacing is non-uniform. Slice spacings:\n{}" warnings.warn(msg.format(slice_positions_diffs)) sys.stderr.flush() if not np.allclose(slice_positions_diffs, slice_positions_diffs[0], atol=0, rtol=1e-1): raise ValueError('The slice spacing is non-uniform. It appears there are extra slices from another scan') def __slice_spacing(self, slice_datasets): if len(slice_datasets) > 1: slice_positions = self.__slice_positions(slice_datasets) slice_positions_diffs = np.diff(sorted(slice_positions)) return np.mean(slice_positions_diffs) return 0.0 def __sort_by_slice_spacing(self, slice_datasets): slice_spacing = self.__slice_positions(slice_datasets) return [d for (s, d) in sorted(zip(slice_spacing, slice_datasets))]
[docs] def combine_slices(self, slice_datasets: List[pydicom.dataset.FileDataset]) -> List[np.ndarray]: """ Given a list of pydicom datasets for an image series, stitch them together into a three-dimensional numpy array of iamging data. Also calculate a 4x4 affine transformation matrix that converts the ijk-pixel-indices into the xyz-coordinates in the DICOM patient's coordinate system and 4x4 rotation and scaling matrix. If any of the DICOM images contain either the `Rescale Slope <https://dicom.innolitics.com/ciods/ct-image/ct-image/00281053>`__ or the `Rescale Intercept <https://dicom.innolitics.com/ciods/ct-image/ct-image/00281052>`__ attributes they will be applied to each slice individually. This function requires that the datasets: - Be in same series (have the same `Series Instance UID <https://dicom.innolitics.com/ciods/ct-image/general-series/0020000e>`__, `Modality <https://dicom.innolitics.com/ciods/ct-image/general-series/00080060>`__, and `SOP Class UID <https://dicom.innolitics.com/ciods/ct-image/sop-common/00080016>`__). - The binary storage of each slice must be the same (have the same `Bits Allocated <https://dicom.innolitics.com/ciods/ct-image/image-pixel/00280100>`__, `Bits Stored <https://dicom.innolitics.com/ciods/ct-image/image-pixel/00280101>`__, `High Bit <https://dicom.innolitics.com/ciods/ct-image/image-pixel/00280102>`__, and `Pixel Representation <https://dicom.innolitics.com/ciods/ct-image/image-pixel/00280103>`__). - The image slice must approximately form a grid. This means there can not be any missing internal slices (missing slices on the ends of the dataset are not detected). It also means that each slice must have the same `Rows <https://dicom.innolitics.com/ciods/ct-image/image-pixel/00280010>`__, `Columns <https://dicom.innolitics.com/ciods/ct-image/image-pixel/00280011>`__, `Pixel Spacing <https://dicom.innolitics.com/ciods/ct-image/image-plane/00280030>`__, and `Image Orientation (Patient) <https://dicom.innolitics.com/ciods/ct-image/image-plane/00200037>`__ attribute values. - The direction cosines derived from the `Image Orientation (Patient) <https://dicom.innolitics.com/ciods/ct-image/image-plane/00200037>`__ attribute must, within 1e-4, have a magnitude of 1. The cosines must also be approximately perpendicular (their dot-product must be within 1e-4 of 0). Warnings are displayed if any of theseapproximations are below 1e-8, however, since we have seen real datasets with values up to 1e-4, we let them pass. - The `Image Position (Patient) <https://dicom.innolitics.com/ciods/ct-image/image-plane/00200032>`__ values must approximately form a line. If any of these conditions are not met, a `dicom_numpy.DicomImportException` is raised. Args: slice_datasets (List[pydicom.dataset.FileDataset]): List of dicom headers. Returns: List[numpy.ndarray]: List of numpy arrays containing the data extracted the dicom files (voxels, translation, rotation and scaling matrix). """ if not slice_datasets: raise ValueError("Must provide at least one DICOM dataset") self.__validate_slices_form_uniform_grid(slice_datasets) voxels = self.__merge_slice_pixel_arrays(slice_datasets) if voxels is None: return None, None, None, None transform, rotation, scaling = self.__ijk_to_patient_xyz_transform_matrix( slice_datasets) return voxels, transform, rotation, scaling
[docs] def process_files(self): """ Reads DICOM files (imaging volume + ROIs) in the instance data path and then organizes it in the MEDscan class. Args: None. Returns: medscan (MEDscan): Instance of a MEDscan class. """ return self.process_files_wrapper.remote(self)
@ray.remote def process_files_wrapper(self) -> MEDscan: """ Wrapper function to process the files. """ # PARTIAL PARSING OF ARGUMENTS if self.path_images is None: raise ValueError('At least two arguments must be provided') # INITIALIZATION medscan = MEDscan() # IMAGING DATA AND ROI DEFINITION (if applicable) # Reading DICOM images and headers dicom_hi = [pydicom.dcmread(str(dicom_file), force=True) for dicom_file in self.path_images] try: # Determination of the scan orientation medscan.data.orientation = self.__get_dicom_scan_orientation(dicom_hi) # IMPORTANT NOTE: extract_voxel_data using combine_slices from dicom_numpy # missing slices and oblique restrictions apply see the reference: # https://dicom-numpy.readthedocs.io/en/latest/index.html#dicom_numpy.combine_slices try: voxel_ndarray, ijk_to_xyz, rotation_m, scaling_m = self.combine_slices(dicom_hi) if voxel_ndarray is None: return None except ValueError as e: raise ValueError(f'Invalid DICOM data for combine_slices(). Error: {e}') # Alignment of scan coordinates for MR scans # (inverse of ImageOrientationPatient rotation matrix) if not np.allclose(rotation_m, np.eye(rotation_m.shape[0])): medscan.data.volume.scan_rot = rotation_m medscan.data.volume.array = voxel_ndarray medscan.type = dicom_hi[0].Modality + 'scan' # 7. Creation of imref3d object pixel_x = scaling_m[0, 0] pixel_y = scaling_m[1, 1] slice_s = scaling_m[2, 2] min_grid = rotation_m@ijk_to_xyz[:3, 3] min_x_grid = min_grid[0] min_y_grid = min_grid[1] min_z_grid = min_grid[2] size_image = np.shape(voxel_ndarray) spatial_ref = imref3d(size_image, pixel_x, pixel_y, slice_s) spatial_ref.XWorldLimits = (np.array(spatial_ref.XWorldLimits) - (spatial_ref.XWorldLimits[0] - (min_x_grid-pixel_x/2))).tolist() spatial_ref.YWorldLimits = (np.array(spatial_ref.YWorldLimits) - (spatial_ref.YWorldLimits[0] - (min_y_grid-pixel_y/2))).tolist() spatial_ref.ZWorldLimits = (np.array(spatial_ref.ZWorldLimits) - (spatial_ref.ZWorldLimits[0] - (min_z_grid-slice_s/2))).tolist() # Converting the results into lists spatial_ref.ImageSize = spatial_ref.ImageSize.tolist() spatial_ref.XIntrinsicLimits = spatial_ref.XIntrinsicLimits.tolist() spatial_ref.YIntrinsicLimits = spatial_ref.YIntrinsicLimits.tolist() spatial_ref.ZIntrinsicLimits = spatial_ref.ZIntrinsicLimits.tolist() # Update the spatial reference in the MEDscan class medscan.data.volume.spatialRef = spatial_ref # DICOM HEADERS OF IMAGING DATA dicom_h = [ pydicom.dcmread(str(dicom_file),stop_before_pixels=True) for dicom_file in self.path_images ] for i in range(0, len(dicom_h)): dicom_h[i].remove_private_tags() # Save the minimal header required for SUV conversion and PET scaling in the MEDscan class medscan.dicomH = self.__get_minimal_suv_header(dicom_h[0]) # DICOM RTstruct (if applicable) if self.path_rs is not None and len(self.path_rs) > 0: dicom_rs_full = [ pydicom.dcmread(str(dicom_file), stop_before_pixels=True, force=True) for dicom_file in self.path_rs ] for i in range(0, len(dicom_rs_full)): dicom_rs_full[i].remove_private_tags() # GATHER XYZ POINTS OF ROIs USING RTstruct n_rs = len(dicom_rs_full) if type(dicom_rs_full) is list else dicom_rs_full contour_num = 0 for rs in range(n_rs): n_roi = len(dicom_rs_full[rs].StructureSetROISequence) for roi in range(n_roi): if roi!=0: if dicom_rs_full[rs].StructureSetROISequence[roi].ROIName == \ dicom_rs_full[rs].StructureSetROISequence[roi-1].ROIName: continue points = [] name_set_strings = ['StructureSetName', 'StructureSetDescription', 'series_description', 'SeriesInstanceUID'] for name_field in name_set_strings: if name_field in dicom_rs_full[rs]: name_set = getattr(dicom_rs_full[rs], name_field) name_set_info = name_field break medscan.data.ROI.update_roi_name(key=contour_num, roi_name=dicom_rs_full[rs].StructureSetROISequence[roi].ROIName) medscan.data.ROI.update_indexes(key=contour_num, indexes=None) medscan.data.ROI.update_name_set(key=contour_num, name_set=name_set) medscan.data.ROI.update_name_set_info(key=contour_num, nameSetInfo=name_set_info) try: n_closed_contour = len(dicom_rs_full[rs].ROIContourSequence[roi].ContourSequence) ind_closed_contour = [] for s in range(0, n_closed_contour): # points stored in the RTstruct file for a given closed # contour (beware: there can be multiple closed contours # on a given slice). pts_temp = dicom_rs_full[rs].ROIContourSequence[roi].ContourSequence[s].ContourData n_points = int(len(pts_temp) / 3) if len(pts_temp) > 0: ind_closed_contour = ind_closed_contour + np.tile(s, n_points).tolist() if type(points) == list: points = np.reshape(np.transpose(pts_temp),(n_points, 3)) else: points = np.concatenate( (points, np.reshape(np.transpose(pts_temp), (n_points, 3))), axis=0 ) if n_closed_contour == 0: print(f'Warning: no contour data found for ROI: \ {dicom_rs_full[rs].StructureSetROISequence[roi].ROIName}') else: # Save the XYZ points in the MEDscan class medscan.data.ROI.update_indexes( key=contour_num, indexes=np.concatenate( (points, np.reshape(ind_closed_contour, (len(ind_closed_contour), 1))), axis=1) ) # Compute the ROI box _, roi_obj = get_roi( medscan, name_roi='{' + dicom_rs_full[rs].StructureSetROISequence[roi].ROIName + '}', box_string='full' ) # Save the ROI box non-zero indexes in the MEDscan class medscan.data.ROI.update_indexes(key=contour_num, indexes=np.nonzero(roi_obj.data.flatten())) except Exception as e: if 'SeriesDescription' in dicom_h[0]: print(f'patientID: {dicom_hi[0].PatientID} Modality: {dicom_hi[0].SeriesDescription} error: \ {str(e)} n_roi: {str(roi)} n_rs: {str(rs)}') else: print(f'patientID: {dicom_hi[0].PatientID} Modality: {dicom_hi[0].Modality} error: \ {str(e)} n_roi: {str(roi)} n_rs: {str(rs)}') medscan.data.ROI.update_indexes(key=contour_num, indexes=np.NaN) contour_num += 1 # Save additional scan information in the MEDscan class medscan.data.set_patient_position(patient_position=dicom_h[0].PatientPosition) medscan.patientID = str(dicom_h[0].PatientID) medscan.format = "dicom" if 'SeriesDescription' in dicom_h[0]: medscan.series_description = dicom_h[0].SeriesDescription else: medscan.series_description = dicom_h[0].Modality # save MEDscan class instance as a pickle object if self.save and self.path_save: name_complete = save_MEDscan(medscan, self.path_save) del medscan else: return medscan except Exception as e: if 'SeriesDescription' in dicom_hi[0]: print(f'patientID: {dicom_hi[0].PatientID} Modality: {dicom_hi[0].SeriesDescription} error: {str(e)}') else: print(f'patientID: {dicom_hi[0].PatientID} Modality: {dicom_hi[0].Modality} error: {str(e)}') return '' return name_complete