Parallel Operations

OpenClimateGIS operations may be run in parallel using MPI, provided mpi4py is installed. By default, data is distributed across a single spatial dimension. Dimension distribution (the calculation of local bounds) is handled by the OcgDist class, and parallel execution is controlled by the OcgVM.
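
For orientation, the live VM may be inspected directly. A minimal sketch (run under mpirun -n 2) using only VM attributes that appear later in this section:

import ocgis

# Each rank reports its global rank and the size of the global communicator.
print('rank {} of {}'.format(ocgis.vm.rank_global, ocgis.vm.size_global))

# Synchronize all ranks before continuing.
ocgis.vm.barrier()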

All OpenClimateGIS operations are implemented in parallel with minimal inter-process communication. Data writes are performed synchronously, with the exception of netCDF, which supports asynchronous writes provided the underlying library is built appropriately (see the netCDF4-python Dataset documentation for additional information). OpenClimateGIS handles extra ranks before and after subsetting through its support for empty objects. Extra ranks following a subset are handled by the OcgVM using subcommunicators.
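
As a sketch of the empty-object behavior, assume a hypothetical file some_data.nc whose distributed dimension has fewer elements than there are ranks; the extra ranks then receive empty fields:

import ocgis

# 'some_data.nc' is a hypothetical file. When there are more ranks than
# elements along the distributed dimension, trailing ranks hold empty objects.
field = ocgis.RequestDataset('some_data.nc').get()
if field.is_empty:
    print('rank {} is empty'.format(ocgis.vm.rank_global))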

A standard OpenClimateGIS operations script should be executed using mpirun:

>>> mpirun -n <nprocs> </path/to/ocgis/script.py>

Parallelization Scheme

OpenClimateGIS uses data parallelism for operations. Reading, subsetting, and calculations (the operations proper) are fully parallel. Multiple request datasets or subset geometries are processed in sequence, one dataset/geometry combination at a time.
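
As a hedged sketch of this scheme, assuming two hypothetical input files tas_one.nc and tas_two.nc, an operations call with multiple datasets visits each dataset/geometry combination in sequence while the work for each combination runs in parallel:

import ocgis

# Two hypothetical request datasets subset by a single bounding box. The read,
# subset, and monthly mean for each dataset/geometry combination execute in
# parallel; the combinations themselves are processed sequentially.
rds = [ocgis.RequestDataset('tas_one.nc'), ocgis.RequestDataset('tas_two.nc')]
ops = ocgis.OcgOperations(dataset=rds,
                          geom=[150.0, 30.0, 170.0, 50.0],
                          calc=[{'func': 'mean', 'name': 'monthly_mean'}],
                          calc_grouping=['month'])
ret = ops.execute()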

Spatial Averaging in Parallel

OpenClimateGIS makes no bit-for-bit guarantees when spatially averaging in parallel. Each rank spatially averages its own data before the rank sums are combined on the root process. This often leads to floating point summation differences relative to a spatial average performed in serial. If these floating point differences affect your analysis, it is recommended that the process be run in serial or that an advanced regridding application such as ESMF be used.
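
The effect is ordinary floating point non-associativity, independent of OpenClimateGIS. A contrived illustration in plain Python:

# Contrived values that expose the order dependence of floating point
# addition: a serial sum and a combination of per-rank partial sums disagree.
values = [1.0, 1e16, -1e16, 1.0]
serial = sum(values)                             # 1.0
per_rank = [sum(values[:2]), sum(values[2:])]    # [1e16, -1e16]
combined = sum(per_rank)                         # 0.0
assert serial != combined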

Explanatory Parallel Example

This example executes a simple subset using OpenClimateGIS data interface objects and the OcgVM in parallel. The code should be executed using mpirun -n 2 </path/to/script.py>.

from shapely.geometry import box

import ocgis
from ocgis.test.base import create_gridxy_global, create_exact_field

assert ocgis.vm.size_global == 2

# Create synthetic data for this example. The data is created and written on a single process. Subcommunicators are
# always given a name.
with ocgis.vm.scoped('serial write', [0]):
    # There will be null communicators for the duration of the context manager.
    if not ocgis.vm.is_null:
        grid = create_gridxy_global(resolution=5.0, dist=False)
        field = create_exact_field(grid, 'exact', ntime=31)
        field.write('ocgis_parallel_example.nc')

# Place a barrier so the write can finish. Race conditions can occur with subcommunicators. Generally, barriers are
# managed by the operations.
ocgis.vm.barrier()

# This is our subset geometry.
bbox = box(150, 30, 170, 50)

# Load the data from file.
rd = ocgis.RequestDataset('ocgis_parallel_example.nc')
field = rd.get()

# By default, data is distributed along the largest spatial dimension; in this case, the x (longitude) dimension.
distributed_dimension = field.grid.dimensions[1]
bounds_local = {0: (0, 36), 1: (36, 72)}
assert bounds_local[ocgis.vm.rank] == distributed_dimension.bounds_local
assert (0, 72) == distributed_dimension.bounds_global

# Subset by the geometry.
sub = field.grid.get_intersects(bbox).parent

# Empty objects are returned from spatial functions because their emptiness may be important for analysis. Empty
# objects do not have values, and many functions will raise an exception if they are present.
if ocgis.vm.rank == 0:
    assert sub.is_empty
else:
    assert not sub.is_empty

# We can scope the VM by emptyable objects.
with ocgis.vm.scoped_by_emptyable('some math', sub):
    if ocgis.vm.is_null:
        print('Empty data on rank {}'.format(ocgis.vm.rank_global))
    else:
        the_shape = sub.data_variables[0].shape
        print('Not empty on rank {} and the shape is {}'.format(ocgis.vm.rank_global, the_shape))
        # Scoping is needed for writing.
        sub.write('ocgis_parallel_example_subset.nc')

print('Parallel example finished')