import itertools
import logging
from abc import ABCMeta
from collections import OrderedDict
from copy import deepcopy
import netCDF4 as nc
import numpy as np
import six
from netCDF4._netCDF4 import VLType, MFDataset, MFTime
from ocgis import constants, vm
from ocgis import env
from ocgis.base import orphaned, raise_if_empty
from ocgis.collection.field import Field
from ocgis.constants import MPIWriteMode, DimensionMapKey, KeywordArgument, DriverKey, CFName, SourceIndexType, \
DecompositionType
from ocgis.driver.base import AbstractDriver, driver_scope
from ocgis.exc import ProjectionDoesNotMatch, PayloadProtectedError, NoDataVariablesFound, \
GridDeficientError
from ocgis.util.helpers import itersubclasses, get_iter, get_formatted_slice, get_by_key_list, is_auto_dtype, get_group
from ocgis.util.logging_ocgis import ocgis_lh
from ocgis.variable.base import SourcedVariable, ObjectType, get_slice_sequence_using_local_bounds
from ocgis.variable.crs import CFCoordinateReferenceSystem, CoordinateReferenceSystem, CFRotatedPole, CFSpherical, \
AbstractProj4CRS
from ocgis.variable.dimension import Dimension
from ocgis.variable.temporal import TemporalVariable
from ocgis.vmachine.mpi import OcgDist
class DriverNetcdf(AbstractDriver):
    """
    Driver for netCDF files that avoids any hint of metadata.

    Driver keyword arguments (``driver_kwargs``) to the request dataset:

    * Any keyword arguments to dataset or multi-file dataset creation.
    """
    # Raw strings avoid the deprecated "\." string escape inside the regular expressions.
    extensions = (r'.*\.nc', r'http.*')
    key = DriverKey.NETCDF
    output_formats = 'all'
    common_extension = 'nc'

    @property
    def data_model(self):
        """Return the netCDF file format string (e.g. ``'NETCDF4'``) from the source metadata."""
        return self.metadata_source['file_format']

    @staticmethod
    def get_data_variable_names(group_metadata, group_dimension_map):
        """This driver is metadata-naive; it never identifies data variables."""
        return tuple()

    @classmethod
    def get_variable_for_writing_temporal(cls, temporal_variable):
        """Temporal variables are written using their numeric time representation."""
        return temporal_variable.value_numtime

    def get_variable_value(self, variable):
        """Read a variable's value from the request dataset's source file."""
        return get_value_from_request_dataset(variable)

    @classmethod
    def write_variable(cls, var, dataset, write_mode=MPIWriteMode.NORMAL, **kwargs):
        """
        Write a variable to an open netCDF dataset object.

        :param var: Variable object.
        :param dataset: Open netCDF dataset object.
        :param kwargs: Arguments to netCDF variable creation with additional keyword arguments below.
        :keyword bool file_only: (``=False``) If ``True``, do not write the value to the output file. Create an empty
         netCDF file.
        :keyword bool unlimited_to_fixed_size: (``=False``) If ``True``, convert the unlimited dimension to a fixed
         size.
        """
        # There should never be any write operations associated with an empty variable.
        raise_if_empty(var)

        # Write the parent collection if available on the variable.
        if not var.is_orphaned:
            parent_kwargs = {}
            parent_kwargs[KeywordArgument.VARIABLE_KWARGS] = kwargs
            return var.parent.write(dataset, **parent_kwargs)

        assert isinstance(dataset, nc.Dataset)

        file_only = kwargs.pop(KeywordArgument.FILE_ONLY, False)
        unlimited_to_fixed_size = kwargs.pop(KeywordArgument.UNLIMITED_TO_FIXED_SIZE, False)

        # No data should be written during a global write. Data will be filled in during the append process.
        if write_mode == MPIWriteMode.TEMPLATE:
            file_only = True

        if var.name is None:
            msg = 'A variable "name" is required.'
            raise ValueError(msg)

        # Dimension creation should not occur during a fill operation. The dimensions and variables have already been
        # created.
        if write_mode != MPIWriteMode.FILL:
            dimensions = var.dimensions

            dtype = cls.get_variable_write_dtype(var)
            if isinstance(dtype, ObjectType):
                dtype = dtype.create_vltype(dataset, dimensions[0].name + '_VLType')
            # Assume we are writing string data if the data type is object.
            elif dtype == str or var.is_string_object:
                dtype = 'S1'

            if len(dimensions) > 0:
                # Special handling for string variables.
                if dtype == 'S1':
                    max_length = var.string_max_length_global
                    assert max_length is not None
                    dimensions = [var.dimensions[0], Dimension('{}_ocgis_slen'.format(var.name), max_length)]
                dimensions = list(dimensions)
                # Convert the unlimited dimension to fixed size if requested.
                for idx, d in enumerate(dimensions):
                    if d.is_unlimited and unlimited_to_fixed_size:
                        dimensions[idx] = Dimension(d.name, size=var.shape[idx])
                        break
                # Create the dimensions.
                for dim in dimensions:
                    create_dimension_or_pass(dim, dataset, write_mode=write_mode)
                dimensions = [d.name for d in dimensions]

            # Only use the fill value if something is masked.
            is_nc3 = dataset.data_model.startswith('NETCDF3')
            if ((len(dimensions) > 0 and var.has_masked_values) and (
                    write_mode == MPIWriteMode.TEMPLATE or not file_only)) or (
                    is_nc3 and not var.has_allocated_value and len(dimensions) > 0) or (
                    env.USE_NETCDF4_MPI and var.has_mask and vm.size > 1):
                fill_value = cls.get_variable_write_fill_value(var)
            else:
                # Copy from original attributes.
                if '_FillValue' not in var.attrs:
                    fill_value = None
                else:
                    fill_value = cls.get_variable_write_fill_value(var)

        if write_mode == MPIWriteMode.FILL:
            ncvar = dataset.variables[var.name]
        else:
            ncvar = dataset.createVariable(var.name, dtype, dimensions=dimensions, fill_value=fill_value, **kwargs)
            if write_mode == MPIWriteMode.ASYNCHRONOUS:
                # Tell NC4 we are writing the variable in parallel
                ncvar.set_collective(True)

        # Do not fill values on file_only calls. Also, only fill values for variables with dimension greater than zero.
        if not file_only and not var.is_empty and not isinstance(var, CoordinateReferenceSystem):
            if not var.is_string_object and isinstance(var.dtype, ObjectType) and not isinstance(var, TemporalVariable):
                # Variable-length (object) data must be written element-by-element.
                bounds_local = var.dimensions[0].bounds_local
                for idx in range(bounds_local[0], bounds_local[1]):
                    ncvar[idx] = np.array(var.get_value()[idx - bounds_local[0]])
            else:
                fill_slice = get_slice_sequence_using_local_bounds(var)
                data_value = cls.get_variable_write_value(var)
                # Only write allocated values.
                if data_value is not None:
                    if var.dtype == str or var.is_string_object:
                        # Strings are written one character at a time into the length dimension.
                        for idx in range(fill_slice[0].start, fill_slice[0].stop):
                            try:
                                curr_value = data_value[idx - fill_slice[0].start]
                            except Exception as e:
                                msg = "Variable name is '{}'. Original message: ".format(var.name) + str(e)
                                raise e.__class__(msg)
                            for sidx, sval in enumerate(curr_value):
                                ncvar[idx, sidx] = sval
                    elif var.ndim == 0:
                        ncvar[:] = data_value
                    else:
                        try:
                            ncvar.__setitem__(fill_slice, data_value)
                        except Exception as e:
                            msg = "Variable name is '{}'. Original message: ".format(var.name) + str(e)
                            raise e.__class__(msg)

        # Only set variable attributes if this is not a fill operation.
        if write_mode != MPIWriteMode.FILL:
            var.write_attributes_to_netcdf_object(ncvar)
            if var.units is not None:
                ncvar.setncattr('units', str(var.units))

        dataset.sync()

    @classmethod
    def _write_variable_collection_main_(cls, vc, opened_or_path, write_mode, **kwargs):
        """Write a variable collection, iterating ranks sequentially unless writing asynchronously."""
        assert write_mode is not None

        dataset_kwargs = kwargs.get('dataset_kwargs', {})
        variable_kwargs = kwargs.get('variable_kwargs', {})

        # When filling a dataset, we use append mode.
        if write_mode == MPIWriteMode.FILL:
            mode = 'a'
        else:
            mode = 'w'

        # For an asynchronous write, treat everything like a single rank.
        if write_mode == MPIWriteMode.ASYNCHRONOUS:
            possible_ranks = [0]
        else:
            possible_ranks = vm.ranks

        # Write the data on each rank.
        for idx, rank_to_write in enumerate(possible_ranks):
            # The template write only occurs on the first rank.
            if write_mode == MPIWriteMode.TEMPLATE and rank_to_write != 0:
                pass
            # If this is not a template write, fill the data.
            elif write_mode == MPIWriteMode.ASYNCHRONOUS or vm.rank == rank_to_write:
                with driver_scope(cls, opened_or_path=opened_or_path, mode=mode, **dataset_kwargs) as dataset:
                    # Write global attributes if we are not filling data.
                    if write_mode != MPIWriteMode.FILL:
                        vc.write_attributes_to_netcdf_object(dataset)
                    # This is the main variable write loop.
                    variables_to_write = get_variables_to_write(vc)
                    for variable in variables_to_write:
                        # Load the variable's data before orphaning. The variable needs its parent to know which
                        # group it is in.
                        variable.load()
                        # Call the individual variable write method in fill mode. Orphaning is required as a
                        # variable will attempt to write its parent first.
                        with orphaned(variable, keep_dimensions=True):
                            variable.write(dataset, write_mode=write_mode, **variable_kwargs)
                    # Recurse the children.
                    for child in list(vc.children.values()):
                        if write_mode != MPIWriteMode.FILL:
                            group = nc.Group(dataset, child.name)
                        else:
                            group = dataset.groups[child.name]
                        child.write(group, write_mode=write_mode, **kwargs)
                    dataset.sync()
            vm.barrier()

    def _get_metadata_main_(self):
        """Parse and return metadata from the opened source dataset."""
        with driver_scope(self) as ds:
            ret = parse_metadata(ds)
        return ret

    @classmethod
    def _get_write_modes_(cls, the_vm, **kwargs):
        """Prefer a single asynchronous write when netCDF4 MPI output is enabled and applicable."""
        dataset_kwargs = kwargs.get(KeywordArgument.DATASET_KWARGS, {})
        ret = AbstractDriver._get_write_modes_(the_vm, **kwargs)
        if env.USE_NETCDF4_MPI and the_vm.size > 1:
            if dataset_kwargs.get('format', 'NETCDF4') == 'NETCDF4' and dataset_kwargs.get('parallel', True):
                ret = [MPIWriteMode.ASYNCHRONOUS]
        return ret

    def _init_variable_from_source_main_(self, variable, variable_object):
        init_variable_using_metadata_for_netcdf(variable, self.rd.metadata)

    @staticmethod
    def _open_(uri, mode='r', **kwargs):
        """
        :rtype: object
        """
        kwargs = kwargs.copy()
        group_indexing = kwargs.pop('group_indexing', None)
        lvm = kwargs.pop('vm', vm)

        if isinstance(uri, six.string_types):
            # Open the dataset in parallel if we want to use the netCDF MPI capability. It may not be available even in
            # parallel.
            if mode == 'w' and lvm.size > 1:
                if kwargs.get('format', 'NETCDF4') == 'NETCDF4':
                    if kwargs.get('parallel') is None and env.USE_NETCDF4_MPI:
                        kwargs['parallel'] = True
                    if kwargs.get('parallel') and kwargs.get('comm') is None:
                        kwargs['comm'] = lvm.comm
            ret = nc.Dataset(uri, mode=mode, **kwargs)
            # tdk:FIX: this should be enabled for MFDataset as well. see https://github.com/Unidata/netcdf4-python/issues/809#issuecomment-435144221
            # netcdf4 >= 1.4.0 always returns masked arrays. This is inefficient and is turned off by default by ocgis.
            if hasattr(ret, 'set_always_mask'):
                ret.set_always_mask(False)
        else:
            ret = nc.MFDataset(uri, **kwargs)

        if group_indexing is not None:
            for group_name in get_iter(group_indexing):
                ret = ret.groups[group_name]

        return ret
