# Source code for prism.emulator._emulator

# -*- coding: utf-8 -*-

"""
Emulator
========
Provides the definition of the base class holding the emulator of the *PRISM*
package, the :class:`~Emulator` class.

"""


# %% IMPORTS
# Built-in imports
from collections import Counter
from itertools import chain
import os
from os import path
from struct import calcsize
from time import time

# Package imports
import e13tools as e13
import h5py
from mlxtend.feature_selection import SequentialFeatureSelector as SFS
from mpi4pyd import MPI
import numpy as np
from numpy.linalg import pinv
from pkg_resources import parse_version
from sklearn.linear_model import LinearRegression as LR
from sklearn.metrics import mean_squared_error as mse
from sklearn.pipeline import Pipeline as Pipeline_sk
from sklearn.preprocessing import PolynomialFeatures as PF
from sortedcontainers import SortedDict as sdict, SortedSet as sset
import threadpoolctl as tpc

# PRISM imports
from prism import __version__
from prism._docstrings import (
    adj_exp_doc, adj_var_doc, def_par_doc, emul_s_seq_doc, eval_doc,
    full_cov_doc, get_emul_i_doc, regr_cov_doc, save_data_doc_e, set_par_doc,
    std_emul_i_doc)
from prism._internal import (
    RequestError, RequestWarning, check_compatibility, check_vals, getCLogger,
    getRLogger, np_array)
from prism.modellink import ModelLink

# All declaration
__all__ = ['Emulator']


# %% GLOBALS
INT_SIZE = 'int%i' % (calcsize('P')*8)          # Default bitsize of an integer


# %% EMULATOR CLASS DEFINITION
# TODO: Implement a system that a non-PRISM error is logged with its emul_s
class Emulator(object):
    """
    Defines the :class:`~Emulator` base class of the *PRISM* package.

    Description
    -----------
    The :class:`~Emulator` class is the backbone of the *PRISM* package,
    holding all tools necessary to construct, load, save and evaluate the
    emulator of a model. It performs many checks to see if the provided
    :obj:`~prism.modellink.ModelLink` object is compatible with the current
    emulator, advises the user on alternatives when certain operations are
    requested, automatically takes care of distributing emulator systems over
    MPI ranks and more.

    Even though the purpose of the :class:`~Emulator` class is to hold only
    information about the emulator and therefore does not require any details
    about the provided :obj:`~prism.modellink.ModelLink` object, it will keep
    track of changes made to it. This is to allow the user to modify the
    properties of the :class:`~prism.modellink.ModelLink` subclass without
    causing any desynchronization problems by accident.

    The :class:`~Emulator` class requires to be linked to an instance of the
    :class:`~prism.Pipeline` class and will automatically attempt to do so
    when initialized. By default, this class should only be initialized from
    within a :obj:`~prism.Pipeline` object.

    """

    # Identify this class as being a default emulator
    _emul_type = 'default'
[docs] def __init__(self, pipeline_obj, modellink_obj): """ Initialize an instance of the :class:`~Emulator` class. Parameters ---------- pipeline_obj : :obj:`~prism.Pipeline` object The :obj:`~prism.Pipeline` instance this :obj:`~Emulator` instance should be linked to. modellink_obj : :obj:`~prism.modellink.ModelLink` object The :obj:`~prism.modellink.ModelLink` instance that should be linked to `pipeline_obj`. """ # Save the provided Pipeline object self._pipeline = pipeline_obj # Make pointers to MPI properties self._comm = self._pipeline._comm self._size = self._pipeline._size self._rank = self._pipeline._rank self._is_controller = self._pipeline._is_controller self._is_worker = self._pipeline._is_worker self._worker_mode = self._pipeline._worker_mode # Make pointer to File property self._File = self._pipeline._File # Make pointer to prism_dict property self._prism_dict = self._pipeline._prism_dict # Load the emulator and data self._load_emulator(modellink_obj) # Bind this Emulator instance to the supplied Pipeline object self._pipeline._emulator = self
    # %% CLASS PROPERTIES
    # General details
    @property
    def emul_type(self):
        """
        str: The type of emulator that is currently loaded. This determines
        the way in which the :obj:`~prism.Pipeline` instance will treat this
        :obj:`~Emulator` instance.

        """

        return(self._emul_type)

    @property
    def emul_load(self):
        """
        bool: Whether or not a previously constructed emulator is currently
        loaded.

        """

        return(bool(self._emul_load))

    @property
    def prism_version(self):
        """
        str: The version of *PRISM* that was used to construct the emulator
        that is currently loaded.

        """

        return(self._prism_version)

    @property
    def emul_i(self):
        """
        int: The last emulator iteration that is fully constructed for all
        emulator systems on this MPI rank.

        """

        return(self._emul_i)

    @property
    def ccheck(self):
        """
        list of str: The emulator system components that are still required
        to complete the construction of an emulator iteration on this MPI
        rank. The controller rank additionally lists the required components
        that are emulator iteration specific ('mod_real_set' and
        'active_par').

        """

        return(self._ccheck)

    @property
    def n_sam(self):
        """
        int: Number of model evaluation samples that have been/will be used
        to construct an emulator iteration.

        """

        return(self._n_sam)

    @property
    def n_emul_s(self):
        """
        int: Number of emulator systems assigned to this MPI rank.

        """

        return(self._n_emul_s)

    @property
    def n_emul_s_tot(self):
        """
        int: Total number of emulator systems assigned to all MPI ranks
        combined. Only available on the controller rank.

        """

        return(self._n_emul_s_tot)

    @property
    def method(self):
        """
        str: The emulation method to use for constructing the emulator.
        Possible are 'gaussian', 'regression' and 'full'.

        """

        return(self._method)

    @property
    def use_mock(self):
        """
        bool: Whether or not mock data has been used for the construction of
        this emulator instead of actual data. If *True*, changes made to the
        data in the provided :obj:`~prism.modellink.ModelLink` object are
        ignored.

        """

        return(bool(self._use_mock))

    @property
    def use_regr_cov(self):
        """
        bool: Whether or not to take into account the regression covariance
        when calculating the covariance of the emulator, in addition to the
        Gaussian covariance. If :attr:`~method` == 'gaussian', this bool is
        not required. If :attr:`~method` == 'regression', this bool is always
        set to *True*.

        """

        return(bool(self._use_regr_cov))

    @property
    def poly_order(self):
        """
        int: Polynomial order that is considered for the regression process.
        If :attr:`~method` == 'gaussian' and
        :attr:`~prism.Pipeline.do_active_anal` is *False*, this number is not
        required.

        """

        return(self._poly_order)

    @property
    def n_cross_val(self):
        """
        int: Number of (k-fold) cross-validations that are used for
        determining the quality of the regression process. It is set to zero
        if cross-validations are not used. If :attr:`~method` == 'gaussian'
        and :attr:`~prism.Pipeline.do_active_anal` is *False*, this number is
        not required.

        """

        return(self._n_cross_val)

    @property
    def active_emul_s(self):
        """
        list of int: The indices of the emulator systems on this MPI rank
        that are active.

        """

        return(self._active_emul_s)

    @property
    def emul_s(self):
        """
        list of int: The indices of the emulator systems that are assigned to
        this MPI rank.

        """

        return(self._emul_s)

    @property
    def emul_s_to_core(self):
        """
        list of lists: List of the indices of the emulator systems that are
        assigned to every MPI rank. Only available on the controller rank.

        """

        return(self._emul_s_to_core)

    @property
    def data_idx_to_core(self):
        """
        list of lists: List of the data identifiers that were assigned to the
        emulator systems listed in :attr:`~emul_s_to_core`. Only available on
        the controller rank.

        """

        return(self._data_idx_to_core)

    # Active Parameters
    @property
    def active_par(self):
        """
        list of str: The model parameter names that are considered active.
        Only available on the controller rank.

        """

        # Convert the stored parameter indices back to parameter names
        return([[self._modellink._par_name[par] for par in active_par]
                for active_par in self._active_par])

    @property
    def active_par_data(self):
        """
        dict of lists: The model parameter names that are considered active
        for every emulator system on this MPI rank.

        """

        # Initialize empty active_par_data_list
        active_par_data_list = []

        # Loop over all emulator iterations
        for data_idx, active_par_data in zip(self._data_idx,
                                             self._active_par_data):
            # Convert active_par_data from integers to strings
            active_par_data = [[self._modellink._par_name[par]
                                for par in active_par]
                               for active_par in active_par_data]

            # Add dict to active_par_data_list for this iteration
            active_par_data_list.append(dict(zip(data_idx, active_par_data)))

        # Return it
        return(active_par_data_list)

    # Regression
    @property
    def rsdl_var(self):
        """
        dict of float: The residual variance of every emulator system on this
        MPI rank. Obtained from regression process and replaces the Gaussian
        sigma. Empty if :attr:`~method` == 'gaussian'.

        """

        return([dict(zip(data_idx, rsdl_var)) for data_idx, rsdl_var in
                zip(self._data_idx, self._rsdl_var)])

    @property
    def poly_terms(self):
        """
        dict of dict: Overview of all polynomial terms with non-zero
        coefficients in the regression function for every emulator system on
        this MPI rank. Empty if :attr:`~method` == 'gaussian'.

        This is basically a human-readable representation of
        :attr:`~poly_coef` plus :attr:`~poly_powers`. Given its formatting,
        it is not advised to use this for any operations.

        """

        # Initialize empty poly_term_list
        poly_terms_list = []

        # Loop over all emulator iterations
        iterator = zip(self._data_idx, self._active_par_data,
                       self._poly_powers, self._poly_coef)
        for args in iterator:
            # Convert all poly_powers to their string representations
            poly_terms_dict = {}
            for idx, active_par, powers, coefs in zip(*args):
                poly_terms_dict[idx] = {
                    self._get_poly_term_str(active_par, power): coef
                    for power, coef in zip(powers, coefs)}

            # Add poly_terms_dict to poly_terms_list
            poly_terms_list.append(poly_terms_dict)

        # Return it
        return(poly_terms_list)

    @property
    def poly_coef(self):
        """
        list of :obj:`~numpy.ndarray`: The non-zero coefficients of the
        polynomial terms in the regression function for every emulator system
        on this MPI rank. Empty if :attr:`~method` == 'gaussian'.

        """

        return(self._poly_coef)

    @property
    def poly_coef_cov(self):
        """
        list of :obj:`~numpy.ndarray`: The covariances of all coefficients in
        :attr:`~poly_coef` for every emulator system on this MPI rank. Empty
        if :attr:`~method` == 'gaussian' or :attr:`~use_regr_cov` is *False*.

        """

        return(self._poly_coef_cov)

    @property
    def poly_powers(self):
        """
        list of :obj:`~numpy.ndarray`: The powers of all polynomial terms
        with non-zero coefficients in the regression function for every
        emulator system on this MPI rank. Empty if
        :attr:`~method` == 'gaussian'.

        """

        return(self._poly_powers)

    @property
    def poly_idx(self):
        """
        list of :obj:`~numpy.ndarray`: The indices of all polynomial terms
        with non-zero coefficients in the regression function for every
        emulator system on this MPI rank. Empty if
        :attr:`~method` == 'gaussian'.

        """

        return(self._poly_idx)

    # Emulator Data
    @property
    def sam_set(self):
        """
        list of dict: The model evaluation samples that have been/will be
        used to construct the specified emulator iteration.

        """

        return([[sdict(zip(self._modellink._par_name, par_set))
                 for par_set in sam_set] for sam_set in self._sam_set])

    @property
    def emul_space(self):
        """
        dict of :obj:`~numpy.ndarray`: The boundaries of the hypercube that
        encloses the parameter space in which the specified emulator
        iteration is defined. This is always equal to the plausible space of
        the previous iteration.

        """

        return([sdict(zip(self._modellink._par_name, emul_space))
                for emul_space in self._emul_space])

    @property
    def mod_set(self):
        """
        dict of :obj:`~numpy.ndarray`: The model outputs corresponding to the
        samples in :attr:`~sam_set` for every emulator system on this MPI
        rank.

        """

        return([dict(zip(data_idx, mod_set)) for data_idx, mod_set in
                zip(self._data_idx, self._mod_set)])

    @property
    def cov_mat_inv(self):
        """
        dict of :obj:`~numpy.ndarray`: The inverses of the covariance
        matrices for every emulator system on this MPI rank.

        """

        return([dict(zip(data_idx, cov_mat_inv)) for data_idx, cov_mat_inv in
                zip(self._data_idx, self._cov_mat_inv)])

    @property
    def exp_dot_term(self):
        """
        dict of :obj:`~numpy.ndarray`: The second expectation adjustment
        dot-term values of all model evaluation samples for every emulator
        system on this MPI rank.

        """

        return([dict(zip(data_idx, exp_dot_term))
                for data_idx, exp_dot_term in
                zip(self._data_idx, self._exp_dot_term)])

    # Covariances
    @property
    def sigma(self):
        """
        float: Value of the Gaussian sigma. If :attr:`~method` != 'gaussian',
        this value is not required, since it is obtained from the regression
        process instead.

        """

        return(self._sigma)

    @property
    def l_corr(self):
        """
        dict of float: The Gaussian correlation lengths for all model
        parameters, which is defined as the maximum distance between two
        values of a specific model parameter within which the Gaussian
        contribution to the correlation between the values is still
        significant.

        """

        return(sdict(zip(self._modellink._par_name, self._l_corr)))

    @property
    def f_infl(self):
        """
        float: The residual variance inflation factor. The prior variance of
        all known samples is inflated by this factor multiplied with
        :attr:`~rsdl_var` (regression) or :attr:`~sigma` (Gaussian). If this
        value is zero, no variance inflation is performed.

        """

        return(self._f_infl)

    @property
    def act_rsdl_var(self):
        """
        dict of float: The active contribution of the residual variance of
        every emulator system on this MPI rank. Obtained from either
        :attr:`~rsdl_var` (regression) or :attr:`~sigma` (Gaussian).

        """

        return([dict(zip(data_idx, act_rsdl_var))
                for data_idx, act_rsdl_var in
                zip(self._data_idx, self._act_rsdl_var)])

    @property
    def pas_rsdl_var(self):
        """
        dict of float: The passive contribution of the residual variance of
        every emulator system on this MPI rank. If :attr:`~f_infl` is not
        zero, this also includes the inflated residual variance value.
        Obtained from either :attr:`~rsdl_var` (regression) or
        :attr:`~sigma` (Gaussian).

        """

        return([dict(zip(data_idx, pas_rsdl_var))
                for data_idx, pas_rsdl_var in
                zip(self._data_idx, self._pas_rsdl_var)])

    # %% GENERAL CLASS METHODS
    # This function checks if provided emul_i can be requested
    @e13.docstring_substitute(emul_i=get_emul_i_doc)
    def _get_emul_i(self, emul_i, cur_iter=True):
        """
        Checks if the provided emulator iteration `emul_i` can be requested
        or replaces it if *None* was provided.
        This method requires all MPI ranks to call it simultaneously.

        Parameters
        ----------
        %(emul_i)s

        Optional
        --------
        cur_iter : bool
            Bool determining whether the current (*True*) or the next
            (*False*) emulator iteration is requested.

        Returns
        -------
        emul_i : int
            The requested emulator iteration that passed the check.

        """

        # Log that emul_i is being selected
        logger = getCLogger('INIT')
        logger.info("Checking requested emulator iteration %s." % (emul_i))

        # Determine the lowest emul_i that is constructed on all ranks
        # (an iteration only counts if every rank has it fully built)
        global_emul_i = self._comm.allreduce(self._emul_i, op=MPI.MIN)

        # Check if provided emul_i is correct/allowed
        # If the current iteration is requested
        if cur_iter:
            # NOTE: This check must come first: requesting iteration 0 or an
            # unbuilt emulator yields the "not built" error, not a range error
            if(emul_i == 0 or self._emul_load == 0 or global_emul_i == 0):
                err_msg = "Emulator is not built yet!"
                e13.raise_error(err_msg, RequestError, logger)
            # None defaults to the last fully constructed iteration
            elif emul_i is None:
                emul_i = global_emul_i
            elif not(1 <= emul_i <= global_emul_i):
                err_msg = ("Requested emulator iteration %i does not exist!"
                           % (emul_i))
                e13.raise_error(err_msg, RequestError, logger)
            # In-range values are still validated as positive integers
            else:
                emul_i = check_vals(emul_i, 'emul_i', 'pos', 'int')

        # If the next iteration is requested
        else:
            # None defaults to the iteration after the last constructed one
            if emul_i is None:
                emul_i = global_emul_i+1
            elif not(1 <= emul_i <= global_emul_i+1):
                err_msg = ("Requested emulator iteration %i cannot be "
                           "requested!" % (emul_i))
                e13.raise_error(err_msg, RequestError, logger)
            else:
                emul_i = check_vals(emul_i, 'emul_i', 'pos', 'int')

        # Do some logging
        logger.info("Requested emulator iteration set to %i." % (emul_i))

        # Return correct emul_i
        return(emul_i)
