Source code for ocgis.ops.engine

import logging
from copy import deepcopy

from ocgis import env, constants
from ocgis import vm
from ocgis.base import raise_if_empty, AbstractOcgisObject
from ocgis.calc.engine import CalculationEngine
from ocgis.collection.field import Field
from ocgis.collection.spatial import SpatialCollection
from ocgis.constants import WrappedState, HeaderName, WrapAction, SubcommName, KeywordArgument
from ocgis.exc import ExtentError, EmptySubsetError, BoundsAlreadyAvailableError, SubcommNotFoundError, \
    NoDataVariablesFound, WrappedStateEvalTargetMissing
from ocgis.spatial.spatial_subset import SpatialSubsetOperation
from ocgis.util.helpers import get_default_or_apply
from ocgis.util.logging_ocgis import ocgis_lh, ProgressOcgOperations
from ocgis.variable.base import create_typed_variable_from_data_model
from ocgis.variable.crs import CFRotatedPole, Spherical, WGS84


[docs]class OperationsEngine(AbstractOcgisObject): """ Executes the operations defined by ``ops``. :param ops: The operations to interpret. :type ops: :class:`~ocgis.OcgOperations` :param bool request_base_size_only: If ``True``, return field objects following the spatial subset performing as few operations as possible. :param progress: A progress object to update. :type progress: :class:`~ocgis.util.logging_ocgis.ProgressOcgOperations` """ def __init__(self, ops, request_base_size_only=False, progress=None): self.ops = ops self._request_base_size_only = request_base_size_only self._subset_log = ocgis_lh.get_logger('subset') self._progress = progress or ProgressOcgOperations() self._original_subcomm = deepcopy(vm.current_comm_name) self._backtransform = {} # Create the calculation engine is calculations are present. if self.ops.calc is None or self._request_base_size_only: self.cengine = None self._has_multivariate_calculations = False else: ocgis_lh('initializing calculation engine', self._subset_log, level=logging.DEBUG) self.cengine = CalculationEngine(self.ops.calc_grouping, self.ops.calc, calc_sample_size=self.ops.calc_sample_size, progress=self._progress, spatial_aggregation=self.ops.aggregate) self._has_multivariate_calculations = self.cengine.has_multivariate_functions def __iter__(self): """:rtype: :class:`ocgis.collection.base.AbstractCollection`""" ocgis_lh('beginning iteration', logger='conv.__iter__', level=logging.DEBUG) # Yields collections with all operations applied. try: for coll in self._iter_collections_(): ocgis_lh('__iter__ yielding', self._subset_log, level=logging.DEBUG) yield coll finally: # Try and remove any subcommunicators associated with operations. for v in SubcommName.__members__.values(): try: vm.free_subcomm(name=v) except SubcommNotFoundError: pass vm.set_comm(self._original_subcomm) # Remove any back transformations. for v in constants.BackTransform.__members__.values(): self._backtransform.pop(v, None) def _iter_collections_(self): """:rtype: :class:`ocgis.collection.base.AbstractCollection`""" # Multivariate calculations require datasets come in as a list with all variable inputs part of the same # sequence. if self._has_multivariate_calculations: itr_rd = [[rd for rd in self.ops.dataset]] # Otherwise, process geometries expects a single element sequence. else: itr_rd = [[rd] for rd in self.ops.dataset] # Configure the progress object. self._progress.n_subsettables = len(itr_rd) self._progress.n_geometries = get_default_or_apply(self.ops.geom, len, default=1) self._progress.n_calculations = get_default_or_apply(self.ops.calc, len, default=0) # Some introductory logging. msg = '{0} dataset collection(s) to process.'.format(self._progress.n_subsettables) ocgis_lh(msg=msg, logger=self._subset_log) if self.ops.geom is None: msg = 'Entire spatial domain returned. No selection geometries requested.' else: msg = 'Each data collection will be subsetted by {0} selection geometries.'.format( self._progress.n_geometries) ocgis_lh(msg=msg, logger=self._subset_log) if self._progress.n_calculations == 0: msg = 'No calculations requested.' else: msg = 'The following calculations will be applied to each data collection: {0}.'. \ format(', '.join([_['func'] for _ in self.ops.calc])) ocgis_lh(msg=msg, logger=self._subset_log) # Process the incoming datasets. Convert from request datasets to fields as needed. for rds in itr_rd: try: msg = 'Processing URI(s): {0}'.format([rd.uri for rd in rds]) except AttributeError: # Field objects have no URIs. Multivariate calculations change how the request dataset iterator is # configured as well. msg = [] for rd in rds: try: msg.append(rd.uri) except AttributeError: # Likely a field object which does have a name. msg.append(rd.name) msg = 'Processing URI(s) / field names: {0}'.format(msg) ocgis_lh(msg=msg, logger=self._subset_log) for coll in self._process_subsettables_(rds): # If there are calculations, do those now and return a collection. if not vm.is_null and self.cengine is not None: ocgis_lh('Starting calculations.', self._subset_log) raise_if_empty(coll) # Look for any temporal grouping optimizations. if self.ops.optimizations is None: tgds = None else: tgds = self.ops.optimizations.get('tgds') # Execute the calculations. coll = self.cengine.execute(coll, file_only=self.ops.file_only, tgds=tgds) # If we need to spatially aggregate and calculations used raw values, update the collection # fields and subset geometries. if self.ops.aggregate and self.ops.calc_raw: coll_to_itr = coll.copy() for sfield, container in coll_to_itr.iter_fields(yield_container=True): sfield = _update_aggregation_wrapping_crs_(self, None, sfield, container, None) coll.add_field(sfield, container, force=True) else: # If there are no calculations, mark progress to indicate a geometry has been completed. self._progress.mark() # Conversion of groups. if self.ops.output_grouping is not None: raise NotImplementedError else: ocgis_lh('_iter_collections_ yielding', self._subset_log, level=logging.DEBUG) yield coll def _process_subsettables_(self, rds): """ :param rds: Sequence of :class:~`ocgis.RequestDataset` objects. :type rds: sequence :rtype: :class:`ocgis.collection.base.AbstractCollection` """ ocgis_lh(msg='entering _process_subsettables_', logger=self._subset_log, level=logging.DEBUG) # This is used to define the group of request datasets for these like logging and exceptions. try: alias = '_'.join([r.field_name for r in rds]) except AttributeError: # Allow field objects with do not expose the "field_name" attribute. try: alias = '_'.join([r.name for r in rds]) except TypeError: # The alias is used for logging, etc. If it cannot be constructed easily, leave it as None. alias = None except NoDataVariablesFound: # If an alias is not provided and there are no data variables, set to None as this is used only for logging. alias = None ocgis_lh('processing...', self._subset_log, alias=alias, level=logging.DEBUG) # Create the field object. Field objects may be passed directly to operations. # Look for field optimizations. Field optimizations typically include pre-loaded datetime objects. if self.ops.optimizations is not None and 'fields' in self.ops.optimizations: ocgis_lh('applying optimizations', self._subset_log, level=logging.DEBUG) field = [self.ops.optimizations['fields'][rd.field_name].copy() for rd in rds] has_field_optimizations = True else: # Indicates no field optimizations loaded. has_field_optimizations = False try: # No field optimizations and data should be loaded from source. if not has_field_optimizations: ocgis_lh('creating field objects', self._subset_log, level=logging.DEBUG) len_rds = len(rds) field = [None] * len_rds for ii in range(len_rds): rds_element = rds[ii] try: field_object = rds_element.get(format_time=self.ops.format_time, grid_abstraction=self.ops.abstraction) except (AttributeError, TypeError): # Likely a field object which does not need to be loaded from source. if not self.ops.format_time: raise NotImplementedError # Check that is indeed a field before a proceeding. if not isinstance(rds_element, Field): raise field_object = rds_element field[ii] = field_object # Multivariate calculations require pulling variables across fields. if self._has_multivariate_calculations and len(field) > 1: for midx in range(1, len(field)): # Use the data variable tag if it is available. Otherwise, attempt to merge the fields raising # warning if the variable exists in the squashed field. if len(field[midx].data_variables) > 0: vitr = field[midx].data_variables is_data = True else: vitr = list(field[midx].values()) is_data = False for mvar in vitr: mvar = mvar.extract() field[0].add_variable(mvar, is_data=is_data) new_field_name = '_'.join([str(f.name) for f in field]) field[0].set_name(new_field_name) # The first field in the list is always the target for other operations. field = field[0] assert isinstance(field, Field) # Break out of operations if the rank is empty. vm.create_subcomm_by_emptyable(SubcommName.FIELD_GET, field, is_current=True, clobber=True) if not vm.is_null: if not has_field_optimizations: if field.is_empty: raise ValueError('No empty fields allowed.') # Time, level, etc. subsets. field = self._get_nonspatial_subset_(field) # Spatially reorder the data. ocgis_lh(msg='before spatial reorder', logger=self._subset_log, level=logging.DEBUG) if self.ops.spatial_reorder: self._update_spatial_order_(field) # Extrapolate the spatial bounds if requested. # TODO: Rename "interpolate" to "extrapolate". if self.ops.interpolate_spatial_bounds: self._update_bounds_extrapolation_(field) # This error is related to subsetting by time or level. Spatial subsetting occurs below. except EmptySubsetError as e: if self.ops.allow_empty: ocgis_lh(msg='time or level subset empty but empty returns allowed', logger=self._subset_log, level=logging.WARN) coll = self._get_initialized_collection_() name = '_'.join([rd.field_name for rd in rds]) field = Field(name=name, is_empty=True) coll.add_field(field, None) try: yield coll finally: return else: # Raise an exception as empty subsets are not allowed. ocgis_lh(exc=ExtentError(message=str(e)), alias=str([rd.field_name for rd in rds]), logger=self._subset_log) # Set iterator based on presence of slice. Slice always overrides geometry. if self.ops.slice is not None: itr = [None] else: itr = [None] if self.ops.geom is None else self.ops.geom for coll in self._process_geometries_(itr, field, alias): # Conform units following the spatial subset. if not vm.is_null and self.ops.conform_units_to is not None: for to_conform in coll.iter_fields(): for dv in to_conform.data_variables: dv.cfunits_conform(self.ops.conform_units_to) ocgis_lh(msg='_process_subsettables_ yielding', logger=self._subset_log, level=logging.DEBUG) yield coll def _process_geometries_(self, itr, field, alias): """ :param itr: An iterator yielding :class:`~ocgis.Field` objects for subsetting. :type itr: [None] or [:class:`~ocgis.Field`, ...] :param :class:`ocgis.Field` field: The target field for operations. :param str alias: The request data alias currently being processed. :rtype: :class:`~ocgis.SpatialCollection` """ assert isinstance(field, Field) ocgis_lh('processing geometries', self._subset_log, level=logging.DEBUG) # Process each geometry. for subset_field in itr: # Initialize the collection storage. coll = self._get_initialized_collection_() if vm.is_null: sfield = field else: # Always work with a copy of the subset geometry. This gets twisted in interesting ways depending on the # subset target with wrapping, coordinate system conversion, etc. subset_field = deepcopy(subset_field) if self.ops.regrid_destination is not None: # If there is regridding, make another copy as this geometry may be manipulated during subsetting of # sources. subset_field_for_regridding = deepcopy(subset_field) # Operate on the rotated pole coordinate system by first transforming it to the default coordinate # system. key = constants.BackTransform.ROTATED_POLE self._backtransform[key] = self._get_update_rotated_pole_state_(field, subset_field) # Check if the geometric abstraction is available on the field object. self._assert_abstraction_available_(field) # Return a slice or snippet if either of these are requested. field = self._get_slice_or_snippet_(field) # Choose the subset UGID value. if subset_field is None: msg = 'No selection geometry. Returning all data. No unique geometry identifier.' subset_ugid = None else: subset_ugid = subset_field.geom.ugid.get_value()[0] msg = 'Subsetting with selection geometry having UGID={0}'.format(subset_ugid) ocgis_lh(msg=msg, logger=self._subset_log) if subset_field is not None: # If the coordinate systems differ, update the spatial subset's CRS to match the field. if subset_field.crs is not None and subset_field.crs != field.crs: subset_field.update_crs(field.crs) # If the geometry is a point, it needs to be buffered if there is a search radius multiplier. subset_field = self._get_buffered_subset_geometry_if_point_(field, subset_field) # If there is a selection geometry present, use it for the spatial subset. if not, all the field's data # is being returned. if subset_field is None: sfield = field else: sfield = self._get_spatially_subsetted_field_(alias, field, subset_field, subset_ugid) ocgis_lh(msg='after self._get_spatially_subsetted_field_', logger=self._subset_log, level=logging.DEBUG) # Create the subcommunicator following the data subset to ensure non-empty communication. vm.create_subcomm_by_emptyable(SubcommName.FIELD_SUBSET, sfield, is_current=True, clobber=True) if not vm.is_null: if not sfield.is_empty and not self.ops.allow_empty: raise_if_empty(sfield) # If the base size is being requested, bypass the rest of the operations. if not self._request_base_size_only: # Perform regridding operations if requested. if self.ops.regrid_destination is not None and sfield.regrid_source: sfield = self._get_regridded_field_with_subset_(sfield, subset_field_for_regridding=subset_field_for_regridding) else: ocgis_lh(msg='no regridding operations', logger=self._subset_log, level=logging.DEBUG) # If empty returns are allowed, there may be an empty field. if sfield is not None: # Only update spatial stuff if there are no calculations and, if there are calculations, # those calculations are not expecting raw values. if self.ops.calc is None or (self.ops.calc is not None and not self.ops.calc_raw): # Update spatial aggregation, wrapping, and coordinate systems. sfield = _update_aggregation_wrapping_crs_(self, alias, sfield, subset_field, subset_ugid) ocgis_lh('after _update_aggregation_wrapping_crs_ in _process_geometries_', self._subset_log, level=logging.DEBUG) # Add the created field to the output collection with the selection geometry. if sfield is None: assert self.ops.aggregate if sfield is not None: coll.add_field(sfield, subset_field) yield coll def _get_nonspatial_subset_(self, field): """ :param field: :type field: :class:`~ocgis.Field` :return: :raises: EmptySubsetError """ # Apply any time or level subsetting provided through operations. if self.ops.time_range is not None: field = field.time.get_between(*self.ops.time_range).parent if self.ops.time_region is not None: field = field.time.get_time_region(self.ops.time_region).parent if self.ops.time_subset_func is not None: field = field.time.get_subset_by_function(self.ops.time_subset_func).parent if self.ops.level_range is not None: field = field.level.get_between(*self.ops.level_range).parent return field @staticmethod def _get_initialized_collection_(): coll = SpatialCollection() return coll def _get_update_rotated_pole_state_(self, field, subset_field): """ Rotated pole coordinate systems are handled internally by transforming the CRS to a geographic coordinate system. :param field: :type field: :class:`ocgis.Field` :param subset_field: :type subset_field: :class:`ocgis.Field` or None :rtype: None or :class:`ocgis.variable.crs.CFRotatedPole` :raises: AssertionError """ # CFRotatedPole takes special treatment. only do this if a subset geometry is available. this variable is # needed to determine if backtransforms are necessary. original_rotated_pole_crs = None if isinstance(field.crs, CFRotatedPole): # Only transform if there is a subset geometry. if subset_field is not None or self.ops.aggregate or self.ops.spatial_operation == 'clip': # Update the CRS. Copy the original CRS for possible later transformation back to rotated pole. original_rotated_pole_crs = deepcopy(field.crs) ocgis_lh('initial rotated pole transformation...', self._subset_log, level=logging.DEBUG) field.update_crs(env.DEFAULT_COORDSYS) ocgis_lh('...finished initial rotated pole transformation', self._subset_log, level=logging.DEBUG) return original_rotated_pole_crs def _assert_abstraction_available_(self, field): """ Assert the spatial abstraction may be loaded on the field object if one is provided in the operations. :param field: The field to check for a spatial abstraction. :type field: :class:`ocgis.Field` """ if self.ops.abstraction != 'auto': is_available = field.grid.is_abstraction_available(self.ops.abstraction) if not is_available: msg = 'A "{0}" spatial abstraction is not available.'.format(self.ops.abstraction) ocgis_lh(exc=ValueError(msg), logger='subset') def _get_slice_or_snippet_(self, field): """ Slice the incoming field if a slice or snippet argument is present. :param field: The field to slice. :type field: :class:`ocgis.Field` :rtype: :class:`ocgis.Field` """ # If there is a snippet, return the first realization, time, and level. if self.ops.snippet: the_slice = {'time': 0, 'realization': 0, 'level': 0} # If there is a slice, use it to subset the field. Only field slices are supported. elif self.ops.slice is not None: the_slice = self.ops.slice else: the_slice = None if the_slice is not None: field = field.get_field_slice(the_slice, strict=False, distributed=True) return field def _get_spatially_subsetted_field_(self, alias, field, subset_field, subset_ugid): """ Spatially subset a field with a selection field. :param str alias: The request data alias currently being processed. :param field: Target field to subset. :type field: :class:`ocgis.Field` :param subset_field: The field to use for subsetting. :type subset_field: :class:`ocgis.Field` :rtype: :class:`ocgis.Field` :raises: AssertionError, ExtentError """ assert subset_field is not None ocgis_lh('executing spatial subset operation', self._subset_log, level=logging.DEBUG, alias=alias, ugid=subset_ugid) sso = SpatialSubsetOperation(field) try: # Execute the spatial subset and return the subsetted field. sfield = sso.get_spatial_subset(self.ops.spatial_operation, subset_field.geom, select_nearest=self.ops.select_nearest, optimized_bbox_subset=self.ops.optimized_bbox_subset) except EmptySubsetError as e: if self.ops.allow_empty: ocgis_lh(alias=alias, ugid=subset_ugid, msg='Empty geometric operation but empty returns allowed.', level=logging.WARN) sfield = Field(name=field.name, is_empty=True) else: msg = ' This typically means the selection geometry falls outside the spatial domain of the target ' \ 'dataset.' msg = str(e) + msg ocgis_lh(exc=ExtentError(message=msg), alias=alias, logger=self._subset_log) # If the subset geometry is unwrapped and the vector wrap option is true, wrap the subset geometry. if self.ops.vector_wrap: if subset_field.wrapped_state == WrappedState.UNWRAPPED: subset_field.wrap() return sfield def _get_buffered_subset_geometry_if_point_(self, field, subset_field): """ If the subset geometry is a point of multipoint, it will need to be buffered and the spatial dimension updated accordingly. If the subset geometry is a polygon, pass through. :param field: :type field: :class:`ocgis.Field` :param subset_field: :type subset_field: :class:`ocgis.Field` """ if subset_field.geom.geom_type in ['Point', 'MultiPoint'] and self.ops.search_radius_mult is not None: ocgis_lh(logger=self._subset_log, msg='buffering point geometry', level=logging.DEBUG) subset_field = subset_field.geom.get_buffer(self.ops.search_radius_mult * field.grid.resolution).parent assert subset_field.geom.geom_type in ['Polygon', 'MultiPolygon'] return subset_field def _get_regridded_field_with_subset_(self, sfield, subset_field_for_regridding=None): """ Regrid ``sfield`` subsetting the regrid destination in the process. :param sfield: The input field to regrid. :type sfield: :class:`ocgis.Field` :param subset_field_for_regridding: The original, unaltered spatial dimension to use for subsetting. :type subset_field_for_regridding: :class:`ocgis.Field` :rtype: :class:`~ocgis.Field` """ from ocgis.regrid.base import RegridOperation ocgis_lh(logger=self._subset_log, msg='Starting regrid operation...', level=logging.INFO) ro = RegridOperation(sfield, self.ops.regrid_destination, subset_field=subset_field_for_regridding, regrid_options=self.ops.regrid_options) sfield = ro.execute() ocgis_lh(logger=self._subset_log, msg='Regrid operation complete.', level=logging.INFO) return sfield def _update_bounds_extrapolation_(self, field): try: name_x_variable = '{}_{}'.format(field.grid.x.name, constants.OCGIS_BOUNDS) name_y_variable = '{}_{}'.format(field.grid.y.name, constants.OCGIS_BOUNDS) field.grid.set_extrapolated_bounds(name_x_variable, name_y_variable, constants.OCGIS_BOUNDS) except BoundsAlreadyAvailableError: msg = 'Bounds/corners already on object. Ignoring "interpolate_spatial_bounds".' ocgis_lh(msg=msg, logger=self._subset_log, level=logging.WARNING) def _update_spatial_order_(self, field): _update_wrapping_(self, field) if field.grid is not None: wrapped_state = field.grid.wrapped_state if wrapped_state == WrappedState.WRAPPED: field.grid.reorder() else: msg = 'Reorder not relevant for wrapped state "{}". Doing nothing.'.format( str(wrapped_state)) ocgis_lh(msg=msg, logger=self._subset_log, level=logging.WARN)
def _update_aggregation_wrapping_crs_(obj, alias, sfield, subset_sdim, subset_ugid): raise_if_empty(sfield) ocgis_lh('entering _update_aggregation_wrapping_crs_', obj._subset_log, alias=alias, ugid=subset_ugid, level=logging.DEBUG) # Aggregate if requested. if obj.ops.aggregate: ocgis_lh('aggregate requested in _update_aggregation_wrapping_crs_', obj._subset_log, alias=alias, ugid=subset_ugid, level=logging.DEBUG) # There may be no geometries if we are working with a gridded dataset. Load the geometries if this is the case. sfield.set_abstraction_geom() ocgis_lh('after sfield.set_abstraction_geom in _update_aggregation_wrapping_crs_', obj._subset_log, alias=alias, ugid=subset_ugid, level=logging.DEBUG) # Union the geometries and spatially average the data variables. # with vm.scoped(vm.get_live_ranks_from_object(sfield)): sfield = sfield.geom.get_unioned(spatial_average=sfield.data_variables) ocgis_lh('after sfield.geom.get_unioned in _update_aggregation_wrapping_crs_', obj._subset_log, alias=alias, ugid=subset_ugid, level=logging.DEBUG) # None is returned for the non-root process. Check we are in parallel and create an empty field. if sfield is None: if vm.size == 1: raise ValueError('None should not be returned from get_unioned if running on a single processor.') else: sfield = Field(is_empty=True) else: sfield = sfield.parent vm.create_subcomm_by_emptyable(SubcommName.SPATIAL_AVERAGE, sfield, is_current=True, clobber=True) if not vm.is_null and subset_sdim is not None and subset_sdim.geom is not None: # Add the unique geometry identifier variable. This should match the selection geometry's identifier. new_gid_variable_kwargs = dict(name=HeaderName.ID_GEOMETRY, value=subset_sdim.geom.ugid.get_value(), dimensions=sfield.geom.dimensions) dm = get_data_model(obj.ops) new_gid_variable = create_typed_variable_from_data_model('int', data_model=dm, **new_gid_variable_kwargs) sfield.geom.set_ugid(new_gid_variable) if vm.is_null: ocgis_lh(msg='null communicator following spatial average. returning.', logger=obj._subset_log, level=logging.DEBUG) return sfield raise_if_empty(sfield) ocgis_lh(msg='before wrapped_state in _update_aggregation_wrapping_crs_', logger=obj._subset_log, level=logging.DEBUG) try: wrapped_state = sfield.wrapped_state except WrappedStateEvalTargetMissing: # If there is no target for wrapping evaluation, then consider this unknown. wrapped_state = WrappedState.UNKNOWN ocgis_lh(msg='after wrapped_state in _update_aggregation_wrapping_crs_', logger=obj._subset_log, level=logging.DEBUG) # Wrap the returned data. if not env.OPTIMIZE_FOR_CALC and not sfield.is_empty: if wrapped_state == WrappedState.UNWRAPPED: ocgis_lh('wrap target is empty: {}'.format(sfield.is_empty), obj._subset_log, level=logging.DEBUG) # There may be no geometries if we are working with a gridded dataset. Load the geometries if this # is the case. sfield.set_abstraction_geom() if obj.ops.output_format in constants.VECTOR_OUTPUT_FORMATS and obj.ops.vector_wrap: ocgis_lh('wrapping output geometries', obj._subset_log, alias=alias, ugid=subset_ugid, level=logging.DEBUG) # Deepcopy geometries before wrapping as wrapping will be performed inplace. The original field may # need to be reused for additional subsets. geom = sfield.geom copied_geom = geom.get_value().copy() geom.set_value(copied_geom) # Some grids do not play nicely with wrapping. Bounds may be less than zero for an unwrapped grid. # Force wrapping if it is requested. Normally, when force is false there is a pass-through that will # leave the data untouched. geom.wrap(force=True) ocgis_lh('finished wrapping output geometries', obj._subset_log, alias=alias, ugid=subset_ugid, level=logging.DEBUG) # Transform back to rotated pole if necessary. original_rotated_pole_crs = obj._backtransform.get(constants.BackTransform.ROTATED_POLE) if original_rotated_pole_crs is not None: if not isinstance(obj.ops.output_crs, (Spherical, WGS84)): sfield.update_crs(original_rotated_pole_crs) # Update the coordinate system of the data output. if obj.ops.output_crs is not None: # If the geometry is not none, it may need to be projected to match the output coordinate system. if subset_sdim is not None and subset_sdim.crs != obj.ops.output_crs: subset_sdim.update_crs(obj.ops.output_crs) # Update the subsetted field's coordinate system. sfield = sfield.copy() sfield.update_crs(obj.ops.output_crs) # Wrap or unwrap the data if the coordinate system permits. _update_wrapping_(obj, sfield) ocgis_lh('leaving _update_aggregation_wrapping_crs_', obj._subset_log, level=logging.DEBUG) return sfield def _update_wrapping_(obj, field_object): """ Update the wrapped state of the incoming field object. This only affects fields with wrappable coordinate systems. :param obj: :class:`ocgis.driver.subset.OperationsEngine` :param field_object: :class:`ocgis.Field` """ if obj.ops.spatial_wrapping is not None: if field_object.crs is not None and field_object.crs.is_geographic: as_enum = obj.ops._get_object_('spatial_wrapping').as_enum if field_object.wrapped_state != as_enum: if as_enum == WrapAction.WRAP: field_object.wrap() else: field_object.unwrap() def get_data_model(ops): if ops.output_format_options is None: ret = None else: ret = ops.output_format_options.get(KeywordArgument.DATA_MODEL) if ret is None: ret = ops._get_object_('dataset') if ret.data_model is not None: ret = ret.data_model[0] else: ret = None return ret