Source code for ecmwf_models.era5.download

# -*- coding: utf-8 -*-

"""
Module to download ERA5 from terminal in netcdf and grib format.
"""

from ecmwf_models.utils import (
    load_var_table,
    lookup,
    save_gribs_from_grib,
    save_ncs_from_nc,
    mkdate,
    str2bool,
)
import warnings
import errno
import argparse
import sys
import os
import logging
from datetime import datetime, timedelta, time
import shutil
import cdsapi
import calendar
import multiprocessing


[docs]def default_variables(product="era5"): """ These variables are being downloaded, when None are passed by the user Parameters --------- product : str, optional (default: 'era5') Name of the era5 product to read the default variables for. Either 'era5' or 'era5-land'. """ lut = load_var_table(name=product) defaults = lut.loc[lut["default"] == 1]["dl_name"].values return defaults.tolist()
[docs]def download_era5( c, years, months, days, h_steps, variables, target, grb=False, product="era5", dry_run=False, cds_kwds={}, ): """ Download era5 reanalysis data for single levels of a defined time span Parameters ---------- c : cdsapi.Client Client to pass the request to years : list Years for which data is downloaded ,e.g. [2017, 2018] months : list Months for which data is downloaded, e.g. [4, 8, 12] days : list Days for which data is downloaded (range(31)=All days) e.g. [10, 20, 31] h_steps: list List of full hours to download data at the selected dates e.g [0, 12] variables : list, optional (default: None) List of variables to pass to the client, if None are passed, the default variables will be downloaded. target : str File name, where the data is stored. geb : bool, optional (default: False) Download data in grib format product : str ERA5 data product to download, either era5 or era5-land dry_run: bool, optional (default: False) Do not download anything, this is just used for testing the functionality cds_kwds: dict, optional Additional arguments to be passed to the CDS API retrieve request. Returns --------- success : bool Return True after downloading finished """ if dry_run: return request = { "format": "grib" if grb else "netcdf", "variable": variables, "year": [str(y) for y in years], "month": [str(m).zfill(2) for m in months], "day": [str(d).zfill(2) for d in days], "time": [time(h, 0).strftime("%H:%M") for h in h_steps], } request.update(cds_kwds) if product == "era5": request["product_type"] = "reanalysis" c.retrieve("reanalysis-era5-single-levels", request, target) elif product == "era5-land": c.retrieve("reanalysis-era5-land", request, target) else: raise ValueError( product, "Unknown product, choose either 'era5' or 'era5-land'" ) return True
[docs]class CDSStatusTracker: """ Track the status of the CDS download by using the CDS callback functions """ statuscode_ok = 0 statuscode_error = -1 statuscode_unavailable = 10 def __init__(self): self.download_statuscode = self.statuscode_ok
[docs] def handle_error_function(self, *args, **kwargs): message_prefix = args[0] message_body = args[1] if self.download_statuscode != self.statuscode_unavailable: if ( message_prefix.startswith("Reason:") and message_body == "Request returned no data" ): self.download_statuscode = self.statuscode_unavailable else: self.download_statuscode = self.statuscode_error logging.error(*args, **kwargs)
[docs]def download_and_move( target_path, startdate, enddate, product="era5", variables=None, keep_original=False, h_steps=[0, 6, 12, 18], grb=False, dry_run=False, grid=None, remap_method="bil", cds_kwds={}, stepsize="month", ) -> int: """ Downloads the data from the ECMWF servers and moves them to the target path. This is done in 30 day increments between start and end date. The files are then extracted into separate grib files per parameter and stored in yearly folders under the target_path. Parameters ---------- target_path : str Path where the files are stored to startdate: datetime first date to download enddate: datetime last date to download product : str, optional (default: ERA5) Either ERA5 or ERA5Land variables : list, optional (default: None) Name of variables to download keep_original: bool keep the original downloaded data h_steps: list List of full hours to download data at the selected dates e.g [0, 12] grb: bool, optional (default: False) Download data as grib files dry_run: bool Do not download anything, this is just used for testing the functions grid : dict, optional A grid on which to remap the data using CDO. This must be a dictionary using CDO's grid description format, e.g.:: grid = { "gridtype": "lonlat", "xsize": 720, "ysize": 360, "xfirst": -179.75, "yfirst": 89.75, "xinc": 0.5, "yinc": -0.5, } Default is to use no regridding. remap_method : str, optional Method to be used for regridding. Available methods are: - "bil": bilinear (default) - "bic": bicubic - "nn": nearest neighbour - "dis": distance weighted - "con": 1st order conservative remapping - "con2": 2nd order conservative remapping - "laf": largest area fraction remapping cds_kwds: dict, optional Additional arguments to be passed to the CDS API retrieve request. stepsize : str, optional Size of steps for requests, can be "month" or "day". Returns ------- status_code: int 0 : Downloaded data ok -1 : Error -10 : No data available for requested time period """ product = product.lower() if variables is None: variables = default_variables(product=product) else: # find the dl_names variables = lookup(name=product, variables=variables) variables = variables["dl_name"].values.tolist() curr_start = startdate if dry_run: warnings.warn("Dry run does not create connection to CDS") c = None cds_status_tracker = None else: cds_status_tracker = CDSStatusTracker() c = cdsapi.Client( error_callback=cds_status_tracker.handle_error_function ) pool = multiprocessing.Pool(1) while curr_start <= enddate: status_code = -1 sy, sm, sd = curr_start.year, curr_start.month, curr_start.day y, m = sy, sm if stepsize == "month": sm_days = calendar.monthrange(sy, sm)[ 1 ] # days in the current month if (enddate.year == y) and (enddate.month == m): d = enddate.day else: d = sm_days elif stepsize == "day": d = sd else: raise ValueError(f"Invalid stepsize: {stepsize}") curr_end = datetime(y, m, d) fname = "{start}_{end}.{ext}".format( start=curr_start.strftime("%Y%m%d"), end=curr_end.strftime("%Y%m%d"), ext="grb" if grb else "nc", ) downloaded_data_path = os.path.join(target_path, "temp_downloaded") if not os.path.exists(downloaded_data_path): os.mkdir(downloaded_data_path) dl_file = os.path.join(downloaded_data_path, fname) finished, i = False, 0 while (not finished) and (i < 5): # try max 5 times try: finished = download_era5( c, years=[sy], months=[sm], days=range(sd, d + 1), h_steps=h_steps, variables=variables, grb=grb, product=product, target=dl_file, dry_run=dry_run, cds_kwds=cds_kwds, ) status_code = 0 break except: # noqa: E722 # If no data is available we don't need to retry if ( cds_status_tracker.download_statuscode == CDSStatusTracker.statuscode_unavailable ): status_code = -10 break # delete the partly downloaded data and retry if os.path.isfile(dl_file): os.remove(dl_file) finished = False i += 1 continue if status_code == 0: if grb: pool.apply_async( save_gribs_from_grib, args=(dl_file, target_path), kwds=dict( product_name=product.upper(), keep_original=keep_original, ), ) else: pool.apply_async( save_ncs_from_nc, args=( dl_file, target_path, ), kwds=dict( product_name=product.upper(), grid=grid, remap_method=remap_method, keep_original=keep_original, ), ) curr_start = curr_end + timedelta(days=1) pool.close() pool.join() # remove temporary files if not keep_original: shutil.rmtree(downloaded_data_path) if grid is not None: gridpath = os.path.join(target_path, "grid.txt") if os.path.exists(gridpath): os.unlink(gridpath) weightspath = os.path.join(target_path, "remap_weights.nc") if os.path.exists(weightspath): os.unlink(weightspath) return status_code
[docs]def parse_args(args): """ Parse command line parameters for recursive download Parameters ---------- args : list Command line parameters as list of strings Returns ---------- clparams : argparse.Namespace Parsed command line parameters """ parser = argparse.ArgumentParser( description="Download ERA 5 reanalysis data images between two " "dates. Before this program can be used, you have to " "register at the CDS and setup your .cdsapirc file " "as described here: " "https://cds.climate.copernicus.eu/api-how-to" ) parser.add_argument( "localroot", help="Root of local filesystem where the downloaded data will be " "stored.", ) parser.add_argument( "-s", "--start", type=mkdate, default=datetime(1979, 1, 1), help=( "Startdate in format YYYY-MM-DD. " "If no data is found there then the first available date of the " "product is used." ), ) parser.add_argument( "-e", "--end", type=mkdate, default=datetime.now(), help=( "Enddate in format YYYY-MM-DD. " "If not given then the current date is used." ), ) parser.add_argument( "-p", "--product", type=str, default="ERA5", help=( "The ERA5 product to download. Choose either ERA5 or ERA5-Land. " "Default is ERA5." ), ) parser.add_argument( "-var", "--variables", metavar="variables", type=str, default=None, nargs="+", help=( "Name of variables to download. If None are passed, we use the " "default ones from the " "era5_lut.csv resp. era5-land_lut.csv files in this package. " "See the ERA5/ERA5-LAND documentation for more variable names: " " https://confluence.ecmwf.int/display/CKB/" "ERA5+data+documentation " " https://confluence.ecmwf.int/display/CKB/" "ERA5-Land+data+documentation" ), ) parser.add_argument( "-keep", "--keep_original", type=str2bool, default="False", help=( "Also keep the originally, temporarily downloaded image stack " "instead of deleting it after extracting single images. " "Default is False." ), ) parser.add_argument( "-grb", "--as_grib", type=str2bool, default="False", help=( "Download data in grib format instead of netcdf. " "Default is False." ), ) parser.add_argument( "--h_steps", type=int, default=[0, 6, 12, 18], nargs="+", help=( "Temporal resolution of downloaded images. " "Pass a set of full hours here, like '--h_steps 0 12'. " "By default 6H images (starting at 0:00 UTC, i.e. 0 6 12 18) " "will be downloaded" ), ) args = parser.parse_args(args) print( "Downloading {p} {f} files between {s} and {e} into folder {root}" .format( p=args.product, f="grib" if args.as_grib is True else "netcdf", s=args.start.isoformat(), e=args.end.isoformat(), root=args.localroot, ) ) return args
[docs]def main(args): args = parse_args(args) status_code = download_and_move( target_path=args.localroot, startdate=args.start, enddate=args.end, product=args.product, variables=args.variables, h_steps=args.h_steps, grb=args.as_grib, keep_original=args.keep_original, ) return status_code
[docs]def run(): status_code = main(sys.argv[1:]) if status_code == -10: return_code = errno.ENODATA # Default no data status code of 61 else: return_code = status_code sys.exit(return_code)