@six.add_metaclass(ABCMeta)
class AbstractDriverNetcdfCF(DriverNetcdf):
    """Abstract base for CF-aware netCDF drivers handling T/Z/R axes, CRS, and spatial masks."""

    def create_dimension_map(self, group_metadata, strict=False):
        """Create a dimension map from group metadata, adding time/level/realization, CRS, and spatial mask."""
        dmap = super(AbstractDriverNetcdfCF, self).create_dimension_map(group_metadata, strict=strict)

        # Check for time, level, and realization variables. This involves checking for the presence of any bounds
        # variables.
        axes = {'realization': 'R', 'time': 'T', 'level': 'Z'}
        entries = self._create_dimension_map_entries_dict_(axes, group_metadata, strict)
        self._update_dimension_map_with_entries_(entries, dmap)

        # Check for coordinate system variables. This will check every variable.
        crs_name = get_coordinate_system_variable_name(self, group_metadata)
        if crs_name is not None:
            dmap.set_crs(crs_name)

        # Check for a spatial mask.
        variables = group_metadata['variables']
        for varname, var in variables.items():
            if 'ocgis_role' in var.get('attrs', {}):
                if var['attrs']['ocgis_role'] == 'spatial_mask':
                    dmap.set_spatial_mask(varname, attrs=var['attrs'])
                    break

        return dmap

    @staticmethod
    def _create_dimension_map_entries_dict_(axes, group_metadata, strict, attr_name='axis'):
        """Resolve each requested axis to a dimension map entry, attaching bounds where found."""
        variables = group_metadata['variables']

        check_bounds = list(axes.keys())
        if 'realization' in check_bounds:
            check_bounds.pop(check_bounds.index('realization'))

        # Get the main entry for each axis.
        for k, v in list(axes.items()):
            axes[k] = create_dimension_map_entry(v, variables, strict=strict, attr_name=attr_name)

        # Attempt to find bounds for each entry (ignoring realizations).
        for k in check_bounds:
            if axes[k] is not None:
                keys = ['bounds']
                if k == 'time':
                    keys += ['climatology']
                bounds_var = get_by_key_list(variables[axes[k]['variable']]['attrs'], keys)
                if bounds_var is not None:
                    if bounds_var not in variables:
                        # A bounds attribute points at a variable that is not in the file; warn and drop it.
                        msg = 'Bounds listed for variable "{0}" but the destination bounds variable "{1}" does not exist.'. \
                            format(axes[k]['variable'], bounds_var)
                        ocgis_lh(msg, logger='nc.driver', level=logging.WARNING)
                        bounds_var = None
                axes[k]['bounds'] = bounds_var

        entries = {k: v for k, v in list(axes.items()) if v is not None}
        return entries

    def _get_crs_main_(self, group_metadata):
        return get_crs_variable(group_metadata, self.rd.rotated_pole_priority)

    @staticmethod
    def _update_dimension_map_with_entries_(entries, dimension_map):
        """Push resolved entries into the dimension map object."""
        for k, v in entries.items():
            variable_name = v.pop('variable')
            dimension_map.set_variable(k, variable_name, **v)
