# -*- coding: utf-8 -*-
# The MIT License (MIT)
#
# Copyright (c) 2019, TU Wien
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Utility functions for all data products in this package.
"""
import argparse
import os
import warnings
from collections import OrderedDict
from datetime import datetime

import numpy as np
import pandas as pd
import xarray as xr
from datedown.fname_creator import create_dt_fpath
from netCDF4 import Dataset
try:
from cdo import Cdo
cdo_available = True
except ImportError:
cdo_available = False
try:
import pygrib
pygrib_available = True
except ImportError:
pygrib_available = False


class CdoNotFoundError(ModuleNotFoundError):
    def __init__(self, msg=None):
        _default_msg = ("cdo and/or python-cdo not installed. "
                        "Use conda to install them under Linux.")
        self.msg = _default_msg if msg is None else msg
        super().__init__(self.msg)


def str2bool(v):
    """
    Parse a string to True/False.

    Parameters
    ----------
    v : str
        String to parse, must be part of the lists below.

    Returns
    -------
    str2bool : bool
        The parsed bool from the passed string.
    """
    if v.lower() in ("yes", "true", "t", "y", "1"):
        return True
    elif v.lower() in ("no", "false", "f", "n", "0"):
        return False
    else:
        raise argparse.ArgumentTypeError("Boolean value expected.")
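

# A minimal usage sketch for str2bool; the "--overwrite" flag below is
# hypothetical and only illustrates wiring the function into argparse:
#
#   parser = argparse.ArgumentParser()
#   parser.add_argument("--overwrite", type=str2bool, default=False)
#   args = parser.parse_args(["--overwrite", "yes"])
#   assert args.overwrite is True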


def save_ncs_from_nc(
    input_nc,
    output_path,
    product_name,
    filename_templ="{product}_AN_%Y%m%d_%H%M.nc",
    grid=None,
    keep_original=True,
    remap_method="bil",
):
    """
    Split the downloaded netcdf file into daily files and add to the folder
    structure necessary for reshuffling.

    Parameters
    ----------
    input_nc : str
        Filepath of the downloaded .nc file
    output_path : str
        Where to save the resulting netcdf files
    product_name : str
        Name of the ECMWF model (only for filename generation)
    filename_templ : str, optional (default: "{product}_AN_%Y%m%d_%H%M.nc")
        Template for naming each separated nc file
    grid : dict, optional (default: None)
        A CDO grid description (name/value pairs, written verbatim to a
        grid.txt file) that the data is remapped to. If None is passed,
        no remapping is performed.
    keep_original : bool, optional (default: True)
        Keep the original downloaded data instead of deleting it after
        splitting.
    remap_method : str, optional (default: "bil")
        CDO remapping method used when a grid is passed, e.g. "bil" for
        bilinear remapping.
    """
localsubdirs = ["%Y", "%j"]
nc_in = xr.open_dataset(input_nc, mask_and_scale=True)
filename_templ = filename_templ.format(product=product_name)
if grid is not None:
if not cdo_available:
raise CdoNotFoundError()
cdo = Cdo()
gridpath = os.path.join(output_path, "grid.txt")
weightspath = os.path.join(output_path, "remap_weights.nc")
if not os.path.exists(gridpath):
with open(gridpath, "w") as f:
for k, v in grid.items():
f.write(f"{k} = {v}\n")
for time in nc_in.time.values:
subset = nc_in.sel(time=time)
timestamp = pd.Timestamp(time).to_pydatetime()
filepath = create_dt_fpath(
timestamp,
root=output_path,
fname=filename_templ,
subdirs=localsubdirs,
)
if not os.path.exists(os.path.dirname(filepath)):
os.makedirs(os.path.dirname(filepath))
if grid is not None:
if not os.path.exists(weightspath):
# create weights file
getattr(cdo, "gen" + remap_method)(
gridpath, input=subset, output=weightspath
)
subset = cdo.remap(
",".join([gridpath, weightspath]),
input=subset,
returnXDataset=True,
)
# same compression for all variables
var_encode = {"zlib": True, "complevel": 6}
subset.to_netcdf(
filepath, encoding={var: var_encode for var in subset.variables}
)
nc_in.close()
if not keep_original:
os.remove(input_nc)
if grid is not None:
cdo.cleanTempDir()
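

# Illustrative call; the paths and the 0.25 deg grid description below are
# assumptions, and the grid dict keys follow the CDO grid-description format
# that save_ncs_from_nc writes to grid.txt:
#
#   grid = {"gridtype": "lonlat", "xsize": 1440, "ysize": 720,
#           "xfirst": -179.875, "yfirst": 89.875,
#           "xinc": 0.25, "yinc": 0.25}
#   save_ncs_from_nc("/tmp/era5_raw.nc", "/tmp/era5_daily", "ERA5",
#                    grid=grid)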


def save_gribs_from_grib(
    input_grib,
    output_path,
    product_name,
    filename_templ="{product}_AN_%Y%m%d_%H%M.grb",
    keep_original=True,
):
    """
    Split the downloaded grib file into daily files and add to the folder
    structure necessary for reshuffling.

    Parameters
    ----------
    input_grib : str
        Filepath of the downloaded .grb file
    output_path : str
        Where to save the resulting grib files
    product_name : str
        Name of the ECMWF model (only for filename generation)
    filename_templ : str, optional (default: "{product}_AN_%Y%m%d_%H%M.grb")
        Template for naming each separated grb file
    keep_original : bool, optional (default: True)
        Keep the original downloaded data instead of deleting it after
        splitting.
    """
localsubdirs = ["%Y", "%j"]
grib_in = pygrib.open(input_grib)
grib_in.seek(0)
for grb in grib_in:
template = filename_templ
filedate = datetime(grb["year"], grb["month"], grb["day"], grb["hour"])
template = template.format(product=product_name)
filepath = create_dt_fpath(
filedate, root=output_path, fname=template, subdirs=localsubdirs
)
if not os.path.exists(os.path.dirname(filepath)):
os.makedirs(os.path.dirname(filepath))
        with open(filepath, "ab") as grb_out:
            grb_out.write(grb.tostring())
grib_in.close()
if not keep_original:
os.remove(input_grib)
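

# Illustrative call; the paths are assumptions:
#
#   save_gribs_from_grib("/tmp/era5_raw.grb", "/tmp/era5_daily", "ERA5")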


def mkdate(datestring):
    """
    Turn a datetime string into a datetime object.

    Parameters
    ----------
    datestring : str
        Input datetime string, either "%Y-%m-%d" or "%Y-%m-%dT%H:%M".

    Returns
    -------
    datetime : datetime
        Converted string.
    """
    if len(datestring) == 10:
        return datetime.strptime(datestring, "%Y-%m-%d")
    if len(datestring) == 16:
        return datetime.strptime(datestring, "%Y-%m-%dT%H:%M")
    raise argparse.ArgumentTypeError(
        "Date string must match %Y-%m-%d or %Y-%m-%dT%H:%M."
    )


def parse_product(inpath: str) -> str:
    """
    Tries to identify the product stored in the path, based on the name of
    the first file found under the path.

    Parameters
    ----------
    inpath : str
        Input path where ERA data was downloaded to

    Returns
    -------
    product : str
        Product name
    """
onedown = os.path.join(inpath, os.listdir(inpath)[0])
twodown = os.path.join(onedown, os.listdir(onedown)[0])
for path, subdirs, files in os.walk(twodown):
for name in files:
filename, extension = os.path.splitext(name)
parts = filename.split("_")
if "ERA5-LAND" in parts:
return "era5-land"
elif "ERA5" in parts:
return "era5"
elif "ERAINT" in parts:
return "eraint"
else:
continue
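

# Illustrative call; assumes files named like "ERA5_AN_20000101_0000.nc" in
# the usual <inpath>/%Y/%j subfolder layout created by the functions above:
#
#   parse_product("/tmp/era5_daily")  # -> "era5"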


def parse_filetype(inpath):
    """
    Tries to find out the file type by searching for grib or nc files two
    subdirectories into the passed input path. If the detection fails, grib
    is assumed.

    Parameters
    ----------
    inpath : str
        Input path where ERA data was downloaded to

    Returns
    -------
    filetype : str
        File type string, either "netcdf" or "grib".
    """
onedown = os.path.join(inpath, os.listdir(inpath)[0])
twodown = os.path.join(onedown, os.listdir(onedown)[0])
filelist = []
for path, subdirs, files in os.walk(twodown):
for name in files:
filename, extension = os.path.splitext(name)
filelist.append(extension)
if ".nc" in filelist and ".grb" not in filelist:
return "netcdf"
elif ".grb" in filelist and ".nc" not in filelist:
return "grib"
else:
# if file type cannot be detected, guess grib
return "grib"


def load_var_table(name="era5", lut=False):
    """
    Load the variables table of supported variables to download.

    Parameters
    ----------
    name : str, optional (default: "era5")
        Name of the product to load the table for, one of "era5",
        "era5-land" or "eraint".
    lut : bool, optional (default: False)
        If set to True, only names are loaded, so that they can be used
        for a LUT, otherwise the full table is loaded.
    """
name = name.lower()
if name == "era5":
era_vars_csv = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "era5", "era5_lut.csv"
)
elif name == "era5-land":
era_vars_csv = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"era5",
"era5-land_lut.csv",
)
elif name == "eraint":
era_vars_csv = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"erainterim",
"eraint_lut.csv",
)
else:
raise ValueError(name, "No LUT for the selected dataset found.")
if lut:
dat = pd.read_csv(era_vars_csv)[["dl_name", "long_name", "short_name"]]
else:
dat = pd.read_csv(era_vars_csv)
return dat
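

# Quick sketch; the three columns below are the ones selected when lut=True
# is passed:
#
#   lut = load_var_table("era5", lut=True)
#   print(lut[["dl_name", "long_name", "short_name"]].head())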


def lookup(name, variables):
    """
    Search the passed elements in the lookup table; if one does not exist,
    raise a ValueError.
    """
lut = load_var_table(name=name, lut=True)
selected = []
for var in variables:
found = False
for row in lut.itertuples():
if var in row:
selected.append(row.Index)
found = True
break
        if not found:
            raise ValueError(
                f"Passed variable {var} is not a supported variable."
            )
return lut.loc[selected, :]
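

# Sketch; the variable name "swvl1" is an assumption, any dl_name, long_name
# or short_name from the shipped LUT csv would work:
#
#   lookup("era5", ["swvl1"])  # -> the matching LUT row(s)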


def get_default_params(name="era5"):
    """
    Read only lines that are marked as default variables in the csv file.

    Parameters
    ----------
    name : str
        Name of the product to get the default parameters for
    """
    vars_df = load_var_table(name, lut=False)
    return vars_df.loc[vars_df.default == 1.0]
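

# Sketch:
#
#   defaults = get_default_params("era5")  # rows where default == 1 in the LUT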


def make_era5_land_definition_file(
    data_file,
    out_file,
    data_file_y_res=0.25,
    ref_var="lsm",
    threshold=0.5,
    exclude_antarctica=True,
):
    """
    Create a land grid definition file from a variable within a downloaded,
    regular (netcdf) era5 file.

    Parameters
    ----------
    data_file : str
        Path to the downloaded file that contains the image that is used as
        the reference for creating the land definition file.
    out_file : str
        Full output path to the land definition file to create.
    data_file_y_res : float, optional (default: 0.25)
        The resolution of the data file in latitude direction.
    ref_var : str, optional (default: "lsm")
        A variable in the data_file that is the reference for the land
        definition. By default, we use the land-sea-mask variable.
    threshold : float, optional (default: 0.5)
        Threshold value below which a point is declared water, and above
        (or equal) which it is declared a land-point. If None is passed,
        then a point is declared a land point if it is not masked (numpy
        masked array) in the reference variable.
    exclude_antarctica : bool, optional (default: True)
        Cut off the definition file at -60° latitude to exclude land points
        in Antarctica.
    """
lat_name, lon_name = "latitude", "longitude"
ds_in = Dataset(data_file)
ds_out = Dataset(out_file, "w", format="NETCDF4")
for dim_name in ds_in.dimensions.keys():
ds_out.createDimension(dim_name, size=ds_in.dimensions[dim_name].size)
ds_out.createVariable(dim_name, "float32", (dim_name,), zlib=True)
ds_out.variables[dim_name][:] = ds_in.variables[dim_name][:]
ref = ds_in.variables[ref_var]
land_mask = np.zeros(ref.shape)
    if threshold is None or np.isnan(threshold):
land_mask[~ref[:].mask] = 1.0
else:
land_mask[ref[:] >= threshold] = 1.0
# drop values below -60° Lat
if exclude_antarctica:
cut_off_lat = -60.0
index_thres_lat = ((180.0 / data_file_y_res) + 1) - (
(90.0 + cut_off_lat) / data_file_y_res
)
land_mask[int(index_thres_lat):, :] = np.nan
else:
cut_off_lat = None
ds_out.createVariable("land", "float32", (lat_name, lon_name), zlib=True)
ds_out.variables["land"][:] = land_mask
land_attrs = OrderedDict(
[
("units", "(0,1)"),
("long_name", "Land-sea mask"),
("based_on_variable", ref_var),
("standard_name", "land_binary_mask"),
("threshold_land_>=", str(threshold)),
("cut_off_at", str(cut_off_lat)),
]
)
for attr, val in land_attrs.items():
ds_out.variables["land"].setncattr(attr, val)
ds_in.close()
ds_out.close()
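

# Illustrative call; the paths are assumptions, and data_file must contain
# the reference variable (by default the land-sea mask "lsm"):
#
#   make_era5_land_definition_file("/tmp/era5_image.nc",
#                                  "/tmp/era5_land_def.nc")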