# Checks the compatibility with a specific version and warns if it fails
[docs] def _check_future_compat(self, req_version, dep_version): """ Checks if the version of this emulator is compatible with the provided `req_version`. If not, raises a :class:`~FutureWarning`, indicating that the given `dep_version` will no longer support this emulator. Parameters ---------- req_version : str The version in which an incompatible change was introduced. dep_version : str The version in which the backward compatibility for this change will be removed. """ # Make logger logger = getCLogger('FUTURE') # Check if this emulator's version is compatible with req_version if e13.compare_versions(self._prism_version, req_version): # If so, return True return(True) else: # Transform req_version into its base version req_version = parse_version(req_version).base_version # Make sure that a list of raised warnings exists self._warn_list = getattr(self, '_warn_list', []) # If this warning has not been raised before, raise it if (req_version, dep_version) not in self._warn_list: # Raise warning warn_msg = ("The provided emulator was constructed with a " "version earlier than v%s (v%s). Starting in v%s, " "this emulator will no longer be compatible." % (req_version, self._prism_version, dep_version)) e13.raise_warning(warn_msg, FutureWarning, logger, 4) # Save that this warning has been raised before self._warn_list.append((req_version, dep_version)) # Return False return(False)
# Returns the hypercube that encloses defined emulator space
[docs] @e13.docstring_substitute(emul_i=std_emul_i_doc) def _get_emul_space(self, emul_i): """ Returns the boundaries of the hypercube that encloses the parameter space in which the provided emulator iteration `emul_i` is defined. Parameters ---------- %(emul_i)s Returns ------- emul_space : 2D :obj:`~numpy.ndarray` object The requested hypercube boundaries. If `emul_i` == 1, this is equal to the defined model parameter space. Note ---- The parameter space over which an emulator iteration is defined is always equal to the plausible space of the previous iteration. """ # If emul_i is 1, the defined emulator space is model parameter space if(emul_i == 1): return(self._modellink._par_rng.copy()) # Else, it is defined as the plausible space of the previous iteration else: # Obtain the sam_set of this emulator iteration sam_set = self._sam_set[emul_i] # Determine emul_space emul_space = self._modellink._get_sam_space(sam_set) # Return emul_space return(emul_space)
    # Creates a new emulator file and writes all information to it
    def _create_new_emulator(self):
        """
        Creates a new master HDF5-file that holds all the information of a
        new emulator and writes all important emulator details to it.
        Afterwards, resets all loaded emulator data and prepares the
        HDF5-file and emulator for the construction of the first emulator
        iteration.

        Generates
        ---------
        A new master HDF5-file 'prism.hdf5' contained in the working
        directory specified in the :obj:`~prism.Pipeline` instance, holding
        all information required to construct the first iteration of the
        emulator.

        """

        # Start logger
        logger = getCLogger('INIT')
        logger.info("Creating a new emulator in working directory %r."
                    % (self._pipeline._working_dir))

        # Clean-up all emulator system files
        self._cleanup_emul_files(1)

        # Set parameters from provided parameter file
        mock_par = self._set_parameters()

        # Set emul_load to 0
        self._emul_load = 0

        # Controller creating the master file
        if self._is_controller:
            # Create master hdf5-file
            with self._File('w', None) as file:
                # Save all relevant emulator parameters to hdf5
                file.attrs['sigma'] = self._sigma
                file.attrs['l_corr'] = self._l_corr
                file.attrs['f_infl'] = self._f_infl
                file.attrs['method'] = self._method.encode('ascii', 'ignore')
                file.attrs['use_regr_cov'] = bool(self._use_regr_cov)
                file.attrs['poly_order'] = self._poly_order
                file.attrs['n_cross_val'] = self._n_cross_val
                file.attrs['modellink_name'] =\
                    self._modellink._name.encode('ascii', 'ignore')
                file.attrs['prism_version'] =\
                    self._prism_version.encode('ascii', 'ignore')
                file.attrs['emul_type'] = self._emul_type.encode('ascii',
                                                                 'ignore')
                file.attrs['use_mock'] = bool(self._use_mock)

        # Check if mock data is requested
        if self._use_mock:
            # If so, temporarily save ModelLink properties as Emulator props
            # This is to make sure that one version of get_md_var() is
            # required
            self._n_data[0] = self._modellink._n_data
            self._data_val[0] = self._modellink._data_val
            self._data_err[0] = self._modellink._data_err
            self._data_spc[0] = self._modellink._data_spc
            self._data_idx[0] = self._modellink._data_idx

            # Call get_mock_data()
            self._pipeline._get_mock_data(mock_par)

            # Controller only
            if self._is_controller:
                # Open master hdf5-file
                with self._File('r+', None) as file:
                    # Save mock_data to hdf5
                    file.attrs['mock_par'] = self._modellink._par_est

        # Prepare first emulator iteration for construction
        self._prepare_new_iteration(1)

        # Logging again
        logger.info("Finished creating new emulator.")
    # This function cleans up all the emulator files
    @e13.docstring_substitute(emul_i=std_emul_i_doc)
    def _cleanup_emul_files(self, emul_i):
        """
        Opens all emulator HDF5-files and removes the provided emulator
        iteration `emul_i` and subsequent iterations from them. Also removes
        any related projection figures that have default names. If
        `emul_i` == 1, all emulator HDF5-files are removed instead.

        Parameters
        ----------
        %(emul_i)s

        """

        # Do some logging
        logger = getCLogger('CLEAN-UP')
        logger.info("Cleaning up emulator HDF5-files, starting at emulator"
                    " iteration %i." % (emul_i))

        # Workers wait for controller to finish clean-up
        if self._is_worker:
            # MPI Barrier (matched by the controller's Barrier below)
            self._comm.Barrier()

            # Set emul_i to the last iteration still present in files
            self._emul_i = emul_i-1

            # Return
            return

        # Loop over all emulator system HDF5-files
        for s in range(0, self._n_emul_s_tot):
            # Open emulator system HDF5-file
            with self._File('r+', s) as file:
                # Save filename (needed after the file has been closed)
                filename = file.filename

                # Loop over all requested iterations to be removed
                for i in range(emul_i, self._emul_i+2):
                    # Try to remove it, skip if not possible
                    try:
                        del file['%i' % (i)]
                    except KeyError:
                        pass

            # If emul_i is 1, remove the entire emulator system HDF5-file
            if(emul_i == 1):
                os.remove(filename)

        # Open emulator master HDF5-file if it exists
        if self._n_emul_s_tot:
            with self._File('r+', None) as file:
                # Loop over all requested iterations to be removed
                for i in range(emul_i, self._emul_i+2):
                    # Check if proj_hcube exists
                    try:
                        file['%i/proj_hcube' % (i)]
                    except KeyError:
                        pass
                    else:
                        # If so, get names of available hcubes
                        hcube_names = list(file['%i/proj_hcube'
                                                % (i)].keys())

                        # Try to remove figures for which data is available
                        for hcube in hcube_names:
                            fig_path, fig_path_s =\
                                self._pipeline._Projection__get_fig_path(
                                    hcube)
                            if path.exists(fig_path):
                                os.remove(fig_path)
                            if path.exists(fig_path_s):
                                os.remove(fig_path_s)

                    # Try to remove the iteration, skip if not possible
                    try:
                        del file['%i' % (i)]
                    except KeyError:
                        pass

            # If emul_i is 1, remove the entire emulator master HDF5-file
            if(emul_i == 1):
                os.remove(self._pipeline._hdf5_file)

        # MPI Barrier (releases the waiting workers)
        self._comm.Barrier()

        # Set emul_i to the last iteration still present in files
        self._emul_i = emul_i-1

        # Do more logging
        if(emul_i == 1):
            logger.info("Finished removing emulator files.")
        else:
            logger.info("Finished cleaning up emulator files.")