class DriverNetcdfCF(AbstractDriverNetcdfCF):
    """
    Metadata-aware netCDF driver interpreting CF-Grid by default.
    """
    key = DriverKey.NETCDF_CF
    _esmf_fileformat = 'GRIDSPEC'
    _default_crs = env.DEFAULT_COORDSYS
    _priority = True

    @staticmethod
    def array_resolution(value, axis):
        """See :meth:`ocgis.driver.base.AbstractDriver.array_resolution`"""
        if value.size == 1:
            return 0.0
        else:
            resolution_limit = constants.RESOLUTION_LIMIT
            is_vectorized = value.ndim == 1
            if is_vectorized:
                target = np.abs(np.diff(np.abs(value[0:resolution_limit])))
            else:
                if axis == 0:
                    target = np.abs(np.diff(np.abs(value[:, 0:resolution_limit]), axis=axis))
                elif axis == 1:
                    target = np.abs(np.diff(np.abs(value[0:resolution_limit, :]), axis=axis))
                else:
                    raise NotImplementedError(axis)
            ret = np.mean(target)
            return ret

    def create_dimension_map(self, group_metadata, strict=False):
        """Extend the base dimension map with X/Y axes and rotated-pole/spherical handling."""
        dmap = super(DriverNetcdfCF, self).create_dimension_map(group_metadata, strict=strict)

        # Get dimension variable metadata. This involves checking for the presence of any bounds variables.
        axes = {'x': 'X', 'y': 'Y'}
        entries = self._create_dimension_map_entries_dict_(axes, group_metadata, strict, attr_name='axis')
        self._update_dimension_map_with_entries_(entries, dmap)

        # Hack to get the original coordinate system. Tell the driver to get rotated pole if it can from the metadata.
        original_rpp = self.rd.rotated_pole_priority
        try:
            self.rd.rotated_pole_priority = True
            raw_crs = self.get_crs(group_metadata)
        finally:
            self.rd.rotated_pole_priority = original_rpp

        # Swap out correct axis variables for spherical as opposed to rotated latitude longitude
        if not self.rd.rotated_pole_priority and isinstance(raw_crs, CFRotatedPole):
            crs = self.get_crs(group_metadata)
            if isinstance(crs, CFSpherical):
                axes = {DimensionMapKey.X: {'value': CFName.StandardName.LONGITUDE, 'axis': 'X'},
                        DimensionMapKey.Y: {'value': CFName.StandardName.LATITUDE, 'axis': 'Y'}}
                crs_entries = self._create_dimension_map_entries_dict_(axes, group_metadata, strict,
                                                                       attr_name=CFName.STANDARD_NAME)
                self._update_dimension_map_with_entries_(crs_entries, dmap)
                env.SET_GRID_AXIS_ATTRS = False
                env.COORDSYS_ACTUAL = raw_crs

        return dmap

    @staticmethod
    def get_data_variable_names(group_metadata, group_dimension_map):
        """Return names of variables dimensioned by the required time/x/y axes."""
        axes_needed = [DimensionMapKey.TIME, DimensionMapKey.X, DimensionMapKey.Y]
        dvars = []

        dimension_names_needed = []
        for axis in axes_needed:
            axis_variable = group_dimension_map.get_variable(axis)
            if axis_variable is None:
                # A required axis is missing in the dimension map. Hence, there are no dimensioned variables in the
                # group.
                return tuple()
            else:
                dimension_names_needed += group_dimension_map.get_dimension(axis)
        dimension_names_needed = set(dimension_names_needed)

        for vk, vv in list(group_metadata['variables'].items()):
            variable_dimension_names = set(vv['dimensions'])
            intersection = dimension_names_needed.intersection(variable_dimension_names)
            if len(intersection) == len(axes_needed):
                dvars.append(vk)

        return tuple(dvars)

    def get_distributed_dimension_name(self, dimension_map, dimensions_metadata, decomp_type=DecompositionType.OCGIS):
        """Choose the dimension to distribute: the larger of X/Y for OCGIS, always X for ESMF."""
        x_variable = dimension_map.get_variable(DimensionMapKey.X)
        y_variable = dimension_map.get_variable(DimensionMapKey.Y)
        if x_variable and y_variable:
            dimension_name_x = dimension_map.get_dimension(DimensionMapKey.X)[0]
            if decomp_type == DecompositionType.OCGIS:
                dimension_name_y = dimension_map.get_dimension(DimensionMapKey.Y)[0]
                sizes = np.zeros(2, dtype={'names': ['dim', 'size'], 'formats': [object, int]})
                sizes[0] = (dimension_name_x, dimensions_metadata[dimension_name_x]['size'])
                sizes[1] = (dimension_name_y, dimensions_metadata[dimension_name_y]['size'])
                max_index = np.argmax(sizes['size'])
                ret = sizes['dim'][max_index]
            elif decomp_type == DecompositionType.ESMF:
                ret = dimension_name_x
            else:
                raise NotImplementedError(decomp_type)
        else:
            ret = None
        return ret

    @staticmethod
    def get_grid(field):
        """Construct a grid from the field or return ``None`` if the field lacks grid components."""
        from ocgis import Grid
        try:
            ret = Grid(parent=field)
        except GridDeficientError:
            ret = None
        return ret

    @staticmethod
    def _gc_iter_dst_grid_slices_(grid_chunker, yield_idx=None):
        """Yield y/x slice dictionaries chunking the destination grid; optionally only chunk ``yield_idx``."""
        slice_store = []
        ydim_name = grid_chunker.dst_grid.dimensions[0].name
        xdim_name = grid_chunker.dst_grid.dimensions[1].name
        dst_grid_shape_global = grid_chunker.dst_grid.shape_global
        for idx in range(grid_chunker.dst_grid.ndim):
            splits = grid_chunker.nchunks_dst[idx]
            size = dst_grid_shape_global[idx]
            slices = create_slices_for_dimension(size, splits)
            slice_store.append(slices)
        for ctr, (slice_y, slice_x) in enumerate(itertools.product(*slice_store)):
            if yield_idx is not None and yield_idx != ctr:
                if ctr > yield_idx:
                    # Past the requested chunk; nothing further to yield.
                    break
                else:
                    continue
            yield {ydim_name: create_slice_from_tuple(slice_y),
                   xdim_name: create_slice_from_tuple(slice_x)}

    @classmethod
    def _get_field_write_target_(cls, field):
        """Collective!"""
        ocgis_lh(level=10, logger="driver.nc", msg="entering _get_field_write_target_")

        if field.crs is not None:
            field.crs.format_spatial_object(field)

        grid = field.grid
        if grid is not None:
            # If any grid pieces are masked, ensure the mask is created across all grids.
            has_mask = vm.gather(grid.has_mask)
            if vm.rank == 0:
                if any(has_mask):
                    create_mask = True
                else:
                    create_mask = False
            else:
                create_mask = None
            create_mask = vm.bcast(create_mask)
            if create_mask and not grid.has_mask:
                grid.get_mask(create=True)

            # Putting units on bounds for netCDF-CF can confuse some parsers.
            if grid.has_bounds:
                field = field.copy()
                field.x.bounds.attrs.pop('units', None)
                field.y.bounds.attrs.pop('units', None)

        # Remove the current coordinate system if this is a dummy coordinate system.
        if env.COORDSYS_ACTUAL is not None:
            field = field.copy()
            field.set_crs(env.COORDSYS_ACTUAL, should_add=True)

        return field
