Source code for ocgis.vmachine.mpi

import itertools
from collections import OrderedDict
from contextlib import contextmanager
from functools import reduce, partial

import numpy as np
import six
from ocgis import constants
from ocgis.base import AbstractOcgisObject, raise_if_empty
from ocgis.constants import DataType, SourceIndexType
from ocgis.exc import DimensionNotFound
from ocgis.util.addict import Dict
from ocgis.util.helpers import get_optimal_slice_from_array, get_iter, get_group

try:
    from mpi4py import MPI
    from mpi4py.MPI import COMM_NULL
except ImportError:
    MPI_ENABLED = False
    MPI_TYPE_MAPPING = None
else:
    if MPI.COMM_WORLD.Get_size() > 1:
        MPI_ENABLED = True
    else:
        MPI_ENABLED = False
    MPI_TYPE_MAPPING = {np.int32: MPI.INT, np.int64: MPI.LONG_LONG,
                        np.dtype('int32'): MPI.INT, np.dtype('float64'): MPI.LONG_LONG}


class DummyMPIComm(object):

    def __init__(self):
        self._send_recv = Dict()

    def Abort(self, int_errorcode=1):
        raise RuntimeError('Abort on DummyMPIComm Called. Error code = {}'.format(int_errorcode))

    def Barrier(self):
        pass

    def bcast(self, *args, **kwargs):
        return args[0]

    def Create(self, *args, **kwargs):
        return self

    def Free(self, *args, **kwargs):
        pass

    def gather(self, *args, **kwargs):
        return [args[0]]

    def Get_group(self):
        return self

    def Get_rank(self):
        return 0

    def Get_size(self):
        return 1

    def Incl(self, *args, **kwargs):
        return self

    def reduce(self, *args, **kwargs):
        return args[0]

    def scatter(self, *args, **kwargs):
        return args[0][0]

    def Irecv(self, payload, source=0, tag=None):
        def _irecv_callback_(ipayload, icomm, isource, itag, mode):
            ret = False
            if mode == 'test':
                ret = itag in icomm._send_recv[isource]
            elif mode == 'get' or ret:
                stored = icomm._send_recv[isource].pop(itag)
                ipayload[0] = stored
                ret = True
            else:
                raise NotImplementedError(mode)
            return ret

        the_callback = partial(_irecv_callback_, payload, self, source, tag)

        return DummyRequest(callback=the_callback)

    def recv(self, *args, **kwargs):
        return self.Irecv(*args, **kwargs)

    def Isend(self, payload, dest=0, tag=None):
        self._send_recv[dest][tag] = payload[0]
        return DummyRequest()

    def send(self, *args, **kwargs):
        return self.Isend(*args, **kwargs)


if MPI_ENABLED and MPI.COMM_WORLD.Get_size() > 1:
    MPI_COMM = MPI.COMM_WORLD
else:
    MPI_COMM = DummyMPIComm()
    COMM_NULL = constants.MPI_COMM_NULL_VALUE
MPI_SIZE = MPI_COMM.Get_size()
MPI_RANK = MPI_COMM.Get_rank()


class DummyRequest(AbstractOcgisObject):

    def __init__(self, callback=None):
        self._callback = callback

    def Test(self):
        if self._callback is None:
            ret = True
        else:
            ret = self._callback('test')
        return ret

    def wait(self):
        if self._callback is None:
            ret = True
        else:
            while not self.Test():
                continue
            ret = self._callback('get')
        return ret


