import abc
import logging
from collections import deque
import numpy as np
import six
from ocgis import env, vm
from ocgis.base import raise_if_empty, is_unstructured_driver
from ocgis.constants import KeywordArgument, GridAbstraction, VariableName, AttributeName, GridChunkerConstants, \
RegriddingRole, DMK, MPITag, DriverKey, ConversionTarget, MPI_EMPTY_VALUE
from ocgis.exc import RequestableFeature
from ocgis.spatial.base import AbstractXYZSpatialContainer
from ocgis.util.helpers import get_formatted_slice, arange_from_dimension, create_unique_global_array
from ocgis.util.logging_ocgis import ocgis_lh
from ocgis.variable.base import get_dslice, Variable
from ocgis.variable.dimension import create_distributed_dimension
from ocgis.variable.geom import GeometryProcessor, GeometryVariable
from ocgis.vmachine.mpi import cancel_free_requests
from shapely.geometry import Point, Polygon
from shapely.geometry.base import BaseGeometry, BaseMultipartGeometry
from shapely.geometry.linestring import LineString
from shapely.geometry.multipolygon import MultiPolygon
def format_gridunstruct_return(func):
"""Allows operations on geometry coordinate objects to return their parent grid."""
def wrapped(*args, **kwargs):
obj = args[0]
result = func(*args, **kwargs)
if obj.hosted:
if isinstance(result, tuple):
to_format = result[0]
is_tuple = True
else:
to_format = result
is_tuple = False
if isinstance(to_format, AbstractGeometryCoordinates):
to_format = to_format.parent.grid
if is_tuple:
ret = tuple([to_format] + list(result)[1:])
else:
ret = to_format
else:
ret = result
return ret
return wrapped
[docs]@six.add_metaclass(abc.ABCMeta)
class AbstractGeometryCoordinates(AbstractXYZSpatialContainer):
"""
Superclass for geometry coordinate objects. These objects manage coordinate arrays for subsetting and conversion.
:param x: The x-coordinate variable. Required if no parent is provided.
:type x: :class:`~ocgis.Variable`
:param y: The y-coordinate variable. Required if no parent is provided.
:type y: :class:`~ocgis.Variable`
:param z: The z-coordinate variable. Not required.
:type z: :class:`~ocgis.Variable`
:param cindex: The element node connectivity variable. If provided, this is used to index into coordinate variables
``x``, ``y``, and/or ``z``. If this is ``None``, use coordinate variables' first dimension as the element
dimension (ragged arrays). If ``'auto'`` (the default), attempt to retrieve an appropriate variable from the
dimension map.
:type cindex: None | :class:`~ocgis.Variable` | str
:param bool packed: If ``True``, the element node connectivity variable has been de-duplicated.
:param start_index: If ``'auto'``, attempt to retrieve this value from the element node connectivity variable. The
default is ``0`` if it cannot be found. An integer may also be provided.
:type start_index: int | str
:param bool hosted: If ``False``, this object is not hosted by an unstructured grid. If ``True``, it is hosted by
an unstructured grid object. Hosted objects will return their parents for some classes of operations.
:param dict kwargs: Optional keyword arguments to the superclass.
"""
def __init__(self, x=None, y=None, z=None, cindex='auto', packed=True, start_index='auto', hosted=False, **kwargs):
self._start_index = start_index
self.packed = packed
self.hosted = hosted
kwargs = kwargs.copy()
kwargs[KeywordArgument.X] = x
kwargs[KeywordArgument.Y] = y
kwargs[KeywordArgument.Z] = z
# The mask requires working with the element dimension which is dependent on the element connectivity index if
# present.
mask = kwargs.pop(KeywordArgument.MASK, None)
super(AbstractGeometryCoordinates, self).__init__(**kwargs)
# Always overload the driver to UGRID if the current driver is not unstructured.
driver_klass = self.dimension_map.get_driver(as_class=True)
if not is_unstructured_driver(driver_klass):
self.dimension_map.set_driver(DriverKey.NETCDF_UGRID)
if cindex == 'auto':
dmap = self._get_canonical_dimension_map_(field=self.parent)
cindex = dmap.get_variable(DMK.ELEMENT_NODE_CONNECTIVITY, parent=self.parent, nullable=True)
self.cindex = cindex
if cindex is not None:
if not self.is_empty and self.element_dim.name == self.node_dim.name and self.abstraction != GridAbstraction.POINT:
msg = 'The element and node dimensions must have different names.'
raise ValueError(msg)
# Set the spatial mask following work with element connectivity to avoid a race condition.
if mask is not None:
self.set_mask(mask)
def __getitem__(self, slc):
slc = get_formatted_slice(slc, self.ndim)
if not isinstance(slc, dict):
slc = get_dslice(self.dimensions, slc)
ret = self.copy()
new_parent = ret.parent[slc]
ret.parent = new_parent
return ret
@abc.abstractproperty
def abstraction(self):
"""
Return the generic abstraction for the geometry coordinates following :class:`~ocgis.constants.GridAbstraction.`
"""
@property
def cindex(self):
"""
Provides an index into coordinate variables to extract coordinate values for elements. When setting, the first
dimension is considered the representative element count dimension.
:rtype: None | :class:`~ocgis.Variable`
"""
dmap = self._get_canonical_dimension_map_(self.parent, create=False)
return dmap.get_variable(DMK.ELEMENT_NODE_CONNECTIVITY, parent=self.parent, nullable=True)
@cindex.setter
def cindex(self, value):
"""
:param value: The coordinate index variable of integer type.
:type value: `~ocgis.Variable` || NoneType
"""
dmap = self._get_canonical_dimension_map_(self.parent, create=False)
dmap.set_variable(DMK.ELEMENT_NODE_CONNECTIVITY, value, pos=0)
if value is not None:
self.parent.add_variable(value, force=True)
if AttributeName.START_INDEX not in value.attrs:
if self._start_index == 'auto':
sindex = self.driver._start_index
else:
sindex = self._start_index
value.attrs[AttributeName.START_INDEX] = sindex
@property
def element_dim(self):
"""
Get the element dimension. The size of the dimension is equivalent to the element count.
:rtype: :class:`~ocgis.Dimension`
"""
raise_if_empty(self)
return self.parent.driver.get_element_dimension(self)
@property
def has_bounds(self):
"""
Always return ``False``. Geometry coordinate objects will never have bounds.
:rtype: bool
"""
return False
@property
def has_multi(self):
"""
If ``True``, this object represents multi-geometries (multi-polygon for example).
:rtype: bool
"""
return self.multi_break_value is not None
@property
def is_vectorized(self):
"""
Always return ``False``. Geometry coordinates are never vectorized.
:rtype: bool
"""
return False
@property
def multi_break_value(self):
"""
The break value to use for determining multi-geometries.
:return: int | None
"""
return self.parent.driver.get_multi_break_value(self.cindex)
@property
def ndim(self):
"""
Get the representative dimension. This is typically ``1``.
:rtype: int
"""
return self.archetype.ndim
@property
def node_dim(self):
"""
Get the node dimension.
:rtype: :class:`~ocgis.Dimension`
"""
return self.archetype.dimensions[0]
@property
def shape(self):
"""
Get the current size of the element dimension as a tuple. For example: ``(5,)``.
:rtype: tuple
"""
return tuple([self.element_dim.size_current])
@property
def start_index(self):
"""
Get the start index. ``0`` is the default. For data written and read from Fortran natively, this is often ``1``.
This will attempt to read an attribute from the element node connectivity variable called
:attr:`ocgis.constants.AttributeName.START_INDEX`.
:rtype: int
"""
if self._start_index == 'auto':
cindex = self.cindex
if cindex is not None:
ret = cindex.attrs[AttributeName.START_INDEX]
else:
ret = 0
else:
ret = self._start_index
return ret
@property
def topology(self):
"""Alias of :attr:`~ocgis.spatial.geomc.AbstractGeometryCoordinates.abstraction`."""
return self.abstraction
@abc.abstractproperty
def __shapely_geometry_class__(self):
"""
Return the class to use for constructing Shapely geometry objects.
>>> return Polygon
:return: :class:`~shapely.geometry.base.BaseGeometry`
"""
@property
def __shapely_multipart_class__(self):
"""
Return the Shapely multipart geometry class to use for creating multi-geometry objects.
>>> return MultiPolygon
:return: :class:`~shapely.geometry.base.BaseMultipartGeometry`
"""
raise NotImplementedError
@abc.abstractproperty
def __use_bounds_intersects_optimizations__(self):
"""Return ``True`` if bounds optimizations should be used."""
[docs] def convert_to(self, target=ConversionTarget.GEOMETRY_VARIABLE, **kwargs):
"""
Convert the geometry coordinate object to various targets.
:param target: The destination conversion target.
:type target: :attr:`~ocgis.constants.ConversionTarget`
:param dict kwargs: Keyword arguments for the creation of the destination object.
:return: Varies depending on the conversion target.
"""
kwargs = kwargs.copy()
if target == ConversionTarget.GEOMETRY_VARIABLE:
from ocgis.variable.geom import GeometryVariable
kwargs[KeywordArgument.VALUE] = list(self.iter_geometries(with_index=False))
kwargs[KeywordArgument.DIMENSIONS] = [self.element_dim]
if self.crs is not None:
kwargs[KeywordArgument.CRS] = self.crs
ret = GeometryVariable(**kwargs)
else:
raise RequestableFeature(target)
return ret
[docs] def get_geometry_iterable(self, use_mask=True, hint_mask=None, use_memory_optimizations=None, with_index=True):
"""
Yield a tuple composed of the current iterator index and Shapely geometry object. If the geometry is masked,
the geometry will be ``None``. For example: ``(2, <Polygon>)`` or ``(3, None)`` if masked.
:param bool use_mask: If ``True``, use a mask for iteration. This is retrieved from the object if ``hint_mask``
is ``None``. If ``False``, do not use the mask for iteration yielding underlying data if the object is mask.
:param hint_mask: If ``None``, use the object's mask. If a boolean array, use this as the mask as opposed to
the object's mask. The array must have the same dimension as ``self``.
:type hint_mask: None | :class:`numpy.ndarray`
:param use_memory_optimizations: If ``None``, default to :attr:`ocgis.env.USE_MEMORY_OPTIMIZATIONS`.
If ``True``, do not eagerly load coordinates. If ``False``, load all coordinates into memory improving
performance by limiting IO.
:type use_memory_optimizations: None | bool
:param bool with_index: If ``False``, do not yield the current iteration index.
:return: tuple(int, <Shapely geometry>) | <Shapely geometry>
"""
if use_memory_optimizations is None:
use_memory_optimizations = env.USE_MEMORY_OPTIMIZATIONS
if use_memory_optimizations and self.cindex is not None:
use_memory_optimizations = True
else:
use_memory_optimizations = False
if self.cindex is None:
cindex = None
has_multi = False
mbv = None
else:
cindex = self.cindex.get_value()
has_multi = self.has_multi
mbv = self.multi_break_value
if mbv is not None:
assert mbv < 0
xvar = self.x.extract()
yvar = self.y.extract()
if self.has_z:
zvar = self.z.extract()
if not use_memory_optimizations:
x_value = xvar.get_value()
y_value = yvar.get_value()
if self.has_z and not use_memory_optimizations:
z_value = zvar.get_value()
else:
z_value = None
if self.has_mask:
mask_value = self.get_mask()
else:
mask_value = None
if hint_mask is not None:
if mask_value is None:
mask_value = hint_mask
else:
mask_value = np.logical_or(mask_value, hint_mask)
has_mask = self.has_mask
has_z = self.has_z
get_shapely_geometry = self.get_shapely_geometry
get_element_node_connectivity_by_index = self.get_element_node_connectivity_by_index
start_index = self.start_index
for idx in range(len(self.element_dim)):
if use_mask and has_mask:
is_masked = mask_value[idx]
else:
is_masked = False
geom = None
if not is_masked:
if cindex is not None:
ec_idx = get_element_node_connectivity_by_index(cindex, idx)
if start_index > 0:
ec_idx -= start_index
else:
ec_idx = idx
collected = deque()
if has_multi:
mitr = iter_multipart_coordinates(ec_idx, mbv)
else:
mitr = [ec_idx]
for comp_coords in mitr:
z = None
if use_memory_optimizations:
x = xvar[comp_coords].get_value()[0]
y = yvar[comp_coords].get_value()[0]
if has_z:
z = zvar[comp_coords].get_value()[0]
else:
x = x_value[comp_coords]
y = y_value[comp_coords]
if has_z:
z = z_value[comp_coords]
if has_z:
geom = get_shapely_geometry(x, y, z)
else:
geom = get_shapely_geometry(x, y)
collected.append(geom)
if has_multi:
geom = self.__shapely_multipart_class__(collected)
else:
geom = collected[0]
if with_index:
yield idx, geom
else:
yield geom
[docs] def get_distributed_slice(self, slc, **kwargs):
"""
Slice a distributed object.
:param slc: A slice-like object.
:type slc: <varying>
:param dict kwargs: Optional arguments to :meth:`~ocgis.Variable.get_distributed_slice`.
:rtype: :class:`ocgis.spatial.geomc.AbstractGeometryCoordinates`
"""
slc = get_formatted_slice(slc, self.ndim)
if self.cindex is None:
target = self.x
else:
target = self.cindex
if len(slc) != target.ndim:
dslc = get_dslice(self.dimensions, slc)
for dim in target.dimensions:
if dim.name not in dslc:
dslc[dim.name] = slice(None)
slc = [dslc[dim.name] for dim in target.dimensions]
new_parent = target.get_distributed_slice(slc, **kwargs).parent
ret = self.copy()
ret.parent = new_parent
return ret
[docs] @abc.abstractmethod
def get_element_node_connectivity_by_index(self, element_connectivity, idx):
"""
Return something that can be used for indexing into coordinate arrays to retrieve the coordinates for the
current element.
:param element_connectivity: An element connectivity array with the first dimension/axis as the element
dimension.
:type element_connectivity: :class:`numpy.ndarray`
:param int idx: The element index.
:rtype: <used as NumPy index>
"""
[docs] def get_nearest(self, target, return_indices=False):
raise RequestableFeature("'get_nearest' is not implemented.")
[docs] def get_shapely_geometry(self, *args, **kwargs):
"""
Return a Shapely geometry object.
:rtype: :class:`shapely.geometry.base.BaseGeometry`
"""
return self.__shapely_geometry_class__(*args, **kwargs)
[docs] def get_spatial_index(self):
raise NotImplementedError
@format_gridunstruct_return
def get_spatial_subset_operation(self, spatial_op, subset_geom, return_slice=False, original_mask=None,
keep_touches=True, cascade=True, optimized_bbox_subset=False, apply_slice=True,
geom_name=None):
"""
Perform intersects or intersection operations on the object.
:param str spatial_op: Either an ``'intersects'`` or an ``'intersection'`` spatial operation.
:param subset_geom: A scalar (single geometry) geometry variable or Shapely geometry to use in the spatial
operation. All geometry types are accepted.
:type subset_geom: :class:`~ocgis.GeometryVariable` | :class:`shapely.geometry.base.BaseGeometry`
:param bool return_slice: If ``True``, also return the slices used to limit the grid's extent.
:param original_mask: An optional mask to use as a hint for spatial operation. ``True`` values are excluded
from spatial consideration.
:type original_mask: :class:`numpy.ndarray`
:param keep_touches: If ``True`` (the default), keep geometries that touch the subset geometry.
:type keep_touches: :class:`bool`
:param cascade: If ``True`` (the default), set the mask across all variables in the grid's parent collection.
:param optimized_bbox_subset: If ``True``, perform an optimized bounding box subset on the grid. This will only
use the grid's representative coordinates ignoring bounds, geometries, etc.
:param bool apply_slice: If ``True`` (the default), apply the slice to the grid object in addition to updating
its mask.
:param str geom_name: If provided, use this name for the output geometry variable if this is an intersection
operation.
:return: If ``return_slice`` is ``False`` (the default), return a shallow copy of the sliced grid. If
``return_slice`` is ``True``, this will be a tuple with the subsetted object as the first element and the slice
used as the second. If ``spatial_op`` is ``'intersection'``, the returned object is a geometry variable.
:rtype: :class:`~ocgis.Grid` | :class:`~ocgis.GeometryVariable` | :class:`tuple` of ``(<returned object>, <slice used>)``
"""
# TODO: Merge this with the grid's spatial operation.
if optimized_bbox_subset and spatial_op == 'intersection':
raise ValueError("'optimized_bbox_subset' must be False when performing an intersection")
raise_if_empty(self)
try:
subset_geom = subset_geom.prepare()
except AttributeError:
if not isinstance(subset_geom, BaseGeometry):
msg = 'Only Shapely geometries allowed for subsetting. Subset type is "{}".'.format(
type(subset_geom))
raise ValueError(msg)
else:
subset_geom = subset_geom.get_value()[0]
if self.get_mask() is None:
original_has_mask = False
else:
original_has_mask = True
if geom_name is None:
geom_name = get_default_geometry_variable_name(self)
if spatial_op == 'intersection':
perform_intersection = True
else:
perform_intersection = False
if original_mask is None and self.__use_bounds_intersects_optimizations__:
if isinstance(subset_geom, BaseMultipartGeometry):
geom_itr = subset_geom
else:
geom_itr = [subset_geom]
x = self.x.get_value()
y = self.y.get_value()
if self.has_z:
z = self.z.get_value()
else:
z = None
for ctr, geom in enumerate(geom_itr):
if geom.has_z:
coords = np.array(geom.exterior.coords)
z_coords = coords[:, 2]
z_bounds = z_coords.min(), z_coords.max()
else:
z_bounds = None
single_hint_mask = get_xyz_select(x, y, geom.bounds, z=z, z_bounds=z_bounds, keep_touches=keep_touches)
if ctr == 0:
hint_mask = single_hint_mask
else:
hint_mask = np.logical_or(hint_mask, single_hint_mask)
hint_mask = np.invert(hint_mask)
original_mask = hint_mask
if not optimized_bbox_subset:
mask = self.get_mask()
if mask is not None:
original_mask = np.logical_or(mask, hint_mask)
elif not self.__use_bounds_intersects_optimizations__:
original_mask = np.zeros(self.element_dim.size, dtype=bool)
ret = self.copy()
if original_has_mask:
ret.set_mask(ret.get_mask().copy())
if optimized_bbox_subset:
the_slice = np.invert(original_mask)
sliced_obj = ret.get_distributed_slice(the_slice)
else:
fill_mask = original_mask
geometry_fill = None
# If everything is masked, there is no reason to load the grid geometries.
if not original_mask.all():
if perform_intersection:
geometry_fill = np.zeros(fill_mask.shape, dtype=object)
gp = GeometryProcessor(self.get_geometry_iterable(hint_mask=original_mask), subset_geom,
keep_touches=keep_touches)
for idx, intersects_logical, current_geometry in gp.iter_intersects():
fill_mask[idx] = not intersects_logical
if perform_intersection and intersects_logical:
geometry_fill[idx] = current_geometry.intersection(subset_geom)
if perform_intersection:
if geometry_fill is None:
geometry_variable = GeometryVariable(name=geom_name)
else:
geometry_variable = GeometryVariable(name=geom_name, value=geometry_fill, mask=fill_mask,
dimensions=ret.element_dim)
ret.parent.add_variable(geometry_variable, force=True)
the_slice = np.invert(fill_mask)
if apply_slice:
sliced_obj = ret.get_distributed_slice(the_slice)
else:
sliced_obj = ret
sliced_mask_value = sliced_obj.get_mask()
# Only modify the outgoing mask if any values are masked.
if sliced_mask_value is not None and sliced_mask_value.any():
sliced_obj.set_mask(sliced_mask_value, cascade=cascade)
# The element dimension needs to be updated to account for fancy slicing which may leave some ranks empty.
new_element_dimension_name = self.element_dim.name
if sliced_obj.is_empty:
new_element_dimension_size = 0
new_element_dimension_src_idx = None
else:
element_dim = sliced_obj.element_dim
new_element_dimension_size = element_dim.size
new_element_dimension_src_idx = element_dim._src_idx
new_element_dimension = create_distributed_dimension(new_element_dimension_size,
name=new_element_dimension_name,
src_idx=new_element_dimension_src_idx)
sliced_obj.parent.dimensions[new_element_dimension.name] = new_element_dimension
if perform_intersection:
obj_to_ret = sliced_obj.parent[geometry_variable.name]
else:
obj_to_ret = sliced_obj
if return_slice:
ret = (obj_to_ret, the_slice)
else:
ret = obj_to_ret
return ret
def iter_geometries(self, **kwargs):
for yld in self.get_geometry_iterable(**kwargs):
yield yld
[docs] def iter_records(self, use_mask=True):
raise NotImplementedError
[docs] def reduce_global(self):
"""
De-duplicate and reindex (reset start index) an element node connectivity variable. Operation is collective
across the current VM. The new node dimension is distributed. Return a shallow copy of `self` for convenience.
:rtype: :class:`~ocgis.spatial.geomc.AbstractGeometryCoordinates`
"""
ocgis_lh(msg='entering reduce_global', logger='geomc', level=logging.DEBUG)
raise_if_empty(self)
if self.cindex is None:
raise ValueError('A coordinate index is required to reduce coordinates.')
ocgis_lh(msg='starting reduce_reindex_coordinate_index', logger='geomc', level=logging.DEBUG)
new_cindex, uidx = reduce_reindex_coordinate_index(self.cindex, start_index=self.start_index)
ocgis_lh(msg='finished reduce_reindex_coordinate_index', logger='geomc', level=logging.DEBUG)
new_cindex = Variable(name=self.cindex.name, value=new_cindex, dimensions=self.cindex.dimensions)
ret = self.copy()
if self.start_index == 1:
uidx -= 1
new_parent = self.x[uidx].parent
cdim = new_parent[self.x.name].dimensions[0]
new_node_dimension = create_distributed_dimension(cdim.size, name=cdim.name, src_idx=cdim._src_idx)
new_parent.dimensions[cdim.name] = new_node_dimension
new_parent[self.cindex.name].extract(clean_break=True)
ret.parent = new_parent
ret.cindex = new_cindex
ocgis_lh(msg='exiting reduce_global', logger='geomc', level=logging.DEBUG)
return ret
def _get_dimensions_(self):
return tuple([self.element_dim])
def _get_extent_(self):
# Get the x and y coordinate values.
x, y = self.x.v(), self.y.v()
return x.min(), y.min(), x.max(), y.max()
def _get_is_empty_(self):
return self.parent.is_empty
@staticmethod
def _gc_create_global_indices_(*args, **kwargs):
return None
def _gc_initialize_(self, regridding_role):
if regridding_role == RegriddingRole.SOURCE:
name = GridChunkerConstants.IndexFile.NAME_SRCIDX_GUID
elif regridding_role == RegriddingRole.DESTINATION:
name = GridChunkerConstants.IndexFile.NAME_DSTIDX_GUID
else:
raise NotImplementedError(regridding_role)
element_dim = self.element_dim
src_index = arange_from_dimension(element_dim, start=1, dtype=env.NP_INT)
src_index_var = Variable(name=name, value=src_index, dimensions=element_dim)
self.parent.add_variable(src_index_var)
def _gc_nchunks_dst_(self, grid_chunker):
try:
ret = super(AbstractGeometryCoordinates, self)._gc_nchunks_dst_(grid_chunker)
except NotImplementedError:
ret = (100,)
return ret
def _initialize_parent_(self, *args, **kwargs):
return self._get_parent_class_()(*args, **kwargs)
[docs]class PointGC(AbstractGeometryCoordinates):
abstraction = GridAbstraction.POINT
__shapely_geometry_class__ = Point
__use_bounds_intersects_optimizations__ = True
[docs] @staticmethod
def get_element_node_connectivity_by_index(element_connectivity, idx):
idx = element_connectivity[idx, ...].flatten()[0]
return idx
[docs]class LineGC(AbstractGeometryCoordinates):
abstraction = GridAbstraction.LINE
__shapely_geometry_class__ = LineString
__use_bounds_intersects_optimizations__ = False
[docs] def __init__(self, *args, **kwargs):
raise RequestableFeature
[docs] @staticmethod
def get_element_node_connectivity_by_index(element_connectivity, idx):
raise NotImplementedError
[docs]class PolygonGC(AbstractGeometryCoordinates):
abstraction = GridAbstraction.POLYGON
__shapely_geometry_class__ = Polygon
__shapely_multipart_class__ = MultiPolygon
__use_bounds_intersects_optimizations__ = False
[docs] def get_element_node_connectivity_by_index(self, element_connectivity, idx):
# TODO: OPTIMIZE: Driver-specific method to load polygon coordinate indices.
# ESMF unstructured uses counts...
if self.dimension_map.get_driver() == DriverKey.NETCDF_ESMF_UNSTRUCT:
num_element_conn_value = self.parent['numElementConn'].get_value()
if idx == 0:
start = 0
else:
start = num_element_conn_value[0:idx].sum()
stop = num_element_conn_value[0:idx + 1].sum()
ret = element_connectivity[start:stop]
else:
ret = element_connectivity[idx, ...].flatten()
if element_connectivity.dtype == object:
ret = ret[0].flatten()
# TODO: /OPTIMIZE
return ret
[docs] def get_shapely_geometry(self, *args, **kwargs):
if len(args) == 3:
x, y, z = args
else:
x, y = args
z = None
x = x.reshape(-1, 1)
y = y.reshape(-1, 1)
if z is None:
stup = (x, y)
else:
z = z.reshape(-1, 1)
stup = (x, y, z)
c = np.hstack(stup)
return self.__shapely_geometry_class__(c, **kwargs)
def create_buffer_array(value=MPI_EMPTY_VALUE, dtype='i'):
return np.array([value], dtype=dtype)
def create_Irecv(source, tag, dtype='i', mtype=None):
if mtype is None:
mtype = vm.get_mpi_type(np.int32)
buf = create_buffer_array(dtype=dtype)
req = vm.comm.Irecv([buf, mtype], source=source, tag=tag)
return buf, req
def create_Irecv_dct(ranks, tag):
ret = {}
for rank in ranks:
data, req = create_Irecv(rank, tag)
ret[rank] = {'data': data, 'req': req}
return ret
def create_Isend(dest, tag, value=MPI_EMPTY_VALUE, dtype='i', mtype=None):
from mpi4py import MPI
if mtype is None:
mtype = MPI.INT
buf = create_buffer_array(value=value, dtype=dtype)
req = vm.comm.Isend([buf, mtype], dest=dest, tag=tag)
return buf, req
def get_default_geometry_variable_name(gc):
possible = {GridAbstraction.POINT: VariableName.GEOMETRY_POINT,
GridAbstraction.LINE: VariableName.GEOMETRY_LINE,
GridAbstraction.POLYGON: VariableName.GEOMETRY_POLYGON}
return possible[gc.abstraction]
def get_xyz_select(x, y, bounds, z=None, z_bounds=None, invert=False, keep_touches=True):
from ocgis.spatial.grid import arr_intersects_bounds
minx, miny, maxx, maxy = bounds
select_x = arr_intersects_bounds(x, minx, maxx, keep_touches=keep_touches)
select_y = arr_intersects_bounds(y, miny, maxy, keep_touches=keep_touches)
select = np.logical_and(select_x, select_y)
if z_bounds is not None:
minz, maxz = z_bounds
select_z = arr_intersects_bounds(z, minz, maxz, keep_touches=keep_touches)
select = np.logical_and(select, select_z)
if invert:
select = np.invert(select)
return select
def iter_multipart_coordinates(arr, mbv):
"""
Split an array by a multi-part break value generating the sections that when combined create the original array.
:param arr: Array containing multi-part break values (it doesn't have to contain them).
:type arr: :class:`numpy.ndarray`
:param int mbv: The multi-part break value. Typically this is a negative integer.
:return: :class:`numpy.ndarray`
"""
w = np.where(arr == mbv)[0]
l = len(arr)
w = np.hstack((w, [l]))
start = 0
for idx, iw in enumerate(w):
slc = slice(start, iw)
start = iw + 1
yld = arr.__getitem__(slc)
yield yld
def reduce_reindex_coordinate_index(cindex, start_index=0):
"""
Reindex a subset of global coordinate indices contained in the ``cindex`` variable.
The starting index value (``0`` or ``1``) is set by ``start_index`` for the re-indexing procedure.
Function will not respect masks.
The function returns a two-element tuple:
* First element --> A :class:`numpy.ndarray` with the same dimension as ``cindex`` containing the new indexing.
* Second element --> A :class:`numpy.ndarray` containing the unique indices that may be used to reduce an external
coordinate storage variable or array.
:param cindex: A variable containing coordinate index integer values. This variable may be distributed. This may
also be a NumPy array.
:type cindex: :class:`~ocgis.Variable` | :class:`~numpy.ndarray`
:param int start_index: The first index to use for the re-indexing of ``cindex``. This may be ``0`` or ``1``.
:rtype: tuple
"""
ocgis_lh(msg='entering reduce_reindex_coordinate_index', logger='geomc', level=logging.DEBUG)
# Get the coordinate index values as a NumPy array.
try:
ocgis_lh(msg='calling cindex.get_value()', logger='geomc', level=logging.DEBUG)
ocgis_lh(msg='cindex.has_allocated_value={}'.format(cindex.has_allocated_value), logger='geomc',
level=logging.DEBUG)
ocgis_lh(msg='cindex.dimensions[0]={}'.format(cindex.dimensions[0]), logger='geomc', level=logging.DEBUG)
cindex = cindex.get_value()
ocgis_lh(msg='finished cindex.get_value()', logger='geomc', level=logging.DEBUG)
except AttributeError:
# Assume this is already a NumPy array.
pass
# Only work with 1D arrays.
cindex = np.atleast_1d(cindex)
# Used to return the coordinate index to the original shape of the incoming coordinate index.
original_shape = cindex.shape
cindex = cindex.flatten()
# Create the unique coordinate index array.
ocgis_lh(msg='calling create_unique_global_array', logger='geomc', level=logging.DEBUG)
if vm.size > 1:
u = np.array(create_unique_global_array(cindex))
else:
u = np.unique(cindex)
ocgis_lh(msg='finished create_unique_global_array', logger='geomc', level=logging.DEBUG)
# Synchronize the data type for the new coordinate index.
lrank = vm.rank
if lrank == 0:
dtype = u.dtype
else:
dtype = None
dtype = vm.bcast(dtype)
# Flag to indicate if the current rank has any unique values.
has_u = len(u) > 0
# Create the new coordinate index.
new_u_dimension = create_distributed_dimension(len(u), name='__new_u_dimension__')
new_u = arange_from_dimension(new_u_dimension, start=start_index, dtype=dtype)
# Create a hash for the new index. This is used to remap the old coordinate index.
if has_u:
uidx = {ii: jj for ii, jj in zip(u, new_u)}
else:
uidx = None
vm.barrier()
# Construct local bounds for the rank's unique value. This is used as a cheap index when ranks are looking for
# index overlaps.
if has_u:
local_bounds = min(u), max(u)
else:
local_bounds = None
# Put a copy for the bounds indexing on each rank.
lb_global = vm.gather(local_bounds)
lb_global = vm.bcast(lb_global)
# Find the vm ranks the local rank cares about. It cares if unique values have overlapping unique bounds.
overlaps = []
for rank, lb in enumerate(lb_global):
if rank == lrank:
continue
if lb is not None:
contains = lb[0] <= cindex
contains = np.logical_and(lb[1] >= cindex, contains)
if np.any(contains):
overlaps.append(rank)
# Ranks must be able to identify which ranks will be asking them for data.
global_overlaps = vm.gather(overlaps)
global_overlaps = vm.bcast(global_overlaps)
destinations = [ii for ii, jj in enumerate(global_overlaps) if vm.rank in jj]
# MPI communication tags used in the algorithm.
tag_search = MPITag.REDUCE_REINDEX_SEARCH
tag_success = MPITag.REDUCE_REINDEX_SUCCESS
tag_child_finished = MPITag.REDUCE_REINDEX_CHILD_FINISHED
tag_found = MPITag.REDUCE_REINDEX_FOUND
# Fill array for the new coordinate index.
new_cindex = np.empty_like(cindex)
# vm.barrier_print('starting run_rr')
# Fill the new coordinate indexing.
if lrank == 0:
run_rr_root(new_cindex, cindex, uidx, destinations, tag_child_finished, tag_found, tag_search, tag_success)
else:
run_rr_nonroot(new_cindex, cindex, uidx, destinations, has_u, overlaps, tag_child_finished, tag_found,
tag_search,
tag_success)
# vm.barrier_print('finished run_rr')
# Return array to its original shape.
new_cindex = new_cindex.reshape(*original_shape)
vm.barrier()
return new_cindex, u
def run_rr_nonroot(new_cindex, cindex, uidx, destinations, has_u, overlaps, tag_child_finished, tag_found, tag_search,
tag_success):
# Fill each value in the coordinate index.
for idx, ii in enumerate(cindex.flat):
# if idx % 100 == 0:
# vm.rank_print('{} of {}'.format(idx+1, cindex.shape[0]))
try:
# If this rank has unique indices, try to retrieve the new indexing from its unique indexing hash.
if has_u:
new_cindex_value = uidx[ii]
# Jump into the search loop if there are no unique indices on the local rank.
else:
raise KeyError
except KeyError:
new_cindex_value = None
# Keep searching and waiting for the response from the overlap ranks.
# while new_cindex_value is None:
# Send the value to find to each of the destination ranks.
assert ii != MPI_EMPTY_VALUE
assert ii >= 0
# vm.rank_print('idx', idx, 'sending to:', overlaps, 'receiving from:', destinations)
search_reqs = [create_Isend(orank, tag_search, value=ii) for orank in overlaps]
# for s in search_reqs:
# s.Test()
# vm.rank_print('receiving from:', destinations)
sent = search_for_destinations(destinations, uidx, tag_found, tag_search)
for s in search_reqs:
# vm.rank_print('waiting for search request', 'idx', idx, s[0])
s[1].wait()
# vm.rank_print('overlaps', overlaps, 'destinations', destinations)
# time.sleep(100)
for overlap_rank in overlaps:
data, req = create_Irecv(overlap_rank, tag_found)
req.wait()
# if req.Test():
if data[0] != MPI_EMPTY_VALUE:
new_cindex_value = data[0]
for s in sent:
s[1].wait()
# for s in search_reqs:
# s.wait()
# Free the search requests to avoid any race conditions in data buffers.
# cancel_free_requests(search_reqs)
# Fill the new coordinate index array with the found value.
assert new_cindex_value is not None
new_cindex[idx] = new_cindex_value
# Continue searching for destination ranks until the success signal is received from the root rank.
_, req_child_finished = create_Isend(0, tag_child_finished)
_, req_success = create_Irecv(0, tag_success)
while not req_success.Test():
sent = search_for_destinations(destinations, uidx, tag_found, tag_search)
for s in sent:
s[1].wait()
# Wait until the child finished tag is received by the parent.
req_child_finished.wait()
def run_rr_root(new_cindex, cindex, uidx, destinations, tag_child_finished, tag_found, tag_search, tag_success):
# Tracks when ranks are finished.
children_finished = [False] * vm.size
children_finished[0] = True
# Fill the new coordinate index. The root rank will always fully map itself.
for idx, ii in enumerate(cindex.flat):
new_cindex[idx] = uidx[ii]
success = False
# Open channels for child finished signals.
children_finished_reqs = create_Irecv_dct(vm.ranks, tag_child_finished)
while not success:
# Check for any requests from destination ranks. These ranks need a value that may be on this rank.
sent = search_for_destinations(destinations, uidx, tag_found, tag_search)
for s in sent:
s[1].wait()
# Check if children have finished. If they have finished. Send the success signal to other participating ranks.
for idx, rank in enumerate(vm.ranks):
if not children_finished[idx]:
req_child_finished = children_finished_reqs[rank]['req']
if req_child_finished.Test():
children_finished[rank] = True
if all(children_finished):
success = True
for rank in vm.ranks:
if rank != 0:
_, req = create_Isend(rank, tag_success)
req.wait()
def search_for_destinations(dest_ranks, uidx, tag_found, tag_search):
assert isinstance(dest_ranks, list)
sent = []
for dest_rank in dest_ranks:
data, req = create_Irecv(dest_rank, tag_search)
# data = np.array([MPI_EMPTY_VALUE], dtype='i')
# buf = [data, MPI.INT]
# req = vm.comm.Irecv(buf, source=dest_rank, tag=tag_search)
# req.wait()
if req.Test():
search_value = data[0]
# if data == MPI_EMPTY_VALUE:
# print('rank=', vm.rank, 'bad data from rank:', dest_rank)
assert search_value != MPI_EMPTY_VALUE
assert search_value is not None
assert search_value >= 0
# vm.rank_print('search_value', search_value)
local_uidx = uidx.get(search_value, MPI_EMPTY_VALUE)
send_res = create_Isend(dest_rank, tag_found, value=local_uidx)
sent.append(send_res)
else:
cancel_free_requests([req])
return sent