def create_slice_from_tuple(tup):
    """Convert a two-element ``(start, stop)`` sequence into a :class:`slice` object."""
    start, stop = tup[0], tup[1]
    return slice(start, stop)
def create_slices_for_dimension(size, splits):
    """Divide ``size`` elements into ``splits`` contiguous chunks, returning local bounds per chunk."""
    chunk_dim = 'foo'
    dist = OcgDist(size=splits)
    dist.create_dimension(chunk_dim, size, dist=True)
    dist.update_dimension_bounds()
    return [dist.get_dimension(chunk_dim, rank=rank).bounds_local for rank in range(splits)]
def parse_metadata(rootgrp, fill=None):
    """Recursively harvest metadata from a netCDF dataset/group into an ordered dictionary."""
    if fill is None:
        fill = OrderedDict()
    if 'groups' not in fill:
        fill['groups'] = OrderedDict()
    update_group_metadata(rootgrp, fill)
    # Descend into each subgroup, storing its metadata under its own entry.
    for subgroup in list(rootgrp.groups.values()):
        child_fill = OrderedDict()
        fill['groups'][subgroup.name] = child_fill
        parse_metadata(subgroup, fill=child_fill)
    return fill
def read_from_collection(target, request_dataset, parent=None, name=None, source_name=constants.UNINITIALIZED,
                         uid=None):
    """Build a field from an open netCDF collection, recursing into groups as child fields."""
    # Allow an empty variable renaming map. This occurs when there are no visible data variables to the metadata
    # parser.
    try:
        rename_variable_map = request_dataset.rename_variable_map
    except NoDataVariablesFound:
        rename_variable_map = {}

    ret = Field(attrs=get_netcdf_attributes(target), parent=parent, name=name, source_name=source_name, uid=uid)
    pred = request_dataset.predicate
    for varname, ncvar in target.variables.items():
        # Skip variables rejected by the request dataset's predicate.
        if pred is not None and not pred(varname):
            continue
        mapped_name = rename_variable_map.get(varname, varname)
        ret[mapped_name] = SourcedVariable(name=mapped_name, request_dataset=request_dataset, parent=ret,
                                           source_name=varname)

    for group_name, ncgroup in list(target.groups.items()):
        child = read_from_collection(ncgroup, request_dataset, parent=ret, name=group_name, uid=uid)
        ret.add_child(child)
    return ret
