Source code for floatcsep.utils

# python libraries
import copy
import hashlib

import numpy
import re
import multiprocessing
import os
import mercantile
import shapely.geometry
import scipy.stats
import itertools
import functools
import yaml
import pandas
import seaborn
import filecmp
from datetime import datetime, date
from functools import partial
from typing import Sequence, Union
from matplotlib import pyplot
from matplotlib.lines import Line2D
from collections import OrderedDict

# pyCSEP libraries
import six
import csep.core
import csep.utils
from csep.core.regions import CartesianGrid2D, compute_vertices
from csep.utils.plots import plot_spatial_dataset
from csep.models import Polygon
from csep.core.regions import QuadtreeGrid2D, geographical_area_from_bounds
from csep.utils.calc import cleaner_range

# floatCSEP libraries

import floatcsep.accessors
import floatcsep.extras
import floatcsep.readers

_UNITS = ['years', 'months', 'weeks', 'days']
_PD_FORMAT = ['YS', 'MS', 'W', 'D']

[docs] def parse_csep_func(func): """ Searchs in pyCSEP and floatCSEP a function or method whose name matches the provided string. Args: func (str, obj) : representing the name of the pycsep/floatcsep function or method Returns: The callable function/method object. If it was already callable, returns the same input """ def recgetattr(obj, attr, *args): def _getattr(obj_, attr_): return getattr(obj_, attr_, *args) return functools.reduce(_getattr, [obj] + attr.split('.')) if callable(func): return func elif func is None: return func else: _target_modules = [csep, csep.utils, csep.utils.plots, csep.core.regions, floatcsep.utils, floatcsep.accessors, floatcsep.extras, floatcsep.readers.HDF5Serializer, floatcsep.readers.ForecastParsers] for module in _target_modules: try: return recgetattr(module, func) except AttributeError: pass raise AttributeError( f'Evaluation/Plot/Region function {func} has not yet been' f' implemented in floatcsep or pycsep')
[docs] def parse_timedelta_string(window, exp_class='ti'): """ Parses a float or string representing the testing time window length Note: Time-independent experiments defaults to `year` as time unit whereas time-dependent to `days` Args: window (str, int): length of the time window exp_class (str): experiment class Returns: Formatted :py:class:`str` representing the length and unit (year, month, week, day) of the time window """ if isinstance(window, str): try: n, unit_ = [i for i in re.split(r'(\d+)', window) if i] unit = [i for i in [j[:-1] for j in _UNITS] if i in unit_.lower()][ 0] return f'{n}-{unit}s' except (ValueError, IndexError): raise ValueError('Time window is misspecified. ' 'Try the amount followed by the time unit ' '(e.g. 1 day, 1 months, 3 years)') elif isinstance(window, float): n = window unit = 'year' if exp_class == 'ti' else 'day' return f'{n}-{unit}s'
def read_time_cfg(time_config, **kwargs): """ Builds the temporal configuration of an experiment. Args: time_config (dict): Dictionary containing the explicit temporal attributes of the experiment (see `_attrs` local variable) **kwargs: Only the keywords contained in the local variable `_attrs` are captured. This ensures to capture the keywords passed to an :class:`~floatcsep.core.Experiment` object Returns: A dictionary containing the experiment time attributes and the time windows to be evaluated """ _attrs = ['start_date', 'end_date', 'intervals', 'horizon', 'offset', 'growth', 'exp_class'] time_config = copy.deepcopy(time_config) if time_config is None: time_config = {} try: experiment_class = time_config.get('exp_class', kwargs['exp_class']) except KeyError: experiment_class = 'ti' time_config['exp_class'] = experiment_class time_config.update({i: j for i, j in kwargs.items() if i in _attrs}) if 'horizon' in time_config.keys(): time_config['horizon'] = parse_timedelta_string(time_config['horizon']) if 'offset' in time_config.keys(): time_config['offset'] = parse_timedelta_string(time_config['offset']) if experiment_class == 'ti': time_config['timewindows'] = timewindows_ti(**time_config) return time_config elif experiment_class == 'td': time_config['timewindows'] = timewindows_td(**time_config) return time_config def read_region_cfg(region_config, **kwargs): """ Builds the region configuration of an experiment. Args: region_config (dict): Dictionary containing the explicit region attributes of the experiment (see `_attrs` local variable) **kwargs: Only the keywords contained in the local variable `_attrs` are captured. This ensures to capture the keywords passed to an :class:`~floatcsep.core.Experiment` object Returns: A dictionary containing the region attributes of the experiment """ region_config = copy.deepcopy(region_config) _attrs = ['region', 'mag_min', 'mag_max', 'mag_bin', 'magnitudes', 'depth_min', 'depth_max'] if region_config is None: region_config = {} region_config.update({i: j for i, j in kwargs.items() if i in _attrs}) dmin = region_config.get('depth_min', -2) dmax = region_config.get('depth_max', 6000) depths = cleaner_range(dmin, dmax, dmax - dmin) magnitudes = region_config.get('magnitudes', None) if magnitudes is None: magmin = region_config['mag_min'] magmax = region_config['mag_max'] magbin = region_config['mag_bin'] magnitudes = cleaner_range(magmin, magmax, magbin) region_data = region_config.get('region', None) try: region = parse_csep_func(region_data)(name=region_data, magnitudes=magnitudes) \ if region_data else None except AttributeError: if isinstance(region_data, str): filename = os.path.join(kwargs.get('path', ''), region_data) with open(filename, 'r') as file_: parsed_region = file_.readlines() try: data = numpy.array([re.split(r'\s+|,', i.strip()) for i in parsed_region], dtype=float) except ValueError: data = numpy.array([re.split(r'\s+|,', i.strip()) for i in parsed_region[1:]], dtype=float) dh1 = scipy.stats.mode( numpy.diff(numpy.unique(data[:, 0]))).mode dh2 = scipy.stats.mode( numpy.diff(numpy.unique(data[:, 1]))).mode dh = numpy.nanmin([dh1, dh2]) region = CartesianGrid2D.from_origins(data, name=region_data, magnitudes=magnitudes, dh=dh) region_config.update({'path': region_data}) else: region_data['magnitudes'] = magnitudes region = CartesianGrid2D.from_dict(region_data) region_config.update({'depths': depths, 'magnitudes': magnitudes, 'region': region}) return region_config
[docs] def timewindow2str(datetimes: Union[Sequence[datetime], Sequence[Sequence[datetime]]]): """ Converts a time window (list/tuple of datetimes) to a string that represents it. Can be a single timewindow or a list of time windows. Args: datetimes: Returns: """ if isinstance(datetimes[0], datetime): return '_'.join([ for j in datetimes]) elif isinstance(datetimes[0], (list, tuple)): return ['_'.join([ for j in i]) for i in datetimes]
def str2timewindow(tw_string: Union[str, Sequence[str]]): """ Converts a string representation of a time window into a list of datetimes representing the time window edges. Args: tw_string: Returns: """ if isinstance(tw_string, str): start_date, end_date = [datetime.fromisoformat(i) for i in tw_string.split('_')] return start_date, end_date elif isinstance(tw_string, (list, tuple)): datetimes = [] for twstr in tw_string: start_date, end_date = [datetime.fromisoformat(i) for i in twstr.split('_')] datetimes.append([start_date, end_date]) return datetimes
[docs] def timewindows_ti(start_date=None, end_date=None, intervals=None, horizon=None, growth='incremental', **_): """ Creates the testing intervals for a time-independent experiment. Note: The following argument combinations are possible: - (start_date, end_date) - (start_date, end_date, timeintervals) - (start_date, end_date, timehorizon) - (start_date, timeintervals, timehorizon) Args: start_date (datetime.datetime): Start of the experiment end_date (datetime.datetime): End of the experiment intervals (int): number of intervals to discretize the time span horizon (str): time length of each interval growth (str): incremental or cumulative time windows Returns: List of tuples containing the lower and upper boundaries of each testing window, as :py:class:`datetime.datetime`. """ frequency = None if (intervals is None) and (horizon is None): intervals = 1 elif horizon: n, unit = horizon.split('-') frequency = f'{n}{_PD_FORMAT[_UNITS.index(unit)]}' periods = intervals + 1 if intervals else intervals try: timelimits = pandas.date_range(start=start_date, end=end_date, periods=periods, freq=frequency).to_pydatetime() except ValueError as e_: raise ValueError( 'The following experiment parameters combinations are possible:\n' ' (start_date, end_date)\n' ' (start_date, end_date, intervals)\n' ' (start_date, end_date, timewindow)\n' ' (start_date, intervals, timewindow)\n') if growth == 'incremental': return [(i, j) for i, j in zip(timelimits[:-1], timelimits[1:])] elif growth == 'cumulative': return [(timelimits[0], i) for i in timelimits[1:]]
[docs] def timewindows_td(start_date=None, end_date=None, timeintervals=None, timehorizon=None, timeoffset=None, **_): """ Creates the testing intervals for a time-dependent experiment. Note: The following arg combinations are possible: - (start_date, end_date, timeintervals) - (start_date, end_date, timehorizon) - (start_date, timeintervals, timehorizon) - (start_date, end_date, timehorizon, timeoffset) - (start_date, timeinvervals, timehorizon, timeoffset) Args: start_date (datetime.datetime): Start of the experiment end_date (datetime.datetime): End of the experiment timeintervals (int): number of intervals to discretize the time span timehorizon (str): time length of each time window timeoffset (str): Offset between consecutive forecast. if None or timeoffset=timehorizon, windows are non-overlapping Returns: List of tuples containing the lower and upper boundaries of each testing window, as :py:class:`datetime.datetime`. """ frequency = None if timehorizon: n, unit = timehorizon.split('-') frequency = f'{n}{_PD_FORMAT[_UNITS.index(unit)]}' periods = timeintervals + 1 if timeintervals else timeintervals try: offset = timeoffset.split('-') if timeoffset else None start_offset = start_date + pandas.DateOffset( **{offset[1]: float(offset[0])}) if offset else start_date end_offset = end_date - pandas.DateOffset( **{offset[1]: float(offset[0])}) if offset else start_date lower_limits = pandas.date_range(start=start_date, end=end_offset, periods=periods, freq=frequency).to_pydatetime()[:-1] upper_limits = pandas.date_range(start=start_offset, end=end_date, periods=periods, freq=frequency).to_pydatetime()[:-1] except ValueError as e_: raise ValueError( 'The following experiment parameters combinations are possible:\n' ' (start_date, end_date)\n' ' (start_date, end_date, intervals)\n' ' (start_date, end_date, timewindow)\n' ' (start_date, intervals, timewindow)\n')
# if growth == 'incremental': # timewindows = [(i, j) for i, j in zip(timelimits[:-1], # timelimits[1:])] # elif growth == 'cumulative': # timewindows = [(timelimits[0], i) for i in timelimits[1:]] # return timewindows
[docs] class Task:
[docs] def __init__(self, instance, method, **kwargs): """ Base node of the workload distribution. Wraps lazily objects, methods and their arguments for them to be executed later. For instance, can wrap a floatcsep.Model, its method 'create_forecast' and the argument 'time_window', which can be executed later with when, for example, task dependencies (parent nodes) have been completed. Args: instance: can be floatcsep.Experiment, floatcsep.Model, floatcsep.Evaluation method: the instance's method to be lazily created **kwargs: keyword arguments passed to method. """ self.obj = instance self.method = method self.kwargs = kwargs = None # Bool for nested tasks. DEPRECATED
def sign_match(self, obj=None, met=None, kw_arg=None): """ Checks if the Task matchs a given signature for simplicity. Purpose is to check from the outside if the Task is from a given object (Model, Experiment, etc), matching either name or object or description Args: obj: Instance or instance's name str. Instance is preferred met: Name of the method kw_arg: Only the value (not key) of the kwargs dictionary Returns: """ if self.obj == obj or obj == getattr(self.obj, 'name', None): if met == self.method: if kw_arg in self.kwargs.values(): return True return False def __str__(self): task_str = f'{self.__class__}\n\t' \ f'Instance: {self.obj.__class__.__name__}\n' a = getattr(self.obj, 'name', None) if a: task_str += f'\tName: {a}\n' task_str += f'\tMethod: {self.method}\n' for i, j in self.kwargs.items(): task_str += f'\t\t{i}: {j} \n' return task_str[:-2]
[docs] def run(self): if hasattr(self.obj, 'store'): self.obj = output = getattr(self.obj, self.method)(**self.kwargs) if output: = output del self.obj return output
def __call__(self, *args, **kwargs): return
[docs] def check_exist(self): pass
class TaskGraph: """ Context manager of floatcsep workload distribution Assign tasks to a node and defines their dependencies (parent nodes). Contains a 'tasks' dictionary whose dict_keys are the Task to be executed with dict_values as the Task's dependencies. """ def __init__(self): self.tasks = OrderedDict() self._ntasks = 0 = 'floatcsep.utils.TaskGraph' @property def ntasks(self): return self._ntasks @ntasks.setter def ntasks(self, n): self._ntasks = n def add(self, task): """ Simply adds a defined task to the graph Args: task: floatcsep.utils.Task Returns: """ self.tasks[task] = [] self.ntasks += 1 def add_dependency(self, task, dinst=None, dmeth=None, dkw=None): """ Adds a dependency to a task already inserted to the TaskGraph. Searchs within the pre-added tasks a signature match by their name/instance, method and keyword_args. Args: task: Task to which a dependency will be asigned dinst: object/name of the dependency dmeth: method of the dependency dkw: keyword argument of the dependency Returns: """ deps = [] for i, other_tasks in enumerate(self.tasks.keys()): if other_tasks.sign_match(dinst, dmeth, dkw): deps.append(other_tasks) self.tasks[task].extend(deps) def run(self): """ Iterates through all the graph tasks and runs them. Returns: """ for task, deps in self.tasks.items(): def __call__(self, *args, **kwargs): return def check_exist(self): pass class MarkdownReport: """ Class to generate a Markdown report from a study """ def __init__(self, outname=''): self.outname = outname self.toc = [] self.has_title = True self.has_introduction = False self.markdown = [] def add_introduction(self, adict): """ Generate document header from dictionary """ first = f"# CSEP Testing Results: {adict['simulation_name']} \n" \ f"**Forecast Name:** {adict['forecast_name']} \n" \ f"**Simulation Start Time:** {adict['origin_time']} \n" \ f"**Evaluation Time:** {adict['evaluation_time']} \n" \ f"**Catalog Source:** {adict['catalog_source']} \n" \ f"**Number Simulations:** {adict['num_simulations']}\n" # used to determine to place TOC at beginning of document or after # introduction. self.has_introduction = True self.markdown.append(first) return first def add_text(self, text): """ Text should be a list of strings where each string will be on its own line. Each add_text command represents a paragraph. Args: text (list): lines to write Returns: """ self.markdown.append(' '.join(text) + '\n\n') def add_figure(self, title, relative_filepaths, level=2, ncols=1, add_ext=False, text='', caption='', width=None): """ This function expects a list of filepaths. If you want the output stacked, select a value of ncols. ncols should be divisible by filepaths. todo: modify formatted_paths to work when not divis. Args: title: name of the figure level (int): value 1-6 depending on the heading relative_filepaths (str or List[Tuple[str]]): list of paths in order to make table Returns: """ # verify filepaths have proper extension should always be png is_single = False paths = [] if isinstance(relative_filepaths, six.string_types): is_single = True paths.append(relative_filepaths) else: paths = relative_filepaths correct_paths = [] if add_ext: for fp in paths: correct_paths.append(fp + '.png') else: correct_paths = paths # generate new lists with size ncols formatted_paths = [correct_paths[i:i + ncols] for i in range(0, len(paths), ncols)] # convert str into a list, where each potential row is an iter not str def build_header(_row): top = "|" bottom = "|" for i, _ in enumerate(_row): if i == ncols: break top += " |" bottom += " --- |" return top + '\n' + bottom size_ = bool(width) * f'width={width}' def add_to_row(_row): if len(_row) == 1: return f'<img src="{_row[0]}" {size_}/>' string = '| ' for item in _row: string = string + f'<img src="{item}" width={width}/>' return string level_string = f"{level * '#'}" result_cell = [] locator = title.lower().replace(" ", "_") result_cell.append( f'{level_string} {title} <a name="{locator}"></a>\n') result_cell.append(f'{text}\n') for i, row in enumerate(formatted_paths): if i == 0 and not is_single and ncols > 1: result_cell.append(build_header(row)) result_cell.append(add_to_row(row)) result_cell.append('\n') result_cell.append(f'{caption}') self.markdown.append('\n'.join(result_cell) + '\n') # generate metadata for TOC self.toc.append((title, level, locator)) def add_heading(self, title, level=1, text='', add_toc=True): # multipying char simply repeats it if isinstance(text, str): text = [text] cell = [] level_string = f"{level * '#'}" locator = title.lower().replace(" ", "_") sub_heading = f'{level_string} {title} <a name="{locator}"></a>\n' cell.append(sub_heading) try: for item in list(text): cell.append(item) except Exception as ex: raise RuntimeWarning( "Unable to add document subhead, text must be iterable.") self.markdown.append('\n'.join(cell) + '\n') # generate metadata for TOC if add_toc: self.toc.append((title, level, locator)) def add_list(self, _list): cell = [] for item in _list: cell.append(f"* {item}") self.markdown.append('\n'.join(cell) + '\n\n') def add_title(self, title, text): self.has_title = True self.add_heading(title, 1, text, add_toc=False) def table_of_contents(self): """ generates table of contents based on contents of document. """ if len(self.toc) == 0: return toc = ["# Table of Contents"] for i, elem in enumerate(self.toc): title, level, locator = elem space = ' ' * (level - 1) toc.append(f"{space}1. [{title}](#{locator})") insert_loc = 1 if self.has_title else 0 self.markdown.insert(insert_loc, '\n'.join(toc) + '\n\n') def add_table(self, data, use_header=True): """ Generates table from HTML and styles using bootstrap class Args: data List[Tuple[str]]: should be (nrows, ncols) in size. all rows should be the same sizes Returns: table (str): this can be added to subheading or other cell if desired. """ table = ['<div class="table table-striped">', f'<table>'] def make_header(row): header = [] header.append('<tr>') for item in row: header.append(f'<th>{item}</th>') header.append('</tr>') return '\n'.join(header) def add_row(row): table_row = ['<tr>'] for item in row: table_row.append(f"<td>{item}</td>") table_row.append('</tr>') return '\n'.join(table_row) for i, row in enumerate(data): if i == 0 and use_header: table.append(make_header(row)) else: table.append(add_row(row)) table.append('</table>') table.append('</div>') table = '\n'.join(table) self.markdown.append(table + '\n\n') def save(self, save_dir): output = list(itertools.chain.from_iterable(self.markdown)) full_md_fname = os.path.join(save_dir, self.outname) with open(full_md_fname, 'w') as f: f.writelines(output) class NoAliasLoader(yaml.Loader): @staticmethod def ignore_aliases(self): return True class ExperimentComparison: def __init__(self, original, reproduced, **kwargs): """ """ self.original = original self.reproduced = reproduced self.num_results = {} self.file_comp = {} @staticmethod def obs_diff(obs_orig, obs_repr): return numpy.abs(numpy.divide((numpy.array(obs_orig) - numpy.array(obs_repr)), numpy.array(obs_orig))) @staticmethod def test_stat(test_orig, test_repr): if isinstance(test_orig[0], str): if not isinstance(test_orig[1], str): stats = numpy.array([0, numpy.divide((test_repr[1] - test_orig[1]), test_orig[1]), 0, 0]) else: stats = None else: stats_orig = numpy.array([numpy.mean(test_orig), numpy.std(test_orig), scipy.stats.skew(test_orig)]) stats_repr = numpy.array([numpy.mean(test_repr), numpy.std(test_repr), scipy.stats.skew(test_repr)]) ks = scipy.stats.ks_2samp(test_orig, test_repr) stats = [*numpy.divide( numpy.abs(stats_repr - stats_orig), stats_orig ), ks.pvalue] return stats def get_results(self): win_orig = timewindow2str(self.original.timewindows) win_repr = timewindow2str(self.reproduced.timewindows) tests_orig = self.original.tests tests_repr = self.reproduced.tests models_orig = [ for i in self.original.models] models_repr = [ for i in self.reproduced.models] results = dict.fromkeys([ for i in tests_orig]) for test in tests_orig: if test.type in ['consistency', 'comparative']: results[] = dict.fromkeys(win_orig) for tw in win_orig: results_orig = self.original.read_results(test, tw) results_repr = self.reproduced.read_results(test, tw) results[][tw] = { models_orig[i]: { 'observed_statistic': self.obs_diff( results_orig[i].observed_statistic, results_repr[i].observed_statistic), 'test_statistic': self.test_stat( results_orig[i].test_distribution, results_repr[i].test_distribution) } for i in range(len(models_orig))} else: results_orig = self.original.read_results(test, win_orig[-1]) results_repr = self.reproduced.read_results(test, win_orig[-1]) results[] = { models_orig[i]: { 'observed_statistic': self.obs_diff( results_orig[i].observed_statistic, results_repr[i].observed_statistic), 'test_statistic': self.test_stat( results_orig[i].test_distribution, results_repr[i].test_distribution) } for i in range(len(models_orig))} return results @staticmethod def get_hash(filename): with open(filename, "rb") as f: bytes_file = readable_hash = hashlib.sha256(bytes_file).hexdigest() return readable_hash def get_filecomp(self): win_orig = timewindow2str(self.original.timewindows) win_repr = timewindow2str(self.reproduced.timewindows) tests_orig = self.original.tests tests_repr = self.reproduced.tests models_orig = [ for i in self.original.models] models_repr = [ for i in self.reproduced.models] results = dict.fromkeys([ for i in tests_orig]) for test in tests_orig: if test.type in ['consistency', 'comparative']: results[] = dict.fromkeys(win_orig) for tw in win_orig: results[][tw] = dict.fromkeys(models_orig) for model in models_orig: orig_path = self.original.path( tw, 'evaluations', test, model) repr_path = self.reproduced.path( tw, 'evaluations', test, model) results[][tw][model] ={ 'hash': ( self.get_hash(orig_path) == self.get_hash(repr_path)), 'byte2byte': filecmp.cmp(orig_path, repr_path)} else: results[] = dict.fromkeys(models_orig) for model in models_orig: orig_path = self.original.path( win_orig[-1], 'evaluations', test, model) repr_path = self.reproduced.path( win_orig[-1], 'evaluations', test, model) results[][model] ={ 'hash': ( self.get_hash(orig_path) == self.get_hash(repr_path)), 'byte2byte': filecmp.cmp(orig_path, repr_path)} return results def compare_results(self): self.num_results = self.get_results() self.file_comp = self.get_filecomp() self.write_report() def write_report(self): numerical = self.num_results data = self.file_comp outname = os.path.join('') save_path = os.path.dirname(os.path.join(self.reproduced.path.workdir, self.reproduced.path.rundir)) report = MarkdownReport(outname=outname) report.add_title( f"Reproducibility Report - {}", '' ) report.add_heading("Objectives", level=2) objs = [ "Analyze the statistic reproducibility and data reproducibility of" " the experiment. Compares the differences between " "(i) the original and reproduced scores," " (ii) the statistical descriptors of the test distributions," " (iii) The p-value of a Kolmogorov-Smirnov test -" " values beneath 0.1 means we can't reject the distributions are" " similar -," " (iv) Hash (SHA-256) comparison between the results' files and " "(v) byte-to-byte comparison" ] report.add_list(objs) for num, dat in zip(numerical.items(), data.items()): res_keys = list(num[1].keys()) is_time = False try: str2timewindow(res_keys[0]) is_time = True except ValueError: pass if is_time: report.add_heading(num[0], level=2) for tw in res_keys: rows = [[tw, 'Score difference', 'Test Mean diff.', 'Test Std diff.', 'Test Skew diff.', 'KS-test p value', 'Hash (SHA-256) equal', 'Byte-to-byte equal']] for model_stat, model_file in zip(num[1][tw].items(), dat[1][tw].items()): obs = model_stat[1]['observed_statistic'] test = model_stat[1]['test_statistic'] rows.append([model_stat[0], obs, *[f'{i:.1e}' for i in test[:-1]], f'{test[-1]:.1e}', model_file[1]['hash'], model_file[1]['byte2byte']]) report.add_table(rows) else: report.add_heading(num[0], level=2) rows = [[tw, 'Max Score difference', 'Hash (SHA-256) equal', 'Byte-to-byte equal']] for model_stat, model_file in zip(num[1].items(), dat[1].items()): obs = numpy.nanmax(model_stat[1]['observed_statistic']) rows.append([model_stat[0], f'{obs:.1e}', model_file[1]['hash'], model_file[1]['byte2byte']]) report.add_table(rows) report.table_of_contents() ####################### # Perhaps add to pycsep #######################
[docs] def plot_sequential_likelihood(evaluation_results, plot_args=None): if plot_args is None: plot_args = {} title = plot_args.get('title', None) titlesize = plot_args.get('titlesize', None) ylabel = plot_args.get('ylabel', None) colors = plot_args.get('colors', [None] * len(evaluation_results)) linestyles = plot_args.get('linestyles', [None] * len(evaluation_results)) markers = plot_args.get('markers', [None] * len(evaluation_results)) markersize = plot_args.get('markersize', 1) linewidth = plot_args.get('linewidth', 0.5) figsize = plot_args.get('figsize', (6, 4)) timestrs = plot_args.get('timestrs', None) if timestrs: startyear = [date.fromisoformat(j.split('_')[0]) for j in timestrs][0] endyears = [date.fromisoformat(j.split('_')[1]) for j in timestrs] years = [startyear] + endyears else: startyear = 0 years = numpy.arange(0, len(evaluation_results[0].observed_statistic) + 1) seaborn.set_style("white", {"axes.facecolor": ".9", '': 'Ubuntu'}) pyplot.rcParams.update({'xtick.bottom': True, 'axes.labelweight': 'bold', 'xtick.labelsize': 8, 'ytick.labelsize': 8, 'legend.fontsize': 9}) if isinstance(colors, list): assert len(colors) == len(evaluation_results) elif isinstance(colors, str): colors = [colors] * len(evaluation_results) if isinstance(linestyles, list): assert len(linestyles) == len(evaluation_results) elif isinstance(linestyles, str): linestyles = [linestyles] * len(evaluation_results) if isinstance(markers, list): assert len(markers) == len(evaluation_results) elif isinstance(markers, str): markers = [markers] * len(evaluation_results) fig, ax = pyplot.subplots(figsize=figsize) for i, result in enumerate(evaluation_results): data = [0] + result.observed_statistic ax.plot(years, data, color=colors[i], linewidth=linewidth, linestyle=linestyles[i], marker=markers[i], markersize=markersize, label=result.sim_name) ax.set_ylabel(ylabel) ax.set_xlim([startyear, None]) ax.set_title(title, fontsize=titlesize) ax.grid(True) ax.legend(loc=(1.04, 0), fontsize=7) fig.tight_layout()
[docs] def magnitude_vs_time(catalog): mag =['magnitude'] time = [datetime.fromtimestamp(i / 1000.) for i in['origin_time']] fig, ax = pyplot.subplots(figsize=(12, 4)) ax.plot(time, mag, marker='o', linewidth=0, color='r', alpha=0.2) ax.set_xlabel('Date', fontsize=16) ax.set_ylabel('$M_w$', fontsize=16) ax.set_title('Magnitude vs. Time', fontsize=18) return ax
def plot_matrix_comparative_test(evaluation_results, p=0.05, order=True, plot_args={}): """ Produces matrix plot for comparative tests for all models Args: evaluation_results (list of result objects): paired t-test results p (float): significance level order (bool): columns/rows ordered by ranking Returns: ax (matplotlib.Axes): handle for figure """ names = [i.sim_name for i in evaluation_results] t_value = numpy.array( [Tw_i.observed_statistic for Tw_i in evaluation_results]) t_quantile = numpy.array( [Tw_i.quantile[0] for Tw_i in evaluation_results]) w_quantile = numpy.array( [Tw_i.quantile[1] for Tw_i in evaluation_results]) score = numpy.sum(t_value, axis=1) / t_value.shape[0] if order: arg_ind = numpy.flip(numpy.argsort(score)) else: arg_ind = numpy.arange(len(score)) # Flip rows/cols if ordered by value data_t = t_value[arg_ind, :][:, arg_ind] data_w = w_quantile[arg_ind, :][:, arg_ind] data_tq = t_quantile[arg_ind, :][:, arg_ind] fig, ax = pyplot.subplots(1, 1, figsize=(7, 6)) cmap = seaborn.diverging_palette(220, 20, as_cmap=True) seaborn.heatmap(data_t, vmin=-3, vmax=3, center=0, cmap=cmap, ax=ax, cbar_kws={'pad': 0.01, 'shrink': 0.7, 'label': 'Information Gain', 'anchor': (0., 0.)}), ax.set_yticklabels([names[i] for i in arg_ind], rotation='horizontal') ax.set_xticklabels([names[i] for i in arg_ind], rotation='vertical') for n, i in enumerate(data_tq): for m, j in enumerate(i): if j > 0 and data_w[n, m] < p: ax.scatter(n + 0.5, m + 0.5, marker='o', s=5, color='black') legend_elements = [Line2D([0], [0], marker='o', lw=0, label=r'T and W significant', markerfacecolor="black", markeredgecolor='black', markersize=4)] fig.legend(handles=legend_elements, loc='lower right', bbox_to_anchor=(0.75, 0.0, 0.2, 0.2), handletextpad=0) pyplot.tight_layout() ######################### # Below needs refactoring ######################### def forecast_mapping(forecast_gridded, target_grid, ncpu=None): """ Aggregates conventional forecast onto quadtree region This is generic function, which can map any forecast on to another grid. Wrapper function over "_forecat_mapping_generic" Forecast mapping onto Target Grid forecast_gridded: csep.core.forecast with other grid. target_grid: csep.core.region.CastesianGrid2D or QuadtreeGrid2D only_de-aggregate: Flag (True or False) Note: set the flag "only_deagregate = True" Only if one is sure that both grids are Quadtree and Target grid is high-resolution at every level than the other grid. """ from csep.core.forecasts import GriddedForecast bounds_target = target_grid.bounds bounds = forecast_gridded.region.bounds data = data_mapped_bounds = _forecast_mapping_generic(bounds_target, bounds, data, ncpu=ncpu) target_forecast = GriddedForecast(data=data_mapped_bounds, region=target_grid, magnitudes=forecast_gridded.magnitudes) return target_forecast def plot_quadtree_forecast(qtree_forecast): """ Currently, only a single-resolution plotting capability is available. So we aggregate multi-resolution forecast on a single-resolution grid and then plot it Args: csep.core.models.GriddedForecast Returns: class:`` object """ quadkeys = qtree_forecast.region.quadkeys qk_sizes = [] for qk in quadkeys: qk_sizes.append(len(qk)) if qk_sizes.count(qk_sizes[0]) == len(qk_sizes): # single-resolution grid ax = qtree_forecast.plot() else: print('Multi-resolution grid detected.') print('Currently, we do not offer utility to plot a forecast with ' 'multi-resolution grid') print('Therefore, forecast is being aggregated on a single-resolution ' 'grid (L8) for plotting') single_res_grid_l8 = QuadtreeGrid2D.from_single_resolution(8) forecast_l8 = forecast_mapping(qtree_forecast, single_res_grid_l8) ax = forecast_l8.plot() return ax def plot_forecast_lowres(forecast, plot_args, k=4): """ Plot a reduced resolution plot. The forecast values are kept the same, but cells are enlarged :param forecast: GriddedForecast object :param plot_args: arguments to be passed to plot_spatial_dataset :param k: Resampling factor. Selects cells every k row and k columns. """ print('\tPlotting Forecast') plot_args['title'] = region = forecast.region coords = dataset = numpy.log10(forecast.spatial_counts(cartesian=True))[::k, ::k] region.xs = numpy.unique(region.get_cartesian(coords[:, 0])[0, ::k]) region.ys = numpy.unique(region.get_cartesian(coords[:, 1])[::k, 0]) plot_spatial_dataset(dataset, region, set_global=True, plot_args=plot_args) def quadtree_csv_loader(csv_fname): """ Load quadtree forecasted stored as csv file The format expects forecast as a comma separated file, in which first column corresponds to quadtree grid cell (quadkey). The second and thrid columns indicate depth range. The corresponding enteries in the respective row are forecast rates corresponding to the magnitude bins. The first line of forecast is a header, and its format is listed here: 'Quadkey', depth_min, depth_max, Mag_0, Mag_1, Mag_2, Mag_3 , .... Quadkey is a string. Rest of the values are floats. For the purposes of defining region objects quadkey is used. We assume that the starting value of magnitude bins are provided in the header. Args: csv_fname: file name of csep forecast in csv format Returns: rates, region, mws (np.ndarray, QuadtreeRegion2D, np.ndarray): rates, region, and magnitude bins needed to define QuadTree models """ data = numpy.genfromtxt(csv_fname, dtype='str', delimiter=',') quadkeys = data[1:, 0] mws = data[0, 3:].astype(float) rates = data[1:, 3:] rates = rates.astype(float) region = QuadtreeGrid2D.from_quadkeys(quadkeys, magnitudes=mws) region.get_cell_area() return rates, region, mws def geographical_area_from_qk(quadk): """ Wrapper around function geographical_area_from_bounds """ bounds = tile_bounds(quadk) return geographical_area_from_bounds(bounds[0], bounds[1], bounds[2], bounds[3]) def tile_bounds(quad_cell_id): """ It takes in a single Quadkkey and returns lat,longs of two diagonal corners using mercantile Parameters ---------- quad_cell_id : Stirng Quad key of a cell. Returns ------- bounds : Mercantile object Latitude and Longitude of bottom left AND top right corners. """ bounds = mercantile.bounds(mercantile.quadkey_to_tile(quad_cell_id)) return [bounds.west, bounds.south, bounds.east, bounds.north] def create_polygon(fg): """ Required for parallel processing """ return shapely.geometry.Polygon( [(fg[0], fg[1]), (fg[2], fg[1]), (fg[2], fg[3]), (fg[0], fg[3])]) def calc_cell_area(cell): """ Required for parallel processing """ return geographical_area_from_bounds(cell[0], cell[1], cell[2], cell[3]) def _map_overlapping_cells(fcst_grid_poly, fcst_cell_area, fcst_rate_poly, target_poly): # , """ This functions work for Cells that do not directly conside with target polygon cells. This function uses 3 variables i.e. fcst_grid_poly, fcst_cell_area, fcst_rate_poly This function takes 1 target polygon, upon which models are to be mapped. Finds all the cells of forecast grid that match with this polygon and then maps the forecast rate of those cells according to area. fcst_grid_polygon (variable in memory): The grid that needs to be mapped on target_poly fcst_rate_poly (variable in memory): The forecast that needs to be mapped on target grid polygon fcst_cell_area (variable in memory): The cell area of forecast grid Args: target_poly: One polygon upon which forecast grid is to be mapped. returns: The forecast rate received by target_poly """ map_rate = numpy.array([0]) for j in range(len(fcst_grid_poly)): # Iterates over ALL the cells of Forecast grid and find the cells # that overlap with target cell (poly). if target_poly.intersects(fcst_grid_poly[j]): # overlaps intersect = target_poly.intersection(fcst_grid_poly[j]) shared_area = geographical_area_from_bounds(intersect.bounds[0], intersect.bounds[1], intersect.bounds[2], intersect.bounds[3]) map_rate = map_rate + ( fcst_rate_poly[j] * (shared_area / fcst_cell_area[j])) return map_rate def _map_exact_inside_cells(fcst_grid, fcst_rate, boundary): """ Uses 2 Global variables. fcst_grid, fcst_rate Takes a cell_boundary and finds all those fcst_grid cells that fit exactly inside it and then sum-up the rates of all those cells fitting inside it to get forecast rate for boundary_cell Args: boundary: 1 cell with [lon1, lat1, lon2, lat2] returns: 1 - sum of forecast_rates for cell that fall totally inside of boundary cell 2 - Array of the corresponding cells that fall inside """ c = numpy.logical_and(numpy.logical_and(fcst_grid[:, 0] >= boundary[0], fcst_grid[:, 1] >= boundary[1]), numpy.logical_and(fcst_grid[:, 2] <= boundary[2], fcst_grid[:, 3] <= boundary[3])) exact_cells = numpy.where(c == True) return numpy.sum(fcst_rate[c], axis=0), exact_cells def _forecast_mapping_generic(target_grid, fcst_grid, fcst_rate, ncpu=None): """ This function can perofrmns both aggregation and de-aggregation/ It is a wrapper function that uses 4 functions in respective order i.e. _map_exact_cells, _map_overlapping_cells, calc_cell_area, create_polygon Maps the forecast rates of one grid to another grid using parallel processing Works in two steps: 1 - Maps all those cells that fall entirely on target cells 2 - The cells that overlap with multiple cells, map them according to cell area Inumpyuts: target_grid: Target grid bounds, upon which forecast is to be mapped. [n x 4] array, Bottom left and Top Right corners [lon1, lat1, lon2, lat2] fcst_grid: Available grid that is available with forecast Same as bounds_targets fcst_rate: Forecast rates to be mapped. [n x mbins] Returns: target_rates: Forecast rates mapped on the target grid [nx1] """ if ncpu == None: ncpu = multiprocessing.cpu_count() pool = multiprocessing.Pool(ncpu) else: pool = multiprocessing.Pool(ncpu) # mp.cpu_count() print('Number of CPUs :', ncpu) func_exact = partial(_map_exact_inside_cells, fcst_grid, fcst_rate) exact_rate =, [poly for poly in target_grid]) pool.close() exact_cells = [] exact_rate_tgt = [] for i in range(len(exact_rate)): exact_cells.append(exact_rate[i][1][0]) exact_rate_tgt.append(exact_rate[i][0]) exact_cells = numpy.concatenate(exact_cells) # Exclude all those cells from Grid that have already fallen entirely # inside any cell of Target Grid fcst_rate_poly = numpy.delete(fcst_rate, exact_cells, axis=0) lft_fcst_grid = numpy.delete(fcst_grid, exact_cells, axis=0) # play now only with those cells are overlapping with multiple target cells # Get the polygon of Remaining Forecast grid Cells pool = multiprocessing.Pool(ncpu) fcst_grid_poly =, [i for i in lft_fcst_grid]) pool.close() # Get the Cell Area of forecast grid pool = multiprocessing.Pool(ncpu) fcst_cell_area =, [i for i in lft_fcst_grid]) pool.close() # print('Calculate target polygons') pool = multiprocessing.Pool(ncpu) target_grid_poly =, [i for i in target_grid]) pool.close() # print('--2nd Step: Start Polygon mapping--') pool = multiprocessing.Pool(ncpu) func_overlapping = partial(_map_overlapping_cells, fcst_grid_poly, fcst_cell_area, fcst_rate_poly) # Uses above three Global Parameters rate_tgt =, [poly for poly in target_grid_poly]) pool.close() zero_pad_len = numpy.shape(fcst_rate)[1] for i in range(len(rate_tgt)): if len(rate_tgt[i]) < zero_pad_len: rate_tgt[i] = numpy.zeros(zero_pad_len) map_rate = numpy.add(rate_tgt, exact_rate_tgt) return map_rate def _set_dockerfile(name): string = f""" ## Install Docker image from trusted source FROM python:3.8.13 ## Setup user args ARG USERNAME={name} ARG USER_UID=1100 ARG USER_GID=$USER_UID RUN mkdir -p /usr/src/{name} && chown $USER_UID:$USER_GID /usr/src/{name} RUN groupadd --non-unique -g $USER_GID $USERNAME && useradd -u $USER_UID -g $USER_GID -s /bin/sh -m $USERNAME ## Set up work directory in the Docker container WORKDIR /usr/src/{name}/ ## Copy the files from the local machine (the repository) to the Docker container COPY --chown=$USER_UID:$USER_GID . /usr/src/{name}/ ## Calls, install python dependencies and install this model as a python module ENV VIRTUAL_ENV=/venv/ RUN python3 -m venv $VIRTUAL_ENV ENV PATH="$VIRTUAL_ENV/bin:$PATH" # RUN pip install --no-cache-dir --upgrade pip # RUN pip install -r requirements.txt RUN pip install numpy pandas h5py USER $USERNAME """ return string def _global_region(dh=0.1, name="global", magnitudes=None): """ Creates a global region used for evaluating gridded models on the global scale. Modified from csep.core.regions.global_region The gridded region corresponds to the Args: dh: Returns: csep.utils.CartesianGrid2D: """ # generate latitudes lons = numpy.arange(-180.0, 180, dh) lats = numpy.arange(-90, 90, dh) coords = itertools.product(lons, lats) region = CartesianGrid2D( [Polygon(bbox) for bbox in compute_vertices(coords, dh)], dh, name=name) if magnitudes is not None: region.magnitudes = magnitudes return region def _check_zero_bins(exp, catalog, test_date): for model in exp.models: forecast = model.create_forecast(exp.start_date, test_date) catalog.filter_spatial(forecast.region) bins = catalog.get_spatial_idx() zero_forecast = numpy.argwhere(forecast.spatial_counts()[bins] == 0) if zero_forecast: print(zero_forecast) ax = catalog.plot(plot_args={'basemap': 'stock_img'}) ax = forecast.plot(ax=ax, plot_args={'alpha': 0.8}) ax.plot(catalog.get_longitudes()[zero_forecast.ravel()], catalog.get_latitudes()[zero_forecast.ravel()], 'o', markersize=10) pyplot.savefig(f'{model.path}/{}.png', dpi=300) for model in exp.models: forecast = model.create_forecast(exp.start_date, test_date) catalog.filter_spatial(forecast.region) sbins = catalog.get_spatial_idx() mbins = catalog.get_mag_idx() zero_forecast = numpy.argwhere([sbins, mbins] == 0) print('event', 'cell', sbins[zero_forecast], 'datum',[zero_forecast]) if zero_forecast: print(zero_forecast) print('cellfc', forecast.get_longitudes()[sbins[zero_forecast]], forecast.get_latitudes()[sbins[zero_forecast]]) print('scounts', forecast.spatial_counts()[sbins[zero_forecast]]) print('data',[sbins[zero_forecast]]) print([zero_forecast[0]]) ax = catalog.plot(plot_args={'basemap': 'stock_img'}) ax = forecast.plot(ax=ax, plot_args={'alpha': 0.8}) ax.plot(catalog.get_longitudes()[zero_forecast.ravel()], catalog.get_latitudes()[zero_forecast.ravel()], 'o', markersize=10) pyplot.savefig(f'{model.path}/{}.png', dpi=300)