# This function reads in data_idx parts, combines them and returns it
[docs] def _read_data_idx(self, emul_s_group): """ Reads in and combines the parts of the data point identifier that is assigned to the provided `emul_s_group`. Parameters ---------- emul_s_group : :obj:`~h5py.Group` object The HDF5-group from which the data point identifier needs to be read in. Returns ------- data_idx : tuple of {int, float, str} The combined data point identifier. """ # Obtain list of attribute keys that contain data_idx parts idx_keys = [key for key in emul_s_group.attrs.keys() if key[:8] == 'data_idx'] # Determine the number of parts idx_len = len(idx_keys) # If there is a single part, save it instead of a tuple if(idx_len == 1): # If part is an encoded string, decode and save it if isinstance(emul_s_group.attrs['data_idx'], bytes): data_idx = emul_s_group.attrs['data_idx'].decode('utf-8') # Else, save it normally else: data_idx = emul_s_group.attrs['data_idx'] # If there are multiple parts, add all of them to a list else: # Sort parts on their index number idx_keys = sorted(idx_keys, key=lambda x: int(x[9:])) # Initialize empty data_idx list data_idx = [] # Loop over all parts for key in idx_keys: # If part is an encoded string, decode and save it if isinstance(emul_s_group.attrs[key], bytes): idx_str = emul_s_group.attrs[key].decode('utf-8') data_idx.append(idx_str) # Else, save it normally else: data_idx.append(emul_s_group.attrs[key]) # Convert data_idx from list to tuple data_idx = tuple(data_idx) # Return data_idx return(data_idx)
# This function splits data_idx into parts and writes it to HDF5
[docs] def _write_data_idx(self, emul_s_group, data_idx): """ Splits a given `data_idx` up into individual parts and saves it as an attribute to the provided `emul_s_group`. Parameters ---------- emul_s_group : :obj:`~h5py.Group` object The HDF5-group to which the data point identifier needs to be saved. data_idx : tuple of {int, float, str} The data point identifier to be saved. """ # If data_idx contains multiple parts if isinstance(data_idx, tuple): # Obtain list of attribute keys required for the data_idx parts idx_keys = ['data_idx_%i' % (i) for i in range(len(data_idx))] # Loop over all parts for key, idx in zip(idx_keys, data_idx): # If part is a string, encode and save it if isinstance(idx, str): emul_s_group.attrs[key] = idx.encode('ascii', 'ignore') # Else, save it normally else: emul_s_group.attrs[key] = idx # If data_idx contains a single part, save it else: # If part is a string, encode and save it if isinstance(data_idx, str): emul_s_group.attrs['data_idx'] =\ data_idx.encode('ascii', 'ignore') # Else, save it normally else: emul_s_group.attrs['data_idx'] = data_idx
# This function returns a list of all data_idx assigned to cores
[docs] @e13.docstring_substitute(emul_i=std_emul_i_doc) def _get_data_idx_flat(self, emul_i): """ Obtains the data point identifiers `data_idx` that are assigned to each MPI rank for the provided emulator iteration `emul_i` and returns them as a single list. The number of data points assigned to each MPI rank is also returned. Parameters ---------- %(emul_i)s Returns ------- n_data_rank : list of int The number of data points each MPI rank has assigned to it for given `emul_i`. These values can be used to split `data_idx_flat` up again into MPI rank-specific data point identifiers. data_idx_flat : list of tuples The data point identifiers that are assigned to an MPI rank for given `emul_i`. """ # Obtain the data_idx tuples on every rank data_idx_ranks = list(map(e13.delist, self._data_idx_to_core[emul_i])) # Flatten data_idx_ranks into a single list data_idx_flat = list(chain(*data_idx_ranks)) # Calculate the number of data points assigned per core n_data_rank = list(map(len, data_idx_ranks)) # Return both return(n_data_rank, data_idx_flat)
    # This function matches data points with those in a previous iteration
    # TODO: Write this function and _assign_emul_s simpler and dependent
    @e13.docstring_substitute(emul_i=std_emul_i_doc)
    def _assign_data_idx(self, emul_i):
        """
        Determines the emulator system each data point in the provided
        emulator iteration `emul_i` should be assigned to, in order to make
        sure that recurring data points have the same emulator system index
        as in the previous emulator iteration. If multiple options are
        possible, data points are assigned such to spread them as much as
        possible.

        Parameters
        ----------
        %(emul_i)s

        Returns
        -------
        data_to_emul_s : list of int
            The index of the emulator system that each data point should be
            assigned to.
        n_emul_s : int
            The total number of active and passive emulator systems there
            will be in the provided emulator iteration.

        Examples
        --------
        If the number of data points is less than the previous iteration:

            >>> emul_i = 2
            >>> self._data_idx[emul_i-1]
            ['A', 'B', 'C', 'D', 'E']
            >>> self._modellink._data_idx
            ['B', 'F', 'G', 'E']
            >>> self._assign_data_idx(emul_i)
            ([1, 3, 2, 4], 5)

        If the number of data points is more than the previous iteration:

            >>> emul_i = 2
            >>> self._data_idx[emul_i-1]
            ['A', 'B', 'C', 'D', 'E']
            >>> self._modellink._data_idx
            ['B', 'F', 'G', 'E', 'A', 'C']
            >>> self._assign_data_idx(emul_i)
            ([1, 5, 3, 4, 0, 2], 6)

        If there is no previous iteration:

            >>> emul_i = 1
            >>> self._data_idx[emul_i-1]
            []
            >>> self._modellink._data_idx
            ['B', 'F', 'G', 'E', 'A', 'C']
            >>> self._assign_data_idx(emul_i)
            ([5, 4, 3, 2, 1, 0], 6)

        """

        # Do some logging
        logger = getCLogger('INIT')
        logger.info("Assigning model comparison data points to emulator "
                    "systems for emulator iteration %i." % (emul_i))

        # Create empty Counter for number of emulator system occurances
        emul_s_counter = Counter()

        # Calculate the total number of active and passive emulator systems
        if not self._emul_load:
            # If no emulator is loaded, this is always n_data
            n_emul_s = self._modellink._n_data
        else:
            # Else, it is either n_data or n_emul_s_tot
            n_emul_s = max(self._modellink._n_data, self._n_emul_s_tot)

        # Create some empty lists
        active_emul_s_list = [[]]
        data_idx_list = []

        # Open hdf5-file
        with self._File('r', None) as file:
            # Loop over all previous iterations
            for i in range(1, emul_i):
                # Obtain the active emulator systems for this iteration
                # ('emul_<s>' groups, excluding the 'emul_space' group)
                active_emul_s_list.append(
                    [int(key[5:]) for key in file['%i' % (i)].keys()
                     if key[:5] == 'emul_' and key[5:] != 'space'])

            # Loop over all active emulator systems in the last iteration
            for emul_s in active_emul_s_list[-1]:
                data_set = file['%i/emul_%i' % (emul_i-1, emul_s)]

                # Read in all data_idx parts and combine them
                data_idx_list.append(self._read_data_idx(data_set))

        # Set number of occurances for all emulator systems to 0
        for emul_s in range(n_emul_s):
            emul_s_counter[emul_s] = 0

        # Count how many times each emulator system has already been active
        for active_emul_s in active_emul_s_list:
            emul_s_counter.update(active_emul_s)

        # Create empty data_to_emul_s list
        data_to_emul_s = [None]*self._modellink._n_data

        # Assign all recurring data points to the correct emulator systems
        for i, data_idx in enumerate(self._modellink._data_idx):
            # Check for every data_idx if it existed in the previous
            # iteration
            try:
                emul_s = active_emul_s_list[-1][data_idx_list.index(data_idx)]
            except ValueError:
                pass
            else:
                # If it existed, assign data_idx to corresponding emul_s
                data_to_emul_s[i] = emul_s

                # Also remove emul_s from emul_s_counter so it cannot be
                # assigned again below
                emul_s_counter.pop(emul_s)

        # Assign all 'new' data points to the empty emulator systems
        for i in range(self._modellink._n_data):
            # Check if this data point has already been assigned
            if data_to_emul_s[i] is None:
                # If not, check which emulator system is the least common
                emul_s = emul_s_counter.most_common()[-1][0]

                # Assign data point to this emulator system and remove it
                data_to_emul_s[i] = emul_s
                emul_s_counter.pop(emul_s)

        # More logging
        logger.info("Finished assigning data points to emulator systems.")

        # Return data_to_emul_s and n_emul_s
        return(data_to_emul_s, n_emul_s)
# This function determines how to assign emulator systems to MPI ranks # TODO: Might want to include the size (n_sam) of every system as well # TODO: May also want to include low-level MPI distribution
    @e13.docstring_substitute(emul_i=std_emul_i_doc)
    def _assign_emul_s(self, emul_i):
        """
        Determines which emulator systems (files) should be assigned to which
        MPI rank in order to balance the number of active emulator systems on
        every rank for every iteration up to the provided emulator iteration
        `emul_i`. If multiple choices can achieve this, the emulator systems
        are automatically spread out such that the total number of active
        emulator systems on a single rank is also balanced as much as
        possible.

        Parameters
        ----------
        %(emul_i)s

        Returns
        -------
        emul_s_to_core : list of lists
            A list containing the emulator systems that have been assigned to
            the corresponding MPI rank by the controller.

        Notes
        -----
        Currently, this function only uses high-level MPI. Additional speed
        can be obtained by also implementing low-level MPI, which will
        potentially be done in the future.

        """

        # Start logging
        logger = getCLogger('INIT')
        logger.info("Determining emulator system assignments up to emulator "
                    "iteration %i for available MPI ranks."
                    % (emul_i))

        # Create empty list of active emulator systems
        # Index 0 is a dummy entry; iterations are 1-based
        active_emul_s_list = [[]]

        # Open hdf5-file
        with self._File('r', None) as file:
            logger.info("Determining active emulator systems in every "
                        "emulator iteration.")

            # Determine the active emulator systems in every iteration
            # Group names look like 'emul_<n>'; skip the 'emul_space' group
            for i in range(1, emul_i+1):
                active_emul_s_list.append(
                    [int(key[5:]) for key in file['%i' % (i)].keys()
                     if key[:5] == 'emul_' and key[5:] != 'space'])

        # Determine number of active emulator systems in each iteration
        n_active_emul_s = [[i, len(active_emul_s)] for i, active_emul_s in
                           enumerate(active_emul_s_list[:emul_i+1])]

        # Process iterations from fewest to most active systems, so that the
        # most constrained (smallest) iterations are balanced first
        iter_size = sorted(n_active_emul_s, key=lambda x: x[1])

        # Create empty emul_s_to_core list
        emul_s_to_core = [[] for _ in range(self._size)]

        # Create empty Counter for total number of assigned systems
        core_cntr = Counter()

        # Create empty Counter for total number of system occurrences
        emul_s_cntr = Counter()

        # Determine how many times a specific emulator system is active
        for active_emul_s in active_emul_s_list:
            emul_s_cntr.update(active_emul_s)

        # Set the number of assigned emulator systems for each core to zero
        for rank in range(self._size):
            core_cntr[rank] = 0

        # Create empty list holding all assigned emulator systems
        emul_s_chosen = []

        # Loop over all iterations, from smallest to largest
        for i, size in iter_size:
            # Set the number of assigned systems in this iteration to zero
            iter_core_cntr = [0 for _ in range(self._size)]

            # Create empty Counter for number of system occurrences that are
            # also in this iteration
            iter_emul_s_cntr = Counter()

            # Fill that counter with systems that are not assigned yet
            for emul_s in active_emul_s_list[i]:
                # If this system is not assigned yet, copy its size
                if emul_s not in emul_s_chosen:
                    iter_emul_s_cntr[emul_s] = emul_s_cntr[emul_s]

                # Check if certain systems have already been assigned
                for j, emul_s_list in enumerate(emul_s_to_core):
                    iter_core_cntr[j] += emul_s_list.count(emul_s)

            # Set the minimum number of assigned systems for a core to 0
            min_count = 0

            # While not all emulator systems in this iteration are assigned
            while(sum(iter_core_cntr) != size):
                # Determine cores that have minimum number of assignments
                min_cores = [j for j, num in enumerate(iter_core_cntr) if(
                    num == min_count)]

                # If no core has this minimum size, increase it by 1
                if not min_cores:
                    min_count += 1

                # If one core has this number, assign system with lowest
                # number of occurrences to it and remove it from the list
                elif(len(min_cores) == 1):
                    core = min_cores[0]
                    emul_s, emul_size = iter_emul_s_cntr.most_common()[-1]
                    core_cntr[core] += emul_size
                    emul_s_chosen.append(emul_s)
                    emul_s_to_core[core].append(emul_s)
                    iter_core_cntr[core] += 1
                    iter_emul_s_cntr.pop(emul_s)
                    min_count += 1

                # If more than one core has this number, determine the core
                # that has the lowest total number of assigned systems and
                # assign the system with the highest number of occurrences
                # to it
                else:
                    emul_s, emul_size = iter_emul_s_cntr.most_common()[0]
                    core_sizes = [[j, core_cntr[j]] for j in min_cores]
                    core_lowest = min(core_sizes, key=lambda x: x[1])[0]
                    core_cntr[core_lowest] += emul_size
                    emul_s_chosen.append(emul_s)
                    emul_s_to_core[core_lowest].append(emul_s)
                    iter_core_cntr[core_lowest] += 1
                    iter_emul_s_cntr.pop(emul_s)

        # Log that assignments have been determined
        logger.info("Finished determining emulator system assignments.")

        # Return emul_s_to_core
        return(emul_s_to_core)