def init_variable_using_metadata_for_netcdf(variable, metadata):
    """
    Initialize a sourced variable's data type, fill value, and attributes in-place from parsed
    netCDF metadata (see :func:`parse_metadata`).

    :param variable: The variable to initialize. Its private ``_dtype``, ``_fill_value``, and
     ``_attrs`` members are updated directly.
    :param dict metadata: The parsed file metadata dictionary.
    """
    # Locate the metadata entry for the variable's group and source name.
    source = get_group(metadata, variable.group, has_root=False)
    desired_name = variable.source_name
    var = source['variables'][desired_name]

    if vm.is_null:
        # No live ranks for this variable on the virtual machine; make it empty.
        variable.convert_to_empty()
    else:
        # Update data type and fill value.
        if is_auto_dtype(variable._dtype) or var.get('dtype_packed') is not None:
            var_dtype = var['dtype']
            desired_dtype = deepcopy(var_dtype)

            # Variable-length netCDF types are wrapped in the OCGIS object type.
            if isinstance(var_dtype, VLType):
                desired_dtype = ObjectType(var_dtype)
            # Packed data (scale factor/offset present) uses the on-disk packed type.
            elif var['dtype_packed'] is not None:
                desired_dtype = deepcopy(var['dtype_packed'])

            variable._dtype = desired_dtype

        if variable._fill_value == 'auto':
            # Prefer the packed fill value when the source data is packed.
            if var.get('fill_value_packed') is not None:
                desired_fill_value = var['fill_value_packed']
            else:
                desired_fill_value = var.get('fill_value')
            variable._fill_value = deepcopy(desired_fill_value)

        variable_attrs = variable._attrs
        # Offset and scale factors are not supported by OCGIS. The data is unpacked when written to a new output file.
        # TODO: Consider supporting offset and scale factors for write operations.
        exclude = ['add_offset', 'scale_factor']
        for k, v in list(var.get('attrs', {}).items()):
            if k in exclude:
                continue
            # Do not clobber attributes already set on the variable.
            if k not in variable_attrs:
                variable_attrs[k] = deepcopy(v)

        # The conform units to value should be the default units value. Units will be converted on variable load.
        conform_units_to = var.get('conform_units_to')
        if conform_units_to is not None:
            variable_attrs['units'] = conform_units_to
