Source code for ocgis.spatial.grid_chunker

import logging
import os

import netCDF4 as nc
import numpy as np
from ocgis import constants
from ocgis.base import AbstractOcgisObject, grid_abstraction_scope
from ocgis.collection.field import Field
from ocgis.constants import GridChunkerConstants, RegriddingRole, Topology
from ocgis.driver.request.core import RequestDataset
from ocgis.spatial.base import iter_spatial_decomposition
from ocgis.spatial.geomc import AbstractGeometryCoordinates
from ocgis.spatial.grid import GridUnstruct, AbstractGrid, Grid
from ocgis.util.logging_ocgis import ocgis_lh
from ocgis.variable.base import VariableCollection, Variable
from ocgis.variable.dimension import Dimension
from ocgis.variable.geom import GeometryVariable
from ocgis.vmachine.core import vm
from ocgis.vmachine.mpi import redistribute_by_src_idx
from shapely.geometry import box


[docs]class GridChunker(AbstractOcgisObject): """ Splits source and destination grids into separate netCDF files. "Source" is intended to mean the source data for a regridding operation. "Destination" is the destination grid for regridding operation. The destination subset extents are buffered to ensure full overlap with the source destination subset. Hence, elements in the destination subset are globally unique and source subset elements are not necessarily globally unique. .. note:: Grid parent variable collections may be altered during initializations to account for global source indexing. .. note:: All function calls are collective. :param source: The source object for a regridding operation. The object must either be a grid or an object from which a grid is retrievable. :type source: :class:`~ocgis.spatial.grid.AbstractGrid` | :class:`~ocgis.RequestDataset` | :class:`~ocgis.Field` :param destination: The destination object for a regridding operation. The object must either be a grid or an object from which a grid is retrievable. :type destination: :class:`~ocgis.spatial.grid.AbstractGrid` | :class:`~ocgis.RequestDataset` | :class:`~ocgis.Field` :param tuple nchunks_dst: The split count for the grid. Tuple length must match the dimension count of the grid. >>> nchunks_dst = (2, 3) :param dict paths: Dictionary of paths used by the grid splitter. Defaults are provided. ============ ======================== =============================================================================================== Key (str) Default Description ============ ======================== =============================================================================================== wd ``os.getcwd()`` Working directory to write to or containing split files, weight files, and splitter index file. dst_template ``'split_dst_{}.nc'`` Destination filename template. src_template ``'split_src_{}.nc'`` Source filename template. wgt_template ``'esmf_weights_{}.nc'`` Weight filename template. index_file ``'01-split_index.nc'`` Name of the index file. ============ ======================== =============================================================================================== :param bool check_contains: If ``True``, check that the source subset bounding box fully contains the destination subset bounding box. Works when coordinate data is ordered and packed similarly between source and destination. :param bool allow_masked: If ``True``, allow masked values following a subset. :param float src_grid_resolution: Overload the source grid resolution. This is useful when using unstructured data that may have a regular patterning to leverage for performance. :param float dst_grid_resolution: Same as ``src_grid_resolution`` exception for the destination grid. :param bool optimized_bbox_subset: If ``True``, use optimizations for subsetting. If ``False``, do not use optimizations. Optimizations are generally okay for structured, rectilinear grids. Optimizations will avoid constructing geometries for the subset target. Hence, subset operations with complex boundary definitions should generally avoid optimizations (set to ``False``). If ``'auto'``, attempt to identify the best optimization method. :param iter_dst: A generator yielding destination grids. This generator must also write the grid. :type iter_dst: <generator function> :param float buffer_value: The value in units of the destination grid, used to buffer the spatial extent for subsetting the source grid. It is best to keep this small, but it must ensure the destination subset is fully mapped by the source for whatever purpose the grid splitter is used. If ``None``, the default is double the highest resolution between source and destination grids. :param bool redistribute: If ``True``, redistribute the source subset for unstructured grids. The redistribution reloads the data from source so should not be used with in-memory grids. :param bool genweights: If ``False``, do no generate regridding weight files using ESMF. If ``True``, generate regridding weight files for each source-destination chunk. :param dict esmf_kwargs: Optional overloads for keyword arguments to ESMF interfaces. Currently supported keyword arguments are below. ======================= ============== =============================================================== Name Default Possible ======================= ============== =============================================================== ``'regrid_method'`` ``'CONSERVE'`` ``'CONSERVE'``, ``'BILINEAR'``, ``'PATCH'``, ``'NEAREST_STOD'`` ``'unmapped_action'`` ``'IGNORE'`` ``'IGNORE'``, ``'ERROR'`` ``'ignore_degenerate'`` ``False`` ``True``/``False`` ======================= ============== =============================================================== :param bool use_spatial_decomp: If ``True``, use a spatial decomposition as opposed to an index-based decomposition when creating destination chunks. A spatial decomposition ensures destination coordinates are spatially "clumped" and is recommended for unstructured datasets. If ``'auto'``, choose the best approach from the grid type. :param bool eager: If ``True``, load grid data from disk before chunking. This avoids always loading the data from disk for sourced datasets following a subset. There will be an improvement in performance but an increase in the memory used. :raises: ValueError """ def __init__(self, source, destination, nchunks_dst=None, paths=None, check_contains=False, allow_masked=True, src_grid_resolution=None, dst_grid_resolution=None, optimized_bbox_subset='auto', iter_dst=None, buffer_value=None, redistribute=False, genweights=False, esmf_kwargs=None, use_spatial_decomp='auto', eager=True): self._src_grid = None self._dst_grid = None self._buffer_value = None self._nchunks_dst = None self._optimized_bbox_subset = None self._use_spatial_decomp = use_spatial_decomp self.genweights = genweights self.source = source self.destination = destination self.eager = eager if esmf_kwargs is None: esmf_kwargs = {} if self.genweights: esmf_kwargs = esmf_kwargs.copy() from ocgis.regrid.base import update_esmf_kwargs update_esmf_kwargs(esmf_kwargs) self.esmf_kwargs = esmf_kwargs self.nchunks_dst = nchunks_dst self.check_contains = check_contains self.allow_masked = allow_masked self.src_grid_resolution = src_grid_resolution self.dst_grid_resolution = dst_grid_resolution self.iter_dst = iter_dst self.buffer_value = buffer_value self.optimized_bbox_subset = optimized_bbox_subset self.redistribute = redistribute # Call each grid's grid splitter initialize routine. self.src_grid._gc_initialize_(RegriddingRole.SOURCE) self.dst_grid._gc_initialize_(RegriddingRole.DESTINATION) # Construct default paths if None are provided. defaults = constants.GridChunkerConstants.DEFAULT_PATHS if paths is None: paths = defaults else: for k, v in defaults.items(): if k not in paths: paths[k] = v if 'wd' not in paths: paths['wd'] = os.getcwd() self.paths = paths @property def buffer_value(self): """ Spatial distance in units of the destination grid to buffer the destination grid chunk's spatial extent when subsetting the associated source grid. Defaults to the higher spatial resolution times a modifier (:attr:`ocgis.constants.GridChunkerConstants.BUFFER_RESOLUTION_MODIFIER`). :param float value: Spatial buffer value :rtype: float """ if self._buffer_value is None: try: if self.dst_grid_resolution is None: dst_grid_resolution = self.dst_grid.resolution_max else: dst_grid_resolution = self.dst_grid_resolution if self.src_grid_resolution is None: src_grid_resolution = self.src_grid.resolution_max else: src_grid_resolution = self.src_grid_resolution if dst_grid_resolution <= src_grid_resolution: target_resolution = dst_grid_resolution else: target_resolution = src_grid_resolution ret = GridChunkerConstants.BUFFER_RESOLUTION_MODIFIER * target_resolution except NotImplementedError: # Unstructured grids do not have an associated resolution unless they are isomorphic. if isinstance(self.src_grid, GridUnstruct) or isinstance(self.dst_grid, GridUnstruct): ret = None else: raise else: ret = self._buffer_value return ret @buffer_value.setter def buffer_value(self, value): self._buffer_value = value @property def nchunks_dst(self): if self._nchunks_dst is None: ret = self.dst_grid._gc_nchunks_dst_(self) else: ret = self._nchunks_dst return ret @nchunks_dst.setter def nchunks_dst(self, value): # Assert the split dimension matches the destination grid dimension. if value is not None and len(value) != self.dst_grid.ndim: raise ValueError('The number of splits must match the grid dimension count.') self._nchunks_dst = value @property def dst_grid(self): """ Get the destination grid. :return: :class:`~ocgis.spatial.grid.AbstractGrid` """ if self._dst_grid is None: self._dst_grid = get_grid_object(self.destination) return self._dst_grid @property def optimized_bbox_subset(self): """ If ``True``, use an optimized bounding box subset to spatially subset the source grid. :param value: If ``'auto'``, choose the optimization based on grid isomorphism. :type value: str | bool :rtype: bool """ if self._optimized_bbox_subset == 'auto': if (self.src_grid_resolution is not None and self.dst_grid_resolution is not None) or ( self.src_grid.resolution_max is not None or self.src_grid_resolution is not None) and ( self.dst_grid.resolution_max is not None or self.dst_grid_resolution is not None): ret = True else: ret = False else: ret = self._optimized_bbox_subset return ret @optimized_bbox_subset.setter def optimized_bbox_subset(self, value): assert value is not None self._optimized_bbox_subset = value @property def src_grid(self): """ Get the source grid. :return: :class:`~ocgis.spatial.grid.AbstractGrid` """ if self._src_grid is None: self._src_grid = get_grid_object(self.source) return self._src_grid @property def use_spatial_decomp(self): ref = self._use_spatial_decomp if ref == 'auto': if isinstance(self.dst_grid, Grid): ref = False else: ref = True return ref def create_full_path_from_template(self, key, index=None): ret = self.paths[key] ret = os.path.join(self.paths['wd'], ret) if index is not None: ret = ret.format(index) return ret
[docs] def create_merged_weight_file(self, merged_weight_filename, strict=False): """ Merge weight file chunks to a single, global weight file. :param str merged_weight_filename: Path to the merged weight file. :param bool strict: If ``False``, allow "missing" files where the iterator index cannot create a found file. It is best to leave these ``False`` as not all source and destinations are mapped. If ``True``, raise an """ if vm.size > 1: raise ValueError("'create_merged_weight_file' does not work in parallel") index_filename = self.create_full_path_from_template('index_file') ifile = RequestDataset(uri=index_filename).get() ifile.load() ifc = GridChunkerConstants.IndexFile gidx = ifile[ifc.NAME_INDEX_VARIABLE].attrs src_global_shape = gidx[ifc.NAME_SRC_GRID_SHAPE] dst_global_shape = gidx[ifc.NAME_DST_GRID_SHAPE] # Get the global weight dimension size. n_s_size = 0 weight_filename = ifile[gidx[ifc.NAME_WEIGHTS_VARIABLE]] wv = weight_filename.join_string_value() split_weight_file_directory = self.paths['wd'] for wfn in map(lambda x: os.path.join(split_weight_file_directory, os.path.split(x)[1]), wv): if not os.path.exists(wfn): if strict: raise IOError(wfn) else: continue n_s_size += RequestDataset(wfn).get().dimensions['n_s'].size # Create output weight file. wf_varnames = ['row', 'col', 'S'] wf_dtypes = [np.int32, np.int32, np.float64] vc = VariableCollection() dim = Dimension('n_s', n_s_size) for w, wd in zip(wf_varnames, wf_dtypes): var = Variable(name=w, dimensions=dim, dtype=wd) vc.add_variable(var) vc.write(merged_weight_filename) # Transfer weights to the merged file. sidx = 0 src_indices = self.src_grid._gc_create_global_indices_(src_global_shape) dst_indices = self.dst_grid._gc_create_global_indices_(dst_global_shape) out_wds = nc.Dataset(merged_weight_filename, 'a') for ii, wfn in enumerate(map(lambda x: os.path.join(split_weight_file_directory, x), wv)): if not os.path.exists(wfn): if strict: raise IOError(wfn) else: continue wdata = RequestDataset(wfn).get() for wvn in wf_varnames: odata = wdata[wvn].get_value() try: split_grids_directory = self.paths['wd'] odata = self._gc_remap_weight_variable_(ii, wvn, odata, src_indices, dst_indices, ifile, gidx, split_grids_directory=split_grids_directory) except IndexError as e: msg = "Weight filename: '{}'; Weight Variable Name: '{}'. {}".format(wfn, wvn, str(e)) raise IndexError(msg) out_wds[wvn][sidx:sidx + odata.size] = odata out_wds.sync() sidx += odata.size out_wds.close()
[docs] @staticmethod def insert_weighted(index_path, dst_wd, dst_master_path, data_variables='auto'): """ Inserted weighted, destination variable data into the master destination file. :param str index_path: Path to the split index netCDF file. :param str dst_wd: Working directory containing the destination files holding the weighted data. :param str dst_master_path: Path to the destination master weight file. :param list data_variables: Optional list of data variables. Otherwise, auto-discovery is used. """ if vm.size > 1: raise NotImplementedError('serial only') index_field = RequestDataset(index_path).get() gs_index_v = index_field[GridChunkerConstants.IndexFile.NAME_INDEX_VARIABLE] dst_filenames = gs_index_v.attrs[GridChunkerConstants.IndexFile.NAME_DESTINATION_VARIABLE] dst_filenames = index_field[dst_filenames] y_bounds = GridChunkerConstants.IndexFile.NAME_Y_DST_BOUNDS_VARIABLE y_bounds = gs_index_v.attrs[y_bounds] y_bounds = index_field[y_bounds].get_value() x_bounds = GridChunkerConstants.IndexFile.NAME_X_DST_BOUNDS_VARIABLE x_bounds = gs_index_v.attrs[x_bounds] x_bounds = index_field[x_bounds].get_value() joined = dst_filenames.join_string_value() if data_variables == 'auto': v = None else: v = data_variables dst_master_field = RequestDataset(dst_master_path, variable=v).get() for data_variable in dst_master_field.data_variables: assert not data_variable.has_allocated_value if data_variable.ndim == 3: for time_index in range(dst_master_field.time.shape[0]): for vidx, source_path in enumerate(joined): source_path = os.path.join(dst_wd, source_path) slc = {dst_master_field.time.dimensions[0].name: time_index, dst_master_field.y.dimensions[0].name: slice(None), dst_master_field.x.dimensions[0].name: slice(None)} source_field = RequestDataset(source_path).create_field() try: source_data = source_field[data_variable.name][slc] except KeyError: if data_variable.name not in source_field.keys(): msg = "The destination variable '{}' is not in the destination file '{}'. Was SMM applied?".format( data_variable.name, source_path) raise KeyError(msg) else: raise assert not source_data.has_allocated_value with nc.Dataset(dst_master_path, 'a') as ds: ds.variables[data_variable.name][time_index, y_bounds[vidx][0]:y_bounds[vidx][1], x_bounds[vidx][0]:x_bounds[vidx][1]] = source_data.get_value() elif data_variable.ndim == 2: for vidx, source_path in enumerate(joined): source_path = os.path.join(dst_wd, source_path) source_data = RequestDataset(source_path).get()[data_variable.name] assert not source_data.has_allocated_value with nc.Dataset(dst_master_path, 'a') as ds: ds.variables[data_variable.name][y_bounds[vidx][0]:y_bounds[vidx][1], x_bounds[vidx][0]:x_bounds[vidx][1]] = source_data.get_value() else: raise NotImplementedError(data_variable.ndim)
[docs] def iter_dst_grid_slices(self, yield_idx=None): """ Yield global slices for the destination grid using guidance from ``nchunks_dst``. :param int yield_idx: If a zero-based integer, only yield for this chunk index and skip everything else. :return: A dictionary with keys as the grid dimension names and the values the associated slice for that dimension. :rtype: dict >>> example_yield = {'dimx': slice(2, 4), 'dimy': slice(10, 20)} """ return self.dst_grid._gc_iter_dst_grid_slices_(self, yield_idx=yield_idx)
[docs] def iter_dst_grid_subsets(self, yield_slice=False, yield_idx=None): """ Yield destination grid subsets. :param int yield_idx: If a zero-based integer, only yield for this chunk index and skip everything else. :param bool yield_slice: If ``True``, yield the slice used on the destination grid. :rtype: :class:`ocgis.spatial.grid.AbstractGrid` """ if self.use_spatial_decomp: for sub, slc in iter_spatial_decomposition(self.dst_grid, self.nchunks_dst, optimized_bbox_subset=True, yield_idx=yield_idx): if yield_slice: # Spatial subset may be empty on a rank... if sub.is_empty: slc = None else: slc = {d.name: slc[ii] for ii, d in enumerate(sub.dimensions)} yield sub, slc else: yield sub else: for slc in self.iter_dst_grid_slices(yield_idx=yield_idx): sub = self.dst_grid.get_distributed_slice(slc) if yield_slice: yield sub, slc else: yield sub
[docs] def iter_src_grid_subsets(self, yield_dst=False, yield_idx=None): """ Yield source grid subset using the extent of its associated destination grid subset. :param bool yield_dst: If ``True``, yield the destination subset as well as the source grid subset. :param int yield_idx: If a zero-based integer, only yield for this chunk index and skip everything else. :rtype: tuple(:class:`ocgis.spatial.grid.AbstractGrid`, `slice-like`) """ if yield_dst: yield_slice = True else: yield_slice = False buffer_value = self.buffer_value dst_grid_wrapped_state = self.dst_grid.wrapped_state dst_grid_crs = self.dst_grid.crs # Use a destination grid iterator if provided. if self.iter_dst is not None: iter_dst = self.iter_dst(self, yield_slice=yield_slice, yield_idx=yield_idx) else: iter_dst = self.iter_dst_grid_subsets(yield_slice=yield_slice, yield_idx=yield_idx) # Loop over each destination grid subset. ocgis_lh(logger='grid_chunker', msg='starting "for yld in iter_dst"', level=logging.DEBUG) for iter_dst_ctr, yld in enumerate(iter_dst, start=1): ocgis_lh(msg=["iter_dst_ctr", iter_dst_ctr], level=logging.DEBUG) if yield_slice: dst_grid_subset, dst_slice = yld else: dst_grid_subset = yld # All masked destinations are very problematic for ESMF with vm.scoped_by_emptyable('global mask', dst_grid_subset): if not vm.is_null: if dst_grid_subset.has_mask_global: if dst_grid_subset.has_mask and dst_grid_subset.has_masked_values: all_masked = dst_grid_subset.get_mask().all() else: all_masked = False all_masked_gather = vm.gather(all_masked) if vm.rank == 0: if all(all_masked_gather): exc = ValueError("Destination subset all masked") try: raise exc finally: vm.abort(exc=exc) dst_box = None with vm.scoped_by_emptyable('extent_global', dst_grid_subset): if not vm.is_null: # Use the extent of the polygon for determining the bounding box. This ensures conservative # regridding will be fully mapped. if isinstance(dst_grid_subset, AbstractGeometryCoordinates): target_grid = dst_grid_subset.parent.grid else: target_grid = dst_grid_subset # Try to reduce the coordinates in the case of unstructured grid data. if hasattr(target_grid, 'reduce_global') and Topology.POLYGON in target_grid.abstractions_available: ocgis_lh(logger='grid_chunker', msg='starting reduce_global for dst_grid_subset', level=logging.DEBUG) target_grid = target_grid.reduce_global() ocgis_lh(logger='grid_chunker', msg='finished reduce_global for dst_grid_subset', level=logging.DEBUG) extent_global = target_grid.parent.attrs.get('extent_global') if extent_global is None: with grid_abstraction_scope(target_grid, Topology.POLYGON): extent_global = target_grid.extent_global if self.check_contains: dst_box = box(*target_grid.extent_global) sub_box = box(*extent_global) if buffer_value is not None: # Use the envelope! A buffer returns "fancy" borders. We just want to expand the bounding box. sub_box = sub_box.buffer(buffer_value).envelope ocgis_lh(msg=str(sub_box.bounds), level=logging.DEBUG, logger='grid_chunker') else: sub_box, dst_box = [None, None] live_ranks = vm.get_live_ranks_from_object(dst_grid_subset) sub_box = vm.bcast(sub_box, root=live_ranks[0]) if self.check_contains: dst_box = vm.bcast(dst_box, root=live_ranks[0]) sub_box = GeometryVariable.from_shapely(sub_box, is_bbox=True, wrapped_state=dst_grid_wrapped_state, crs=dst_grid_crs) ocgis_lh(logger='grid_chunker', msg='starting "self.src_grid.get_intersects"', level=logging.DEBUG) src_grid_subset, src_grid_slice = self.src_grid.get_intersects(sub_box, keep_touches=False, cascade=False, optimized_bbox_subset=self.optimized_bbox_subset, return_slice=True) ocgis_lh(logger='grid_chunker', msg='finished "self.src_grid.get_intersects"', level=logging.DEBUG) # Reload the data using a new source index distribution. if hasattr(src_grid_subset, 'reduce_global') and src_grid_subset.cindex is not None: # Only redistribute if we have one live rank. if self.redistribute and len(vm.get_live_ranks_from_object(src_grid_subset)) > 0: ocgis_lh(logger='grid_chunker', msg='starting redistribute', level=logging.DEBUG) topology = src_grid_subset.abstractions_available[Topology.POLYGON] cindex = topology.cindex redist_dimname = self.src_grid.abstractions_available[Topology.POLYGON].element_dim.name if src_grid_subset.is_empty: redist_dim = None else: redist_dim = topology.element_dim redistribute_by_src_idx(cindex, redist_dimname, redist_dim) ocgis_lh(logger='grid_chunker', msg='finished redistribute', level=logging.DEBUG) with vm.scoped_by_emptyable('src_grid_subset', src_grid_subset): if not vm.is_null: if not self.allow_masked: gmask = src_grid_subset.get_mask() if gmask is not None and gmask.any(): raise ValueError('Masked values in source grid subset.') if self.check_contains: src_box = box(*src_grid_subset.extent_global) if not does_contain(src_box, dst_box): raise ValueError('Contains check failed.') # Try to reduce the coordinates in the case of unstructured grid data. if hasattr(src_grid_subset, 'reduce_global') and src_grid_subset.cindex is not None: ocgis_lh(logger='grid_chunker', msg='starting reduce_global', level=logging.DEBUG) src_grid_subset = src_grid_subset.reduce_global() ocgis_lh(logger='grid_chunker', msg='finished reduce_global', level=logging.DEBUG) else: pass # src_grid_subset = VariableCollection(is_empty=True) if src_grid_subset.is_empty: src_grid_slice = None else: src_grid_slice = {src_grid_subset.dimensions[ii].name: src_grid_slice[ii] for ii in range(src_grid_subset.ndim)} if yield_dst: yld = (src_grid_subset, src_grid_slice, dst_grid_subset, dst_slice) else: yld = src_grid_subset, src_grid_slice yield yld
[docs] @staticmethod def smm(*args, **kwargs): """See :meth:`ocgis.regrid.base.smm`""" from ocgis.regrid.base import smm smm(*args, **kwargs)
[docs] def write_chunks(self): """ Write grid subsets to netCDF files using the provided filename templates. This will also generate ESMF regridding weights for each subset if requested. """ src_filenames = [] dst_filenames = [] wgt_filenames = [] dst_slices = [] src_slices = [] index_path = self.create_full_path_from_template('index_file') # nzeros = len(str(reduce(lambda x, y: x * y, self.nchunks_dst))) ctr = 1 ocgis_lh(logger='grid_chunker', msg='starting self.iter_src_grid_subsets', level=logging.DEBUG) for sub_src, src_slc, sub_dst, dst_slc in self.iter_src_grid_subsets(yield_dst=True): ocgis_lh(logger='grid_chunker', msg='finished iteration {} for self.iter_src_grid_subsets'.format(ctr), level=logging.DEBUG) src_path = self.create_full_path_from_template('src_template', index=ctr) dst_path = self.create_full_path_from_template('dst_template', index=ctr) wgt_path = self.create_full_path_from_template('wgt_template', index=ctr) src_filenames.append(os.path.split(src_path)[1]) dst_filenames.append(os.path.split(dst_path)[1]) wgt_filenames.append(wgt_path) dst_slices.append(dst_slc) src_slices.append(src_slc) # Only write destinations if an iterator is not provided. if self.iter_dst is None: zip_args = [[sub_src, sub_dst], [src_path, dst_path]] else: zip_args = [[sub_src], [src_path]] cc = 1 for target, path in zip(*zip_args): with vm.scoped_by_emptyable('field.write' + str(cc), target): if not vm.is_null: ocgis_lh(logger='grid_chunker', msg='write_chunks:writing: {}'.format(path), level=logging.DEBUG) field = Field(grid=target) field.write(path) ocgis_lh(logger='grid_chunker', msg='write_chunks:finished writing: {}'.format(path), level=logging.DEBUG) cc += 1 # Increment the counter outside of the loop to avoid counting empty subsets. ctr += 1 # Generate an ESMF weights file if requested and at least one rank has data on it. if self.genweights and len(vm.get_live_ranks_from_object(sub_src)) > 0: vm.barrier() self.write_esmf_weights(src_path, dst_path, wgt_path, src_grid=sub_src, dst_grid=sub_dst) vm.barrier() # Global shapes require a VM global scope to collect. src_global_shape = global_grid_shape(self.src_grid) dst_global_shape = global_grid_shape(self.dst_grid) # Gather and collapse source slices as some may be empty and we write on rank 0. gathered_src_grid_slice = vm.gather(src_slices) if vm.rank == 0: len_src_slices = len(src_slices) new_src_grid_slice = [None] * len_src_slices for idx in range(len_src_slices): for rank_src_grid_slice in gathered_src_grid_slice: if rank_src_grid_slice[idx] is not None: new_src_grid_slice[idx] = rank_src_grid_slice[idx] break src_slices = new_src_grid_slice with vm.scoped('index write', [0]): if not vm.is_null: dim = Dimension('nfiles', len(src_filenames)) vname = ['source_filename', 'destination_filename', 'weights_filename'] values = [src_filenames, dst_filenames, wgt_filenames] grid_chunker_destination = GridChunkerConstants.IndexFile.NAME_DESTINATION_VARIABLE attrs = [{'esmf_role': 'grid_chunker_source'}, {'esmf_role': grid_chunker_destination}, {'esmf_role': 'grid_chunker_weights'}] vc = VariableCollection() grid_chunker_index = GridChunkerConstants.IndexFile.NAME_INDEX_VARIABLE vidx = Variable(name=grid_chunker_index) vidx.attrs['esmf_role'] = grid_chunker_index vidx.attrs['grid_chunker_source'] = 'source_filename' vidx.attrs[GridChunkerConstants.IndexFile.NAME_DESTINATION_VARIABLE] = 'destination_filename' vidx.attrs['grid_chunker_weights'] = 'weights_filename' vidx.attrs[GridChunkerConstants.IndexFile.NAME_SRC_GRID_SHAPE] = src_global_shape vidx.attrs[GridChunkerConstants.IndexFile.NAME_DST_GRID_SHAPE] = dst_global_shape vc.add_variable(vidx) for idx in range(len(vname)): v = Variable(name=vname[idx], dimensions=dim, dtype=str, value=values[idx], attrs=attrs[idx]) vc.add_variable(v) bounds_dimension = Dimension(name='bounds', size=2) # TODO: This needs to work with four dimensions. # Source ----------------------------------------------------------------------------------------------- self.src_grid._gc_create_index_bounds_(RegriddingRole.SOURCE, vidx, vc, src_slices, dim, bounds_dimension) # Destination ------------------------------------------------------------------------------------------ self.dst_grid._gc_create_index_bounds_(RegriddingRole.DESTINATION, vidx, vc, dst_slices, dim, bounds_dimension) vc.write(index_path) vm.barrier()
[docs] def write_esmf_weights(self, src_path, dst_path, wgt_path, src_grid=None, dst_grid=None): """ Write ESMF regridding weights for a source and destination filename combination. :param str src_path: Full path to source file :param str dst_path: Full path to destination file :param str wgt_path: Path to output weight file :param src_grid: If provided, use this source grid for identifying ESMF parameters :type src_grid: :class:`ocgis.spatial.grid.AbstractGrid` :param dst_grid: If provided, use this destination grid for identifying ESMF parameters :type dst_grid: :class:`ocgis.spatial.grid.AbstractGrid` """ from ocgis.regrid.base import create_esmf_field, create_esmf_regrid if src_grid is None: src_grid = self.src_grid if dst_grid is None: dst_grid = self.dst_grid assert wgt_path is not None srcfield, srcgrid = create_esmf_field(src_path, src_grid, self.esmf_kwargs) dstfield, dstgrid = create_esmf_field(dst_path, dst_grid, self.esmf_kwargs) regrid = None try: regrid = create_esmf_regrid(srcfield=srcfield, dstfield=dstfield, filename=wgt_path, **self.esmf_kwargs) finally: to_destroy = [regrid, srcgrid, srcfield, dstgrid, dstfield] for t in to_destroy: if t is not None: t.destroy() del regrid del srcgrid del srcfield del dstgrid del dstfield
def _gc_remap_weight_variable_(self, ii, wvn, odata, src_indices, dst_indices, ifile, gidx, split_grids_directory=None): if wvn == 'S': pass else: ifc = GridChunkerConstants.IndexFile if wvn == 'row': is_unstruct = isinstance(self.dst_grid, GridUnstruct) if is_unstruct: dst_filename = ifile[gidx[ifc.NAME_DESTINATION_VARIABLE]].join_string_value()[ii] dst_filename = os.path.join(split_grids_directory, dst_filename) oindices = RequestDataset(dst_filename).get()[ifc.NAME_DSTIDX_GUID].get_value() else: y_bounds = ifile[gidx[ifc.NAME_Y_DST_BOUNDS_VARIABLE]].get_value() x_bounds = ifile[gidx[ifc.NAME_X_DST_BOUNDS_VARIABLE]].get_value() indices = dst_indices elif wvn == 'col': is_unstruct = isinstance(self.src_grid, GridUnstruct) if is_unstruct: src_filename = ifile[gidx[ifc.NAME_SOURCE_VARIABLE]].join_string_value()[ii] src_filename = os.path.join(split_grids_directory, src_filename) oindices = RequestDataset(src_filename).get()[ifc.NAME_SRCIDX_GUID].get_value() else: y_bounds = ifile[gidx[ifc.NAME_Y_SRC_BOUNDS_VARIABLE]].get_value() x_bounds = ifile[gidx[ifc.NAME_X_SRC_BOUNDS_VARIABLE]].get_value() indices = src_indices else: raise NotImplementedError if not is_unstruct: islice = tuple([slice(y_bounds[ii][0], y_bounds[ii][1]), slice(x_bounds[ii][0], x_bounds[ii][1])]) oindices = indices[islice] oindices = oindices.flatten() odata = oindices[odata - 1] return odata
def does_contain(container, containee): intersection = container.intersection(containee) return np.isclose(intersection.area, containee.area) def get_grid_object(obj, load=True): if isinstance(obj, AbstractGrid): res = obj elif isinstance(obj, RequestDataset): res = obj.create_field().grid elif isinstance(obj, Field): res = obj.grid else: raise NotImplementedError(obj) if load: res.parent.load() return res def global_grid_shape(grid): with vm.scoped_by_emptyable('global grid shape', grid): if not vm.is_null: return grid.shape_global