# Prepares the emulator for a new iteration
    @e13.docstring_substitute(emul_i=std_emul_i_doc)
    def _prepare_new_iteration(self, emul_i):
        """
        Prepares the emulator for the construction of a new iteration
        `emul_i`. Checks if this iteration can be prepared or if it has been
        prepared before, and acts accordingly.

        Parameters
        ----------
        %(emul_i)s

        Returns
        -------
        reload : bool
            Bool indicating whether or not the controller rank of the
            :obj:`~prism.Pipeline` instance needs to reload its data.

        Generates
        ---------
        A new group in the master HDF5-file with the emulator iteration as its
        name, containing subgroups corresponding to all emulator systems that
        will be used in this iteration.

        Notes
        -----
        Preparing an iteration that has been prepared before, causes that and
        all subsequent iterations of the emulator to be deleted. A check is
        carried out to see if it was necessary to reprepare the requested
        iteration and a warning is given if this check fails.

        """

        # Logger
        logger = getCLogger('EMUL_PREP')
        logger.info("Preparing emulator iteration %i for construction."
                    % (emul_i))

        # Check if new iteration can be constructed
        logger.info("Checking if emulator iteration can be prepared.")
        if(emul_i == 1):
            # Set reload flag to 1
            reload = 1
        elif not(1 <= emul_i-1 <= self._emul_i):
            # Cannot prepare an iteration whose predecessor does not exist yet
            err_msg = ("Preparation of emulator iteration %i is only available"
                       " when all previous iterations exist!" % (emul_i))
            e13.raise_error(err_msg, RequestError, logger)
        elif(emul_i-1 == self._emul_i):
            # Set reload flag to 0
            reload = 0
        else:
            # Requested iteration already exists, so this is a repreparation
            logger.info("Emulator iteration %i already exists."
                        % (emul_i))

            # Check if repreparation was actually necessary
            diff_flag = 1
            for i, idx in enumerate(self._modellink._data_idx):
                try:
                    j = self._data_idx[emul_i].index(idx)
                except ValueError:
                    # Data point is new, so a difference exists
                    break
                else:
                    if(self._data_val[emul_i][j] !=
                       self._modellink._data_val[i]):
                        break
                    if(self._data_err[emul_i][j] !=
                       self._modellink._data_err[i]):
                        break
                    if(self._data_spc[emul_i][j] !=
                       self._modellink._data_spc[i]):
                        break
            # If not, set diff_flag to 0 (for-loop completed without break)
            else:
                diff_flag = 0

            # Gather the diff_flags on the controller
            diff_flag = self._comm.reduce(diff_flag, MPI.MAX, 0)

            # If all diff_flags were 0 and no n_data diff, give out a warning
            if(self._is_controller and not diff_flag and
               (self._n_data_tot[emul_i] == self._modellink._n_data)):
                warn_msg = ("No differences in model comparison data detected."
                            "\nUnless this repreparation was intentional, "
                            "using the 'analyze' method of the Pipeline class "
                            "is much faster for reanalyzing the emulator with "
                            "new implausibility analysis parameters.")
                e13.raise_warning(warn_msg, RequestWarning, logger, 3)

            # Set reload flag to 1
            reload = 1

        # Clean-up all emulator files if emul_i is not 1
        if(emul_i != 1):
            self._cleanup_emul_files(emul_i)

        # Controller preparing the emulator iteration
        if self._is_controller:
            # Assign data points to emulator systems
            data_to_emul_s, n_emul_s = self._assign_data_idx(emul_i)

            # Open hdf5-file
            with self._File('r+', None) as file:
                # Make group for emulator iteration
                group = file.create_group(str(emul_i))

                # Save the number of data points
                group.attrs['n_data'] = self._modellink._n_data

                # Create an empty data set for statistics as attributes
                group.create_dataset('statistics', data=h5py.Empty(float))

                # Save the total number of active and passive emulator systems
                group.attrs['n_emul_s'] = n_emul_s

                # Create groups for all data points
                logger.info("Preparing emulator system files.")
                for i, emul_s in enumerate(data_to_emul_s):
                    with self._File('a', emul_s) as file_i:
                        # Make iteration group for this emulator system
                        data_set = file_i.create_group(str(emul_i))

                        # Save data value, errors and space to this system
                        data_set.attrs['data_val'] =\
                            self._modellink._data_val[i]
                        data_set.attrs['data_err'] =\
                            self._modellink._data_err[i]
                        data_set.attrs['data_spc'] =\
                            self._modellink._data_spc[i].encode('ascii',
                                                                'ignore')

                        # Save data_idx in portions to make it HDF5-compatible
                        self._write_data_idx(data_set,
                                             self._modellink._data_idx[i])

                        # Create external link between file_i and master file
                        group['emul_%i' % (emul_s)] = h5py.ExternalLink(
                            path.basename(file_i.filename), str(emul_i))

        # MPI Barrier
        self._comm.Barrier()

        # All ranks reload the emulator systems to allow for reassignments
        self._emul_i = emul_i
        self._emul_load = 1
        self._load_data(emul_i)

        # Logging
        logger.info("Finished preparing emulator iteration.")

        # Return the result
        return(reload)
# This function constructs the emulator iteration emul_i
    @e13.docstring_substitute(emul_i=std_emul_i_doc)
    def _construct_iteration(self, emul_i):
        """
        Constructs the emulator iteration corresponding to the provided
        `emul_i`, by performing the given emulation method and pre-calculating
        the prior expectation and variance values of the used model evaluation
        samples.

        Parameters
        ----------
        %(emul_i)s

        Generates
        ---------
        All data sets that are required to evaluate the emulator at the
        constructed iteration.

        """

        # Save current time on controller
        if self._is_controller:
            start_time = time()

        # Get the emul_s_seq
        emul_s_seq = self._active_emul_s[emul_i]

        # Determine active parameters
        # Every _ccheck entry lists construction steps still to be done, so
        # each stage below only processes the systems that still require it
        ccheck_active_par = [emul_s for emul_s in emul_s_seq if
                             'active_par_data' in self._ccheck[emul_i][emul_s]]
        if ccheck_active_par:
            self._get_active_par(emul_i, ccheck_active_par)

        # Check if regression is required
        if(self._method in ('regression', 'full')):
            # Perform regression
            ccheck_regression = [emul_s for emul_s in emul_s_seq if
                                 'regression' in self._ccheck[emul_i][emul_s]]
            if ccheck_regression:
                self._do_regression(emul_i, ccheck_regression)

        # Determine residual variance
        ccheck_rsdl_var = [emul_s for emul_s in emul_s_seq if
                           'rsdl_var' in self._ccheck[emul_i][emul_s]]
        if ccheck_rsdl_var:
            self._get_rsdl_var(emul_i, ccheck_rsdl_var)

        # Calculate the covariance matrices of sam_set
        # TODO: Implement system that, if needed, calculates cov_mat in seq
        # This is to avoid requiring massive amounts of RAM during calculation
        ccheck_cov_mat = [emul_s for emul_s in emul_s_seq if
                          'cov_mat' in self._ccheck[emul_i][emul_s]]
        if ccheck_cov_mat:
            self._get_cov_matrix(emul_i, ccheck_cov_mat)

        # Calculate the second dot-term for the adjusted expectation
        ccheck_exp_dot_term = [emul_s for emul_s in emul_s_seq if
                               'exp_dot_term' in
                               self._ccheck[emul_i][emul_s]]
        if ccheck_exp_dot_term:
            self._get_exp_dot_term(emul_i, ccheck_exp_dot_term)

        # If a worker is finished, set current emul_i to constructed emul_i
        if self._is_worker:
            self._emul_i = emul_i

        # MPI Barrier
        self._comm.Barrier()

        # If everything is done, gather the total set of active parameters
        active_par_data = self._comm.gather(self._active_par_data[emul_i], 0)

        # Allow the controller to save them
        if self._is_controller and 'active_par' in self._ccheck[emul_i]:
            active_par = sset()
            for active_par_rank in active_par_data:
                active_par.update(*active_par_rank)
            self._save_data(emul_i, None, {
                'active_par': np_array(active_par)})

            # Set current emul_i to constructed emul_i
            self._emul_i = emul_i

            # Save time difference and communicator size
            # NOTE(review): these finalizations only run when 'active_par'
            # still needed saving — confirm skipping them on an already
            # fully-constructed iteration is intentional
            self._pipeline._save_statistics(emul_i, {
                'emul_construct_time': ['%.2f' % (time()-start_time), 's'],
                'MPI_comm_size_cons': ['%i' % (self._size), '']})

        # MPI Barrier
        self._comm.Barrier()
# This is function 'E_D(f(x'))' # This function gives the adjusted emulator expectation value back
[docs] @e13.docstring_append(adj_exp_doc) def _get_adj_exp(self, emul_i, emul_s_seq, par_set, cov_vec): # Obtain prior expectation value of par_set prior_exp_par_set = self._get_prior_exp(emul_i, emul_s_seq, par_set) # Create empty adj_exp_val adj_exp_val = np.zeros(len(emul_s_seq)) # Calculate the adjusted emulator expectation value at given par_set for i, emul_s in enumerate(emul_s_seq): adj_exp_val[i] = prior_exp_par_set[i] +\ cov_vec[i].T @ self._exp_dot_term[emul_i][emul_s] # Return it return(adj_exp_val)
# This is function 'Var_D(f(x'))' # This function gives the adjusted emulator variance value back
[docs] @e13.docstring_append(adj_var_doc) def _get_adj_var(self, emul_i, emul_s_seq, par_set, cov_vec): # Obtain prior variance value of par_set prior_var_par_set = self._get_cov(emul_i, emul_s_seq, par_set, par_set) # Create empty adj_var_val adj_var_val = np.zeros(len(emul_s_seq)) # Calculate the adjusted emulator variance value at given par_set for i, emul_s in enumerate(emul_s_seq): adj_var_val[i] = prior_var_par_set[i] -\ cov_vec[i].T @ self._cov_mat_inv[emul_i][emul_s] @ cov_vec[i] # Return it return(adj_var_val)
# This function evaluates the emulator at a given emul_i and par_set and # returns the adjusted expectation and variance values # TODO: Take sam_set instead of par_set?
[docs] @e13.docstring_append(eval_doc) def _evaluate(self, emul_i, par_set): # Obtain active emulator systems for this iteration emul_s_seq = self._active_emul_s[emul_i] # Calculate the covariance vector for this par_set cov_vec = self._get_cov(emul_i, emul_s_seq, par_set, None) # Calculate the adjusted expectation and variance values adj_exp_val = self._get_adj_exp(emul_i, emul_s_seq, par_set, cov_vec) adj_var_val = self._get_adj_var(emul_i, emul_s_seq, par_set, cov_vec) # Make sure that adj_var_val cannot drop below zero np.maximum(0, adj_var_val, out=adj_var_val) # Return adj_exp_val and adj_var_val return(adj_exp_val, adj_var_val)
# This function extracts the set of active parameters # TODO: Write code cleaner, if possible
    @e13.docstring_substitute(emul_i=std_emul_i_doc,
                              emul_s_seq=emul_s_seq_doc)
    def _get_active_par(self, emul_i, emul_s_seq):
        """
        Determines the active parameters to be used for every emulator system
        listed in `emul_s_seq` in the provided emulator iteration `emul_i`.
        Uses backwards stepwise elimination to determine the set of active
        parameters. The polynomial order that is used in the stepwise
        elimination depends on :attr:`~poly_order`.

        Parameters
        ----------
        %(emul_i)s
        %(emul_s_seq)s

        Generates (for every emulator system)
        -------------------------------------
        active_par_data : 1D :obj:`~numpy.ndarray` object
            Array containing the indices of all the parameters that are active
            in the emulator iteration `emul_i`.

        """

        # Log that active parameters are being determined
        logger = getRLogger('ACTIVE_PAR')
        logger.info("Determining active parameters.")

        # Loop over all emulator systems and determine active parameters
        for emul_s in emul_s_seq:
            # Initialize active parameters data set
            active_par_data = sset()

            # Check if previously active parameters must be active again
            # (only if this system handles the same data point as before)
            if(self._pipeline._freeze_active_par and
               emul_s in self._active_emul_s[emul_i-1] and
               self._data_idx[emul_i-1][emul_s] ==
               self._data_idx[emul_i][emul_s]):
                active_par_data.update(self._active_par_data[emul_i-1][emul_s])

            # Check if active parameters analysis has been requested
            if not self._pipeline._do_active_anal:
                # If not requested, then save all potentially active parameters
                active_par_data.update(self._pipeline._pot_active_par)

            # If requested, perform a sequential backward stepwise regression
            else:
                # Obtain frozen+potentially active parameters list
                frz_pot_par = sset(active_par_data)
                frz_pot_par.update(self._pipeline._pot_active_par)
                frz_pot_par = list(frz_pot_par)
                frz_pot_idx = list(range(len(frz_pot_par)))

                # Obtain non-frozen potentially active parameters
                non_frz_par = [par for par in self._pipeline._pot_active_par
                               if par not in active_par_data]
                non_frz_idx = [frz_pot_par.index(par) for par in non_frz_par]

                # If non_frz_par has at least 1 element, carry out analysis
                if non_frz_par:
                    # Create SequentialFeatureSelector object
                    sfs_obj = SFS(LR(), k_features='parsimonious',
                                  forward=False, floating=False, scoring='r2',
                                  cv=self._n_cross_val)

                    # Obtain sam_set of frz_pot_par
                    frz_pot_sam_set = self._sam_set[emul_i][:, frz_pot_par]

                    # Obtain polynomial terms of frz_pot_sam_set
                    pf_obj = PF(self._poly_order, include_bias=False)
                    frz_pot_poly_terms = pf_obj.fit_transform(frz_pot_sam_set)

                    # Perform linear regression with linear terms only
                    sfs_obj.fit(frz_pot_sam_set, self._mod_set[emul_i][emul_s])

                    # Extract active parameters due to linear significance
                    act_idx_lin = list(sfs_obj.k_feature_idx_)

                    # Get passive non-frozen parameters in linear significance
                    pas_idx_lin = [i for i in non_frz_idx if
                                   i not in act_idx_lin]

                    # Make sure frozen parameters are considered active
                    act_idx_lin = [i for i in frz_pot_idx if
                                   i not in pas_idx_lin]
                    act_idx = list(act_idx_lin)

                    # Do n-order polynomial regression for every passive par
                    for i in pas_idx_lin:
                        # Check which polynomial terms involve this passive par
                        # (boolean mask over all polynomial terms)
                        poly_idx = pf_obj.powers_[:, i] != 0

                        # Add the active linear terms as well
                        # (linear terms come first since include_bias=False)
                        poly_idx[act_idx_lin] = 1

                        # Convert poly_idx to an array of indices
                        poly_idx = np.arange(len(poly_idx))[poly_idx]

                        # Obtain polynomial terms for this passive parameter
                        poly_terms = frz_pot_poly_terms[:, poly_idx]

                        # Perform linear regression with addition of poly terms
                        sfs_obj.fit(poly_terms, self._mod_set[emul_i][emul_s])

                        # Extract indices of active polynomial terms
                        act_idx_poly = poly_idx[list(sfs_obj.k_feature_idx_)]

                        # Check if any additional polynomial terms survived
                        # Add i to act_idx if this is the case
                        if np.any([j not in act_idx_lin for
                                   j in act_idx_poly]):
                            act_idx.append(i)

                    # Update the active parameters for this emulator system
                    active_par_data.update(np_array(frz_pot_par)[act_idx])

            # Log the resulting active parameters
            logger.info("Active parameters for emulator system %i: %s"
                        % (self._emul_s[emul_s],
                           [self._modellink._par_name[par]
                            for par in active_par_data]))

            # Convert active_par_data to a NumPy array and save
            self._save_data(emul_i, emul_s, {
                'active_par_data': np_array(active_par_data)})

        # Log that active parameter determination is finished
        logger.info("Finished determining active parameters.")
