import abc
import json
from abc import ABCMeta
from contextlib import contextmanager
from copy import deepcopy
from warnings import warn
import numpy as np
import six
from ocgis import constants, GridUnstruct
from ocgis import vm
from ocgis.base import AbstractOcgisObject, raise_if_empty
from ocgis.base import get_variable_names
from ocgis.collection.field import Field
from ocgis.constants import MPIWriteMode, TagName, KeywordArgument, OcgisConvention, VariableName, DecompositionType
from ocgis.driver.dimension_map import DimensionMap
from ocgis.exc import DefinitionValidationError, NoDataVariablesFound, DimensionMapError, VariableMissingMetadataError, \
GridDeficientError
from ocgis.util.helpers import get_group
from ocgis.util.logging_ocgis import ocgis_lh
from ocgis.variable.base import SourcedVariable
from ocgis.variable.dimension import Dimension
from ocgis.vmachine.mpi import OcgDist
# TODO: Make the driver accept no arguments at initialization. The request dataset should be passed around as a parameter.
@six.add_metaclass(abc.ABCMeta)
class AbstractDriver(AbstractOcgisObject):
"""
Base class for all drivers.
:param rd: The input request dataset object.
:type rd: :class:`~ocgis.RequestDataset`
"""
common_extension = None # The standard file extension commonly associated with the canonical file format.
default_axes_positions = (0, 1) # Standard axes index for Y and X respectively.
_default_crs = None
_esmf_fileformat = None # The associated ESMF file type. This may be None.
_esmf_grid_class = constants.ESMFGridClass.GRID # The ESMF grid class type.
_is_unstructured = False # Flag to indicate if the driver is for unstructured grids
_priority = False
def __init__(self, rd):
    """
    Store the request dataset and reset the lazily populated caches.

    :param rd: The input request dataset object.
    :type rd: :class:`~ocgis.RequestDataset`
    """
    # These caches are filled on first access by the ``dist``, ``dimension_map_raw`` and ``metadata_raw``
    # properties respectively.
    self._dist = None
    self._dimension_map_raw = None
    self._metadata_raw = None
    self.rd = rd
def __eq__(self, other):
    """
    Compare two drivers by their ``key``.

    :param other: The object to compare against. Anything exposing a ``key`` attribute is comparable.
    :returns: ``True`` if the keys are equal, ``False`` if they differ. ``NotImplemented`` is returned when
     ``other`` has no ``key`` attribute so Python can fall back to its default comparison instead of raising
     an ``AttributeError``.
    """
    try:
        other_key = other.key
    except AttributeError:
        # Not a driver-like object; let Python try the reflected operation / identity comparison.
        return NotImplemented
    return self.key == other_key
def __str__(self):
    """Return the driver key wrapped in double quotes."""
    quoted_key = '"{0}"'.format(self.key)
    return quoted_key
@property
def crs(self):
    """
    Placeholder property that blocks use of a ``crs`` attribute on a driver. Reading it always fails and, because
    no setter is defined, it cannot be assigned either.

    :raises: NotImplementedError
    """
    raise NotImplementedError
@property
def data_model(self):
    """
    Return the data model definition for the driver if it has one. NetCDF has NETCDF3_CLASSIC, etc. This base
    implementation always returns ``None``.

    :rtype: str | ``None``
    """
    return None
@property
def dimension_map_raw(self):
    """
    Dimension map derived from the raw metadata. It is created on first access and cached on the driver.
    """
    cached = self._dimension_map_raw
    if cached is None:
        cached = create_dimension_map_raw(self, self.metadata_raw)
        self._dimension_map_raw = cached
    return cached
@property
def dist(self):
    """
    Distribution object for the driver. It is created with :meth:`create_dist` on first access and cached.
    """
    cached = self._dist
    if cached is None:
        cached = self.create_dist()
        self._dist = cached
    return cached
@abc.abstractproperty
def extensions(self):
    r"""
    Abstract property; concrete drivers must define it.

    :returns: A sequence of regular expressions used to match appropriate URIs.
    :rtype: (str,)

    >>> ('.*\.shp',)
    """
@abc.abstractproperty
def key(self):
    """
    Abstract property; concrete drivers must define it. The value is used for driver equality (``__eq__``), the
    string representation (``__str__``) and in dump reports.

    :rtype: str
    """
@property
def metadata_raw(self):
    """
    Raw metadata for the driver, loaded on first access and cached.

    When the request dataset has neither a URI nor an opened object, the metadata stored on the request dataset
    is used. Otherwise the metadata is read with :meth:`get_metadata`.
    """
    if self._metadata_raw is not None:
        return self._metadata_raw
    rd = self.rd
    # With a URI or an opened object there is a real source to read from.
    has_source = rd._uri is not None or rd.opened is not None
    if has_source:
        loaded = self.get_metadata()
    else:
        loaded = rd.metadata
    self._metadata_raw = loaded
    return self._metadata_raw
@property
def metadata_source(self):
    """Return the metadata held by the request dataset (``self.rd.metadata``)."""
    return self.rd.metadata
@abc.abstractproperty
def output_formats(self):
    """
    Abstract property; concrete drivers must define it. It is checked by :meth:`validate_ops`.

    :returns: A list of acceptable output formats for the driver. If this is `'all'`, then the driver's data may be
     converted to all output formats.
    :rtype: list[str, ...]
    """
@staticmethod
def array_resolution(value, axis):
    """
    Optionally overloaded by subclasses to calculate the "resolution" of an array. This makes the most sense in the
    context of coordinate variables. This method should return a float resolution. The base implementation is
    not implemented.

    :param value: The value array target.
    :type value: :class:`numpy.ndarray`
    :param int axis: The target axis for the resolution calculation.
    :rtype: float
    :raises: NotImplementedError
    """
    raise NotImplementedError
@classmethod
def close(cls, obj, rd=None):
    """
    Close ``obj`` unless the request dataset carries its own opened file object.

    :param obj: The open object to close.
    :param rd: Optional request dataset. If it has an opened file object, nothing is closed because the client
     is expected to handle closing/finalization.
    """
    client_owns_handle = rd is not None and rd.opened is not None
    if not client_owns_handle:
        cls._close_(obj)
def get_crs(self, group_metadata):
    """
    Return the coordinate system found by :meth:`_get_crs_main_`, falling back to the driver's ``_default_crs``
    when none is found.

    :param dict group_metadata: Metadata for the target group.
    :rtype: ~ocgis.interface.base.crs.CoordinateReferenceSystem
    """
    found = self._get_crs_main_(group_metadata)
    return self._default_crs if found is None else found
