import os
import shutil
import datetime
from os.path import join, abspath, relpath, dirname, isfile, split, exists
import csep
import numpy
import yaml
import json
import logging
from typing import Union, List, Dict, Callable, Mapping, Sequence
from matplotlib import pyplot
from cartopy import crs as ccrs
from csep.core.catalogs import CSEPCatalog
from csep.utils.time_utils import decimal_year
from floatcsep import report
from floatcsep.logger import add_fhandler
from floatcsep.registry import PathTree
from floatcsep.utils import NoAliasLoader, parse_csep_func, read_time_cfg, \
read_region_cfg, Task, TaskGraph, timewindow2str, str2timewindow, \
magnitude_vs_time
from floatcsep.model import Model
from floatcsep.evaluation import Evaluation
import warnings
numpy.seterr(all="ignore")
warnings.filterwarnings("ignore")
log = logging.getLogger("floatLogger")
[docs]
class Experiment:
"""
Main class that handles an Experiment's context. Contains all the
specifications, instructions and methods to carry out an experiment.
Args:
name (str): Experiment name
time_config (dict): Contains all the temporal specifications.
It can contain the following keys:
- start_date (:class:`datetime.datetime`):
Experiment start date
- end_date (:class:`datetime.datetime`):
Experiment end date
- exp_class (:class:`str`):
`ti` (Time-Independent) or `td` (Time-Dependent)
- intervals (:class:`int`): Number of testing intervals/windows
- horizon (:class:`str`, :py:class:`float`): Time length of the
forecasts (e.g. 1, 10, `1 year`, `2 days`). `ti` defaults to
years, `td` to days.
- growth (:class:`str`): `incremental` or `cumulative`
- offset (:class:`float`): recurrence of forecast creation.
For further details, see :func:`~floatcsep.utils.timewindows_ti`
and :func:`~floatcsep.utils.timewindows_td`
region_config (dict): Contains all the spatial and magnitude
specifications. It must contain the following keys:
- region (:py:class:`str`,
:class:`csep.core.regions.CartesianGrid2D`): The geographical
region, which can be specified as:
(i) the name of a :mod:`csep`/:mod:`floatcsep` default region
function (e.g. :func:`~csep.core.regions.california_relm_region`)
(ii) the name of a user function or
(iii) the path to a lat/lon file
- mag_min (:class:`float`): Minimum magnitude of the experiment
- mag_max (:class:`float`): Maximum magnitude of the experiment
- mag_bin (:class:`float`): Magnitud bin size
- magnitudes (:class:`list`, :class:`numpy.ndarray`): Explicit
magnitude bins
- depth_min (:class:`float`): Minimum depth. Defaults to -2
- depth_max (:class:`float`): Maximum depth. Defaults to 6000
model_config (str): Path to the models' configuration file
test_config (str): Path to the evaluations' configuration file
default_test_kwargs (dict): Default values for the testing
(seed, number of simulations, etc.)
postproc_config (dict): Contains the instruction for postprocessing
(e.g. plot forecasts, catalogs)
**kwargs: see Note
Note:
Instead of using `time_config` and `region_config`, an Experiment can
be instantiated using these dicts as keywords. (e.g. ``Experiment(
**time_config, **region_config)``, ``Experiment(start_date=start_date,
intervals=1, region='csep-italy', ...)``
"""
'''
Data management
Model:
- FILE - read from file, scale in runtime
- drop to db, scale from function in runtime
- CODE - run, read from file
- run, store in db, read from db
TEST:
- use forecast from runtime
- read forecast from file (TD)
(does not make sense for TI (too much FS space) unless is already
dropped to DB)
'''
[docs]
def __init__(self,
name: str = None,
time_config: dict = None,
region_config: dict = None,
catalog: str = None,
models: str = None,
tests: str = None,
postproc_config: str = None,
default_test_kwargs: dict = None,
rundir: str = 'results',
report_hook: dict = None,
**kwargs) -> None:
# todo
# - instantiate from full experiment register (ignoring test/models
# config), or self-awareness functions?
# - instantiate as python objects (models/tests config)
# - check if model region matches experiment region for nonQuadTree?
# Or filter region?
# Instantiate
workdir = abspath(kwargs.get('path', os.getcwd()))
if kwargs.get('timestamp', False):
rundir = os.path.join(
rundir,
f'run_{datetime.datetime.utcnow().date().isoformat()}')
os.makedirs(os.path.join(workdir, rundir), exist_ok=True)
if kwargs.get(log, True):
logpath = os.path.join(workdir, rundir, 'experiment.log')
log.info(f'Logging at {logpath}')
add_fhandler(logpath)
self.name = name if name else 'floatingExp'
self.path = PathTree(workdir, rundir)
self.config_file = kwargs.get('config_file', None)
self.original_config = kwargs.get('original_config', None)
self.original_rundir = kwargs.get('original_rundir', None)
self.rundir = rundir
self.seed = kwargs.get('seed', None)
self.time_config = read_time_cfg(time_config, **kwargs)
self.region_config = read_region_cfg(region_config, **kwargs)
self.model_config = models if isinstance(models, str) else None
self.test_config = tests if isinstance(tests, str) else None
log.info(f'Setting up experiment {self.name}:')
log.info(f'\tStart: {self.start_date}')
log.info(f'\tEnd: {self.start_date}')
log.info(f'\tTime windows: {len(self.timewindows)}')
log.info(f'\tRegion: {self.region.name if self.region else None}')
log.info(f'\tMagnitude range: [{numpy.min(self.magnitudes)},'
f' {numpy.max(self.magnitudes)}]')
self.catalog = None
self.models = []
self.tests = []
self.postproc_config = postproc_config if postproc_config else {}
self.default_test_kwargs = default_test_kwargs
self.catalog = catalog
self.models = self.set_models(models or kwargs.get('model_config'),
kwargs.get('order', None))
self.tests = self.set_tests(tests or kwargs.get('test_config'))
self.tasks = []
self.task_graph = None
self.report_hook = report_hook if report_hook else {}
self.force_rerun = kwargs.get('force_rerun', False)
def __getattr__(self, item: str) -> object:
"""
Override built-in method to return attributes found within
:attr:`region_config` or :attr:`time_config`
"""
try:
return self.__dict__[item]
except KeyError:
try:
return self.time_config[item]
except KeyError:
try:
return self.region_config[item]
except KeyError:
raise AttributeError(
f"Experiment '{self.name}'"
f" has no attribute '{item}'") from None
def __dir__(self):
"""
Adds time and region configs keys to instance scope.
"""
_dir = list(super().__dir__()) + list(self.time_config.keys()) + list(
self.region_config)
return sorted(_dir)
[docs]
def set_models(self, model_config: Union[Dict, str, List],
order: List = None) -> List:
"""
Parse the models' configuration file/dict. Instantiates all the models
as :class:`floatcsep.model.Model` and store them into
:attr:`Experiment.models`.
Args:
model_config (dict, list, str): configuration file or dictionary
containing the model initialization attributes, as defined in
:meth:`~floatcsep.model.Model`
order (list): desired order of models
"""
models = []
if isinstance(model_config, str):
modelcfg_path = self.path.abs(model_config)
_dir = self.path.absdir(model_config)
with open(modelcfg_path, 'r') as file_:
config_dict = yaml.load(file_, NoAliasLoader)
elif isinstance(model_config, (dict, list)):
config_dict = model_config
_dir = self.path.workdir
elif model_config is None:
return models
else:
raise NotImplementedError(f'Load for model type'
f' {model_config.__class__}'
f'not implemented ')
for element in config_dict:
# Check if the model is unique or has multiple submodels
if not any('flavours' in i for i in element.values()):
name_ = next(iter(element))
path_ = self.path.rel(_dir, element[name_]['path'])
model_i = {name_: {**element[name_],
'model_path': path_,
'workdir': self.path.workdir}}
model_i[name_].pop('path')
models.append(Model.from_dict(model_i))
else:
model_flavours = list(element.values())[0]['flavours'].items()
for flav, flav_path in model_flavours:
name_super = next(iter(element))
path_super = element[name_super].get('path', '')
path_sub = self.path.rel(_dir, path_super, flav_path)
# updates name of submodel
name_flav = f'{name_super}@{flav}'
model_ = {name_flav: {**element[name_super],
'model_path': path_sub,
'workdir': self.path.workdir}}
model_[name_flav].pop('path')
model_[name_flav].pop('flavours')
models.append(Model.from_dict(model_))
# Checks if there is any repeated model.
names_ = [i.name for i in models]
if len(names_) != len(set(names_)):
reps = set(
[i for i in names_ if (sum([j == i for j in names_]) > 1)])
one = not bool(len(reps) - 1)
log.warning(f'Warning: Model{"s" * (not one)} {reps}'
f' {"is" * one + "are" * (not one)} repeated')
log.info(f'\tModels: {[i.name for i in models]}')
if order:
models = [models[i] for i in order]
return models
[docs]
def get_model(self, name: str) -> Model:
""" Returns a Model by its name string """
for model in self.models:
if model.name == name:
return model
[docs]
def stage_models(self) -> None:
"""
Stages all the experiment's models. See
:meth:`floatcsep.model.Model.stage`
"""
log.info('Staging models')
for i in self.models:
i.stage(self.timewindows)
[docs]
def set_tests(self, test_config: Union[str, Dict, List]) -> list:
"""
Parse the tests' configuration file/dict. Instantiate them as
:class:`floatcsep.evaluation.Evaluation` and store them into
:attr:`Experiment.tests`.
Args:
test_config (dict, list, str): configuration file or dictionary
containing the evaluation initialization attributes, as defined in
:meth:`~floatcsep.evaluation.Evaluation`
"""
tests = []
if isinstance(test_config, str):
with open(self.path.abs(test_config), 'r') as config:
config_dict = yaml.load(config, NoAliasLoader)
for eval_dict in config_dict:
tests.append(Evaluation.from_dict(eval_dict))
elif isinstance(test_config, (dict, list)):
for eval_dict in test_config:
tests.append(Evaluation.from_dict(eval_dict))
log.info(f'\tEvaluations: {[i.name for i in tests]}')
return tests
@property
def catalog(self) -> CSEPCatalog:
"""
Returns a CSEP catalog loaded from the given query function or
a stored file if it exists.
"""
cat_path = self.path.abs(self._catpath)
if callable(self._catalog):
if isfile(self._catpath):
return CSEPCatalog.load_json(self._catpath)
bounds = {'start_time': min([item for sublist in self.timewindows
for item in sublist]),
'end_time': max([item for sublist in self.timewindows
for item in sublist]),
'min_magnitude': self.magnitudes.min(),
'max_depth': self.depths.max()}
if self.region:
bounds.update({i: j for i, j in
zip(['min_longitude', 'max_longitude',
'min_latitude', 'max_latitude'],
self.region.get_bbox())})
catalog = self._catalog(
catalog_id='catalog', # todo name as run
**bounds
)
if self.region:
catalog.filter_spatial(region=self.region, in_place=True)
catalog.region = None
catalog.write_json(self._catpath)
return catalog
elif isfile(cat_path):
try:
return CSEPCatalog.load_json(cat_path)
except json.JSONDecodeError:
return csep.load_catalog(cat_path)
@catalog.setter
def catalog(self, cat: Union[Callable, CSEPCatalog, str]) -> None:
if cat is None:
self._catalog = None
self._catpath = None
elif isfile(self.path.abs(cat)):
log.info(f"\tCatalog: '{cat}'")
self._catalog = self.path.rel(cat)
self._catpath = self.path.rel(cat)
else:
# catalog can be a function
self._catalog = parse_csep_func(cat)
self._catpath = self.path.abs('catalog.json')
if isfile(self._catpath):
log.info(f"\tCatalog: stored "
f"'{self._catpath}' "
f"from '{cat}'")
else:
log.info(f"\tCatalog: '{cat}'")
def get_test_cat(self, tstring: str = None) -> CSEPCatalog:
"""
Filters the complete experiment catalog to a test sub-catalog bounded
by the test time-window. Writes it to filepath defined in
:attr:`Experiment.filetree`
Args:
tstring (str): Time window string
"""
if tstring:
start, end = str2timewindow(tstring)
else:
start = self.start_date
end = self.end_date
sub_cat = self.catalog.filter(
[f'origin_time < {end.timestamp() * 1000}',
f'origin_time >= {start.timestamp() * 1000}',
f'magnitude >= {self.mag_min}',
f'magnitude < {self.mag_max}'], in_place=False)
if self.region:
sub_cat.filter_spatial(region=self.region, in_place=True)
return sub_cat
[docs]
def set_test_cat(self, tstring: str) -> None:
"""
Filters the complete experiment catalog to a test sub-catalog bounded
by the test time-window. Writes it to filepath defined in
:attr:`Experiment.filetree`
Args:
tstring (str): Time window string
"""
""
testcat_name = self.path(tstring, 'catalog')
if not exists(testcat_name):
log.debug(
f'Filtering catalog to testing sub-catalog and saving to '
f'{testcat_name}')
start, end = str2timewindow(tstring)
sub_cat = self.catalog.filter(
[f'origin_time < {end.timestamp() * 1000}',
f'origin_time >= {start.timestamp() * 1000}',
f'magnitude >= {self.mag_min}',
f'magnitude < {self.mag_max}'], in_place=False)
if self.region:
sub_cat.filter_spatial(region=self.region, in_place=True)
sub_cat.write_json(filename=testcat_name)
else:
log.debug(f'Using stored test sub-catalog from {testcat_name}')
def set_input_cat(self, tstring: str, model: Model) -> None:
"""
Filters the complete experiment catalog to a input sub-catalog filtered
to the beginning of thetest time-window. Writes it to filepath defined
in :attr:`Model.tree.catalog`
Args:
tstring (str): Time window string
model (:class:`~floatcsep.model.Model`): Model to give the input
catalog
"""
start, end = str2timewindow(tstring)
sub_cat = self.catalog.filter(
[f'origin_time < {start.timestamp() * 1000}'])
sub_cat.write_ascii(filename=model.path('input_cat'))
[docs]
def set_tasks(self):
"""
Lazy definition of the experiment core tasks by wrapping instances,
methods and arguments. Creates a graph with task nodes, while assigning
task-parents to each node, depending on each Evaluation signature.
The tasks can then be run sequentially as a list or asynchronous
using the graph's node dependencies.
For instance:
* A forecast can only be made if catalog was filtered to its window
* A consistency test can be run if the forecast exists in a window
* A comparison test requires the forecast and ref forecast
* A sequential test requires the forecasts exist for all windows
* A batch test requires all forecast exist for a given window.
Returns:
"""
# Set the file path structure
self.path.build(self.timewindows, self.models, self.tests)
log.info("Setting up experiment's tasks")
log.debug("Pre-run: results' paths\n" + yaml.dump(self.path.asdict()))
# Get the time windows strings
tw_strings = timewindow2str(self.timewindows)
# Prepare the testing catalogs
task_graph = TaskGraph()
for time_i in tw_strings:
# The method call Experiment.set_test_cat(time_i) is created lazily
task_i = Task(instance=self,
method='set_test_cat',
tstring=time_i)
# An is added to the task graph
task_graph.add(task_i)
# the task will be executed later with Experiment.run()
# once all the tasks are defined
# Set up the Forecasts creation
for time_i in tw_strings:
for model_j in self.models:
if model_j.model_class == 'td':
task_tj = Task(instance=self,
method='set_input_cat',
tstring=time_i,
model=model_j)
task_graph.add(task=task_tj)
# A catalog needs to have been filtered
task_ij = Task(instance=model_j,
method='create_forecast',
tstring=time_i,
force=self.force_rerun)
task_graph.add(task=task_ij)
# A catalog needs to have been filtered
if model_j.model_class == 'td':
task_graph.add_dependency(task_ij,
dinst=self,
dmeth='set_input_cat',
dkw=(time_i, model_j))
task_graph.add_dependency(task_ij,
dinst=self,
dmeth='set_test_cat',
dkw=time_i)
# Set up the Consistency Tests
for test_k in self.tests:
if test_k.type == 'consistency':
for time_i in tw_strings:
for model_j in self.models:
task_ijk = Task(
instance=test_k,
method='compute',
timewindow=time_i,
catalog=self.path(time_i, 'catalog'),
model=model_j,
region=self.region,
path=self.path(time_i, 'evaluations',
test_k, model_j))
task_graph.add(task_ijk)
# the forecast needs to have been created
task_graph.add_dependency(task_ijk,
dinst=model_j,
dmeth='create_forecast',
dkw=time_i)
# Set up the Comparative Tests
elif test_k.type == 'comparative':
for time_i in tw_strings:
for model_j in self.models:
task_ik = Task(
instance=test_k,
method='compute',
timewindow=time_i,
catalog=self.path(time_i, 'catalog'),
model=model_j,
ref_model=self.get_model(test_k.ref_model),
region=self.region,
path=self.path(time_i, 'evaluations', test_k,
model_j)
)
task_graph.add(task_ik)
task_graph.add_dependency(task_ik,
dinst=model_j,
dmeth='create_forecast',
dkw=time_i)
task_graph.add_dependency(task_ik,
dinst=self.get_model(
test_k.ref_model),
dmeth='create_forecast',
dkw=time_i)
# Set up the Sequential Scores
elif test_k.type == 'sequential':
for model_j in self.models:
task_k = Task(
instance=test_k,
method='compute',
timewindow=tw_strings,
catalog=[self.path(i, 'catalog')
for i in tw_strings],
model=model_j,
region=self.region,
path=self.path(tw_strings[-1], 'evaluations',
test_k, model_j)
)
task_graph.add(task_k)
for tw_i in tw_strings:
task_graph.add_dependency(task_k,
dinst=model_j,
dmeth='create_forecast',
dkw=tw_i)
# Set up the Sequential_Comparative Scores
elif test_k.type == 'sequential_comparative':
tw_strs = timewindow2str(self.timewindows)
for model_j in self.models:
task_k = Task(
instance=test_k,
method='compute',
timewindow=tw_strs,
catalog=[self.path(i, 'catalog') for i in tw_strs],
model=model_j,
ref_model=self.get_model(test_k.ref_model),
region=self.region,
path=self.path(tw_strs[-1], 'evaluations', test_k,
model_j)
)
task_graph.add(task_k)
for tw_i in tw_strings:
task_graph.add_dependency(task_k,
dinst=model_j,
dmeth='create_forecast',
dkw=tw_i)
task_graph.add_dependency(task_k,
dinst=self.get_model(
test_k.ref_model),
dmeth='create_forecast',
dkw=tw_i)
# Set up the Batch comparative Scores
elif test_k.type == 'batch':
time_str = timewindow2str(self.timewindows[-1])
for model_j in self.models:
task_k = Task(
instance=test_k,
method='compute',
timewindow=time_str,
catalog=self.path(time_str, 'catalog'),
ref_model=self.models,
model=model_j,
region=self.region,
path=self.path(time_str, 'evaluations', test_k,
model_j)
)
task_graph.add(task_k)
for m_j in self.models:
task_graph.add_dependency(task_k,
dinst=m_j,
dmeth='create_forecast',
dkw=time_str)
self.task_graph = task_graph
[docs]
def run(self) -> None:
"""
Run the task tree
todo:
- Cleanup forecast (perhaps add a clean task in self.prepare_tasks,
after all test had been run for a given forecast)
- Memory monitor?
- Queuer?
"""
log.info(f'Running {self.task_graph.ntasks} tasks')
if self.seed:
numpy.random.seed(self.seed)
self.task_graph.run()
log.info(f'Calculation completed')
[docs]
def read_results(self, test: Evaluation, window: str) -> List:
"""
Reads an Evaluation result for a given time window and returns a list
of the results for all tested models.
"""
return test.read_results(window, self.models, self.path)
[docs]
def plot_results(self) -> None:
"""
Plots all evaluation results
"""
log.info("Plotting evaluations")
timewindows = timewindow2str(self.timewindows)
for test in self.tests:
test.plot_results(timewindows, self.models, self.path)
[docs]
def plot_catalog(self, dpi: int = 300, show: bool = False) -> None:
"""
Plots the evaluation catalogs
Args:
dpi: Figure resolution with which to save
show: show in runtime
"""
plot_args = {'basemap': 'ESRI_terrain',
'figsize': (12, 8),
'markersize': 8,
'markercolor': 'black',
'grid_fontsize': 16,
'title': '',
'legend': True,
}
plot_args.update(self.postproc_config.get('plot_catalog', {}))
catalog = self.get_test_cat()
if catalog.get_number_of_events() != 0:
ax = catalog.plot(plot_args=plot_args, show=show)
ax.get_figure().tight_layout()
ax.get_figure().savefig(self.path('catalog_figure'),
dpi=dpi)
ax2 = magnitude_vs_time(catalog)
ax2.get_figure().tight_layout()
ax2.get_figure().savefig(self.path('magnitude_time'),
dpi=dpi)
if self.postproc_config.get('all_time_windows'):
timewindow = self.timewindows
for tw in timewindow:
catpath = self.path(tw, 'catalog')
catalog = CSEPCatalog.load_json(catpath)
if catalog.get_number_of_events() != 0:
ax = catalog.plot(plot_args=plot_args, show=show)
ax.get_figure().tight_layout()
ax.get_figure().savefig(self.path(tw, 'figures',
'catalog'),
dpi=dpi)
ax2 = magnitude_vs_time(catalog)
ax2.get_figure().tight_layout()
ax2.get_figure().savefig(self.path(tw, 'figures',
'magnitude_time'),
dpi=dpi)
[docs]
def plot_forecasts(self) -> None:
"""
Plots and saves all the generated forecasts
"""
plot_fc_config = self.postproc_config.get('plot_forecasts')
if plot_fc_config:
log.info("Plotting forecasts")
if plot_fc_config is True:
plot_fc_config = {}
try:
proj_ = plot_fc_config.get('projection')
if isinstance(proj_, dict):
proj_name = list(proj_.keys())[0]
proj_args = list(proj_.values())[0]
else:
proj_name = proj_
proj_args = {}
plot_fc_config['projection'] = getattr(ccrs, proj_name)(
**proj_args)
except (IndexError, KeyError, TypeError, AttributeError):
plot_fc_config['projection'] = ccrs.PlateCarree(
central_longitude=0.0)
cat = plot_fc_config.get('catalog')
cat_args = {}
if cat:
cat_args = {'markersize': 7, 'markercolor': 'black',
'title': 'asd', 'grid': False,
'legend': False, 'basemap': None,
'region_border': False}
if self.region:
self.catalog.filter_spatial(self.region, in_place=True)
if isinstance(cat, dict):
cat_args.update(cat)
window = self.timewindows[-1]
winstr = timewindow2str(window)
for model in self.models:
fig_path = self.path(winstr, 'forecasts', model.name)
start = decimal_year(window[0])
end = decimal_year(window[1])
time = f'{round(end - start, 3)} years'
plot_args = {'region_border': False,
'cmap': 'magma',
'clabel': r'$\log_{10} N\left(M_w \in [{%.2f},'
r'\,{%.2f}]\right)$ per '
r'$0.1^\circ\times 0.1^\circ $ per %s' %
(min(self.magnitudes),
max(self.magnitudes), time)}
if not self.region or self.region.name == 'global':
set_global = True
else:
set_global = False
plot_args.update(plot_fc_config)
ax = model.get_forecast(winstr, self.region).plot(
set_global=set_global, plot_args=plot_args)
if self.region:
bbox = self.region.get_bbox()
dh = self.region.dh
extent = [bbox[0] - 3 * dh, bbox[1] + 3 * dh,
bbox[2] - 3 * dh, bbox[3] + 3 * dh]
else:
extent = None
if cat:
self.catalog.plot(ax=ax, set_global=set_global,
extent=extent, plot_args=cat_args)
pyplot.savefig(fig_path, dpi=300, facecolor=(0, 0, 0, 0))
[docs]
def generate_report(self) -> None:
"""
Creates a report summarizing the Experiment's results
"""
log.info(f"Saving report into {self.path.rundir}")
self.path.build(self.timewindows, self.models, self.tests)
log.debug("Post-run: results' paths\n" + yaml.dump(self.path.asdict()))
report.generate_report(self)
def make_repr(self):
log.info('Creating reproducibility config file')
repr_config = self.path('config')
# Dropping region to results folder if it is a file
region_path = self.region_config.get('path', None)
if region_path:
if isfile(region_path) and region_path:
new_path = join(self.path.rundir, self.region_config['path'])
shutil.copy2(region_path, new_path)
self.region_config.pop('path')
self.region_config['region'] = self.path.rel(new_path)
# Dropping catalog to results folder
target_cat = join(self.path.workdir, self.path.rundir,
split(self._catpath)[-1])
if not exists(target_cat):
shutil.copy2(self.path.abs(self._catpath), target_cat)
self._catpath = self.path.rel(target_cat)
relative_path = os.path.relpath(
self.path.workdir, os.path.join(self.path.workdir,
self.path.rundir))
self.path.workdir = relative_path
self.to_yml(repr_config, extended=True)
def as_dict(self, exclude: Sequence = ('magnitudes', 'depths',
'timewindows', 'filetree',
'task_graph', 'tasks',
'models', 'tests'),
extended: bool = False) -> dict:
"""
Converts an Experiment instance into a dictionary.
Args:
exclude (tuple, list): Attributes, or attribute keys, to ignore
extended (bool): Verbose representation of pycsep objects
Returns:
A dictionary with serialized instance's attributes, which are
floatCSEP readable
"""
def _get_value(x):
# For each element type, transforms to desired string/output
if hasattr(x, 'as_dict'):
# e.g. model, test, etc.
o = x.as_dict()
else:
try:
try:
o = getattr(x, '__name__')
except AttributeError:
o = getattr(x, 'name')
except AttributeError:
if isinstance(x, numpy.ndarray):
o = x.tolist()
else:
o = x
return o
def iter_attr(val):
# recursive iter through nested dicts/lists
if isinstance(val, Mapping):
return {item: iter_attr(val_) for item, val_ in val.items()
if ((item not in exclude) and val_) or extended}
elif isinstance(val, Sequence) and not isinstance(val, str):
return [iter_attr(i) for i in val]
else:
return _get_value(val)
listwalk = [(i, j) for i, j in self.__dict__.items() if
not i.startswith('_') and j]
listwalk.insert(6, ('catalog', self._catpath))
dictwalk = {i: j for i, j in listwalk}
# if self.model_config is None:
# dictwalk['models'] = iter_attr(self.models)
# if self.test_config is None:
# dictwalk['tests'] = iter_attr(self.tests)
return iter_attr(dictwalk)
[docs]
def to_yml(self, filename: str, **kwargs) -> None:
"""
Serializes the :class:`~floatcsep.experiment.Experiment` instance into
a .yml file.
Note:
This instance can then be reinstantiated using
:meth:`~floatcsep.experiment.Experiment.from_yml`
Args:
filename: Name of the file onto which dump the instance
**kwargs: Pass to :meth:`~floatcsep.experiment.Experiment.as_dict`
Returns:
"""
class NoAliasDumper(yaml.Dumper):
def ignore_aliases(self, data):
return True
with open(filename, 'w') as f_:
yaml.dump(
self.as_dict(**kwargs), f_,
Dumper=NoAliasDumper,
sort_keys=False,
default_flow_style=False,
indent=1,
width=70
)
[docs]
@classmethod
def from_yml(cls, config_yml: str, reprdir=None, **kwargs):
"""
Initializes an experiment from a .yml file. It must contain the
attributes described in the :class:`~floatcsep.experiment.Experiment`,
:func:`~floatcsep.utils.read_time_config` and
:func:`~floatcsep.utils.read_region_config` descriptions
Args:
config_yml (str): The path to the .yml file
reprdir (str): folder where to reproduce results
Returns:
An :class:`~floatcsep.experiment.Experiment` class instance
"""
log.info('Initializing experiment from .yml file')
with open(config_yml, 'r') as yml:
# experiment configuration file
_dict = yaml.safe_load(yml)
_dir_yml = dirname(config_yml)
# uses yml path and append if a rel/abs path is given in config.
_path = _dict.get('path', '')
# Only ABSOLUTE PATH
_dict['path'] = abspath(join(_dir_yml, _dict.get('path', '')))
# replaces rundir case reproduce option is used
if reprdir:
_dict['original_rundir'] = _dict.get('rundir', 'results')
_dict['rundir'] = relpath(join(_dir_yml, reprdir),
_dict['path'])
_dict['original_config'] = abspath(join(_dict['path'],
_dict['config_file']))
else:
_dict['rundir'] = _dict.get('rundir',
kwargs.pop('rundir', 'results'))
_dict['config_file'] = relpath(config_yml, _dir_yml)
# print(_dict['rundir'])
return cls(**_dict, **kwargs)