def get_coordinate_system_variable_name(driver_object, group_metadata):
    """
    Return the coordinate system variable's name or ``None`` when no coordinate system exists.

    A coordinate system explicitly assigned on the request dataset takes precedence over one
    discovered in the group metadata.
    """
    request_dataset = driver_object.rd
    if request_dataset._has_assigned_coordinate_system:
        if request_dataset._crs is None:
            return None
        return request_dataset._crs.name
    found = driver_object.get_crs(group_metadata)
    return None if found is None else found.name
def get_value_from_request_dataset(variable):
    """
    Read and return a variable's value from its request dataset's source file, sliced to the
    variable's source/local extent.

    :raises: :class:`~ocgis.exc.PayloadProtectedError` if the variable's payload is protected.
    """
    if variable.protected:
        raise PayloadProtectedError(variable.name)

    rd = variable._request_dataset
    with driver_scope(rd.driver) as source:
        # Descend into nested netCDF groups when the variable lives below the root group.
        if variable.group is not None:
            for vg in variable.group:
                if vg is None:
                    continue
                else:
                    source = source.groups[vg]
        desired_name = variable.source_name or rd.variable
        # Reference the variable in the source dataset.
        ncvar = source.variables[desired_name]
        # Allow multi-unit time values for temporal variables.
        if isinstance(variable, TemporalVariable) and isinstance(source, MFDataset) and rd.format_time:
            # MFTime may fail if time_bnds do not have a calendar attribute.
            # Use rd.dimension_map.set_bounds('time', None) to disable indexing on time_bnds.
            try:
                ncvar = MFTime(ncvar, units=variable.units, calendar=variable.calendar)
            except TypeError:
                # Older versions of netcdf4-python do not support the calendar argument.
                ncvar = MFTime(ncvar, units=variable.units)
        ret = get_variable_value(ncvar, variable.dimensions)
    return ret