def create_dimension_map(self, group_metadata, **kwargs):
    """
    Create a dimension map for a group from its metadata. This base implementation returns an otherwise empty
    map with only the driver key set; ``group_metadata`` and ``kwargs`` are not used here.

    :param dict group_metadata: Group metadata to use when creating the dimension map.
    :rtype: :class:`~ocgis.DimensionMap`
    """
    dimension_map = DimensionMap()
    dimension_map.set_driver(self.key)
    return dimension_map
@staticmethod
def create_dimensions(group_metadata):
    """
    Create dimension objects. The key may differ from the dimension name. In which case, we can assume the dimension
    is being renamed.

    :param dict group_metadata: Metadata dictionary for the target group.
    :returns: Dimension objects keyed by the dimension's key in the metadata.
    :rtype: dict
    """
    created = {}
    for source_name, meta in group_metadata.get('dimensions', {}).items():
        declared_size = meta['size']
        if meta.get('isunlimited', False):
            # Unlimited dimensions have no fixed size; the declared size is the current length.
            size, size_current = None, declared_size
        else:
            size, size_current = declared_size, None
        created[source_name] = Dimension(meta.get('name', source_name), size=size, size_current=size_current,
                                         src_idx='auto', source_name=source_name)
    return created
def create_dist(self, metadata=None):
    """
    Create a distribution from global metadata. In general, this should not be overloaded by subclasses.

    :param dict metadata: Global metadata to use for creating a distribution. Defaults to
     :attr:`metadata_source` when ``None``.
    :rtype: :class:`ocgis.OcgDist`
    """
    ompi = OcgDist(size=vm.size, ranks=vm.ranks)
    # Convert metadata into a grouping consistent with the MPI dimensions. The metadata is nested under a ``None``
    # root key before iterating the group keys.
    if metadata is None:
        metadata = self.metadata_source
    metadata = {None: metadata}
    for group_index in iter_all_group_keys(metadata):
        group_meta = get_group(metadata, group_index)
        # Add the dimensions to the distribution object.
        dimensions = self.create_dimensions(group_meta)
        # Only add dimensions if the group has at least one. Otherwise, just request the group from the
        # distribution.
        if len(dimensions) == 0:
            _ = ompi.get_group(group=group_index)
        else:
            for dimension_name, dimension_meta in list(group_meta['dimensions'].items()):
                target_dimension = dimensions[dimension_name]
                # The "dist" flag defaults to False when not present in the dimension's metadata.
                target_dimension.dist = group_meta['dimensions'][dimension_name].get('dist', False)
                ompi.add_dimension(target_dimension, group=group_index)
            try:
                dimension_map = self.rd.dimension_map.get_group(group_index)
            except DimensionMapError:
                # Likely a user-provided dimension map. Skip choosing a distributed dimension for this group.
                continue
            distributed_dimension_name = self.get_distributed_dimension_name(dimension_map,
                                                                             group_meta['dimensions'],
                                                                             decomp_type=self.rd.decomp_type)
            # Allow no distributed dimensions to be returned.
            if distributed_dimension_name is not None:
                # Flag the chosen dimension as distributed on every rank.
                for target_rank in range(ompi.size):
                    distributed_dimension = ompi.get_dimension(distributed_dimension_name, group=group_index,
                                                               rank=target_rank)
                    distributed_dimension.dist = True
    ompi.update_dimension_bounds()
    return ompi
def create_field(self, *args, **kwargs):
    """
    Create a field object. In general, this should not be overloaded by subclasses.

    :keyword bool format_time: ``(=True)`` If ``False``, do not convert numeric times to Python date objects.
    :keyword str grid_abstraction: ``(='auto')`` If provided, use this grid abstraction. Defaults to the request
     dataset's value.
    :keyword grid_is_isomorphic: Defaults to the request dataset's value. Only forwarded when not ``'auto'``.
    :keyword raw_field: ``(=None)`` If provided, modify this field instead.
    :type raw_field: None | :class:`~ocgis.Field`
    :param kwargs: Additional keyword arguments to :meth:`~ocgis.driver.base.AbstractDriver.create_raw_field`.
    :return: :class:`ocgis.Field`
    """
    # Work on a copy so the keyword arguments popped below are not removed from the caller's view.
    kwargs = kwargs.copy()
    raw_field = kwargs.pop('raw_field', None)
    format_time = kwargs.pop(KeywordArgument.FORMAT_TIME, True)
    grid_abstraction = kwargs.pop(KeywordArgument.GRID_ABSTRACTION, self.rd.grid_abstraction)
    grid_is_isomorphic = kwargs.pop('grid_is_isomorphic', self.rd.grid_is_isomorphic)
    if raw_field is None:
        # Get the raw variable collection from source.
        new_kwargs = kwargs.copy()
        new_kwargs['source_name'] = None
        raw_field = self.create_raw_field(*args, **new_kwargs)
    # Get the appropriate metadata for the collection.
    group_metadata = self.get_group_metadata(raw_field.group, self.metadata_source)
    # Always pull the dimension map from the request dataset. This allows it to be overloaded.
    dimension_map = self.get_group_metadata(raw_field.group, self.rd.dimension_map)
    # Modify the coordinate system variable. If it is overloaded on the request dataset, then the variable
    # collection needs to be updated to hold the variable and any alternative coordinate systems needs to be
    # removed.
    to_remove = None
    to_add = None
    crs = self.get_crs(group_metadata)
    if self.rd._has_assigned_coordinate_system:
        to_add = self.rd._crs
        if crs is not None:
            to_remove = crs.name
    else:
        if self.rd._crs is not None and self.rd._crs != 'auto':
            to_add = self.rd._crs
            if crs is not None:
                to_remove = crs.name
        elif crs is not None:
            to_add = crs
    if to_remove is not None:
        raw_field.pop(to_remove, None)
    if to_add is not None:
        raw_field.add_variable(to_add, force=True)
    # Overload the dimension map with the CRS.
    if to_add is not None:
        dimension_map.set_crs(to_add.name)
    # Remove the mask variable if present in the raw dimension map and the source dimension map is set to None.
    if self.rd.dimension_map.get_spatial_mask() is None and self.dimension_map_raw.get_spatial_mask() is not None:
        raw_field.pop(self.dimension_map_raw.get_spatial_mask())
    # Convert the raw variable collection to a field.
    # TODO: Identify a way to remove this code block; field should be appropriately initialized; format_time and grid_abstraction are part of a dimension map.
    kwargs[KeywordArgument.DIMENSION_MAP] = dimension_map
    kwargs[KeywordArgument.FORMAT_TIME] = format_time
    if grid_abstraction != 'auto':
        kwargs[KeywordArgument.GRID_ABSTRACTION] = grid_abstraction
    if grid_is_isomorphic != 'auto':
        kwargs['grid_is_isomorphic'] = grid_is_isomorphic
    field = Field.from_variable_collection(raw_field, *args, **kwargs)
    # If this is a source grid for regridding, ensure the flag is updated.
    field.regrid_source = self.rd.regrid_source
    # Update the assigned coordinate system flag.
    field._has_assigned_coordinate_system = self.rd._has_assigned_coordinate_system
    # Apply any requested subsets. Each subset returns a new object whose parent replaces the field.
    if self.rd.time_range is not None:
        field = field.time.get_between(*self.rd.time_range).parent
    if self.rd.time_region is not None:
        field = field.time.get_time_region(self.rd.time_region).parent
    if self.rd.time_subset_func is not None:
        field = field.time.get_subset_by_function(self.rd.time_subset_func).parent
    if self.rd.level_range is not None:
        field = field.level.get_between(*self.rd.level_range).parent
    # These variables have all the dimensions needed for a data classification. Use overloaded values from the
    # request dataset if they are provided.
    try:
        data_variable_names = list(get_variable_names(self.rd.rename_variable))
    except NoDataVariablesFound:
        # It is okay to have no data variables in a field.
        data_variable_names = []
        pass
    for dvn in data_variable_names:
        field.append_to_tags(TagName.DATA_VARIABLES, dvn, create=True)
    # Load child fields. Each child is converted recursively by passing it back in as the raw field.
    for child in list(field.children.values()):
        kwargs['raw_field'] = child
        field.children[child.name] = self.create_field(*args, **kwargs)
    return field
