# -*- coding: utf-8 -*-
"""
Pipeline
========
Provides the definition of the main class of the *PRISM* package, the
:class:`~Pipeline` class.
"""
# %% IMPORTS
# Future imports
from __future__ import (absolute_import, division, print_function,
with_statement)
# Built-in imports
from contextlib import contextmanager
from inspect import isclass
import logging
import os
from os import path
import sys
from textwrap import dedent
from time import time
# Package imports
from e13tools import InputError, ShapeError
from e13tools.math import nCr
from e13tools.sampling import lhd
import numpy as np
from numpy.random import normal, random
from six import integer_types, string_types
from sortedcontainers import SortedDict as sdict
# PRISM imports
from prism._docstrings import (call_emul_i_doc, call_model_doc_s,
call_model_doc_m, def_par_doc, ext_mod_set_doc,
ext_real_set_doc_d, ext_real_set_doc_s,
ext_sam_set_doc, impl_cut_doc, impl_temp_doc,
paths_doc_d, paths_doc_s, read_par_doc,
save_data_doc_p, std_emul_i_doc,
user_emul_i_doc)
from prism._emulator import Emulator
from prism._internal import (PRISM_Comm, RequestError, RequestWarning,
check_vals, convert_str_seq, delist,
docstring_append, docstring_copy,
docstring_substitute, getCLogger, get_PRISM_File,
getRLogger, move_logger, np_array, raise_error,
raise_warning, start_logger)
from prism._projection import Projection
# All declaration
__all__ = ['Pipeline']
# %% PIPELINE CLASS DEFINITION
# TODO: Allow user to switch between emulation and modelling
# TODO: Implement multivariate implausibilities
# TODO: Think of a way to allow no ModelLink instance to be provided.
# This could be done with a DummyLink, but md_var is then uncallable.
[docs]class Pipeline(Projection, object):
"""
Defines the :class:`~Pipeline` class of the *PRISM* package.
The :class:`~Pipeline` class is the main user class of the *PRISM* package
and provides a user-friendly environment that gives access to all
operations within the package.
"""
[docs] @docstring_substitute(paths=paths_doc_d)
def __init__(self, modellink_obj, root_dir=None, working_dir=None,
prefix=None, prism_file=None, emul_type=None, comm=None):
"""
Initialize an instance of the :class:`~Pipeline` class.
Parameters
----------
modellink_obj : :obj:`~prism.modellink.ModelLink` object
Instance of the :class:`~prism.modellink.ModelLink` subclass
that links the emulated model to this :obj:`~Pipeline` instance.
Optional
--------
%(paths)s
emul_type : :class:`~prism.Emulator` subclass or None. Default: None
The type of :class:`~prism.Emulator` to use in this
:obj:`~prism.Pipeline` instance. If *None*, use the default
emulator instead.
comm : :obj:`~MPI.Intracomm` object or None. Default: None
The MPI intra-communicator to use in this :class:`~Pipeline`
instance or :obj:`MPI.COMM_WORLD` if `comm` is *None*.
If :mod:`mpi4py` is not installed, :mod:`~prism._dummyMPI` is used
instead.
"""
# Obtain MPI communicator, ranks and sizes
self._comm = PRISM_Comm(comm)
self._rank = self._comm.Get_rank()
self._size = self._comm.Get_size()
# Set statuses
self._is_controller = 1 if not self._rank else 0
self._is_worker = 1 if self._rank else 0
self._worker_mode = 0
self._do_logging = 1
# Controller obtaining paths and preparing logging system
if self._is_controller:
# Start logging
logging_file = start_logger()
logger = getCLogger('PIPELINE')
logger.info("")
# Initialize class
logger = getCLogger('INIT')
logger.info("Initializing Pipeline class.")
# Obtain paths
self._get_paths(root_dir, working_dir, prefix, prism_file)
# Move logger to working directory and restart it
move_logger(self._working_dir, logging_file)
# Remaining workers obtain paths from controller
else:
# Obtain paths
logger = getCLogger('INIT')
self._get_paths(root_dir, working_dir, prefix, prism_file)
# MPI Barrier
self._comm.Barrier()
# Start logger for workers as well
if self._is_worker:
start_logger(path.join(self._working_dir, 'prism_log.log'), 'a')
# Initialize Emulator class
# If emul_type is None, use default emulator
if emul_type is None:
Emulator(self, modellink_obj)
# Else if emul_type is a subclass of Emulator, try to initialize it
elif isclass(emul_type) and issubclass(emul_type, Emulator):
try:
emul_type(self, modellink_obj)
self._emulator
self._modellink
except Exception as error:
err_msg = ("Input argument 'emul_type' is invalid! (%s)"
% (error))
raise_error(err_msg, InputError, logger)
# If anything else is given, it is invalid
else:
err_msg = ("Input argument 'emul_type' is invalid (%r)!"
% (emul_type))
raise_error(err_msg, InputError, logger)
# Let everybody read in the pipeline parameters
self._read_parameters()
# Compile pre-defined code snippets
self._compile_code_snippets()
# Let controller load in the data
if self._is_controller:
self._load_data()
# Print out the details of the current state of the pipeline
self.details()
# Allows one to call one full loop of the PRISM pipeline
@docstring_substitute(emul_i=call_emul_i_doc)
def __call__(self, emul_i=None, force=False):
"""
Calls the :meth:`~construct` method to start the construction of the
given iteration of the emulator and creates the projection figures
right afterward if this construction was successful.
Optional
--------
%(emul_i)s
force : bool. Default: False
Controls what to do if the specified emulator iteration `emul_i`
already (partly) exists.
If *False*, finish construction of the specified iteration or skip
it if already finished.
If *True*, reconstruct the specified iteration entirely.
"""
# Perform construction
self.construct(emul_i, force=force)
# Perform projection
self.project()
# Define the representation of a Pipeline object
# TODO: Find out if there is a way to represent an MPI intra-communicator
def __repr__(self):
# Get path to current working directory, make all paths relative to it
pwd = os.getcwd()
# Make empty list holding representations of all input arguments
str_repr = []
# Add the ModelLink representation
str_repr.append(repr(self._modellink))
# Add the root_dir representation if it is not default
if(path.splitdrive(self._root_dir)[0].lower() !=
path.splitdrive(pwd)[0].lower()):
rel_root_path = self._root_dir
else:
rel_root_path = path.relpath(self._root_dir, pwd)
if(rel_root_path != '.'):
str_repr.append("root_dir=%r" % (rel_root_path))
# Add the working_dir representation
str_repr.append("working_dir=%r" % (path.relpath(self._working_dir,
self._root_dir)))
# Add the prism_file representation if it is not default
if self._prism_file is not None:
if(path.splitdrive(self._prism_file)[0].lower() !=
path.splitdrive(pwd)[0].lower()):
str_repr.append("prism_file=%r" % (self._prism_file))
else:
str_repr.append("prism_file=%r"
% (path.relpath(self._prism_file, pwd)))
# Add the emul_type representation if it is not default
emul_repr = "%s.%s" % (self._emulator.__class__.__module__,
self._emulator.__class__.__name__)
if(emul_repr != 'prism._emulator.Emulator'):
str_repr.append("emul_type=%s" % (emul_repr))
# Return representation
return("Pipeline(%s)" % (", ".join(str_repr)))
# %% CLASS PROPERTIES
# MPI properties
@property
def comm(self):
"""
:obj:`~mpi4py.MPI.Intracomm`: The global MPI intra-communicator to use
in this :obj:`~Pipeline` instance. By default, this is
:obj:`MPI.COMM_WORLD`.
"""
return(self._comm)
@property
def rank(self):
"""
int: The rank of this MPI process in :attr:`~comm`. If no MPI is used,
this is always 0.
"""
return(self._rank)
@property
def size(self):
"""
int: The number of MPI processes in :attr:`~comm`. If no MPI is used,
this is always 1.
"""
return(self._size)
@property
def is_controller(self):
"""
bool: Whether or not this MPI process is a controller rank. If no MPI
is used, this is always *True*.
"""
return(bool(self._is_controller))
@property
def is_worker(self):
"""
bool: Whether or not this MPI process is a worker rank. If no MPI is
used, this is always *False*.
"""
return(bool(self._is_worker))
@property
@contextmanager
def worker_mode(self):
"""
:obj:`~contextlib._GeneratorContextManager`: Special context manager
within which all code is executed in worker mode. In worker mode, all
worker ranks are continuously listening for calls from the controller
rank made with :meth:`~_make_call`.
Note that all code within the context manager is executed by all ranks,
with the worker ranks executing it after the controller rank exited.
It is therefore advised to use an if-statement inside to make sure only
the controller rank executes the code.
Using this context manager allows for easier use of *PRISM* in
combination with serial/OpenMP codes (like MCMC methods).
"""
# Make logger
logger = getCLogger('WORKER_M')
# Workers start listening for calls
self._listen_for_calls()
# Log that workers are now listening
logger.info("Workers are now listening for calls.")
# Execute code block within context manager
yield
# Make workers stop listening for calls
self._make_call(None)
# Log that workers are no longer listening for calls
logger.info("Workers are no longer listening for calls.")
# MPI Barrier
self._comm.Barrier()
# Pipeline Settings/Attributes/Details
@property
def do_logging(self):
"""
bool: Whether or not to save all logging messages. If *False*, all
logging messages of level :attr:`~logging.INFO` and below are ignored.
It also enables/disables the progress bar for making projections.
"""
return(bool(self._do_logging))
# TODO: Find a way to only turn off all regular logging done by PRISM
@do_logging.setter
def do_logging(self, flag):
# Make logger
logger = getRLogger('DO_LOGGING')
# Check if provided value is a bool
flag = check_vals(flag, 'do_logging', 'bool')
# If flag and do_logging are the same, skip
if flag is self._do_logging:
pass
# If logging is turned on, log this and turn off logging
elif not flag:
logging.disable(logging.INFO)
logger.warning("Logging messages of level %i (INFO) and below are "
"now ignored." % (logging.INFO))
# If logging is turned off, turn it on and log this
else:
logging.disable(logging.NOTSET)
logger.warning("Logging messages are no longer ignored.")
self._do_logging = flag
@property
def root_dir(self):
"""
str: Absolute path to the root directory.
"""
return(self._root_dir)
@property
def working_dir(self):
"""
str: Absolute path to the working directory.
"""
return(self._working_dir)
@property
def hdf5_file(self):
"""
str: Absolute path to the loaded master HDF5-file.
"""
return(self._hdf5_file)
@property
def prism_file(self):
"""
str: Absolute path to the *PRISM* parameters file or *None* if no file
was provided.
"""
return(self._prism_file)
@property
def File(self):
"""
:class:`~h5py.File`: Custom :class:`~h5py.File` class that has added
logging and automatically uses :attr:`~hdf5_file` as the HDF5-file to
open.
"""
return(self._File)
@property
def modellink(self):
"""
:obj:`~prism.modellink.ModelLink`: The
:obj:`~prism.modellink.ModelLink` instance provided during
:class:`~Pipeline` initialization.
"""
return(self._modellink)
@property
def emulator(self):
"""
:obj:`~prism.Emulator`: The :obj:`~prism.Emulator`
instance created during :class:`~Pipeline` initialization.
"""
return(self._emulator)
@property
def code_objects(self):
"""
dict of code objects: Collection of pre-compiled built-in code snippets
that are used in the :meth:`~_evaluate_sam_set` method.
"""
return(self._code_objects)
@property
def criterion(self):
"""
str, float or None: Value indicating which criterion to use in the
:func:`~e13tools.sampling.lhd` function.
"""
return(self._criterion)
@property
def do_active_anal(self):
"""
bool: Whether or not to do an active parameters analysis during the
construction of the emulator systems.
"""
return(bool(self._do_active_anal))
@property
def freeze_active_par(self):
"""
bool: Whether or not previously active parameters always stay active if
possible.
"""
return(bool(self._freeze_active_par))
@property
def pot_active_par(self):
"""
list of str: The potentially active parameters. Only parameters from
this list can become active during the active parameters analysis.
If :attr:`~do_active_anal` is *False*, all parameters in this list will
be active.
"""
return([self._modellink._par_name[i] for i in self._pot_active_par])
@property
def n_sam_init(self):
"""
int: Number of evaluation samples used to construct the first iteration
of the emulator systems.
"""
return(self._n_sam_init)
@property
def n_eval_sam(self):
"""
int: The number of evaluation samples used to analyze an emulator
iteration of the emulator systems. The number of plausible evaluation
samples is stored in :attr:`~n_impl_sam`.
It is zero if the specified iteration has not been analyzed yet.
"""
return(self._n_eval_sam)
@property
def base_eval_sam(self):
"""
int: Base number of emulator evaluations used to analyze the emulator
systems. This number is scaled up by the number of model parameters and
the current emulator iteration to generate the true number of emulator
evaluations (:attr:`~n_eval_sam`).
"""
return(self._base_eval_sam)
@property
def impl_cut(self):
"""
list of int: The non-wildcard univariate implausibility cut-off values
for an emulator iteration.
"""
return(self._impl_cut)
@property
def cut_idx(self):
"""
int: The index of the first non-wildcard in a list of implausibility
values. This is equivalent to the number of wildcards leading the
cut-off values in :attr:`~impl_cut`.
"""
return(self._cut_idx)
@property
def n_impl_sam(self):
"""
int: Number of model evaluation samples that passed the implausibility
checks during the analysis of an emulator iteration.
It is zero if the specified iteration has not been analyzed yet or has
no plausible samples.
"""
return(self._n_impl_sam)
@property
def impl_sam(self):
"""
:obj:`~numpy.ndarray`: The model evaluation samples that will be added
to the next emulator iteration.
"""
return(self._impl_sam)
# %% GENERAL CLASS METHODS
# Function that locks workers into listening for controller calls
[docs] def _listen_for_calls(self):
"""
All worker ranks in the :attr:`~comm` communicator start listening for
calls from the corresponding controller rank and will attempt to
execute the received message. Listening for calls continues until
:attr:`~_worker_mode` is set to *False*.
This method is automatically initialized and finalized when using the
:attr:`~worker_mode`.
"""
# Set worker_mode to 1
self._worker_mode = 1
# All workers start listening for calls
if self._is_worker:
while self._worker_mode:
exec_fn, args, kwargs = self._comm.bcast([], 0)
if exec_fn is None:
self._worker_mode = 0
elif isinstance(exec_fn, string_types):
attrs = exec_fn.split('.')
obj = self
for attr in attrs:
obj = getattr(obj, attr)
obj(*args, **kwargs)
else:
exec_fn(*args, **kwargs)
# Function that sends a code string to all workers and executes it
[docs] def _make_call(self, exec_fn, *args, **kwargs):
"""
Send the provided `exec_fn` to all worker ranks, if they are
listening for calls, and tell them to execute it using the provided
`args` and `kwargs`. All ranks that call this function will execute
`exec_fn` as well.
If used within the :attr:`~worker_mode` context manager, this function
should only be called by the controller. If not, it should be called by
all ranks that must execute `exec_fn`.
Parameters
----------
exec_fn : str, callable or None
If string, a callable attribute of this :obj:`~Pipeline` instance
or a callable object that the workers should execute if not.
If *None*, the workers stop listening for calls instead (disables
worker mode).
args : tuple
Positional arguments that need to be provided to `exec_fn`.
kwargs : dict
Keyword arguments that need to be provided to `exec_fn`.
"""
# Send received exec_code to all workers if they are listening
if self._worker_mode and self._is_controller:
self._comm.bcast([exec_fn, args, kwargs], 0)
# Execute exec_fn as well
if exec_fn is None:
self._worker_mode = 0
elif isinstance(exec_fn, string_types):
attrs = exec_fn.split('.')
obj = self
for attr in attrs:
obj = getattr(obj, attr)
return(obj(*args, **kwargs))
else:
return(exec_fn(*args, **kwargs))
# This function evaluates the model for a given set of evaluation samples
# TODO: If not MPI_call, all ranks evaluate part of sam_set simultaneously?
# Requires check/flag that model can be evaluated in multiple instances
[docs] @docstring_substitute(emul_i=std_emul_i_doc)
def _evaluate_model(self, emul_i, sam_set, data_idx):
"""
Evaluates the model for provided evaluation sample set `sam_set` at
given data points `data_idx`.
This method automatically distributes the samples according to the
various flags set in the :class:`~prism.modellink.ModelLink` subclass.
Parameters
----------
%(emul_i)s
sam_set : 1D or 2D array_like
Parameter/sample set to evaluate in the model.
data_idx : list of tuples
The list of data identifiers for which the model is requested to
return the corresponding data values.
Returns
-------
mod_set : 2D :obj:`~numpy.ndarray` object of shape ``(n_sam, n_data)``
Array containing the data values corresponding to the requested
data points.
"""
# Log that evaluation of model samples is started
logger = getCLogger('MODEL')
logger.info("Evaluating model samples.")
# Make sure that sam_set is at least 2D and a NumPy array
sam_set = np_array(sam_set, ndmin=2)
# Check who needs to call the model
if self._is_controller or self._modellink._MPI_call:
# Request all evaluation samples at once
if self._modellink._multi_call:
mod_set = self._multi_call_model(emul_i, sam_set, data_idx)
# Request evaluation samples one-by-one
else:
# Initialize mod_set
mod_set = np.zeros([sam_set.shape[0], self._modellink._n_data])
# Loop over all requested evaluation samples
for i, par_set in enumerate(sam_set):
mod_set[i] = self._call_model(emul_i, par_set, data_idx)
# If workers did not call model, give them a dummy mod_set
else:
mod_set = []
# Log that evaluation is completed and return mod_set
logger.info("Finished evaluating model samples.")
return(mod_set)
# Function obtaining the model output for a given set of parameter values
[docs] @docstring_append(call_model_doc_s)
def _call_model(self, emul_i, par_set, data_idx):
# Make sure par_set is at least 1D and a NumPy array
sam = np_array(par_set, ndmin=1)
# Log that model is being called
logger = getCLogger('CALL_MODEL')
logger.info("Calling model at parameters %s." % (sam))
# Obtain model output
mod_out = self._modellink.call_model(
emul_i=emul_i,
par_set=sdict(zip(self._modellink._par_name, sam)),
data_idx=data_idx)
# Log that calling model has been finished
logger.info("Model returned %s." % (mod_out))
# Return it
return(np_array(mod_out))
# Function containing the model output for a given set of parameter samples
[docs] @docstring_append(call_model_doc_m)
def _multi_call_model(self, emul_i, sam_set, data_idx):
# Make sure sam_set is at least 2D and a NumPy array
sam_set = np_array(sam_set, ndmin=2)
# Log that model is being multi-called
logger = getCLogger('CALL_MODEL')
logger.info("Multi-calling model for sample set of size %i."
% (sam_set.shape[0]))
# Obtain set of model outputs
mod_set = self._modellink.call_model(
emul_i=emul_i,
par_set=sdict(zip(self._modellink._par_name, sam_set.T)),
data_idx=data_idx)
# Log that multi-calling model has been finished
logger.info("Finished model multi-call.")
# Return it
return(np_array(mod_set))
# This function returns default pipeline parameters
[docs] @docstring_append(def_par_doc.format("pipeline"))
def _get_default_parameters(self):
# Create parameter dict with default parameters
par_dict = {'n_sam_init': '500',
'base_eval_sam': '800',
'impl_cut': '[0, 4.0, 3.8, 3.5]',
'criterion': "None",
'do_active_anal': 'True',
'freeze_active_par': 'True',
'pot_active_par': 'None'}
# Return it
return(par_dict)
# Read in the parameters from the provided parameter file
[docs] @docstring_append(read_par_doc.format("Pipeline"))
def _read_parameters(self):
# Log that the PRISM parameter file is being read
logger = getCLogger('INIT')
logger.info("Reading pipeline parameters.")
# Obtaining default pipeline parameter dict
par_dict = self._get_default_parameters()
# Read in data from provided PRISM parameters file
if self._prism_file is not None:
pipe_par = np.genfromtxt(self._prism_file, dtype=(str),
delimiter=':', autostrip=True)
# Make sure that pipe_par is 2D
pipe_par = np_array(pipe_par, ndmin=2)
# Combine default parameters with read-in parameters
par_dict.update(pipe_par)
# More logging
logger.info("Checking compatibility of provided pipeline parameters.")
# GENERAL
# Number of starting samples
self._n_sam_init =\
check_vals(convert_str_seq(par_dict['n_sam_init'])[0],
'n_sam_init', 'int', 'pos')
# Base number of emulator evaluation samples
self._base_eval_sam =\
check_vals(convert_str_seq(par_dict['base_eval_sam'])[0],
'base_eval_sam', 'int', 'pos')
# Criterion parameter used for Latin Hypercube Sampling
# If criterion is None, save it as such
if(par_dict['criterion'].lower() == 'none'):
self._criterion = None
# If criterion is a bool, raise error
elif par_dict['criterion'].lower() in ('false', 'true'):
err_msg = ("Input argument 'criterion' does not accept values of "
"type 'bool'!")
raise_error(err_msg, TypeError, logger)
# If anything else is given, it must be a float or string
else:
# Convert to float or string
criterion = convert_str_seq(par_dict['criterion'])[0]
# Try to use criterion to check validity
try:
lhd(3, 2, criterion=criterion)
except Exception as error:
err_msg = ("Input argument 'criterion' is invalid! (%s)"
% (error))
raise_error(err_msg, InputError, logger)
else:
self._criterion = criterion
# Obtain the bool determining whether to do an active parameters
# analysis
self._do_active_anal = check_vals(par_dict['do_active_anal'],
'do_active_anal', 'bool')
# Obtain the bool determining whether active parameters stay active
self._freeze_active_par = check_vals(par_dict['freeze_active_par'],
'freeze_active_par', 'bool')
# Check which parameters can potentially be active
# If pot_active_par is None, save all model parameters
if(par_dict['pot_active_par'].lower() == 'none'):
self._pot_active_par = np_array(range(self._modellink._n_par))
# If pot_active_par is a bool, raise error
elif par_dict['pot_active_par'].lower() in ('false', 'true'):
err_msg = ("Input argument 'pot_active_par' does not accept values"
"of type 'bool'!")
raise_error(err_msg, TypeError, logger)
# If anything else is given, it must be a sequence of model parameters
else:
# Convert the given sequence to an array of indices
self._pot_active_par = np_array(self._modellink._get_model_par_seq(
par_dict['pot_active_par'], 'pot_active_par'))
# Log that reading has been finished
logger.info("Finished reading pipeline parameters.")
# This function controls how n_eval_samples is calculated
[docs] @docstring_substitute(emul_i=std_emul_i_doc)
def _get_n_eval_sam(self, emul_i):
"""
This function calculates the total number of emulator evaluation
samples at a given emulator iteration `emul_i` from
:attr:`~base_eval_sam`.
Parameters
----------
%(emul_i)s
Returns
-------
n_eval_sam : int
Total number of emulator evaluation samples.
"""
# Calculate n_eval_sam and return it
return(emul_i*self._base_eval_sam*self._modellink._n_par)
# Obtains the paths for the root directory, working directory, pipeline
# hdf5-file and prism parameters file
[docs] @docstring_substitute(paths=paths_doc_s)
def _get_paths(self, root_dir, working_dir, prefix, prism_file):
"""
Obtains the path for the root directory, working directory and
parameters file for *PRISM*.
Parameters
----------
%(paths)s
Generates
---------
The absolute paths to the root directory, working directory, emulator
master HDF5-file and *PRISM* parameters file.
"""
# Set logging system
logger = getCLogger('INIT')
logger.info("Obtaining related directory and file paths.")
# Controller obtaining the paths
if self._is_controller:
# Obtain root directory path
# If one did not specify a root directory, set it to default
if root_dir is None:
self._root_dir = path.abspath('.')
logger.info("No root directory specified, set to %r."
% (self._root_dir))
# If one specified a root directory, use it
elif isinstance(root_dir, string_types):
self._root_dir = path.abspath(root_dir)
logger.info("Root directory set to %r." % (self._root_dir))
# Check if this directory already exists and create it if not
try:
os.mkdir(self._root_dir)
except OSError:
pass
else:
logger.info("Root directory did not exist, created it.")
# If anything else is given, it is invalid
else:
err_msg = "Input argument 'root_dir' is invalid!"
raise_error(err_msg, InputError, logger)
# Check if a valid working directory prefix string is given
if prefix is None:
prefix_scan = ''
prefix_new = 'prism_'
elif isinstance(prefix, string_types):
prefix_scan = prefix
prefix_new = prefix
else:
err_msg = "Input argument 'prefix' is not of type 'str'!"
raise_error(err_msg, TypeError, logger)
# Obtain working directory path
# If one did not specify a working directory, obtain it
if working_dir in (None, False):
logger.info("No working directory specified, trying to load "
"last one created.")
dirnames = next(os.walk(self._root_dir))[1]
emul_dirs = []
# Check which directories in the root_dir satisfy the default
# naming scheme of the emulator directories
for dirname in dirnames:
# If the prefix is the same as the scan prefix
if dirname.startswith(prefix_scan):
# Obtain full path to this directory
dir_path = path.join(self._root_dir, dirname)
# Check if this directory contains a 'prism_log.log'
if 'prism_log.log' in os.listdir(dir_path):
# Obtain creation time and append to emul_dirs
ctime = path.getctime(dir_path)
emul_dirs.append([dir_path, ctime])
# Sort list of emul_dirs on creation time
emul_dirs.sort(key=lambda x: x[1], reverse=True)
# If no working directory exists, create a new one
if not len(emul_dirs):
working_dir = ''.join([prefix_new, '0'])
self._working_dir = path.join(self._root_dir, working_dir)
os.mkdir(self._working_dir)
logger.info("No working directories found, created %r."
% (working_dir))
# If working directories exist, load last one created
else:
self._working_dir = emul_dirs[0][0]
logger.info("Working directories found, set to %r."
% (path.basename(self._working_dir)))
# If one requested a new working directory
elif working_dir is True:
# Obtain list of working directories that satisfy naming scheme
dirnames = next(os.walk(self._root_dir))[1]
n_dirs = 0
# Check if there are any directories with the same prefix
for dirname in dirnames:
if dirname.startswith(prefix_scan):
# Obtain full path to this directory
dir_path = path.join(self._root_dir, dirname)
# Check if this directory contains a 'prism_log.log'
if 'prism_log.log' in os.listdir(dir_path):
n_dirs += 1
# Check if working directories already exist with the same
# prefix and append a number to the name if this is the case
while True:
working_dir = path.join(self._root_dir,
''.join([prefix_new, str(n_dirs)]))
try:
os.mkdir(working_dir)
except OSError:
n_dirs += 1
else:
break
# Save path to new working directory
self._working_dir = working_dir
logger.info("New working directory requested, created %r."
% (path.basename(working_dir)))
# If one specified a working directory, use it
elif isinstance(working_dir, string_types):
self._working_dir = path.join(self._root_dir, working_dir)
logger.info("Working directory set to %r." % (working_dir))
# Check if this directory already exists and create it if not
try:
os.mkdir(self._working_dir)
except OSError:
pass
else:
logger.info("Working directory did not exist, created it.")
# If one provided an integer, raise warning about it
elif isinstance(working_dir, integer_types):
err_msg = ("Input argument 'working_dir' cannot be of type "
"int!")
raise_error(err_msg, TypeError, logger)
# If anything else is given, it is invalid
else:
err_msg = "Input argument 'working_dir' is invalid!"
raise_error(err_msg, InputError, logger)
# Obtain hdf5-file path
self._hdf5_file = path.join(self._working_dir, 'prism.hdf5')
# Obtain PRISM parameter file path
# If no PRISM parameter file was provided
if prism_file is None:
self._prism_file = None
# If a PRISM parameter file was provided
elif isinstance(prism_file, string_types):
# Check if prism_file was given as an absolute path
if path.exists(prism_file):
self._prism_file = path.abspath(prism_file)
# If not, check if it was relative to root_dir
elif path.exists(path.join(self._root_dir, prism_file)):
self._prism_file = path.join(self._root_dir, prism_file)
# If not either, it is invalid
else:
err_msg = ("Input argument 'prism_file' is a non-existing "
"path (%r)!" % (prism_file))
raise_error(err_msg, OSError, logger)
logger.info("PRISM parameters file set to %r."
% (self._prism_file))
# If anything else is given, it is invalid
else:
err_msg = "Input argument 'prism_file' is invalid!"
raise_error(err_msg, InputError, logger)
# Workers get dummy paths
else:
self._root_dir = None
self._working_dir = None
self._hdf5_file = None
self._prism_file = None
# Broadcast paths to workers
self._root_dir = self._comm.bcast(self._root_dir, 0)
self._working_dir = self._comm.bcast(self._working_dir, 0)
self._hdf5_file = self._comm.bcast(self._hdf5_file, 0)
self._prism_file = self._comm.bcast(self._prism_file, 0)
# Generate custom File class using the path to the master HDF5-file
self._File = get_PRISM_File(self._hdf5_file)
# This function generates mock data and loads it into ModelLink
# TODO: Find a way to use mock data without changing ModelLink properties
[docs] def _get_mock_data(self):
"""
Generates mock data and loads it into the
:obj:`~prism.modellink.ModelLink` object that was provided
during class initialization.
This function overwrites the
:class:`~prism.modellink.ModelLink` properties holding the
parameter estimates, data values and data errors.
Generates
---------
Overwrites the corresponding
:class:`~prism.modellink.ModelLink` class properties with the
generated values.
"""
# Start logger
logger = getCLogger('MOCK_DATA')
# Log new mock_data being created
logger.info("Generating mock data for new emulator.")
# Controller only
if self._is_controller:
# Set non-default parameter estimate
self._modellink._par_est = self._modellink._to_par_space(
random(self._modellink._n_par)).tolist()
# Controller broadcasting new parameter estimates to workers
self._modellink._par_est = self._comm.bcast(self._modellink._par_est,
0)
# Obtain non-default model data values
data_val = self._evaluate_model(0, self._modellink._par_est,
self._modellink._data_idx)
# Controller only
if self._is_controller:
# Set non-default model data values
self._emulator._data_val[0] = data_val[0].tolist()
# Use model discrepancy variance as model data errors
md_var = self._get_md_var(0, self._modellink._par_est)
err = np.sqrt(md_var).tolist()
self._modellink._data_err = err
# Add model data errors as noise to model data values
noise = normal(size=self._modellink._n_data)
for i in range(self._modellink._n_data):
# If value space is linear
if(self._modellink._data_spc[i] == 'lin'):
if(noise[i] > 0):
noise[i] *= err[i][0]
else:
noise[i] *= err[i][1]
# If value space is log10
elif(self._modellink._data_spc[i] == 'log10'):
if(noise[i] > 0):
noise[i] = np.log10((pow(10, err[i][0])-1)*noise[i]+1)
else:
noise[i] =\
np.log10((pow(10, -1*err[i][1])-1)*-1*noise[i]+1)
# If value space is ln
elif(self._modellink._data_spc[i] == 'ln'):
if(noise[i] > 0):
noise[i] = np.log((pow(np.e, err[i][0])-1)*noise[i]+1)
else:
noise[i] =\
np.log((pow(np.e, -1*err[i][1])-1)*-1*noise[i]+1)
# If value space is anything else
else:
raise NotImplementedError
# Add noise to the data values
self._modellink._data_val =\
(self._emulator._data_val[0]+noise).tolist()
# Logger
logger.info("Generated mock data.")
# Broadcast updated modellink properties to workers
self._modellink._data_val = self._comm.bcast(self._modellink._data_val,
0)
self._modellink._data_err = self._comm.bcast(self._modellink._data_err,
0)
# This function loads pipeline data
[docs] def _load_data(self):
"""
Loads in all the important pipeline data into memory for the controller
rank.
If it is detected that the last emulator iteration has not been
analyzed yet, the implausibility analysis parameters are read in from
the *PRISM* parameters file and temporarily stored in memory.
Generates
---------
All relevant pipeline data up to the last emulator iteration is loaded
into memory.
"""
# Set the logger
logger = getCLogger('LOAD_DATA')
# Initialize all data sets with empty lists
logger.info("Initializing pipeline data sets.")
self._n_impl_sam = [[]]
self._impl_cut = [[]]
self._cut_idx = [[]]
self._n_eval_sam = [[]]
self._impl_sam = []
# If an emulator currently exists, load in all data
if self._emulator._emul_i:
# Open hdf5-file
with self._File('r', None) as file:
# Read in the data up to the last emulator iteration
for i in range(1, self._emulator._emul_i+1):
# Get this emulator
emul = file['%i' % (i)]
# Check if analysis has been carried out (only if i=emul_i)
try:
self._impl_cut.append(emul.attrs['impl_cut'])
# If not, no plausible regions were found
except KeyError:
self._get_impl_par(True)
# If so, load in all data
else:
self._cut_idx.append(emul.attrs['cut_idx'])
finally:
self._n_impl_sam.append(emul.attrs['n_impl_sam'])
self._n_eval_sam.append(emul.attrs['n_eval_sam'])
# Read in the samples that survived the implausibility check
self._impl_sam = emul['impl_sam'][()]
self._impl_sam.dtype = float
# This function saves pipeline data to hdf5
[docs] @docstring_substitute(save_data=save_data_doc_p)
def _save_data(self, data_dict):
"""
Saves a given data dict ``{keyword: data}`` at the last emulator
iteration to the HDF5-file and as an data attribute to the current
:obj:`~Pipeline` instance.
%(save_data)s
"""
# Do some logging
logger = getRLogger('SAVE_DATA')
# Obtain last emul_i
emul_i = self._emulator._emul_i
# Open hdf5-file
with self._File('r+', None) as file:
# Obtain the dataset this data needs to be saved to
data_set = file['%i' % (emul_i)]
# Loop over entire provided data dict
for keyword, data in data_dict.items():
# Log what data is being saved
logger.info("Saving %r data at iteration %i to HDF5."
% (keyword, emul_i))
# Check what data keyword has been provided
# IMPL_PAR
if(keyword == 'impl_par'):
# Check if impl_par data has been saved before
try:
self._impl_cut[emul_i] = data['impl_cut']
self._cut_idx[emul_i] = data['cut_idx']
except IndexError:
self._impl_cut.append(data['impl_cut'])
self._cut_idx.append(data['cut_idx'])
finally:
data_set.attrs['impl_cut'] = data['impl_cut']
data_set.attrs['cut_idx'] = data['cut_idx']
# IMPL_SAM
elif(keyword == 'impl_sam'):
# Check if any plausible regions have been found at all
n_impl_sam = np.shape(data)[0]
# Convert data to a compound data set
dtype = [(n, float) for n in self._modellink._par_name]
data_c = data.copy()
data_c.dtype = dtype
# Check if impl_sam data has been saved before
try:
self._n_impl_sam[emul_i] = n_impl_sam
except IndexError:
data_set.create_dataset('impl_sam', data=data_c)
self._n_impl_sam.append(n_impl_sam)
else:
del data_set['impl_sam']
data_set.create_dataset('impl_sam', data=data_c)
finally:
self._impl_sam = data
data_set.attrs['n_impl_sam'] = n_impl_sam
# N_EVAL_SAM
elif(keyword == 'n_eval_sam'):
# Check if n_eval_sam has been saved before
try:
self._n_eval_sam[emul_i] = data
except IndexError:
self._n_eval_sam.append(data)
finally:
data_set.attrs['n_eval_sam'] = data
# INVALID KEYWORD
else:
err_msg = "Invalid keyword argument provided!"
raise_error(err_msg, ValueError, logger)
# This function saves a statistic to hdf5
[docs] @docstring_substitute(emul_i=std_emul_i_doc)
def _save_statistics(self, emul_i, stat_dict):
"""
Saves a given statistics dict ``{keyword: [value, unit]}`` at
emulator iteration `emul_i` to the HDF5-file. The provided values are
always saved as strings.
Parameters
----------
%(emul_i)s
Dict variables
--------------
keyword : str
String containing the name/keyword of the statistic that is being
saved.
value : int, float or str
The value of the statistic.
unit : str
The unit of the statistic.
"""
# Do logging
logger = getCLogger('STATISTICS')
logger.info("Saving statistics to HDF5.")
# Open hdf5-file
with self._File('r+', None) as file:
# Loop over all statistics in stat_dict and save them
for keyword, (value, unit) in stat_dict.items():
file['%i/statistics' % (emul_i)].attrs[keyword] =\
[str(value).encode('ascii', 'ignore'),
unit.encode('ascii', 'ignore')]
# This function evaluates and distributes the model evaluations samples
# This is function 'k'
[docs] @docstring_substitute(emul_i=std_emul_i_doc, ext_sam=ext_sam_set_doc,
ext_mod=ext_mod_set_doc)
def _get_iteration_data(self, emul_i, sam_set, ext_sam_set, ext_mod_set):
"""
Obtains the model realization data for given emulator iteration
`emul_i` by evaluating the provided `sam_set` in the model and
distributing model outputs to the correct emulator systems.
Parameters
----------
%(emul_i)s
sam_set : 2D :obj:`~numpy.ndarray` object
Array containing the model evaluation samples.
%(ext_sam)s
%(ext_mod)s
Generates
---------
sam_set : 2D :obj:`~numpy.ndarray` object
Array containing the model evaluation samples for emulator
iteration `emul_i`.
mod_set : 2D :obj:`~numpy.ndarray` object
Array containing the model outputs of all specified model
evaluation samples for emulator iteration `emul_i`.
"""
# Log that evaluation of model samples is started
logger = getCLogger('MODEL_REAL')
logger.info("Obtaining model realization data for emulator iteration "
"%i." % (emul_i))
# Save the current time
start_time = time()
# Obtain number of samples
n_sam = np.shape(sam_set)[0]
# Gather the data_idx from all MPI ranks on the controller
data_idx_list = self._comm.gather(
delist(self._emulator._data_idx[emul_i]), 0)
# Flatten the received data_idx_list on the controller
if self._is_controller:
data_idx_flat = []
data_idx_len = []
for rank, data_idx_rank in enumerate(data_idx_list):
data_idx_len.append(len(data_idx_rank))
for data_idx in data_idx_rank:
data_idx_flat.append(data_idx)
# Use dummy data_idx_flat on workers
else:
data_idx_flat = None
# For sake of consistency, broadcast data_idx_flat to workers
data_idx_flat = self._comm.bcast(data_idx_flat, 0)
# If there are any samples in sam_set, evaluate them in the model
if n_sam:
mod_set = self._evaluate_model(emul_i, sam_set, data_idx_flat)
# Transpose obtained mod_set on controller
if self._is_controller:
mod_set = mod_set.T
# Controller processing the received data values
if self._is_controller:
# Get end time
end_time = time()-start_time
# Check if ext_real_set and/or sam_set were provided
if ext_sam_set.shape[0] and n_sam:
sam_set = np.concatenate([sam_set, ext_sam_set], axis=0)
mod_set = np.concatenate([mod_set, ext_mod_set], axis=1)
use_ext_real_set = 1
elif ext_sam_set.shape[0]:
sam_set = ext_sam_set
mod_set = ext_mod_set
use_ext_real_set = 1
else:
use_ext_real_set = 0
# Sent the specific mod_set parts to the corresponding workers
logger.info("Distributing model realization data to corresponding "
"emulator systems.")
s_idx = 0
for rank, n_data in enumerate(data_idx_len):
# Controller data must be saved last
if not rank:
pass
# Send the remaining parts to the workers
else:
self._comm.send([sam_set, mod_set[s_idx:s_idx+n_data]],
dest=rank, tag=777+rank)
# Save which data parts have already been sent
s_idx += n_data
# MPI Barrier to make sure that workers have saved their data
self._comm.Barrier()
# Save controller data
self._emulator._save_data(emul_i, None, {
'mod_real_set': {
'sam_set': sam_set,
'mod_set': mod_set[0:data_idx_len[0]],
'use_ext_real_set': use_ext_real_set}})
# Workers waiting for controller to send them their data values
else:
sam_set, mod_set = self._comm.recv(source=0, tag=777+self._rank)
# Save all the data to the specific hdf5-files
for i, lemul_s in enumerate(self._emulator._active_emul_s[emul_i]):
self._emulator._save_data(emul_i, lemul_s, {
'mod_real_set': {
'mod_set': mod_set[i]}})
# Save sam_set data to memory
self._emulator._sam_set[emul_i] = sam_set
self._emulator._n_sam[emul_i] = np.shape(sam_set)[0]
# MPI Barrier to let controller know data was saved
self._comm.Barrier()
# Controller finishing up
if self._is_controller:
# Log that this is finished
eval_rate = end_time/n_sam if n_sam else 0
msg = ("Finished obtaining and distributing model realization data"
" in %.3g seconds, averaging %.3g seconds per model "
"evaluation." % (end_time, eval_rate))
self._save_statistics(emul_i, {
'tot_model_eval_time': ['%.3g' % (end_time), 's'],
'avg_model_eval_time': ['%.3g' % (eval_rate), 's'],
'MPI_comm_size_model': ['%i' % (self._size), '']})
logger.info(msg)
print(msg)
# MPI Barrier
self._comm.Barrier()
# This function generates a large Latin Hypercube sample set to analyze
# the emulator at
[docs] @docstring_substitute(emul_i=std_emul_i_doc)
def _get_eval_sam_set(self, emul_i):
"""
Generates an emulator evaluation sample set to be used for analyzing an
emulator iteration. Currently uses the :func:`~e13tools.sampling.lhd`
function.
Parameters
----------
%(emul_i)s
Returns
-------
eval_sam_set : 2D :obj:`~numpy.ndarray` object
Array containing the evaluation samples.
"""
# Log about this
logger = getCLogger('EVAL_SAMS')
# Obtain number of samples
n_eval_sam = self._get_n_eval_sam(emul_i)
# Create array containing all samples for analyzing the emulator
logger.info("Creating emulator evaluation sample set with size %i."
% (n_eval_sam))
eval_sam_set = lhd(n_eval_sam, self._modellink._n_par,
self._modellink._par_rng, 'center',
self._criterion, 100,
constraints=self._emulator._sam_set[emul_i])
logger.info("Finished creating sample set.")
# Return it
return(eval_sam_set)
# This function performs an implausibility cut-off check on a given sample
# TODO: Implement dynamic impl_cut
[docs] @docstring_substitute(emul_i=std_emul_i_doc)
def _do_impl_check(self, emul_i, uni_impl_val):
"""
Performs an implausibility cut-off check on the provided implausibility
values `uni_impl_val` at emulator iteration `emul_i`.
Parameters
----------
%(emul_i)s
uni_impl_val : 1D array_like
Array containing all univariate implausibility values corresponding
to a certain parameter set for all data points.
Returns
-------
result : bool
1 if check was successful, 0 if it was not.
impl_cut_val : float
Implausibility value at the first real implausibility cut-off.
"""
# Sort impl_val to compare with the impl_cut list
sorted_impl_val = np.flip(np.sort(
uni_impl_val, axis=-1), axis=-1)[self._cut_idx[emul_i]:]
# Save the implausibility value at the first real cut-off
impl_cut_val = sorted_impl_val[0]
# Scan over all data points in this sample
for impl_val, cut_val in zip(sorted_impl_val, self._impl_cut[emul_i]):
# If impl_val is not below impl_cut, break
if(impl_val > cut_val):
return(0, impl_cut_val)
else:
# If for-loop ended in a normal way, the check was successful
return(1, impl_cut_val)
# This function calculates the univariate implausibility values
# This is function 'I²(x)'
[docs] @docstring_substitute(emul_i=std_emul_i_doc)
def _get_uni_impl(self, emul_i, par_set, adj_exp_val, adj_var_val):
"""
Calculates the univariate implausibility values at a given emulator
iteration `emul_i` for specified expectation and variance values
`adj_exp_val` and `adj_var_val`, corresponding to given `par_set`.
Parameters
----------
%(emul_i)s
par_set : 1D :obj:`~numpy.ndarray` object
Model parameter value set to calculate the univariate
implausibility values for. Only used to pass to the
:meth:`~prism.modellink.ModelLink.get_md_var` method.
adj_exp_val, adj_var_val : 1D array_like
The adjusted expectation and variance values to calculate the
univariate implausibility for.
Returns
-------
uni_impl_val : 1D :obj:`~numpy.ndarray` object
Univariate implausibility value for all requested emulator systems.
"""
# Obtain model discrepancy variance
md_var = self._get_md_var(emul_i, par_set)
# Determine the active emulator systems for this iteration
active_emul_s = self._emulator._active_emul_s[emul_i]
# Initialize empty univariate implausibility
uni_impl_val_sq = np.zeros(len(active_emul_s))
# Calculate the univariate implausibility values
for i, emul_s in enumerate(active_emul_s):
# Use the upper errors by default
err_idx = 0
# If adj_exp_val < data_val, use the lower error instead
if(adj_exp_val[i] < self._emulator._data_val[emul_i][emul_s]):
err_idx = 1
# Calculate the univariate implausibility value
uni_impl_val_sq[i] =\
pow(adj_exp_val[i]-self._emulator._data_val[emul_i][emul_s],
2)/(adj_var_val[i]+md_var[i][err_idx] +
pow(self._emulator._data_err[emul_i][emul_s][err_idx],
2))
# Take square root
uni_impl_val = np.sqrt(uni_impl_val_sq)
# Return it
return(uni_impl_val)
# This function calculates the model discrepancy variance
[docs] @docstring_substitute(emul_i=std_emul_i_doc)
def _get_md_var(self, emul_i, par_set):
"""
Retrieves the model discrepancy variances, which includes all variances
that are created by the model provided by the
:obj:`~prism.modellink.ModelLink` instance. This method tries
to call the :meth:`~prism.modellink.ModelLink.get_md_var`
method, and assumes a default model discrepancy variance of ``1/6th``
the data value if it cannot be called. If the data value space is not
linear, then this default value is calculated such to reflect that.
Parameters
----------
%(emul_i)s
par_set : 1D :obj:`~numpy.ndarray` object
Model parameter value set to calculate the model discrepancy
variances for.
Returns
-------
var_md : 2D :obj:`~numpy.ndarray` object
Variance of the model discrepancy.
"""
# Obtain md variances
# Try to use the user-defined md variances
try:
md_var = self._modellink.get_md_var(
emul_i=emul_i,
par_set=sdict(zip(self._modellink._par_name, par_set)),
data_idx=delist(self._emulator._data_idx[emul_i]))
# If it was not user-defined, use a default value
except NotImplementedError:
# Use factor 2 difference on 2 sigma as acceptable
# Imagine that 2 sigma range is given if lower and upper are factor
# 2 apart. This gives that sigma must be 1/6th of the data value
# Create empty md_var list
md_var = []
# Loop over all data points and check their values spaces
for data_val, data_spc in zip(
delist(self._emulator._data_val[emul_i]),
delist(self._emulator._data_spc[emul_i])):
# If value space is linear, take 1/6th of the data value
if(data_spc == 'lin'):
md_var.append([pow(data_val/6, 2)]*2)
# If value space is log10, take log10(7/6) and log10(5/6)
elif(data_spc == 'log10'):
md_var.append([0.0044818726418455815,
0.006269669725654501])
# If value space is ln, take ln(7/6) and ln(5/6)
elif(data_spc == 'ln'):
md_var.append([0.023762432091205918, 0.03324115007177121])
# If value space is anything else
else:
raise NotImplementedError
# Make sure that md_var is a NumPy array
md_var = np_array(md_var, ndmin=2)
# If it was user-defined, check if the values are compatible
# OPTIMIZE: Doing this check repeatedly may heavily slow down the code
else:
# Make sure that md_var is a NumPy array
md_var = np_array(md_var)
# Check if md_var contains n_data values
if not(md_var.shape[0] == self._emulator._n_data[emul_i]):
err_msg = ("Received array of model discrepancy variances "
"'md_var' has incorrect number of data points (%i "
"!= %i)!"
% (md_var.shape[0], self._emulator._n_data[emul_i]))
raise ShapeError(err_msg)
# Check if single or dual values were given
if(md_var.ndim == 1):
md_var = np_array([md_var]*2).T
elif(md_var.shape[1] == 2):
pass
else:
err_msg = ("Received array of model discrepancy variances "
"'md_var' has incorrect number of values (%i != 2)!"
% (md_var.shape[1]))
raise ShapeError(err_msg)
# Check if all values are non-negative floats
md_var = check_vals(md_var, 'md_var', 'nneg', 'float')
# Return it
return(md_var)
# This function completes the list of implausibility cut-offs
[docs] @docstring_substitute(impl_temp=impl_temp_doc)
def _get_impl_cut(self, impl_cut, temp):
"""
Generates the full list of impl_cut-offs from the incomplete, shortened
`impl_cut` list.
Parameters
----------
impl_cut : 1D list
Incomplete, shortened impl_cut-offs list provided during class
initialization.
%(impl_temp)s
Generates
---------
impl_cut : 1D :obj:`~numpy.ndarray` object
Full list containing the impl_cut-offs for all data points provided
to the emulator.
cut_idx : int
Index of the first impl_cut-off in `impl_cut` that is not a
wildcard.
"""
# Log that impl_cut-off list is being acquired
logger = getCLogger('INIT')
logger.info("Generating full implausibility cut-off list.")
# Complete the impl_cut list
# Obtain the first impl_cut value
try:
impl_cut[0] = check_vals(impl_cut[0], 'impl_cut[0]', 'float',
'nneg')
except IndexError:
err_msg = ("Provided implausibility cut-off list contains no "
"elements!")
raise_error(err_msg, ValueError, logger)
# Loop over the remaining values in impl_cut
for i in range(1, len(impl_cut)):
# Check if provided value is non-negative float
impl_cut[i] = check_vals(impl_cut[i], 'impl_cut[%i]' % (i),
'float', 'nneg')
# If the value is zero, take on the value of the previous cut-off
if(impl_cut[i] == 0):
impl_cut[i] = impl_cut[i-1]
# If the value is lower than the previous value, it is invalid
elif(impl_cut[i-1] != 0 and impl_cut[i] > impl_cut[i-1]):
err_msg = ("Cut-off %i is higher than cut-off %i (%f > %f)!"
% (i, i-1, impl_cut[i], impl_cut[i-1]))
raise_error(err_msg, ValueError, logger)
# Get the index identifying where the first real impl_cut is
for i, impl in enumerate(impl_cut[:self._emulator._n_data_tot[-1]]):
if(impl != 0):
cut_idx = i
break
# If the loop ends normally, there were only wildcards
else:
err_msg = "No non-wildcard implausibility cut-off was provided!"
raise_error(err_msg, ValueError, logger)
# Save both impl_cut and cut_idx
impl_cut = np_array(impl_cut)[cut_idx:]
if temp:
# If they need to be stored temporarily
self._impl_cut.append(impl_cut)
self._cut_idx.append(cut_idx)
else:
# If they need to be stored to file
self._save_data({
'impl_par': {
'impl_cut': impl_cut,
'cut_idx': cut_idx}})
# Log end of process
logger.info("Finished generating implausibility cut-off list.")
# This function reads in the impl_cut list from the PRISM parameters file
# TODO: Make impl_cut dynamic
[docs] @docstring_substitute(impl_temp=impl_temp_doc, impl_cut=impl_cut_doc)
def _get_impl_par(self, temp):
"""
Reads in the impl_cut list and other parameters for implausibility
evaluations from the *PRISM* parameters file and saves them in the last
emulator iteration.
Parameters
----------
%(impl_temp)s
Generates
---------
%(impl_cut)s
"""
# Do some logging
logger = getCLogger('INIT')
logger.info("Obtaining implausibility analysis parameters.")
# Controller only
if self._is_controller:
# Obtaining default pipeline parameter dict
par_dict = self._get_default_parameters()
# Read in data from provided PRISM parameters file
if self._prism_file is not None:
pipe_par = np.genfromtxt(self._prism_file, dtype=(str),
delimiter=':', autostrip=True)
# Make sure that pipe_par is 2D
pipe_par = np_array(pipe_par, ndmin=2)
# Combine default parameters with read-in parameters
par_dict.update(pipe_par)
# More logging
logger.info("Checking compatibility of provided implausibility "
"analysis parameters.")
# Implausibility cut-off
# Remove all unwanted characters from the string and process it
self._get_impl_cut(convert_str_seq(par_dict['impl_cut']), temp)
# Finish logging
logger.info("Finished obtaining implausibility analysis "
"parameters.")
# This function processes an externally provided real_set
# TODO: Additionally allow for an old PRISM master file to be provided
[docs] @docstring_substitute(ext_set=ext_real_set_doc_s, ext_sam=ext_sam_set_doc,
ext_mod=ext_mod_set_doc)
def _get_ext_real_set(self, ext_real_set):
"""
Processes an externally provided model realization set `ext_real_set`,
containing the used sample set and the corresponding data value set.
Parameters
----------
%(ext_set)s
Returns
-------
%(ext_sam)s
%(ext_mod)s
"""
# If no ext_real_set is provided, return empty arrays without logging
if ext_real_set is None:
return(np_array([]), np_array([]))
# Do some logging
logger = getCLogger('INIT')
logger.info("Processing externally provided model realization set.")
# If a list is given
if isinstance(ext_real_set, list):
# Check if ext_real_set contains 2 elements
if(len(ext_real_set) != 2):
err_msg = "Input argument 'ext_real_set' is not of length 2!"
raise_error(err_msg, ShapeError, logger)
# Try to extract ext_sam_set and ext_mod_set
try:
ext_sam_set = ext_real_set[0]
ext_mod_set = ext_real_set[1]
except Exception as error:
err_msg = ("Input argument 'ext_real_set' is invalid! (%s)"
% (error))
raise_error(err_msg, InputError, logger)
# If a dict is given
elif isinstance(ext_real_set, dict):
# Check if ext_real_set contains correct keys
if 'sam_set' not in ext_real_set.keys():
err_msg = ("Input argument 'ext_real_set' does not contain key"
" 'sam_set'!")
raise_error(err_msg, KeyError, logger)
if 'mod_set' not in ext_real_set.keys():
err_msg = ("Input argument 'ext_real_set' does not contain key"
" 'mod_set'!")
raise_error(err_msg, KeyError, logger)
# Try to extract ext_sam_set and ext_mod_set
try:
ext_sam_set = ext_real_set['sam_set']
ext_mod_set = ext_real_set['mod_set']
except Exception as error:
err_msg = ("Input argument 'ext_real_set' is invalid! (%s)"
% (error))
raise_error(err_msg, InputError, logger)
# If anything else is given, it is invalid
else:
err_msg = "Input argument 'ext_real_set' is invalid!"
raise_error(err_msg, InputError, logger)
# Check ext_sam_set and ext_mod_set
ext_sam_set = self._modellink._check_sam_set(ext_sam_set,
'ext_sam_set')
ext_mod_set = self._modellink._check_mod_set(ext_mod_set,
'ext_mod_set')
# Make sure that ext_sam_set and ext_mod_set are 2D
ext_sam_set = np_array(ext_sam_set, ndmin=2)
ext_mod_set = np_array(ext_mod_set, ndmin=2)
# Check if both arrays contain the same number of samples
if not(ext_sam_set.shape[0] == ext_mod_set.shape[0]):
err_msg = ("External sample and model output sets do not contain "
"the same number of samples (%i != %i)!"
% (ext_sam_set.shape[0], ext_mod_set.shape[0]))
raise_error(err_msg, ShapeError, logger)
# Log that processing has been finished
logger.info("Finished processing externally provided model realization"
" set of size %i." % (ext_sam_set.shape[0]))
# If all checks are passed, return ext_sam_set and ext_mod_set
return(ext_sam_set, ext_mod_set.T)
# This function compiles pre-defined built-in code snippets and saves them
[docs] def _compile_code_snippets(self):
"""
Compiles all pre-defined built-in code snippets to code objects and
saves them to :attr:`~code_objects`. These code objects are used for
performing standard operations in the :meth:`~_evaluate_sam_set`
method.
"""
# Log that code snippets are being compiled
logger = getCLogger('INIT')
logger.info("Compiling built-in code snippets.")
# Define dict of code snippet tuples
code_objects = {}
# ANALYZE
# Define the various code snippets
pre_code = compile("", '<string>', 'exec')
eval_code = compile("", '<string>', 'exec')
anal_code = compile("", '<string>', 'exec')
post_code = compile("self.results = sam_set[sam_idx]", '<string>',
'exec')
exit_code = compile("", '<string>', 'exec')
# Combine code snippets into a tuple and add to dict
code_objects['analyze'] = (pre_code, eval_code, anal_code, post_code,
exit_code)
# EVALUATE
# Define the various code snippets
# We have to use lists here to account for n_data differing with emul_i
pre_code = compile(dedent("""
adj_exp_val = [[] for _ in range(n_sam)]
adj_var_val = [[] for _ in range(n_sam)]
uni_impl_val_list = [[] for _ in range(n_sam)]
emul_i_stop = np.zeros([n_sam])
"""), '<string>', 'exec')
eval_code = compile(dedent("""
adj_exp_val[sam_idx[j]] = adj_val[0]
adj_var_val[sam_idx[j]] = adj_val[1]
uni_impl_val_list[sam_idx[j]] = uni_impl_vals[j]
"""), '<string>', 'exec')
anal_code = compile("emul_i_stop[sam_idx[j]] = i", '<string>', 'exec')
post_code = compile(dedent("""
adj_exp_val = self._comm.gather(np_array(adj_exp_val), 0)
adj_var_val = self._comm.gather(np_array(adj_var_val), 0)
uni_impl_val_list = self._comm.gather(np_array(uni_impl_val_list),
0)
"""), '<string>', 'exec')
exit_code = compile(dedent("""
adj_exp_val = np.concatenate(*[adj_exp_val], axis=1)
adj_var_val = np.concatenate(*[adj_var_val], axis=1)
uni_impl_val = np.concatenate(*[uni_impl_val_list], axis=1)
self.results = (adj_exp_val, adj_var_val, uni_impl_val,
emul_i_stop, impl_check)
"""), '<string>', 'exec')
# Combine code snippets into a tuple and add to dict
code_objects['evaluate'] = (pre_code, eval_code, anal_code, post_code,
exit_code)
# HYBRID
# Define the various code snippets
pre_code = compile("lnprior = 0", '<string>', 'exec')
eval_code = compile("", '<string>', 'exec')
anal_code = compile(dedent("""
lnprior = (np.log(1-impl_cut_val/self._impl_cut[i][0]) if
impl_check_val else -np.infty)
"""), '<string>', 'exec')
post_code = compile(dedent("""
lnprior = self._comm.bcast(lnprior, 0)
self.results = (sam_set[sam_idx], lnprior)
"""), '<string>', 'exec')
exit_code = compile("", '<string>', 'exec')
# Combine code snippets into a tuple and add to dict
code_objects['hybrid'] = (pre_code, eval_code, anal_code, post_code,
exit_code)
# PROJECT
# Define the various code snippets
pre_code = compile("impl_cut = np.zeros([n_sam])", '<string>', 'exec')
eval_code = compile("", '<string>', 'exec')
anal_code = compile("impl_cut[sam_idx[j]] = impl_cut_val", '<string>',
'exec')
post_code = compile("", '<string>', 'exec')
exit_code = compile("self.results = (impl_check, impl_cut)",
'<string>', 'exec')
# Combine code snippets into a tuple
code_objects['project'] = (pre_code, eval_code, anal_code, post_code,
exit_code)
# Save code_objects dict to memory
self._code_objects = code_objects
# Log again
logger.info("Finished compiling code snippets.")
# This function evaluates given sam_set in the emulator using code snippets
[docs] @docstring_substitute(emul_i=std_emul_i_doc)
def _evaluate_sam_set(self, emul_i, sam_set, exec_code):
"""
Evaluates a provided set of emulator evaluation samples `sam_set` at a
given emulator iteration `emul_i`.
The provided tuple of code snippets `exec_code` are executed using
Python's :func:`~exec` function at specific points during the analysis.
Parameters
----------
%(emul_i)s
sam_set : 2D :obj:`~numpy.ndarray` object
Array containing model parameter value sets to be evaluated in all
emulator systems in emulator iteration `emul_i`.
exec_code : {'analyze', 'evaluate', 'hybrid', 'project'} or tuple
Tuple of five code snippets ``(pre_code, eval_code, anal_code,
post_code, exit_code)`` to be executed at specific points during
the analysis.
If string, use one of the built-in tuples in :attr:`~code_objects`
instead.
Other parameters
----------------
pre_code : str or code object
Code snippet to be executed before the evaluation of `sam_set`
starts.
eval_code : str or code object
Code snippet to be executed after the evaluation of each sample in
`sam_set`.
anal_code : str or code object
Code snippet to be executed after the analysis of each sample in
`sam_set`. This code snippet is only executed by the controller.
post_code : str or code object
Code snippet to be executed after the evaluation of `sam_set` ends.
exit_code : str or code object
Code snippet to be executed before returning the results of the
evaluation of `sam_set`. This code snippet is only executed by the
controller.
Returns
-------
results : object
The object that is assigned to :attr:`~results`, which is defaulted
to *None* if no code snippet changes it. Preferably, the execution
of `post_code` and/or `exit_code` modifies :attr:`~results`. All
MPI ranks return it.
Notes
-----
If any of the code snippets is provided as a string, it will be
compiled into a code object before starting the evaluation.
"""
# Determine number of samples
n_sam = sam_set.shape[0]
# Start logging
logger = getCLogger('EVAL_SS')
logger.info("Starting evaluation of sample set of size %i." % (n_sam))
# Obtain code snippets
if isinstance(exec_code, string_types):
# If string is provided, use built-in tuple of code snippets
pre_code, eval_code, anal_code, post_code, exit_code =\
self._code_objects[exec_code]
else:
# If not, use provided code snippets
pre_code, eval_code, anal_code, post_code, exit_code = exec_code
# Compile any code snippets that were provided as a string
if isinstance(pre_code, string_types):
pre_code = compile(pre_code, '<string>', 'exec')
if isinstance(eval_code, string_types):
eval_code = compile(eval_code, '<string>', 'exec')
if isinstance(anal_code, string_types):
anal_code = compile(anal_code, '<string>', 'exec')
if isinstance(post_code, string_types):
post_code = compile(post_code, '<string>', 'exec')
if isinstance(exit_code, string_types):
exit_code = compile(exit_code, '<string>', 'exec')
# Make a filled bool list containing which samples are plausible
impl_check = np.ones(n_sam, dtype=bool)
# Make a list of plausible sample indices
sam_idx_full = np_array(range(n_sam))
sam_idx = sam_idx_full[impl_check]
# Mark all samples as plausible
eval_sam_set = sam_set
# Set the results property to None
self.results = None
# Execute the pre_code snippet
exec(pre_code)
# If the default emulator type is used
if(self._emulator._emul_type == 'default'):
# Analyze sam_set in every emulator iteration
for i in range(1, emul_i+1):
# Determine the number of samples
n_sam = len(sam_idx)
# Log that this iteration is being evaluated
logger.info("Analyzing evaluation sample set of size %i in "
"emulator iteration %i." % (n_sam, i))
# Make empty uni_impl_vals list
uni_impl_vals = np.zeros([n_sam, self._emulator._n_data[i]])
# Loop over all still plausible samples in sam_set
for j, par_set in enumerate(eval_sam_set):
# Evaluate par_set
adj_val = self._emulator._evaluate(i, par_set)
# Calculate univariate implausibility value
uni_impl_vals[j] = self._get_uni_impl(i, par_set, *adj_val)
# Execute the eval_code snippet
exec(eval_code)
# Gather the results on the controller after evaluating
uni_impl_vals_list = self._comm.gather(uni_impl_vals, 0)
# Controller performs implausibility analysis
if self._is_controller:
# Convert uni_impl_vals_list to an array
uni_impl_vals_array =\
np.concatenate(*[uni_impl_vals_list], axis=1)
# Perform implausibility cutoff check on all elements
for j, uni_impl_val in enumerate(uni_impl_vals_array):
impl_check_val, impl_cut_val =\
self._do_impl_check(i, uni_impl_val)
# Modify impl_check with obtained impl_check_val
impl_check[sam_idx[j]] = impl_check_val
# Execute the anal_code snippet
exec(anal_code)
# Modify sam_idx with those that are still plausible
sam_idx = sam_idx_full[impl_check]
# MPI Barrier
self._comm.Barrier()
# Broadcast the updated sam_idx to the workers
sam_idx = self._comm.bcast(sam_idx, 0)
# Check that sam_idx is still holding plausible samples
if not len(sam_idx):
# If not, stop analysis
break
else:
# If so, determine which samples are still plausible
eval_sam_set = sam_set[sam_idx]
# If any other emulator type is used
else:
raise NotImplementedError
# Execute the post_code snippet
exec(post_code)
# Log that analysis is finished
logger.info("Finished analyzing evaluation sample set.")
# Execute the exit_code snippet on the controller
if self._is_controller:
exec(exit_code)
# Retrieve the results and delete the attribute
results = self.results
del self.results
# Return the results
return(results)
# %% VISIBLE CLASS METHODS
# This function analyzes the emulator and determines the plausible regions
[docs] def analyze(self):
"""
Analyzes the emulator at the last emulator iteration for a large number
of emulator evaluation samples. All samples that survive the
implausibility checks, are used in the construction of the next
emulator iteration.
Generates
---------
impl_sam : 2D :obj:`~numpy.ndarray` object
Array containing all emulator evaluation samples that survived the
implausibility checks.
"""
# Get last emul_i
emul_i = self._emulator._get_emul_i(None, True)
# Begin logging
logger = getCLogger('ANALYZE')
logger.info("Analyzing emulator iteration %i." % (emul_i))
# Controller checking whether this iteration can still be (re)analyzed
if self._is_controller:
# Try to access the ccheck of the next iteration
try:
self._emulator._ccheck[emul_i+1]
# If it does not exist, current iteration can be analyzed
except IndexError:
pass
# If it does exist, check if mod_real_set has been evaluated
else:
# If it has been evaluated, reanalysis is not possible
if 'mod_real_set' not in self._emulator._ccheck[emul_i+1]:
err_msg = ("Construction of next emulator iteration (%i) "
"has already been started. Reanalysis of the "
"current iteration is not possible."
% (emul_i+1))
raise_error(err_msg, RequestError, logger)
# Controller obtaining the emulator evaluation sample set
if self._is_controller:
# Save current time
start_time1 = time()
# Get the impl_cut list
self._get_impl_par(False)
# Create an emulator evaluation sample set
eval_sam_set = self._get_eval_sam_set(emul_i)
n_eval_sam = eval_sam_set.shape[0]
# Remaining workers get dummy eval_sam_set
else:
eval_sam_set = []
# Broadcast eval_sam_set to workers
eval_sam_set = self._comm.bcast(eval_sam_set, 0)
# Save current time again
start_time2 = time()
# Analyze eval_sam_set
impl_sam = self._evaluate_sam_set(emul_i, eval_sam_set, 'analyze')
# Controller finishing up
if self._is_controller:
# Obtain some timers
end_time = time()
time_diff_total = end_time-start_time1
time_diff_eval = end_time-start_time2
# Calculate the number of plausible samples left
n_impl_sam = len(impl_sam)
# Raise warning if no plausible samples were found
if not n_impl_sam:
warn_msg = ("No plausible regions were found. Constructing the"
" next iteration will not be possible.")
raise_warning(warn_msg, RequestWarning, logger, 2)
# Raise warning if n_impl_sam is less than n_cross_val
elif(n_impl_sam < self._emulator._n_cross_val):
warn_msg = ("Number of plausible samples is lower than the "
"number of cross validations used during "
"regression (%i < %i). Constructing the next "
"iteration will not be possible."
% (n_impl_sam, self._emulator._n_cross_val))
raise_warning(warn_msg, RequestWarning, logger, 2)
# Raise warning if n_impl_sam is less than n_sam_init
elif(n_impl_sam < self._n_sam_init):
warn_msg = ("Number of plausible samples is lower than the "
"number of samples in the first iteration (%i < "
"%i). Constructing the next iteration might not "
"produce a more accurate emulator."
% (n_impl_sam, self._n_sam_init))
raise_warning(warn_msg, RequestWarning, logger, 2)
# Save the results
self._save_data({
'impl_sam': impl_sam,
'n_eval_sam': n_eval_sam})
# Save statistics about analyze time, evaluation rate, par_space
avg_eval_rate = n_eval_sam/time_diff_eval
par_space_rem = (n_impl_sam/n_eval_sam)*100
self._save_statistics(emul_i, {
'tot_analyze_time': ['%.2f' % (time_diff_total), 's'],
'avg_emul_eval_rate': ['%.2f' % (avg_eval_rate), '1/s'],
'par_space_remaining': ['%.3g' % (par_space_rem), '%'],
'MPI_comm_size_anal': ['%i' % (self._size), '']})
# Log that analysis has been finished
msg1 = ("Finished analysis of emulator iteration in %.2f seconds, "
"averaging %.2f emulator evaluations per second."
% (time_diff_total, n_eval_sam/time_diff_eval))
msg2 = ("There is %.3g%% of parameter space remaining."
% (par_space_rem))
logger.info(msg1)
logger.info(msg2)
print(msg1)
print(msg2)
# Display details about current state of pipeline
self.details()
# This function constructs a specified iteration of the emulator system
# TODO: Make time and RAM cost plots
# TODO: Fix the timers for interrupted constructs
[docs] @docstring_substitute(emul_i=call_emul_i_doc, ext_set=ext_real_set_doc_d)
def construct(self, emul_i=None, analyze=True, ext_real_set=None,
force=False):
"""
Constructs the emulator at the specified emulator iteration `emul_i`,
and performs an implausibility analysis on the emulator iteration right
afterward if requested (:meth:`~analyze`).
Optional
--------
%(emul_i)s
analyze : bool. Default: True
Bool indicating whether or not to perform an analysis after the
specified emulator iteration has been successfully constructed,
which is required for constructing the next iteration.
%(ext_set)s
force : bool. Default: False
Controls what to do if the specified emulator iteration `emul_i`
already (partly) exists.
If *False*, finish construction of the specified iteration or skip
it if already finished.
If *True*, reconstruct the specified iteration entirely.
Generates
---------
A new HDF5-group with the emulator iteration as its name, in the loaded
emulator master file, containing emulator data required for this
emulator iteration.
Notes
-----
Using an emulator iteration that has been (partly) constructed before,
will finish construction or skip it if already finished when `force` is
*False*; or it will delete that and all following iterations, and
reconstruct the specified iteration when `force` is *True*. Using
`emul_i` = 1 and `force` is *True* is equivalent to reconstructing the
entire emulator.
If no implausibility analysis is requested, then the implausibility
parameters are read in from the *PRISM* parameters file and temporarily
stored in memory in order to enable the usage of the :meth:`~evaluate`
and :meth:`~prism._projection.Projection.project` methods.
"""
# Log that a new emulator iteration is being constructed
logger = getCLogger('CONSTRUCT')
# Set emul_i correctly
emul_i = self._emulator._get_emul_i(emul_i, False)
# Controller performing checks regarding construction progress
if self._is_controller:
# Save current time
start_time = time()
# Check if force-parameter received a bool
force = check_vals(force, 'force', 'bool')
# Check if iteration was interrupted or not, or if force is True
logger.info("Checking state of emulator iteration %i." % (emul_i))
try:
# If force is True, reconstruct full iteration
if force:
logger.info("Emulator iteration %i has been requested to "
"be (re)constructed." % (emul_i))
c_from_start = 1
# If interrupted at start, reconstruct full iteration
elif('mod_real_set' in self._emulator._ccheck[emul_i]):
logger.info("Emulator iteration %i does not contain "
"evaluated model realization data. Will be "
"constructed from start." % (emul_i))
c_from_start = 1
# If already finished, skip everything
elif(emul_i <= self._emulator._emul_i):
msg = ("Emulator iteration %i has already been fully "
"constructed. Skipping construction process."
% (emul_i))
logger.info(msg)
print(msg)
c_from_start = None
# If interrupted midway, do not reconstruct full iteration
else:
logger.info("Construction of emulator iteration %i was "
"interrupted. Continuing from point of "
"interruption." % (emul_i))
c_from_start = 0
# If never constructed before, construct full iteration
except IndexError:
logger.info("Emulator iteration %i has not been constructed."
% (emul_i))
c_from_start = 1
# Remaining workers
else:
c_from_start = None
# Check if analyze-parameter received a bool
analyze = check_vals(analyze, 'analyze', 'bool')
# Broadcast construct_emul_i to workers
c_from_start = self._comm.bcast(c_from_start, 0)
# If iteration is already finished, show the details
if c_from_start is None:
self.details(emul_i)
return
# If iteration needs to be constructed from the beginning
if c_from_start:
# Log that construction of emulator iteration is being started
logger.info("Starting construction of emulator iteration %i."
% (emul_i))
# Activate worker mode
with self.worker_mode:
if self._is_controller:
# If this is the first iteration
if(emul_i == 1):
# Process ext_real_set
ext_sam_set, ext_mod_set =\
self._get_ext_real_set(ext_real_set)
# Obtain number of externally given model realizations
n_ext_sam = np.shape(ext_sam_set)[0]
# Create a new emulator
self._make_call('_emulator._create_new_emulator')
# Reload the data
self._load_data()
# Create initial set of model evaluation samples
n_sam_init = max(0, self._n_sam_init-n_ext_sam)
if n_sam_init:
logger.info("Creating initial model evaluation "
"sample set of size %i."
% (n_sam_init))
add_sam_set =\
lhd(n_sam_init, self._modellink._n_par,
self._modellink._par_rng, 'center',
self._criterion, constraints=ext_sam_set)
logger.info("Finished creating initial sample "
"set.")
else:
add_sam_set = np_array([])
# If this is any other iteration
else:
# Get dummy ext_real_set
ext_sam_set, ext_mod_set = self._get_ext_real_set(None)
# Check if last iteration was analyzed, do so if not
if not self._n_eval_sam[emul_i-1]:
# Analyze previous iteration
logger.info("Previous emulator iteration has not "
"been analyzed. Performing analysis "
"first.")
self._make_call('analyze')
# Check if a new emulator iteration can be constructed
# If no plausible regions were found
if not self._n_impl_sam[emul_i-1]:
err_msg = ("No plausible regions were found in the"
" analysis of the previous emulator "
"iteration. Construction is not "
"possible!")
raise_error(err_msg, RequestError, logger)
# If n_impl_sam is less than n_cross_val
elif(self._n_impl_sam[emul_i-1] <
self._emulator._n_cross_val):
err_msg = ("Number of plausible samples found in "
"the analysis of the previous iteration"
" is lower than the number of cross "
"validations used during regression"
" (%i < %i). Construction is not "
"possible!"
% (self._n_impl_sam[emul_i-1],
self._emulator._n_cross_val))
raise_error(err_msg, RequestError, logger)
# Make the emulator prepare for a new iteration
reload = self._make_call(
'_emulator._prepare_new_iteration', emul_i)
# Make sure the correct pipeline data is loaded in
if reload:
self._load_data()
# Obtain additional sam_set
add_sam_set = self._impl_sam
# Obtain corresponding set of model evaluations
self._make_call('_get_iteration_data', emul_i, add_sam_set,
ext_sam_set, ext_mod_set)
# If iteration needs to be constructed from midway
else:
# Log that construction of emulator iteration is continued
logger.info("Continuing construction of emulator iteration %i."
% (emul_i))
# Construct emulator iteration
self._emulator._construct_iteration(emul_i)
# Controller finishing up construction process
if self._is_controller:
# Save that emulator iteration has not been analyzed yet
self._save_data({
'impl_sam': np_array([]),
'n_eval_sam': 0})
# Log that construction has been completed
time_diff_total = time()-start_time
self._save_statistics(emul_i, {
'tot_construct_time': ['%.2f' % (time_diff_total), 's']})
msg = ("Finished construction of emulator iteration in %.2f "
"seconds." % (time_diff_total))
logger.info(msg)
print(msg)
# Analyze the emulator iteration if requested
if analyze:
self.analyze()
# If not, temporarily save implausibility parameters in memory
else:
self._get_impl_par(True)
self.details(emul_i)
# This function allows one to view the pipeline details/properties
# TODO: Allow the viewing of the entire polynomial function in SymPy
[docs] @docstring_substitute(emul_i=user_emul_i_doc)
def details(self, emul_i=None):
"""
Prints the details/properties of the currently loaded :obj:`~Pipeline`
instance at given emulator iteration `emul_i`. See ``Props`` for
detailed descriptions of all printed properties.
Optional
--------
%(emul_i)s
Props
-----
Working directory
The relative path to the working directory of the emulator starting
at the current working directory.
Emulator type
The type of this emulator, corresponding to the
:attr:`~prism.Emulator.emul_type` of the provided `emul_type`
during :class:`~Pipeline` initialization.
If no emulator type was provided during initialization, this is
'default'.
ModelLink subclass
Name of the :class:`~prism.modellink.ModelLink` subclass
used to construct this emulator.
Emulation method
Indicates the combination of regression and Gaussian emulation
methods that have been used for this emulator.
Mock data used?
Whether or not mock data has been used to construct this emulator.
If so, the printed estimates for all model parameters are the
parameter values used to create the mock data.
----
Emulator iteration
The iteration of the emulator this details overview is about. By
default, this is the last (partly) constructed iteration.
Construction completed?
Whether or not the construction of this emulator iteration is
completed. If not, the missing components for each emulator system
are listed and the remaining information of this iteration is not
printed.
Plausible regions?
Whether or not plausible regions have been found during the
analysis of this emulator iteration. If no analysis has been done
yet, "N/A" will be printed.
Projections available?
Whether or not projections have been created for this emulator
iteration. If projections are available and analysis has been done,
but with different implausibility cut-offs, a "desync" note is
added. Also prints number of available projections versus maximum
number of projections in parentheses.
----
# of model evaluation samples
The total number of model evaluation samples used to construct all
emulator iterations up to this iteration, with the number for every
individual iteration in parentheses.
# of plausible/analyzed samples
The number of emulator evaluation samples that passed the
implausibility check out of the total number of analyzed samples in
this emulator iteration.
This is the number of model evaluation samples that was/will be
used for the construction of the next emulator iteration.
If no analysis has been done, the numbers show up as "-".
%% of parameter space remaining
The percentage of the total number of analyzed samples that passed
the implausibility check in this emulator iteration.
If no analysis has been done, the number shows up as "-".
# of active/total parameters
The number of model parameters that was considered active during
the construction of this emulator iteration, compared to the total
number of model parameters defined in the used
:class:`~prism.modellink.ModelLink` subclass.
# of emulated data points
The number of data points that have been emulated in this
emulator iteration.
# of emulator systems
The total number of emulator systems that are required in this
emulator. The number of active emulator systems is equal to the
number of data points.
----
Parameter space
Lists the name, lower and upper value boundaries and estimate (if
provided) of all model parameters defined in the used
:class:`~prism.modellink.ModelLink` subclass. An asterisk is
printed in front of the parameter name if this model parameter was
considered active during the construction of this emulator
iteration. A question mark is used instead if the construction of
this emulator iteration is not finished.
"""
# Define details logger
logger = getCLogger("DETAILS")
logger.info("Collecting details about current pipeline instance.")
# Check if last emulator iteration is finished constructing
if(delist(self._emulator._ccheck[-1]) == []):
ccheck_flag = 1
else:
ccheck_flag = 0
# Gather the ccheck_flags on all ranks to see if all are finished
ccheck_flag = np.all(self._comm.allgather(ccheck_flag))
# Try to obtain correct emul_i depending on the construction progress
try:
if ccheck_flag:
emul_i = self._emulator._get_emul_i(emul_i, True)
else:
emul_i = self._emulator._get_emul_i(emul_i, False)
# If this fails, return
except RequestError:
return
# If this succeeds, gather ccheck information on the controller
else:
ccheck_list = self._comm.gather(self._emulator._ccheck[emul_i], 0)
# Controller generating the entire details overview
if self._is_controller:
# Flatten the received ccheck_list
ccheck_flat = [[] for _ in range(self._emulator._n_emul_s_tot)]
for ccheck_iter in ccheck_list[0][self._emulator._n_emul_s:]:
ccheck_flat.append(ccheck_iter)
for rank, ccheck_rank in enumerate(ccheck_list):
for emul_s, ccheck in zip(self._emulator._emul_s_to_core[rank],
ccheck_rank):
ccheck_flat[emul_s] = ccheck
# Get max lengths of various strings for parameter space section
name_len =\
max([len(par_name) for par_name in self._modellink._par_name])
lower_len =\
max([len(str(i)) for i in self._modellink._par_rng[:, 0]])
upper_len =\
max([len(str(i)) for i in self._modellink._par_rng[:, 1]])
est_lengths = [len('%.5f' % (i)) for i in self._modellink._par_est
if i is not None]
est_len = max(est_lengths) if len(est_lengths) else 0
# Open hdf5-file
with self._File('r', None) as file:
# Check if projection data is available by trying to access it
try:
data_set = file['%i/proj_hcube' % (emul_i)]
except KeyError:
proj = 0
n_proj = 0
# If projection data is available
else:
# Get the number of projections and used impl_cut-offs
proj = 0
n_proj = len(data_set.keys())
for hcube in data_set.values():
p_impl_cut = hcube.attrs['impl_cut']
p_cut_idx = hcube.attrs['cut_idx']
# Check if projections were made with the same impl_cut
if(len(p_impl_cut) == len(self._impl_cut[emul_i]) and
(p_impl_cut == self._impl_cut[emul_i]).all() and
p_cut_idx == self._cut_idx[emul_i]):
# If it was, projections are synced
proj = 1
else:
# If not, projections are desynced
proj = 2
break
# Log file being closed
logger.info("Finished collecting details about current pipeline "
"instance.")
# Obtain number of model parameters
n_par = self._modellink._n_par
# Obtain number of data points
n_data = self._emulator._n_data_tot[emul_i]
# Obtain number of emulator systems
n_emul_s = self._emulator._n_emul_s_tot
# Determine the relative path to the working directory
pwd = os.getcwd()
if(path.splitdrive(self._working_dir)[0].lower() !=
path.splitdrive(pwd)[0].lower()):
working_dir_rel_path = self._working_dir
else:
working_dir_rel_path = path.relpath(self._working_dir, pwd)
# Set width of first details column
width = 31
# PRINT DETAILS
# HEADER
print("\nPIPELINE DETAILS")
print("="*width)
# GENERAL
print("\nGENERAL")
print("-"*width)
# General details about loaded emulator
print("{0: <{1}}\t'{2}'".format("Working directory", width,
working_dir_rel_path))
print("{0: <{1}}\t'{2}'".format("Emulator type", width,
self._emulator._emul_type))
print("{0: <{1}}\t{2}".format("ModelLink subclass", width,
self._modellink._name))
if(self._emulator._method.lower() == 'regression'):
print("{0: <{1}}\t{2}".format("Emulation method", width,
"Regression"))
elif(self._emulator._method.lower() == 'gaussian'):
print("{0: <{1}}\t{2}".format("Emulation method", width,
"Gaussian"))
elif(self._emulator._method.lower() == 'full'):
print("{0: <{1}}\t{2}".format("Emulation method", width,
"Regression + Gaussian"))
print("{0: <{1}}\t{2}".format("Mock data used?", width,
"Yes" if self._emulator._use_mock
else "No"))
# ITERATION DETAILS
print("\nITERATION")
print("-"*width)
# Emulator iteration corresponding to this details overview
print("{0: <{1}}\t{2}".format("Emulator iteration", width, emul_i))
# Availability flags
# If this iteration is fully constructed, print flags and numbers
if(delist(ccheck_flat) == []):
# Determine the number of (active) parameters
n_active_par = len(self._emulator._active_par[emul_i])
# Calculate the maximum number of projections
n_proj_max = n_active_par
n_proj_max += nCr(n_active_par, 2) if(n_par > 2) else 0
# Flag details
print("{0: <{1}}\t{2}".format("Construction completed?", width,
"Yes"))
if not self._n_eval_sam[emul_i]:
print("{0: <{1}}\t{2}".format("Plausible regions?", width,
"N/A"))
else:
print("{0: <{1}}\t{2}".format(
"Plausible regions?", width,
"Yes" if self._n_impl_sam[emul_i] else "No"))
if not proj:
print("{0: <{1}}\t{2}".format("Projections available?",
width, "No"))
else:
print("{0: <{1}}\t{2} ({3}/{4}){5}".format(
"Projections available?", width,
"Yes", n_proj, n_proj_max,
"" if proj == 1 else ", desynced"))
print("-"*width)
# Number details
if(self._emulator._emul_type == 'default'):
print("{0: <{1}}\t{2} ({3})".format(
"# of model evaluation samples", width,
sum(self._emulator._n_sam[1:emul_i+1]),
self._emulator._n_sam[1:emul_i+1]))
else:
raise NotImplementedError
if not self._n_eval_sam[emul_i]:
print("{0: <{1}}\t{2}/{3}".format(
"# of plausible/analyzed samples", width, "-", "-"))
print("{0: <{1}}\t{2}".format(
"% of parameter space remaining", width, "-"))
else:
print("{0: <{1}}\t{2}/{3}".format(
"# of plausible/analyzed samples", width,
self._n_impl_sam[emul_i], self._n_eval_sam[emul_i]))
print("{0: <{1}}\t{2:.3g}%".format(
"% of parameter space remaining", width,
(self._n_impl_sam[emul_i] /
self._n_eval_sam[emul_i])*100))
print("{0: <{1}}\t{2}/{3}".format(
"# of active/total parameters", width,
n_active_par, n_par))
print("{0: <{1}}\t{2}".format("# of emulated data points",
width, n_data))
print("{0: <{1}}\t{2}".format("# of emulator systems",
width, n_emul_s))
# If not, print which components are still missing
else:
print("{0: <{1}}\t{2}".format("Construction completed?", width,
"No"))
print(" - {0: <{1}}\t{2}".format(
"'mod_real_set'?", width-4,
"No" if 'mod_real_set' in ccheck_flat else "Yes"))
# Check if all active parameters have been determined
ccheck_i = [i for i in range(n_emul_s) if
'active_par_data' in ccheck_flat[i]]
print(" - {0: <{1}}\t{2}".format(
"'active_par_data'?", width-4, "No (%s)" % (ccheck_i) if
len(ccheck_i) else "Yes"))
# Check if all regression processes have been done if requested
if self._emulator._method.lower() in ('regression', 'full'):
ccheck_i = [i for i in range(n_emul_s) if
'regression' in ccheck_flat[i]]
print(" - {0: <{1}}\t{2}".format(
"'regression'?", width-4, "No (%s)" % (ccheck_i) if
len(ccheck_i) else "Yes"))
# Check if all covariance matrices have been determined
ccheck_i = [i for i in range(n_emul_s) if
'cov_mat' in ccheck_flat[i]]
print(" - {0: <{1}}\t{2}".format(
"'cov_mat'?", width-4, "No (%s)" % (ccheck_i) if
len(ccheck_i) else "Yes"))
# Check if all exp_dot_terms have been determined
ccheck_i = [i for i in range(n_emul_s) if
'exp_dot_term' in ccheck_flat[i]]
print(" - {0: <{1}}\t{2}".format(
"'exp_dot_term'?", width-4, "No (%s)" % (ccheck_i) if
len(ccheck_i) else "Yes"))
print("-"*width)
# PARAMETER SPACE
print("\nPARAMETER SPACE")
print("-"*width)
# Define string format if no par_ests are provided
str_format1 = "{6}{0: <{1}}: [{2: >{3}}, {4: >{5}}]"
# Define string format if this par_est was provided
str_format2 = "{8}{0: <{1}}: [{2: >{3}}, {4: >{5}}] ({6: >{7}.5f})"
# Define string format if this par_est was not provided
str_format3 = "{8}{0: <{1}}: [{2: >{3}}, {4: >{5}}] ({6:->{7}})"
# Print details about every model parameter in parameter space
for i in range(n_par):
# Determine what string to use for the active flag
if(delist(ccheck_flat) != []):
active_str = "?"
elif i in self._emulator._active_par[emul_i]:
active_str = "*"
else:
active_str = " "
# Check if par_est is given and use correct string formatting
if not est_len:
print(str_format1.format(
self._modellink._par_name[i], name_len,
self._modellink._par_rng[i, 0], lower_len,
self._modellink._par_rng[i, 1], upper_len,
active_str))
elif self._modellink._par_est[i] is not None:
print(str_format2.format(
self._modellink._par_name[i], name_len,
self._modellink._par_rng[i, 0], lower_len,
self._modellink._par_rng[i, 1], upper_len,
self._modellink._par_est[i], est_len, active_str))
else:
print(str_format3.format(
self._modellink._par_name[i], name_len,
self._modellink._par_rng[i, 0], lower_len,
self._modellink._par_rng[i, 1], upper_len,
"", est_len, active_str))
# FOOTER
print("="*width)
# Flush the console
sys.stdout.flush()
# MPI Barrier
self._comm.Barrier()
# This function allows the user to evaluate a given sam_set in the emulator
# TODO: Plot emul_i_stop for large LHDs, giving a nice mental statistic
[docs] @docstring_substitute(emul_i=user_emul_i_doc)
def evaluate(self, sam_set, emul_i=None):
"""
Evaluates the given model parameter sample set `sam_set` up to given
emulator iteration `emul_i`.
The output of this function depends on the number of dimensions in
`sam_set`. The output is always provided on the controller rank.
Parameters
----------
sam_set : 1D or 2D array_like or dict
Array containing model parameter value sets to be evaluated in the
emulator up to emulator iteration `emul_i`.
Optional
--------
%(emul_i)s
Returns (if 2D `sam_set`)
-------------------------
impl_check : list of bool
List of bool indicating whether or not the given samples passed the
implausibility check at the given emulator iteration `emul_i`.
emul_i_stop : list of int
List containing the last emulator iterations at which the given
samples are still within the plausible region of the emulator.
adj_exp_val : 2D :obj:`~numpy.ndarray` object
Array containing the adjusted expectation values for all given
samples.
adj_var_val : 2D :obj:`~numpy.ndarray` object
Array containing the adjusted variance values for all given
samples.
uni_impl_val : 2D :obj:`~numpy.ndarray` object
Array containing the univariate implausibility values for all given
samples.
Prints (if 1D `sam_set`)
------------------------
impl_check : bool
Bool indicating whether or not the given sample passed the
implausibility check at the given emulator iteration `emul_i`.
emul_i_stop : int
Last emulator iteration at which the given sample is still within
the plausible region of the emulator.
adj_exp_val : 1D :obj:`~numpy.ndarray` object
The adjusted expectation values for the given sample.
adj_var_val : 1D :obj:`~numpy.ndarray` object
The adjusted variance values for the given sample.
sigma_val : 1D :obj:`~numpy.ndarray` object
The corresponding sigma value for the given sample.
uni_impl_val : 1D :obj:`~numpy.ndarray` object
The univariate implausibility values for the given sample.
Notes
-----
If given emulator iteration `emul_i` has been analyzed before, the
implausibility parameters of the last analysis are used. If not, then
the values are used that were read in when the emulator was loaded.
"""
# Get emulator iteration
emul_i = self._emulator._get_emul_i(emul_i, True)
# Do some logging
logger = getCLogger('EVALUATE')
logger.info("Evaluating emulator iteration %i for provided set of "
"model parameter samples." % (emul_i))
# Check if sam_set was provided as a dict
if isinstance(sam_set, dict):
# Make sure that sam_set is a SortedDict
sam_set = sdict(sam_set)
# Return it to normal
sam_set = np_array(sam_set.values()).T
# Controller checking the contents of sam_set
if self._is_controller:
# Check sam_set
sam_set = self._modellink._check_sam_set(sam_set, 'sam_set')
# If ndim == 1, convert to 2D array and print output later
if(sam_set.ndim == 1):
sam_set = np_array(sam_set, ndmin=2)
print_output = 1
# If ndim == 2, return output later
else:
print_output = 0
# The workers make sure that sam_set is also two-dimensional
else:
sam_set = np_array(sam_set, ndmin=2)
# MPI Barrier
self._comm.Barrier()
# Analyze sam_set
results = self._evaluate_sam_set(emul_i, sam_set, 'evaluate')
# Do more logging
logger.info("Finished evaluating emulator.")
# Controller finishing up
if self._is_controller:
# Extract data arrays from results
adj_exp_val, adj_var_val, uni_impl_val, emul_i_stop, impl_check =\
results
# If print_output is True, print the results
if print_output:
# Print results
pr_str = "Evaluation results of %s" % (sam_set[0])
print(pr_str)
print("-"*len(pr_str))
if impl_check[0]:
print("Plausible? Yes")
else:
print("Plausible? No")
print("emul_i_stop = %i" % (emul_i_stop[0]))
print("adj_exp_val = %s" % (adj_exp_val[0]))
print("adj_var_val = %s" % (adj_var_val[0]))
print("sigma_val = %s" % (np.sqrt(adj_var_val[0])))
print("uni_impl_val = %s" % (uni_impl_val[0]))
# Else, return the results
else:
# MPI Barrier for controller
self._comm.Barrier()
# Return results
return({'impl_check': impl_check.tolist(),
'emul_i_stop': emul_i_stop,
'adj_exp_val': adj_exp_val,
'adj_var_val': adj_var_val,
'uni_impl_val': uni_impl_val})
# MPI Barrier
self._comm.Barrier()
# This function simply executes self.__call__
[docs] @docstring_copy(__call__)
def run(self, emul_i=None, force=False):
self(emul_i, force)