def get_variables_to_write(vc):
    """Return the collection's variables suitable for netCDF output, excluding geometry variables."""
    from ocgis.variable.geom import GeometryVariable
    return [v for v in vc.values() if not isinstance(v, GeometryVariable)]
def get_variable_value(variable, dimensions):
    """
    Read a netCDF variable's value using a slice constructed from OCGIS dimension objects.

    Each dimension contributes either its source index (fancy array or bounds pair) or its
    local bounds to the slice. Dimensionless variables are read in full.

    :param variable: An open netCDF variable (possibly wrapped by ``MFTime``).
    :param dimensions: Sequence of OCGIS dimension objects or ``None``.
    """
    if dimensions is not None and len(dimensions) > 0:
        to_format = [None] * len(dimensions)
        for idx in range(len(dimensions)):
            current_dimension = dimensions[idx]
            si_type = current_dimension._src_idx_type
            if si_type is None:
                # No source index; fall back to the local bounds or the full dimension extent.
                if current_dimension.bounds_local is None:
                    to_insert = slice(0, len(current_dimension))
                else:
                    to_insert = slice(*current_dimension.bounds_local)
            elif si_type == SourceIndexType.FANCY:
                # Fancy indexing uses the raw integer index array directly.
                to_insert = current_dimension._src_idx
            elif si_type == SourceIndexType.BOUNDS:
                # Bounds-based indexing converts the (lower, upper) pair to a slice.
                to_insert = slice(*current_dimension._src_idx)
            else:
                raise NotImplementedError(si_type)
            to_format[idx] = to_insert
        slc = get_formatted_slice(to_format, len(dimensions))
    else:
        slc = slice(None)
    try:
        ret = variable.__getitem__(slc)
    except IndexError:
        # TODO: Hack! Slicing the top-level MFTime variable does not work with multifiles.
        ret = super(MFTime, variable).__getitem__(slc)
    return ret
def create_dimension_or_pass(dim, dataset, write_mode=MPIWriteMode.NORMAL):
    """Create ``dim`` on ``dataset`` unless a dimension with the same name already exists."""
    if dim.name in dataset.dimensions:
        return
    if dim.is_unlimited:
        size = None
    elif write_mode in (MPIWriteMode.TEMPLATE, MPIWriteMode.ASYNCHRONOUS):
        # Template/asynchronous writes allocate the dimension's full global extent.
        lower, upper = dim.bounds_global
        size = upper - lower
    else:
        size = dim.size
    dataset.createDimension(dim.name, size)
def get_crs_variable(metadata, rotated_pole_priority, to_search=None):
    """
    Search variable metadata for a CF coordinate system and return the matching coordinate
    system object or ``None``.

    :param dict metadata: Group metadata containing a ``'variables'`` mapping.
    :param bool rotated_pole_priority: If ``False``, a spherical coordinate system found in the
     metadata overloads a rotated pole coordinate system.
    :param to_search: Optional sequence of variable names limiting the search.
    :raises: ValueError if multiple distinct coordinate systems are found.
    """
    found = []
    variables = metadata['variables']
    for vname, var in list(variables.items()):
        if to_search is not None:
            if vname not in to_search:
                continue
        # Probe each CF coordinate system implementation until one matches this variable.
        for potential in itersubclasses(CFCoordinateReferenceSystem):
            try:
                crs = potential.load_from_metadata(vname, metadata, strict=False)
                found.append(crs)
                break
            except ProjectionDoesNotMatch:
                continue

    # There must be at most one unique coordinate system name across all matches.
    fset = set([f.name for f in found])
    if len(fset) > 1:
        msg = 'Multiple coordinate systems found. There should be only one.'
        raise ValueError(msg)
    elif len(fset) == 0:
        crs = None
    else:
        crs = found[0]

    # Allow spherical coordinate systems to overload rotated pole if they are present in the metadata.
    if not rotated_pole_priority and isinstance(crs, CFRotatedPole):
        try:
            crs = CFSpherical.load_from_metadata(None, metadata, depth=2)
        except ProjectionDoesNotMatch:
            # No spherical coordinate system data available. Fall back on previous crs.
            pass

    # If there is no CRS, check for OCGIS coordinate systems.
    if crs is None:
        try:
            crs = AbstractProj4CRS.load_from_metadata(metadata)
        except ProjectionDoesNotMatch:
            pass

    return crs