# This function performs a forward stepwise regression on sam_set
    @e13.docstring_substitute(emul_i=std_emul_i_doc,
                              emul_s_seq=emul_s_seq_doc)
    def _do_regression(self, emul_i, emul_s_seq):
        """
        Performs a forward stepwise linear regression for all requested
        emulator systems `emul_s_seq` in the provided emulator iteration
        `emul_i`. Calculates what the expectation values of all polynomial
        coefficients are. The polynomial order that is used in the regression
        depends on :attr:`~poly_order`.

        Parameters
        ----------
        %(emul_i)s
        %(emul_s_seq)s

        Generates (for every emulator system)
        -------------------------------------
        rsdl_var : float
            Residual variance of the regression function.
        regr_score : float
            Fit-score of the regression function.
        poly_coef : 1D :obj:`~numpy.ndarray` object
            Array containing the expectation values of the non-zero polynomial
            coefficients.
        poly_powers : 2D :obj:`~numpy.ndarray` object
            Array containing the powers of the non-zero polynomial terms in
            the regression function.
        poly_idx : 1D :obj:`~numpy.ndarray` object
            Array containing the indices of the non-zero polynomial terms in
            the regression function.
        poly_coef_cov : 1D :obj:`~numpy.ndarray` object (if \
            :attr:`~use_regr_cov` is *True*)
            Array containing the covariance values of the non-zero polynomial
            coefficients.

        """

        # Create logger
        logger = getRLogger('REGRESSION')
        logger.info("Performing regression.")

        # Create SequentialFeatureSelector object
        sfs_obj = SFS(LR(), k_features='best', forward=True, floating=False,
                      scoring='neg_mean_squared_error', cv=self._n_cross_val)

        # Create Scikit-learn Pipeline object
        # The bias/intercept/constant-term is not included in the SFS object to
        # ensure that it is taken into account in the linear regression, since
        # it is required for getting the residual variance. It also ensures
        # that the SFS does not focus on the constant-term in its calculations.
        pipe = Pipeline_sk([('poly', PF(self._poly_order, include_bias=False)),
                            ('SFS', sfs_obj),
                            ('linear', LR())])

        # Loop over all emulator systems and perform a regression on them
        for emul_s in emul_s_seq:
            # Extract active_sam_set
            active_sam_set = self._sam_set[emul_i][
                :, self._active_par_data[emul_i][emul_s]]

            # Extract mod_set
            mod_set = self._mod_set[emul_i][emul_s]

            # Wrap in try-statement to add additional info if error is raised
            try:
                # Perform regression for this emulator system
                pipe.fit(active_sam_set, mod_set)
            except Exception as error:  # pragma: no cover
                # If an error is raised, add which emulator system that was
                e13.raise_error(
                    str(error)+" [emul_%i]" % (self._emul_s[emul_s]),
                    type(error), logger, error.__traceback__)

            # Obtain the corresponding polynomial indices
            poly_idx = np_array(pipe.named_steps['SFS'].k_feature_idx_)

            # Extract sam_set_poly
            sam_set_poly = pipe.named_steps['poly'].transform(
                active_sam_set)[:, poly_idx]

            # Extract the residual variance
            rsdl_var = mse(mod_set,
                           pipe.named_steps['linear'].predict(sam_set_poly))

            # Log the score of the regression process
            regr_score = pipe.named_steps['linear'].score(sam_set_poly,
                                                          mod_set)
            logger.info("Regression score for emulator system %i: %f."
                        % (self._emul_s[emul_s], regr_score))

            # Obtain polynomial powers and include intercept term
            poly_powers = np.insert(pipe.named_steps['poly'].powers_[poly_idx],
                                    0, 0, 0)

            # Obtain polynomial coefficients and include intercept term
            poly_coef = np.insert(pipe.named_steps['linear'].coef_, 0,
                                  pipe.named_steps['linear'].intercept_, 0)

            # Check every polynomial coefficient if it is significant enough
            max_sam_set_poly = np.insert(np.max(sam_set_poly, axis=0), 0, 1)
            poly_sign = ~np.isclose(max_sam_set_poly*poly_coef/np.min(mod_set),
                                    0)

            # Only include significant polynomial terms unless none of the
            # non-intercept terms are significant
            # This is because every system must have at least one active par
            # (indexing with an empty tuple keeps the full array)
            # FIXME: Allow for no parameters to be active
            poly_powers = poly_powers[poly_sign if sum(poly_sign[1:]) else ()]
            poly_coef = poly_coef[poly_sign if sum(poly_sign[1:]) else ()]

            # Redetermine the active parameters, poly_powers and poly_idx
            new_active_par_idx = [np.any(powers) for powers in poly_powers.T]
            poly_powers = poly_powers[:, new_active_par_idx]
            new_active_par =\
                self._active_par_data[emul_i][emul_s][new_active_par_idx]
            new_pf_obj = PF(self._poly_order).fit([[0]*poly_powers.shape[1]])
            new_powers = new_pf_obj.powers_
            new_powers_list = new_powers.tolist()
            poly_powers_list = poly_powers.tolist()
            poly_idx = np_array([i for i, powers in enumerate(new_powers_list)
                                 if powers in poly_powers_list])

            # If regression covariances are requested, calculate them
            if self._use_regr_cov:
                # Redetermine the active sam_set_poly
                active_sam_set = self._sam_set[emul_i][:, new_active_par]
                sam_set_poly =\
                    new_pf_obj.fit_transform(active_sam_set)[:, poly_idx]

                # Calculate the poly_coef covariances
                poly_coef_cov = rsdl_var*self._get_inv_matrix(
                    sam_set_poly.T @ sam_set_poly).flatten()

            # Create regression data dict
            regr_data_dict = {
                'active_par': new_active_par,
                'rsdl_var': rsdl_var,
                'regr_score': regr_score,
                'poly_coef': poly_coef,
                'poly_powers': poly_powers,
                'poly_idx': poly_idx}

            # If regression covariance is used, add it to regr_data_dict
            if self._use_regr_cov:
                regr_data_dict['poly_coef_cov'] = poly_coef_cov

            # Save everything to hdf5
            self._save_data(emul_i, emul_s, {
                'regression': regr_data_dict})

        # Log that this is finished
        logger.info("Finished performing regression.")
# This function gives the prior expectation value # This is function 'E(f(x'))' or 'u(x')'
[docs] @e13.docstring_substitute(emul_i=std_emul_i_doc, emul_s_seq=emul_s_seq_doc) def _get_prior_exp(self, emul_i, emul_s_seq, par_set): """ Calculates the prior expectation value for requested emulator systems `emul_s_seq` at a given emulator iteration `emul_i` for specified parameter set `par_set`. This expectation depends on :attr:`~method`. Parameters ---------- %(emul_i)s %(emul_s_seq)s par_set : 1D :obj:`~numpy.ndarray` object or None If *None*, calculate the prior expectation values of sam_set. If not *None*, calculate the prior expectation value for the given model parameter value set. Returns ------- prior_exp : 1D or 2D :obj:`~numpy.ndarray` object Prior expectation values for either sam_set or `par_set` for requested emulator systems. """ # If prior_exp of sam_set is requested (exp_dot_term) if par_set is None: # Initialize empty prior expectation prior_exp = np.zeros([len(emul_s_seq), self._n_sam[emul_i]]) # Check what 'method' is given if self._method in ('gaussian', 'full'): # Gaussian prior expectation is equal to the mean, which is 0 prior_exp += 0 if self._method in ('regression', 'full'): for i, emul_s in enumerate(emul_s_seq): # Initialize PF object pf_obj = PF(self._poly_order) # Obtain the polynomial terms poly_terms = pf_obj.fit_transform( self._sam_set[emul_i][ :, self._active_par_data[emul_i][emul_s]])[ :, self._poly_idx[emul_i][emul_s]] prior_exp[i] += np.sum( self._poly_coef[emul_i][emul_s]*poly_terms, axis=-1) # If prior_exp of par_set is requested (adj_exp) else: # Initialize empty prior expectation prior_exp = np.zeros(len(emul_s_seq)) # Check what 'method' is given if self._method in ('gaussian', 'full'): # Gaussian prior expectation is equal to the mean, which is 0 prior_exp += 0 if self._method in ('regression', 'full'): for i, emul_s in enumerate(emul_s_seq): poly_terms = np.product(pow( par_set[self._active_par_data[emul_i][emul_s]], self._poly_powers[emul_i][emul_s]), axis=-1) prior_exp[i] += np.sum( 
self._poly_coef[emul_i][emul_s]*poly_terms, axis=-1) # Return it return(prior_exp)
# This function pre-calculates the second adjustment dot-term # This is function 'Var(D)^-1*(D-E(D))'
    @e13.docstring_substitute(emul_i=std_emul_i_doc,
                              emul_s_seq=emul_s_seq_doc)
    def _get_exp_dot_term(self, emul_i, emul_s_seq):
        """
        Pre-calculates the second expectation adjustment dot-term for
        requested emulator systems `emul_s_seq` at a given emulator iteration
        `emul_i` for all model evaluation samples and saves it for later use.

        Parameters
        ----------
        %(emul_i)s
        %(emul_s_seq)s

        Generates
        ---------
        exp_dot_term : 2D :obj:`~numpy.ndarray` object
            2D array containing the pre-calculated values for the second
            adjustment dot-term of the adjusted expectation for requested
            emulator systems.

        """

        # Create logger
        logger = getRLogger('DOT_TERM')
        logger.info("Pre-calculating second expectation adjustment dot-term "
                    "for known samples at emulator iteration %i." % (emul_i))

        # Obtain prior expectation value of sam_set
        prior_exp_sam_set = self._get_prior_exp(emul_i, emul_s_seq, None)

        # Calculate the exp_dot_term values and save it to hdf5
        # This is 'Var(D)^-1*(D-E(D))' in Bayes-linear terminology
        for i, emul_s in enumerate(emul_s_seq):
            exp_dot_term = self._cov_mat_inv[emul_i][emul_s] @\
                (self._mod_set[emul_i][emul_s]-prior_exp_sam_set[i])
            self._save_data(emul_i, emul_s, {
                'exp_dot_term': {
                    'prior_exp_sam_set': prior_exp_sam_set[i],
                    'exp_dot_term': exp_dot_term}})

        # Log again
        logger.info("Finished pre-calculating second adjustment dot-term "
                    "values.")
# This function calculates the covariance between parameter sets # This is function 'Cov(f(x), f(x'))' or 'c(x,x')' # TODO: Improve Gaussian-only method by making sigma data point dependent
    @e13.docstring_substitute(full_cov=full_cov_doc)
    def _get_cov(self, emul_i, emul_s_seq, par_set1, par_set2):
        """
        Calculates the full emulator covariances for requested emulator
        systems `emul_s_seq` at emulator iteration `emul_i` for given
        parameter sets `par_set1` and `par_set2`. The contributions to these
        covariances depend on :attr:`~method`.

        %(full_cov)s

        """

        # If cov of sam_set with sam_set is requested (cov_mat)
        if par_set1 is None:
            # Calculate covariance between sam_set and sam_set
            cov = np.zeros([len(emul_s_seq), self._n_sam[emul_i],
                            self._n_sam[emul_i]])

            # Check what 'method' is given
            if self._method in ('gaussian', 'full'):
                # Obtain the difference between sam_set and sam_set
                diff_sam_set = e13.diff(self._sam_set[emul_i], flatten=False)

                # If Gaussian needs to be taken into account
                for i, emul_s in enumerate(emul_s_seq):
                    # Get active_par and rsdl_var portions
                    active_par = self._active_par_data[emul_i][emul_s]
                    act_rsdl_var = self._act_rsdl_var[emul_i][emul_s]
                    pas_rsdl_var = self._pas_rsdl_var[emul_i][emul_s]

                    # Gaussian variance (squared-exponential kernel over the
                    # active parameters)
                    cov[i] += act_rsdl_var *\
                        np.exp(-1*np.sum(diff_sam_set[:, :, active_par]**2,
                                         axis=-1) /
                               np.sum(self._l_corr[active_par]**2))

                    # Passive parameter variety plus inflation term
                    # (only contributes on the diagonal)
                    cov[i] += pas_rsdl_var*np.eye(self._n_sam[emul_i])
            if(self._method in ('regression', 'full') and self._use_regr_cov):
                # If regression needs to be taken into account
                cov += self._get_regr_cov(emul_i, emul_s_seq, None, None)

        # If cov of par_set1 with sam_set is requested (cov_vec)
        elif par_set2 is None:
            # Calculate covariance between par_set1 and sam_set
            cov = np.zeros([len(emul_s_seq), self._n_sam[emul_i]])

            # Check what 'method' is given
            if self._method in ('gaussian', 'full'):
                # Obtain the difference between par_set1 and sam_set
                diff_sam_set = par_set1-self._sam_set[emul_i]

                # If Gaussian needs to be taken into account
                for i, emul_s in enumerate(emul_s_seq):
                    # Get active_par and rsdl_var portions
                    active_par = self._active_par_data[emul_i][emul_s]
                    act_rsdl_var = self._act_rsdl_var[emul_i][emul_s]
                    pas_rsdl_var = self._pas_rsdl_var[emul_i][emul_s]

                    # Gaussian variance
                    cov[i] += act_rsdl_var *\
                        np.exp(-1*np.sum(diff_sam_set[:, active_par]**2,
                                         axis=-1) /
                               np.sum(self._l_corr[active_par]**2))

                    # Passive parameter variety plus inflation term
                    # (only contributes where par_set1 equals a known sample)
                    cov[i] += pas_rsdl_var*(par_set1 ==
                                            self._sam_set[emul_i]).all(axis=-1)
            if(self._method in ('regression', 'full') and self._use_regr_cov):
                # If regression needs to be taken into account
                cov += self._get_regr_cov(emul_i, emul_s_seq, par_set1, None)

        # If cov of par_set1 with par_set2 is requested (cov)
        else:
            # Calculate covariance between par_set1 and par_set2
            cov = np.zeros([len(emul_s_seq)])

            # Check what 'method' is given
            if self._method in ('gaussian', 'full'):
                # Obtain the difference between par_set1 and par_set2
                diff_sam_set = par_set1-par_set2

                # If Gaussian needs to be taken into account
                for i, emul_s in enumerate(emul_s_seq):
                    # Get active_par and rsdl_var portions
                    active_par = self._active_par_data[emul_i][emul_s]
                    act_rsdl_var = self._act_rsdl_var[emul_i][emul_s]
                    pas_rsdl_var = self._pas_rsdl_var[emul_i][emul_s]

                    # Gaussian variance
                    cov[i] += act_rsdl_var *\
                        np.exp(-1*np.sum(diff_sam_set[active_par]**2,
                                         axis=-1) /
                               np.sum(self._l_corr[active_par]**2))

                    # Passive parameter variety plus inflation term
                    cov[i] += pas_rsdl_var*(par_set1 == par_set2).all()
            if(self._method in ('regression', 'full') and self._use_regr_cov):
                # If regression needs to be taken into account
                cov += self._get_regr_cov(emul_i, emul_s_seq, par_set1,
                                          par_set2)

        # Return it
        return(cov)