def create_raw_field(self, group_metadata=None, group_name=None, name=None, source_name=constants.UNINITIALIZED,
                     parent=None, uid=None):
    """
    Create a raw field object. This field object should interpret metadata explicitly (i.e. no dimension map). In
    general this method should not be overloaded by subclasses. Nested groups in the metadata are created
    recursively as child fields.

    :param dict group_metadata: Metadata dictionary for the current group. Defaults to the request dataset's
     metadata.
    :param str group_name: The name of the current group being processed. Not used by this implementation.
    :param str name: See :class:`~ocgis.base.AbstractNamedObject`
    :param str source_name: See :class:`~ocgis.base.AbstractNamedObject`
    :param parent: :class:`~ocgis.variable.base.AbstractContainer`
    :param int uid: See :class:`~ocgis.variable.base.AbstractContainer`
    :return: :class:`~ocgis.Field`
    """
    metadata = self.rd.metadata if group_metadata is None else group_metadata
    field = Field(name=name, source_name=source_name, uid=uid, attrs=metadata.get('global_attributes'))
    for variable in self.create_variables(metadata, parent=field).values():
        field.add_variable(variable, force=True)
    if parent is not None:
        parent.add_child(field)
    # Recurse into nested groups; each attaches itself to this field through the parent argument.
    for child_name, child_metadata in metadata.get('groups', {}).items():
        self.create_raw_field(child_metadata, name=child_name, parent=field, group_name=child_name)
    return field
def create_variables(self, group_metadata, parent=None):
    """
    Create a dictionary of variable objects. The keys are the variable keys from the metadata.

    :param dict group_metadata: Metadata for the group to create variables from.
    :param parent: See :class:`~ocgis.variable.base.AbstractContainer`
    :return: dict
    """
    # Dimensions are set in the sourced variable initialization call. This is to allow the group hierarchy to be
    # determined by the parents' relationships.
    return {
        source_name: SourcedVariable(name=meta.get('name', source_name), dtype=meta.get('dtype'),
                                     request_dataset=self.rd, source_name=source_name, parent=parent)
        for source_name, meta in group_metadata['variables'].items()
    }
@staticmethod
def get_data_variable_names(group_metadata, group_dimension_map):
    """
    Return a tuple of data variable names using the metadata and dimension map. This base implementation returns
    every variable key in the metadata and does not consult the dimension map.

    :param dict group_metadata: Metadata for the target group.
    :param group_dimension_map: Dimension map for the target group.
    :rtype: tuple
    """
    variable_names = group_metadata['variables'].keys()
    return tuple(variable_names)
def get_distributed_dimension_name(self, dimension_map, dimensions_metadata, decomp_type=DecompositionType.OCGIS):
    """
    Return the preferred distributed dimension name. This base implementation always returns ``None`` (no
    distributed dimension).

    :param dimension_map: Dimension map for the target group.
    :param dict dimensions_metadata: Dimension metadata for the target group.
    :param decomp_type: The decomposition type.
    :rtype: str | ``None``
    """
    return None
def get_dump_report(self, indent=0, group_metadata=None, first=True, global_attributes_name='global'):
    """
    Build the lines of a text dump report for the driver's metadata, recursing into nested groups.

    :param int indent: Number of spaces used to indent the report lines.
    :param dict group_metadata: Metadata to report on. On the first call, falls back to
     :attr:`metadata_source` when empty or ``None``.
    :param bool first: ``True`` for the outermost call, which adds the driver header and closing brace.
     Recursive calls pass ``False`` and increase the indent by two.
    :param str global_attributes_name: Label used for the group's global attributes.
    :rtype: list[str]
    """
    if first:
        report = ['OCGIS Driver Key: ' + self.key + ' {']
        group_metadata = group_metadata or self.metadata_source
    else:
        report = []
        indent += 2
    report += get_dump_report_for_group(group_metadata, global_attributes_name=global_attributes_name,
                                        indent=indent)
    padding = ' ' * indent
    for child_name, child_metadata in list(group_metadata.get('groups', {}).items()):
        report.append('')
        report.append(padding + 'group: ' + child_name + ' {')
        report += self.get_dump_report(group_metadata=child_metadata, first=False, indent=indent,
                                       global_attributes_name=child_name)
        report.append(padding + ' }' + ' // group: {}'.format(child_name))
    if first:
        report.append('}')
    return report
