import abc
import itertools
import tempfile
from copy import deepcopy
import numpy as np
import six
from fiona.crs import from_string, to_string
from ocgis import constants
from ocgis.base import AbstractInterfaceObject, raise_if_empty, AbstractOcgisObject
from ocgis.constants import MPIWriteMode, WrappedState, WrapAction, KeywordArgument, CFName, OcgisUnits, \
ConversionFactor, DimensionMapKey, DMK, OcgisConvention
from ocgis.environment import osr
from ocgis.exc import ProjectionCoordinateNotFound, ProjectionDoesNotMatch, CRSNotEquivalenError, \
WrappedStateEvalTargetMissing, CRSDepthNotImplemented
from ocgis.spatial.wrap import GeometryWrapper, CoordinateArrayWrapper
from ocgis.util.helpers import iter_array, get_iter
from shapely.geometry import Point, Polygon, box
from shapely.geometry.base import BaseMultipartGeometry
from shapely.geometry.multipolygon import MultiPolygon
SpatialReference = osr.SpatialReference
[docs]@six.add_metaclass(abc.ABCMeta)
class AbstractCRS(AbstractInterfaceObject):
"""
Base class for all OCGIS coordinate systems. Intended to allow differentiation between standard PROJ.4 coordinate
systems and specialized OCGIS-supported coordinate systems.
:param angular_units: Angular units for the coordinate system.
:type angular_units: :attr:`ocgis.constants.OcgisUnits`
:param linear_units: Linear units for the coordinate system.
:type linear_units: :attr:`ocgis.constants.OcgisUnits`
"""
_cf_attributes = None
_cf_attributes_to_remove = ('units', 'standard_name')
_default_towgs84 = None
_fuzzy_value = None
_string_max_length_global = None
def __init__(self, angular_units=OcgisUnits.DEGREES, linear_units=None):
self._angular_units = angular_units
self._linear_units = linear_units
@property
def angular_units(self):
return self._angular_units
@property
def dist(self):
"""Here for variable compatibility."""
return False
@property
def linear_units(self):
return self._linear_units
@property
def attrs(self):
return {}
@property
def has_initialized_parent(self):
return False
@property
def is_empty(self):
return False
@abc.abstractproperty
def is_geographic(self):
"""
:return: ``True`` if the coordinate system is considered geographic.
:rtype: bool
"""
@abc.abstractproperty
def is_wrappable(self):
"""
:return: ``True`` if the coordinate system may be globally wrapped or unwrapped. A wrappable CRS will use degree
units on ranges 0 to 360 and -180 to 180.
:rtype: bool
"""
@property
def is_orphaned(self):
return True
@property
def ndim(self):
return 0
@property
def units(self):
raise NotImplementedError
def as_variable(self):
from ocgis.variable.base import Variable
var = Variable(name=self.name)
return var
def convert_to_empty(self):
pass
def create_dimension_map(self, group_metadata, strict=False):
from ocgis import DimensionMap
return DimensionMap()
[docs] @classmethod
def fuzzy_check(cls, value):
"""
Coordinate system definitions are often changing in interpreters. This function allows coordinate system
definitions to vary slightly, but still be created appropriately.
This returns ``True`` if there is a fuzzy match.
:param dict value: The coordinate system dictionary definition.
:return: bool
"""
req = cls._fuzzy_value
ret = True
for k, v in req.items():
if k not in value:
ret = False
break
else:
if v != value[k]:
ret = False
break
if ret:
if cls._default_towgs84 is not None and 'towgs84' in value:
ret = value['towgs84'] == cls._default_towgs84
return ret
[docs] @classmethod
def get_wrap_action(cls, state_src, state_dst):
"""
:param int state_src: The wrapped state of the source dataset. (:class:`ocgis.constants.WrappedState`)
:param int state_dst: The wrapped state of the destination dataset. (:class:`ocgis.constants.WrappedState`)
:returns: The wrapping action to perform on ``state_src``. (:class:`ocgis.constants.WrapAction`)
:rtype: int
:raises: NotImplementedError, ValueError
"""
possible = [WrappedState.WRAPPED, WrappedState.UNWRAPPED, WrappedState.UNKNOWN]
has_issue = None
if state_src not in possible:
has_issue = 'source'
if state_dst not in possible:
has_issue = 'destination'
if has_issue is not None:
msg = 'The wrapped state on "{0}" is not recognized.'.format(has_issue)
raise ValueError(msg)
# the default action is to do nothing.
ret = None
# if the wrapped state of the destination is unknown, then there is no appropriate wrapping action suitable for
# the source.
if state_dst == WrappedState.UNKNOWN:
ret = None
# if the destination is wrapped and src is unwrapped, then wrap the src.
elif state_dst == WrappedState.WRAPPED:
if state_src == WrappedState.UNWRAPPED:
ret = WrapAction.WRAP
# if the destination is unwrapped and the src is wrapped, the source needs to be unwrapped.
elif state_dst == WrappedState.UNWRAPPED:
if state_src == WrappedState.WRAPPED:
ret = WrapAction.UNWRAP
else:
raise NotImplementedError(state_dst)
return ret
[docs] def get_wrapped_state(self, target):
"""
:param target: Return the wrapped state of a field. This function only checks grid centroids and geometry
exteriors. Bounds/corners on the grid are excluded.
:type target: :class:`~ocgis.Field`
"""
# TODO: Wrapped state should operate on the x-coordinate variable vectors or geometries only.
# TODO: This should be a method on grids and geometry variables.
from ocgis.collection.field import Field
from ocgis.spatial.base import AbstractXYZSpatialContainer
from ocgis import vm
raise_if_empty(self)
# If this is not a wrappable coordinate system, wrapped state is undefined.
if not self.is_wrappable:
ret = None
else:
if isinstance(target, Field):
grid = target.grid
if grid is not None:
target = grid
else:
target = target.geom
if target is None:
raise WrappedStateEvalTargetMissing
elif target.is_empty:
ret = None
elif isinstance(target, AbstractXYZSpatialContainer):
ret = self._get_wrapped_state_from_array_(target.x.get_value())
else:
stops = (WrappedState.WRAPPED, WrappedState.UNWRAPPED)
ret = WrappedState.UNKNOWN
geoms = target.get_masked_value().flat
_is_masked = np.ma.is_masked
_get_ws = self._get_wrapped_state_from_geometry_
for geom in geoms:
if not _is_masked(geom):
flag = _get_ws(geom)
if flag in stops:
ret = flag
break
rets = vm.gather(ret)
if vm.rank == 0:
rets = set(rets)
if WrappedState.WRAPPED in rets:
ret = WrappedState.WRAPPED
elif WrappedState.UNWRAPPED in rets:
ret = WrappedState.UNWRAPPED
else:
ret = list(rets)[0]
else:
ret = None
ret = vm.bcast(ret)
return ret
[docs] def prepare_geometry_variable(self, subset_geom, rhs_tol=10.0, inplace=True):
"""
Prepared a geometry variable for subsetting. This method:
* Appropriately wraps subset geometries for spherical coordinate systems.
:param subset_geom: The geometry variable to prepare.
:type subset_geom: :class:`~ocgis.GeometryVariable`
:param float rhs_tol: The amount, in matching coordinate system units, to buffer the right hand selection
geometry.
:param bool inplace: If ``True``, modify the object in-place.
"""
# TODO: This should work with arbitrary geometries not just bounding boxes.
assert subset_geom.size == 1
if self.is_wrappable and subset_geom.is_bbox and subset_geom.wrapped_state == WrappedState.UNWRAPPED:
if not inplace:
subset_geom = subset_geom.deepcopy()
svalue = subset_geom.get_value()
buffered_bbox = svalue[0].bounds
if buffered_bbox[0] < 0.:
bbox_rhs = list(deepcopy(buffered_bbox))
bbox_rhs[0] = buffered_bbox[0] + 360.
bbox_rhs[2] = 360. + rhs_tol
bboxes = [buffered_bbox, bbox_rhs]
else:
bboxes = [buffered_bbox]
bboxes = [box(*b) for b in bboxes]
if len(bboxes) > 1:
svalue[0] = MultiPolygon(bboxes)
return subset_geom
[docs] def set_string_max_length_global(self, value=None):
"""Here for variable compatibility."""
[docs] def to_xarray(self, **kwargs):
"""
Convert the CRS variable to a :class:`xarray.DataArray`. This *does not* traverse the parent's hierararchy. Use
the conversion method on the variable's parent to convert all variables in the collection.
:rtype: :class:`xarray.DataArray`
"""
from xarray import DataArray
return DataArray(attrs=self.attrs, name=self.name, data=[])
def wrap_or_unwrap(self, action, target, force=False):
from ocgis.variable.geom import GeometryVariable
from ocgis.spatial.grid import Grid
if action not in (WrapAction.WRAP, WrapAction.UNWRAP):
raise ValueError('"action" not recognized: {}'.format(action))
if target.wrapped_state != action or force:
if action == WrapAction.WRAP:
attr = 'wrap'
else:
attr = 'unwrap'
if isinstance(target, GeometryVariable):
w = GeometryWrapper()
func = getattr(w, attr)
target_value = target.get_value()
for idx, target_geom in iter_array(target_value, use_mask=True, return_value=True,
mask=target.get_mask()):
target_value.__setitem__(idx, func(target_geom))
elif isinstance(target, Grid):
ca = CoordinateArrayWrapper()
func = getattr(ca, attr)
func(target.x.get_value())
target.remove_bounds()
if target.has_allocated_point:
getattr(target.get_point(), attr)()
if target.has_allocated_polygon:
getattr(target.get_polygon(), attr)()
else:
raise NotImplementedError(target)
@classmethod
def _get_wrapped_state_from_array_(cls, arr):
"""
:param arr: Input n-dimensional array.
:type arr: :class:`numpy.ndarray`
:returns: Wrapped state enumeration value from :class:`~ocgis.constants.WrappedState`.
:rtype: int
"""
gt_m180 = arr > constants.MERIDIAN_180TH
lt_pm = arr < 0
if np.any(lt_pm):
ret = WrappedState.WRAPPED
elif np.any(gt_m180):
ret = WrappedState.UNWRAPPED
else:
ret = WrappedState.UNKNOWN
return ret
@classmethod
def _get_wrapped_state_from_geometry_(cls, geom):
"""
:param geom: The input geometry.
:type geom: :class:`~shapely.geometry.point.Point`, :class:`~shapely.geometry.point.Polygon`,
:class:`~shapely.geometry.multipoint.MultiPoint`, :class:`~shapely.geometry.multipolygon.MultiPolygon`
:returns: A string flag. See class level ``_flag_*`` attributes for values.
:rtype: str
:raises: NotImplementedError
"""
if isinstance(geom, BaseMultipartGeometry):
itr = geom
else:
itr = [geom]
app = np.array([])
for element in itr:
if isinstance(element, Point):
element_arr = [np.array(element)[0]]
elif isinstance(element, Polygon):
element_arr = np.array(element.exterior.coords)[:, 0]
else:
raise NotImplementedError(type(element))
app = np.append(app, element_arr)
return cls._get_wrapped_state_from_array_(app)
@staticmethod
def _place_prime_meridian_array_(arr):
"""
Replace any 180 degree values with the value of :attribute:`ocgis.constants.MERIDIAN_180TH`.
:param arr: The target array to modify inplace.
:type arr: :class:`numpy.array`
:rtype: boolean :class:`numpy.array`
"""
from ocgis import constants
# find the values that are 180
select = arr == 180
# replace the values that are 180 with the constant value
np.place(arr, select, constants.MERIDIAN_180TH)
# return the mask used for the replacement
return select
[docs]@six.add_metaclass(abc.ABCMeta)
class AbstractProj4CRS(AbstractCRS):
"""
Base class for coordinate systems that may be transformed using PROJ.4.
"""
[docs]class CoordinateReferenceSystem(AbstractProj4CRS, AbstractInterfaceObject):
"""
Defines a coordinate system objects. One of ``value``, ``proj4``, or ``epsg`` is required.
:param value: A dictionary representation of the coordinate system with PROJ.4 paramters as keys.
:type value: dict
:param proj4: A PROJ.4 string.
:type proj4: str
:param epsg: An EPSG code.
:type epsg: int
:param name: A custom name for the coordinate system.
:type name: str
"""
[docs] def __init__(self, value=None, proj4=None, epsg=None, name=OcgisConvention.Name.COORDSYS):
self.name = name
# Allows operations on data variables to look through an empty dimension list. Alleviates instance checking.
self.dimensions = tuple()
self.dimension_names = tuple()
self.has_bounds = False
self._epsg = epsg
# Some basic overloaded for WGS84.
if epsg == 4326:
value = WGS84().value
# Add a special check for init keys in value dictionary.
if value is not None:
if 'init' in value and list(value.values())[0].startswith('epsg'):
epsg = int(list(value.values())[0].split(':')[1])
value = None
if value is None:
if proj4 is not None:
value = from_string(proj4)
elif epsg is not None:
sr = SpatialReference()
sr.ImportFromEPSG(epsg)
value = from_string(sr.ExportToProj4())
else:
msg = 'A value dictionary, PROJ.4 string, or EPSG code is required.'
raise ValueError(msg)
else:
# Remove unicode to avoid strange issues with proj and fiona.
for k, v in value.items():
if isinstance(v, six.string_types):
value[k] = str(v)
else:
try:
value[k] = v.tolist()
# this may be a numpy arr that needs conversion
except AttributeError:
continue
sr = SpatialReference()
sr.ImportFromProj4(to_string(value))
self.value = from_string(sr.ExportToProj4())
try:
assert self.value != {}
except AssertionError:
msg = 'Empty CRS: The conversion to PROJ.4 may have failed. The CRS value is: {0}'.format(value)
raise ValueError(msg)
[docs] def __eq__(self, other):
try:
if self.sr.IsSame(other.sr) == 1:
ret = True
else:
# Try a value comparison.
if self.value == other.value:
ret = True
else:
# Try without the "wktext" flag.
new_values = [self.value.copy(), other.value.copy()]
for n in new_values:
n.pop('wktext', None)
ret = new_values[0] == new_values[1]
except AttributeError:
# likely a nonetype of other object type
if other is None or not isinstance(other, self.__class__):
ret = False
else:
raise
return ret
[docs] def __ne__(self, other):
return not self.__eq__(other)
# def __repr__(self):
# msg = [str(type(self))]
# elements = []
# elements.append("name = {}".format(self.name))
# elements.append("value = {}".format(self.value))
# elements = [' {}'.format(e) for e in elements]
# msg.extend(elements)
# return '\n'.join(msg)
@property
def dtype(self):
return None
@property
def has_allocated_value(self):
return False
@property
def is_geographic(self):
return bool(self.sr.IsGeographic())
@property
def is_wrappable(self):
return self.is_geographic
@property
def linear_units(self):
return self.sr.GetLinearUnitsName()
@property
def proj4(self):
if self._epsg == 4326:
ret = WGS84().proj4
else:
ret = self.sr.ExportToProj4()
return ret
@property
def sr(self):
sr = SpatialReference()
sr.ImportFromProj4(to_string(self.value))
return sr
@property
def shape(self):
return tuple([0])
@property
def size(self):
return 0
def as_variable(self, with_proj4=True):
ret = super(CoordinateReferenceSystem, self).as_variable()
if with_proj4:
ret.attrs['proj4'] = self.proj4
return ret
def convert_to_empty(self):
pass
def extract(self, *args, **kwargs):
return self
def get_mask(self):
return None
[docs] def load(self, *args, **kwargs):
"""Compatibility with variable."""
pass
[docs] def write_to_rootgrp(self, rootgrp, with_proj4=True, **kwargs):
"""
Write the coordinate system to an open netCDF file.
:param rootgrp: An open netCDF dataset object for writing.
:type rootgrp: :class:`netCDF4.Dataset`
:param bool with_proj4: If ``True``, write the PROJ.4 string to the coordinate system variable in an attribute
called "proj4".
:returns: The netCDF variable object created to hold the coordinate system metadata.
:rtype: :class:`netCDF4.Variable`
"""
variable = rootgrp.createVariable(self.name, 'S1')
if with_proj4:
setattr(variable, OcgisConvention.Name.PROJ4, self.proj4)
setattr(variable, OcgisConvention.Name.OCGIS_ROLE, OcgisConvention.Value.ROLE_COORDSYS)
return variable
def write(self, *args, **kwargs):
write_mode = kwargs.pop('write_mode', MPIWriteMode.NORMAL)
# Let subclasses determine if proj4 should be written.
with_proj4 = kwargs.pop('with_proj4', None)
# Fill operations set values on variables. Coordinate system variables have no inherent values constructed only
# from attributes.
if write_mode != MPIWriteMode.FILL:
if with_proj4 is not None:
ret = self.write_to_rootgrp(*args, **kwargs)
else:
ret = self.write_to_rootgrp(*args)
return ret
[docs]class CRS(CoordinateReferenceSystem):
"""Here for convenience."""
@six.add_metaclass(abc.ABCMeta)
class AbstractSphericalCoordinateReferenceSystem(AbstractOcgisObject):
_cf_attributes = {DimensionMapKey.X: {CFName.UNITS: 'degrees_east',
CFName.STANDARD_NAME: CFName.StandardName.LONGITUDE},
DimensionMapKey.Y: {CFName.UNITS: 'degrees_north',
CFName.STANDARD_NAME: CFName.StandardName.LATITUDE}}
is_wrappable = True
is_geographic = True
def format_spatial_object(self, *args, **kwargs):
if self.angular_units == OcgisUnits.RADIANS:
msg = '{} are not acceptable units for field formatting. Must be {}.'
msg = msg.format(OcgisUnits.RADIANS, OcgisUnits.DEGREES)
raise ValueError(msg)
super(AbstractSphericalCoordinateReferenceSystem, self).format_spatial_object(*args, **kwargs)
def transform_coordinates(self, other_crs, x, y, z, inverse=False):
"""
Transform coordinate arrays to match ``other_crs``. If ``inverse`` is ``True``, then ``other_crs`` is
considered the defining coordinate system for the input vectors. Coordinate arrays must have matching
dimensions.
.. note:: If ``z`` is a scalar, the transformed ``z`` value is not returned.
:param other_crs: :class:`ocgis.variable.crs.AbstractCRS`
:param x: The x-coordinate array.
:type x: :class:`numpy.ndarray`
:param y: The y-coordinate array.
:type y: :class:`numpy.ndarray`
:param z: The z-coordinate array.
:type z: :class:`numpy.ndarray`
:param bool inverse: If ``True``, the coordinate system of the input variables is ``other_crs`` and the
transform CRS is the object.
"""
if not isinstance(other_crs, Cartesian):
raise CRSNotEquivalenError(self, other_crs)
if inverse:
z_out = np.sqrt(x ** 2 + y ** 2 + z ** 2)
x_out = np.arctan2(y, x)
y_out = np.arcsin(z / z_out)
if self.angular_units == OcgisUnits.DEGREES:
x_out *= ConversionFactor.RAD_TO_DEG
y_out *= ConversionFactor.RAD_TO_DEG
else:
if self.angular_units == OcgisUnits.DEGREES:
x = x.copy()
y = y.copy()
select = x > 180.
if select.any():
raise ValueError('x-coordinate must be wrapped if degrees.')
x *= ConversionFactor.DEG_TO_RAD
y *= ConversionFactor.DEG_TO_RAD
x_out = z * np.cos(y) * np.cos(x)
y_out = z * np.cos(y) * np.sin(x)
z_out = z * np.sin(y)
return x_out, y_out, z_out
def transform_geometry(self, other_crs, geom, inverse=False):
from ocgis import GeometryVariable
assert isinstance(geom, GeometryVariable)
assert geom.geom_type == 'Polygon'
assert geom.shape == (1,)
if not inverse:
if geom.wrapped_state == WrappedState.UNWRAPPED:
raise ValueError('Geometry coordinates may not be unwrapped.')
sgeom = geom.get_value()[0]
coords = np.array(sgeom.exterior)
if not sgeom.has_z:
new_coords = np.ones((coords.shape[0], 3), dtype=coords.dtype)
new_coords[:, 0:2] = coords
else:
new_coords = coords
x = new_coords[:, 0]
y = new_coords[:, 1]
z = new_coords[:, 2]
x_out, y_out, z_out = self.transform_coordinates(other_crs, x, y, z, inverse=inverse)
x[:] = x_out
y[:] = y_out
z[:] = z_out
new_geom = Polygon(new_coords)
geom.get_value()[0] = new_geom
def transform_grid(self, other_crs, grid, inverse=False):
"""
Transform a grid's coordinate system.
:param other_crs: See :meth:`ocgis.variable.crs.AbstractSphericalCoordinateReferenceSystem.transform_coordinates`
:param grid: The grid to transform in-place.
:type grid: :class:`ocgis.Grid`
:param inverse: See :meth:`ocgis.variable.crs.AbstractSphericalCoordinateReferenceSystem.transform_coordinates`
"""
if not inverse:
wrapped_state = grid.wrapped_state
if wrapped_state == WrappedState.UNWRAPPED:
raise ValueError('x-coordinates may not be wrapped')
if not grid.has_z:
raise ValueError('A z-/level-coordinate is required')
grid.expand()
x = grid.x.get_value()
y = grid.y.get_value()
z = grid.z.get_value()
x_out, y_out, z_out = self.transform_coordinates(other_crs, x, y, z, inverse=inverse)
if grid.has_bounds:
xb = grid.x.bounds.get_value()
yb = grid.y.bounds.get_value()
zb = grid.z.bounds.get_value()
xb_out, yb_out, zb_out = self.transform_coordinates(other_crs, xb, yb, zb, inverse=inverse)
################################################################################################################
# Set values
grid.x.set_value(x_out)
grid.y.set_value(y_out)
grid.z.set_value(z_out)
if grid.has_bounds:
grid.x.bounds.set_value(xb_out)
grid.y.bounds.set_value(yb_out)
grid.z.bounds.set_value(zb_out)
class Spherical(AbstractSphericalCoordinateReferenceSystem, CoordinateReferenceSystem):
"""
A spherical model of the Earth's surface with equivalent semi-major and semi-minor axes.
:param semi_major_axis: The radius of the spherical model. The default value is taken from the PROJ.4 (v4.8.0)
source code (src/pj_ellps.c).
:type semi_major_axis: float
:param units: Either degrees or radians.
:type units: :class:`ocgis.constants.OcgisUnits`
"""
_default_towgs84 = '0,0,0,0,0,0,0'
_fuzzy_value = {'a': 6370997, 'b': 6370997, 'proj': 'longlat'}
def __init__(self, semi_major_axis=6370997.0, angular_units=OcgisUnits.DEGREES):
value = {'proj': 'longlat', 'towgs84': self._default_towgs84, 'no_defs': '', 'a': semi_major_axis,
'b': semi_major_axis}
CoordinateReferenceSystem.__init__(self, value=value, name='latitude_longitude')
AbstractCRS.__init__(self, angular_units=angular_units)
self.semi_major_axis = semi_major_axis
class Cartesian(AbstractCRS):
"""
A regular Cartesian coordinate system.
"""
name = 'cartesian'
is_geographic = True
is_wrappable = False
def as_variable(self):
ret = super(Cartesian, self).as_variable()
if self.linear_units is not None:
ret[CFName.UNITS] = str(self.linear_units)
return ret
def transform_geometry(self, other_crs, geom, inverse=False):
if not isinstance(other_crs, (Spherical, Tripole)):
raise CRSNotEquivalenError(self, other_crs)
return other_crs.transform_geometry(self, geom, inverse=True)
def transform_grid(self, other_crs, grid, inverse=False):
if not isinstance(other_crs, (Spherical, Tripole)):
raise CRSNotEquivalenError(self, other_crs)
return other_crs.transform_grid(self, grid, inverse=True)
class Tripole(AbstractSphericalCoordinateReferenceSystem, AbstractCRS):
"""
A spherical representation of the Earth's surface having three poles (singularities).
:param spherical: The spherical basis for the tripole coordinate system.
:type spherical: :class:`ocgis.variable.crs.Spherical`
"""
name = 'tripole'
def __init__(self, spherical=None):
if spherical is None:
spherical = Spherical()
self.spherical = spherical
AbstractCRS.__init__(self, linear_units=spherical.linear_units,
angular_units=spherical.angular_units)
class WGS84(CoordinateReferenceSystem):
"""
A representation of the Earth using the WGS84 datum (i.e. EPSG code 4326).
"""
_cf_attributes = {DimensionMapKey.X: {CFName.UNITS: 'degrees_east',
CFName.STANDARD_NAME: 'longitude'},
DimensionMapKey.Y: {CFName.UNITS: 'degrees_north',
CFName.STANDARD_NAME: 'latitude'}}
_default_towgs84 = '0,0,0,0,0,0,0'
_fuzzy_value = {'datum': 'WGS84', 'proj': 'longlat'}
def __init__(self):
value = {'no_defs': True, 'datum': 'WGS84', 'proj': 'longlat', 'towgs84': '0,0,0,0,0,0,0'}
try:
super(WGS84, self).__init__(value=value, name='WGS84_EPSG_4326')
except:
value['ellps'] = value['datum']
super(WGS84, self).__init__(value=value, name='WGS84_EPSG_4326')
def __eq__(self, other):
ret = super(WGS84, self).__eq__(other)
if not ret:
# Versions handle values differently in GDAL/PROJ.4.
try:
ret = other.value == {'proj': 'longlat', 'datum': 'WGS84', 'no_defs': True}
except AttributeError:
ret = False
return ret
@property
def proj4(self):
return '+proj=longlat +ellps=WGS84 +towgs84=0,0,0,0,0,0,0 +no_defs '
@six.add_metaclass(abc.ABCMeta)
class CFCoordinateReferenceSystem(CoordinateReferenceSystem):
_find_projection_coordinates = True
# Alternative grid mapping names to check. Should be a tuple in subclasses.
_fuzzy_grid_mapping_names = None
# Default map parameter values.
map_parameters_defaults = {}
def __init__(self, **kwds):
self.projection_x_coordinate = kwds.pop('projection_x_coordinate', None)
self.projection_y_coordinate = kwds.pop('projection_y_coordinate', None)
# Always provide a default name for the CF-based coordinate systems.
name = kwds.pop('name', self.grid_mapping_name)
check_keys = list(kwds.keys())
for key in list(kwds.keys()):
check_keys.remove(key)
if len(check_keys) > 0:
raise ValueError('The keyword parameter(s) "{0}" was/were not provided.')
self.map_parameters_values = kwds
crs = {'proj': self.proj_name}
for k in list(self.map_parameters.keys()):
if k in self.iterable_parameters:
v = getattr(self, self.iterable_parameters[k])(kwds[k])
crs.update(v)
else:
try:
crs.update({self.map_parameters[k]: kwds[k]})
except KeyError:
# Attempt to load any default map parameter values.
crs.update({self.map_parameters[k]: self.map_parameters_defaults[k]})
super(CFCoordinateReferenceSystem, self).__init__(value=crs, name=name)
@abc.abstractproperty
def grid_mapping_name(self):
str
@abc.abstractproperty
def iterable_parameters(self):
dict
@abc.abstractproperty
def map_parameters(self):
dict
@abc.abstractproperty
def proj_name(self):
str
def format_standard_parallel(self, value):
if isinstance(value, np.ndarray):
value = value.tolist()
ret = {}
try:
it = iter(value)
except TypeError:
it = [value]
for ii, v in enumerate(it, start=1):
ret.update({self.map_parameters['standard_parallel'].format(ii): v})
return ret
@classmethod
def get_fuzzy_names(cls):
ret = list(get_iter(cls.grid_mapping_name))
if cls._fuzzy_grid_mapping_names is not None:
ret += list(get_iter(cls._fuzzy_grid_mapping_names))
return tuple(ret)
@classmethod
def load_from_metadata(cls, var, meta, strict=True):
def _get_projection_coordinate_(target, meta):
key = 'projection_{0}_coordinate'.format(target)
for k, v in list(meta['variables'].items()):
if 'standard_name' in v['attrs']:
if v['attrs']['standard_name'] == key:
return k
raise ProjectionCoordinateNotFound(key)
r_var = meta['variables'][var]
try:
# Look for the grid_mapping attribute on the target variable.
r_grid_mapping = meta['variables'][r_var['attrs']['grid_mapping']]
except KeyError:
# Attempt to match the class's grid mapping name across variables if strictness allows.
if not strict:
r_grid_mapping = None
fuzzy_names = cls.get_fuzzy_names()
for var_name, var_meta in list(meta['variables'].items()):
if var_meta.get('name', var_name) in fuzzy_names:
r_grid_mapping = var_meta
break
if r_grid_mapping is None:
raise ProjectionDoesNotMatch
else:
raise ProjectionDoesNotMatch
try:
grid_mapping_name = r_grid_mapping['attrs']['grid_mapping_name']
except KeyError:
raise ProjectionDoesNotMatch
if grid_mapping_name != cls.grid_mapping_name:
raise ProjectionDoesNotMatch
# get the projection coordinates if not turned off by class attribute.
if cls._find_projection_coordinates:
pc_x, pc_y = [_get_projection_coordinate_(target, meta) for target in ['x', 'y']]
else:
pc_x, pc_y = None, None
kwds = r_grid_mapping['attrs'].copy()
kwds.pop('grid_mapping_name', None)
kwds['projection_x_coordinate'] = pc_x
kwds['projection_y_coordinate'] = pc_y
# add the correct name to the coordinate system
kwds['name'] = r_grid_mapping['name']
cls._load_from_metadata_finalize_(kwds, var, meta)
return cls(**kwds)
@classmethod
def _load_from_metadata_finalize_(cls, kwds, var, meta):
pass
def write_to_rootgrp(self, rootgrp, with_proj4=False, **kwargs):
variable = super(CFCoordinateReferenceSystem, self).write_to_rootgrp(rootgrp, with_proj4=with_proj4)
variable.grid_mapping_name = self.grid_mapping_name
for k, v in self.map_parameters_values.items():
if v is None:
v = ''
setattr(variable, k, v)
return variable
class CFSpherical(Spherical, CFCoordinateReferenceSystem):
grid_mapping_name = 'latitude_longitude'
iterable_parameters = None
map_parameters = None
proj_name = None
def __init__(self, *args, **kwargs):
self.map_parameters_values = {}
Spherical.__init__(self, *args, **kwargs)
@classmethod
def load_from_metadata(cls, var, meta, strict=False, depth=1):
variables = meta['variables']
if depth == 1:
variable_attrs = variables[var].get('attrs', {})
r_grid_mapping = variable_attrs.get('grid_mapping')
r_grid_mapping_name = variable_attrs.get('grid_mapping_name')
if cls.grid_mapping_name in (r_grid_mapping, r_grid_mapping_name):
return cls()
else:
raise ProjectionDoesNotMatch
elif depth == 2:
# Check for standard names on variables as spherical is often not defined in the metadata file.
found = {}
for k, v in cls._cf_attributes.items():
sn = v[CFName.STANDARD_NAME]
found[sn] = []
for vname, vmeta in variables.items():
if vmeta['attrs'].get(CFName.STANDARD_NAME) == sn:
found[sn].append(vname)
# There should be only one variable found for each standard name.
counts = [len(v) for v in found.values()]
if any([c > 1 for c in counts]) or any([c == 0 for c in counts]):
raise ProjectionDoesNotMatch
else:
return cls()
else:
raise CRSDepthNotImplemented(depth)
class CFAlbersEqualArea(CFCoordinateReferenceSystem):
grid_mapping_name = 'albers_conical_equal_area'
iterable_parameters = {'standard_parallel': 'format_standard_parallel'}
map_parameters = {'standard_parallel': 'lat_{0}',
'longitude_of_central_meridian': 'lon_0',
'latitude_of_projection_origin': 'lat_0',
'false_easting': 'x_0',
'false_northing': 'y_0'}
proj_name = 'aea'
class CFLambertConformal(CFCoordinateReferenceSystem):
grid_mapping_name = 'lambert_conformal_conic'
iterable_parameters = {'standard_parallel': 'format_standard_parallel'}
map_parameters = {'standard_parallel': 'lat_{0}',
'longitude_of_central_meridian': 'lon_0',
'latitude_of_projection_origin': 'lat_0',
'false_easting': 'x_0',
'false_northing': 'y_0',
'units': 'units'}
map_parameters_defaults = {'false_easting': 0,
'false_northing': 0}
proj_name = 'lcc'
@classmethod
def _load_from_metadata_finalize_(cls, kwds, var, meta):
kwds['units'] = meta['variables'][kwds['projection_x_coordinate']]['attrs'].get('units')
class CFPolarStereographic(CFCoordinateReferenceSystem):
grid_mapping_name = 'polar_stereographic'
map_parameters = {'standard_parallel': 'lat_ts',
'latitude_of_projection_origin': 'lat_0',
'straight_vertical_longitude_from_pole': 'lon_0',
'false_easting': 'x_0',
'false_northing': 'y_0',
'scale_factor': 'k_0'}
proj_name = 'stere'
iterable_parameters = {}
def __init__(self, *args, **kwds):
if 'scale_factor' not in kwds:
kwds['scale_factor'] = 1.0
super(CFPolarStereographic, self).__init__(*args, **kwds)
class CFNarccapObliqueMercator(CFCoordinateReferenceSystem):
grid_mapping_name = 'transverse_mercator'
map_parameters = {'latitude_of_projection_origin': 'lat_0',
'longitude_of_central_meridian': 'lonc',
'scale_factor_at_central_meridian': 'k_0',
'false_easting': 'x_0',
'false_northing': 'y_0',
'alpha': 'alpha'}
proj_name = 'omerc'
iterable_parameters = {}
def __init__(self, *args, **kwds):
if 'alpha' not in kwds:
kwds['alpha'] = 360
super(CFNarccapObliqueMercator, self).__init__(*args, **kwds)
class CFRotatedPole(CFCoordinateReferenceSystem):
grid_mapping_name = 'rotated_latitude_longitude'
iterable_parameters = {}
map_parameters = {'grid_north_pole_longitude': None, 'grid_north_pole_latitude': None}
proj_name = 'omerc'
_find_projection_coordinates = False
_fuzzy_grid_mapping_names = ('rotated_pole', 'rotated_lat_lon')
_template = '+proj=ob_tran +o_proj=latlon +o_lon_p={lon_pole} +o_lat_p={lat_pole} +lon_0=180 +ellps={ellps}'
def __init__(self, *args, **kwds):
super(CFRotatedPole, self).__init__(*args, **kwds)
# this is the transformation string used in the proj operation
self._trans_proj = self._template.format(lon_pole=kwds['grid_north_pole_longitude'],
lat_pole=kwds['grid_north_pole_latitude'],
ellps=constants.PROJ4_ROTATED_POLE_ELLPS)
def update_with_rotated_pole_transformation(self, grid, inverse=False):
"""
:type grid: :class:`~ocgis.Grid`
:param bool inverse: If ``True``, this is an inverse transformation.
:rtype: :class:`~ocgis.Grid`
"""
assert grid.crs.is_geographic or isinstance(grid.crs, CFRotatedPole)
grid.remove_bounds()
grid.expand()
rlon, rlat = get_lonlat_rotated_pole_transform(grid.x.get_value().flatten(), grid.y.get_value().flatten(),
self._trans_proj, inverse=inverse)
grid.x.set_value(rlon.reshape(*grid.shape))
grid.y.set_value(rlat.reshape(*grid.shape))
def write_to_rootgrp(self, rootgrp, **kwargs):
"""
.. note:: See :meth:`~ocgis.interface.base.crs.CoordinateReferenceSystem.write_to_rootgrp`.
"""
variable = super(CFRotatedPole, self).write_to_rootgrp(rootgrp, **kwargs)
if kwargs.get(KeywordArgument.WITH_PROJ4, False):
variable.proj4 = ''
variable.proj4_transform = self._trans_proj
return variable
def create_crs(value, **kwargs):
"""
Create a coordinate system object from a dictionary definition. This will create a spherical coordinate system
object, for example, if there is a fuzzy match with that coordinate system.
:param dict value: The coordinate system's dictionary definition.
:param dict kwargs: Optional keyword arguments to the coordinate system's creation.
:return: :class:`ocgis.variable.crs.AbstractCRS`
"""
if value is None:
ret = None
else:
if isinstance(value, AbstractCRS):
value = value.value
to_check = np.array([Spherical, WGS84], dtype=object)
match = np.zeros(len(to_check), dtype=bool)
for idx, t in enumerate(to_check):
match[idx] = t.fuzzy_check(value)
found = to_check[match]
if found.size > 1:
raise ValueError('Multiple coordinate systems found: {}'.format(found))
elif found.size == 0:
ret = CoordinateReferenceSystem(value=value, **kwargs)
else:
ret = found[0]()
return ret
def get_lonlat_rotated_pole_transform(lon, lat, transform, inverse=False, is_vectorized=False):
"""
Transform longitude and latitude coordinates to/from their rotated pole representation.
:param lon: Vector of longitude coordinates.
:param lat: Vector of latitude coordinates.
:param str transform: The PROJ.4 transform string.
:param inverse: If ``True``, coordinates are in spherical longitude/latitude and should be transformed to rotated
pole.
:return: A tuple with the first element being transformed longitude and the second element being transformed
latitude.
:rtype: tuple
"""
import csv
import subprocess
class ProjDialect(csv.excel):
lineterminator = '\n'
delimiter = '\t'
f = tempfile.NamedTemporaryFile(mode='w')
try:
writer = csv.writer(f, dialect=ProjDialect)
if is_vectorized:
for lon_idx, lat_idx in itertools.product(*[list(range(lon.shape[0])), list(range(lat.shape[0]))]):
writer.writerow([lon[lon_idx], lat[lat_idx]])
else:
for idx in range(lon.shape[0]):
writer.writerow([lon[idx], lat[idx]])
f.flush()
cmd = transform.split(' ')
cmd.append(f.name)
if inverse:
program = 'invproj'
else:
program = 'proj'
cmd = [program, '-f', '"%.6f"', '-m', '57.2957795130823'] + cmd
capture = subprocess.check_output(cmd)
finally:
f.close()
capture = capture.decode()
coords = capture.split('\n')
new_coords = []
for ii, coord in enumerate(coords):
coord = coord.replace('"', '')
coord = coord.split('\t')
try:
coord = list(map(float, coord))
# likely empty string
except ValueError:
if coord[0] == '':
continue
else:
raise
new_coords.append(coord)
rlon_rlat = np.array(new_coords)
rlon = rlon_rlat[:, 0]
rlat = rlon_rlat[:, 1]
return rlon, rlat