import argparse
import csv
import logging
import os.path
import time
import xml.etree.ElementTree as eTree
from typing import Iterator
import csep
import h5py
import numpy
import pandas
from csep.core.catalogs import CSEPCatalog
from csep.core.regions import QuadtreeGrid2D, CartesianGrid2D
from csep.core.forecasts import CatalogForecast
from csep.models import Polygon
from csep.utils.time_utils import strptime_to_utc_epoch
log = logging.getLogger(__name__)
[docs]
class CatalogSerializer:
"""
Serializers for :class:`csep.core.catalogs.CSEPCatalog`.
Delegates to the built-in I/O methods from pyCSEP catalog objects.
"""
[docs]
@staticmethod
def ascii(catalog: CSEPCatalog, filename: str) -> None:
"""
Serialize a catalog to the pyCSEP ASCII format.
Args:
catalog (:class:`csep.core.catalogs.CSEPCatalog`): Catalog instance to write.
filename (str): Destination filepath.
Returns:
None
"""
catalog.write_ascii(filename=filename)
[docs]
@staticmethod
def json(catalog: CSEPCatalog, filename: str) -> None:
"""
Serialize a catalog to JSON using the pyCSEP encoder.
Args:
catalog (:class:`csep.core.catalogs.CSEPCatalog`): Catalog instance to write.
filename (str): Destination filepath.
Returns:
None
"""
catalog.write_json(filename=filename)
[docs]
class CatalogParser:
"""
Parsers for :class:`csep.core.catalogs.CSEPCatalog`.
Wraps pyCSEP catalog loaders and provides an interface for loading catalogs.
"""
[docs]
@staticmethod
def ascii(filename: str) -> CSEPCatalog:
"""
Load a pyCSEP catalog in the ASCII catalog format.
Args:
filename (str): Path to file.
Returns:
:class:`csep.core.catalogs.CSEPCatalog`.
"""
return CSEPCatalog.load_catalog(filename=filename)
[docs]
@staticmethod
def json(filename: str) -> CSEPCatalog:
"""
Load a pyCSEP catalog from JSON.
Args:
filename (str): Path to the file.
Returns:
:class:`csep.core.catalogs.CSEPCatalog`
"""
return CSEPCatalog.load_json(filename=filename)
[docs]
class CatalogForecastParsers:
"""
Parsers for catalog-based forecasts stored on disk.
These helpers select the appropriate loader based on file headers and return
the object produced by :func:`csep.load_catalog_forecast`.
"""
[docs]
@staticmethod
def csv(filename: str, **kwargs) -> CatalogForecast:
"""
Load a catalog-based forecast from a CSV file.
The loader inspects the header to detect either:
(i) a CSEP/pyCSEP-style catalog layout or (ii) a Hermes layout.
Args:
filename (str): Path to the CSV file.
**kwargs: Passed to :func:`csep.load_catalog_forecast`.
Returns:
:class:`csep.core.forecasts.CatalogForecast`.
"""
csep_headers = [
"lon",
"lat",
"magnitude",
"time_string",
"depth",
"catalog_id",
"event_id",
]
hermes_headers = [
"realization_id",
"magnitude",
"depth",
"latitude",
"longitude",
"time",
]
headers_df = pandas.read_csv(filename, nrows=0).columns.str.strip().to_list()
# CSEP headers
if headers_df[:2] == csep_headers[:2]:
return csep.load_catalog_forecast(filename, **kwargs)
elif headers_df == hermes_headers:
return csep.load_catalog_forecast(
filename, catalog_loader=CatalogForecastParsers.load_hermes_catalog, **kwargs
)
else:
raise Exception("Catalog Forecast could not be loaded")
[docs]
@staticmethod
def load_hermes_catalog(filename, **kwargs) -> Iterator[CSEPCatalog]:
"""
Loads hermes synthetic catalogs in csep-ascii format.
This function can load multiple catalogs stored in a single file. This typically called
to load a catalog-based forecast, but could also load a collection of catalogs stored
in the same file
Args:
filename (str): filepath or directory of catalog files
**kwargs (dict): passed to class constructor
Yields:
:class:`csep.core.forecasts.CatalogForecast`.
"""
def read_float(val):
"""Returns val as float or None if unable"""
try:
val = float(val)
except:
val = None
return val
def is_header_line(line):
if line[0].lower() == "realization_id":
return True
else:
return False
def read_catalog_line(line):
# convert to correct types
catalog_id = int(line[0])
magnitude = read_float(line[1])
depth = read_float(line[2])
lat = read_float(line[3])
lon = read_float(line[4])
# maybe fractional seconds are not included
origin_time = line[5]
if origin_time:
try:
origin_time = strptime_to_utc_epoch(
origin_time, format="%Y-%m-%d %H:%M:%S.%f"
)
except ValueError:
origin_time = strptime_to_utc_epoch(origin_time, format="%Y-%m-%d %H:%M:%S")
event_id = 0
# temporary event
temp_event = (event_id, origin_time, lat, lon, depth, magnitude)
return temp_event, catalog_id
# handle all catalogs in single file
if os.path.isfile(filename):
with open(filename, "r", newline="") as input_file:
catalog_reader = csv.reader(input_file, delimiter=",")
# csv treats everything as a string convert to correct types
events = []
# all catalogs should start at zero
prev_id = None
for line in catalog_reader:
# skip header line on first read if included in file
if prev_id is None:
if is_header_line(line):
continue
# read line and return catalog id
temp_event, catalog_id = read_catalog_line(line)
empty = False
# OK if event_id is empty
if all([val in (None, "") for val in temp_event[1:]]):
empty = True
# first event is when prev_id is none, catalog_id should always start at zero
if prev_id is None:
prev_id = 0
# if the first catalog doesn't start at zero
if catalog_id != prev_id:
if not empty:
events = [temp_event]
else:
events = []
for id in range(catalog_id):
yield CSEPCatalog(data=[], catalog_id=id, **kwargs)
prev_id = catalog_id
continue
# accumulate event if catalog_id is the same as previous event
if catalog_id == prev_id:
if not all([val in (None, "") for val in temp_event]):
events.append(temp_event)
prev_id = catalog_id
# create and yield class if the events are from different catalogs
elif catalog_id == prev_id + 1:
yield CSEPCatalog(data=events, catalog_id=prev_id, **kwargs)
# add event to new event list
if not empty:
events = [temp_event]
else:
events = []
prev_id = catalog_id
# this implies there are empty catalogs, because they are not listed in the ascii file
elif catalog_id > prev_id + 1:
yield CSEPCatalog(data=events, catalog_id=prev_id, **kwargs)
# if prev_id = 0 and catalog_id = 2, then we skipped one catalog. thus, we skip catalog_id - prev_id - 1 catalogs
num_empty_catalogs = catalog_id - prev_id - 1
# first yield empty catalog classes
for id in range(num_empty_catalogs):
yield CSEPCatalog(
data=[],
catalog_id=catalog_id - num_empty_catalogs + id,
**kwargs,
)
prev_id = catalog_id
# add event to new event list
if not empty:
events = [temp_event]
else:
events = []
else:
raise ValueError(
"catalog_id should be monotonically increasing and events should be ordered by catalog_id"
)
# yield final catalog, note: since this is just loading catalogs, it has no idea how many should be there
cat = CSEPCatalog(data=events, catalog_id=prev_id, **kwargs)
yield cat
elif os.path.isdir(filename):
raise NotImplementedError(
"reading from directory or batched files not implemented yet!"
)
[docs]
class GriddedForecastParsers:
"""
Parsers for grid-based earthquake forecasts.
Each parser returns a tuple ``(rates, region, magnitudes)`` where `rates` is a
2D array shaped ``(num_spatial_bins, num_magnitude_bins)`` and `region` is a
pyCSEP region instance describing the spatial bins.
"""
[docs]
@staticmethod
def dat(filename: str):
"""
Load a CSEP-style ASCII `.dat` gridded forecast.
Args:
filename (str): Path to the `.dat` file.
Returns:
tuple:
- rates (numpy.ndarray): Forecast rates with shape
``(n_cells, n_mag_bins)``.
- region (csep.core.regions.CartesianGrid2D): Region built from
rectangular polygons.
- magnitudes (numpy.ndarray): Magnitude bin edges.
"""
data = numpy.loadtxt(filename)
all_polys = data[:, :4]
all_poly_mask = data[:, -1]
sorted_idx = numpy.sort(
numpy.unique(all_polys, return_index=True, axis=0)[1], kind="stable"
)
unique_poly = all_polys[sorted_idx]
poly_mask = all_poly_mask[sorted_idx]
all_mws = data[:, -4]
sorted_idx = numpy.sort(numpy.unique(all_mws, return_index=True)[1], kind="stable")
mws = all_mws[sorted_idx]
bboxes = [((i[0], i[2]), (i[0], i[3]), (i[1], i[3]), (i[1], i[2])) for i in unique_poly]
dh = float(unique_poly[0, 3] - unique_poly[0, 2])
n_mag_bins = len(mws)
rates = data[:, -2].reshape(len(bboxes), n_mag_bins)
region = CartesianGrid2D([Polygon(bbox) for bbox in bboxes], dh, mask=poly_mask)
return rates, region, mws
[docs]
@staticmethod
def xml(filename, verbose=False):
"""
Load a CSEP XML gridded forecast.
Args:
filename (str): Path to the XML file.
verbose (bool): If True, logs parsed metadata.
Returns:
tuple:
- rates (numpy.ndarray): Forecast rates with shape
``(n_cells, n_mag_bins)``.
- region (csep.core.regions.CartesianGrid2D): Region built from
cell bounding boxes.
- magnitudes (numpy.ndarray): Magnitude bin edges.
"""
tree = eTree.parse(filename)
root = tree.getroot()
metadata = {}
data_ijm = []
m_bins = []
cells = []
cell_dim = {}
for k, children in enumerate(list(root[0])):
if "modelName" in children.tag:
name_xml = children.text
metadata["name"] = name_xml
elif "author" in children.tag:
author_xml = children.text
metadata["author"] = author_xml
elif "forecastStartDate" in children.tag:
start_date = children.text.replace("Z", "")
metadata["forecastStartDate"] = start_date
elif "forecastEndDate" in children.tag:
end_date = children.text.replace("Z", "")
metadata["forecastEndDate"] = end_date
elif "defaultMagBinDimension" in children.tag:
m_bin_width = float(children.text)
metadata["defaultMagBinDimension"] = m_bin_width
elif "lastMagBinOpen" in children.tag:
lastmbin = float(children.text)
metadata["lastMagBinOpen"] = lastmbin
elif "defaultCellDimension" in children.tag:
cell_dim = {i[0]: float(i[1]) for i in children.attrib.items()}
metadata["defaultCellDimension"] = cell_dim
elif "depthLayer" in children.tag:
depth = {i[0]: float(i[1]) for i in root[0][k].attrib.items()}
cells = root[0][k]
metadata["depthLayer"] = depth
if verbose:
log.debug(f"Forecast with metadata:\n{metadata}")
for cell in cells:
cell_data = []
m_cell_bins = []
for i, m in enumerate(cell.iter()):
if i == 0:
cell_data.extend([float(m.attrib["lon"]), float(m.attrib["lat"])])
else:
cell_data.append(float(m.text))
m_cell_bins.append(float(m.attrib["m"]))
data_ijm.append(cell_data)
m_bins.append(m_cell_bins)
try:
data_ijm = numpy.array(data_ijm)
m_bins = numpy.array(m_bins)
except (TypeError, ValueError):
raise Exception("Data is not square")
magnitudes = m_bins[0, :]
rates = data_ijm[:, -len(magnitudes) :]
all_polys = numpy.vstack(
(
data_ijm[:, 0] - cell_dim["lonRange"] / 2.0,
data_ijm[:, 0] + cell_dim["lonRange"] / 2.0,
data_ijm[:, 1] - cell_dim["latRange"] / 2.0,
data_ijm[:, 1] + cell_dim["latRange"] / 2.0,
)
).T
bboxes = [((i[0], i[2]), (i[0], i[3]), (i[1], i[3]), (i[1], i[2])) for i in all_polys]
dh = float(all_polys[0, 3] - all_polys[0, 2])
poly_mask = numpy.ones(len(bboxes))
region = CartesianGrid2D([Polygon(bbox) for bbox in bboxes], dh, mask=poly_mask)
return rates, region, magnitudes
[docs]
@staticmethod
def quadtree(filename):
"""
Load a quadtree forecast from CSV.
The file is expected to contain a `tile` column with quadkeys and one
column per magnitude bin.
Args:
filename (str): Path to the CSV file.
Returns:
tuple:
- rates (numpy.ndarray): Forecast rates with shape
``(n_tiles, n_mag_bins)``.
- region (csep.core.regions.QuadtreeGrid2D): Region reconstructed
from quadkeys.
- magnitudes (numpy.ndarray): Magnitude bin edges.
"""
with open(filename, "r") as file_:
qt_header = file_.readline().split(",")
formats = [str]
for i in range(len(qt_header) - 1):
formats.append(float)
qt_formats = {i: j for i, j in zip(qt_header, formats)}
data = pandas.read_csv(filename, header=0, dtype=qt_formats)
quadkeys = numpy.array([i.encode("ascii", "ignore") for i in data.tile])
magnitudes = numpy.array(data.keys()[3:]).astype(float)
rates = data[magnitudes.astype(str)].to_numpy()
region = QuadtreeGrid2D.from_quadkeys(quadkeys.astype(str), magnitudes=magnitudes)
region.get_cell_area()
return rates, region, magnitudes
[docs]
@staticmethod
def csv(filename):
"""
Load a gridded forecast from CSV.
If the header contains `tile`, dispatches to :meth:`quadtree`. Otherwise,
expects bounding-box columns (`lon_min`, `lon_max`, `lat_min`, `lat_max`)
and one column per magnitude bin. An optional `mask` column is used as the
region mask.
Args:
filename (str): Path to the CSV file.
Returns:
tuple:
- rates (numpy.ndarray): Forecast rates.
- region (csep.core.regions.CartesianGrid2D or
csep.core.regions.QuadtreeGrid2D): Region describing spatial bins.
- magnitudes (numpy.ndarray): Magnitude bin edges.
"""
def is_mag(num):
try:
m = float(num)
if -1 < m < 12.0:
return True
else:
return False
except ValueError:
return False
with open(filename, "r") as file_:
line = file_.readline()
if len(line.split(",")) > 3:
sep = ","
else:
sep = " "
if "tile" in line:
rates, region, magnitudes = GriddedForecastParsers.quadtree(filename)
return rates, region, magnitudes
data = pandas.read_csv(
filename, header=0, sep=sep, escapechar="#", skipinitialspace=True
)
data.columns = [i.strip() for i in data.columns]
magnitudes = numpy.array([float(i) for i in data.columns if is_mag(i)])
rates = data[[i for i in data.columns if is_mag(i)]].to_numpy()
all_polys = data[["lon_min", "lon_max", "lat_min", "lat_max"]].to_numpy()
bboxes = [((i[0], i[2]), (i[0], i[3]), (i[1], i[3]), (i[1], i[2])) for i in all_polys]
dh = float(all_polys[0, 3] - all_polys[0, 2])
try:
poly_mask = data["mask"]
except KeyError:
poly_mask = numpy.ones(len(bboxes))
region = CartesianGrid2D([Polygon(bbox) for bbox in bboxes], dh, mask=poly_mask)
return rates, region, magnitudes
[docs]
@staticmethod
def hdf5(filename, group=""):
"""
Load a gridded forecast from an HDF5 container.
Reads datasets created by :meth:`HDF5Serializer.grid2hdf5`. If `quadkeys`
are present, reconstructs a quadtree region; otherwise reconstructs a
cartesian region from stored bounding boxes.
Args:
filename (str): Path to the HDF5 file.
group (str): Group prefix in the HDF5 file.
Returns:
tuple:
- rates (numpy.ndarray): Forecast rates.
- region (csep.core.regions.CartesianGrid2D or
csep.core.regions.QuadtreeGrid2D): Reconstructed region.
- magnitudes (numpy.ndarray): Magnitude bin edges.
"""
start = time.process_time()
with h5py.File(filename, "r") as db:
rates = db[f"{group}/rates"][:]
magnitudes = db[f"{group}/magnitudes"][:]
if "quadkeys" in db.keys():
region = QuadtreeGrid2D.from_quadkeys(
db[f"{group}/quadkeys"][:].astype(str), magnitudes=magnitudes
)
region.get_cell_area()
else:
dh = db[f"{group}/dh"][:][0]
bboxes = db[f"{group}/bboxes"][:]
poly_mask = db[f"{group}/poly_mask"][:]
region = CartesianGrid2D([Polygon(bbox) for bbox in bboxes], dh, mask=poly_mask)
log.debug(f"Loading from hdf5 {filename} took:" f" {time.process_time() - start:.2f}")
return rates, region, magnitudes
[docs]
class HDF5Serializer:
"""
Serialize gridded forecast components to HDF5.
Stores the forecast rates, magnitude bins, and enough region geometry to
reconstruct a cartesian grid on load.
"""
[docs]
@staticmethod
def grid2hdf5(rates, region, mag, grp="", hdf5_filename=None, **kwargs):
"""
Store a cartesian gridded forecast into an HDF5 file.
Writes (or overwrites) datasets under ``grp``:
`rates`, `magnitudes`, `bboxes`, `dh`, and `poly_mask`. Extra keyword
arguments are stored as additional datasets.
Args:
rates (numpy.ndarray): Forecast rates.
region (csep.core.regions.CartesianGrid2D): Region defining the grid.
mag (numpy.ndarray): Magnitude bin edges.
grp (str): Group prefix in the HDF5 file.
hdf5_filename (str): Path to the HDF5 file.
**kwargs: Extra metadata to store as datasets.
Returns:
None
"""
start = time.process_time()
bboxes = numpy.array([i.points for i in region.polygons])
with h5py.File(hdf5_filename, "a") as hfile:
hfile.require_dataset(f"{grp}/rates", shape=rates.shape, dtype=float)
hfile[f"{grp}/rates"][:] = rates
hfile.require_dataset(f"{grp}/magnitudes", shape=mag.shape, dtype=float)
hfile[f"{grp}/magnitudes"][:] = mag
hfile.require_dataset(f"{grp}/bboxes", shape=bboxes.shape, dtype=float)
hfile[f"{grp}/bboxes"][:] = bboxes
hfile.require_dataset(f"{grp}/dh", shape=(1,), dtype=float)
try:
hfile[f"{grp}/dh"][:] = region.dh
except AttributeError:
raise AttributeError(
"Quadtree can not be dropped to HDF5"
"(not needed, because file is already"
" low sized"
)
hfile.require_dataset(f"{grp}/poly_mask", shape=region.poly_mask.shape, dtype=float)
hfile[f"{grp}/poly_mask"][:] = region.poly_mask
if kwargs:
for key, v in kwargs.items():
if isinstance(v, (float, int, str)):
dtype = type(v)
shape = (1,)
elif isinstance(v, numpy.ndarray):
shape = v.shape
dtype = v.dtype
else:
shape = len(v)
dtype = type(v[0])
hfile.require_dataset(f"{grp}/{key}", shape=shape, dtype=dtype)
hfile[f"{grp}/{key}"][:] = v
log.debug(f"Storing to hdf5 {hdf5_filename} took:" f"{time.process_time() - start:2f}")
[docs]
def serialize():
"""
Small CLI for testing gridded forecast parsers.
Parses `--format` and `--filename`, then loads the file with the selected
parser.
Returns:
None
"""
parser = argparse.ArgumentParser()
parser.add_argument("--format", help="format")
parser.add_argument("--filename", help="Model forecast name")
args = parser.parse_args()
if args.format == "quadtree":
GriddedForecastParsers.quadtree(args.filename)
if args.format == "dat":
GriddedForecastParsers.dat(args.filename)
if args.format == "csep" or args.format == "csv":
GriddedForecastParsers.csv(args.filename)
if args.format == "xml":
GriddedForecastParsers.xml(args.filename)
if __name__ == "__main__":
serialize()