[docs]class OcgDist(AbstractOcgisObject): """ Computes the parallel distribution for variable dimensions. :param int size: The number of ranks to use for the distribution. If ``None`` (the default), use the global MPI size. :param ranks: A sequence of integer ranks with length equal to ``size``. Useful if computing a distribution for a rank subset. :type ranks: `sequence` of :class:`int` """ def __init__(self, size=None, ranks=None): if size is None: size = MPI_SIZE if ranks is None: ranks = tuple(range(size)) self.ranks = ranks self.size = size self.mapping = {} for rank in self.ranks: self.mapping[rank] = create_template_rank_dict() self.has_updated_dimensions = False def add_dimension(self, dim, group=None, force=False): from ocgis import Dimension if not isinstance(dim, Dimension): raise ValueError('"dim" must be a "Dimension" object.') for rank in self.ranks: if rank != MPI_RANK: to_add_dim = dim.copy() else: to_add_dim = dim the_group = self._create_or_get_group_(group, rank=rank) if not force and to_add_dim.name in the_group['dimensions']: msg = 'Dimension with name "{}" already in group "{}" and "force=False".' raise ValueError(msg.format(to_add_dim.name, group)) else: the_group['dimensions'][to_add_dim.source_name] = to_add_dim def add_dimensions(self, dims, **kwargs): for dim in dims: self.add_dimension(dim, **kwargs) def add_variable(self, name_or_variable, force=False, dimensions=None, group=None): """ Add a variable to the distribution mapping. :param name_or_variable: The variable or variable name to add to the distribution. :type name_or_variable: :class:`~ocgis.new_interface.variable.Variable` :param force: If ``True``, overwrite any variables with the same name. :param sequence dimensions: A sequence of dimension names if ``name_or_variable`` is a name. Otherwise, dimensions are pulled from the variable object. :raises: ValueError """ from ocgis.variable.base import Variable from ocgis.variable.dimension import Dimension if isinstance(name_or_variable, Variable): group = group or name_or_variable.group name = name_or_variable.name dimensions = name_or_variable.dimensions else: name = name_or_variable dimensions = list(get_iter(dimensions, dtype=(str, Dimension))) if dimensions is not None and len(dimensions) > 0: if isinstance(dimensions[0], Dimension): dimensions = [dim.name for dim in dimensions] else: dimensions = [] dimensions = tuple(dimensions) for rank_home in self.ranks: the_group = self._create_or_get_group_(group, rank_home) if not force and name in the_group['variables']: msg = 'Variable with name "{}" already in group "{}" and "force=False".' raise ValueError(msg.format(name, group)) else: the_group['variables'][name] = {'dimensions': dimensions} def add_variables(self, vars, **kwargs): for var in vars: self.add_variable(var, **kwargs) def create_dimension(self, *args, **kwargs): from ocgis import Dimension group = kwargs.pop('group', None) dim = Dimension(*args, **kwargs) self.add_dimension(dim, group=group) # If rank counts are not the same, you have to get your own dimension. Ranks may not be contained in the # mapping. if self.size != MPI_SIZE: ret = None else: ret = self.get_dimension(dim.name, group=group) return ret def create_variable(self, *args, **kwargs): from ocgis import Variable group = kwargs.pop('group', None) var = Variable(*args, **kwargs) self.add_variable(var, group=group) return var def get_bounds_local(self, group=None, rank=MPI_RANK): the_group = self.get_group(group=group, rank=rank) ret = [dim.bounds_local for dim in list(the_group['dimensions'].values())] return tuple(ret) def get_dimension(self, name, group=None, rank=MPI_RANK): group_data = self.get_group(group=group, rank=rank) d = group_data['dimensions'] try: ret = d[name] except KeyError: raise DimensionNotFound(name) return ret def get_dimensions(self, names, **kwargs): ret = [self.get_dimension(name, **kwargs) for name in names] return tuple(ret) def get_empty_ranks(self, group=None, inverse=False): ret = [] for rank in range(self.size): the_group = self.get_group(group, rank=rank) if any([dim.is_empty for dim in list(the_group['dimensions'].values())]): ret.append(rank) if inverse: ret = set(range(self.size)) - set(ret) ret = list(ret) ret.sort() return tuple(ret) def get_variable(self, name_or_variable, group=None, rank=MPI_RANK): if group is None: try: group = name_or_variable.group except AttributeError: pass try: name = name_or_variable.source_name except AttributeError: name = name_or_variable group_data = self.get_group(group, rank=rank) return group_data['variables'][name] def get_group(self, group=None, rank=MPI_RANK): return self._create_or_get_group_(group, rank=rank) def iter_groups(self, rank=MPI_RANK): from ocgis.driver.base import iter_all_group_keys mapping = self.mapping[rank] for group_key in iter_all_group_keys(mapping): group_data = get_group(mapping, group_key) yield group_key, group_data def update_dimension_bounds(self, rank='all', min_elements=2): """ :param rank: If ``'all'``, update across all ranks. Otherwise, update for the integer rank provided. :type rank: str/int :param int min_elements: The minimum number of elements per rank. It must be >= 2. """ if self.has_updated_dimensions: raise ValueError('Dimensions already updated.') if min_elements < 1: raise ValueError('"min_elements" must be >= 1.') if rank == 'all': ranks = list(range(self.size)) else: ranks = [rank] for rank in ranks: for _, group_data in self.iter_groups(rank=rank): dimdict = group_data['dimensions'] # If there are no distributed dimensions, there is no work to be dome with MPI bounds. if not any([dim.dist for dim in list(dimdict.values())]): continue # Get dimension lengths. lengths = {dim.name: len(dim) for dim in list(dimdict.values()) if dim.dist} # Choose the size of the distribution group. There needs to be at least one element per rank. First, # get the longest distributed dimension. max_length = max(lengths.values()) for k, v in list(lengths.items()): if v == max_length: distributed_dimension = dimdict[k] # Ensure only the distributed dimension remains distributed. for dim in list(dimdict.values()): if dim.name != distributed_dimension.name: dim.dist = False # Adjust the MPI distributed size if the length of the longest dimension is less than the rank count. # Dimensions on higher ranks will be considered empty. the_size = self.size if len(distributed_dimension) < the_size: the_size = len(distributed_dimension) # Use the minimum number of elements per rank to potentially adjust the size. min_size = int(np.floor(len(distributed_dimension) / float(min_elements))) if self.size > 1 and min_size < the_size: the_size = min_size # Fix the global bounds. distributed_dimension.bounds_global = (0, len(distributed_dimension)) # Use this to calculate the local bounds for a dimension. bounds_local = get_rank_bounds(len(distributed_dimension), the_size, rank) if bounds_local is not None: from ocgis.variable.dimension import slice_source_index start, stop = bounds_local src_idx = slice_source_index(slice(start, stop), distributed_dimension._src_idx) distributed_dimension.set_size(stop - start, src_idx=src_idx) distributed_dimension.bounds_local = bounds_local else: # If there are no local bounds, the dimension is empty. distributed_dimension.convert_to_empty() self.has_updated_dimensions = True def _create_or_get_group_(self, group, rank=MPI_RANK): # Allow None and single string group selection. if group is None or isinstance(group, six.string_types): group = [group] # Always start with None, the root group, when searching for data groups. if group[0] is not None: group.insert(0, None) ret = self.mapping[rank] for ctr, g in enumerate(group): # No group nesting for the first iteration. if ctr > 0: ret = ret['groups'] # This is the default fill for the group. if g not in ret: ret[g] = {'groups': {}, 'dimensions': {}, 'variables': {}} ret = ret[g] return ret
def barrier_print(*args, **kwargs): kwargs = kwargs.copy() comm = kwargs.pop('comm', MPI_COMM) assert len(kwargs) == 0 size = comm.Get_size() rank = comm.Get_rank() if len(args) == 1: args = args[0] ranks = list(range(size)) comm.Barrier() for r in ranks: if r == rank: print('') print('(rank={}, barrier=True) {}'.format(rank, args)) print('') comm.Barrier() def create_slices(length, size): # TODO: Optimize by removing numpy.arange r = np.arange(length) sections = np.array_split(r, size) sections = [get_optimal_slice_from_array(s, check_diff=False) for s in sections] return sections def dgather(elements): grow = elements[0] for idx in range(1, len(elements)): for k, v in elements[idx].items(): grow[k] = v return grow def get_global_to_local_slice(start_stop, bounds_local): """ :param start_stop: Two-element, integer sequence for the start and stop global indices. :type start_stop: tuple :param bounds_local: Two-element, integer sequence describing the local bounds. :type bounds_local: tuple :return: Two-element integer sequence mapping the global to the local slice. If the local bounds are outside the global slice, ``None`` will be returned. :rtype: tuple or None """ start, stop = start_stop lower, upper = bounds_local if start is None or stop is None: raise ValueError('Start and/or stop may not be None.') new_start = start if start >= upper: new_start = None else: if new_start < lower: new_start = lower if stop <= lower: new_stop = None elif upper < stop: new_stop = upper else: new_stop = stop if new_start is None or new_stop is None: ret = None else: ret = (new_start - lower, new_stop - lower) return ret def dict_get_or_create(ddict, key, default): try: ret = ddict[key] except KeyError: ret = ddict[key] = default return ret def get_rank_bounds(nelements, size, rank, esplit=None): """ :param nelements: The number of elements in the sequence to split. :param size: Processor count. If ``None`` use MPI size. :param rank: The process's rank. If ``None`` use the MPI rank. :param esplit: The split size. If ``None``, compute this internally. :return: A tuple of lower and upper bounds using Python slicing rules. Returns ``None`` if no bounds are available for the rank. Also returns ``None`` in the case of zero length. :rtype: tuple or None >>> get_rank_bounds(5, 4, 2, esplit=None) (3, 4) """ # This is the edge case for zero-length. if nelements == 0: return # Set defaults for the rank and size. # This is the edge case for ranks outside the size. Possible with an overloaded size not related to the MPI # environment. if rank >= size: return # Case with more length than size. Do not take this route of a default split is provided. if nelements > size and esplit is None: nelements = int(nelements) size = int(size) esplit, remainder = divmod(nelements, size) if remainder > 0: # Find the rank bounds with no remainder. ret = get_rank_bounds(nelements - remainder, size, rank) # Adjust the returned slices accounting for the remainder. if rank + 1 <= remainder: ret = (ret[0] + rank, ret[1] + rank + 1) else: ret = (ret[0] + remainder, ret[1] + remainder) elif remainder == 0: # Provide the default split to compute the bounds and avoid the recursion. ret = get_rank_bounds(nelements, size, rank, esplit=esplit) else: raise NotImplementedError # Case with equal length and size or more size than length. else: if esplit is None: if nelements < size: esplit = int(np.ceil(float(nelements) / float(size))) elif nelements == size: esplit = 1 else: raise NotImplementedError else: esplit = int(esplit) if rank == 0: lbound = 0 else: lbound = rank * esplit ubound = lbound + esplit if ubound >= nelements: ubound = nelements if lbound >= ubound: # The lower bound is outside the vector length ret = None else: ret = (lbound, ubound) return ret @contextmanager def mpi_group_scope(ranks, comm=None): from mpi4py.MPI import COMM_NULL comm, rank, size = get_standard_comm_state(comm=comm) base_group = comm.Get_group() sub_group = base_group.Incl(ranks) new_comm = comm.Create(sub_group) try: yield new_comm finally: if new_comm != COMM_NULL: new_comm.Free() sub_group.Free() def ogather(elements): ret = np.array(elements, dtype=object) return ret def hgather(elements): n = sum([e.shape[0] for e in elements]) fill = np.zeros(n, dtype=elements[0].dtype) start = 0 for e in elements: shape_e = e.shape[0] if shape_e == 0: continue stop = start + shape_e fill[start:stop] = e start = stop return fill def create_nd_slices(splits, shape): ret = [None] * len(shape) for idx, (split, shp) in enumerate(zip(splits, shape)): ret[idx] = create_slices(shp, split) ret = [slices for slices in itertools.product(*ret)] return tuple(ret) def create_template_rank_dict(): return {None: {'dimensions': {}, 'groups': {}, 'variables': {}}} def find_dimension_in_sequence(dimension_name, dimensions): ret = None for dim in dimensions: if dimension_name == dim.name: ret = dim break if ret is None: raise DimensionNotFound('Dimension not found: {}'.format(dimension_name)) return ret def get_nonempty_ranks(target, the_vm): """Collective!""" gathered = the_vm.gather(target.is_empty) if the_vm.rank == 0: ret = [] for idx, rank in enumerate(the_vm.ranks): if not gathered[idx]: ret.append(rank) ret = tuple(ret) else: ret = None ret = the_vm.bcast(ret) return ret def get_optimal_splits(size, shape): n_elements = reduce(lambda x, y: x * y, shape) if size >= n_elements: splits = shape else: if size <= shape[0]: splits = [1] * len(shape) splits[0] = size else: even_split = int(np.power(size, 1.0 / float(len(shape)))) splits = [None] * len(shape) for idx, shp in enumerate(shape): if even_split > shp: fill = shp else: fill = even_split splits[idx] = fill return tuple(splits) def get_standard_comm_state(comm=None): comm = comm or MPI_COMM return comm, comm.Get_rank(), comm.Get_size() def rank_print(*args): if len(args) == 1: args = args[0] msg = '(rank={}) {}'.format(MPI_RANK, args) print(msg) def redistribute_by_src_idx(variable, dimname, dimension): """ Redistribute values in ``variable`` using the source index associated with ``dimension``. The reloads the data from source and does not do an in-memory redistribution using MPI. This function is collective across the current `~ocgis.OcgVM`. * Uses fancy indexing only. * Gathers all source indices to a single processor. :param variable: The variable to redistribute. :type variable: :class:`~ocgis.Variable` :param str dimname: The name of the dimension holding the source indices. :param dimension: The dimension object. :type dimension: :class:`~ocgis.Dimension` """ from ocgis import SourcedVariable, Variable, vm from ocgis.variable.dimension import create_src_idx assert isinstance(variable, SourcedVariable) assert dimname is not None # If this is a serial operation just return. The rank should be fully autonomous in terms of its source information. if vm.size == 1: return # There needs to be at least one rank to redistribute. live_ranks = vm.get_live_ranks_from_object(variable) if len(live_ranks) == 0: raise ValueError('There must be at least one rank to redistribute by source index.') # Remove relevant values from a variable. def _reset_variable_(target): target._is_empty = None target._mask = None target._value = None target._has_initialized_value = False # Gather the sliced dimensions. This dimension hold the source indices that are redistributed. dims_global = vm.gather(dimension) if vm.rank == 0: # Filter any none-type dimensions to handle currently empty ranks. dims_global = [d for d in dims_global if d is not None] # Convert any bounds-type source indices to fancy type. # TODO: Support bounds-type source indices. for d in dims_global: if d._src_idx_type == SourceIndexType.BOUNDS: d._src_idx = create_src_idx(*d._src_idx, si_type=SourceIndexType.FANCY) # Create variable to scatter that holds the new global source indices. global_src_idx = hgather([d._src_idx for d in dims_global]) global_src_idx = Variable(name='global_src_idx', value=global_src_idx, dimensions=dimname) # The new size is also needed to create a regular distribution for the variable scatter. global_src_idx_size = global_src_idx.size else: global_src_idx, global_src_idx_size = [None] * 2 # Build the new distribution based on the gathered source indices. global_src_idx_size = vm.bcast(global_src_idx_size) dest_dist = OcgDist() new_dim = dest_dist.create_dimension(dimname, global_src_idx_size, dist=True) dest_dist.update_dimension_bounds() # This variable holds the new source indices. new_rank_src_idx = variable_scatter(global_src_idx, dest_dist) if new_rank_src_idx.is_empty: # Support new empty ranks following the scatter. variable.convert_to_empty() else: # Reset the variable so everything can be loaded from source. _reset_variable_(variable) # Update the source index on the target dimension. new_dim._src_idx = new_rank_src_idx.get_value() # Add the dimension with the new source index to the collection. variable.parent.dimensions[dimname] = new_dim # All emptiness should be pushed back to the dimensions. variable.parent._is_empty = None for var in variable.parent.values(): var._is_empty = None # Any variables that have a shared dimension should also be reset. for var in variable.parent.values(): if dimname in var.dimension_names: if new_rank_src_idx.is_empty: var.convert_to_empty() else: _reset_variable_(var) def variable_collection_scatter(variable_collection, dest_dist): from ocgis import vm if vm.rank == 0: for v in variable_collection.values(): if v.dist: msg = "Only variables with no prior distribution may be scattered. '{}' is distributed." raise ValueError(msg.format(v.name)) target = variable_collection.first() else: target = None sv = variable_scatter(target, dest_dist) svc = sv.parent if vm.rank == 0: n_children = len(variable_collection.children) else: n_children = None n_children = vm.bcast(n_children) if vm.rank == 0: children = list(variable_collection.children.values()) else: children = [None] * n_children for child in children: scattered_child = variable_collection_scatter(child, dest_dist)[0] svc.add_child(scattered_child, force=True) return svc def variable_gather(variable, root=0): from ocgis import vm if variable.is_empty: raise ValueError('No empty variables allowed.') if vm.size > 1: if vm.rank == root: new_variable = variable.copy() new_variable.dtype = variable.dtype new_variable._mask = None new_variable._value = None new_variable._dimensions = None assert not new_variable.has_allocated_value else: new_variable = None else: new_variable = variable for dim in new_variable.dimensions: dim.dist = False if vm.size > 1: if vm.rank == root: new_dimensions = [None] * variable.ndim else: new_dimensions = None for idx, dim in enumerate(variable.dimensions): if dim.dist: parts = vm.gather(dim) if vm.rank == root: new_dim = dim.copy() if dim.dist: new_size = 0 has_src_idx = False for part in parts: has_src_idx = part._src_idx is not None new_size += len(part) if has_src_idx: if part._src_idx_type == SourceIndexType.FANCY: new_src_idx = np.zeros(new_size, dtype=DataType.DIMENSION_SRC_INDEX) for part in parts: new_src_idx[part.bounds_local[0]: part.bounds_local[1]] = part._src_idx else: part_bounds = [None] * len(parts) for idx2, part in enumerate(parts): part_bounds[idx2] = part._src_idx part_bounds = np.array(part_bounds) new_src_idx = (part_bounds.min(), part_bounds.max()) else: new_src_idx = None new_dim = dim.copy() new_dim.set_size(new_size, src_idx=new_src_idx) new_dim.dist = False new_dimensions[idx] = new_dim if vm.rank == root: new_variable.set_dimensions(new_dimensions, force=True) gathered_variables = vm.gather(variable) if vm.rank == root: for idx, gv in enumerate(gathered_variables): destination_slice = [slice(*dim.bounds_local) for dim in gv.dimensions] new_variable.__setitem__(destination_slice, gv) return new_variable else: return else: return new_variable def variable_scatter(variable, dest_dist, root=0, strict=False): from ocgis import vm if variable is not None: raise_if_empty(variable) if vm.rank == root: if variable.dist: raise ValueError('Only variables with no prior distribution may be scattered.') if not dest_dist.has_updated_dimensions: raise ValueError('The destination distribution must have updated dimensions.') # Find the appropriate group for the dimensions. if vm.rank == root: group = variable.group # dimension_names = [dim.name for dim in variable.dimensions] dimension_names = variable.parent.dimensions.keys() else: group = None dimension_names = None # Depending on the strictness level, not all dimensions may be present in the distribution. This is allowed for # processes to more flexibly add undistributed dimensions. Distributed dimensions should be part of the destination # distribution already. not_in_dist = {} if vm.rank == 0: for dest_dimension_name in dimension_names: try: _ = dest_dist.get_dimension(dest_dimension_name, group=group) except DimensionNotFound: if strict: raise else: not_in_dist[dest_dimension_name] = variable.parent.dimensions[dest_dimension_name] not_in_dist = vm.bcast(not_in_dist) # Synchronize the processes with the MPI distribution and the group containing the dimensions. dest_dist = vm.bcast(dest_dist, root=root) group = vm.bcast(group, root=root) # Need to convert the object to a list to be compatible with Python 3. if dimension_names is not None: dimension_names = list(dimension_names) dimension_names = vm.bcast(dimension_names, root=root) # These are the dimensions for the local process. dest_dimensions = [None] * len(dimension_names) for ii, dest_dimension_name in enumerate(dimension_names): try: d = dest_dist.get_dimension(dest_dimension_name, group=group) except DimensionNotFound: if strict: raise else: # Dimensions not in the distribution should have been received from the root process. d = not_in_dist[dest_dimension_name] dest_dimensions[ii] = d # Populate the local destination dimensions dictionary. dd_dict = OrderedDict() for d in dest_dimensions: dd_dict[d.name] = d # Slice the variables collecting the sequence to scatter to the MPI procs. if vm.rank == root: size = dest_dist.size if size > 1: slices = [None] * size # Get the slices need to scatter the variables. These are essentially the local bounds on each dimension. empty_ranks = dest_dist.get_empty_ranks() empty_variable = variable.copy() empty_variable.convert_to_empty() for current_rank in range(size): if current_rank in empty_ranks: slices[current_rank] = None else: current_dimensions = list( dest_dist.get_group(group=group, rank=current_rank)['dimensions'].values()) slices[current_rank] = {dim.name: slice(*dim.bounds_local) for dim in current_dimensions if dim.name in variable.parent.dimensions} # Slice the variables. These sliced variables are the scatter targets. variables_to_scatter = [None] * size for idx, slc in enumerate(slices): if slc is None: variables_to_scatter[idx] = empty_variable else: variables_to_scatter[idx] = variable.parent[slc][variable.name] else: variables_to_scatter = [variable] else: variables_to_scatter = None # Scatter the variable across processes. scattered_variable = vm.scatter(variables_to_scatter, root=root) # Update the scattered variable collection dimensions with the destination dimensions on the process. Everything # should align shape-wise. scattered_variable.parent._dimensions = dd_dict return scattered_variable def vgather(elements): n = sum([e.shape[0] for e in elements]) fill = np.zeros((n, elements[0].shape[1]), dtype=elements[0].dtype) start = 0 for e in elements: shape_e = e.shape if shape_e[0] == 0: continue stop = start + shape_e[0] fill[start:stop, :] = e start = stop return fill def cancel_free_requests(request_sequence): for r in request_sequence: try: r.Cancel() r.Free() except: pass