@classmethod
def get_esmf_fileformat(cls):
    """
    Get the ESMF file format associated with the driver. The string should be an accessible attribute on :attr:`ESMF.constants.FileFormat`.

    NOTE(review): the lookup uses ``cls._esmf_fileformat`` as the attribute name; a driver leaving it as ``None``
    would presumably fail here -- confirm callers only use this on drivers that set it.

    :rtype: str
    """
    # Imported locally rather than at module level.
    from ocgis.regrid.base import ESMF
    return getattr(ESMF.constants.FileFormat, cls._esmf_fileformat)
@classmethod
def get_esmf_grid_class(cls):
    """
    Get the ESMF grid class. The class is resolved from the driver's ``_esmf_grid_class`` attribute.

    :rtype: :class:`ESMF.Grid` | :class:`ESMF.Mesh`
    """
    return constants.ESMFGridClass.get_esmf_class(cls._esmf_grid_class)
@staticmethod
def get_grid(field):
    """
    Construct a grid object and return it. If a grid object may not be constructed, return ``None``. The base
    implementation is not implemented.

    :param field: The field object to use for constructing the grid.
    :type field: :class:`~ocgis.Field`
    :rtype: :class:`ocgis.spatial.grid.AbstractGrid` | ``None``
    :raises: NotImplementedError
    """
    raise NotImplementedError
@staticmethod
def get_group_metadata(group_index, metadata, has_root=False):
    """
    Return the entry for a group from a nested metadata object. Thin wrapper around
    :func:`ocgis.util.helpers.get_group` with the argument order swapped.

    :param group_index: Key sequence identifying the group.
    :param metadata: The nested object to search.
    :param bool has_root: Passed through to :func:`~ocgis.util.helpers.get_group`.
    """
    return get_group(metadata, group_index, has_root=has_root)
def get_metadata(self):
    """
    Return the driver's metadata with the request dataset's predicate applied. Variables whose names fail the
    predicate are removed from the top-level ``'variables'`` mapping in place.

    :rtype: dict
    """
    metadata = self._get_metadata_main_()
    # Use the predicate (filter) if present on the request dataset.
    # TODO: Should handle groups?
    keep = self.rd.predicate
    if keep is not None:
        variables = metadata['variables']
        # Collect first, then remove, so the mapping is not modified while it is iterated.
        rejected = [var_name for var_name in variables.keys() if not keep(var_name)]
        for var_name in rejected:
            variables.pop(var_name)
    return metadata
def get_source_metadata_as_json(self):
    """
    Return the source metadata as a JSON string. A deep copy of the metadata is converted, so the metadata on
    the request dataset is left untouched.

    :rtype: str
    """

    def _to_jsonable(mapping):
        # Convert the leaf values of a nested dictionary in place.
        for name in mapping:
            entry = mapping[name]
            if isinstance(entry, dict):
                _to_jsonable(entry)
                continue
            try:
                # Numpy arrays need to be converted to lists.
                entry = entry.tolist()
            except AttributeError:
                # Anything without a "tolist" method is stored as its string representation.
                entry = str(entry)
            mapping[name] = entry

    converted = deepcopy(self.metadata_source)
    _to_jsonable(converted)
    return json.dumps(converted)
@staticmethod
def get_or_create_spatial_mask(*args, **kwargs):
    """
    Get or create the spatial mask variable and return its mask value as a boolean array.

    :param tuple args: See table

    ===== ======================================================= ===================================
    Index Type                                                    Description
    ===== ======================================================= ===================================
    0     :class:`ocgis.spatial.base.AbstractXYZSpatialContainer` Target XYZ spatial container
    1:    <varying>                                               See :meth:`ocgis.Variable.get_mask`
    ===== ======================================================= ===================================

    :param dict kwargs: See keyword arguments to :meth:`~ocgis.Variable.get_mask`. If ``create`` and ``check_value``
     are ``True`` and no mask variable exists, then coordinate variable masks are combined using a logical OR
     operation.
    :returns: The mask array, or ``None`` if there is no mask variable and ``create`` is ``False``.
    :rtype: :class:`numpy.ndarray`
    """
    from ocgis.spatial.base import create_spatial_mask_variable

    args = list(args)
    sobj = args[0]  # Spatial object containing coordinate variables
    args = args[1:]
    create = kwargs.get(KeywordArgument.CREATE, False)
    check_value = kwargs.get(KeywordArgument.CHECK_VALUE, False)

    # Check for existing mask variable
    mask_variable = sobj.mask_variable
    ret = None
    if mask_variable is None:
        if create:
            # Default to letting the mask creation function handle the creation of mask values. This also covers
            # a spatial object without any coordinate variables, which previously left the value unbound.
            mask_value = None
            if check_value:
                # Combine coordinate variable masks using a logical OR operation
                for ii, cvar in enumerate(sobj.coordinate_variables):
                    currmask = cvar.get_mask(create=True, check_value=True)
                    if ii == 0:
                        mask_value = currmask
                    else:
                        mask_value = np.logical_or(currmask, mask_value)
            # Create the spatial mask variable and set it
            mask_variable = create_spatial_mask_variable(VariableName.SPATIAL_MASK, mask_value, sobj.dimensions)
            sobj.set_mask(mask_variable)
    if mask_variable is not None:
        # Validate the mask variable checking attributes, data types, etc.
        sobj.driver.validate_spatial_mask(mask_variable)
        # Get the actual boolean array from the mask variable
        ret = mask_variable.get_mask(*args, **kwargs)
    return ret
def get_variable_collection(self, **kwargs):
    """
    Here for backwards compatibility. Delegates to :meth:`create_raw_field` with the same keyword arguments.
    """
    return self.create_raw_field(**kwargs)
def get_variable_metadata(self, variable_object):
    """
    Return the metadata for a variable, looked up from the request dataset's metadata.

    :param variable_object: The variable whose metadata is wanted.
    :rtype: dict
    """
    return get_variable_metadata_from_request_dataset(self, variable_object)
@classmethod
def get_variable_for_writing(cls, variable):
    """
    Allows variables to overload which member to use for writing. For example, temporal variables always want
    numeric times. Non-temporal variables are returned unchanged.

    :param variable: The variable about to be written.
    """
    from ocgis.variable.temporal import TemporalVariable

    if not isinstance(variable, TemporalVariable):
        return variable
    return cls.get_variable_for_writing_temporal(variable)
@classmethod
def get_variable_for_writing_temporal(cls, temporal_variable):
    """
    Return the member of a temporal variable to use for writing.

    :param temporal_variable: The temporal variable about to be written.
    :returns: ``value_datetime`` if the variable formats time, otherwise ``value``.
    """
    # Only return datetime objects directly if time is being formatted. Otherwise return the standard value, these
    # could be datetime objects if passed during initialization.
    return temporal_variable.value_datetime if temporal_variable.format_time else temporal_variable.value
