Source code for ocgis.driver.request.core

import logging
import os
import re
from copy import deepcopy

import six
from ocgis import constants
from ocgis import env
from ocgis.constants import DMK, DriverKey, DecompositionType
from ocgis.driver.dimension_map import DimensionMap
from ocgis.driver.registry import get_driver_class, driver_registry
from ocgis.driver.request.base import AbstractRequestObject
from ocgis.exc import RequestValidationError, NoDataVariablesFound, VariableNotFoundError
from ocgis.util.helpers import get_iter, locate, validate_time_subset, get_tuple
from ocgis.util.logging_ocgis import ocgis_lh
from ocgis.util.units import get_units_object, get_are_units_equivalent


class RequestDataset(AbstractRequestObject):
    """
    Contains all the information necessary to create an OCGIS field via an OCGIS driver.

    >>> from ocgis import RequestDataset
    >>> uri = 'http://some.opendap.dataset'
    >>> # It is also okay to enter the path to a local file.
    >>> uri = '/path/to/local/file.nc'
    >>> variable = 'tasmax'
    >>> rd = RequestDataset(uri, variable)

    :param uri: The absolute path (URLs included) to the data's location. If ``None``, either ``opened`` or
     ``metadata`` must be provided.
    :type uri: :class:`str` | `sequence` of :class:`str` | ``None``

    >>> uri = 'http://some.opendap.dataset'
    >>> uri = '/path/to/local/file.nc'
    >>> # Multifile datasets are supported for local and remote targets.
    >>> uri = ['/path/to/local/file1.nc', '/path/to/local/file2.nc']

    .. warning:: There is no internal checking on the ordering of the files. If the datasets should be concatenated
     along the time dimension, it may be a good idea to run the sequence of URIs through a time sorting function
     :func:`~ocgis.util.helpers.get_sorted_uris_by_time_dimension`.

    :param variable: The target variable. If the argument value is ``None``, then a search on the target data object
     will be performed to find variables having a minimum set of dimensions (i.e. time and space). The value of this
     property will then be updated.
    :type variable: :class:`str` | `sequence` of :class:`str` | ``None``

    >>> variable = 'tas'
    >>> variable = ['tas', 'tasmax']

    :param time_range: Lower and upper bounds for time dimension subsetting. If ``None``, return all time points.
    :type time_range: two-element `sequence` of ``datetime``-like objects
    :param dict time_region: A dictionary with keys of ``'month'`` and/or ``'year'`` and values as sequences
     corresponding to target month and/or year values. Empty region selection for a key may be set to ``None``.

    >>> time_region = {'month':[6,7],'year':[2010,2011]}
    >>> time_region = {'year':[2010]}

    :param time_subset_func: See :meth:`~ocgis.TemporalVariable.get_subset_by_function`.
    :param level_range: Lower and upper bounds for level dimension subsetting. If ``None``, return all levels.
    :type level_range: two-element `sequence` of :class:`int` or :class:`float`
    :param crs: Overload the autodiscovered coordinate system.
    :type crs: :class:`~ocgis.variable.crs.AbstractCoordinateReferenceSystem`

    >>> from ocgis.variable.crs import WGS84
    >>> crs = WGS84()

    :param str t_units: Overload the `time units`_.
    :param str t_calendar: Overload the `time calendar`_.
    :param str t_conform_units_to: Conform the time dimension to the provided units. The calendar may not be changed.
     The optional dependency ``cf_units`` is required.

    >>> t_conform_units_to = 'days since 1949-1-1'

    :param str grid_abstraction: Abstract the geometry data to either ``'point'`` or ``'polygon'``. If ``'polygon'``
     is not possible due to missing bounds, ``'point'`` will be used instead. If ``'auto'`` (the default), identify
     the grid abstraction automatically. Unstructured data formats also allow for ``'line'``.

    .. note:: The ``abstraction`` argument in :class:`~ocgis.OcgOperations` will overload this.

    :param dimension_map: Maps dimensions to axes in the case of a projection/realization axis or an uncommon axis
     ordering. All axes must be in the dictionary. A fully-specified dimension map for a CF grid file containing
     time, x, and y axes is below. The file also contains a scalar level axis. At minimum, a ``'variable'`` must be
     provided for each axis. See :ref:`configuring-a-dimension-map` for a usage example.
    :type dimension_map: :class:`~ocgis.DimensionMap` | dict
    :param units: The units of the source variable. This will be read from metadata if this value is ``None``.
    :type units: str | :class:`cf_units.Unit` | `sequence` of possible types
    :param conform_units_to: Destination units for conversion. If this parameter is set, then the :mod:`cf_units`
     module must be installed.
    :type conform_units_to: :class:`str` | :class:`cf_units.Unit` | `sequence` of possible types
    :param driver: If ``None``, autodiscover the appropriate driver. Acceptable values are listed below. Class
     objects for the associated driver key are also accepted.
    :type driver: str | :class:`~ocgis.driver.base.AbstractDriver`

    ================== ================= ======================================================================
    Value              File Extension(s) Description
    ================== ================= ======================================================================
    ``'netcdf-cf'``    ``'nc'``          A netCDF file using a CF-Grid metadata convention.
    ``'netcdf-ugrid'`` ``'nc'``          A netCDF file using the UGRID (Unstructured Grid) metadata convention.
    ``'netcdf-scrip'`` ``'nc'``          A netCDF file using the SCRIP metadata convention.
    ``'netcdf'``       ``'nc'``          A netCDF file with no metadata convention.
    ``'vector'``       ``'shp'``         An ESRI Shapefile or other vector source.
    ``'csv'``          ``'csv'``         A CSV file.
    ================== ================= ======================================================================

    :param str field_name: Name of the requested field in the output collection. If ``None``, defaults to the
     (renamed) data variable name. If there are multiple data variables, the default name is ``'ocgis_field'``.
    :param bool regrid_source: If ``False``, do not regrid this dataset. This is relevant only if a
     ``regrid_destination`` dataset is present. Please see :ref:`esmpy-regridding` for an overview.
    :param bool regrid_destination: If ``True``, use this dataset as the destination grid for a regridding operation.
     Only one :class:`~ocgis.RequestDataset` may be set as the destination grid. Please see :ref:`esmpy-regridding`
     for an overview.
    :param rename_variable: A sequence with the same length as ``variable``. Provides new names for the variables.
    :type rename_variable: `sequence` of :class:`str`
    :param dict metadata: Overload the metadata that would normally be loaded by the driver. If metadata is provided
     and ``uri`` is ``None``, a field will be created by interpreting the provided metadata.
    :param bool format_time: Stored on the request dataset and passed to the driver's ``create_field`` as the
     default ``format_time`` argument.
    :param opened: An open file used as a write target for the driver.
    :type opened: varies by ``driver`` class
    :param int uid: A unique identifier for the request dataset.
    :param predicate: A filter function returning ``True`` if a variable should be included in the output field. The
     function should take a single argument which is a sequence of string variable names. This function is applied
     directly to the metadata before other functions (i.e. identifying data variables).
    :type predicate: `function`

    >>> predicate = lambda x: x.startswith('w')

    :param bool rotated_pole_priority: If ``False``, attempt to use representative spherical coordinates if available
     in a dataset having a rotated pole coordinate system. If ``True``, use the rotated coordinate even if
     representative coordinates are available.
    :param dict driver_kwargs: Any keyword arguments to driver creation. See the driver documentation for a
     description of accepted parameters. These are often format-specific and not easily generalized.
    :param grid_is_isomorphic: See documentation for :class:`Field`
    :param decomp_type: The parallel decomposition type to use. This may be left alone in many cases unless a
     specific parallel use case such as interfacing with ``ESMF`` is required.
    :type decomp_type: :class:`ocgis.constants.DecompositionType`

    .. _time units: http://netcdf4-python.googlecode.com/svn/trunk/docs/netCDF4-module.html#num2date
    .. _time calendar: http://netcdf4-python.googlecode.com/svn/trunk/docs/netCDF4-module.html#num2date
    """

    def __init__(self, uri=None, variable=None, units=None, time_range=None, time_region=None,
                 time_subset_func=None, level_range=None, conform_units_to=None, crs='auto', t_units=None,
                 t_calendar=None, t_conform_units_to=None, grid_abstraction='auto', grid_is_isomorphic='auto',
                 dimension_map=None, field_name=None, driver=None, regrid_source=True, regrid_destination=False,
                 metadata=None, format_time=True, opened=None, uid=None, rename_variable=None, predicate=None,
                 rotated_pole_priority=False, driver_kwargs=None, decomp_type=DecompositionType.OCGIS):
        # While True, the time subset setters skip cross-validation. Validation runs once after all are assigned.
        self._is_init = True

        self._field_name = field_name
        self._level_range = None
        self._time_range = None
        self._time_region = None
        self._time_subset_func = None
        self._driver_kwargs = driver_kwargs
        self._decomp_type = decomp_type

        if rename_variable is not None:
            rename_variable = get_tuple(rename_variable)
        self._rename_variable = rename_variable

        self.rotated_pole_priority = rotated_pole_priority
        self.predicate = predicate
        if dimension_map is not None and isinstance(dimension_map, dict):
            dimension_map = DimensionMap.from_dict(dimension_map)
        self._dimension_map = dimension_map

        # Copy so later in-place edits (units, calendar, ...) never leak into the caller's dictionary.
        self._metadata = deepcopy(metadata)
        self._uri = None
        self.uid = uid

        # This is an "open" file-like object that may be passed in-place of file location parameters.
        self._opened = opened
        if opened is not None and driver is None:
            msg = 'If "opened" is not None, then a "driver" must be provided.'
            ocgis_lh(logger='request', exc=RequestValidationError('driver', msg))

        # Field creation options.
        self.format_time = format_time
        self.grid_abstraction = grid_abstraction
        self.grid_is_isomorphic = grid_is_isomorphic
        # Flag used for regridding to determine if the coordinate system was assigned during initialization.
        self._has_assigned_coordinate_system = False if crs == 'auto' else True

        if uri is None:
            # Fields may be created from pure metadata.
            if metadata is not None:
                # The default OCGIS driver is NetCDF.
                if driver is None:
                    driver = DriverKey.NETCDF_CF
            elif opened is None:
                ocgis_lh(logger='request', exc=RequestValidationError('uri', 'Cannot be None'))
        else:
            self._uri = get_uri(uri)

        if driver is None:
            klass = get_autodiscovered_driver(uri)
        else:
            klass = get_driver(driver)
        self._driver = klass(self)

        if variable is not None:
            variable = get_tuple(variable)
        self._variable = variable

        self.time_range = time_range
        self.time_region = time_region
        self.time_subset_func = time_subset_func
        self.level_range = level_range

        self._crs = deepcopy(crs)

        self.regrid_source = regrid_source
        self.regrid_destination = regrid_destination

        # These setters write into the metadata and therefore require the driver and variable to be in place.
        self.units = units
        self.conform_units_to = conform_units_to

        self._is_init = False

        self._validate_time_subset_()

        # Update metadata for time variable.
        tvar = self.dimension_map.get_variable(DMK.TIME)
        if tvar is not None:
            m = self.metadata['variables'][tvar]
            if t_units is not None:
                m['attrs']['units'] = t_units
            if t_calendar is not None:
                m['attrs']['calendar'] = t_calendar
            if t_conform_units_to is not None:
                # Use the calendar stored in the metadata (possibly overloaded just above) for the conversion.
                t_calendar = m['attrs'].get('calendar', constants.DEFAULT_TEMPORAL_CALENDAR)
                t_conform_units_to = get_units_object(t_conform_units_to, calendar=t_calendar)
                m['conform_units_to'] = t_conform_units_to

    def __eq__(self, other):
        """Request datasets are equal when they share a class and have equal instance dictionaries."""
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        else:
            return False

    def __iter__(self):
        """Yield one dictionary per variable with keys ``'variable'``, ``'units'`` and ``'conform_units_to'``."""
        attrs = ['variable', 'units', 'conform_units_to']
        # The length is zero when no data variables are found, so no property is accessed in that case.
        for ii in range(len(self)):
            yield {a: get_tuple(getattr(self, a))[ii] for a in attrs}

    def __len__(self):
        """Return the number of requested variables or ``0`` if no data variables are found."""
        try:
            ret = len(get_tuple(self.variable))
        except NoDataVariablesFound:
            ret = 0
        return ret

    @property
    def conform_units_to(self):
        """
        Get the destination units stored in the metadata for each variable. A single value is returned for a single
        variable and a tuple for multiple variables.
        """
        ret = []
        m = self.metadata['variables']
        for v in get_iter(self.variable):
            ret.append(m[v].get('conform_units_to'))
        ret = get_first_or_tuple(ret)
        return ret

    @conform_units_to.setter
    def conform_units_to(self, value):
        if value is not None:
            value = get_tuple(value)
            if len(value) != len(get_tuple(self.variable)):
                msg = 'Must match "variable" element-wise. The sequence lengths differ.'
                # Report the parameter actually being validated.
                raise RequestValidationError('conform_units_to', msg)
            if env.USE_CFUNITS:
                validate_units('conform_units_to', value)
            # If we are conforming units, assert that units are equivalent.
            validate_unit_equivalence(get_tuple(self.units), value)

            m = self.metadata['variables']
            for v, u in zip(get_tuple(self.variable), value):
                m[v]['conform_units_to'] = u

    @property
    def _conform_units_to(self):
        # Destination units live in the metadata. There is no private backing attribute.
        raise NotImplementedError

    @property
    def crs(self):
        """Get the coordinate system. If ``'auto'`` was requested, it is read from the metadata by the driver."""
        if self._crs == 'auto':
            ret = self.driver.get_crs(self.metadata)
        else:
            ret = self._crs
        return ret

    @property
    def decomp_type(self):
        """
        Get the parallel decomposition type used by the request dataset.

        :rtype: :class:`ocgis.constants.DecompositionType`
        """
        return self._decomp_type

    @property
    def driver(self):
        """
        Get the driver to use for field creation.

        :rtype: :class:`~ocgis.driver.base.AbstractDriver`
        """
        return self._driver

    @property
    def level_range(self):
        """Get the level range value held by the wrapping ``LevelRange`` parameter object."""
        return self._level_range.value

    @level_range.setter
    def level_range(self, value):
        # Imported locally as in the original code; presumably avoids a circular import - TODO confirm.
        from ocgis.ops.parms.definition import LevelRange

        self._level_range = LevelRange(value)

    @property
    def dimension_map(self):
        """
        Get the dimension map. If none was provided at initialization, a copy of the driver's raw dimension map is
        created on first access and updated from the metadata.
        """
        if self._dimension_map is None:
            dimension_map_raw = self.driver.dimension_map_raw
            self._dimension_map = deepcopy(dimension_map_raw)
            self._dimension_map.update_dimensions_from_metadata(self.metadata)
        return self._dimension_map

    @property
    def driver_kwargs(self):
        """Get the keyword arguments provided for driver creation."""
        return self._driver_kwargs

    @property
    def field_name(self):
        """
        Get the field name. If no name was provided, the (renamed) variable name is used for a single variable. With
        multiple variables a warning is logged and the default field name is returned.
        """
        if self._field_name is None:
            # Use renamed variables for field names. Often there is a single variable in the request. This ensures
            # unique field names if renamed variables are unique.
            ret = list(get_iter(self.rename_variable))
            if len(ret) > 1:
                msg = 'No default "field_name" based on variables name possible with multiple data variables: {}. ' \
                      'Using default field name: {}.'.format(self.variable, constants.MiscName.DEFAULT_FIELD_NAME)
                ocgis_lh(msg=msg, level=logging.WARN)
                ret = constants.MiscName.DEFAULT_FIELD_NAME
            else:
                ret = ret[0]
        else:
            ret = self._field_name
        return ret

    @field_name.setter
    def field_name(self, value):
        self._field_name = value

    @property
    def has_data_variables(self):
        """Return ``True`` if data variables are found in the target dataset."""
        try:
            # Evaluate the property for its side effect only. An ``assert`` is not used here because it is removed
            # when Python runs with ``-O`` and the check would then always succeed.
            self.variable
        except NoDataVariablesFound:
            ret = False
        else:
            ret = True
        return ret

    @property
    def metadata(self):
        """
        Get the metadata dictionary. If none was provided at initialization, a copy of the driver's raw metadata is
        created on first access and variable renames are applied to it.
        """
        if self._metadata is None:
            metadata = self.driver.metadata_raw
            self._metadata = deepcopy(metadata)
            # The rename map reads "variable" which reads "metadata" again. This must run after the assignment
            # above and only inside this branch to avoid infinite recursion.
            if self._rename_variable is not None:
                for k, v in self.rename_variable_map.items():
                    self._metadata['variables'][k]['name'] = v
        return self._metadata

    @property
    def opened(self):
        """Get the open file-like object provided at initialization (may be ``None``)."""
        return self._opened

    @property
    def rename_variable(self):
        """Get the new variable name(s). Falls back to the variable name(s) when no rename was provided."""
        if self._rename_variable is None:
            ret = self.variable
        else:
            ret = get_first_or_tuple(list(get_iter(self._rename_variable)))
        return ret

    @property
    def rename_variable_map(self):
        """Get a dictionary mapping each variable name to its renamed value."""
        ret = {}
        for name, rename in zip(get_iter(self.variable), get_iter(self.rename_variable)):
            ret[name] = rename
        return ret

    @property
    def time_range(self):
        """Get the time range value held by the wrapping ``TimeRange`` parameter object."""
        return self._time_range.value

    @time_range.setter
    def time_range(self, value):
        from ocgis.ops.parms.definition import TimeRange

        self._time_range = TimeRange(value)
        # ensure the time range and region overlaps
        if not self._is_init:
            self._validate_time_subset_()

    @property
    def time_region(self):
        """Get the time region value held by the wrapping ``TimeRegion`` parameter object."""
        return self._time_region.value

    @time_region.setter
    def time_region(self, value):
        from ocgis.ops.parms.definition import TimeRegion

        self._time_region = TimeRegion(value)
        # ensure the time range and region overlaps
        if not self._is_init:
            self._validate_time_subset_()

    @property
    def time_subset_func(self):
        """Get the time subset function held by the wrapping ``TimeSubsetFunction`` parameter object."""
        return self._time_subset_func.value

    @time_subset_func.setter
    def time_subset_func(self, value):
        from ocgis.ops.parms.definition import TimeSubsetFunction

        self._time_subset_func = TimeSubsetFunction(value)

    @property
    def units(self):
        """
        Get the ``'units'`` attribute stored in the metadata for each variable. A single value is returned for a
        single variable and a tuple for multiple variables.
        """
        ret = []
        for v in get_iter(self.variable):
            ret.append(self.metadata['variables'][v]['attrs'].get('units'))
        ret = get_first_or_tuple(ret)
        return ret

    @units.setter
    def units(self, value):
        if value is not None:
            value = get_tuple(value)
            if len(value) != len(get_tuple(self.variable)):
                msg = 'Must match "variable" element-wise. The sequence lengths differ.'
                raise RequestValidationError('units', msg)
            if env.USE_CFUNITS:
                validate_units('units', value)

            m = self.metadata['variables']
            for v, u in zip(get_tuple(self.variable), value):
                m[v]['attrs']['units'] = u

    @property
    def uri(self):
        """Get the URI as a single value, a tuple for multiple URIs or ``None`` if no URI was provided."""
        if self._uri is None:
            ret = self._uri
        else:
            ret = get_first_or_tuple(self._uri)
        return ret

    @property
    def variable(self):
        """
        Get the variable name(s). Names are discovered by the driver when none were provided at initialization.

        :raises: VariableNotFoundError, NoDataVariablesFound
        """
        if self._variable is None:
            ret = self.driver.get_data_variable_names(self.metadata, self.dimension_map)
        else:
            metadata_variables = self.metadata['variables']
            for vname in self._variable:
                if vname not in metadata_variables:
                    raise VariableNotFoundError(self.uri, vname)
            ret = self._variable
        try:
            ret = get_first_or_tuple(ret)
        except IndexError:
            # An empty sequence means there are no data variables.
            raise NoDataVariablesFound
        return ret

    def create_field(self, *args, **kwargs):
        """
        Create a field using the driver. The request dataset's ``format_time``, ``grid_abstraction``, ``uid`` and
        field name are used for any of those keyword arguments not provided in the call.

        :rtype: :class:`~ocgis.interface.base.Field`
        """
        # Allow for get overloads in the method call.
        overloads = ['format_time', 'grid_abstraction', 'uid']
        for overload in overloads:
            if overload not in kwargs:
                kwargs[overload] = getattr(self, overload)

        if 'name' not in kwargs:
            try:
                name = self.field_name
            except NoDataVariablesFound:
                name = constants.MiscName.DEFAULT_FIELD_NAME
            kwargs['name'] = name

        return self.driver.create_field(*args, **kwargs)

    def create_raw_field(self, **kwargs):
        """
        Return a raw field with no metadata interpretation.

        :rtype: :class:`~ocgis.Field`
        """
        return self.driver.create_raw_field(**kwargs)

    def get(self, *args, **kwargs):
        """Alias for :meth:`get_field`."""
        return self.get_field(*args, **kwargs)

    def get_field(self, *args, **kwargs):
        """Here for backwards compatibility."""
        return self.create_field(*args, **kwargs)

    def inspect(self):
        """
        Print a string containing important information about the source driver.
        """
        return self.driver.inspect()

    def _get_meta_rows_(self):
        """Return a list of formatted strings describing the URI, variables and subset parameters."""
        if self.time_range is None:
            tr = None
        else:
            tr = '{0} to {1} (inclusive)'.format(self.time_range[0], self.time_range[1])
        if self.level_range is None:
            lr = None
        else:
            lr = '{0} to {1} (inclusive)'.format(self.level_range[0], self.level_range[1])

        try:
            data_variable = self.variable
        except NoDataVariablesFound:
            data_variable = None

        rows = [' URI: {0}'.format(self.uri),
                ' Data Variable(s): {0}'.format(data_variable),
                ' Time Range: {0}'.format(tr),
                ' Time Region/Selection: {0}'.format(self.time_region),
                ' Level Range: {0}'.format(lr)]
        return rows

    def _validate_time_subset_(self):
        """
        Ensure the time range and time region are compatible.

        :raises: RequestValidationError
        """
        if not validate_time_subset(self.time_range, self.time_region):
            raise RequestValidationError("time_range/time_region", '"time_range" and "time_region" must overlap.')
