Infrastructure Module
This page documents the modules that manage the relationships between the core classes of floatCSEP
and the workflow required to run an Experiment.
Registries
- class floatcsep.infrastructure.registries.ModelFileRegistry(model_name, workdir, path, args_file=None, input_cat=None, fmt=None)[source]
Bases:
ModelRegistry, FilepathMixin
The class is responsible for handling the keys (in this case filepaths) to access model objects such as forecasts, input catalogs or argument/parameter files. These keys are used mainly by floatcsep.infrastructure.repositories.ForecastRepository or floatcsep.infrastructure.repositories.CatalogRepository.
- Parameters:
model_name (str) – Model’s identifier string
workdir (str) – The current working directory of the experiment.
path (str) – The path of the model working directory (or model filepath).
args_file (str) – The path of the arguments file (only for TimeDependentModel).
input_cat (str) – The path of the input catalog file (only for TimeDependentModel).
fmt (str)
- as_dict()[source]
- Returns:
Simple dictionary serialization of the instance with the core attributes
- Return type:
dict
- build_tree(time_windows=None, model_class='TimeIndependentModel', prefix=None, run_mode=typing.Literal['sequential', 'parallel'], stage_dir='results', run_id='run')[source]
Creates the run directory, and reads the file structure inside.
- Parameters:
model_class (str) – Model’s class name
prefix (str) – prefix of the model forecast filenames if TD
run_mode (str) – If the run mode is ‘sequential’, input data (args and cat) is dynamically overwritten in ‘model/input/’ for each of the time_windows. If ‘parallel’, input data is dynamically written anew in ‘results/{time_window}/input/{model_name}/’.
stage_dir (str) – Whether input data is stored persistently in the run_dir or just in tmp cache (Only for parallel execution).
run_id (str) – Job ID of the run for parallel execution and tmp storing of input data
- Return type:
None
- get_args_key(*args)[source]
Gets the filepath of an arguments file for a given sequence of keys (usually a timewindow string).
- get_args_template_path()[source]
Path to the model’s canonical args template: <model.path>/input/<args_file>. Exists regardless of staging mode. This file should come with the source model
- Return type:
- get_forecast_key(*args)[source]
Gets the filepath of a forecast for a given sequence of keys (usually a timewindow string).
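The key-to-filepath lookup described above can be illustrated with a minimal sketch. SketchFileRegistry, its attributes and the forecasts/<prefix>_<window>.<fmt> layout are hypothetical stand-ins chosen for illustration; they are not the floatCSEP implementation.

```python
from pathlib import Path


class SketchFileRegistry:
    """Hypothetical stand-in (not the floatCSEP class): maps keys to filepaths."""

    def __init__(self, workdir, path):
        self.workdir = Path(workdir)
        self.path = Path(path)
        self.forecasts = {}  # time-window string -> path relative to workdir

    def build_tree(self, time_windows, prefix, fmt="csv"):
        # Assumed layout: <path>/forecasts/<prefix>_<window>.<fmt>
        for window in time_windows:
            filename = f"{prefix}_{window}.{fmt}"
            self.forecasts[window] = self.path / "forecasts" / filename

    def get_forecast_key(self, *args):
        # Walk the (possibly nested) mapping with the given sequence of keys.
        node = self.forecasts
        for key in args:
            node = node[key]
        return self.workdir / node


registry = SketchFileRegistry(workdir="experiment", path="models/etas")
registry.build_tree(["2020-01-01_2020-01-02"], prefix="etas")
forecast_path = registry.get_forecast_key("2020-01-01_2020-01-02")
```

In this sketch the time-window string is the only key, so a repository would only need that string to locate the corresponding forecast file.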
- class floatcsep.infrastructure.registries.ExperimentFileRegistry(workdir, run_dir='results')[source]
Bases:
ExperimentRegistry, FilepathMixin
The class manages the keys (based on model, time-window and evaluation name strings) to the structure of the experiment inputs (catalogs, models, etc.) and the results of the evaluations. It keeps track of the forecast registries, as well as the existence of results and their paths in the filesystem.
- Parameters:
- add_model_registry(model)[source]
Adds a model’s ForecastRegistry to the ExperimentFileRegistry.
- Parameters:
model (str) – A Model object
- Return type:
None
- build_tree(time_windows, models, tests, run_mode='sequential')[source]
Creates the run directory and reads the file structure inside.
- get_model_registry(model_name)[source]
Retrieves a model’s ForecastRegistry from the ExperimentFileRegistry.
- Parameters:
model_name (str) – The name of the model.
- Returns:
The ModelRegistry associated with the model.
- Return type:
ModelRegistry
Repositories
- class floatcsep.infrastructure.repositories.CatalogForecastRepository(registry, **kwargs)[source]
Bases:
ForecastRepository
The class is responsible for accessing (or storing in memory) the catalog-based forecasts of a model. The flag lazy_load can be set to False so that the catalogs are kept in memory, which reduces the time spent parsing files.
- Parameters:
registry (ModelRegistry) – The registry containing the keys/path to the forecasts given their time-windows.
**kwargs
- class floatcsep.infrastructure.repositories.GriddedForecastRepository(registry, **kwargs)[source]
Bases:
ForecastRepository
The class is responsible for accessing (or storing in memory) the gridded forecasts of a model. The keyword lazy_load can be set to False so that the forecasts are kept in memory and files are not parsed repeatedly (avoid this for large files).
- Parameters:
registry (ModelRegistry) – The registry containing the keys/path to the forecasts given their time-windows.
**kwargs
- class floatcsep.infrastructure.repositories.ResultsRepository(registry)[source]
Bases:
object
The class is responsible for accessing, reading and writing the results of a given evaluation.
- Parameters:
registry (ExperimentRegistry) – The registry of an experiment, which keeps track of the filepaths of each result.
- load_results(test, window, models)[source]
Reads an Evaluation result for a given time window and returns a list of the results for all tested models.
- Parameters:
test (Evaluation) – The tests for which the results are to be loaded
window (str, list) – The time-windows for which the results are to be loaded
models (Model, list) – The models for which the results are to be loaded
- Return type:
- write_result(result, test, model, window)[source]
Writes the evaluation result as a JSON file, serialized through its .to_dict() method.
- Parameters:
result (EvaluationResult) – CSEP evaluation result
test – Name of the test
model – Name of the model
window – Name of the time-window
- Return type:
None
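A minimal sketch of the serialization step described for write_result(): an object exposing to_dict() is dumped to a JSON file and read back. SketchResult, the free functions and the file name are hypothetical and only illustrate the pattern; the actual repository resolves its paths through the experiment registry.

```python
import json
import tempfile
from pathlib import Path


class SketchResult:
    """Hypothetical result object exposing to_dict(), as the text describes."""

    def __init__(self, name, observed_statistic):
        self.name = name
        self.observed_statistic = observed_statistic

    def to_dict(self):
        return {"name": self.name, "observed_statistic": self.observed_statistic}


def write_result(result, path):
    # Create the parent folders, then dump the dictionary form as JSON.
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as file_:
        json.dump(result.to_dict(), file_, indent=2)


def load_result(path):
    # Read the JSON file back into a plain dictionary.
    with open(path) as file_:
        return json.load(file_)


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "evaluations" / "n-test_model-a.json"
    write_result(SketchResult("n-test", 12.0), target)
    loaded = load_result(target)
```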
- class floatcsep.infrastructure.repositories.CatalogRepository(registry)[source]
Bases:
object
The class handles the main catalog and the sub-catalogs of the experiment. It is responsible for accessing, downloading and storing the main catalog, as well as filtering and storing the corresponding input-catalogs (e.g., input for a model to be run) and test-catalogs (catalogs for the model’s forecasts to be evaluated against).
- Parameters:
registry (ExperimentRegistry) – The registry of the experiment
- property catalog: CSEPCatalog
Returns a CSEP catalog loaded from the given query function or a stored file if it exists.
- filter_catalog(start_date=None, end_date=None, min_mag=None, max_mag=None, min_depth=None, max_depth=None, region=None)[source]
Wrapper for pyCSEP catalog filters that constrains a catalog to a given time, magnitude and depth range, as well as to a spatial region.
- Parameters:
start_date (datetime.datetime) – Initial datetime
end_date (datetime.datetime) – Final datetime
min_mag (float) – Minimum magnitude to filter
max_mag (float) – Maximum magnitude to filter
min_depth (float) – Minimum depth to filter (positive downwards from surface)
max_depth (float) – Maximum depth to filter (positive downwards from surface)
region (csep.core.regions.CartesianGrid2D) – Spatial domain to filter
- Returns:
Filtered catalog
- Return type:
- get_test_cat(tstring=None, fmt='json')[source]
Filters the complete experiment catalog to a test sub-catalog bounded by the test time-window. Writes it to the filepath defined in Experiment.registry.
- Parameters:
- Return type:
- set_input_cats(tstring, models, fmt='ascii')[source]
Filters the complete experiment catalog to an input sub-catalog containing the events up to the beginning of the test time-window.
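The split between input and test catalogs can be sketched with plain Python. This is an illustration under stated assumptions: events are represented as dictionaries rather than as a pyCSEP catalog, filter_events is a hypothetical helper, and the boundary handling (start inclusive, end exclusive) is a choice of this sketch, not a documented behavior.

```python
from datetime import datetime


def filter_events(events, start_date=None, end_date=None, min_mag=None, max_mag=None):
    """Keep events inside the given bounds; a bound of None is ignored."""
    kept = []
    for event in events:
        if start_date is not None and event["time"] < start_date:
            continue
        if end_date is not None and event["time"] >= end_date:
            continue
        if min_mag is not None and event["mag"] < min_mag:
            continue
        if max_mag is not None and event["mag"] > max_mag:
            continue
        kept.append(event)
    return kept


events = [
    {"time": datetime(2019, 12, 30), "mag": 4.1},
    {"time": datetime(2020, 1, 1, 12), "mag": 5.2},
    {"time": datetime(2020, 1, 1, 18), "mag": 3.0},
    {"time": datetime(2020, 1, 3), "mag": 4.8},
]
window_start, window_end = datetime(2020, 1, 1), datetime(2020, 1, 2)

# Input catalog: everything before the test window begins.
input_cat = filter_events(events, end_date=window_start)
# Test catalog: events inside the window and above a magnitude threshold.
test_cat = filter_events(
    events, start_date=window_start, end_date=window_end, min_mag=4.0
)
```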
Environments
- class floatcsep.infrastructure.environments.CondaManager(base_name, model_directory)[source]
Bases:
EnvironmentManager
Manages a conda (or mamba) environment, providing methods to create, check and manipulate conda environments specifically.
Initializes the Conda environment manager with the specified base name and model directory. It also generates the environment name and detects the package manager (conda or mamba) to install dependencies.
- Parameters:
- create_environment(force=False)[source]
Creates a conda environment using either an environment.yml file or the Python version specified in setup.py, setup.cfg or pyproject.toml. If ‘force’ is True, any existing environment with the same name is removed first.
- Parameters:
force (bool) – Whether to forcefully remove an existing environment.
- static detect_package_manager()[source]
Detects whether ‘mamba’ or ‘conda’ is available as the package manager.
- Returns:
The name of the detected package manager (‘mamba’ or ‘conda’).
- Return type:
str
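One way such a detection could work is to look the executables up on the PATH with the standard library's shutil.which. The sketch below is not the floatCSEP implementation: preferring mamba over conda, returning None when neither is found, and the injectable which argument (used here only to make the example testable) are all assumptions.

```python
import shutil


def detect_package_manager(which=shutil.which):
    """Return 'mamba' or 'conda' if found on PATH, else None.

    Preferring mamba over conda is an assumption of this sketch.
    """
    for candidate in ("mamba", "conda"):
        if which(candidate) is not None:
            return candidate
    return None


def fake_which(name):
    # Stand-in for shutil.which on a machine that only has conda installed.
    return "/opt/conda/bin/conda" if name == "conda" else None


only_conda = detect_package_manager(which=fake_which)
```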
- detect_python_version()[source]
Determines the required Python version from setup files in the model directory. It checks ‘setup.py’, ‘pyproject.toml’, and ‘setup.cfg’ (in that order), for version specifications.
- Returns:
The build python version.
- Return type:
version (str)
- env_exists()[source]
Checks if the conda environment exists by querying the list of existing conda environments.
- Returns:
True if the conda environment exists, False otherwise.
- Return type:
bool
- class floatcsep.infrastructure.environments.VenvManager(base_name, model_directory)[source]
Bases:
EnvironmentManager
Manages a virtual environment created using Python’s venv module. Provides methods to create, check, and manipulate virtual environments.
Initializes the virtual environment manager with the specified base name and model directory.
- Parameters:
- create_environment(force=False)[source]
Creates a virtual environment in the specified model directory. If ‘force’ is True, any existing virtual environment will be removed before creation.
- Parameters:
force (bool) – Whether to forcefully remove an existing virtual environment.
- env_exists()[source]
Checks if the virtual environment exists by verifying the presence of its directory.
- Returns:
True if the virtual environment exists, False otherwise.
- Return type:
bool
- class floatcsep.infrastructure.environments.DockerManager(base_name, model_directory)[source]
Bases:
EnvironmentManager
Manages a Docker environment, providing methods to create, check and manipulate Docker containers for the environment.
Initializes the environment manager with a base name and model directory.
- Parameters:
- create_environment(force=False)[source]
Build (or rebuild) the Docker image for this model.
- Parameters:
force (bool)
- Return type:
None
- env_exists()[source]
Checks if the Docker image with the given tag already exists.
- Returns:
True if the Docker image exists, False otherwise.
- Return type:
- install_dependencies()[source]
Installs dependencies for Docker-based models. This is typically handled by the Dockerfile, so no additional action is needed here.
- Return type:
None
- class floatcsep.infrastructure.environments.EnvironmentFactory[source]
Bases:
object
Factory class for creating instances of environment managers based on the specified type.
- static get_env(build=None, model_name='model', model_path=None)[source]
Returns an instance of an environment manager based on the specified build type. It checks the current environment type and can return a conda, venv, or Docker environment manager.
- Parameters:
- Returns:
An instance of the appropriate environment manager.
- Return type:
EnvironmentManager
- Raises:
Exception – If an invalid environment type is specified.
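The factory pattern described above can be sketched as a lookup from a build-type string to a manager class. All Sketch* classes, the build strings and the ‘venv’ fallback are hypothetical; in particular, the documented get_env() inspects the current environment type when no build is given, which this sketch does not attempt.

```python
class SketchManager:
    """Hypothetical base class standing in for an environment manager."""

    def __init__(self, base_name, model_directory):
        self.base_name = base_name
        self.model_directory = model_directory


class SketchCondaManager(SketchManager):
    pass


class SketchVenvManager(SketchManager):
    pass


class SketchDockerManager(SketchManager):
    pass


class SketchEnvironmentFactory:
    # Build-type strings mapped to manager classes; the strings are assumptions.
    _managers = {
        "conda": SketchCondaManager,
        "venv": SketchVenvManager,
        "docker": SketchDockerManager,
    }

    @staticmethod
    def get_env(build=None, model_name="model", model_path=None):
        # Assumption: fall back to 'venv' when no build type is given.
        build = build or "venv"
        try:
            manager_class = SketchEnvironmentFactory._managers[build]
        except KeyError:
            raise Exception(f"Invalid environment type: {build!r}") from None
        return manager_class(base_name=model_name, model_directory=model_path)


env = SketchEnvironmentFactory.get_env(
    build="conda", model_name="etas", model_path="models/etas"
)
```

Keeping the mapping in one dictionary means a new manager type only needs a new entry, while unknown types fail early with an exception.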
Engine
The components here are in charge of managing and executing the floatCSEP workflow.
- class floatcsep.infrastructure.engine.Task(instance, method, **kwargs)[source]
Bases:
object
Represents a unit of work to be executed later as part of a task graph.
A Task wraps an object instance, a method, and its arguments to allow for deferred execution. This is useful in workflows where tasks need to be executed in a specific order, often dictated by dependencies on other tasks.
For instance, a Task can wrap a floatcsep.model.Model, its method ‘create_forecast’ and the argument ‘time_window’, and be executed later with Task.run() once, for example, its task dependencies (parent nodes) have completed.
- Parameters:
- run()[source]
Executes the task by calling the method on the object instance with the stored arguments. If the instance has a store attribute, it will use that instead of the instance itself. Once executed, the result is stored in the store attribute if any output is produced.
- Returns:
The output of the method execution, or None if the method does not return anything.
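Deferred execution of this kind can be sketched in a few lines. SketchTask and SketchModel are hypothetical, the method is assumed to be passed by name as a string, and the store-attribute behavior described above is left out for brevity.

```python
class SketchTask:
    """Hypothetical deferred call: stores an object, a method name and kwargs."""

    def __init__(self, instance, method, **kwargs):
        self.obj = instance
        self.method = method  # assumed to be the method name as a string
        self.kwargs = kwargs

    def run(self):
        # Resolve the method only now and call it with the stored arguments.
        return getattr(self.obj, self.method)(**self.kwargs)


class SketchModel:
    """Hypothetical model with a forecast-producing method."""

    def __init__(self, name):
        self.name = name

    def create_forecast(self, time_window):
        return f"{self.name}:{time_window}"


# Nothing is executed when the task is created...
task = SketchTask(
    SketchModel("etas"), "create_forecast", time_window="2020-01-01_2020-01-02"
)
# ...only when run() is called.
output = task.run()
```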
- sign_match(obj=None, meth=None, kw_arg=None)[source]
Checks whether the task matches a given function signature.
This method is used to verify if a task belongs to a given object, method, or if it uses a specific keyword argument. Useful for identifying tasks in a graph based on partial matches of their attributes.
- Parameters:
- Returns:
True if the task matches the provided signature, False otherwise.
- Return type:
bool
- class floatcsep.infrastructure.engine.TaskGraph[source]
Bases:
object
Context manager of floatcsep workload distribution.
A TaskGraph is responsible for adding tasks, managing dependencies between tasks, and executing tasks in the correct order. Tasks in the graph can depend on one another, and the graph ensures that each task runs only after all of its dependencies have been satisfied. It holds a dictionary whose keys are the Tasks to be executed and whose values are each Task’s dependencies.
Initializes the TaskGraph with an empty task dictionary and task count.
- add(task)[source]
Adds a new task to the task graph.
The task is added to the dictionary of tasks with no dependencies by default.
- Parameters:
task (Task) – The task to be added to the graph.
- add_dependency(task, dep_inst=None, dep_meth=None, dkw=None)[source]
Adds a dependency to a task already within the graph.
Searches for other tasks within the graph whose signature matches the provided object instance, method name, or keyword argument. Any matches are added as dependencies to the provided task.
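The dictionary-of-dependencies idea, together with signature matching, can be sketched as follows. Everything here is hypothetical: the Sketch* classes, the matching rule (all provided criteria must match), the run() method name and the depth-first execution order are assumptions of this illustration, and no cycle detection is attempted.

```python
class SketchTask:
    def __init__(self, instance, method, **kwargs):
        self.obj = instance
        self.method = method  # method name as a string (assumption)
        self.kwargs = kwargs

    def sign_match(self, obj=None, meth=None, kw_arg=None):
        # Assumed rule: every criterion that is provided must match.
        checks = []
        if obj is not None:
            checks.append(self.obj is obj)
        if meth is not None:
            checks.append(self.method == meth)
        if kw_arg is not None:
            checks.append(kw_arg in self.kwargs.values())
        return bool(checks) and all(checks)

    def run(self):
        return getattr(self.obj, self.method)(**self.kwargs)


class SketchTaskGraph:
    def __init__(self):
        self.tasks = {}  # task -> list of tasks it depends on

    @property
    def ntasks(self):
        return len(self.tasks)

    def add(self, task):
        self.tasks[task] = []  # no dependencies by default

    def add_dependency(self, task, dep_inst=None, dep_meth=None, dkw=None):
        for other in self.tasks:
            if other is not task and other.sign_match(dep_inst, dep_meth, dkw):
                self.tasks[task].append(other)

    def run(self):
        done = []

        def visit(task):
            if task in done:
                return
            for dependency in self.tasks[task]:
                visit(dependency)  # run parents first
            task.run()
            done.append(task)

        for task in self.tasks:
            visit(task)


log = []


class Worker:
    def work(self, label):
        log.append(label)


worker = Worker()
graph = SketchTaskGraph()
evaluate = SketchTask(worker, "work", label="evaluate")
forecast = SketchTask(worker, "work", label="forecast")
graph.add(evaluate)  # added first, but must run second
graph.add(forecast)
graph.add_dependency(evaluate, dep_inst=worker, dep_meth="work", dkw="forecast")
graph.run()
```

Although the evaluation task is added first, it runs only after the forecast task it depends on has completed.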
- property ntasks: int
Returns the number of tasks currently in the graph.
- Returns:
The total number of tasks in the graph.
- Return type:
int