@abc.abstractmethod
def get_variable_value(self, variable):
    """
    Get value for the variable. Abstract; concrete drivers must implement it. The result is what
    :meth:`init_variable_value` sets on the variable.

    :param variable: The variable to read a value for.
    """
@classmethod
def get_variable_write_dtype(cls, variable):
    """
    Return the data type to use when writing the variable.

    :param variable: The variable about to be written.
    """
    write_target = cls.get_variable_for_writing(variable)
    return write_target.dtype
@classmethod
def get_variable_write_fill_value(cls, variable):
    """
    Return the fill value to use when writing the variable.

    :param variable: The variable about to be written.
    """
    return cls.get_variable_for_writing(variable).fill_value
@classmethod
def get_variable_write_value(cls, variable):
    """
    Return the value to write for a variable.

    :param variable: The variable about to be written.
    :returns: ``None`` if the variable has no allocated value. For temporal variables, the result of
     :meth:`get_variable_for_writing`. Otherwise the masked value when the variable has a mask and the plain
     value when it does not.
    """
    from ocgis.variable.temporal import TemporalVariable

    if not variable.has_allocated_value:
        return None
    if isinstance(variable, TemporalVariable):
        return cls.get_variable_for_writing(variable)
    if variable.has_mask:
        return cls.get_variable_for_writing(variable).get_masked_value()
    return cls.get_variable_for_writing(variable).get_value()
def init_variable_from_source(self, variable):
    """
    Initialize a variable from source metadata: set its dimensions from the driver's distribution if they are
    not already set, run the driver-specific initialization, and mark the variable as allocated.

    :param variable: The sourced variable to initialize.
    :raises: KeyError if a dimension is missing from the distribution while the VM is live.
    """
    variable_metadata = self.get_variable_metadata(variable)
    # Create the dimensions if they are not present.
    if variable._dimensions is None:
        dist = self.dist
        desired_dimensions = variable_metadata.get('dimensions')
        # Variable may not have any associated dimensions (attribute-only variable).
        if desired_dimensions is not None:
            new_dimensions = []
            for d in desired_dimensions:
                try:
                    to_append = dist.get_dimension(d, group=variable.group, rank=vm.rank)
                except KeyError:
                    # Rank may not be mapped due to scoped VM.
                    if vm.is_live:
                        raise
                    else:
                        # Stop collecting dimensions and mark the variable empty.
                        variable.convert_to_empty()
                        break
                else:
                    new_dimensions.append(to_append)
            # Call the implementation above SourcedVariable in the method resolution order.
            super(SourcedVariable, variable).set_dimensions(new_dimensions)
    # Call the subclass variable initialization routine.
    self._init_variable_from_source_main_(variable, variable_metadata)
    # The variable is now allocated.
    variable._allocated = True
def init_variable_value(self, variable):
    """
    Set the variable value from source data conforming units in the process.

    :param variable: The variable to load a value for. Units are conformed only when the variable's metadata
     contains a ``'conform_units_to'`` entry.
    """
    from ocgis.variable.temporal import TemporalVariable
    value = self.get_variable_value(variable)
    variable.set_value(value, update_mask=True)
    # Conform the units if requested. Need to check if this variable is inside a group to find the appropriate
    # metadata.
    meta = get_variable_metadata_from_request_dataset(self, variable)
    conform_units_to = meta.get('conform_units_to')
    if conform_units_to is not None:
        # The initialized units for the variable are overloaded by the destination / conform to units.
        if isinstance(variable, TemporalVariable):
            # Build a throwaway temporal variable from the source units and calendar to obtain its units object.
            from_units = TemporalVariable(units=meta['attrs']['units'],
                                          calendar=meta['attrs'].get('calendar'))
            from_units = from_units.cfunits
        else:
            from_units = meta['attrs']['units']
        variable.cfunits_conform(conform_units_to, from_units=from_units)
@staticmethod
def inquire_opened_state(opened_or_path):
    """
    Return ``True`` if the input is an opened file object. Strings, tuples and lists are treated as paths;
    anything else is considered an opened object.

    :param opened_or_path: Output file path or an open file object.
    :rtype: bool
    """
    path_like_types = tuple(six.string_types) + (tuple, list)
    return not isinstance(opened_or_path, path_like_types)
def inspect(self):
    """
    Inspect the request dataset printing information to stdout. The printed lines come from
    :meth:`get_dump_report`.
    """
    report_lines = self.get_dump_report()
    for report_line in report_lines:
        print(report_line)
@staticmethod
def iterator_formatter(name, value, mask):
    """
    Format a name/value pair for iteration output. The base implementation returns the pair unchanged and
    ignores ``mask``.

    :param name: The name to emit.
    :param value: The value to emit.
    :param mask: Mask flag for the value (unused here).
    :returns: A single-element list holding the ``(name, value)`` tuple.
    :rtype: list
    """
    return [(name, value)]
@staticmethod
def iterator_formatter_time_bounds(name, value, mask):
    """
    Format a time bounds value for iteration output. The base implementation delegates to
    :meth:`AbstractDriver.iterator_formatter`.
    """
    return AbstractDriver.iterator_formatter(name, value, mask)
@staticmethod
def iterator_formatter_time_value(name, value, mask):
    """
    Format a time value for iteration output. The base implementation delegates to
    :meth:`AbstractDriver.iterator_formatter`.
    """
    return AbstractDriver.iterator_formatter(name, value, mask)
@classmethod
def open(cls, uri=None, mode='r', rd=None, **kwargs):
    """
    Return an open object for the driver.

    :param uri: The location to open. Defaults to the request dataset's ``uri`` when not provided.
    :param str mode: The mode passed to the driver's ``_open_`` implementation.
    :param rd: Optional request dataset. If it already holds an opened object, that object is returned as-is.
    :param kwargs: Additional keyword arguments passed to ``_open_``.
    :raises: ValueError if neither ``uri`` nor ``rd`` is provided.
    """
    if uri is None and rd is None:
        raise ValueError('A URI or request dataset is required.')
    if rd is not None and rd.opened is not None:
        return rd.opened
    if uri is None and rd is not None:
        uri = rd.uri
    return cls._open_(uri, mode=mode, **kwargs)
