Source code for ocgis.base

import abc
import itertools
from contextlib import contextmanager
from copy import copy, deepcopy

import six
from ocgis import constants
from ocgis.exc import EmptyObjectError
from ocgis.util.helpers import get_iter


[docs]@six.add_metaclass(abc.ABCMeta) class AbstractOcgisObject(object): """Base class for all ``ocgis`` objects.""" pass
[docs]@six.add_metaclass(abc.ABCMeta) class AbstractInterfaceObject(AbstractOcgisObject): """Base class for interface objects."""
[docs] def copy(self): """Return a shallow copy of self.""" return copy(self)
[docs] def deepcopy(self): """Return a deep copy of self.""" return deepcopy(self)
[docs] def to_xarray(self, **kwargs): """ Convert this object to a type understood by ``xarray``. This should be overloaded by subclasses. """ raise NotImplementedError
[docs]@six.add_metaclass(abc.ABCMeta) class AbstractNamedObject(AbstractInterfaceObject): """ Base class for objects with a name. :param str name: The object's name. :param aliases: Alternative names for the object. :type aliases: ``sequence(str, ...)`` :param str source_name: Name of the object in its source data. Allows the object name and its source name to differ. :param int uid: A unique integer identifier for the object. """ def __init__(self, name, aliases=None, source_name=constants.UNINITIALIZED, uid=None): self._aliases = None self._name = None self._source_name = source_name self.uid = uid self.set_name(name, aliases=aliases) @property def name(self): """ The objects's name. :rtype: str """ return self._name @property def source_name(self): """ The object's source name. :rtype: str """ if self._source_name == constants.UNINITIALIZED: ret = self._name else: ret = self._source_name return ret
[docs] def append_alias(self, alias): """ Append a name alias to the list of the object's aliases. :param str alias: The alias to append. """ self._aliases.append(alias)
[docs] def is_matched_by_alias(self, alias): """ Return ``True`` if the provided alias matches any of the object's aliases. :param str alias: The alias to check. :rtype: bool """ return alias in self._aliases
[docs] def set_name(self, name, aliases=None): """ Set the object's name. :param str name: The new name. :param aliases: New aliases for the object. :type aliases: ``sequence(str, ...)`` """ if aliases is None: aliases = [] if self._source_name == constants.UNINITIALIZED: if self._name is None: self._source_name = name else: self._source_name = self._name self._name = name aliases.append(self._name) self._aliases = aliases
def atleast_ncver(ver): from pkg_resources import parse_version import netCDF4 return parse_version(netCDF4.__version__) >= parse_version(ver) def get_dimension_names(target): from ocgis.variable.dimension import Dimension itr = get_iter(target, dtype=(str, Dimension)) ret_names = [] for element in itr: try: to_append = element.name except AttributeError: to_append = element ret_names.append(to_append) return tuple(ret_names) def get_keyword_arguments_from_template_keys(kwargs, keys, ignore_self=True, pop=False): ret = {} for key in keys: if ignore_self and key == 'self': continue try: if pop: ret[key] = kwargs.pop(key) else: ret[key] = kwargs[key] except KeyError: # Pass on key errors to allow classes to overload default keyword argument values. pass return ret def get_dimension_index(member, container): member = get_dimension_names(member)[0] container = get_dimension_names(container) return container.index(member) def get_variable_names(target): from ocgis.variable.base import Variable itr = get_iter(target, dtype=(str, Variable)) ret_names = [] for element in itr: try: to_append = element.name except AttributeError: to_append = element ret_names.append(to_append) return tuple(ret_names) def get_variables(target, parent): names = get_variable_names(target) ret = [None] * len(names) for idx, n in enumerate(names): ret[idx] = parent[n] return tuple(ret) @contextmanager def grid_abstraction_scope(grid, abstraction, strict=False): orig_abstraction = grid.abstraction if abstraction in grid.abstractions_available: grid.abstraction = abstraction else: if strict: raise ValueError('abstraction "{}" not available on the grid'.format(abstraction)) try: yield None finally: grid.abstraction = orig_abstraction def is_field(target): from ocgis import Field return isinstance(target, Field) def is_unstructured_driver(klass): return klass._is_unstructured def iter_dict_slices(names, sizes, extra=None): extra = extra or [] yld_extra = {e: slice(None) for e in extra} for indices in itertools.product(*[list(range(s)) for s in sizes]): yld = {n: i for n, i in zip(names, indices)} if extra is not None: yld.update(yld_extra) yield yld def iter_variables(target, container): names = get_variable_names(target) for name in names: yield container[name] @contextmanager def orphaned(target, keep_dimensions=False): if keep_dimensions: target._dimensions_cache = target.dimensions has_initialized_parent = target.has_initialized_parent if has_initialized_parent: original_parent = target.parent target.parent = None try: yield target finally: if has_initialized_parent: target.parent = original_parent if keep_dimensions: target._dimensions_cache = constants.UNINITIALIZED def raise_if_empty(target): if target.is_empty: msg = 'No empty {} objects allowed.'.format(target.__class__) exc = EmptyObjectError(msg) try: raise exc finally: from ocgis import vm vm.abort(exc=exc) @contextmanager def renamed_dimensions(dimensions, name_mapping): original_names = [d.name for d in dimensions] try: items = list(name_mapping.items()) for d in dimensions: for k, v in items: if d.name in v: d._name = k break yield dimensions finally: for d, o in zip(dimensions, original_names): d._name = o @contextmanager def renamed_dimensions_on_variables(vc, name_mapping): original_vc_dimensions = vc._dimensions original_names = {} new_vc_dimensions = copy(vc._dimensions) original_variable_dimension_names = {v.name: v.dimension_names for v in list(vc.values())} new_variable_dimension_names = copy(original_variable_dimension_names) mapping_meta = {} try: for k, v in list(name_mapping.items()): for ki, vi in list(original_vc_dimensions.items()): if ki in v: new_vc_dimensions[k] = vi original_name = new_vc_dimensions[k]._name original_names[original_name] = k new_vc_dimensions[k]._name = k if original_name != k: new_vc_dimensions.pop(original_name) for vname, vdimension_names in list(new_variable_dimension_names.items()): to_set_dimensions = list(new_variable_dimension_names[vname]) for vdn in vdimension_names: for ki, vi in list(name_mapping.items()): if vdn in vi: to_set_dimensions[to_set_dimensions.index(vdn)] = ki new_variable_dimension_names[vname] = tuple(to_set_dimensions) for var in list(vc.values()): var._dimensions = new_variable_dimension_names[var.name] vc._dimensions = new_vc_dimensions mapping_meta['variable_dimensions'] = original_variable_dimension_names mapping_meta['dimension_name_backref'] = original_names yield mapping_meta finally: for k, v in list(original_vc_dimensions.items()): v._name = k vc._dimensions = original_vc_dimensions for var in list(vc.values()): var._dimensions = original_variable_dimension_names[var.name] def revert_renamed_dimensions_on_variables(mapping_meta, vc): for v in list(vc.values()): v._dimensions = mapping_meta['variable_dimensions'][v.name] to_swap = [] for k, v in list(vc.dimensions.items()): for ki, vi in list(mapping_meta['dimension_name_backref'].items()): if k == vi and ki != vi: vc.dimensions[k]._name = ki vc.dimensions[ki] = vc.dimensions[k] if k not in to_swap: to_swap.append(k) for ts in to_swap: vc.dimensions.pop(ts)