import datetime
import filecmp
import hashlib
import logging
import os
import shutil
from os.path import join, abspath, relpath, dirname, isfile, split, exists
from pathlib import Path
from typing import Union, List, Dict, Sequence
import numpy
import yaml
import scipy
from floatcsep.evaluation import Evaluation
from floatcsep.infrastructure.logger import add_fhandler
from floatcsep.model import Model, TimeDependentModel
from floatcsep.infrastructure.registries import ExperimentRegistry
from floatcsep.infrastructure.repositories import ResultsRepository, CatalogRepository
from floatcsep.utils.helpers import (
NoAliasLoader,
read_time_cfg,
read_region_cfg,
timewindow2str,
parse_nested_dicts,
)
from floatcsep.infrastructure.engine import Task, TaskGraph
from floatcsep.infrastructure.logger import log_models_tree, log_results_tree
log = logging.getLogger("floatLogger")
[docs]
class Experiment:
"""
Main class that handles an Experiment's context. Contains all the specifications,
instructions and methods to carry out an experiment.
Args:
name (str): Experiment name
time_config (dict): Contains all the temporal specifications.
It can contain the following keys:
- start_date (:class:`datetime.datetime`):
Experiment start date
- end_date (:class:`datetime.datetime`):
Experiment end date
- exp_class (:class:`str`):
`ti` (Time-Independent) or `td` (Time-Dependent)
- intervals (:class:`int`): Number of testing intervals/windows
- horizon (:class:`str`, :py:class:`float`): Time length of the
forecasts (e.g. 1, 10, `1 year`, `2 days`). `ti` defaults to
years, `td` to days.
- growth (:class:`str`): `incremental` or `cumulative`
- offset (:class:`float`): recurrence of forecast creation.
For further details, see :func:`~floatcsep.utils.time_windows_ti`
and :func:`~floatcsep.utils.time_windows_td`
region_config (dict): Contains all the spatial and magnitude
specifications. It must contain the following keys:
- region (:py:class:`str`,
:class:`csep.core.regions.CartesianGrid2D`): The geographical
region, which can be specified as:
(i) the name of a :mod:`csep`/:mod:`floatcsep` default region
function (e.g. :func:`~csep.core.regions.california_relm_region`)
(ii) the name of a user function or
(iii) the path to a lat/lon file
- mag_min (:class:`float`): Minimum magnitude of the experiment
- mag_max (:class:`float`): Maximum magnitude of the experiment
- mag_bin (:class:`float`): Magnitud bin size
- magnitudes (:class:`list`, :class:`numpy.ndarray`): Explicit
magnitude bins
- depth_min (:class:`float`): Minimum depth. Defaults to -2
- depth_max (:class:`float`): Maximum depth. Defaults to 6000
model_config (str): Path to the models' configuration file
test_config (str): Path to the evaluations' configuration file
run_mode (str): 'sequential' or 'parallel'
default_test_kwargs (dict): Default values for the testing
(seed, number of simulations, etc.)
postprocess (dict): Contains the instruction for postprocessing
(e.g. plot forecasts, catalogs)
**kwargs: see Note
Note:
Instead of using `time_config` and `region_config`, an Experiment can
be instantiated using these dicts as keywords. (e.g. ``Experiment(
**time_config, **region_config)``, ``Experiment(start_date=start_date,
intervals=1, region='csep-italy', ...)``
"""
def __init__(
self,
name: str = None,
doi: str = None,
authors: str = None,
journal: str = None,
manuscript_doi: str = None,
catalog_doi: str = None,
last_run: str = None,
floatcsep_version: str = None,
pycsep_version: str = None,
exp_time: str = None,
LICENSE: str = None,
time_config: dict = None,
region_config: dict = None,
catalog: str = None,
models: str = None,
tests: str = None,
postprocess: str = None,
default_test_kwargs: dict = None,
run_dir: str = "results",
run_mode: str = "sequential",
stage_dir: ... = "results",
report_hook: dict = None,
**kwargs,
) -> None:
# todo
# - instantiate from full experiment register (ignoring test/models
# config), or self-awareness functions?
# - instantiate as python objects (models/tests config)
# - check if model region matches experiment region for nonQuadTree?
# Or filter region?
# Instantiate
workdir = Path(kwargs.get("path", os.getcwd())).resolve()
if kwargs.get("timestamp", False):
run_dir = Path(run_dir, f"run_{datetime.datetime.utcnow().date().isoformat()}")
os.makedirs(Path(workdir, run_dir), exist_ok=True)
self.name = name if name else "floatingExp"
self.authors = authors if authors else None
self.doi = doi if doi else None
self.journal = journal if journal else None
self.manuscript_doi = manuscript_doi if manuscript_doi else None
self.catalog_doi = catalog_doi if catalog_doi else None
self.last_run = last_run if last_run else None
self.floatcsep_version = floatcsep_version if floatcsep_version else None
self.pycsep_version = pycsep_version if pycsep_version else None
self.exp_time = exp_time if exp_time else None
self.LICENSE = LICENSE if LICENSE else None
self.registry = ExperimentRegistry.factory(workdir=workdir, run_dir=run_dir)
self.results_repo = ResultsRepository(self.registry)
self.catalog_repo = CatalogRepository(self.registry)
self.run_id = "run"
self.config_file = kwargs.get("config_file", None)
self.original_config = kwargs.get("original_config", None)
self.original_run_dir = kwargs.get("original_run_dir", None)
self.run_dir = run_dir
self.run_mode = run_mode
self.stage_dir = stage_dir
self.seed = kwargs.get("seed", None)
self.time_config = read_time_cfg(time_config, **kwargs)
self.region_config = read_region_cfg(region_config, **kwargs)
self.model_config = models if isinstance(models, str) else None
self.test_config = tests if isinstance(tests, str) else None
logger = kwargs.get("logging", False)
if logger:
filename = "experiment.log" if logger is True else logger
self.registry.logger = os.path.join(workdir, run_dir, filename)
log.info(f"Logging at {self.registry.logger}")
add_fhandler(self.registry.logger)
log.debug("-------- BEGIN OF RUN --------")
log.info(f"Setting up experiment {self.name}:")
log.info(f"\tStart: {self.start_date}")
log.info(f"\tEnd: {self.end_date}")
log.info(f"\tTime windows: {len(self.time_windows)}")
log.info(f"\tRegion: {self.region.name if self.region else None}")
log.info(
f"\tMagnitude range: [{numpy.min(self.magnitudes)},"
f" {numpy.max(self.magnitudes)}]"
)
exp_class_str = (
"Time-Dependent"
if self.exp_class in ("td", "time-dependent")
else "Time-Independent"
)
log.info(f"\tExperiment class: {exp_class_str}")
self.catalog = None
self.models = []
self.tests = []
self.postprocess = postprocess if postprocess else {}
self.default_test_kwargs = default_test_kwargs
self.catalog_repo.set_main_catalog(catalog, self.time_config, self.region_config)
self.models = self.set_models(
models or kwargs.get("model_config"), kwargs.get("order", None)
)
self.tests = self.set_tests(tests or kwargs.get("test_config"))
self.tasks = []
self.task_graph = None
self.report_hook = report_hook if report_hook else {}
self.force_rerun = kwargs.get("force_rerun", False)
def __getattr__(self, item: str) -> object:
"""
Override built-in method to return the experiment attributes by also using the command
``experiment.{attr}``. Adds also to the experiment scope the keys of
:attr:`region_config` or :attr:`time_config`. These are: ``start_date``, ``end_date``,
``time_windows``, ``horizon``, ``offset``, ``region``, ``magnitudes``, ``mag_min``,
`mag_max``, ``mag_bin``, ``depth_min`` depth_max .
"""
try:
return self.__dict__[item]
except KeyError:
try:
return self.time_config[item]
except KeyError:
try:
return self.region_config[item]
except KeyError:
raise AttributeError(
f"Experiment '{self.name}'" f" has no attribute '{item}'"
) from None
def __dir__(self):
"""Adds the time and region configs keys the to instance scope."""
_dir = (
list(super().__dir__()) + list(self.time_config.keys()) + list(self.region_config)
)
return sorted(_dir)
[docs]
def set_models(self, model_config: Union[Dict, str, List], order: List = None) -> List:
"""
Parse the models' configuration file/dict. Instantiates all the models as
:class:`floatcsep.model.Model` and store them into :attr:`Experiment.models`.
Args:
model_config (dict, list, str): configuration file or dictionary
containing the model initialization attributes, as defined in
:meth:`~floatcsep.model.Model`
order (list): desired order of models
"""
models = []
if isinstance(model_config, str):
modelcfg_path = self.registry.abs(model_config)
_dir = self.registry.abs_dir(model_config)
with open(modelcfg_path, "r") as file_:
config_dict = yaml.load(file_, NoAliasLoader)
elif isinstance(model_config, (dict, list)):
config_dict = model_config
_dir = self.registry.workdir
elif model_config is None:
return models
else:
raise NotImplementedError(
f"Load for model type" f" {model_config.__class__}" f"not implemented "
)
for element in config_dict:
# Check if the model is unique or has multiple submodels
if not any("flavours" in i for i in element.values()):
name_ = next(iter(element))
path_ = self.registry.rel(_dir, element[name_]["path"])
model_i = {
name_: {
**element[name_],
"model_path": path_,
"workdir": self.registry.workdir,
}
}
model_i[name_].pop("path")
models.append(Model.factory(model_i))
else:
model_flavours = list(element.values())[0]["flavours"].items()
for flav, flav_path in model_flavours:
name_super = next(iter(element))
path_super = element[name_super].get("path", "")
path_sub = self.registry.rel(_dir, path_super, flav_path)
# updates name of submodel
name_flav = f"{name_super}@{flav}"
model_ = {
name_flav: {
**element[name_super],
"model_path": path_sub,
"workdir": self.registry.workdir,
}
}
model_[name_flav].pop("path")
model_[name_flav].pop("flavours")
models.append(Model.factory(model_))
# Checks if there is any repeated model.
names_ = [i.name for i in models]
if len(names_) != len(set(names_)):
reps = set([i for i in names_ if (sum([j == i for j in names_]) > 1)])
one = not bool(len(reps) - 1)
log.warning(
f'Warning: Model{"s" * (not one)} {reps}'
f' {"is" * one + "are" * (not one)} repeated'
)
log.info(f"\tModels: {[i.name for i in models]}")
if order:
models = [models[i] for i in order]
return models
[docs]
def get_model(self, name: str) -> Model:
"""Returns a Model by its name string."""
for model in self.models:
if model.name == name:
return model
raise Exception(f"No existing model with name {name}")
[docs]
def get_test(self, name: str) -> Evaluation:
"""Returns an Evaluation by its name string."""
for test in self.tests:
if test.name == name:
return test
raise Exception(f"No existing evaluation with name {name}")
[docs]
def stage_models(self) -> None:
"""
Stages all the experiment's models. See :meth:`floatcsep.model.Model.stage`
"""
log.info("Staging models")
for i in self.models:
i.stage(
self.time_windows,
run_mode=self.run_mode,
stage_dir=self.stage_dir,
run_id=self.run_id,
)
self.registry.add_model_registry(i)
[docs]
def set_tests(self, test_config: Union[str, Dict, List]) -> list:
"""
Parse the tests' configuration file/dict. Instantiate them as
:class:`floatcsep.evaluation.Evaluation` and store them into
:attr:`Experiment.tests`.
Args:
test_config (dict, list, str): configuration file or dictionary
containing the evaluation initialization attributes, as defined in
:meth:`~floatcsep.evaluation.Evaluation`
"""
tests = []
if isinstance(test_config, str):
with open(self.registry.abs(test_config), "r") as config:
config_dict = yaml.load(config, NoAliasLoader)
for eval_dict in config_dict:
eval_i = Evaluation.from_dict(eval_dict)
eval_i.results_repo = self.results_repo
eval_i.catalog_repo = self.catalog_repo
tests.append(eval_i)
elif isinstance(test_config, (dict, list)):
for eval_dict in test_config:
eval_i = Evaluation.from_dict(eval_dict)
eval_i.results_repo = self.results_repo
eval_i.catalog_repo = self.catalog_repo
tests.append(eval_i)
log.info(f"\tEvaluations: {[i.name for i in tests]}")
return tests
[docs]
def set_tree(self):
self.registry.build_tree(self.time_windows, self.models, self.tests, self.run_mode)
[docs]
def set_tasks(self) -> None:
"""
Lazy definition of the experiment core tasks by wrapping instances,
methods and arguments. Creates a graph with task nodes, while assigning
task-parents to each node, depending on each Evaluation signature.
The tasks can then be run in sequential as a list or asynchronous
using the graph's node dependencies.
For instance:
* A forecast can only be made if catalog was filtered to its window
* A consistency test can be run if the forecast exists in a window
* A comparison test requires the forecast and ref forecast
* A sequential test requires the forecasts exist for all windows
* A batch test requires all forecast exist for a given window.
"""
self.set_tree()
log.debug("Pre-run forecast summary")
log_models_tree(log, self.registry, self.time_windows)
log.debug("Pre-run result summary")
log_results_tree(log, self.registry)
log.info("Setting up experiment's tasks")
# Get the time windows strings
tw_strings = timewindow2str(self.time_windows)
# Prepare the testing catalogs
task_graph = TaskGraph()
for time_i in tw_strings:
task_i = Task(instance=self.catalog_repo, method="set_test_cats", tstring=time_i)
task_graph.add(task_i)
if self.exp_class in ["td", "time_dependent"]:
task_j = Task(
instance=self.catalog_repo,
method="set_input_cats",
tstring=time_i,
models=self.models,
)
task_graph.add(task=task_j)
# Set up the Forecasts creation
for time_i in tw_strings:
for model_j in self.models:
task_ij = Task(
instance=model_j,
method="create_forecast",
tstring=time_i,
force=self.force_rerun,
)
task_graph.add(task=task_ij)
# A catalog needs to have been filtered
if isinstance(model_j, TimeDependentModel):
task_graph.add_dependency(
task_ij,
dep_inst=self.catalog_repo,
dep_meth="set_input_cats",
dkw=(time_i, model_j),
)
task_graph.add_dependency(
task_ij, dep_inst=self.catalog_repo, dep_meth="set_test_cats", dkw=time_i
)
# Set up the Consistency Tests
for test_k in self.tests:
if test_k.type == "consistency":
for time_i in tw_strings:
for model_j in self.models:
task_ijk = Task(
instance=test_k,
method="compute",
timewindow=time_i,
model=model_j,
region=self.region,
)
task_graph.add(task_ijk)
# the forecast needs to have been created
task_graph.add_dependency(
task_ijk, dep_inst=model_j, dep_meth="create_forecast", dkw=time_i
)
# Set up the Comparative Tests
elif test_k.type == "comparative":
for time_i in tw_strings:
for model_j in self.models:
task_ik = Task(
instance=test_k,
method="compute",
timewindow=time_i,
model=model_j,
ref_model=self.get_model(test_k.ref_model),
region=self.region,
)
task_graph.add(task_ik)
task_graph.add_dependency(
task_ik, dep_inst=model_j, dep_meth="create_forecast", dkw=time_i
)
task_graph.add_dependency(
task_ik,
dep_inst=self.get_model(test_k.ref_model),
dep_meth="create_forecast",
dkw=time_i,
)
# Set up the Sequential Scores
elif test_k.type == "sequential":
for model_j in self.models:
task_k = Task(
instance=test_k,
method="compute",
timewindow=tw_strings,
model=model_j,
region=self.region,
)
task_graph.add(task_k)
for tw_i in tw_strings:
task_graph.add_dependency(
task_k, dep_inst=model_j, dep_meth="create_forecast", dkw=tw_i
)
# Set up the Sequential_Comparative Scores
elif test_k.type == "sequential_comparative":
tw_strs = timewindow2str(self.time_windows)
for model_j in self.models:
task_k = Task(
instance=test_k,
method="compute",
timewindow=tw_strs,
model=model_j,
ref_model=self.get_model(test_k.ref_model),
region=self.region,
)
task_graph.add(task_k)
for tw_i in tw_strings:
task_graph.add_dependency(
task_k, dep_inst=model_j, dep_meth="create_forecast", dkw=tw_i
)
task_graph.add_dependency(
task_k,
dep_inst=self.get_model(test_k.ref_model),
dep_meth="create_forecast",
dkw=tw_i,
)
# Set up the Batch comparative Scores
elif test_k.type == "batch":
time_str = timewindow2str(self.time_windows[-1])
for model_j in self.models:
task_k = Task(
instance=test_k,
method="compute",
timewindow=time_str,
ref_model=self.models,
model=model_j,
region=self.region,
)
task_graph.add(task_k)
for m_j in self.models:
task_graph.add_dependency(
task_k, dep_inst=m_j, dep_meth="create_forecast", dkw=time_str
)
self.task_graph = task_graph
[docs]
def run(self) -> None:
"""
Run the task tree.
todo:
- Cleanup forecast (perhaps add a clean task in self.prepare_tasks,
after all test had been run for a given forecast)
- Memory monitor?
- Queuer?
"""
if self.seed:
numpy.random.seed(self.seed)
if self.run_mode == "parallel":
cpu_count = os.cpu_count() or 4
workers = getattr(self, "concurrent_tasks", None) or min(cpu_count, 32)
self.task_graph.run_parallel(max_workers=workers)
else:
self.task_graph.run()
log.debug("Post-run forecast registry")
log_models_tree(log, self.registry, self.time_windows)
log.debug("Post-run result summary")
log_results_tree(log, self.registry)
[docs]
def read_results(self, test: Evaluation, window: str) -> List:
"""
Reads an Evaluation result for a given time window and returns a list of the results
for all tested models.
"""
return test.read_results(window, self.models)
[docs]
def make_repr(self) -> None:
"""
Creates a reproducibility configuration file, re-directing the forecasts/catalog paths,
in order to reproduce the existing results and compare them with previous runs.
"""
log.info("Creating reproducibility config file")
repr_config = self.registry.get_attr("repr_config")
# Dropping region to results folder if it is a file
region_path = self.region_config.get("path", False)
if isinstance(region_path, str):
if isfile(region_path) and region_path:
new_path = join(self.registry.run_dir, self.region_config["path"])
shutil.copy2(region_path, new_path)
self.region_config.pop("path")
self.region_config["region"] = self.registry.rel(new_path)
# Dropping catalog to results folder
target_cat = join(
self.registry.workdir, self.registry.run_dir, split(self.catalog_repo.cat_path)[-1]
)
if not exists(target_cat):
shutil.copy2(self.registry.abs(self.catalog_repo.cat_path), target_cat)
relative_path = Path(
os.path.relpath(self.registry.workdir.as_posix(), self.registry.run_dir.as_posix())
)
self.registry.workdir = relative_path
self.to_yml(repr_config.as_posix(), extended=True)
[docs]
def as_dict(self, extra: Sequence = (), extended=False) -> dict:
"""
Converts an Experiment instance into a dictionary.
Args:
extra: additional instance attribute to include in the dictionary.
extended: Include explicit parameters
Returns:
A dictionary with serialized instance's attributes, which are
floatCSEP readable
"""
dict_walk = {
"name": self.name,
"config_file": self.config_file,
"path": self.registry.workdir.as_posix(),
"run_dir": Path(self.registry.workdir, self.registry.run_dir.name),
"time_config": {
i: j
for i, j in self.time_config.items()
if (i not in ("time_windows",) or extended)
},
"region_config": {
i: j
for i, j in self.region_config.items()
if (i not in ("magnitudes", "depths") or extended)
},
"catalog": self.registry.rel(self.catalog_repo.cat_path).as_posix(),
"models": [i.as_dict() for i in self.models],
"tests": [i.as_dict() for i in self.tests],
}
dict_walk.update(extra)
return parse_nested_dicts(dict_walk)
[docs]
def to_yml(self, filename: str, **kwargs) -> None:
"""
Serializes the :class:`~floatcsep.experiment.Experiment` instance into a .yml file.
Note:
This instance can then be reinstantiated using
:meth:`~floatcsep.experiment.Experiment.from_yml`
Args:
filename: Name of the file onto which dump the instance
**kwargs: Pass to :meth:`~floatcsep.experiment.Experiment.as_dict`
Returns:
"""
class NoAliasDumper(yaml.Dumper):
def ignore_aliases(self, data):
return True
with open(filename, "w") as f_:
yaml.dump(
self.as_dict(**kwargs),
f_,
Dumper=NoAliasDumper,
sort_keys=False,
default_flow_style=False,
indent=1,
width=70,
)
[docs]
@classmethod
def from_yml(cls, config_yml: str, repr_dir=None, **kwargs):
"""
Initializes an experiment from a .yml file. It must contain the.
attributes described in the :class:`~floatcsep.experiment.Experiment`,
:func:`~floatcsep.utils.read_time_config` and
:func:`~floatcsep.utils.read_region_config` descriptions
Args:
config_yml (str): The path to the .yml file
repr_dir (str): folder where to reproduce results
Returns:
An :class:`~floatcsep.experiment.Experiment` class instance
"""
log.info(f"Initializing experiment from {config_yml} file")
with open(config_yml, "r") as yml:
# experiment configuration file
_dict = yaml.load(yml, NoAliasLoader)
_dir_yml = dirname(config_yml)
# Only ABSOLUTE PATH
_dict["path"] = abspath(join(_dir_yml, _dict.get("path", "")))
# replaces run_dir case reproduce option is used
if repr_dir:
_dict["original_run_dir"] = _dict.get("run_dir", "results")
_dict["run_dir"] = relpath(join(_dir_yml, repr_dir), _dict["path"])
_dict["original_config"] = abspath(join(_dict["path"], _dict["config_file"]))
else:
_dict["run_dir"] = _dict.get("run_dir", kwargs.pop("run_dir", "results"))
_dict["config_file"] = relpath(config_yml, _dir_yml)
if "logging" in _dict:
kwargs.pop("logging")
return cls(**_dict, **kwargs)
class ExperimentComparison:
def __init__(self, original, reproduced, **kwargs):
""""""
self.original = original
self.reproduced = reproduced
self.num_results = {}
self.file_comp = {}
@staticmethod
def obs_diff(obs_orig, obs_repr):
return numpy.abs(
numpy.divide((numpy.array(obs_orig) - numpy.array(obs_repr)), numpy.array(obs_orig))
)
@staticmethod
def test_stat(test_orig, test_repr):
if isinstance(test_orig[0], str):
if not isinstance(test_orig[1], str):
stats = numpy.array(
[0, numpy.divide((test_repr[1] - test_orig[1]), test_orig[1]), 0, 0]
)
else:
stats = None
else:
stats_orig = numpy.array(
[numpy.mean(test_orig), numpy.std(test_orig), scipy.stats.skew(test_orig)]
)
stats_repr = numpy.array(
[numpy.mean(test_repr), numpy.std(test_repr), scipy.stats.skew(test_repr)]
)
ks = scipy.stats.ks_2samp(test_orig, test_repr)
stats = [*numpy.divide(numpy.abs(stats_repr - stats_orig), stats_orig), ks.pvalue]
return stats
def get_results(self):
win_orig = timewindow2str(self.original.time_windows)
tests_orig = self.original.tests
models_orig = [i.name for i in self.original.models]
results = dict.fromkeys([i.name for i in tests_orig])
for test in tests_orig:
if test.type in ["consistency", "comparative"]:
results[test.name] = dict.fromkeys(win_orig)
for tw in win_orig:
results_orig = self.original.read_results(test, tw)
results_repr = self.reproduced.read_results(test, tw)
results[test.name][tw] = {
models_orig[i]: {
"observed_statistic": self.obs_diff(
results_orig[i].observed_statistic,
results_repr[i].observed_statistic,
),
"test_statistic": self.test_stat(
results_orig[i].test_distribution,
results_repr[i].test_distribution,
),
}
for i in range(len(models_orig))
}
else:
results_orig = self.original.read_results(test, win_orig[-1])
results_repr = self.reproduced.read_results(test, win_orig[-1])
results[test.name] = {
models_orig[i]: {
"observed_statistic": self.obs_diff(
results_orig[i].observed_statistic,
results_repr[i].observed_statistic,
),
"test_statistic": self.test_stat(
results_orig[i].test_distribution, results_repr[i].test_distribution
),
}
for i in range(len(models_orig))
}
return results
@staticmethod
def get_hash(filename):
with open(filename, "rb") as f:
bytes_file = f.read()
readable_hash = hashlib.sha256(bytes_file).hexdigest()
return readable_hash
def get_filecomp(self):
win_orig = timewindow2str(self.original.time_windows)
tests_orig = self.original.tests
models_orig = [i.name for i in self.original.models]
results = dict.fromkeys([i.name for i in tests_orig])
for test in tests_orig:
if test.type in ["consistency", "comparative"]:
results[test.name] = dict.fromkeys(win_orig)
for tw in win_orig:
results[test.name][tw] = dict.fromkeys(models_orig)
for model in models_orig:
orig_path = self.original.registry.get_result_key(tw, test, model)
repr_path = self.reproduced.registry.get_result_key(tw, test, model)
results[test.name][tw][model] = {
"hash": (self.get_hash(orig_path) == self.get_hash(repr_path)),
"byte2byte": filecmp.cmp(orig_path, repr_path),
}
else:
results[test.name] = dict.fromkeys(models_orig)
for model in models_orig:
orig_path = self.original.registry.get_result_key(win_orig[-1], test, model)
repr_path = self.reproduced.registry.get_result_key(
win_orig[-1], test, model
)
results[test.name][model] = {
"hash": (self.get_hash(orig_path) == self.get_hash(repr_path)),
"byte2byte": filecmp.cmp(orig_path, repr_path),
}
return results
def compare_results(self):
self.num_results = self.get_results()
self.file_comp = self.get_filecomp()