@staticmethod
def set_spatial_mask(sobj, value, cascade=False):
    """
    Set the spatial mask on an XYZ spatial container.

    :param sobj: Target XYZ spatial container
    :type sobj: :class:`ocgis.spatial.base.AbstractXYZSpatialContainer`
    :param value: The spatial mask value. This may be a variable or a boolean array. If it is a boolean array, a
     spatial mask variable will be created and this array set as its mask. If ``None``, any existing mask
     variable is removed.
    :type value: :class:`ocgis.Variable` | :class:`numpy.ndarray`
    :param bool cascade: If ``True``, cascade the mask across shared dimensions on the grid.
    """
    from ocgis.spatial.grid import grid_set_mask_cascade
    from ocgis.spatial.base import create_spatial_mask_variable
    from ocgis.variable.base import Variable
    if value is None:
        # Remove the mask variable from the parent object and update the dimension map.
        if sobj.has_mask:
            sobj.parent.remove_variable(sobj.mask_variable)
            sobj.dimension_map.set_spatial_mask(None)
    else:
        if isinstance(value, Variable):
            # Set the mask variable from the incoming value.
            sobj.parent.add_variable(value, force=True)
            mask_variable = value
        else:
            # Convert the incoming boolean array into a mask variable. Reuse the existing mask variable if
            # there is one; otherwise create a new one and add it to the parent.
            mask_variable = sobj.mask_variable
            if mask_variable is None:
                dimensions = sobj.dimensions
                mask_variable = create_spatial_mask_variable(VariableName.SPATIAL_MASK, value, dimensions)
                sobj.parent.add_variable(mask_variable)
            else:
                mask_variable.set_mask(value)
        # Register the mask variable on the dimension map.
        sobj.dimension_map.set_spatial_mask(mask_variable)
        if cascade:
            grid_set_mask_cascade(sobj)
def validate_field(self, field):
    """
    Hook for validating a field. The base implementation does nothing.

    :param field: The field object to validate.
    """
    pass
@classmethod
def validate_ops(cls, ops):
    """
    Check that the operation's output format is supported by the driver. Drivers whose ``output_formats`` is
    ``'all'`` accept any output format.

    :param ops: An operation object to validate.
    :type ops: :class:`~ocgis.OcgOperations`
    :raises: DefinitionValidationError
    """
    supported = cls.output_formats
    if supported != 'all' and ops.output_format not in supported:
        msg = 'Output format not supported for driver "{0}". Supported output formats are: {1}'.format(cls.key,
                                                                                                        supported)
        # The exception is handed to the OCGIS logging helper.
        ocgis_lh(logger='driver', exc=DefinitionValidationError('output_format', msg))
@staticmethod
def validate_spatial_mask(mask_variable):
    """
    Validate the spatial mask variable. The variable must carry an ``"ocgis_role"`` attribute equal to
    ``"spatial_mask"``.

    :param mask_variable: Target spatial mask variable
    :type mask_variable: :class:`ocgis.Variable`
    :raises: ValueError
    """
    role = mask_variable.attrs.get('ocgis_role')
    if role != 'spatial_mask':
        template = 'Mask variable "{}" must have an "ocgis_role" attribute with a value of "spatial_mask".'
        raise ValueError(template.format(mask_variable.name))
def write_variable(self, *args, **kwargs):
    """
    Write a variable. Not applicable for tabular formats. The base implementation is not implemented.

    :raises: NotImplementedError
    """
    raise NotImplementedError
@classmethod
def write_field(cls, field, opened_or_path, **kwargs):
    """
    Write a field. The field is first passed through ``_get_field_write_target_`` and the result is written with
    :meth:`write_variable_collection`.

    :param field: The field to write. ``raise_if_empty`` is applied to it first.
    :param opened_or_path: Opened file object or path to write to.
    :param kwargs: Keyword arguments passed to :meth:`write_variable_collection`.
    """
    raise_if_empty(field)
    vc_to_write = cls._get_field_write_target_(field)
    cls.write_variable_collection(vc_to_write, opened_or_path, **kwargs)
@classmethod
def write_variable_collection(cls, vc, opened_or_path, **kwargs):
    """
    Write a variable collection using the driver's write implementation.

    :param vc: The variable collection to write. ``raise_if_empty`` is applied to it first.
    :param opened_or_path: Opened file object or path to write to. Only paths are allowed when the VM has more
     than one rank.
    :param kwargs: Keyword arguments passed to the driver's ``_write_variable_collection_main_``. A write mode
     keyword, if given, is removed and used as the only write mode; otherwise the write modes come from
     ``_get_write_modes_``.
    :raises: TypeError if ``'ranks_to_write'`` is passed.
    :raises: ValueError if an opened object is passed for a parallel write.
    """
    raise_if_empty(vc)
    if 'ranks_to_write' in kwargs:
        raise TypeError("write_variable_collection() got an unexpected keyword argument 'ranks_to_write'")
    requested_mode = kwargs.pop(KeywordArgument.WRITE_MODE, None)
    if vm.size > 1 and cls.inquire_opened_state(opened_or_path):
        raise ValueError('Only paths allowed for parallel writes.')
    if requested_mode is None:
        write_modes = cls._get_write_modes_(vm, **kwargs)
    else:
        write_modes = [requested_mode]
    # Global string lengths are needed by the write. Set those while we still have global access.
    for var in vc.values():
        if var._string_max_length_global is None:
            var.set_string_max_length_global()
    # Run one write pass per mode, in order.
    for current_mode in write_modes:
        cls._write_variable_collection_main_(vc, opened_or_path, current_mode, **kwargs)
@staticmethod
def _close_(obj):
    """
    Close and finalize the open file object by calling its ``close`` method.

    :param obj: The open object to close.
    """
    obj.close()
@staticmethod
def _gc_nchunks_dst_(grid_chunker):
    """
    Calculate the default chunking decomposition for a destination grid in the grid chunker. The decomposition should
    be a tuple of integers. The base implementation is not implemented.

    >>> (10, 5)
    >>> (12,)

    :param grid_chunker: The grid chunker object.
    :type grid_chunker: :class:`~ocgis.spatial.grid_chunker.GridChunker`
    :rtype: tuple
    :raises: NotImplementedError
    """
    raise NotImplementedError
@classmethod
def _get_write_modes_(cls, the_vm, **kwargs):
    """
    Return the sequence of write modes to use for a write.

    :param the_vm: The virtual machine object; only its ``size`` is consulted.
    :returns: The template mode followed by the fill mode when ``the_vm.size`` is greater than one, otherwise
     only the normal mode.
    :rtype: list
    """
    is_parallel = the_vm.size > 1
    return [MPIWriteMode.TEMPLATE, MPIWriteMode.FILL] if is_parallel else [MPIWriteMode.NORMAL]