def get_first_or_tuple(value):
    """
    Return the only element of a one-element sequence, otherwise return the sequence as a tuple.

    :param value: The sequence to collapse.
    :type value: `sequence`
    :raises: IndexError if ``value`` is empty. Callers use this to detect "no elements".
    """
    if len(value) > 1:
        ret = tuple(value)
    else:
        ret = value[0]
    return ret


def get_is_none(value):
    """Return ``True`` if every element yielded by ``get_iter(value)`` is ``None``."""
    return all(v is None for v in get_iter(value))


def validate_units(keyword, sequence):
    """
    Check all units are convertible into the appropriate backend.

    :param str keyword: Name of the parameter being validated. Used in the raised exception.
    :param sequence: The unit values to check.
    :type sequence: `sequence`
    :raises: RequestValidationError
    """
    try:
        for units in sequence:
            get_units_object(units)
    except ValueError as e:
        # Exceptions have no "message" attribute on Python 3. "str" works on both major versions.
        raise RequestValidationError(keyword, str(e))


def validate_unit_equivalence(src_units, dst_units):
    """
    Check that each pair of source and destination units is equivalent.

    :param src_units: The source units.
    :type src_units: `sequence`
    :param dst_units: The destination units paired element-wise with ``src_units``.
    :type dst_units: `sequence`
    :raises: RequestValidationError
    """
    # Imported locally as in the original code; presumably avoids a circular import - TODO confirm.
    from ocgis.ops.parms.definition import ConformUnitsTo

    for s, d in zip(src_units, dst_units):
        s, d = list(map(get_units_object, (s, d)))
        if not get_are_units_equivalent((s, d)):
            msg = 'The units specified in "{2}" ("{0}") are not equivalent to the source units "{1}".'
            # "{0}" is the requested (destination) unit and "{1}" is the source unit.
            raise RequestValidationError(ConformUnitsTo.name, msg.format(d, s, ConformUnitsTo.name))