# This function calculates the regression covariance between parameter sets # This is function 'Cov(r(x), r(x'))' # OPTIMIZE: Takes roughly 45-50% of total evaluation time
[docs] @e13.docstring_substitute(regr_cov=regr_cov_doc) def _get_regr_cov(self, emul_i, emul_s_seq, par_set1, par_set2): """ Calculates the covariances of the regression function for requested emulator systems `emul_s_seq` at emulator iteration `emul_i` for given parameter sets `par_set1` and `par_set2`. %(regr_cov)s """ # If regr_cov of sam_set is requested (cov_mat) if par_set1 is None: # Make empty array regr_cov = np.zeros([len(emul_s_seq), self._n_sam[emul_i], self._n_sam[emul_i]]) for i, emul_s in enumerate(emul_s_seq): # Initialize PF object pf_obj = PF(self._poly_order) # Obtain the polynomial terms for both parameter sets poly_terms1 = pf_obj.fit_transform( self._sam_set[emul_i][ :, self._active_par_data[emul_i][emul_s]])[ :, self._poly_idx[emul_i][emul_s]] poly_terms2 = poly_terms1 # Obtain the combined product polynomial terms prod_terms = np.kron(poly_terms1, poly_terms2) # Calculate the regression covariance regr_cov[i] =\ np.sum(self._poly_coef_cov[emul_i][emul_s]*prod_terms, axis=-1).reshape([self._n_sam[emul_i], self._n_sam[emul_i]]) # If regr_cov of par_set with sam_set is requested (cov_vec) elif par_set2 is None: # Make empty array regr_cov = np.zeros([len(emul_s_seq), self._n_sam[emul_i]]) for i, emul_s in enumerate(emul_s_seq): # Initialize PF object pf_obj = PF(self._poly_order) # Obtain the polynomial terms for both parameter sets poly_terms1 =\ np.product(pow( par_set1[self._active_par_data[emul_i][emul_s]], self._poly_powers[emul_i][emul_s]), axis=-1) poly_terms2 = pf_obj.fit_transform( self._sam_set[emul_i][ :, self._active_par_data[emul_i][emul_s]])[ :, self._poly_idx[emul_i][emul_s]] # Obtain the combined product polynomial terms prod_terms = np.kron(poly_terms1, poly_terms2) # Calculate the regression covariance regr_cov[i] =\ np.sum(self._poly_coef_cov[emul_i][emul_s]*prod_terms, axis=-1) # If regr_cov of par_set1 with par_set2 is requested (cov) else: # Make empty array regr_cov = np.zeros([len(emul_s_seq)]) for i, emul_s in 
enumerate(emul_s_seq): # Obtain the polynomial terms for both parameter sets poly_terms1 =\ np.product(pow( par_set1[self._active_par_data[emul_i][emul_s]], self._poly_powers[emul_i][emul_s]), axis=-1) poly_terms2 =\ np.product(pow( par_set2[self._active_par_data[emul_i][emul_s]], self._poly_powers[emul_i][emul_s]), axis=-1) # Obtain the combined product polynomial terms prod_terms = np.kron(poly_terms1, poly_terms2) # Calculate the regression covariance regr_cov[i] =\ np.sum(self._poly_coef_cov[emul_i][emul_s]*prod_terms, axis=-1) # Return it return(regr_cov)
# This function calculates the covariance matrix # This is function 'Var(D)' or 'A'
    @e13.docstring_substitute(emul_i=std_emul_i_doc, emul_s_seq=emul_s_seq_doc)
    def _get_cov_matrix(self, emul_i, emul_s_seq):
        """
        Calculates the (inverse) matrix of covariances between known model
        evaluation samples for requested emulator systems `emul_s_seq` at
        emulator iteration `emul_i`.

        Parameters
        ----------
        %(emul_i)s
        %(emul_s_seq)s

        Generates
        ---------
        cov_mat : 3D :obj:`~numpy.ndarray` object
            Matrix containing the covariances between all known model
            evaluation samples for requested emulator systems.
        cov_mat_inv : 3D :obj:`~numpy.ndarray` object
            Inverse of covariance matrix for requested emulator systems.

        """

        # Log the creation of the covariance matrix
        logger = getRLogger('COV_MAT')
        logger.info("Calculating covariance matrix for emulator iteration %i."
                    % (emul_i))

        # Calculate covariance matrix
        # Since this calculation can cause memory issues, catch error and try
        # slower but less memory-intensive method
        try:
            cov_mat = self._get_cov(emul_i, emul_s_seq, None, None)
        except MemoryError:  # pragma: no cover
            # For some odd reason, overwriting cov_mat does not clear memory,
            # and the only way to remove cov_mat from it is by attempting to
            # delete it. It always raises a NameError, but does actually clear
            # memory successfully. I have no idea why this happens this way.
            # TODO: Find out why and improve the memory clearance
            try:
                del cov_mat
            except NameError:
                pass
            # Fallback: build the matrix one sample (row) at a time instead of
            # requesting the full n_sam x n_sam block in a single call
            cov_mat = np.zeros([len(emul_s_seq), self._n_sam[emul_i],
                                self._n_sam[emul_i]])
            for i in range(self._n_sam[emul_i]):
                cov_mat[:, i] = self._get_cov(emul_i, emul_s_seq,
                                              self._sam_set[emul_i][i], None)

        # Loop over all emulator systems
        for i, emul_s in enumerate(emul_s_seq):
            # Calculate the inverse of the covariance matrix
            # NOTE: emul_s here is a local index into self._emul_s (used for
            # logging the global system number below)
            logger.info("Calculating inverse of covariance matrix %i."
                        % (self._emul_s[emul_s]))
            # TODO: Maybe I should put an error catch for memory overflow here?
            cov_mat_inv = self._get_inv_matrix(cov_mat[i])

            # Save the covariance matrix and inverse to hdf5
            self._save_data(emul_i, emul_s, {
                'cov_mat': {
                    'cov_mat': cov_mat[i],
                    'cov_mat_inv': cov_mat_inv}})

        # Log that calculation has been finished
        logger.info("Finished calculating covariance matrix.")
