import logging
import os
import re
from copy import deepcopy

import six
from ocgis import constants
from ocgis import env
from ocgis.constants import DMK, DriverKey, DecompositionType
from ocgis.driver.dimension_map import DimensionMap
from ocgis.driver.registry import get_driver_class, driver_registry
from ocgis.driver.request.base import AbstractRequestObject
from ocgis.exc import RequestValidationError, NoDataVariablesFound, VariableNotFoundError
from ocgis.util.helpers import get_iter, locate, validate_time_subset, get_tuple
from ocgis.util.logging_ocgis import ocgis_lh
from ocgis.util.units import get_units_object, get_are_units_equivalent

class RequestDataset(AbstractRequestObject):
    """
    Contains all the information necessary to create an OCGIS field via an OCGIS driver.

    >>> from ocgis import RequestDataset
    >>> uri = 'http://some.opendap.dataset'
    >>> # It is also okay to enter the path to a local file.
    >>> uri = '/path/to/local/'
    >>> variable = 'tasmax'
    >>> rd = RequestDataset(uri, variable)

    :param uri: The absolute path (URLs included) to the data's location. If ``None``, either ``opened`` or
     ``metadata`` must be provided.
    :type uri: :class:`str` | `sequence` of :class:`str` | ``None``

    >>> uri = 'http://some.opendap.dataset'
    >>> uri = '/path/to/local/'
    >>> # Multifile datasets are supported for local and remote targets.
    >>> uri = ['/path/to/local/', '/path/to/local/']

    .. warning:: There is no internal checking on the ordering of the files. If the datasets should be concatenated along the time dimension, it may be a good idea to run the sequence of URIs through a time sorting function :func:`~ocgis.util.helpers.get_sorted_uris_by_time_dimension`. :param variable: The target variable. If the argument value is ``None``, then a search on the target data object will be performed to find variables having a minimum set of dimensions (i.e. time and space). The value of this property will then be updated. :type variable: :class:`str` | `sequence` of :class:`str` | ``None`` >>> variable = 'tas' >>> variable = ['tas', 'tasmax'] :param time_range: Lower and upper bounds for time dimension subsetting. If ``None``, return all time points. :type time_range: two-element `sequence` of ``datetime``-like objects :param dict time_region: A dictionary with keys of ``'month'`` and/or ``'year'`` and values as sequences corresponding to target month and/or year values. Empty region selection for a key may be set to ``None``. >>> time_region = {'month':[6,7],'year':[2010,2011]} >>> time_region = {'year':[2010]} :param time_subset_func: See :meth:`~ocgis.TemporalVariable.get_subset_by_function`. :param level_range: Lower and upper bounds for level dimension subsetting. If ``None``, return all levels. :type level_range: two-element `sequence` of :class:`int` or :class:`float` :param crs: Overload the autodiscovered coordinate system. :type crs: :class:`` >>> from import WGS84 >>> crs = WGS84() :param str t_units: Overload the `time units`_. :param str t_calendar: Overload the `time calendar`_. :param str t_conform_units_to: Conform the time dimension to the provided units. The calendar may not be changed. The optional dependency ``cf_units`` is required. >>> t_conform_units_to = 'days since 1949-1-1' :param str grid_abstraction: Abstract the geometry data to either ``'point'`` or ``'polygon'``. If ``'polygon'`` is not possible due to missing bounds, ``'point'`` will be used instead. If ``'auto'`` (the default), identify the grid abstraction automatically. Unstructured data formats also allow for ``'line'``. .. note:: The ``abstraction`` argument in :class:`~ocgis.OcgOperations` will overload this. :param dimension_map: Maps dimensions to axes in the case of a projection/realization axis or an uncommon axis ordering. All axes must be in the dictionary. A fully-specified dimension map for a CF grid file containing time, x, and y axes is below. The file also contains a scalar level axis. At minimum, a ``'variable'`` must be provided for each axis. See :ref:`configuring-a-dimension-map` for a usage example. :type dimension_map: :class:`~ocgis.DimensionMap` | dict :param units: The units of the source variable. This will be read from metadata if this value is ``None``. :type units: str | :class:`cf_units.Unit` | `sequence` of possible types :param conform_units_to: Destination units for conversion. If this parameter is set, then the :mod:`cf_units` module must be installed. :type conform_units_to: :class:`str` | :class:`cfunits.Units` | `sequence` of possible types :param driver: If ``None``, autodiscover the appropriate driver. Acceptable values are listed below. Class objects for the associated driver key are also accepted. :type driver: str | :class:`~ocgis.driver.base.AbstractDriver` ================== ================= ====================================================================== Value File Extension(s) Description ================== ================= ====================================================================== ``'netcdf-cf'`` ``'nc'`` A netCDF file using a CF-Grid metadata convention. ``'netcdf-ugrid'`` ``'nc'`` A netCDF file using the UGRID (Unstructured Grid) metadata convention. ``'netcdf-scrip'`` ``'nc'`` A netCDF file using the SCRIP metadata convention. ``'netcdf'`` ``'nc'`` A netCDF file with no metadata convention. ``'vector'`` ``'shp'`` An ESRI Shapefile or other vector source. ``'csv'`` ``'csv'`` A CSV file. ================== ================= ====================================================================== :param str field_name: Name of the requested field in the output collection. If ``None``, defaults to the variable defaults to the data variable name. If there are multiple data variables, the default name is ``'ocgis_field'``. :param bool regrid_source: If ``False``, do not regrid this dataset. This is relevant only if a ``regrid_destination`` dataset is present. Please see :ref:`esmpy-regridding` for an overview. :param bool regrid_destination: If ``True``, use this dataset as the destination grid for a regridding operation. Only one :class:`~ocgis.RequestDataset` may be set as the destination grid. Please see :ref:`esmpy-regridding` for an overview. :param rename_variable: A sequence with the same length as ``variable``. Provides new names for the variables. :type rename_variable: `sequence` of :class:`str` :param dict metadata: Overload the metadata that would normally be loaded by the driver. If metadata is provided and ``uri`` is ``None``, a field will be created by interpreting the provided metadata. :param opened: An open file used as a write target for the driver. :type opened: varies by ``driver`` class :param int uid: A unique identifier for the request dataset. :param predicate: A filter function returning ``True`` if a variable should be included in the output field. The function should take a single argument which is a sequence of string variable names. This function is applied directly to the metadata before other functions (i.e. identifying data variables). :type predicate: `function` :param bool rotated_pole_priority: If ``False``, attempt to use representative spherical coordinates if available in a dataset having a rotated pole coordinate system. If ``True``, use the rotated coordinate even if representative coordinates are available. >>> predicate = lambda x: x.startswith('w') .. _time units: .. _time calendar: :param dict driver_kwargs: Any keyword arguments to driver creation. See the driver documentation for a description of accepted parameters. These are often format-specific and not easily generalized. :param grid_is_isomorphic: See documentation for :class:`Field` :param decomp_type: The parallel decomposition type to use. This may be left alone in many cases unless a specific parallel use case such as interfacing with ``ESMF`` is required. :type decomp_type: :class:`ocgis.constants.DecompositionType` """ def __init__(self, uri=None, variable=None, units=None, time_range=None, time_region=None, time_subset_func=None, level_range=None, conform_units_to=None, crs='auto', t_units=None, t_calendar=None, t_conform_units_to=None, grid_abstraction='auto', grid_is_isomorphic='auto', dimension_map=None, field_name=None, driver=None, regrid_source=True, regrid_destination=False, metadata=None, format_time=True, opened=None, uid=None, rename_variable=None, predicate=None, rotated_pole_priority=False, driver_kwargs=None, decomp_type=DecompositionType.OCGIS): self._is_init = True self._field_name = field_name self._level_range = None self._time_range = None self._time_region = None self._time_subset_func = None self._driver_kwargs = driver_kwargs self._decomp_type = decomp_type if rename_variable is not None: rename_variable = get_tuple(rename_variable) self._rename_variable = rename_variable self.rotated_pole_priority = rotated_pole_priority self.predicate = predicate if dimension_map is not None and isinstance(dimension_map, dict): dimension_map = DimensionMap.from_dict(dimension_map) self._dimension_map = dimension_map self._metadata = deepcopy(metadata) self._uri = None self.uid = uid # This is an "open" file-like object that may be passed in-place of file location parameters. self._opened = opened if opened is not None and driver is None: msg = 'If "opened" is not None, then a "driver" must be provided.' ocgis_lh(logger='request', exc=RequestValidationError('driver', msg)) # Field creation options. self.format_time = format_time self.grid_abstraction = grid_abstraction self.grid_is_isomorphic = grid_is_isomorphic # Flag used for regridding to determine if the coordinate system was assigned during initialization. self._has_assigned_coordinate_system = False if crs == 'auto' else True if uri is None: # Fields may be created from pure metadata. if metadata is not None: # The default OCGIS driver is NetCDF. if driver is None: driver = DriverKey.NETCDF_CF elif opened is None: ocgis_lh(logger='request', exc=RequestValidationError('uri', 'Cannot be None')) else: self._uri = get_uri(uri) if driver is None: klass = get_autodiscovered_driver(uri) else: klass = get_driver(driver) self._driver = klass(self) if variable is not None: variable = get_tuple(variable) self._variable = variable self.time_range = time_range self.time_region = time_region self.time_subset_func = time_subset_func self.level_range = level_range self._crs = deepcopy(crs) self.regrid_source = regrid_source self.regrid_destination = regrid_destination self.units = units self.conform_units_to = conform_units_to self._is_init = False self._validate_time_subset_() # Update metadata for time variable. tvar = self.dimension_map.get_variable(DMK.TIME) if tvar is not None: m = self.metadata['variables'][tvar] if t_units is not None: m['attrs']['units'] = t_units if t_calendar is not None: m['attrs']['calendar'] = t_calendar if t_conform_units_to is not None: from ocgis.util.units import get_units_object t_calendar = m['attrs'].get('calendar', constants.DEFAULT_TEMPORAL_CALENDAR) t_conform_units_to = get_units_object(t_conform_units_to, calendar=t_calendar) m['conform_units_to'] = t_conform_units_to def __eq__(self, other): if isinstance(other, self.__class__): return self.__dict__ == other.__dict__ else: return False def __iter__(self): attrs = ['variable', 'units', 'conform_units_to'] for ii in range(len(self)): yield {a: get_tuple(getattr(self, a))[ii] for a in attrs} def __len__(self): try: ret = len(get_tuple(self.variable)) except NoDataVariablesFound: ret = 0 return ret @property def conform_units_to(self): ret = [] m = self.metadata['variables'] for v in get_iter(self.variable): ret.append(m[v].get('conform_units_to')) ret = get_first_or_tuple(ret) return ret @conform_units_to.setter def conform_units_to(self, value): if value is not None: value = get_tuple(value) if len(value) != len(get_tuple(self.variable)): msg = 'Must match "variable" element-wise. The sequence lengths differ.' raise RequestValidationError('units', msg) if env.USE_CFUNITS: validate_units('conform_units_to', value) # If we are conforming units, assert that units are equivalent. validate_unit_equivalence(get_tuple(self.units), value) m = self.metadata['variables'] for v, u in zip(get_tuple(self.variable), value): m[v]['conform_units_to'] = u @property def _conform_units_to(self): raise NotImplementedError @property def crs(self): if self._crs == 'auto': ret = self.driver.get_crs(self.metadata) else: ret = self._crs return ret @property def decomp_type(self): """ Get the parallel decomposition type used by the request dataset. :rtype: :class:`ocgis.constants.DecompositionType` """ return self._decomp_type @property def driver(self): """ Get the driver to use for field creation. :return: :class:`~ocgis.Field` """ return self._driver @property def level_range(self): return self._level_range.value @level_range.setter def level_range(self, value): from ocgis.ops.parms.definition import LevelRange self._level_range = LevelRange(value) @property def dimension_map(self): if self._dimension_map is None: dimension_map_raw = self.driver.dimension_map_raw self._dimension_map = deepcopy(dimension_map_raw) self._dimension_map.update_dimensions_from_metadata(self.metadata) return self._dimension_map @property def driver_kwargs(self): return self._driver_kwargs @property def field_name(self): if self._field_name is None: # Use renamed variables for field names. Often there is a single variable in the request. This ensures # unique field names if renamed variables are unique. ret = list(get_iter(self.rename_variable)) if len(ret) > 1: msg = 'No default "field_name" based on variables name possible with multiple data variables: {}. ' \ 'Using default field name: {}.'.format(self.variable, constants.MiscName.DEFAULT_FIELD_NAME) ocgis_lh(msg=msg, level=logging.WARN) ret = constants.MiscName.DEFAULT_FIELD_NAME else: ret = ret[0] else: ret = self._field_name return ret @field_name.setter def field_name(self, value): self._field_name = value @property def has_data_variables(self): """Return ``True`` if data variables are found in the target dataset.""" try: assert self.variable ret = True except NoDataVariablesFound: ret = False return ret @property def metadata(self): if self._metadata is None: metadata = self.driver.metadata_raw self._metadata = deepcopy(metadata) if self._rename_variable is not None: for k, v in self.rename_variable_map.items(): self._metadata['variables'][k]['name'] = v return self._metadata @property def opened(self): return self._opened @property def time_range(self): return self._time_range.value @property def rename_variable(self): if self._rename_variable is None: ret = self.variable else: ret = get_first_or_tuple(list(get_iter(self._rename_variable))) return ret @property def rename_variable_map(self): ret = {} for name, rename in zip(get_iter(self.variable), get_iter(self.rename_variable)): ret[name] = rename return ret @time_range.setter def time_range(self, value): from ocgis.ops.parms.definition import TimeRange self._time_range = TimeRange(value) # ensure the time range and region overlaps if not self._is_init: self._validate_time_subset_() @property def time_region(self): return self._time_region.value @time_region.setter def time_region(self, value): from ocgis.ops.parms.definition import TimeRegion self._time_region = TimeRegion(value) # ensure the time range and region overlaps if not self._is_init: self._validate_time_subset_() @property def time_subset_func(self): return self._time_subset_func.value @time_subset_func.setter def time_subset_func(self, value): from ocgis.ops.parms.definition import TimeSubsetFunction self._time_subset_func = TimeSubsetFunction(value) @property def units(self): ret = [] for v in get_iter(self.variable): ret.append(self.metadata['variables'][v]['attrs'].get('units')) ret = get_first_or_tuple(ret) return ret @units.setter def units(self, value): if value is not None: value = get_tuple(value) if len(value) != len(get_tuple(self.variable)): msg = 'Must match "variable" element-wise. The sequence lengths differ.' raise RequestValidationError('units', msg) if env.USE_CFUNITS: validate_units('units', value) m = self.metadata['variables'] for v, u in zip(get_tuple(self.variable), value): m[v]['attrs']['units'] = u @property def uri(self): if self._uri is None: ret = self._uri else: ret = get_first_or_tuple(self._uri) return ret @property def variable(self): if self._variable is None: ret = self.driver.get_data_variable_names(self.metadata, self.dimension_map) else: for vname in self._variable: if vname not in list(self.metadata['variables'].keys()): raise VariableNotFoundError(self.uri, vname) ret = self._variable try: ret = get_first_or_tuple(ret) except IndexError: raise NoDataVariablesFound return ret def create_field(self, *args, **kwargs): """ :rtype: :class:`~ocgis.interface.base.Field` """ # Allow for get overloads in the method call. overloads = ['format_time', 'grid_abstraction', 'uid'] for overload in overloads: if overload not in kwargs: kwargs[overload] = getattr(self, overload) if 'name' not in kwargs: try: name = self.field_name except NoDataVariablesFound: name = constants.MiscName.DEFAULT_FIELD_NAME kwargs['name'] = name return self.driver.create_field(*args, **kwargs) def create_raw_field(self, **kwargs): """ Return a raw field with no metadata interpretation. :rtype: :class:`~ocgis.Field` """ return self.driver.create_raw_field(**kwargs) def get(self, *args, **kwargs): return self.get_field(*args, **kwargs) def get_field(self, *args, **kwargs): """Here for backwards compatibility.""" return self.create_field(*args, **kwargs)