def get_autodiscovered_driver(uri):
    """
    :param str uri: The target URI containing data for which to choose a driver.
    :returns: The correct driver for opening the ``uri``.
    :rtype: :class:`ocgis.driver.base.AbstractDriver`
    :raises: RequestValidationError
    """
    # Collect every registered driver having an extension pattern matching one of the URIs. A driver is
    # appended once per match.
    possible = []
    for element in get_iter(uri):
        for driver in driver_registry.drivers:
            for pattern in driver.extensions:
                if re.match(pattern, element) is not None:
                    possible.append(driver)

    exc_msg = None
    ret = None
    if len(possible) == 0:
        exc_msg = 'Driver not found for URI: {0}'.format(uri)
    elif len(possible) == 1:
        ret = possible[0]
    else:
        # Several matches are acceptable only if the priority drivers among them share a single key.
        sub_possible = [p for p in possible if p._priority is True]
        sub_possible_keys = [sp.key for sp in sub_possible]
        if len(set(sub_possible_keys)) == 1:
            ret = sub_possible[0]
        else:
            exc_msg = 'More than one possible driver matched URI: {}'.format(uri)

    if exc_msg is None:
        return ret
    else:
        ocgis_lh(logger='request', exc=RequestValidationError('driver/uri', exc_msg))


