Infrastructure Module

This section documents the modules that manage the relations between the core classes of floatCSEP and the workflow required to run an Experiment.

Registries

class floatcsep.infrastructure.registries.ModelFileRegistry(model_name, workdir, path, args_file=None, input_cat=None, fmt=None)[source]

Bases: ModelRegistry, FilepathMixin

This class handles the keys (in this case, filepaths) used to access model objects such as forecasts, input catalogs, or argument/parameter files. These keys are mainly used by floatcsep.infrastructure.repositories.ForecastRepository and floatcsep.infrastructure.repositories.CatalogRepository.

Parameters:
  • model_name (str) – Model’s identifier string

  • workdir (str) – The current working directory of the experiment.

  • path (str) – The path of the model working directory (or model filepath).

  • args_file (str) – The path of the arguments file (only for TimeDependentModel).

  • input_cat (str) – The path of the input catalog (only for TimeDependentModel).

  • fmt (str) – The extension or format of the forecast.

as_dict()[source]
Returns:

Simple dictionary serialization of the instance with the core attributes

Return type:

dict

build_tree(time_windows=None, model_class='TimeIndependentModel', prefix=None, run_mode=typing.Literal['sequential', 'parallel'], stage_dir='results', run_id='run')[source]

Creates the run directory, and reads the file structure inside.

Parameters:
  • time_windows (list(str)) – List of time windows or strings.

  • model_class (str) – Model’s class name

  • prefix (str) – Prefix of the model forecast filenames (time-dependent models only)

  • run_mode (str) – If ‘sequential’, input data (args and catalog) is dynamically overwritten in ‘model/input/’ for each time window. If ‘parallel’, input data is written anew to ‘results/{time_window}/input/{model_name}/’.

  • stage_dir (str) – Whether input data is stored persistently in the run_dir or just in tmp cache (Only for parallel execution).

  • run_id (str) – Job ID of the run, used for parallel execution and temporary storage of input data

Return type:

None
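The effect of run_mode on where input data is staged can be sketched as follows. This is a minimal illustration built only from the two path patterns quoted in the run_mode description; the helper name resolve_input_dir and the results_dir argument are hypothetical and not part of floatCSEP.

```python
from pathlib import Path


def resolve_input_dir(model_path, model_name, time_window,
                      run_mode="sequential", results_dir="results"):
    """Hypothetical helper: choose the input directory for one time window."""
    if run_mode == "sequential":
        # One shared directory inside the model, overwritten per time window.
        return Path(model_path) / "input"
    if run_mode == "parallel":
        # A separate directory per time window and model, so that
        # concurrent runs do not overwrite each other's inputs.
        return Path(results_dir) / time_window / "input" / model_name
    raise ValueError(f"unknown run_mode: {run_mode!r}")


window = "2020-01-01_2021-01-01"
print(resolve_input_dir("models/etas", "etas", window))
print(resolve_input_dir("models/etas", "etas", window, run_mode="parallel"))
```

Keeping one directory per time window is what makes the parallel mode safe: two windows never share an args file or an input catalog.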

property fmt: str

Returns: The extension or format of the forecast

forecast_exists(timewindow)[source]

Checks whether forecasts exist for one or more time windows.

Parameters:

timewindow (str, list) – A single string or a sequence of strings, each representing a time window

Returns:

A bool for a single time window, or a list of bool for a sequence of time windows, indicating whether each forecast exists.

Return type:

bool | Sequence[bool]
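The str-or-sequence behaviour of forecast_exists can be illustrated with a small stand-in. The function below is a sketch, not the floatCSEP implementation: it assumes the registry can be reduced to a plain dictionary (here called forecast_paths, a hypothetical name) that maps time-window strings to file paths.

```python
import tempfile
from pathlib import Path


def forecast_exists(forecast_paths, timewindow):
    """Stand-in: return a bool for one window, a list of bool for several."""
    if isinstance(timewindow, str):
        return Path(forecast_paths[timewindow]).exists()
    return [Path(forecast_paths[tw]).exists() for tw in timewindow]


with tempfile.TemporaryDirectory() as tmp:
    present = Path(tmp) / "2020_2021.csv"
    present.write_text("placeholder forecast")
    paths = {
        "2020_2021": present,                       # file was created above
        "2021_2022": Path(tmp) / "2021_2022.csv",   # file is missing
    }
    single = forecast_exists(paths, "2020_2021")
    several = forecast_exists(paths, ["2020_2021", "2021_2022"])
```

Checking for str first matters, because a string is itself a sequence and would otherwise be iterated character by character.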

get_args_key(*args)[source]

Gets the filepath of an arguments file for a given sequence of keys (usually a timewindow string).

Parameters:

*args (Sequence[str]) – A sequence of keys (usually time-window strings)

Returns:

The argument file’s key(s) from a sequence of key values

Return type:

str

get_args_template_path()[source]

Path to the model’s canonical args template: <model.path>/input/<args_file>. The file exists regardless of staging mode and should ship with the source model.

Return type:

Path

get_forecast_dir()[source]

Returns the directory that contains the forecasts.

Return type:

Path

get_forecast_key(*args)[source]

Gets the filepath of a forecast for a given sequence of keys (usually a timewindow string).

Parameters:

*args (Sequence[str]) – A sequence of keys (usually time-window strings)

Returns:

The forecast registry key from a sequence of key values

Return type:

str

get_input_catalog_key(*args)[source]

Gets the filepath of the input catalog for a given sequence of keys (usually a timewindow string).

Parameters:

*args (Sequence[str]) – A sequence of keys (usually time-window strings)

Returns:

The input catalog registry key from a sequence of key values

Return type:

str

get_input_dir(tstring)[source]

Returns the directory that contains the per-window input files (args/catalog).

Parameters:

tstring (str)

Return type:

Path

class floatcsep.infrastructure.registries.ExperimentFileRegistry(workdir, run_dir='results')[source]

Bases: ExperimentRegistry, FilepathMixin

This class manages the keys (based on model, time-window, and evaluation name strings) that map to the experiment inputs (catalogs, models, etc.) and to the results of the evaluations. It keeps track of the forecast registries, as well as the existence of results and their paths in the filesystem.

Parameters:
  • workdir (Path) – The working directory for the experiment run-time.

  • run_dir (str) – The directory in which the results will be stored.

add_model_registry(model)[source]

Adds a model’s ModelRegistry to the ExperimentFileRegistry.

Parameters:

model (str) – A Model object

Return type:

None

as_dict()[source]
Return type:

Path

build_tree(time_windows, models, tests, run_mode='sequential')[source]

Creates the run directory and reads the file structure inside.

Return type:

None

get_attr(*args)[source]
Parameters:

*args (Any) – A sequence of keys (usually models, tests and/or time-window strings)

Returns:

The filepath from a sequence of key values (usually models first, then time-window strings)

Return type:

Path
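Resolving a filepath from a sequence of keys can be pictured as walking a nested mapping, one key per level. The sketch below assumes such a nested-dictionary layout; the layout, the key order, the example path, and the helper names get_by_keys and has_keys are all hypothetical and only illustrate the idea.

```python
from functools import reduce
from pathlib import Path

# Hypothetical registry content: time window -> test -> model -> file path.
results = {
    "2020-01-01_2021-01-01": {
        "N-test": {
            "etas": Path("results/2020-01-01_2021-01-01/N-test_etas.json"),
        },
    },
}


def get_by_keys(tree, *keys):
    """Walk a nested mapping, consuming one key per level."""
    return reduce(lambda node, key: node[key], keys, tree)


def has_keys(tree, *keys):
    """Return True when every key along the path is registered."""
    try:
        get_by_keys(tree, *keys)
    except (KeyError, TypeError):
        return False
    return True


found = get_by_keys(results, "2020-01-01_2021-01-01", "N-test", "etas")
```

Note that has_keys only tells whether a key is registered; whether the file is actually on disk is a separate check, for example with Path.exists().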

get_figure_key(*args)[source]

Gets the file path of a result figure.

Parameters:

*args (Sequence[any]) – A sequence of keys (usually tests and/or time-window strings)

Returns:

The filepath of the figure for a given result

Return type:

str

get_model_registry(model_name)[source]

Retrieves a model’s ModelRegistry from the ExperimentFileRegistry.

Parameters:

model_name (str) – The name of the model.

Returns:

The ModelRegistry associated with the model.

Return type:

ModelRegistry

get_result_key(*args)[source]

Gets the file path of an evaluation result.

Parameters:

*args (Sequence[any]) – A sequence of keys (usually models, tests and/or time-window strings)

Returns:

The filepath of a serialized result

Return type:

str

get_test_catalog_key(*args)[source]

Gets the file path of a testing catalog.

Parameters:

*args (Sequence[any]) – A sequence of keys (time-window strings)

Returns:

The filepath of the testing catalog for a given time-window

Return type:

str

result_exist(timewindow_str, test_name, model_name)[source]

Checks whether the result of a given test exists.

Parameters:
  • timewindow_str (str) – String representing the time window

  • test_name (str) – Name of the evaluation

  • model_name (str) – Name of the model

Return type:

bool

Repositories

class floatcsep.infrastructure.repositories.CatalogForecastRepository(registry, **kwargs)[source]

Bases: ForecastRepository

This class accesses (or stores in memory) the catalog-based forecasts of a model. Setting the flag lazy_load to False keeps the catalogs in memory, which reduces the time spent parsing files.

Parameters:
  • registry (ModelRegistry) – The registry containing the keys/path to the forecasts given their time-windows.

  • **kwargs
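The trade-off behind lazy_load can be sketched with a small stand-in repository. This is not the floatCSEP implementation: the class name, the loader callable, and the plain dictionary of paths are assumptions made for illustration. With lazy_load=True every call parses the file again; with lazy_load=False the parsed object is kept in memory and reused.

```python
class CachingRepository:
    """Stand-in repository illustrating lazy loading versus in-memory storage."""

    def __init__(self, paths, loader, lazy_load=True):
        self.paths = paths          # time-window string -> file path
        self.loader = loader        # callable that parses one file
        self.lazy_load = lazy_load
        self._cache = {}            # only used when lazy_load is False

    def load_forecast(self, tstring):
        # Accept a single time-window string or a sequence of them.
        if isinstance(tstring, str):
            return self._load_single(tstring)
        return [self._load_single(t) for t in tstring]

    def _load_single(self, tstring):
        if self.lazy_load:
            return self.loader(self.paths[tstring])   # parse on every call
        if tstring not in self._cache:
            self._cache[tstring] = self.loader(self.paths[tstring])
        return self._cache[tstring]


calls = []


def fake_loader(path):
    """Pretend parser that records how often a file is read."""
    calls.append(path)
    return f"forecast from {path}"


repo = CachingRepository({"2020_2021": "f1.csv"}, fake_loader, lazy_load=False)
first = repo.load_forecast("2020_2021")
second = repo.load_forecast("2020_2021")   # served from memory, no second parse
```

Storing parsed objects trades memory for speed, so it is best suited to forecasts that comfortably fit in RAM.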

load_forecast(tstring, name=None, region=None, n_sims=None)[source]

Returns a forecast object or a sequence of them for a set of time window strings.

Parameters:
  • tstring (str, list) – String representing the time-window.

  • name (str) – Name of the forecast model.

  • region (optional) – A region, in case the forecast needs to be filtered lazily.

  • n_sims (optional) – The number of simulations/synthetic catalogs of the forecast.

Returns:

The CSEP CatalogForecast object or a list of them.

Return type:

CatalogForecast | list[CatalogForecast]

remove(tstring)[source]
Parameters:

tstring (str | Sequence[str])

class floatcsep.infrastructure.repositories.GriddedForecastRepository(registry, **kwargs)[source]

Bases: ForecastRepository

This class accesses (or stores in memory) the gridded forecasts of a model. Setting the keyword lazy_load to False keeps the forecasts in memory and avoids parsing files repeatedly (not recommended for large files).

Parameters:
  • registry (ModelRegistry) – The registry containing the keys/path to the forecasts given their time-windows.

  • **kwargs

load_forecast(tstring=None, name='', region=None, forecast_unit=1)[source]

Returns a forecast object or a sequence of them for a set of time window strings.

Parameters:
  • tstring (str, list) – String representing the time-window

  • name (str) – Forecast name

  • region (optional) – A region, in case the forecast needs to be filtered lazily.

  • forecast_unit (float) – The time unit (in decimal years) that the forecast represents

Returns:

The CSEP GriddedForecast object or a sequence of them.

Return type:

GriddedForecast | Sequence[GriddedForecast]

remove(tstring)[source]
Parameters:

tstring (str | Sequence[str])

class floatcsep.infrastructure.repositories.ResultsRepository(registry)[source]

Bases: object

This class accesses, reads, and writes the results of a given evaluation.

Parameters:

registry (ExperimentRegistry) – The registry of an experiment, which keeps track of the filepaths of each result.

load_results(test, window, models)[source]

Reads an Evaluation result for a given time window and returns a list of the results for all tested models.

Parameters:
  • test (Evaluation) – The tests for which the results are to be loaded

  • window (str, list) – The time-windows for which the results are to be loaded

  • models (Model, list) – The models for which the results are to be loaded

Return type:

List | EvaluationResult

write_result(result, test, model, window)[source]

Writes an evaluation result to a JSON file, serialized through its .to_dict() method.

Parameters:
  • result (EvaluationResult) – CSEP evaluation result

  • test – Name of the test

  • model – Name of the model

  • window – Name of the time-window

Return type:

None
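Serializing a result through .to_dict() into JSON can be sketched as below. FakeResult is a hypothetical stand-in for an evaluation result, and save_result and the target path are illustrative names; only the to_dict-then-JSON idea comes from the description of write_result.

```python
import json
import tempfile
from pathlib import Path


class FakeResult:
    """Hypothetical stand-in for an evaluation result with a to_dict() method."""

    def __init__(self, name, observed_statistic):
        self.name = name
        self.observed_statistic = observed_statistic

    def to_dict(self):
        return {"name": self.name,
                "observed_statistic": self.observed_statistic}


def save_result(result, path):
    """Write result.to_dict() as JSON, creating parent directories if needed."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as handle:
        json.dump(result.to_dict(), handle, indent=4)


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "evaluations" / "N-test_etas.json"
    save_result(FakeResult("N-test", 12.0), target)
    restored = json.loads(target.read_text())   # read back as a plain dict
```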

class floatcsep.infrastructure.repositories.CatalogRepository(registry)[source]

Bases: object

The class handles the main and sub-catalogs from the experiment. It is responsible for accessing, downloading, storing the main catalog, as well as filtering and storing the corresponding input-catalogs (e.g., input for a model to be run) and test-catalogs (catalogs for the model’s forecasts to be evaluated against).

Parameters:

registry (ExperimentRegistry) – The registry of the experiment

as_dict()[source]

property catalog: CSEPCatalog

Returns a CSEP catalog loaded from the given query function or a stored file if it exists.

filter_catalog(start_date=None, end_date=None, min_mag=None, max_mag=None, min_depth=None, max_depth=None, region=None)[source]

Wrapper for pyCSEP catalog filters, to constrain a catalog to a given time and magnitude range, as well as to a spatial region.

Returns:

Filtered catalog

Return type:

CSEPCatalog

get_test_cat(tstring=None, fmt='json')[source]

Filters the complete experiment catalog to a test sub-catalog bounded by the test time-window. Writes it to the filepath defined in Experiment.registry.

Parameters:
  • tstring (str) – Time window string

  • fmt (str) – Format of the catalog to be used

Return type:

CSEPCatalog

set_input_cats(tstring, models, fmt='ascii')[source]

Filters the complete experiment catalog to an input sub-catalog containing the events up to the beginning of the test time-window.

Parameters:
  • tstring (str) – Time window string

  • models (List[Model]) – Models to be given the input catalog

  • fmt (str) – Output catalog format

Return type:

None

set_main_catalog(catalog, time_config, region_config)[source]

Sets the catalog to be used for the experiment.

set_test_cats(tstring, fmt='json')[source]

Filters the complete experiment catalog to a test sub-catalog bounded by the test time-window. Writes it to the filepath defined in Experiment.registry.

Parameters:
  • tstring (str) – Time window string

  • fmt (str) – Output catalog format

Return type:

None
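The difference between an input catalog and a test catalog can be sketched with plain Python data. The example below does not use pyCSEP: events are simple (time, magnitude) tuples, the function names are hypothetical, and the choice of a half-open window (start included, end excluded) is an assumption.

```python
from datetime import datetime

# Toy catalog: (origin time, magnitude) tuples.
events = [
    (datetime(2019, 6, 1), 5.1),
    (datetime(2020, 3, 15), 4.8),
    (datetime(2021, 2, 1), 6.0),
]

window_start = datetime(2020, 1, 1)
window_end = datetime(2021, 1, 1)


def select_input_events(catalog, start):
    """Events a model may learn from: everything before the window starts."""
    return [event for event in catalog if event[0] < start]


def select_test_events(catalog, start, end):
    """Events the forecast is evaluated against: those inside the window."""
    return [event for event in catalog if start <= event[0] < end]


input_events = select_input_events(events, window_start)
test_events = select_test_events(events, window_start, window_end)
```

Keeping the two selections disjoint ensures that a model never sees the events it is later tested against.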

Environments

class floatcsep.infrastructure.environments.CondaManager(base_name, model_directory)[source]

Bases: EnvironmentManager

Manages a conda (or mamba) environment, providing methods to create, check and manipulate conda environments specifically.

Initializes the Conda environment manager with the specified base name and model directory. It also generates the environment name and detects the package manager (conda or mamba) to install dependencies.

Parameters:
  • base_name (str) – The base name, i.e., model name, for the conda environment.

  • model_directory (str) – The directory containing the model files.

create_environment(force=False)[source]

Creates a conda environment using either an environment.yml file or the Python version specified in setup.py, setup.cfg, or pyproject.toml. If ‘force’ is True, any existing environment with the same name will be removed first.

Parameters:

force (bool) – Whether to forcefully remove an existing environment.

static detect_package_manager()[source]

Detects whether ‘mamba’ or ‘conda’ is available as the package manager.

Returns:

The name of the detected package manager (‘mamba’ or ‘conda’).

Return type:

str
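One common way to detect an available package manager is to look for its executable on the PATH with shutil.which. The sketch below follows that approach; the preference for mamba over conda, the None return when neither is found, and the injectable which argument (added to ease testing) are assumptions, not documented floatCSEP behaviour.

```python
import shutil


def detect_package_manager(which=shutil.which):
    """Return 'mamba' or 'conda', whichever is found first on the PATH."""
    for candidate in ("mamba", "conda"):
        if which(candidate):
            return candidate
    return None   # assumption: signal that neither tool is installed


print(detect_package_manager())
```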

detect_python_version()[source]

Determines the required Python version from setup files in the model directory. It checks ‘setup.py’, ‘pyproject.toml’, and ‘setup.cfg’ (in that order), for version specifications.

Returns:

The Python version required to build the model.

Return type:

str
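As an illustration of reading a version specification from a setup file, the sketch below extracts the python_requires entry from the [options] section of a setup.cfg, which is the standard setuptools location for it. The function name and the rule of returning the first version number found in the specifier are assumptions; the actual floatCSEP parsing logic may differ.

```python
import configparser
import re


def python_version_from_setup_cfg(text):
    """Return the first version number in [options] python_requires, if any."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    requirement = parser.get("options", "python_requires", fallback=None)
    if requirement is None:
        return None
    # Strip comparison operators such as '>=' and keep the version digits.
    match = re.search(r"\d+(?:\.\d+)*", requirement)
    return match.group(0) if match else None


example_cfg = """
[metadata]
name = my_model

[options]
python_requires = >=3.9
"""

version = python_version_from_setup_cfg(example_cfg)
```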

env_exists()[source]

Checks if the conda environment exists by querying the list of existing conda environments.

Returns:

True if the conda environment exists, False otherwise.

Return type:

bool

install_dependencies()[source]

Installs the necessary dependencies for the environment based on the specified configuration or requirements.

Return type:

None

run_command(command, **kwargs)[source]

Runs a specified command within the conda environment.

Parameters:

command (str) – The command to be executed in the conda environment.

Return type:

None

class floatcsep.infrastructure.environments.VenvManager(base_name, model_directory)[source]

Bases: EnvironmentManager

Manages a virtual environment created using Python’s venv module. Provides methods to create, check, and manipulate virtual environments.

Initializes the virtual environment manager with the specified base name and model directory.

Parameters:
  • base_name (str) – The base name (i.e., model name) for the virtual environment.

  • model_directory (str) – The directory containing the model files.

create_environment(force=False)[source]

Creates a virtual environment in the specified model directory. If ‘force’ is True, any existing virtual environment will be removed before creation.

Parameters:

force (bool) – Whether to forcefully remove an existing virtual environment.

env_exists()[source]

Checks if the virtual environment exists by verifying the presence of its directory.

Returns:

True if the virtual environment exists, False otherwise.

Return type:

bool

install_dependencies()[source]

Installs dependencies in the virtual environment using pip, based on the model directory’s configuration.

Return type:

None

run_command(command, **kwargs)[source]

Executes a specified command in the virtual environment and logs the output.

Parameters:

command (str) – The command to be executed in the virtual environment.

Return type:

None

class floatcsep.infrastructure.environments.DockerManager(base_name, model_directory)[source]

Bases: EnvironmentManager

Manages a Docker environment, providing methods to create, check and manipulate Docker containers for the environment.

Initializes the environment manager with a base name and model directory.

Parameters:
  • base_name (str) – The base name for the environment.

  • model_directory (str) – The directory containing the model files.

create_environment(force=False)[source]

Build (or rebuild) the Docker image for this model.

Parameters:

force (bool)

Return type:

None

env_exists()[source]

Checks if the Docker image with the given tag already exists.

Returns:

True if the Docker image exists, False otherwise.

Return type:

bool

install_dependencies()[source]

Installs dependencies for Docker-based models. This is typically handled by the Dockerfile, so no additional action is needed here.

Return type:

None

static kill_containers(label_key, label_value_prefix=None)[source]
Parameters:
  • label_key (str)

  • label_value_prefix (str)

run_command(command=None, run_label=None, input_volume=None, forecast_volume=None, mem_limit=None, cpus=None)[source]

Runs the model’s Docker container with input/ and forecasts/ mounted. Streams logs and checks for non-zero exit codes.

Return type:

None

class floatcsep.infrastructure.environments.EnvironmentFactory[source]

Bases: object

Factory class for creating instances of environment managers based on the specified type.

static check_environment_type()[source]
Return type:

str | None

static get_env(build=None, model_name='model', model_path=None)[source]

Returns an instance of an environment manager based on the specified build type. It checks the current environment type and can return a conda, venv, or Docker environment manager.

Parameters:
  • build (str) – The desired type of environment (‘conda’, ‘venv’, or ‘docker’).

  • model_name (str) – The name of the model for which the environment is being created.

  • model_path (str) – The path to the model directory.

Returns:

An instance of the appropriate environment manager.

Return type:

EnvironmentManager

Raises:

Exception – If an invalid environment type is specified.
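The factory pattern described for get_env can be sketched with stub classes. Everything below is a stand-in: the Stub* classes only store their arguments, and the default argument replaces the check of the current environment type, whose logic is not documented here.

```python
class StubManager:
    """Stand-in for an environment manager; it only stores its arguments."""

    def __init__(self, base_name, model_directory):
        self.base_name = base_name
        self.model_directory = model_directory


class StubConda(StubManager):
    """Placeholder for a conda-based manager."""


class StubVenv(StubManager):
    """Placeholder for a venv-based manager."""


class StubDocker(StubManager):
    """Placeholder for a Docker-based manager."""


MANAGERS = {"conda": StubConda, "venv": StubVenv, "docker": StubDocker}


def get_env(build=None, model_name="model", model_path=None, default="venv"):
    """Map a build type to a manager instance; reject unknown types."""
    kind = build or default   # assumption: fall back to a fixed default
    if kind not in MANAGERS:
        raise Exception(f"Unknown environment type: {kind!r}")
    return MANAGERS[kind](model_name, model_path)


env = get_env("docker", model_name="etas", model_path="models/etas")
```

A dictionary lookup keeps the mapping from build type to manager class in one place, so supporting a new environment type only needs one new entry.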

Engine

The components here are in charge of managing and executing the floatCSEP workflow.

class floatcsep.infrastructure.engine.Task(instance, method, **kwargs)[source]

Bases: object

Represents a unit of work to be executed later as part of a task graph.

A Task wraps an object instance, a method, and its arguments to allow for deferred execution. This is useful in workflows where tasks need to be executed in a specific order, often dictated by dependencies on other tasks.

For instance, it can wrap a floatcsep.model.Model, its method ‘create_forecast’, and the argument ‘time_window’, to be executed later with Task.run() once, for example, the task dependencies (parent nodes) have been completed.

Parameters:
  • instance (object) – The instance whose method will be executed later.

  • method (str) – The method of the instance that will be called.

  • **kwargs – Arguments to pass to the method when it is invoked.

run()[source]

Executes the task by calling the method on the object instance with the stored arguments. If the instance has a store attribute, it will use that instead of the instance itself. Once executed, the result is stored in the store attribute if any output is produced.

Returns:

The output of the method execution, or None if the method does not return anything.
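Deferred execution of a method can be sketched in a few lines. DeferredCall and ToyModel are hypothetical stand-ins, and the handling of a store attribute is left out: the sketch only shows how an instance, a method name, and keyword arguments are stored first and invoked later.

```python
class DeferredCall:
    """Stand-in for a task: remembers what to call, runs it on demand."""

    def __init__(self, instance, method, **kwargs):
        self.obj = instance
        self.method = method      # method name as a string
        self.kwargs = kwargs

    def run(self):
        # Look the method up by name only when the task is executed.
        return getattr(self.obj, self.method)(**self.kwargs)


class ToyModel:
    """Hypothetical model with a single method to defer."""

    def create_forecast(self, time_window):
        return f"forecast for {time_window}"


task = DeferredCall(ToyModel(), "create_forecast", time_window="2020_2021")
# Nothing has been computed yet; the call happens only here:
output = task.run()
```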

sign_match(obj=None, meth=None, kw_arg=None)[source]

Checks whether the task matches a given function signature.

This method is used to verify if a task belongs to a given object, method, or if it uses a specific keyword argument. Useful for identifying tasks in a graph based on partial matches of their attributes.

Parameters:
  • obj (object | str) – The object instance or its name (str) to match against.

  • meth (str) – The method name to match against.

  • kw_arg (Any) – A specific keyword argument value to match against in the task’s arguments.

Returns:

True if the task matches the provided signature, False otherwise.

Return type:

bool

class floatcsep.infrastructure.engine.TaskGraph[source]

Bases: object

Context manager of floatcsep workload distribution.

A TaskGraph is responsible for adding tasks, managing dependencies between tasks, and executing tasks in the correct order. Tasks in the graph can depend on one another, and the graph ensures that each task is run after all of its dependencies have been satisfied. It holds a dictionary whose keys are the Tasks to be executed and whose values are each Task’s dependencies.

Initializes the TaskGraph with an empty task dictionary and task count.

add(task)[source]

Adds a new task to the task graph.

The task is added to the dictionary of tasks with no dependencies by default.

Parameters:

task (Task) – The task to be added to the graph.

add_dependency(task, dep_inst=None, dep_meth=None, dkw=None)[source]

Adds a dependency to a task already within the graph.

Searches for other tasks within the graph whose signature matches the provided object instance, method name, or keyword argument. Any matches are added as dependencies to the provided task.

Parameters:
  • task (Task) – The task to which dependencies will be added.

  • dep_inst (object | str) – The object instance or name of the dependency.

  • dep_meth (str) – The method name of the dependency.

  • dkw (Any) – A specific keyword argument value of the dependency.

Returns:

None

property ntasks: int

Returns the number of tasks currently in the graph.

Returns:

The total number of tasks in the graph.

Return type:

int

run()[source]

Executes all tasks in the task graph sequentially, in the order set in Experiment.set_tasks().

Iterates over each task in the graph and runs it after its dependencies have been resolved.

Returns:

None
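Running tasks only after their dependencies can be sketched with a dictionary that maps each task to its list of dependencies. MiniGraph below is a simplified stand-in, not the floatCSEP TaskGraph: tasks are plain callables, dependencies are passed directly instead of being matched by signature, and there is no cycle detection.

```python
class MiniGraph:
    """Stand-in task graph: a dict of task -> list of dependency tasks."""

    def __init__(self):
        self.tasks = {}   # insertion order is the default execution order

    def add(self, task):
        self.tasks[task] = []

    def add_dependency(self, task, dependency):
        self.tasks[task].append(dependency)

    def run(self):
        done = set()

        def visit(task):
            if task in done:
                return
            for dependency in self.tasks[task]:
                visit(dependency)   # resolve dependencies first
            task()
            done.add(task)

        for task in self.tasks:
            visit(task)


log = []


def make_step(name):
    """Build a callable that records its own name when executed."""
    def step():
        log.append(name)
    return step


evaluate = make_step("evaluate")
create_forecast = make_step("create_forecast")

graph = MiniGraph()
graph.add(evaluate)            # added first ...
graph.add(create_forecast)
graph.add_dependency(evaluate, create_forecast)   # ... but must run second
graph.run()
```

Each task runs exactly once, even when several other tasks depend on it, because completed tasks are remembered in the done set.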

run_parallel(max_workers)[source]
Parameters:

max_workers (int)