def inspect(self):
        """
        Print a string containing important information about the source driver.
        """
        return self.driver.inspect()
def _get_meta_rows_(self): if self.time_range is None: tr = None else: tr = '{0} to {1} (inclusive)'.format(self.time_range[0], self.time_range[1]) if self.level_range is None: lr = None else: lr = '{0} to {1} (inclusive)'.format(self.level_range[0], self.level_range[1]) try: data_variable = self.variable except NoDataVariablesFound: data_variable = None rows = [' URI: {0}'.format(self.uri), ' Data Variable(s): {0}'.format(data_variable), ' Time Range: {0}'.format(tr), ' Time Region/Selection: {0}'.format(self.time_region), ' Level Range: {0}'.format(lr)] return rows def _validate_time_subset_(self): if not validate_time_subset(self.time_range, self.time_region): raise RequestValidationError("time_range/time_region", '"time_range" and "time_region" must overlap.')
def get_first_or_tuple(value): if len(value) > 1: ret = tuple(value) else: ret = value[0] return ret def get_is_none(value): return all([v is None for v in get_iter(value)]) def validate_units(keyword, sequence): # Check all units are convertible into the appropriate backend. try: list(map(get_units_object, sequence)) except ValueError as e: raise RequestValidationError(keyword, e.message) def validate_unit_equivalence(src_units, dst_units): from ocgis.ops.parms.definition import ConformUnitsTo for s, d in zip(src_units, dst_units): s, d = list(map(get_units_object, (s, d))) if not get_are_units_equivalent((s, d)): msg = 'The units specified in "{2}" ("{0}") are not equivalent to the source units "{1}".' raise RequestValidationError(, msg.format(s, d, def get_autodiscovered_driver(uri): """ :param str uri: The target URI containing data for which to choose a driver. :returns: The correct driver for opening the ``uri``. :rtype: :class:`ocgis.api.request.driver.base.AbstractDriver` :raises: RequestValidationError """ possible = [] for element in get_iter(uri): for driver in driver_registry.drivers: for pattern in driver.extensions: if re.match(pattern, element) is not None: possible.append(driver) exc_msg = None ret = None if len(possible) == 0: exc_msg = 'Driver not found for URI: {0}'.format(uri) elif len(possible) == 1: ret = possible[0] else: sub_possible = [] for p in possible: if p._priority is True: sub_possible.append(p) sub_possible_keys = [sp.key for sp in sub_possible] if len(set(sub_possible_keys)) == 1: ret = sub_possible[0] else: exc_msg = 'More than one possible driver matched URI: {}'.format(uri) if exc_msg is None: return ret else: ocgis_lh(logger='request', exc=RequestValidationError('driver/uri', exc_msg)) def get_driver(driver): return get_driver_class(key_class_or_instance=driver, default=constants.DEFAULT_DRIVER) def get_uri(uri, ignore_errors=False, followlinks=True): out_uris = [] if isinstance(uri, six.string_types): uris = [uri] else: uris = uri assert (len(uri) >= 1) for uri in uris: ret = None # check if the path exists locally if os.path.exists(uri) or '://' in uri: ret = uri # if it does not exist, check the directory locations else: if env.DIR_DATA is not None: if isinstance(env.DIR_DATA, six.string_types): dirs = [env.DIR_DATA] else: dirs = env.DIR_DATA for directory in dirs: for filepath in locate(uri, directory, followlinks=followlinks): ret = filepath break if ret is None: if not ignore_errors: msg = 'File not found: "{0}". Check env.DIR_DATA or ensure a fully qualified URI is used.'.format( uri) ocgis_lh(logger='request', exc=ValueError(msg)) else: if not os.path.exists(ret) and not ignore_errors: msg = 'Path does not exist and is likely not a remote URI: "{0}". Set "ignore_errors" to True if ' \ 'this is not the case.' msg = msg.format(ret) ocgis_lh(msg, exc=ValueError(msg)) out_uris.append(ret) return out_uris