def create_dimension_map_entry(src, variables, strict=False, attr_name='axis'):
    """
    Create a dimension map entry dictionary by searching variable metadata using attribute constraints.

    :param src: The source information to use for constructing the entry. If ``src`` is a dictionary, it must have two
     entries. The key ``'value'`` corresponds to the string attribute value. The key ``'axis'`` is the representative
     axis to assign the source value (for example ``'X'`` or ``'Y'``).
    :type src: str | dict
    :param dict variables: The metadata entries for the group's variables.
    :param bool strict: If ``False``, do not use a strict interpretation of metadata. Allow some standard approaches for
     handling metadata exceptions.
    :param str attr_name: Name of the attribute to use for checking the attribute values form ``src``.
    :return: dict
    """
    if isinstance(src, dict):
        axis = src['axis']
        attr_value = src['value']
    else:
        # A bare string is both the axis identifier and the attribute value to match.
        axis = src
        attr_value = src

    # Collect variables whose target attribute matches the requested value.
    axis_vars = []
    for variable in list(variables.values()):
        vattrs = variable.get('attrs', {})
        if vattrs.get(attr_name) == attr_value:
            # Dimensionless (scalar) variables cannot act as coordinate variables.
            if len(variable['dimensions']) == 0:
                pass
            else:
                axis_vars.append(variable['name'])

    # Try to find by default names.
    if not strict and len(axis_vars) == 0:
        possible_names = CFName.get_axis_mapping().get(axis, [])
        for pn in possible_names:
            if pn in list(variables.keys()):
                axis_vars.append(variables[pn]['name'])

    if len(axis_vars) == 1:
        var_name = axis_vars[0]
        dims = list(variables[var_name]['dimensions'])

        if not strict:
            # Use default index positions for X/Y dimensions.
            if axis in ('X', 'Y') and len(dims) > 1:
                if axis == 'Y':
                    dims = [dims[0]]
                elif axis == 'X':
                    dims = [dims[1]]

        ret = {'variable': var_name, DimensionMapKey.DIMENSION: dims}
    elif len(axis_vars) > 1:
        # Ambiguous match; warn and return nothing so a dimension map must be supplied explicitly.
        msg = 'Multiple axis (axis="{}") possibilities found using variable(s) "{}". Use a dimension map to specify ' \
              'the appropriate coordinate dimensions.'
        ocgis_lh(msg.format(axis, axis_vars), level=logging.WARN, logger='ocgis.driver.nc', force=True)
        ret = None
    else:
        ret = None
    return ret
def get_netcdf_attributes(target):
    """
    Collect the netCDF attributes of ``target`` into an ordered dictionary, working around
    attribute access failures on multi-file datasets.
    """
    ret = OrderedDict()
    for name in target.ncattrs():
        try:
            value = target.getncattr(name)
        except AttributeError:
            # TODO: Report bug to netCDF4-python.
            # May fail for multi-file datasets. Try to access the underlying dictionary.
            value = target.__dict__[name]
        ret[name] = value
    return ret
def remove_netcdf_attribute(filename, variable_name, attr_name):
    """Delete attribute ``attr_name`` from variable ``variable_name`` in the netCDF file ``filename``."""
    with driver_scope(DriverNetcdf, opened_or_path=filename, mode='a') as dataset:
        target = dataset[variable_name]
        target.delncattr(attr_name)
def update_group_metadata(rootgrp, fill):
    """
    Populate ``fill`` with metadata (global attributes, file format, variables, dimensions)
    harvested from an open netCDF dataset or group.
    """
    global_attributes = get_netcdf_attributes(rootgrp)
    fill.update({'global_attributes': global_attributes})

    # get file format
    fill.update({'file_format': rootgrp.file_format})

    # get variables
    variables = OrderedDict()
    for key, value in rootgrp.variables.items():
        subvar = OrderedDict()
        for attr in value.ncattrs():
            subvar.update({attr: getattr(value, attr)})

        # Remove scale factors and offsets from the metadata.
        if 'scale_factor' in subvar:
            # Reading one element yields the packed (on-disk) dtype; derive its default fill value.
            dtype_packed = value[0].dtype
            fill_value_packed = np.ma.array([], dtype=dtype_packed).fill_value
        else:
            dtype_packed = None
            fill_value_packed = None

        # Attempt to find the fill value.
        try:
            fill_value = value.__dict__['_FillValue']
        except KeyError:
            try:
                fill_value = value.fill_value
            except AttributeError:
                try:
                    fill_value = value.missing_value
                except AttributeError:
                    # 'auto' defers fill value resolution to the OCGIS variable machinery.
                    fill_value = 'auto'

        value_dtype = value.dtype
        try:
            value_datatype = value.datatype
        except AttributeError:
            # This is a fickle property for some reason. bekozi does not know if it is version related or not present
            # on standard non-VLType variables. Maybe both. Either way, this is the best way to detect VLType variables.
            value_datatype = None
        if isinstance(value_datatype, VLType):
            the_dtype = ObjectType(value_dtype)
        else:
            the_dtype = value_dtype

        variables.update({key: {'dimensions': value.dimensions,
                                'attrs': subvar,
                                'dtype': the_dtype,
                                'name': value._name,
                                'fill_value': fill_value,
                                'dtype_packed': dtype_packed,
                                'fill_value_packed': fill_value_packed}})

    fill.update({'variables': variables})

    # get dimensions
    dimensions = OrderedDict()
    for key, value in rootgrp.dimensions.items():
        subdim = {key: {'name': key, 'size': len(value), 'isunlimited': value.isunlimited()}}
        dimensions.update(subdim)
    fill.update({'dimensions': dimensions})