Source code for MEDiml.processing.PETSUVConverter

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging

import dateutil.parser
import numpy as np


[docs] class PETSUVConverter: """ A class for converting raw PET volumes into Standardized Uptake Value (SUV) maps. """
[docs] def __init__(self, dicom_proxy): """ Initializes the converter with a DICOM proxy object. """ self.dcm = dicom_proxy self.logger = logging.getLogger(self.__class__.__name__) # Strategy pattern for dynamic computation routing self._strategies = { 'gml': self._compute_gml, 'bqml': self._compute_bqml, 'cm2ml': self._compute_cm2ml, 'cnts': self._compute_cnts, 'cps': self._compute_cps }
# ========================================== # # PROPERTIES # # ========================================== # @property def unit(self) -> str: return str(self.dcm.get(0x00541001, 'unknown')).lower() @property def patient_weight_g(self) -> float: """Returns patient weight_kg in grams.""" return float(self.dcm[0x0010, 0x1030].value) * 1000.0 if (0x0010, 0x1030) in self.dcm else 75000.0 @property def patient_height_cm(self) -> float: """Returns patient height_m in cm.""" return float(self.dcm[0x0010, 0x1020].value) * 100.0 if (0x0010, 0x1020) in self.dcm else 170.0 @property def patient_sex(self) -> str: return str(self.dcm[0x0010, 0x0040].value).upper() if (0x0010, 0x0040) in self.dcm else 'O' # ========================================== # # PUBLIC METHODS # # ========================================== #
[docs] def get_conversion_factors(self) -> tuple: """ Calculates conversion factors (suv_factor, rescale_slope, rescale_intercept). """ nuclide_dose = self.dcm[0x0054, 0x0016][0][0x0018, 0x1074].value weight_kg = self.patient_weight_g / 1000.0 half_life = float(self.dcm[0x0054, 0x0016][0][0x0018, 0x1075].value) series_time = str(self.dcm[0x0008, 0x0031].value) series_date = str(self.dcm[0x0008, 0x0021].value) series_dt = dateutil.parser.parse(f"{series_date} {series_time}") nuclide_time = str(self.dcm[0x0054, 0x0016][0][0x0018, 0x1072].value) nuclide_dt = dateutil.parser.parse(f"{series_date} {nuclide_time}") delta_time = (series_dt - nuclide_dt).total_seconds() decay_correction = 2 ** (-1 * delta_time / half_life) suv_factor = (weight_kg * 1000) / (decay_correction * nuclide_dose) rescale_slope = self.dcm[0x0028, 0x1053].value rescale_intercept = self.dcm[0x0028, 0x1052].value manufacturer = str(self.dcm[0x0008, 0x0070].value).lower() # Philips private tag logic if "philips" in manufacturer and "bqml" not in self.unit: if 0x70531000 in self.dcm: suv_factor = float(self.dcm[0x7053, 0x1000].value) return (suv_factor, rescale_slope, rescale_intercept)
[docs] def compute(self, raw_pet: np.ndarray) -> np.ndarray: """ Main entry point. Routes the raw PET array to the correct computation strategy based on unit. """ strategy = self._strategies.get(self.unit) if not strategy: raise ValueError(f"Unsupported unit '{self.unit}' for SUV computation.") return strategy(raw_pet)
# ========================================== # # COMPUTATION STRATEGIES # # ========================================== # def _compute_gml(self, raw_pet: np.ndarray) -> np.ndarray: weight_kg = self.patient_weight_g / 1000.0 sex = self.patient_sex suv_type = self.dcm.get(0x00541006, 'unknown').lower() lbm = None bmi = weight_kg / (self.patient_height_cm ** 2) if self.patient_height_cm > 0 else 0 if suv_type == 'lbm': if sex == 'M': lbm = 1.10 * weight_kg - 120 * (weight_kg / self.patient_height_cm) ** 2 elif sex == 'F' or sex == 'O': lbm = 1.07 * weight_kg - 148 * (weight_kg / self.patient_height_cm) ** 2 elif suv_type == 'lbmjames128': if sex == 'M': lbm = 1.10 * weight_kg - 128 * (weight_kg / self.patient_height_cm) ** 2 elif sex == 'F' or sex == 'O': lbm = 1.07 * weight_kg - 148 * (weight_kg / self.patient_height_cm) ** 2 elif suv_type == 'lbmjamna': if sex == 'M': lbm = 9270 * weight_kg / (6680 + 216 * bmi) else: lbm = 9270 * weight_kg / (8780 + 244 * bmi) elif suv_type == 'ibw': if sex == 'M': lbm = 48 + 1.06 * (self.patient_height_cm - 152) else: lbm = 45.5 + 0.91 * (self.patient_height_cm - 152) elif suv_type == 'bw': return raw_pet else: raise ValueError(f"Unsupported SUV type '{suv_type}'.") return (raw_pet / lbm) * weight_kg def _compute_cm2ml(self, raw_pet: np.ndarray) -> np.ndarray: weight_kg = self.patient_weight_g / 1000.0 bsa = 0.007184 * (weight_kg ** 0.425) * (self.patient_height_cm ** 0.725) return (raw_pet / bsa) * weight_kg / 10 def _compute_cnts(self, raw_pet: np.ndarray) -> np.ndarray: if 0x70531000 in self.dcm and float(self.dcm.get(0x70531000)) != 0: return raw_pet * float(self.dcm.get(0x70531000)) if 0x70531009 in self.dcm and float(self.dcm.get(0x70531009)) != 0: act_scale = float(self.dcm.get(0x70531009)) return self._compute_bqml(raw_pet * act_scale) if 0x00181242 in self.dcm and float(self.dcm.get(0x00181242)) != 0: frame_duration_sec = float(self.dcm.get(0x00181242)) / 1000.0 return self._compute_cps(raw_pet / frame_duration_sec) raise ValueError("No valid scale factor found for 'cnts' unit in DICOM header.") def _compute_cps(self, cps_map: np.ndarray) -> np.ndarray: corrected_image_tags = self.dcm.get(0x00280051) if 0x00280051 in self.dcm else [] is_dcal = "DCAL" in corrected_image_tags pixel_spacing = self.dcm.get(0x00280030) slice_thickness = self.dcm.get(0x00180050) if not pixel_spacing or not slice_thickness: raise KeyError("Voxel dimensions (0028,0030 or 0018,0050) missing.") voxel_vol_ml = (float(pixel_spacing[0]) * float(pixel_spacing[1]) * float(slice_thickness)) / 1000.0 if is_dcal: bqml_map = cps_map / voxel_vol_ml else: cal_factor = self.dcm.get(0x00541322) if cal_factor is None: raise ValueError("Image is not DCAL and Dose Calibration Factor (0054,1322) is unknown.") bqml_map = (cps_map * float(cal_factor)) / voxel_vol_ml return self._compute_bqml(bqml_map) def _compute_bqml(self, raw_pet: np.ndarray) -> np.ndarray: try: scantime = self._parse_time(str(self.dcm[0x0008, 0x0032].value)) radio_item = self.dcm[0x0054, 0x0016][0] if (0x0018, 0x1072) not in radio_item and (0x0018, 0x1078) in radio_item: injection_time = self._parse_time(str(radio_item[0x0018, 0x1078].value)[8:]) elif (0x0018, 0x1072) in radio_item: injection_time = self._parse_time(str(radio_item[0x0018, 0x1072].value)) else: raise KeyError("Radiopharmaceutical Start Time tags missing.") half_life = float(radio_item[0x0018, 0x1075].value) injected_dose = float(radio_item[0x0018, 0x1074].value) decay_correction = str(self.dcm.get(0x00541102, '')).upper() if decay_correction == 'ADMIN': injected_dose_decay = injected_dose elif decay_correction in ['START', 'NONE']: decay = np.exp(-np.log(2) * (scantime - injection_time) / half_life) injected_dose_decay = injected_dose * decay else: raise ValueError(f"Unrecognized decay correction status: {decay_correction}") raw_pet = raw_pet * self.patient_weight_g / injected_dose_decay # Convert MBq to Bq if necessary if (np.any(raw_pet > 0) and np.nanmean(raw_pet[raw_pet > 0]) > 100): raw_pet = raw_pet / 1_000_000.0 except Exception as e: self.logger.warning(f"Error computing BQML ({e}). Using standard 1.75h fallback decay.") decay = np.exp(-np.log(2) * (1.75 * 3600) / 6588) raw_pet = raw_pet * self.patient_weight_g / (420000000 * decay) return raw_pet # ========================================== # # STATIC HELPERS # # ========================================== #
[docs] @staticmethod def _parse_time(time_str: str) -> float: """Helper to convert HHMMSS string to total seconds.""" time_str = str(time_str).zfill(6) hh, mm, ss = float(time_str[0:2]), float(time_str[2:4]), float(time_str[4:6]) return hh * 3600.0 + mm * 60.0 + ss