def get_driver(driver):
    """Return the driver class for a driver key, class or instance using the default driver as the fallback."""
    return get_driver_class(key_class_or_instance=driver, default=constants.DEFAULT_DRIVER)


def get_uri(uri, ignore_errors=False, followlinks=True):
    """
    Resolve one or more URIs to usable locations.

    A URI is used as-is if it exists locally or contains ``'://'``. Otherwise it is searched for in the directories
    of ``env.DIR_DATA``. The first match found wins.

    :param uri: The URI(s) to resolve.
    :type uri: :class:`str` | `sequence` of :class:`str`
    :param bool ignore_errors: If ``True``, do not report an error for a URI that cannot be resolved. ``None`` is
     placed in the output for that URI.
    :param bool followlinks: Passed to ``locate`` when searching ``env.DIR_DATA``.
    :returns: The resolved URIs in the order provided.
    :rtype: list
    """
    out_uris = []
    if isinstance(uri, six.string_types):
        uris = [uri]
    else:
        uris = uri
    assert (len(uri) >= 1)
    for element in uris:
        ret = None
        # check if the path exists locally
        if os.path.exists(element) or '://' in element:
            ret = element
        # if it does not exist, check the directory locations
        else:
            if env.DIR_DATA is not None:
                if isinstance(env.DIR_DATA, six.string_types):
                    dirs = [env.DIR_DATA]
                else:
                    dirs = env.DIR_DATA
                for directory in dirs:
                    for filepath in locate(element, directory, followlinks=followlinks):
                        ret = filepath
                        break
                    # Stop at the first directory with a match. Otherwise every remaining directory is still
                    # walked and a later match silently replaces the first one.
                    if ret is not None:
                        break
            if ret is None:
                if not ignore_errors:
                    msg = 'File not found: "{0}". Check env.DIR_DATA or ensure a fully qualified URI is used.'.format(
                        element)
                    ocgis_lh(logger='request', exc=ValueError(msg))
            else:
                if not os.path.exists(ret) and not ignore_errors:
                    msg = 'Path does not exist and is likely not a remote URI: "{0}". Set "ignore_errors" to True if ' \
                          'this is not the case.'
                    msg = msg.format(ret)
                    ocgis_lh(msg, exc=ValueError(msg))
        out_uris.append(ret)
    return out_uris