# This function calculates the inverse of a given matrix # TODO: Improve the inverse calculation # OPTIMIZE: Use pre-conditioners and linear systems?
[docs] def _get_inv_matrix(self, matrix): """ Calculates the inverse of a given `matrix`. Right now only uses the :func:`~numpy.linalg.pinv` function. Parameters ---------- matrix : 2D array_like Matrix to be inverted. Returns ------- matrix_inv : 2D :obj:`~numpy.ndarray` object Inverse of the given `matrix`. """ # Calculate the Moore-Penrose generalized inverse of the given matrix matrix_inv = pinv(matrix) # Return it return(matrix_inv)
# Load the emulator
[docs] def _load_emulator(self, modellink_obj): """ Checks if the provided working directory contains a constructed emulator and loads in the emulator data accordingly. Parameters ---------- modellink_obj : :obj:`~prism.modellink.ModelLink` object Instance of the :class:`~prism.modellink.ModelLink` class that links the emulated model to this :obj:`~prism.Pipeline` object. """ # Make logger logger = getCLogger('EMUL_LOAD') logger.info("Loading emulator.") # Check if an existing hdf5-file is provided try: logger.info("Checking if provided working directory %r contains " "a constructed emulator." % (self._pipeline._working_dir)) with self._File('r', None) as file: # Log that an hdf5-file has been found logger.info("Provided working directory contains a constructed" " emulator. Validating.") # Obtain the number of emulator iterations constructed emul_i = len(file.keys()) # Check if the hdf5-file contains solely groups made by PRISM req_keys = [str(i) for i in range(1, emul_i+1)] if(req_keys != list(file.keys())): err_msg = ("Provided master HDF5-file contains invalid " "data groups!") e13.raise_error(err_msg, e13.InputError, logger) # Log that valid emulator was found logger.info("Provided master HDF5-file is valid.") self._emul_load = 1 # Save number of emulator iterations constructed self._emul_i = emul_i except (OSError, IOError): # No existing emulator was provided logger.info("Provided working directory does not contain a " "constructed emulator.") self._emul_load = 0 self._emul_i = 0 # No emulator provided, so no loaded modellink either modellink_loaded = None else: # Read all emulator parameters from the hdf5-file modellink_loaded = self._retrieve_parameters() # Link the provided ModelLink object to the pipeline self._set_modellink(modellink_obj, modellink_loaded) # Load emulator data self._load_data(self._emul_i) # Logging logger.info("Finished loading emulator.")
# This function connects the provided ModelLink class to the pipeline # Function that loads in the emulator data
    @e13.docstring_substitute(emul_i=std_emul_i_doc)
    def _load_data(self, emul_i):
        """
        Loads in all the important emulator data up to emulator iteration
        `emul_i` into memory.

        Parameters
        ----------
        %(emul_i)s

        Generates
        ---------
        All relevant emulator data up to emulator iteration `emul_i` is
        loaded into memory.

        """

        # Set the logger
        logger = getCLogger('LOAD_DATA')

        # Initialize all data sets with empty lists
        # Index 0 of every list is a placeholder so iteration i maps to [i]
        logger.info("Initializing emulator data sets.")
        self._n_sam = [[]]
        self._sam_set = [[]]
        self._emul_space = [[]]
        self._mod_set = [[]]
        self._active_par_data = [[]]
        self._rsdl_var = [[]]
        self._act_rsdl_var = [[]]
        self._pas_rsdl_var = [[]]
        self._poly_coef = [[]]
        self._poly_coef_cov = [[]]
        self._poly_powers = [[]]
        self._poly_idx = [[]]
        self._cov_mat_inv = [[]]
        self._exp_dot_term = [[]]
        self._n_data = [[]]
        self._data_val = [[]]
        self._data_err = [[]]
        self._data_spc = [[]]
        self._data_idx = [[]]

        # Initialize emulator system status lists
        self._ccheck = [[]]
        self._active_emul_s = [[]]
        self._n_emul_s = 0

        # Initialize rank specific properties
        if self._is_controller:
            self._n_data_tot = [[]]
            self._n_emul_s_tot = 0
            self._emul_s_to_core = [[] for _ in range(self._size)]
            self._data_idx_to_core = [[]]
            self._active_par = [[]]

        # If no file has been provided
        if(emul_i == 0 or self._emul_load == 0):
            logger.info("Non-existent emulator file provided. No additional "
                        "data needs to be loaded.")
            return

        # Check if requested emulator iteration exists
        elif not(1 <= emul_i <= self._emul_i):
            err_msg = ("Requested emulator iteration %i does not exist!"
                       % (emul_i))
            e13.raise_error(err_msg, RequestError, logger)

        # If both checks succeed, controller determines emul_s assignments
        if self._is_controller:
            # Determine which emulator systems each MPI rank should get
            emul_s_to_core = self._assign_emul_s(emul_i)

            # Save which systems will be assigned to which rank
            self._emul_s_to_core = list(emul_s_to_core)

            # Save total number of emulator systems
            self._n_emul_s_tot = sum([len(seq) for seq in emul_s_to_core])

            # Assign the emulator systems to the various MPI ranks
            self._emul_s = self._comm.scatter(emul_s_to_core, 0)

            # Calculate the most optimal number of OpenMP threads if using MPI
            if(MPI.COMM_WORLD.Get_size() > 1):
                omp_num_threads = max(1, self._size//self._n_emul_s_tot)
            else:
                omp_num_threads = None

            # Tell workers to use this number of OpenMP threads
            tpc.threadpool_limits(self._comm.bcast(omp_num_threads, 0), 'blas')

        # Workers get their emulator systems assigned
        else:
            # Receive assigned emulator systems
            self._emul_s = self._comm.scatter(None, 0)

            # Set number of OpenMP threads
            tpc.threadpool_limits(self._comm.bcast(None, 0), 'blas')

        # Temporarily manually swap the CFilter for RFilter
        # Every rank logs what systems were assigned to it
        # TODO: Remove the need to do this manually
        logger.filters = [logger.PRISM_filters['RFilter']]
        logger.info("Received emulator systems %s." % (self._emul_s))
        logger.filters = [logger.PRISM_filters['CFilter']]

        # Determine the number of assigned emulator systems
        self._n_emul_s = len(self._emul_s)

        # Load the corresponding sam_set, mod_set and cov_mat_inv
        logger.info("Loading relevant emulator data up to iteration %i."
                    % (emul_i))

        # Open hdf5-file
        with self._File('r', None) as file:
            # Read in the data
            for i in range(1, emul_i+1):
                group = file[str(i)]

                # Create empty construct check list
                # Items left in ccheck mark data still to be constructed
                ccheck = []

                # Check if sam_set is available
                try:
                    self._n_sam.append(group.attrs['n_sam'])
                    self._sam_set.append(group['sam_set'][()])
                    # Reinterpret the compound dataset as a plain float array
                    self._sam_set[-1].dtype = float
                    emul_space = group['emul_space'][()]
                    emul_space.dtype = float
                    self._emul_space.append(emul_space.T.copy())
                except KeyError:
                    self._n_sam.append(0)
                    self._sam_set.append([])
                    self._emul_space.append([])
                    if self._is_controller:
                        ccheck.append('mod_real_set')

                # Check if active_par is available for the controller
                if self._is_controller:
                    try:
                        par_i = [self._modellink._par_name.index(par.decode(
                            'utf-8')) for par in group.attrs['active_par']]
                        self._active_par.append(np_array(par_i))
                    except KeyError:
                        self._active_par.append([])
                        ccheck.append('active_par')

                # Read in the total number of data points for the controller
                if self._is_controller:
                    self._n_data_tot.append(group.attrs['n_data'])

                # Initialize empty data sets
                mod_set = []
                active_par_data = []
                rsdl_var = []
                act_rsdl_var = []
                pas_rsdl_var = []
                poly_coef = []
                poly_coef_cov = []
                poly_powers = []
                poly_idx = []
                cov_mat_inv = []
                exp_dot_term = []
                data_val = []
                data_err = []
                data_spc = []
                data_idx = []

                # Check which emulator systems are active
                self._active_emul_s.append([])
                for j, emul_s in enumerate(self._emul_s):
                    # Create empty construct check list for this system
                    ccheck_s = []

                    # Try to access the emulator system
                    try:
                        data_set = group['emul_%i' % (emul_s)]
                    # If it does not exist, it was passive
                    except KeyError:
                        # Add empty lists for all emulator system data
                        mod_set.append([])
                        active_par_data.append([])
                        rsdl_var.append([])
                        act_rsdl_var.append([])
                        pas_rsdl_var.append([])
                        poly_coef.append([])
                        poly_coef_cov.append([])
                        poly_powers.append([])
                        poly_idx.append([])
                        cov_mat_inv.append([])
                        exp_dot_term.append([])
                        data_val.append([])
                        data_err.append([])
                        data_spc.append([])
                        data_idx.append([])
                        ccheck.insert(j, ccheck_s)
                        continue
                    # If it does exist, add emul_s to list of active emul_s
                    else:
                        self._active_emul_s[-1].append(j)

                    # Check if mod_set is available
                    try:
                        mod_set.append(data_set['mod_set'][()])
                    except KeyError:
                        mod_set.append([])

                    # Check if active_par_data is available
                    try:
                        par_i = [self._modellink._par_name.index(
                            par.decode('utf-8')) for par in
                            data_set.attrs['active_par_data']]
                        active_par_data.append(np_array(par_i))
                    except KeyError:
                        active_par_data.append([])
                        ccheck_s.append('active_par_data')

                    # Check if regression data is available
                    try:
                        rsdl_var.append(data_set.attrs['rsdl_var'])
                        poly_coef.append(data_set['poly_coef'][()])
                        if self._use_regr_cov:
                            poly_coef_cov.append(data_set['poly_coef_cov'][()])
                        else:
                            poly_coef_cov.append([])
                        poly_powers.append(data_set['poly_powers'][()])
                        # Reinterpret the compound dataset as plain integers
                        poly_powers[-1].dtype = INT_SIZE
                        poly_idx.append(data_set['poly_idx'][()])
                    except KeyError:
                        rsdl_var.append([])
                        poly_coef.append([])
                        poly_coef_cov.append([])
                        poly_powers.append([])
                        poly_idx.append([])
                        # Regression data is only required for these methods
                        if self._method in ('regression', 'full'):
                            ccheck_s.append('regression')

                    # Check if rsdl_var is available
                    try:
                        act_rsdl_var.append(data_set.attrs['act_rsdl_var'])
                        pas_rsdl_var.append(data_set.attrs['pas_rsdl_var'])
                    except KeyError:
                        act_rsdl_var.append([])
                        pas_rsdl_var.append([])
                        ccheck_s.append('rsdl_var')

                    # Check if cov_mat is available
                    try:
                        cov_mat_inv.append(data_set['cov_mat_inv'][()])
                    except KeyError:
                        cov_mat_inv.append([])
                        ccheck_s.append('cov_mat')

                    # Check if exp_dot_term is available
                    try:
                        exp_dot_term.append(data_set['exp_dot_term'][()])
                    except KeyError:
                        exp_dot_term.append([])
                        ccheck_s.append('exp_dot_term')

                    # Read in data values, errors and spaces
                    data_val.append(data_set.attrs['data_val'])
                    data_err.append(data_set.attrs['data_err'].tolist())
                    data_spc.append(data_set.attrs['data_spc'].decode('utf-8'))

                    # Read in all data_idx parts and combine them
                    data_idx.append(self._read_data_idx(data_set))

                    # Add ccheck_s to ccheck
                    ccheck.insert(j, ccheck_s)

                    # Make sure that data_set is closed
                    del data_set

                # Determine the number of data points on this MPI rank
                self._n_data.append(len(self._active_emul_s[-1]))

                # Add all read-in data for this iteration to respective places
                self._mod_set.append(mod_set)
                self._active_par_data.append(active_par_data)
                self._rsdl_var.append(rsdl_var)
                self._act_rsdl_var.append(act_rsdl_var)
                self._pas_rsdl_var.append(pas_rsdl_var)
                self._poly_coef.append(poly_coef)
                self._poly_coef_cov.append(poly_coef_cov)
                self._poly_powers.append(poly_powers)
                self._poly_idx.append(poly_idx)
                self._cov_mat_inv.append(cov_mat_inv)
                self._exp_dot_term.append(exp_dot_term)
                self._data_val.append(data_val)
                self._data_err.append(data_err)
                self._data_spc.append(data_spc)
                self._data_idx.append(data_idx)

                # Add ccheck for this iteration to global ccheck
                self._ccheck.append(ccheck)

                # Gather the data_idx from all MPI ranks on the controller
                data_idx_list = self._comm.gather(data_idx, 0)

                # Controller saving the received data_idx_list
                if self._is_controller:
                    self._data_idx_to_core.append(data_idx_list)

        # If ccheck has no solely empty lists, decrease emul_i by 1
        # (the last iteration is then considered not fully constructed)
        if e13.delist(self._ccheck[-1]):
            self._emul_i -= 1

        # Log that loading is finished
        logger.info("Finished loading relevant emulator data.")
# This function saves emulator data to hdf5
    @e13.docstring_substitute(save_data=save_data_doc_e)
    def _save_data(self, emul_i, lemul_s, data_dict):
        """
        Saves a given data dict ``{keyword: data}`` at the given emulator
        iteration `emul_i` and local emulator system `lemul_s` to the
        HDF5-file and as an data attribute to the current :obj:`~Emulator`
        instance.

        %(save_data)s

        """

        # Do some logging
        logger = getRLogger('SAVE_DATA')

        # If controller keyword contains 'mod_real_set', emul_s must be None
        if((self._is_controller and 'mod_real_set' in data_dict.keys()) or
           lemul_s is None):
            emul_s = None
        # Else, determine what the global emul_s is
        else:
            emul_s = self._emul_s[lemul_s]

        # Open hdf5-file
        with self._File('r+', emul_s) as file:
            # Obtain the dataset this data needs to be saved to
            data_set = file[str(emul_i)]

            # Loop over entire provided data dict
            for keyword, data in data_dict.items():
                # Log what data is being saved
                logger.info("Saving %r data at iteration %i to HDF5."
                            % (keyword, emul_i))

                # Check what data keyword has been provided
                # ACTIVE PARAMETERS
                if(keyword == 'active_par'):
                    # Determine the active parameter names
                    # Names are ascii-encoded for HDF5 attribute storage
                    par_names = [self._modellink._par_name[i].encode(
                        'ascii', 'ignore') for i in data]

                    # Save active_par data to file and memory
                    data_set.attrs['active_par'] = par_names
                    self._active_par[emul_i] = data

                    # Remove active_par from ccheck
                    self._ccheck[emul_i].remove('active_par')

                # ACTIVE PARAMETERS DATA
                elif(keyword == 'active_par_data'):
                    # Determine the active parameter names for this system
                    par_names = [self._modellink._par_name[i].encode(
                        'ascii', 'ignore') for i in data]

                    # Save active_par_data data to file and memory
                    data_set.attrs['active_par_data'] = par_names
                    self._active_par_data[emul_i][lemul_s] = data

                    # Remove active_par_data from respective ccheck
                    self._ccheck[emul_i][lemul_s].remove('active_par_data')

                # COV_MAT
                elif(keyword == 'cov_mat'):
                    # Save cov_mat data to file and memory
                    # Only the inverse is kept in memory; the matrix itself is
                    # stored solely on disk
                    data_set.create_dataset('cov_mat', data=data['cov_mat'])
                    data_set.create_dataset('cov_mat_inv',
                                            data=data['cov_mat_inv'])
                    self._cov_mat_inv[emul_i][lemul_s] = data['cov_mat_inv']

                    # Remove cov_mat from respective ccheck
                    self._ccheck[emul_i][lemul_s].remove('cov_mat')

                # EXP_DOT_TERM
                elif(keyword == 'exp_dot_term'):
                    # Save exp_dot_term data to file and memory
                    data_set.create_dataset('prior_exp_sam_set',
                                            data=data['prior_exp_sam_set'])
                    data_set.create_dataset('exp_dot_term',
                                            data=data['exp_dot_term'])
                    self._exp_dot_term[emul_i][lemul_s] = data['exp_dot_term']

                    # Remove exp_dot_term from respective ccheck
                    self._ccheck[emul_i][lemul_s].remove('exp_dot_term')

                # MOD_REAL_SET (CONTROLLER)
                elif(self._is_controller and keyword == 'mod_real_set'):
                    # Determine dtype-list for compound dataset
                    # One named float field per model parameter
                    dtype = [(n, float) for n in self._modellink._par_name]

                    # Convert sam_set to a compound dataset
                    data_c = data['sam_set'].copy()
                    data_c.dtype = dtype

                    # Save sam_set data to file
                    data_set.create_dataset('sam_set', data=data_c)
                    data_set.attrs['n_sam'] = np.shape(data['sam_set'])[0]

                    # Save sam_set data to memory
                    self._set_sam_set_data(emul_i, data['sam_set'])

                    # Determine emul_space and save to file
                    emul_space_c = self._get_emul_space(emul_i).T.copy()
                    emul_space_c.dtype = dtype
                    data_set.create_dataset('emul_space', data=emul_space_c)

                    # Loop over all received mod_sets
                    # NOTE: lemul_s and emul_s are rebound here as loop
                    # variables, shadowing the outer values
                    for lemul_s, mod_out in zip(self._active_emul_s[emul_i],
                                                data['mod_set']):
                        # Determine the emulator system for this mod_set
                        emul_s = self._emul_s[lemul_s]

                        # Save mod_set data to file and memory
                        data_set_s = data_set['emul_%i' % (emul_s)]
                        data_set_s.create_dataset('mod_set', data=mod_out)
                        self._mod_set[emul_i][lemul_s] = mod_out

                    # Save whether or not ext_real_set was used
                    data_set.attrs['use_ext_real_set'] =\
                        bool(data['use_ext_real_set'])

                    # Remove mod_real_set from ccheck
                    self._ccheck[emul_i].remove('mod_real_set')

                # MOD_REAL_SET (WORKER)
                elif(self._is_worker and keyword == 'mod_real_set'):
                    # Save mod_set data to file and memory
                    data_set.create_dataset('mod_set', data=data['mod_set'])
                    self._mod_set[emul_i][lemul_s] = data['mod_set']

                # REGRESSION
                elif(keyword == 'regression'):
                    # Determine the new active parameter names for this system
                    par_names = [self._modellink._par_name[i].encode(
                        'ascii', 'ignore') for i in data['active_par']]

                    # Save new active_par_data data to file and memory
                    data_set.attrs['active_par_data'] = par_names
                    self._active_par_data[emul_i][lemul_s] = data['active_par']

                    # Determine dtype-list for compound dataset
                    names = [self._modellink._par_name[par] for par in
                             self._active_par_data[emul_i][lemul_s]]
                    dtype = [(n, INT_SIZE) for n in names]

                    # Convert poly_powers to a compound dataset
                    data_c = data['poly_powers'].copy()
                    data_c.dtype = dtype

                    # Save all regression data to file and memory
                    data_set.attrs['rsdl_var'] = data['rsdl_var']
                    data_set.attrs['regr_score'] = data['regr_score']
                    data_set.create_dataset('poly_coef',
                                            data=data['poly_coef'])
                    data_set.create_dataset('poly_powers', data=data_c)
                    data_set.create_dataset('poly_idx', data=data['poly_idx'])
                    self._rsdl_var[emul_i][lemul_s] = data['rsdl_var']
                    self._poly_coef[emul_i][lemul_s] = data['poly_coef']
                    self._poly_powers[emul_i][lemul_s] = data['poly_powers']
                    self._poly_idx[emul_i][lemul_s] = data['poly_idx']
                    # Covariances of the coefficients are optional
                    if self._use_regr_cov:
                        data_set.create_dataset('poly_coef_cov',
                                                data=data['poly_coef_cov'])
                        self._poly_coef_cov[emul_i][lemul_s] =\
                            data['poly_coef_cov']

                    # Remove regression from respective ccheck
                    self._ccheck[emul_i][lemul_s].remove('regression')

                # RSDL_VAR
                elif(keyword == 'rsdl_var'):
                    # Save rsdl_var data to file and memory
                    data_set.attrs['act_rsdl_var'] = data['act_rsdl_var']
                    data_set.attrs['pas_rsdl_var'] = data['pas_rsdl_var']
                    self._act_rsdl_var[emul_i][lemul_s] = data['act_rsdl_var']
                    self._pas_rsdl_var[emul_i][lemul_s] = data['pas_rsdl_var']

                    # Remove rsdl_var from respective ccheck
                    self._ccheck[emul_i][lemul_s].remove('rsdl_var')

                # INVALID KEYWORD
                else:
                    err_msg = "Invalid keyword argument provided!"
                    e13.raise_error(err_msg, ValueError, logger)

        # More logging
        logger.info("Finished saving data to HDF5.")
# Sets the sam_set data for the specified iteration
[docs] @e13.docstring_substitute(emul_i=std_emul_i_doc) def _set_sam_set_data(self, emul_i, sam_set): """ Sets the provided `sam_set` as the iteration data at the given emulator iteration `emul_i`. Parameters ---------- %(emul_i)s sam_set : 2D :obj:`~numpy.ndarray` object Array containing the model evaluation samples for emulator iteration `emul_i`. """ # Save the provided sam_set, the enclosing emul_space and its length self._sam_set[emul_i] = sam_set self._emul_space[emul_i] = self._get_emul_space(emul_i) self._n_sam[emul_i] = np.shape(sam_set)[0]
# Read in the emulator attributes
[docs] def _retrieve_parameters(self): """ Reads in the emulator parameters from the provided working directory and saves them in the current :obj:`~Emulator` instance. """ # Log that parameters are being read logger = getCLogger('INIT') logger.info("Retrieving emulator parameters from provided working " "directory.") # Open hdf5-file with self._File('r', None) as file: # Check if provided emulator is the same as requested emul_type = file.attrs['emul_type'].decode('utf-8') if(emul_type != self._emul_type): err_msg = ("Provided emulator system type (%r) does not match " "the requested type (%r)!" % (emul_type, self._emul_type)) e13.raise_error(err_msg, RequestError, logger) # Obtain used PRISM version and check its compatibility prism_version = file.attrs['prism_version'].decode('utf-8') check_compatibility(prism_version) # Read in all the emulator parameters self._prism_version = prism_version self._sigma = file.attrs['sigma'] self._l_corr = file.attrs['l_corr'] self._f_infl = file.attrs['f_infl'] self._method = file.attrs['method'].decode('utf-8') self._use_regr_cov = int(file.attrs['use_regr_cov']) self._poly_order = file.attrs['poly_order'] self._n_cross_val = file.attrs['n_cross_val'] modellink_name = file.attrs['modellink_name'].decode('utf-8') self._use_mock = int(file.attrs['use_mock']) # Log that reading is finished logger.info("Finished retrieving parameters.") # Return the name of the modellink class used to construct the loaded # emulator system return(modellink_name)
# This function returns default emulator parameters
[docs] @e13.docstring_append(def_par_doc.format('emulator')) def _get_default_parameters(self): # Create parameter dict with default parameters par_dict = {'sigma': '0.8', 'l_corr': '0.3', 'f_infl': '0.2', 'method': "'full'", 'use_regr_cov': 'False', 'poly_order': '3', 'n_cross_val': '5', 'use_mock': 'False'} # Return it return(sdict(par_dict))
# Set the parameters that were read in from the provided parameter dict
    @e13.docstring_append(set_par_doc.format("Emulator"))
    def _set_parameters(self):
        # Log that the emulator parameters are being set
        logger = getCLogger('INIT')
        logger.info("Setting emulator parameters.")

        # Obtaining default emulator parameter dict
        par_dict = self._get_default_parameters()

        # Add the read-in prism dict to it
        # User-provided values override the defaults set above
        par_dict.update(self._prism_dict)

        # More logging
        logger.info("Checking compatibility of provided emulator parameters.")

        # GENERAL
        # PRISM version
        self._prism_version = str(__version__)

        # Gaussian sigma
        self._sigma = check_vals(e13.split_seq(par_dict['sigma'])[0],
                                 'sigma', 'float', 'nzero')

        # Gaussian correlation length
        # Given as a fraction; scaled here to the absolute parameter ranges
        l_corr = check_vals(e13.split_seq(par_dict['l_corr']),
                            'l_corr', 'float', 'pos', 'normal')
        self._l_corr = l_corr*abs(self._modellink._par_rng[:, 1] -
                                  self._modellink._par_rng[:, 0])

        # Residual variance inflation factor
        self._f_infl = check_vals(e13.split_seq(par_dict['f_infl'])[0],
                                  'f_infl', 'float', 'pos')

        # Method used to calculate emulator functions
        # Future will include 'gaussian', 'regression', 'auto' and 'full'
        self._method = check_vals(e13.split_seq(par_dict['method'])[0],
                                  'method', 'str').lower()
        if self._method in ('gaussian', 'regression', 'full'):
            pass
        elif(self._method == 'auto'):
            raise NotImplementedError
        else:
            err_msg = ("Input argument 'method' is invalid (%r)!"
                       % (self._method))
            e13.raise_error(err_msg, ValueError, logger)

        # Obtain the bool determining whether or not to use regr_cov
        self._use_regr_cov = check_vals(par_dict['use_regr_cov'],
                                        'use_regr_cov', 'bool')

        # Check if method == 'regression' and set use_regr_cov to True if so
        if(self._method == 'regression'):
            self._use_regr_cov = 1

        # Obtain the polynomial order for the regression selection process
        self._poly_order =\
            check_vals(e13.split_seq(par_dict['poly_order'])[0],
                       'poly_order', 'int', 'pos')

        # Obtain the number of requested cross-validations
        # Zero is allowed ('nneg'), which presumably disables cross-validation
        n_cross_val = check_vals(e13.split_seq(par_dict['n_cross_val'])[0],
                                 'n_cross_val', 'int', 'nneg')

        # Make sure that n_cross_val is not unity
        if(n_cross_val != 1):
            self._n_cross_val = n_cross_val
        else:
            err_msg = ("Input argument 'n_cross_val' cannot be unity!")
            e13.raise_error(err_msg, ValueError, logger)

        # Check whether or not mock data should be used
        # TODO: Allow entire dicts to be given as mock_data (configparser?)
        use_mock = e13.split_seq(par_dict['use_mock'])

        # If use_mock contains a single element, check if it is a bool
        if(len(use_mock) == 1):
            mock_par = None
            use_mock = check_vals(use_mock[0], 'use_mock', 'bool')
        # If not, it must be an array of mock parameter values
        else:
            mock_par = self._modellink._check_sam_set(use_mock, 'mock_par')
            use_mock = True

        # If a currently loaded emulator used mock data and use_mock is False
        # TODO: Becomes obsolete when mock data does not change ModelLink props
        if self._emul_load and self._use_mock and not use_mock:
            # Raise error that ModelLink object needs to be reinitialized
            err_msg = ("Currently loaded emulator uses mock data, while none "
                       "has been requested for new emulator. Reinitialize the "
                       "ModelLink and Pipeline classes to accommodate for this"
                       " change.")
            e13.raise_error(err_msg, RequestError, logger)
        else:
            self._use_mock = use_mock

        # Log that setting has been finished
        logger.info("Finished setting emulator parameters.")

        # Return mock_par
        return(mock_par)
# This function loads previously generated mock data into ModelLink # TODO: Allow user to add/remove mock data? Requires consistency check # TODO: Find a way to use mock data without changing ModelLink properties
    def _set_mock_data(self):
        """
        Loads previously used mock data into the
        :class:`~prism.modellink.ModelLink` object, overwriting the parameter
        estimates, data values, data errors, data spaces and data identifiers
        with their mock equivalents.

        Generates
        ---------
        Overwrites the corresponding :class:`~prism.modellink.ModelLink` class
        properties with the previously used values (taken from the first
        emulator iteration).

        """

        # Start logger
        logger = getCLogger('MOCK_DATA')

        # Overwrite ModelLink properties with previously generated values
        # Log that mock_data is being loaded in
        logger.info("Loading previously used mock data into ModelLink object.")

        # Create empty variables for all ModelLink properties
        # Workers keep these defaults; they receive real values via bcast
        n_data = 0
        par_est = []
        data_val = []
        data_err = []
        data_spc = []
        data_idx = []

        # Controller only
        if self._is_controller:
            # Open hdf5-file
            with self._File('r', None) as file:
                # Get the number of emulator systems in the first iteration
                group = file['1']
                n_emul_s = group.attrs['n_emul_s']

                # Make empty lists for all model properties
                data_val = []
                data_err = []
                data_spc = []
                data_idx = []

                # Loop over all data points in the first iteration
                for emul_s in range(n_emul_s):
                    # Read in data values, errors and spaces
                    data_set = group['emul_%i' % (emul_s)]
                    data_val.append(data_set.attrs['data_val'])
                    data_err.append(data_set.attrs['data_err'].tolist())
                    data_spc.append(data_set.attrs['data_spc'].decode('utf-8'))

                    # Read in all data_idx parts and combine them
                    data_idx.append(self._read_data_idx(data_set))

                # Overwrite ModelLink properties
                par_est = file.attrs['mock_par'].tolist()
                n_data = group.attrs['n_data']

        # Broadcast updated modellink properties to workers
        # TODO: Find a cleaner way to write this without bcasting ModelLink obj
        self._modellink._n_data = self._comm.bcast(n_data, 0)
        self._modellink._par_est = self._comm.bcast(par_est, 0)
        self._modellink._data_val = self._comm.bcast(data_val, 0)
        self._modellink._data_err = self._comm.bcast(data_err, 0)
        self._modellink._data_spc = self._comm.bcast(data_spc, 0)
        self._modellink._data_idx = self._comm.bcast(data_idx, 0)

        # Log end
        logger.info("Loaded mock data.")
# This function returns the string representation of a polynomial term
[docs] def _get_poly_term_str(self, active_par, poly_power): """ Returns the string representation of a polynomial term given by the provided `active_par` and `poly_power`. Parameters ---------- active_par : list of int List containing the indices of the parameters whose polynomial powers are given in `poly_power`. poly_power : list of int List with the powers of the requested polynomial term. Returns ------- poly_term : str String representation of the requested polynomial term. """ # Initialize empty poly term representation poly_term = '' # Loop over all parameters in this polynomial term for par, power in zip(active_par, poly_power): # If this parameter has a power of zero, skip it if not power: continue # If not, process this parameter else: # Determine name of this parameter name = self._modellink._par_name[par] # Add an asterisk if this is not the first parameter in term poly_term += " * " if poly_term else '' # If the power is unity, just add it if(power == 1): poly_term += name # Else, add it with a power else: poly_term += "%s**%i" % (name, power) # Return the string representation return(poly_term)
# This function calculates active and passive residual variance portions
[docs] @e13.docstring_substitute(emul_i=std_emul_i_doc, emul_s_seq=emul_s_seq_doc) def _get_rsdl_var(self, emul_i, emul_s_seq): """ Splits up the calculated residual variances for requested emulator systems `emul_s_seq` at emulator iteration `emul_i` into active and passive contributions. Parameters ---------- %(emul_i)s %(emul_s_seq)s Generates --------- act_rsdl_var : list of float List containing the active portions of the residual variances. pas_rsdl_var : list of float List containing the passive portions of the residual variances. If :attr:`~f_infl` is not zero, this also includes the inflated residual variance values. """ # Log the calculation of residual variance portions logger = getRLogger('RSDL_VAR') logger.info("Determining active and passive residual variance " "contributions for emulator iteration %i." % (emul_i)) # Check if regression is used and act accordingly if self._method in ('regression', 'full'): rsdl_var = self._rsdl_var[emul_i] elif(self._method == 'gaussian'): rsdl_var = [self._sigma**2]*len(emul_s_seq) # Loop over all emulator systems for i, (emul_s, rsdl_var_s) in enumerate(zip(emul_s_seq, rsdl_var)): # Calculate the weight weight = 1-(len(self._active_par_data[emul_i][emul_s]) / self._modellink._n_par) # Set the active and passive portions of the residual variance act_rsdl_var = (1-weight)*rsdl_var_s pas_rsdl_var = (weight+self._f_infl)*rsdl_var_s # Save the residual variance contributions to hdf5 self._save_data(emul_i, emul_s, { 'rsdl_var': { 'act_rsdl_var': act_rsdl_var, 'pas_rsdl_var': pas_rsdl_var}}) # Log that calculation has been finished logger.info("Finished determining active and passive residual variance" " contributions.")