import json
import os
import numpy
import csep
import git
import subprocess
import logging
from typing import List, Callable, Union, Mapping, Sequence
from datetime import datetime
from csep.core.forecasts import GriddedForecast, CatalogForecast
from csep.utils.time_utils import decimal_year
from floatcsep.accessors import from_zenodo, from_git
from floatcsep.readers import ForecastParsers, HDF5Serializer, check_format
from floatcsep.utils import timewindow2str, str2timewindow
from floatcsep.registry import ModelTree
log = logging.getLogger('floatLogger')
[docs]
class Model:
"""
Class defining a forecast generating Model. Initializes a model source
either from filesystem or web repositories, contains information about
the Model's typology, and maps accordingly to a forecast generating
function.
Args:
name (str): Name of the model
model_path (str): Relative path of the model (file or runnable code)
to the Experiment's instance path
forecast_unit (float): Temporal unit of the forecast. Default to
years in time-independent models and days
in time-dependent
use_db (bool): Flag the use of a database for speed/memory purposes
func (str, ~collections.abc.Callable): Forecast generating function
(for code models)
func_kwargs (dict): Arguments to pass into `func`
zenodo_id (int): Zenodo ID or record of the Model
giturl (str): Link to a git repository
repo_hash (str): Specific commit/branch/tag hash.
authors (list[str]): Authors' names metadata
doi: Digital Object Identifier metadata:
"""
'''
Model characteristics:
Forecast: - Gridded
- Catalog
Updating: - Time-Independent
- Time-Dependent
Source: - File
- Code
Model typology:
To implement in beta version:
- (grid - ti - file): e.g. CSEP1 style gridded forecasts
To implement in further versions:
- (grid - ti - code): e.g. smoothed-seismicity model
- (grid - td - code): e.g. EEPAS, STEP, Italy-NZ OEF models
- (cat - td - code): e.g. ETAS model code
- (cat - td - file): e.g OEF-ready Forecasts
- (grid - td - file): e.g OEF-ready Forecasts
Get forecasts options:
- FILE - read from file, scale in runtime
- drop to db, scale from function in runtime
- CODE - run, read from file
- run, store in db, read from db
'''
[docs]
def __init__(self, name: str, model_path: str,
forecast_unit: float = 1, use_db: bool = False,
func: Union[str, Callable] = None, func_kwargs: dict = None,
zenodo_id: int = None, giturl: str = None,
repo_hash: str = None, authors: List[str] = None,
doi: str = None, **kwargs) -> None:
# todo:
# - Instantiate from source code
# Instantiate attributes
self.name = name
self.zenodo_id = zenodo_id
self.giturl = giturl
self.repo_hash = repo_hash
self.authors = authors
self.doi = doi
self.forecast_unit = forecast_unit
self.func = func
self.func_kwargs = func_kwargs or {}
self.use_db = use_db
self.path = ModelTree(kwargs.get('workdir', os.getcwd()), model_path)
# Set model temporal class
if self.func:
# Time-Dependent
self.model_class = 'td'
self.build = kwargs.get('build', 'docker')
self.run_prefix = ''
else:
# Time-Independent
self.model_class = kwargs.get('model_class', 'ti')
# Instantiate attributes to be filled in run-time
self.forecasts = {}
self.__dict__.update(**kwargs)
@property
def dir(self) -> str:
"""
Returns:
The directory containing the model source.
"""
if os.path.isdir(self.path('path')):
return self.path('path')
else:
return os.path.dirname(self.path('path'))
[docs]
def stage(self, timewindows=None) -> None:
"""
Pre-steps to make the model runnable before integrating
- Get from filesystem, Zenodo or Git
- Pre-check model fileformat
- Initialize database
- Run model quality assurance (unit tests, runnable from floatcsep)
"""
self.get_source(self.zenodo_id, self.giturl, branch=self.repo_hash)
if self.use_db:
self.init_db()
if self.model_class == 'td':
self.build_model()
self.path.build_tree(timewindows=timewindows,
model_class=self.model_class,
prefix=self.__dict__.get('prefix', self.name),
args_file=self.__dict__.get('args_file', None),
input_cat=self.__dict__.get('input_cat', None))
def build_model(self):
if self.build == 'pip' or self.build == 'venv':
venv = os.path.join(self.path('path'),
self.__dict__.get('venv', 'venv'))
venvact = os.path.join(venv, 'bin', 'activate')
if not os.path.exists(venv):
log.info(f'Building model {self.name} using pip')
subprocess.run(['python', '-m', 'venv', venv])
log.info(f'\tVirtual environment created in {venv}')
build_cmd = f'source {venvact} && ' \
f'pip install --upgrade pip && ' \
f'pip install -e {self.path("path")}'
cmd = ['bash', '-c', build_cmd]
log.info(f'\tInstalling dependencies')
process = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True)
for line in process.stdout:
log.info(f'\t{line[:-1]}')
process.wait()
log.info(f'\tEnvironment ready')
log.warning(f'\tNested environments is not fully supported. '
f'Consider using docker instead')
self.run_prefix = f'cd {self.path("path")} && source {venvact} && '
[docs]
def get_source(self, zenodo_id: int = None, giturl: str = None,
force: bool = False,
**kwargs) -> None:
"""
Search, download or clone the model source in the filesystem, zenodo
and git, respectively. Identifies if the instance path points to a file
or to its parent directory
Args:
zenodo_id (int): Zenodo identifier of the repository. Usually as
`https://zenodo.org/record/{zenodo_id}`
giturl (str): git remote repository URL from which to clone the
source
force (bool): Forces to re-query the model from a web repository
**kwargs: see :func:`~floatcsep.utils.from_zenodo` and
:func:`~floatcsep.utils.from_git`
"""
if os.path.exists(self.path('path')) and not force:
return
os.makedirs(self.dir, exist_ok=True)
if zenodo_id:
log.info(f'Retrieving model {self.name} from zenodo id: '
f'{zenodo_id}')
try:
from_zenodo(zenodo_id, self.dir if self.path.fmt
else self.path('path'), force=force)
except (KeyError, TypeError) as msg:
raise KeyError(f'Zenodo identifier is not valid: {msg}')
elif giturl:
log.info(f'Retrieving model {self.name} from git url: '
f'{giturl}')
try:
from_git(giturl, self.dir if self.path.fmt
else self.path('path'), **kwargs)
except (git.NoSuchPathError, git.CommandError) as msg:
raise git.NoSuchPathError(f'git url was not found {msg}')
else:
raise FileNotFoundError('Model has no path or identified')
if not os.path.exists(self.dir) or not \
os.path.exists(self.path('path')):
raise FileNotFoundError(
f"Directory '{self.dir}' or file {self.path}' do not exist. "
f"Please check the specified 'path' matches the repo "
f"structure")
[docs]
def init_db(self, dbpath: str = '', force: bool = False) -> None:
"""
Initializes the database if `use_db` is True. If the model source is a
file, seralizes the forecast into a HDF5 file. If source is a
generating function or code, creates an empty DB
Args:
dbpath (str): Path to drop the HDF5 database. Defaults to same path
replaced with an `hdf5` extension
force (bool): Forces the serialization even if the DB already
exists
"""
# todo Think about distinction btwn 'TI' and 'Gridded' models.
if self.model_class == 'ti':
parser = getattr(ForecastParsers, self.path.fmt)
rates, region, mag = parser(self.path('path'))
db_func = HDF5Serializer.grid2hdf5
if not dbpath:
dbpath = self.path.path.replace(self.path.fmt, 'hdf5')
self.path.database = dbpath
if not os.path.isfile(self.path.abs(dbpath)) or force:
log.info(f'Serializing model {self.name} into HDF5 format')
db_func(rates, region, mag,
hdf5_filename=self.path.abs(dbpath),
unit=self.forecast_unit)
else:
raise NotImplementedError('TD serialization not implemented')
[docs]
def rm_db(self) -> None:
""" Clean up the generated HDF5 File"""
if self.use_db:
if os.path.isfile(self.path) and self.fmt == 'hdf5':
os.remove(self.path)
else:
log.warning(f"The HDF5 file {self.path} does not exist")
[docs]
def get_forecast(self,
tstring: Union[str, list] = None,
region=None
) -> Union[GriddedForecast, CatalogForecast,
List[GriddedForecast], List[CatalogForecast]]:
""" Wrapper that just returns a forecast, which should hide the
access method (db storage, ti_td, etc.) under the hood"""
if self.model_class == 'ti':
if isinstance(tstring, str):
# If only one timewindow string is passed
try:
# If they are retrieved from the Evaluation class
return self.forecasts[tstring]
except KeyError:
# In case they are called from postprocess
self.create_forecast(tstring)
return self.forecasts[tstring]
else:
# If multiple timewindow strings are passed
forecasts = []
for tw in tstring:
if tw in self.forecasts.keys():
forecasts.append(self.forecasts[tw])
if not forecasts:
raise KeyError(
f'Forecasts {*tstring,} have not been created yet')
return forecasts
elif self.model_class == 'td':
if isinstance(tstring, str):
# If one time window string is passed
# default forecast naming
fc_path = self.path('forecasts', tstring)
# default forecasts folder
# A region must be given to the forecast
return csep.load_catalog_forecast(fc_path, region=region)
[docs]
def create_forecast(self, tstring: str,
**kwargs) -> None:
"""
Creates a forecast from the model source and a given time window
Note:
The argument `tstring` is formatted according to how the Experiment
handles timewindows, specified in the functions
:func:'floatcsep.utils.timewindow2str` and
:func:'floatcsep.utils.str2timewindow`
Args:
tstring: String representing the start and end of the forecast,
formatted as 'YY1-MM1-DD1_YY2-MM2-DD2'.
**kwargs:
"""
start_date, end_date = str2timewindow(tstring)
# Model src is a file
if self.model_class == 'ti':
self.forecast_from_file(start_date, end_date, **kwargs)
# Model src is a func or binary
else:
fc_path = self.path('forecasts', tstring)
if kwargs.get('force') or not os.path.exists(fc_path):
self.forecast_from_func(start_date, end_date,
**self.func_kwargs,
**kwargs)
else:
log.info(f'Forecast of {tstring} of model {self.name} already '
f'exists')
[docs]
def forecast_from_file(self, start_date: datetime, end_date: datetime,
**kwargs) -> None:
"""
Generates a forecast from a file, by parsing and scaling it to
the desired time window. H
Args:
start_date (~datetime.datetime): Start of the forecast
end_date (~datetime.datetime): End of the forecast
**kwargs: Keyword arguments for
:class:`csep.core.forecasts.GriddedForecast`
"""
time_horizon = decimal_year(end_date) - decimal_year(start_date)
tstring = timewindow2str([start_date, end_date])
f_path = self.path('forecasts', tstring)
f_parser = getattr(ForecastParsers, self.path.fmt)
rates, region, mags = f_parser(f_path)
forecast = GriddedForecast(
name=f'{self.name}',
data=rates,
region=region,
magnitudes=mags,
start_time=start_date,
end_time=end_date
)
scale = time_horizon / self.forecast_unit
if scale != 1.0:
forecast = forecast.scale(scale)
log.debug(
f"Model {self.name}:\n"
f"\tForecast expected count: {forecast.event_count:.2f}"
f" with scaling parameter: {time_horizon:.1f}")
self.forecasts[tstring] = forecast
[docs]
def forecast_from_func(self, start_date: datetime, end_date: datetime,
**kwargs) -> None:
self.prepare_args(start_date, end_date, **kwargs)
log.info(f'Running {self.name} using {self.build}:'
f' {timewindow2str([start_date, end_date])}')
self.run_model()
def prepare_args(self, start, end, **kwargs):
filepath = self.path('args_file')
fmt = os.path.splitext(filepath)[1]
if fmt == '.txt':
def replace_arg(arg, val, fp):
with open(fp, 'r') as filearg_:
lines = filearg_.readlines()
pattern_exists = False
for k, line in enumerate(lines):
if line.startswith(arg):
lines[k] = f"{arg} = {val}\n"
pattern_exists = True
break # assume there's only one occurrence of the key
if not pattern_exists:
lines.append(f"{arg} = {val}\n")
with open(fp, 'w') as file:
file.writelines(lines)
replace_arg('start_date', start.isoformat(), filepath)
replace_arg('end_date', end.isoformat(), filepath)
for i, j in kwargs.items():
replace_arg(i, j, filepath)
elif fmt == '.json':
with open(filepath, 'r') as file_:
args = json.load(file_)
args['start_date'] = start.isoformat()
args['end_date'] = end.isoformat()
args.update(kwargs)
with open(filepath, 'w') as file_:
json.dump(args, file_, indent=2)
def run_model(self):
if self.build == 'pip' or self.build == 'venv':
run_func = f'{self.func} {self.path("args_file")}'
cmd = ['bash', '-c', f'{self.run_prefix} {run_func}']
process = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True)
for line in process.stdout:
log.info(f'\t{line[:-1]}')
process.wait()
def as_dict(self, excluded=('name', 'forecasts', 'workdir')):
"""
Returns:
Dictionary with relevant attributes. Model can be reinstantiated
from this dict
"""
def _get_value(x):
# For each element type, transforms to desired string/output
if hasattr(x, 'as_dict'):
# e.g. model, evaluation, filetree, etc.
o = x.as_dict()
else:
try:
try:
o = getattr(x, '__name__')
except AttributeError:
o = getattr(x, 'name')
except AttributeError:
if isinstance(x, numpy.ndarray):
o = x.tolist()
else:
o = x
return o
def iter_attr(val):
# recursive iter through nested dicts/lists
if isinstance(val, Mapping):
return {item: iter_attr(val_) for item, val_ in val.items()
if ((item not in excluded) and val_)}
elif isinstance(val, Sequence) and not isinstance(val, str):
return [iter_attr(i) for i in val]
else:
return _get_value(val)
listwalk = [(i, j) for i, j in sorted(self.__dict__.items()) if
not i.startswith('_') and j]
dictwalk = {i: j for i, j in listwalk}
# if self.model_config is None:
# dictwalk['models'] = iter_attr(self.models)
# if self.test_config is None:
# dictwalk['tests'] = iter_attr(self.tests)
return {self.name: iter_attr(dictwalk)}
[docs]
@classmethod
def from_dict(cls, record: dict, **kwargs):
"""
Returns a Model instance from a dictionary containing the required
atrributes. Can be used to quickly instantiate from a .yml file.
Args:
record (dict): Contains the keywords from the ``__init__`` method.
Note:
Must have either an explicit key `name`, or it must have
exactly one key with the model's name, whose values are
the remaining ``__init__`` keywords.
Returns:
A Model instance
"""
if 'name' in record.keys():
return cls(**record)
elif len(record) != 1:
raise IndexError('A single model has not been passed')
name = next(iter(record))
return cls(name=name, **record[name], **kwargs)