# Source code for MEDiml.learning.RadiomicsLearner

import logging
import os
import time
from copy import deepcopy
from pathlib import Path
from typing import Dict, List, Tuple

import pandas as pd
from numpyencoder import NumpyEncoder
from pycaret.classification import *

from MEDiml.learning.DataCleaner import DataCleaner
from MEDiml.learning.DesignExperiment import DesignExperiment
from MEDiml.learning.Estimator import Estimator
from MEDiml.learning.FSR import FSR
from MEDiml.learning.ml_utils import (average_results, combine_rad_tables,
                                      feature_importance_analysis,
                                      get_ml_test_table, get_radiomics_table,
                                      intersect)
from MEDiml.learning.Normalization import CombatNormalization
from MEDiml.learning.Results import Results

from ..utils.json_utils import load_json, save_json


class RadiomicsLearner:
    """
    Runs a radiomics machine learning experiment: loads the experiment assets,
    pre-processes the radiomics tables, trains an estimator, tests it and saves
    the results of every split/run.
    """

    def __init__(self, path_study: Path, path_settings: Path, experiment_label: str) -> None:
        """
        Constructor of the class RadiomicsLearner.

        Args:
            path_study (Path): Path to the main study folder where the outcomes,
                learning patients and holdout patients dictionaries are found.
            path_settings (Path): Path to the settings folder.
            experiment_label (str): String specifying the label to attach to a given learning
                experiment in "path_experiments". This label will be attached to the
                ml__$experiments_label$.json file as well as the learn__$experiment_label$ folder.
                This label is used to keep track of different experiments with different settings
                (e.g. radiomics, scans, machine learning algorithms, etc.).

        Returns:
            None
        """
        self.path_study = Path(path_study)
        self.path_settings = Path(path_settings)
        self.experiment_label = experiment_label

    def __load_ml_info(self, ml_dict_paths: Dict) -> Dict:
        """
        Initializes the test dictionary information (training patients, test patients, ML dict, etc).

        Args:
            ml_dict_paths (Dict): Dictionary containing the paths to the different files needed
                to run the machine learning experiment. The keys read here are 'patientsTrain',
                'patientsTest', 'outcomes', 'ml' and 'results'.

        Returns:
            Dict: Dictionary containing the information of the machine learning test.
        """
        ml_dict = dict()

        # Training and test patients
        ml_dict['patientsTrain'] = load_json(ml_dict_paths['patientsTrain'])
        ml_dict['patientsTest'] = load_json(ml_dict_paths['patientsTest'])

        # Outcome table for training and test patients: the first column holds the
        # binary outcome; a second column, when it is the only other one, holds the time
        outcome_table = pd.read_csv(ml_dict_paths['outcomes'], index_col=0)
        ml_dict['outcome_table_binary'] = outcome_table.iloc[:, [0]]
        if outcome_table.shape[1] == 2:
            ml_dict['outcome_table_time'] = outcome_table.iloc[:, [1]]

        # Machine learning dictionary
        ml_dict['ml'] = load_json(ml_dict_paths['ml'])
        ml_dict['path_results'] = ml_dict_paths['results']

        return ml_dict

    def get_hold_out_set_table(self, ml: Dict, var_id: str, patients_id: List) -> pd.DataFrame:
        """
        Loads different radiomics tables then combines them to be used for hold-out testing.

        Args:
            ml (Dict): The machine learning dictionary containing the information of the
                machine learning test.
            var_id (str): String specifying the ID of the radiomics variable in ml.
                --> Ex: var1
            patients_id (List): List of patients of the hold-out set.

        Returns:
            pd.DataFrame: Radiomics table for the hold-out set.
        """
        # Loading one table per path entry of the variable
        rad_var_struct = ml['variables'][var_id]
        rad_tables_holdout = [
            get_radiomics_table(item['csv'], item['txt'], item['type'], patients_id)
            for item in rad_var_struct['path'].values()
        ]

        # Combine the tables
        rad_tables_holdout = combine_rad_tables(rad_tables_holdout)
        rad_tables_holdout.Properties['userData']['flags_processing'] = {}

        return rad_tables_holdout

    def pre_process_variables(
            self,
            ml: Dict,
            outcome_table_binary: pd.DataFrame,
            patients_train: List = None
        ) -> Tuple[Dict, Dict]:
        """
        Loads and pre-processes different radiomics tables from different variable types
        found in the ml dict.

        Note:
            only patients of the training/learning set should be found in this outcome table.

        Args:
            ml (Dict): The machine learning dictionary containing the information of the
                machine learning test.
            outcome_table_binary (pd.DataFrame): outcome table with binary labels. This table may
                be used to pre-process some variables with the "FDA" feature set reduction algorithm.
            patients_train (List, optional): List of patients to use for training. Defaults to
                every patient found in ``outcome_table_binary``.

        Returns:
            Tuple: Two dict of processed radiomics tables keyed by variable ID, one dict for
                training and one for testing (no feature set reduction).
        """
        # Unique variables found in the ml variables combinations dict (first-seen order)
        variables_id = list(dict.fromkeys(
            var_id
            for combination in ml['variables']['combinations']
            for var_id in combination.split('_')
        ))

        # The outcome table is expected to hold training patients only (see Note)
        if patients_train is None:
            patients_train = list(outcome_table_binary.index)

        # For each variable, load the corresponding radiomics table and pre-process it
        processed_var_tables = {}
        processed_var_tables_test = {}
        for var_id in variables_id:
            table_training, table_testing = self.pre_process_radiomics_table(
                ml,
                var_id,
                outcome_table_binary,
                patients_train
            )
            processed_var_tables[var_id] = table_training
            processed_var_tables_test[var_id] = table_testing

        return processed_var_tables, processed_var_tables_test

    def pre_process_radiomics_table(
            self,
            ml: Dict,
            var_id: str,
            outcome_table_binary: pd.DataFrame,
            patients_train: list
        ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        For the given variable, this function loads the corresponding radiomics tables and
        pre-processes them (cleaning, normalization and feature set reduction).

        Note:
            Only patients of the training/learning set should be found in the given outcome table.

        Args:
            ml (Dict): The machine learning dictionary containing the information of the machine
                learning test (parameters, options, etc.).
            var_id (str): String specifying the ID of the radiomics variable in ml.
                For example: 'var1'.
            outcome_table_binary (pd.DataFrame): outcome table with binary labels. This table may
                be used to pre-process some variables with the "FDA" feature set reduction algorithm.
            patients_train (list): List of patients to use for training.

        Returns:
            Tuple[pd.DataFrame, pd.DataFrame]: Two dataframes of processed radiomics tables, one for
                training and one for testing (no feature set reduction).

        Raises:
            NotImplementedError: If a normalization method other than ComBat is requested.
        """
        # Initialization
        patient_ids = list(outcome_table_binary.index)
        outcome_table_binary_training = outcome_table_binary.loc[patients_train]
        rad_var_struct = ml['variables'][var_id]
        var_names = ['cleaning_profile', 'normalization', 'reduction_method']
        flags_preprocessing = {key: key in rad_var_struct.keys() for key in var_names}
        # Feature set reduction is never applied to the testing table
        flags_preprocessing_test = flags_preprocessing.copy()
        flags_preprocessing_test['reduction_method'] = False

        # Pre-processing
        rad_tables_learning = list()
        for item in rad_var_struct['path'].values():
            # Loading the table
            path_radiomics_csv = item['csv']
            path_radiomics_txt = item['txt']
            image_type = item['type']
            rad_table_learning = get_radiomics_table(path_radiomics_csv, path_radiomics_txt, image_type, patient_ids)

            # Data cleaning
            if flags_preprocessing['cleaning_profile']:
                cleaning_dict = ml['datacleaning'][rad_var_struct['cleaning_profile']]['continuous']
                data_cleaner = DataCleaner(**cleaning_dict)
                # Temp save of properties
                temp_properties = deepcopy(rad_table_learning.Properties)
                # Apply data cleaning
                rad_table_learning = data_cleaner.fit_transform(rad_table_learning)
                # Re-assign properties, only when cleaning returned a table
                if rad_table_learning is not None:
                    rad_table_learning.Properties = temp_properties

            # Nothing to keep for this table
            if rad_table_learning is None:
                continue

            # Normalization (ComBat)
            if flags_preprocessing['normalization']:
                normalization_method = rad_var_struct['normalization']
                if 'combat' in normalization_method.lower():
                    # Some information must be stored to re-apply combat for testing data
                    original_data = {
                        'path_radiomics_csv': path_radiomics_csv,
                        'path_radiomics_txt': path_radiomics_txt,
                        'image_type': image_type,
                        'patient_ids': patient_ids,
                    }
                    if flags_preprocessing['cleaning_profile']:
                        original_data['datacleaning_method'] = rad_var_struct['cleaning_profile']
                    rad_table_learning.Properties['userData']['normalization'] = {
                        'original_data': original_data
                    }

                    # Apply ComBat
                    normalization = CombatNormalization()
                    rad_table_learning = normalization.fit_transform(rad_table_learning)
                else:
                    raise NotImplementedError(f'Normalization method: {normalization_method} not recognized.')

            # Save the table
            rad_tables_learning.append(rad_table_learning)

        # Separate training and testing data before feature set reduction
        rad_tables_testing = deepcopy(rad_tables_learning)
        rad_tables_training = []
        for rad_tab in rad_tables_learning:
            patients_ids = intersect(patients_train, list(rad_tab.index))
            rad_tables_training.append(deepcopy(rad_tab.loc[patients_ids]))

        # Deepcopy properties
        temp_properties = [deepcopy(rad_tab.Properties) for rad_tab in rad_tables_testing]

        # Feature set reduction (for training data only)
        if flags_preprocessing['reduction_method']:
            f_set_reduction_method = rad_var_struct['reduction_method']
            fsr = FSR(f_set_reduction_method)

            # Apply FDA
            rad_tables_training = fsr.apply_fsr(
                ml,
                rad_tables_training,
                outcome_table_binary_training,
                path_save_logging=ml['path_results']
            )
        else:
            # Without feature set reduction the training tables are still a list:
            # combine them so that a single table is returned, as for the testing data
            rad_tables_training = combine_rad_tables(rad_tables_training)

        # Re-assign properties
        for rad_tab, properties in zip(rad_tables_testing, temp_properties):
            rad_tab.Properties = properties
        del temp_properties

        # Finalization steps
        rad_tables_training.Properties['userData']['flags_preprocessing'] = flags_preprocessing
        rad_tables_testing = combine_rad_tables(rad_tables_testing)
        rad_tables_testing.Properties['userData']['flags_processing'] = flags_preprocessing_test

        return rad_tables_training, rad_tables_testing

    def ml_run(self, path_ml: Path, holdout_test: bool = True, method: str = 'auto') -> None:
        """
        This function runs the machine learning test for the created experiment.

        Args:
            path_ml (Path): Path to the main dictionary containing info about the ml current experiment.
            holdout_test (bool, optional): Boolean specifying if the hold-out test should be performed.
            method (str, optional): Algorithm name used when the 'modeling' section of the ml
                dictionary has no 'method' key.

        Returns:
            None.
        """
        # Set up logging file for the batch. force=True is required because basicConfig
        # is a no-op once the root logger has handlers: without it, every run after the
        # first one would keep logging into the first run's batch.log
        log_file = os.path.dirname(path_ml) + '/batch.log'
        logging.basicConfig(filename=log_file, level=logging.INFO, format='%(message)s', filemode='w', force=True)

        # Start the timer
        batch_start = time.time()

        logging.info("\n\n********************MACHINE LEARNING RUN********************\n\n")

        # --> A. Initialization phase
        # Load the test dictionary and machine learning information
        ml_dict_paths = load_json(path_ml)                  # Test information dictionary
        ml_info_dict = self.__load_ml_info(ml_dict_paths)   # Machine learning information dictionary

        # Machine learning assets
        patients_train = ml_info_dict['patientsTrain']
        patients_test = ml_info_dict['patientsTest']
        patients_holdout = load_json(self.path_study / 'patientsHoldOut.json') if holdout_test else None
        outcome_table_binary = ml_info_dict['outcome_table_binary']
        ml = ml_info_dict['ml']
        path_results = ml_info_dict['path_results']
        ml['path_results'] = path_results

        # --> B. Machine Learning phase
        # B.1. Pre-processing features
        start = time.time()
        logging.info("\n\n--> PRE-PROCESSING TRAINING VARIABLES")

        # Not all variables will be used to train the model, only the user-selected variable
        var_id = str(ml['variables']['varStudy'])

        # Pre-processing of the radiomics tables/variables
        processed_training_table, processed_testing_table = self.pre_process_radiomics_table(
            ml,
            var_id,
            outcome_table_binary.copy(),
            patients_train
        )
        logging.info(f"...Done in {time.time()-start} s")

        # B.2. Pre-learning initialization
        # Patient definitions (training and test sets)
        patient_ids = list(outcome_table_binary.index)
        patients_train = intersect(intersect(patient_ids, patients_train), processed_training_table.index)
        patients_test = intersect(intersect(patient_ids, patients_test), processed_testing_table.index)
        patients_holdout = intersect(patient_ids, patients_holdout) if holdout_test else None

        # Initializing outcome tables for training and test sets
        outcome_table_binary_train = outcome_table_binary.loc[patients_train, :]
        outcome_table_binary_test = outcome_table_binary.loc[patients_test, :]
        outcome_table_binary_holdout = outcome_table_binary.loc[patients_holdout, :] if holdout_test else None

        # Separate variable table for training sets (repetitive but double-checking)
        var_table_train = processed_training_table.loc[patients_train, :]

        # Initializing the model settings
        modeling = ml['modeling']
        algorithm = modeling['method'] if 'method' in modeling.keys() else method
        var_importance_threshold = modeling['var_importance_threshold']
        optimize_threshold = modeling['optimize_threshold']
        optimization_metric = modeling['optimization_metric']
        use_gpu = modeling['useGPU'] if 'useGPU' in modeling.keys() else True
        seed = modeling['seed'] if 'seed' in modeling.keys() else None

        # B.3. Training the model
        tstart = time.time()
        logging.info(f"\n\n--> TRAINING {algorithm.upper()} MODEL FOR VARIABLE {var_id}")

        # Training the model
        estimator = Estimator(
            algorithm=algorithm,
            ml_config={
                'var_importance_threshold': var_importance_threshold,
                'optimize_threshold': optimize_threshold,
                'optimization_metric': optimization_metric,
                'use_gpu': use_gpu,
                'seed': seed
            })
        estimator.fit(var_table_train, outcome_table_binary_train)

        # Saving the trained model using pickle. When no 'nameSave' is configured,
        # fall back to the algorithm name so that a model identifier can still be built
        name_save_model = modeling['nameSave'] if 'nameSave' in modeling.keys() else None
        if name_save_model is None:
            name_save_model = algorithm
        model_id = name_save_model + '_' + str(ml['variables']['varStudy'])
        path_model = os.path.dirname(path_results) + '/' + (model_id + '.pickle')
        estimator.save(path_model)

        logging.info("{}--> DONE. TOTAL TIME OF LEARNING PROCESS: {:.2f} min".format(" " * 4, (time.time()-tstart) / 60))

        # --> C. Testing phase
        # C.1. Testing the model and computing model response
        tstart = time.time()
        logging.info(f"\n\n--> TESTING {algorithm.upper()} MODEL FOR VARIABLE {var_id}")

        # Preparing the variable table
        var_table_test = get_ml_test_table(estimator, processed_testing_table)

        # Getting the model response for training and test sets
        response_train = estimator.predict_proba(var_table_test.loc[patients_train, :])
        response_test = estimator.predict_proba(var_table_test.loc[patients_test, :])

        logging.info('{}--> DONE. TOTAL TIME OF LEARNING PROCESS: {:.2f}'.format(" " * 4, (time.time() - tstart)/60))

        if holdout_test:
            # --> D. Holdoutset testing phase
            # D.1. Prepare holdout test data
            var_table_all_holdout = self.get_hold_out_set_table(ml, var_id, patients_holdout)

            # D.2. Testing the model and computing model response on the holdout set
            tstart = time.time()
            logging.info(f"\n\n--> TESTING {algorithm.upper()} MODEL FOR VARIABLE {var_id} ON THE HOLDOUT SET")

            response_holdout = estimator.predict_proba(var_table_all_holdout.loc[patients_holdout, :])

            logging.info('{}--> DONE. TOTAL TIME OF LEARNING PROCESS: {:.2f}'.format(" " * 4, (time.time() - tstart)/60))

        # E. Computing performance metrics
        tstart = time.time()

        # Initialize the Results class
        result = Results(estimator.estimator_.model_info_, model_id)
        results_args = {
            'response_train': response_train,
            'response_test': response_test,
            'patients_train': patients_train,
            'patients_test': patients_test,
            'outcome_table_binary_train': outcome_table_binary_train,
            'outcome_table_binary_test': outcome_table_binary_test,
        }
        # Hold-out entries are only passed when the hold-out test was performed
        if holdout_test:
            results_args['response_holdout'] = response_holdout
            results_args['patients_holdout'] = patients_holdout
            results_args['outcome_table_binary_holdout'] = outcome_table_binary_holdout
        run_results = result.to_json(**results_args)

        logging.info('\n\n--> COMPUTING PERFORMANCE METRICS ... Done in {:.2f} sec'.format(time.time()-tstart))

        # F. Saving the results dictionary
        save_json(path_results, run_results, cls=NumpyEncoder)

        # Total computing time
        logging.info("\n\n*********************************************************************")
        logging.info('{} TOTAL COMPUTATION TIME: {:.2f} hours'.format(" " * 13, (time.time()-batch_start)/3600))
        logging.info("*********************************************************************")

    def run_experiment(self, holdout_test: bool = True, method: str = "pycaret") -> None:
        """
        Run the machine learning experiment for each split/run

        Args:
            holdout_test (bool, optional): Boolean specifying if the hold-out test should be performed.
            method (str, optional): String specifying the method to use to train the model.
                It is forwarded unchanged to ``ml_run``, which only uses it when the ml
                dictionary does not define a modeling method.
                - "pycaret": Use PyCaret to train the model (automatic).
                - "grid_search": Grid search with cross-validation to find the best parameters.
                - "random_search": Random search with cross-validation to find the best parameters.

        Returns:
            None
        """
        # Initialize the DesignExperiment class
        experiment = DesignExperiment(self.path_study, self.path_settings, self.experiment_label)

        # Generate the machine learning experiment
        path_file_ml_paths = experiment.generate_experiment()

        # Run the different machine learning tests for the experiment
        tests_dict = load_json(path_file_ml_paths)  # Tests dictionary
        for path_ml in tests_dict.values():
            self.ml_run(path_ml, holdout_test, method)

        # Average results of the different splits/runs
        path_learn = self.path_study / f'learn__{self.experiment_label}'
        average_results(path_learn, save=True)

        # Analyze the features importance for all the runs
        feature_importance_analysis(path_learn)