def _get_crs_main_(self, group_metadata):
    """
    Return the coordinate system variable or None if not found. The base implementation always returns ``None``,
    which makes :meth:`get_crs` fall back to ``_default_crs``.

    :param dict group_metadata: Metadata for the target group.
    """
    return None
@abc.abstractmethod
def _get_metadata_main_(self):
    """
    Return the base metadata object. The superclass will finalize the metadata doing things like adding dimension
    maps for each group. Abstract; concrete drivers must implement it. :meth:`get_metadata` expects the result to
    contain a ``'variables'`` mapping when a predicate is set on the request dataset.

    :rtype: dict
    """
@classmethod
def _get_field_write_target_(cls, field):
    """
    Return the object to write for a field. The base implementation returns the field unchanged.

    :param field: The field passed to :meth:`write_field`.
    """
    return field
@abc.abstractmethod
def _init_variable_from_source_main_(self, variable_object, variable_metadata):
    """
    Initialize everything but dimensions on the target variable. Abstract; concrete drivers must implement it.

    :param variable_object: The variable being initialized.
    :param dict variable_metadata: Metadata for the variable.
    """
@staticmethod
def _open_(uri, mode='r', **kwargs):
    """
    Open ``uri`` with the ``open`` callable in scope and return the result.

    :param uri: The location to open.
    :param str mode: The mode to open with.
    :param kwargs: Additional keyword arguments passed to ``open``.
    :rtype: object
    """
    return open(uri, mode=mode, **kwargs)
@classmethod
@abc.abstractmethod
def _write_variable_collection_main_(cls, vc, opened_or_path, write_mode, **kwargs):
    """
    Write the variable collection for a single write mode. Abstract; concrete drivers must implement it.

    :param vc: :class:`~ocgis.new_interface.variable.VariableCollection`
    :param opened_or_path: Opened file object or path to the file object to open.
    :param write_mode: The write mode for this pass.
    :param kwargs: Additional driver-specific keyword arguments.
    """
@six.add_metaclass(abc.ABCMeta)
class AbstractTabularDriver(AbstractDriver):
    """
    Base class for tabular drivers (no optimal single variable access).
    """

    def get_distributed_dimension_name(self, dimension_map, dimensions_metadata, decomp_type=DecompositionType.OCGIS):
        """
        Return the preferred distributed dimension name. For tabular drivers this is the name of the first
        dimension in the metadata.

        :param dimension_map: Dimension map for the target group (unused here).
        :param dict dimensions_metadata: Dimension metadata for the target group.
        :param decomp_type: The decomposition type (unused here).
        :returns: The first dimension's name or ``None`` if there are no dimensions.
        :rtype: str | ``None``
        """
        first_dimension = next(iter(dimensions_metadata.values()), None)
        if first_dimension is None:
            # No dimensions means nothing to distribute.
            return None
        return first_dimension['name']

    @staticmethod
    def iterator_formatter_time_bounds(name, value, mask):
        """
        Format a time bounds value as a string. Masked values are emitted as ``None``.

        :returns: A single-element list holding the ``(name, formatted_value)`` tuple.
        :rtype: list
        """
        formatted_value = None if mask else str(value)
        return [(name, formatted_value)]

    @staticmethod
    def iterator_formatter_time_value(name, value, mask):
        """
        Format a time value as a string and add its year, month and day components. Masked values are emitted as
        ``None``. The components are ``None`` when the value does not expose ``year``, ``month`` and ``day``.

        :returns: The ``(name, formatted_value)`` tuple followed by ``['YEAR', ...]``, ``['MONTH', ...]`` and
         ``['DAY', ...]`` entries.
        :rtype: list
        """
        formatted_value = None if mask else str(value)
        ret = [(name, formatted_value)]
        try:
            # Read all components before appending so a partial failure cannot leave duplicate entries.
            year, month, day = value.year, value.month, value.day
        except AttributeError:
            # Assume this is not a datetime object.
            year, month, day = None, None, None
        ret.append(['YEAR', year])
        ret.append(['MONTH', month])
        ret.append(['DAY', day])
        return ret
@six.add_metaclass(ABCMeta)
class AbstractUnstructuredDriver(AbstractOcgisObject):
    """
    Base class holding behavior shared by drivers for unstructured grids.
    """
    default_axes_positions = (0, 0)  # Standard axes index for Y and X respectively.
    _esmf_grid_class = constants.ESMFGridClass.MESH
    _is_unstructured = True
    _start_index = 0

    @staticmethod
    def get_element_dimension(gc):
        """
        See :meth:`ocgis.spatial.geomc.AbstractGeometryCoordinates.element_dim`

        :param gc: The geometry coordinates object.
        :returns: The first dimension of the coordinate index if there is one, otherwise the first dimension of
         the archetype.
        """
        cindex = gc.cindex
        if cindex is None:
            ret = gc.archetype.dimensions[0]
        else:
            ret = cindex.dimensions[0]
        return ret

    @staticmethod
    def get_grid(field):
        """
        Construct an unstructured grid for the field. Return ``None`` if the field is deficient for grid
        construction.

        :param field: The field object to use for constructing the grid.
        :rtype: :class:`ocgis.GridUnstruct` | ``None``
        """
        try:
            ret = GridUnstruct(parent=field)
        except GridDeficientError:
            ret = None
        return ret

    @staticmethod
    def get_multi_break_value(cindex):
        """
        See :meth:`ocgis.spatial.geomc.AbstractGeometryCoordinates.multi_break_value`

        :param cindex: The coordinate index variable.
        :returns: The multi-break value attribute on the coordinate index or ``None`` if it is not set.
        """
        mbv_name = OcgisConvention.Name.MULTI_BREAK_VALUE
        return cindex.attrs.get(mbv_name)
