import argparse
import logging
import os.path
import time
import xml.etree.ElementTree as eTree
import h5py
import numpy
import pandas
from csep.core.regions import QuadtreeGrid2D, CartesianGrid2D
from csep.models import Polygon
log = logging.getLogger(__name__)
[docs]
class ForecastParsers:
[docs]
@staticmethod
def dat(filename):
data = numpy.loadtxt(filename)
all_polys = data[:, :4]
all_poly_mask = data[:, -1]
sorted_idx = numpy.sort(
numpy.unique(all_polys, return_index=True, axis=0)[1], kind="stable"
)
unique_poly = all_polys[sorted_idx]
poly_mask = all_poly_mask[sorted_idx]
all_mws = data[:, -4]
sorted_idx = numpy.sort(numpy.unique(all_mws, return_index=True)[1], kind="stable")
mws = all_mws[sorted_idx]
bboxes = [((i[0], i[2]), (i[0], i[3]), (i[1], i[3]), (i[1], i[2])) for i in unique_poly]
dh = float(unique_poly[0, 3] - unique_poly[0, 2])
n_mag_bins = len(mws)
rates = data[:, -2].reshape(len(bboxes), n_mag_bins)
region = CartesianGrid2D([Polygon(bbox) for bbox in bboxes], dh, mask=poly_mask)
return rates, region, mws
[docs]
@staticmethod
def xml(filename, verbose=False):
tree = eTree.parse(filename)
root = tree.getroot()
metadata = {}
data_ijm = []
m_bins = []
cells = []
cell_dim = {}
for k, children in enumerate(list(root[0])):
if "modelName" in children.tag:
name_xml = children.text
metadata["name"] = name_xml
elif "author" in children.tag:
author_xml = children.text
metadata["author"] = author_xml
elif "forecastStartDate" in children.tag:
start_date = children.text.replace("Z", "")
metadata["forecastStartDate"] = start_date
elif "forecastEndDate" in children.tag:
end_date = children.text.replace("Z", "")
metadata["forecastEndDate"] = end_date
elif "defaultMagBinDimension" in children.tag:
m_bin_width = float(children.text)
metadata["defaultMagBinDimension"] = m_bin_width
elif "lastMagBinOpen" in children.tag:
lastmbin = float(children.text)
metadata["lastMagBinOpen"] = lastmbin
elif "defaultCellDimension" in children.tag:
cell_dim = {i[0]: float(i[1]) for i in children.attrib.items()}
metadata["defaultCellDimension"] = cell_dim
elif "depthLayer" in children.tag:
depth = {i[0]: float(i[1]) for i in root[0][k].attrib.items()}
cells = root[0][k]
metadata["depthLayer"] = depth
if verbose:
log.debug(f"Forecast with metadata:\n{metadata}")
for cell in cells:
cell_data = []
m_cell_bins = []
for i, m in enumerate(cell.iter()):
if i == 0:
cell_data.extend([float(m.attrib["lon"]), float(m.attrib["lat"])])
else:
cell_data.append(float(m.text))
m_cell_bins.append(float(m.attrib["m"]))
data_ijm.append(cell_data)
m_bins.append(m_cell_bins)
try:
data_ijm = numpy.array(data_ijm)
m_bins = numpy.array(m_bins)
except (TypeError, ValueError):
raise Exception("Data is not square")
magnitudes = m_bins[0, :]
rates = data_ijm[:, -len(magnitudes) :]
all_polys = numpy.vstack(
(
data_ijm[:, 0] - cell_dim["lonRange"] / 2.0,
data_ijm[:, 0] + cell_dim["lonRange"] / 2.0,
data_ijm[:, 1] - cell_dim["latRange"] / 2.0,
data_ijm[:, 1] + cell_dim["latRange"] / 2.0,
)
).T
bboxes = [((i[0], i[2]), (i[0], i[3]), (i[1], i[3]), (i[1], i[2])) for i in all_polys]
dh = float(all_polys[0, 3] - all_polys[0, 2])
poly_mask = numpy.ones(len(bboxes))
region = CartesianGrid2D([Polygon(bbox) for bbox in bboxes], dh, mask=poly_mask)
return rates, region, magnitudes
[docs]
@staticmethod
def quadtree(filename):
with open(filename, "r") as file_:
qt_header = file_.readline().split(",")
formats = [str]
for i in range(len(qt_header) - 1):
formats.append(float)
qt_formats = {i: j for i, j in zip(qt_header, formats)}
data = pandas.read_csv(filename, header=0, dtype=qt_formats)
quadkeys = numpy.array([i.encode("ascii", "ignore") for i in data.tile])
magnitudes = numpy.array(data.keys()[3:]).astype(float)
rates = data[magnitudes.astype(str)].to_numpy()
region = QuadtreeGrid2D.from_quadkeys(quadkeys.astype(str), magnitudes=magnitudes)
region.get_cell_area()
return rates, region, magnitudes
[docs]
@staticmethod
def csv(filename):
def is_mag(num):
try:
m = float(num)
if -1 < m < 12.0:
return True
else:
return False
except ValueError:
return False
with open(filename, "r") as file_:
line = file_.readline()
if len(line.split(",")) > 3:
sep = ","
else:
sep = " "
if "tile" in line:
rates, region, magnitudes = ForecastParsers.quadtree(filename)
return rates, region, magnitudes
data = pandas.read_csv(
filename, header=0, sep=sep, escapechar="#", skipinitialspace=True
)
data.columns = [i.strip() for i in data.columns]
magnitudes = numpy.array([float(i) for i in data.columns if is_mag(i)])
rates = data[[i for i in data.columns if is_mag(i)]].to_numpy()
all_polys = data[["lon_min", "lon_max", "lat_min", "lat_max"]].to_numpy()
bboxes = [((i[0], i[2]), (i[0], i[3]), (i[1], i[3]), (i[1], i[2])) for i in all_polys]
dh = float(all_polys[0, 3] - all_polys[0, 2])
try:
poly_mask = data["mask"]
except KeyError:
poly_mask = numpy.ones(len(bboxes))
region = CartesianGrid2D([Polygon(bbox) for bbox in bboxes], dh, mask=poly_mask)
return rates, region, magnitudes
[docs]
@staticmethod
def hdf5(filename, group=""):
start = time.process_time()
with h5py.File(filename, "r") as db:
rates = db[f"{group}/rates"][:]
magnitudes = db[f"{group}/magnitudes"][:]
if "quadkeys" in db.keys():
region = QuadtreeGrid2D.from_quadkeys(
db[f"{group}/quadkeys"][:].astype(str), magnitudes=magnitudes
)
region.get_cell_area()
else:
dh = db[f"{group}/dh"][:][0]
bboxes = db[f"{group}/bboxes"][:]
poly_mask = db[f"{group}/poly_mask"][:]
region = CartesianGrid2D([Polygon(bbox) for bbox in bboxes], dh, mask=poly_mask)
log.debug(f"Loading from hdf5 {filename} took:" f" {time.process_time() - start:.2f}")
return rates, region, magnitudes
[docs]
class HDF5Serializer:
[docs]
@staticmethod
def grid2hdf5(rates, region, mag, grp="", hdf5_filename=None, **kwargs):
start = time.process_time()
bboxes = numpy.array([i.points for i in region.polygons])
with h5py.File(hdf5_filename, "a") as hfile:
hfile.require_dataset(f"{grp}/rates", shape=rates.shape, dtype=float)
hfile[f"{grp}/rates"][:] = rates
hfile.require_dataset(f"{grp}/magnitudes", shape=mag.shape, dtype=float)
hfile[f"{grp}/magnitudes"][:] = mag
hfile.require_dataset(f"{grp}/bboxes", shape=bboxes.shape, dtype=float)
hfile[f"{grp}/bboxes"][:] = bboxes
hfile.require_dataset(f"{grp}/dh", shape=(1,), dtype=float)
try:
hfile[f"{grp}/dh"][:] = region.dh
except AttributeError:
raise AttributeError(
"Quadtree can not be dropped to HDF5"
"(not needed, because file is already"
" low sized"
)
hfile.require_dataset(f"{grp}/poly_mask", shape=region.poly_mask.shape, dtype=float)
hfile[f"{grp}/poly_mask"][:] = region.poly_mask
if kwargs:
for key, v in kwargs.items():
if isinstance(v, (float, int, str)):
dtype = type(v)
shape = (1,)
elif isinstance(v, numpy.ndarray):
shape = v.shape
dtype = v.dtype
else:
shape = len(v)
dtype = type(v[0])
hfile.require_dataset(f"{grp}/{key}", shape=shape, dtype=dtype)
hfile[f"{grp}/{key}"][:] = v
log.debug(f"Storing to hdf5 {hdf5_filename} took:" f"{time.process_time() - start:2f}")
[docs]
def serialize():
parser = argparse.ArgumentParser()
parser.add_argument("--format", help="format")
parser.add_argument("--filename", help="Model forecast name")
args = parser.parse_args()
if args.format == "quadtree":
ForecastParsers.quadtree(args.filename)
if args.format == "dat":
ForecastParsers.dat(args.filename)
if args.format == "csep" or args.format == "csv":
ForecastParsers.csv(args.filename)
if args.format == "xml":
ForecastParsers.xml(args.filename)
if __name__ == "__main__":
serialize()