@contextmanager
def driver_scope(ocgis_driver, opened_or_path=None, mode='r', **kwargs):
    """
    Context manager yielding an open object for a driver. The object is closed on exit only if it was opened
    here.

    :param ocgis_driver: A driver instance or driver class.
    :param opened_or_path: Opened object or path. If ``None``, the driver's request dataset provides its opened
     object or, failing that, its URI.
    :param str mode: The mode to open with.
    :param kwargs: Keyword arguments passed to the driver's ``open``. The request dataset's ``driver_kwargs`` are
     merged over these when the request dataset is used.
    :raises: ValueError if no target is provided and the driver has no request dataset.
    """
    open_kwargs = kwargs.copy()
    rd = None
    if opened_or_path is None:
        # Attempt to get the request dataset from the driver. If not there, assume we are working with the driver
        # class and not an instance created with a request dataset.
        rd = getattr(ocgis_driver, 'rd', None)
        if rd is None:
            raise ValueError('Without a driver instance and no open object or file path, nothing can be scoped.')
        opened_or_path = rd.opened if rd.opened is not None else rd.uri

    # Objects that arrive already opened belong to the caller and are left open.
    should_close = not ocgis_driver.inquire_opened_state(opened_or_path)
    if should_close:
        if rd is not None and rd.driver_kwargs is not None:
            open_kwargs.update(rd.driver_kwargs)
        opened_or_path = ocgis_driver.open(uri=opened_or_path, mode=mode, rd=rd, **open_kwargs)

    try:
        yield opened_or_path
    finally:
        if should_close:
            ocgis_driver.close(opened_or_path)
def find_variable_by_attribute(variables_metadata, attribute_name, attribute_value):
    """
    Return the names of variables having an attribute with the given name and value.

    :param dict variables_metadata: Variable metadata keyed by variable name. Each entry must hold an ``'attrs'``
     mapping.
    :param attribute_name: The attribute name to look for.
    :param attribute_value: The attribute value to match.
    :rtype: list
    """
    matches = []
    for variable_name, variable_metadata in variables_metadata.items():
        attrs = variable_metadata['attrs']
        if attribute_name in attrs and attrs[attribute_name] == attribute_value:
            matches.append(variable_name)
    return matches
def format_attribute_for_dump_report(attr_value):
    """
    Format an attribute value for a dump report. String values are wrapped in double quotes; any other value is
    returned unchanged.
    """
    if not isinstance(attr_value, six.string_types):
        return attr_value
    return '"{}"'.format(attr_value)
def create_dimension_map_raw(driver, group_metadata):
    """
    Create a dimension map for a driver directly from metadata.

    :param driver: The driver passed to :meth:`DimensionMap.from_metadata`.
    :param dict group_metadata: The metadata to create the dimension map from.
    :rtype: :class:`~ocgis.DimensionMap`
    """
    return DimensionMap.from_metadata(driver, group_metadata)
def get_dump_report_for_group(group, global_attributes_name='global', indent=0):
    """
    Build the dump report lines for a single group's metadata. Nested groups are not included.

    Missing ``'dimensions'``, ``'variables'`` and ``'global_attributes'`` entries are treated as empty, and a
    variable without ``'dimensions'`` (an attribute-only variable) is reported with an empty dimension list.

    :param dict group: The group's metadata.
    :param str global_attributes_name: Label used in the header of the global attributes section.
    :param int indent: Number of spaces to prepend to every non-empty line.
    :rtype: list[str]
    """
    lines = []

    dimensions = group.get('dimensions') or {}
    if len(dimensions) > 0:
        lines.append('dimensions:')
        template = ' {0} = {1} ;{2}'
        for key, value in list(dimensions.items()):
            if value.get('isunlimited', False):
                one = 'ISUNLIMITED'
                two = ' // {0} currently'.format(value['size'])
            else:
                one = value['size']
                two = ''
            lines.append(template.format(key, one, two))

    variables = group.get('variables') or {}
    if len(variables) > 0:
        lines.append('variables:')
        var_template = ' {0} {1}({2}) ;'
        attr_template = ' {0}:{1} = {2} ;'
        for key, value in list(variables.items()):
            # Attribute-only variables may have no dimensions entry.
            dims = ', '.join(str(d) for d in (value.get('dimensions') or ()))
            lines.append(var_template.format(value.get('dtype'), key, dims))
            for key2, value2 in (value.get('attrs') or {}).items():
                lines.append(attr_template.format(key, key2, format_attribute_for_dump_report(value2)))

    global_attributes = group.get('global_attributes') or {}
    if len(global_attributes) > 0:
        lines.append('')
        lines.append('// {} attributes:'.format(global_attributes_name))
        template = ' :{0} = {1} ;'
        for key, value in list(global_attributes.items()):
            try:
                lines.append(template.format(key, format_attribute_for_dump_report(value)))
            except UnicodeEncodeError:
                # for a unicode string, if "\u" is in the string and an inappropriate unicode character is used, then
                # template formatting will break.
                msg = 'Unable to encode attribute "{0}". Skipping printing of attribute value.'.format(key)
                warn(msg)

    if indent > 0:
        # Empty separator lines are left unindented.
        indent_string = ' ' * indent
        lines = [indent_string + current if len(current) > 0 else current for current in lines]
    return lines
def get_variable_metadata_from_request_dataset(driver, variable):
    """
    Return the metadata for a variable from the driver's source metadata. The variable's group is used to find
    the correct metadata level and its source name is used as the lookup key.

    :param driver: The driver providing ``metadata_source``.
    :param variable: The variable to look up.
    :rtype: dict
    :raises: VariableMissingMetadataError if the variable's source name is not in the group's variables.
    """
    variables_metadata = get_group(driver.metadata_source, variable.group, has_root=False)['variables']
    source_name = variable._source_name
    try:
        found = variables_metadata[source_name]
    except KeyError:
        raise VariableMissingMetadataError(source_name)
    else:
        return found
def iter_all_group_keys(ddict, entry=None, has_root=True):
    """
    Yield the key sequence of every group in a nested group dictionary, parents before their children.

    :param dict ddict: The nested group dictionary.
    :param list entry: The key sequence to start from. Defaults to ``[None]``.
    :param bool has_root: If ``False``, ``ddict`` is first nested under a ``None`` root key.
    :returns: Generator of key sequences (lists).
    """
    tree = ddict if has_root else {None: ddict}
    current = [None] if entry is None else entry
    yield current
    for child_keys in iter_group_keys(tree, current):
        for descendant_keys in iter_all_group_keys(tree, child_keys):
            yield descendant_keys
def iter_group_keys(ddict, keyseq):
    """
    Yield the key sequence of each direct child group of the group at ``keyseq``.

    :param dict ddict: The nested group dictionary.
    :param list keyseq: Key sequence of the parent group. It is copied, never modified.
    :returns: Generator of key sequences (lists), each a copy of ``keyseq`` with the child key appended.
    """
    child_groups = get_group(ddict, keyseq).get('groups', {})
    for child_key in child_groups:
        extended = deepcopy(keyseq)
        extended.append(child_key)
        yield extended