import datetime
import itertools
from collections import deque
from copy import deepcopy
from decimal import Decimal
import netCDF4 as nc
import numpy as np
import six
from ocgis import constants, env, Dimension
from ocgis import netcdftime
from ocgis.constants import HeaderName, KeywordArgument
from ocgis.exc import EmptySubsetError, IncompleteSeasonError, CannotFormatTimeError, ResolutionError
from ocgis.util.helpers import get_is_date_between, iter_array, get_none_or_slice
from ocgis.variable.base import SourcedVariable, get_attribute_property, set_attribute_property
[docs]class TemporalVariable(SourcedVariable):
"""
.. note:: Accepts all parameters to :class:`~ocgis.SourcedVariable`.
:keyword str calendar: (``='standard'``) The calendar to use when converting from float to datetime objects. Any of
the netCDF-CF calendar types: http://unidata.github.io/netcdf4-python/netCDF4-module.html#num2date
:keyword str units: (``='days since 0000-01-01 00:00:00'``) The units string to use when converting from float to
datetime objects. See: http://unidata.github.io/netcdf4-python/netCDF4-module.html#num2date
:keyword bool format_time: (``=True``) If ``False``, do not allow access to ``datetime``-like objects. If these
properties are accessed, raise :class:`~ocgis.exc.CannotFormatTimeError`.
"""
_date_parts = ('year', 'month', 'day', 'hour', 'minute', 'second')
def __init__(self, **kwargs):
    """
    Build the time variable, applying the default name, calendar, and units when none are available.

    :keyword bool format_time: (``=True``) Stored on the instance as ``format_time``.
    :keyword str calendar: (``='auto'``) When ``'auto'``, any calendar found after the parent initializer runs is
     kept and the package default is only used if there is none.
    """
    # Lazily populated conversion caches.
    self._value_datetime = self._bounds_datetime = None
    self._value_numtime = self._bounds_numtime = None

    self.format_time = kwargs.pop('format_time', True)
    requested_calendar = kwargs.pop('calendar', 'auto')
    if kwargs.get('name') is None:
        kwargs['name'] = constants.DEFAULT_TEMPORAL_NAME

    super(TemporalVariable, self).__init__(**kwargs)

    is_auto = requested_calendar == 'auto'
    if not is_auto:
        self.calendar = requested_calendar
    if self.calendar is None:
        # Only fall back to the package default when the caller did not ask for a specific calendar.
        self.calendar = constants.DEFAULT_TEMPORAL_CALENDAR if is_auto else requested_calendar
    if self.units is None:
        self.units = constants.DEFAULT_TEMPORAL_UNITS
def _getitem_finalize_(self, ret, slc):
    """
    Apply the slice used to create ``ret`` to its cached numeric and datetime value arrays.

    :param ret: The sliced variable whose caches are updated in place.
    :param slc: The slice, or a dictionary of slices keyed by dimension name.
    """
    # A dictionary slice is keyed by dimension name; only the first dimension applies to the caches.
    key = slc[self.dimensions[0].name] if isinstance(slc, dict) else slc
    for cache_name in ('_value_numtime', '_value_datetime'):
        setattr(ret, cache_name, get_none_or_slice(getattr(ret, cache_name), key))
@property
def calendar(self):
    """
    Get or set the calendar attribute of the variable. Setting it also updates the bounds variable if present.

    :rtype: str
    """
    return get_attribute_property(self, 'calendar')


@calendar.setter
def calendar(self, value):
    set_attribute_property(self, 'calendar', value)
    if not self.has_bounds:
        return
    # Keep the bounds variable's calendar in sync with the parent.
    set_attribute_property(self.bounds, 'calendar', value)
@property
def cfunits(self):
    """
    :return: CF units object rebuilt from the parent's units string with this variable's calendar
    """
    base_units = super(TemporalVariable, self).cfunits
    units_class = base_units.__class__
    return units_class(str(base_units), calendar=self.calendar)
@property
def extent_datetime(self):
    """
    :return: lower and upper extent as a two-element tuple of ``datetime``-like objects
    :rtype: tuple
    :raises: :class:`~ocgis.exc.CannotFormatTimeError` if ``format_time`` is ``False``
    """
    if not self.format_time:
        raise CannotFormatTimeError('extent_datetime')
    extent = self.extent
    needs_conversion = get_datetime_conversion_state(extent[0])
    return tuple(self.get_datetime(extent) if needs_conversion else extent)
@property
def extent_numtime(self):
    """
    :return: lower and upper extent as a two-element tuple of numeric time values
    :rtype: tuple
    """
    extent = self.extent
    already_numeric = get_datetime_conversion_state(extent[0])
    return tuple(extent if already_numeric else self.get_numtime(extent))
@property
def value_datetime(self):
    """
    :return: time values as a masked ``object`` array of ``datetime``-like objects (computed once, then cached)
    :rtype: :class:`numpy.ma.MaskedArray`
    :raises: :class:`~ocgis.exc.CannotFormatTimeError` if ``format_time`` is ``False``
    """
    if not self.format_time:
        raise CannotFormatTimeError('value_datetime')
    cached = self._value_datetime
    if cached is None:
        archetype = self.get_value().flatten()[0]
        if get_datetime_conversion_state(archetype):
            converted = self.get_datetime(self.get_value())
            cached = np.ma.array(converted, mask=self.get_mask(), ndmin=1, fill_value=None)
        else:
            # Stored values are already datetime-like.
            cached = self.get_masked_value()
        self._value_datetime = cached
    return cached
@property
def value_numtime(self):
    """
    :return: time values as a masked numeric array (computed once, then cached)
    :rtype: :class:`numpy.ma.MaskedArray`
    """
    cached = self._value_numtime
    if cached is None:
        archetype = self.get_value().flatten()[0]
        if get_datetime_conversion_state(archetype):
            # Stored values are already numeric.
            cached = self.get_masked_value()
        else:
            converted = self.get_numtime(self.get_value())
            cached = np.ma.array(converted, mask=self.get_mask(), ndmin=1)
        self._value_numtime = cached
    return cached
@property
def _has_months_units(self):
    """
    :return: ``True`` if the string form of the units starts with ``'months'`` (the special-cased time units)
    :rtype: bool
    """
    return str(self.units).startswith('months')
def _as_record_(self, *args, **kwargs):
    """
    Build the record via the parent implementation and optionally append year/month/day entries.

    :keyword bool add_parts: (``=True``) If ``True`` and ``format_time`` is set, add the temporal year, month, and
     day header entries taken from the first iteration value (``None`` when that value is masked).
    :keyword bool pytype_primitives: (``=False``) If ``True``, ``str`` is passed to the parent as the formatter.
    """
    add_parts = kwargs.pop('add_parts', True)
    if kwargs.get('pytype_primitives', False):
        kwargs['formatter'] = str
    record = super(TemporalVariable, self)._as_record_(*args, **kwargs)

    if not (add_parts and self.format_time):
        return record

    first_value = self._get_iter_value_().flatten()[0]
    mask = self.get_mask()
    is_masked = False if mask is None else mask.flatten()[0]
    if is_masked:
        year = month = day = None
    else:
        year, month, day = first_value.year, first_value.month, first_value.day
    record[HeaderName.TEMPORAL_YEAR] = year
    record[HeaderName.TEMPORAL_MONTH] = month
    record[HeaderName.TEMPORAL_DAY] = day
    return record
@classmethod
def from_variable(cls, variable, format_time=True):
    """
    Create a time variable from an existing variable, converting its bounds (if any) the same way.

    :param variable: The source variable to convert to a time variable.
    :param bool format_time: See :class:`~ocgis.TemporalVariable`.
    :return: a standard variable converted to a time variable
    :rtype: :class:`~ocgis.TemporalVariable`
    """
    bounds = variable.bounds
    if variable.has_bounds:
        bounds = cls.from_variable(bounds, format_time=format_time)

    # Not all variables have request datasets.
    request_dataset = getattr(variable, '_request_dataset', None)

    init_kwargs = dict(name=variable.name, value=variable._value, mask=variable._mask,
                       dimensions=variable._dimensions, dtype=variable._dtype, attrs=variable._attrs,
                       fill_value=variable._fill_value, parent=variable.parent, bounds=bounds,
                       format_time=format_time, request_dataset=request_dataset,
                       source_name=variable.source_name, should_init_from_source=False, uid=variable.uid)
    return cls(**init_kwargs)
def get_between(self, lower, upper, return_indices=False):
    """
    Delegate to the parent ``get_between`` after converting the limits to numeric time when the stored values
    are numeric.

    :param lower: Lower limit.
    :param upper: Upper limit.
    :param bool return_indices: Passed through to the parent implementation.
    """
    stored_is_numeric = get_datetime_conversion_state(self.get_value()[0])
    if stored_is_numeric:
        lower, upper = tuple(self.get_numtime([lower, upper]))
    return super(TemporalVariable, self).get_between(lower, upper, return_indices=return_indices)
def get_datetime(self, arr):
    """
    Convert numeric time values to ``datetime``-like objects using this variable's units and calendar.

    :param arr: An array of floats to convert to ``datetime``-like objects.
    :type arr: :class:`numpy.ndarray`
    :returns: ``object`` array with float objects converted to ``datetime``-like objects.
    :rtype: :class:`numpy.ndarray`
    """
    # Units expressed in months use a dedicated conversion routine.
    if self._has_months_units:
        return get_datetime_from_months_time_units(arr, str(self.units),
                                                   month_centroid=constants.CALC_MONTH_CENTROID)

    arr = np.atleast_1d(nc.num2date(arr, str(self.units), calendar=self.calendar))
    make_datetime = get_datetime_or_netcdftime
    for idx, t in iter_array(arr, return_value=True):
        try:
            arr[idx] = make_datetime(t.year, t.month, t.day, t.hour, t.minute, t.second)
        except ValueError:
            # Conversion may fail for some calendars; keep the object returned by ``num2date``.
            continue
    return arr
def get_grouping(self, grouping):
    """
    Create a temporally grouped variable using string group sequences.

    :param grouping: The temporal grouping to use when creating the temporal group dimension.

    >>> grouping = ['month']

    :type grouping: `sequence` of :class:`str`
    :rtype: :class:`~ocgis.variable.temporal.TemporalGroupVariable`
    """
    if grouping == 'all':
        # No need to break out datetime parts for the 'all' grouping.
        components = self._get_grouping_all_()
    elif 'unique' in grouping:
        # "Unique" seasons use a specialized procedure.
        components = self._get_grouping_seasonal_unique_(grouping)
    else:
        # Standard groups (e.g. ``['month']``) or seasons across the entire time range.
        components = self._get_grouping_other_(grouping)
    new_bounds, date_parts, repr_dt, dgroups = components

    time_dimension_name = self.dimensions[0].name
    if self.has_bounds:
        new_dimensions = [d.name for d in self.bounds.dimensions]
    else:
        new_dimensions = [time_dimension_name, 'bounds']

    # Use a new dimension object when the source time dimension is unlimited; otherwise reference it by name.
    if self.dimensions[0].is_unlimited:
        new_dimensions[0] = Dimension(name=time_dimension_name, size_current=len(repr_dt))
    else:
        new_dimensions[0] = time_dimension_name

    bounds_variable = TemporalVariable(value=new_bounds, name='climatology_bounds', dimensions=new_dimensions)
    new_attrs = deepcopy(self.attrs)
    tgv = TemporalGroupVariable(grouping=grouping, date_parts=date_parts, bounds=bounds_variable,
                                dgroups=dgroups, value=repr_dt, units=self.units, calendar=self.calendar,
                                name=self.name, attrs=new_attrs, dimensions=new_dimensions[0])
    # Drop the attribute named by the base class' bounds attribute name, if present.
    tgv.attrs.pop(TemporalVariable._bounds_attribute_name, None)
    return tgv
def get_iter(self, **kwargs):
    """
    Return the parent iterator, attaching the driver's time formatters when a driver keyword is supplied.
    """
    driver = kwargs.get(KeywordArgument.DRIVER)
    iterator = super(TemporalVariable, self).get_iter(**kwargs)
    if driver is None:
        return iterator

    iterator.formatter = driver.iterator_formatter_time_value
    if self.has_bounds:
        for follower in iterator.followers:
            follower.formatter = driver.iterator_formatter_time_bounds
    return iterator
def get_numtime(self, arr):
    """
    Convert ``datetime``-like objects to numeric time using this variable's units and calendar.

    :param arr: An array of ``datetime``-like objects to convert to numeric time.
    :type arr: :class:`numpy.ndarray`
    :returns: An array of numeric values.
    :rtype: :class:`numpy.ndarray`
    """
    arr = np.atleast_1d(arr)
    units, calendar = str(self.units), self.calendar
    try:
        return nc.date2num(arr, units, calendar=calendar)
    except (ValueError, TypeError):
        # Units expressed in months use a dedicated conversion routine.
        if self._has_months_units:
            return get_num_from_months_time_units(arr, self.units, dtype=None)

        # Retry after rebuilding each element as a standard library datetime object.
        flat = arr.flatten()
        rebuilt = np.zeros(flat.shape, dtype=object)
        for position, element in enumerate(flat):
            rebuilt[position] = datetime.datetime(element.year, element.month, element.day, element.hour,
                                                  element.minute, element.second, element.microsecond)
        rebuilt = rebuilt.reshape(arr.shape)
        return np.atleast_1d(nc.date2num(rebuilt, str(self.units), calendar=self.calendar))
def get_report(self):
    """
    Extend the parent report with start/end dates, calendar, units, and resolution.

    If the extent cannot be formatted as datetimes (out-of-range year/month/date), the unformatted extent is
    reported instead. Any other conversion error is re-raised.

    :return: sequence of descriptive strings about the time variable
    :rtype: `sequence` of :class:`str`
    """
    lines = super(TemporalVariable, self).get_report()
    try:
        if self.format_time:
            res = int(self.resolution)
            try:
                start_date, end_date = self.extent_datetime
            # The times may not be formattable.
            except (ValueError, OverflowError) as e:
                # ``e.message`` only exists on Python 2. Match on fragments because the exact wording differs
                # between interpreter versions (e.g. "year is out of range" vs. "year 10000 is out of range").
                fragments = ('out of range', 'month must be in 1..12')
                message = str(e)
                if any(fragment in message for fragment in fragments):
                    start_date, end_date = self.extent
                else:
                    raise
        else:
            res = 'NA (non-formatted times requested)'
            start_date, end_date = self.extent
    # Raised if the temporal dimension has a single value.
    except ResolutionError:
        res = 'NA (singleton)'
        start_date, end_date = self.extent

    lines += ['Start Date = {0}'.format(start_date),
              'End Date = {0}'.format(end_date),
              'Calendar = {0}'.format(self.calendar),
              'Units = {0}'.format(self.units),
              'Resolution (Days) = {0}'.format(res)]
    return lines
def get_subset_by_function(self, func, return_indices=False):
    """
    Subset the time variable with an arbitrary selection function.

    ``func`` is called as ``func(value_datetime, bounds=bounds)`` where ``bounds`` is the bounds variable's
    datetime values or ``None`` when there are no bounds. Its return value is converted to an array and used to
    index this variable. For example:

    >>> def subset_func(value, bounds=None):
    ...     indices = []
    ...     for ii, v in enumerate(value):
    ...         if v.month == 6:
    ...             indices.append(ii)
    ...     return indices
    >>> subset = time_variable.get_subset_by_function(subset_func)

    :param func: The function to use for subsetting.
    :type func: :class:`FunctionType`
    :param bool return_indices: If ``True``, return a tuple of the subset and the index array used to create it.
    :rtype: :class:`~ocgis.TemporalVariable` | :class:`tuple`
    """
    bounds = self.bounds.value_datetime if self.has_bounds else None
    indices = np.array(func(self.value_datetime, bounds=bounds))
    subset = self[indices]
    return (subset, indices) if return_indices else subset
def get_time_region(self, time_region, return_indices=False):
    """
    Subset the variable to the rows matching every criterion in ``time_region``.

    :param dict time_region: A dictionary defining the time region subset. Entries with a ``None`` value are
     ignored.

    >>> time_region = {'month': [1, 2, 3], 'year': [2000]}

    :param bool return_indices: If ``True``, also return the indices used to subset the variable.
    :return: the sliced time variable, or a tuple of it and the integer indices
    :rtype: :class:`~ocgis.TemporalVariable`
    :raises: :class:`~ocgis.exc.EmptySubsetError` if no rows are selected
    """
    assert isinstance(time_region, dict)

    value = self.value_datetime
    bounds = self.bounds.value_datetime if self.has_bounds else None
    # Bounds take precedence over centroid values when present.
    use_bounds = bounds is not None

    # Drop criteria with no values to save work in the row loop.
    criteria = {k: v for k, v in time_region.copy().items() if v is not None}
    assert len(criteria) > 0

    def _is_match(row, part_name, allowed):
        # With bounds, a criterion matches if any allowed element falls between the lower and upper bound.
        if use_bounds:
            hits = [get_is_date_between(row[0], row[1], **{part_name: element}) for element in allowed]
            return any(hits)
        return getattr(row, part_name) in allowed

    select = np.zeros(self.shape[0], dtype=bool)
    for idx_row in range(select.shape[0]):
        row = bounds[idx_row, :] if use_bounds else value[idx_row]
        # Evaluate every criterion (no short-circuit) before combining.
        row_check = np.array([_is_match(row, k, v) for k, v in criteria.items()], dtype=bool)
        if row_check.all():
            select[idx_row] = True

    if not select.any():
        raise EmptySubsetError(origin='temporal')

    subset = self[select]
    if not return_indices:
        return subset
    return subset, np.arange(0, self.shape[0])[select]
def _get_grouping_all_(self):
    """
    Build the grouping components when the grouping is 'all'.

    :returns: tuple of (bounds array of shape ``(1, 2)``, ``None`` date parts, single-element representative
     datetime array, list holding one select-everything slice)
    :rtype: tuple
    """
    value = self.value_datetime
    lower, upper = self.extent_datetime

    # The new bounds are the datetime extent of the variable.
    new_bounds = np.array([lower, upper]).reshape(-1, 2)

    # The representative datetime is taken near the center of the value array.
    center_index = int((self.shape[0] / 2) - 1)
    repr_dt = np.array([value[center_index]])

    # Date parts are not needed for 'all'; the single group selects all data.
    return new_bounds, None, repr_dt, [slice(None)]
def _get_grouping_other_(self, grouping):
    """
    Applied to groups other than 'all'.

    Handles two kinds of ``grouping``: a sequence made only of date part strings (e.g. ``['month']``) and a
    seasonal grouping (a sequence that is not all strings, optionally containing the ``'year'`` flag).

    :param grouping: The grouping definition.
    :returns: tuple of (new bounds ``object`` array of shape ``(n, 2)``, structured date parts array,
     representative datetime array, sequence of boolean selection arrays - one per group)
    :rtype: tuple
    """
    # map date parts to index positions in date part storage array and flip
    # they key-value pairs
    group_map = dict(list(zip(list(range(0, len(self._date_parts))), self._date_parts, )))
    group_map_rev = dict(list(zip(self._date_parts, list(range(0, len(self._date_parts))), )))

    # this array will hold the value data constructed differently depending
    # on if temporal bounds are present. columns are (lower, centroid, upper).
    value = np.empty((self.get_value().shape[0], 3), dtype=object)

    # reference the value and bounds datetime object arrays
    value_datetime = self.value_datetime
    if self.has_bounds:
        value_datetime_bounds = self.bounds.value_datetime
    else:
        value_datetime_bounds = None

    # populate the value array depending on the presence of bounds
    if value_datetime_bounds is None:
        # without bounds, all three columns hold the centroid value
        value[:, :] = value_datetime.reshape(-1, 1)
    # bounds are currently not used for the grouping mechanism
    else:
        value[:, 0] = value_datetime_bounds[:, 0]
        value[:, 1] = value_datetime
        value[:, 2] = value_datetime_bounds[:, 1]

    def _get_attrs_(dt):
        # order matches ``self._date_parts``
        return ([dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second])

    # extract the date parts from the centroid column
    parts = np.empty((len(self.get_value()), len(self._date_parts)), dtype=int)
    for row in range(parts.shape[0]):
        parts[row, :] = _get_attrs_(value[row, 1])

    # grouping is different for date part combinations v. seasonal
    # aggregation.
    if all([isinstance(ii, six.string_types) for ii in grouping]):
        # collect the unique values for each date part in the grouping; parts not in the grouping get a
        # single ``None`` placeholder
        unique = deque()
        for idx in range(parts.shape[1]):
            if group_map[idx] in grouping:
                fill = np.unique(parts[:, idx])
            else:
                fill = [None]
            unique.append(fill)

        # every combination of the unique date part values is a candidate group
        select = deque()
        idx2_seq = list(range(len(self._date_parts)))
        for idx in itertools.product(*[list(range(len(u))) for u in unique]):
            select.append([unique[idx2][idx[idx2]] for idx2 in idx2_seq])
        select = np.array(select)

        # keep only the candidate groups matching at least one time value
        dgroups = deque()
        idx_cmp = [group_map_rev[group] for group in grouping]
        keep_select = []
        for idx in range(select.shape[0]):
            match = select[idx, idx_cmp] == parts[:, idx_cmp]
            dgrp = match.all(axis=1)
            if dgrp.any():
                keep_select.append(idx)
                dgroups.append(dgrp)
        select = select[keep_select, :]
        assert (len(dgroups) == select.shape[0])

        dtype = [(dp, object) for dp in self._date_parts]
    # this is for seasonal aggregations
    else:
        # we need to remove the year string from the grouping and do
        # not want to modify the original list
        grouping = deepcopy(grouping)
        # search for a year flag, which will break the temporal groups by
        # years
        if 'year' in grouping:
            has_year = True
            grouping = list(grouping)
            grouping.remove('year')
            years = np.unique(parts[:, 0])
        else:
            has_year = False
            years = [None]

        dgroups = deque()
        grouping_season = deque()

        # sort the arrays to ensure the ordered in ascending order
        years.sort()
        grouping = get_sorted_seasons(grouping, method='min')

        # one boolean selection array per (year, season) combination
        for year, season in itertools.product(years, grouping):
            subgroup = np.zeros(value.shape[0], dtype=bool)
            for idx in range(value.shape[0]):
                if has_year:
                    if parts[idx, 1] in season and year == parts[idx, 0]:
                        subgroup[idx] = True
                else:
                    if parts[idx, 1] in season:
                        subgroup[idx] = True
            dgroups.append(subgroup)
            grouping_season.append([season, year])
        dtype = [('months', object), ('year', int)]
        grouping = grouping_season

    # init arrays to hold values and bounds for the grouped data
    new_value = np.empty((len(dgroups),), dtype=dtype)
    new_bounds = np.empty((len(dgroups), 2), dtype=object)

    for idx, dgrp in enumerate(dgroups):
        # Tuple conversion is required for structure arrays: http://docs.scipy.org/doc/numpy/user/basics.rec.html#filling-structured-arrays
        # NOTE: ``select`` is only assigned in the date part branch above; in the seasonal branch referencing it
        # raises ``UnboundLocalError`` which routes execution to the seasonal fill below.
        try:
            new_value[idx] = tuple(select[idx])
        # likely a seasonal aggregation with a different group representation
        except UnboundLocalError:
            try:
                new_value[idx] = (grouping[idx][0], grouping[idx][1])
            # there is likely no year associated with the seasonal aggregation
            # and it is a Nonetype
            except TypeError:
                new_value[idx]['months'] = grouping[idx][0]
        # the group's bounds span the minimum and maximum of the lower and upper columns
        sel = value[dgrp][:, (0, 2)]
        new_bounds[idx, :] = [sel.min(), sel.max()]

    new_bounds = np.atleast_2d(new_bounds).reshape(-1, 2)
    date_parts = np.atleast_1d(new_value)
    # This is the representative center time for the temporal group.
    repr_dt = self._get_grouping_representative_datetime_(grouping, new_bounds, date_parts)

    return new_bounds, date_parts, repr_dt, dgroups
def _get_grouping_representative_datetime_(self, grouping, bounds, value):
    """
    Compute one representative ``datetime``-like object per temporal group.

    :param grouping: The grouping definition. For date part groupings this is a sequence of strings. Seasonal
     groupings are handled by the ``TypeError`` branch (presumably raised by ``set(grouping)`` when the grouping
     holds unhashable season lists - confirm against callers).
    :param bounds: Array indexed as ``bounds[idx][0]`` / ``bounds[idx, :]`` holding each group's lower and upper
     datetimes.
    :param value: Structured date parts array with one row per group.
    :returns: ``object`` array with one representative datetime per group
    :rtype: :class:`numpy.ndarray`
    :raises NotImplementedError: if the set of date parts is not one of the supported combinations
    """
    ref_value = value
    ref_bounds = bounds
    ret = np.empty((ref_value.shape[0],), dtype=object)
    try:
        set_grouping = set(grouping)
        if set_grouping == {'month'}:
            ref_calc_month_centroid = constants.CALC_MONTH_CENTROID
            for idx in range(ret.shape[0]):
                month = ref_value[idx]['month']
                # Get the start year from the bounds data.
                start_year = ref_bounds[idx][0].year
                ret[idx] = get_datetime_or_netcdftime(start_year, month, ref_calc_month_centroid)
        elif set_grouping == {'year'}:
            ref_calc_year_centroid_month = constants.CALC_YEAR_CENTROID_MONTH
            ref_calc_year_centroid_day = constants.CALC_YEAR_CENTROID_DAY
            for idx in range(ret.shape[0]):
                year = ref_value[idx]['year']
                ret[idx] = get_datetime_or_netcdftime(year, ref_calc_year_centroid_month,
                                                      ref_calc_year_centroid_day)
        elif set_grouping == {'month', 'year'}:
            ref_calc_month_centroid = constants.CALC_MONTH_CENTROID
            for idx in range(ret.shape[0]):
                year, month = ref_value[idx]['year'], ref_value[idx]['month']
                ret[idx] = get_datetime_or_netcdftime(year, month, ref_calc_month_centroid)
        elif set_grouping == {'day'}:
            # Year and month come from the lower bound; the hour is fixed at 12.
            for idx in range(ret.shape[0]):
                start_year, start_month = ref_bounds[idx][0].year, ref_bounds[idx][0].month
                ret[idx] = get_datetime_or_netcdftime(start_year, start_month, ref_value[idx]['day'], hour=12)
        elif set_grouping == {'day', 'month'}:
            for idx in range(ret.shape[0]):
                start_year = ref_bounds[idx][0].year
                day, month = ref_value[idx]['day'], ref_value[idx]['month']
                ret[idx] = get_datetime_or_netcdftime(start_year, month, day, hour=12)
        elif set_grouping == {'day', 'year'}:
            for idx in range(ret.shape[0]):
                day, year = ref_value[idx]['day'], ref_value[idx]['year']
                ret[idx] = get_datetime_or_netcdftime(year, constants.CALC_YEAR_CENTROID_MONTH, day, hour=12)
        elif set_grouping == {'day', 'year', 'month'}:
            for idx in range(ret.shape[0]):
                day, year, month = ref_value[idx]['day'], ref_value[idx]['year'], ref_value[idx]['month']
                ret[idx] = get_datetime_or_netcdftime(year, month, day, hour=12)
        else:
            raise NotImplementedError('grouping: {0}'.format(grouping))
    # Likely a seasonal aggregation.
    # NOTE(review): any ``TypeError`` raised inside the try body above is also routed here.
    except TypeError:
        # Set for testing if seasonal group crosses the end of a year.
        cross_months_set = set([12, 1])
        for idx in range(ret.shape[0]):
            r_bounds = bounds[idx, :]
            # The season crosses into a new year, find the middles differently.
            r_value_months = value[idx]['months']
            if cross_months_set.issubset(r_value_months):
                # Use the month at the middle position of the season's month sequence.
                middle_index = int(np.floor(len(r_value_months) / 2))
                center_month = r_value_months[middle_index]
            else:
                # Otherwise use the floored mean of the bound months.
                center_month = int(np.floor(np.mean([r_bounds[0].month, r_bounds[1].month])))
            center_year = int(np.floor(np.mean([r_bounds[0].year, r_bounds[1].year])))
            fill = get_datetime_or_netcdftime(center_year, center_month, constants.CALC_MONTH_CENTROID)
            ret[idx] = fill
    return ret
def _get_grouping_seasonal_unique_(self, grouping):
    """
    Build the grouping components for a seasonal grouping carrying the ``'unique'`` flag.

    :param list grouping: A seasonal list containing the unique flag.

    >>> grouping = [[12, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11], 'unique']

    :returns: tuple of (new bounds array, ``None`` date parts, representative datetime array, sequence of boolean
     selection arrays)
    :rtype: tuple
    """
    # Work on a copy with the flag removed so the caller's list is untouched.
    seasons = list(deepcopy(grouping))
    seasons.remove('unique')
    seasons = get_sorted_seasons(seasons)

    # Turn the seasons into time regions.
    time_regions = get_time_regions(seasons, self.value_datetime, raise_if_incomplete=False)

    dgroups = deque()
    new_bounds = np.array([], dtype=object).reshape(-1, 2)
    repr_dt = np.array([], dtype=object)

    group_iterator = iter_boolean_groups_from_time_regions(time_regions, self, yield_subset=True,
                                                           raise_if_incomplete=False)
    for dgroup, sub in group_iterator:
        dgroups.append(dgroup)
        sub_times = sub.value_datetime
        # Each group's bounds are the extremes of its values; its representative time is the middle value.
        extent_row = [min(sub_times), max(sub_times)]
        new_bounds = np.vstack((new_bounds, extent_row))
        center = sub_times[int(sub.shape[0] / 2)]
        repr_dt = np.append(repr_dt, center)

    # No date parts are produced for unique seasons.
    return new_bounds, None, repr_dt, dgroups
def _get_iter_value_(self):
    """
    :return: the datetime values when ``format_time`` is set, otherwise the numeric time values
    """
    return self.value_datetime if self.format_time else self.value_numtime
def _get_to_conform_value_(self):
    """
    :return: the numeric time values (``value_numtime``)
    """
    numeric_value = self.value_numtime
    return numeric_value
def _set_to_conform_value_(self, value):
    """
    Reset all cached numeric/datetime conversions, then set ``value`` as the new variable value.

    :param value: The new value passed to ``set_value``.
    """
    # Cached conversions are stale once the value changes.
    for cache_name in ('_value_numtime', '_value_datetime', '_bounds_numtime', '_bounds_datetime'):
        setattr(self, cache_name, None)
    self.set_value(value)
def _set_metadata_from_source_finalize_(self, *args, **kwargs):
    """
    Copy the ``calendar`` attribute from the first positional argument when it has one.
    """
    source_variable = args[0]
    if not hasattr(source_variable, 'calendar'):
        return
    self.calendar = source_variable.calendar
def set_bounds(self, value, **kwargs):
    """
    Set the bounds through the parent implementation and give non-``None`` bounds this variable's calendar.

    :param value: The bounds variable or ``None``.
    """
    super(TemporalVariable, self).set_bounds(value, **kwargs)
    if value is None:
        return
    value.calendar = self.calendar
def _set_units_(self, value):
    """
    Set the units from a string or units object, adopting the object's calendar when it has one.

    :param value: The units value. Objects without a ``calendar`` attribute must be ``None`` or a string;
     otherwise the ``AttributeError`` is re-raised.
    """
    try:
        self.calendar = value.calendar
    except AttributeError:
        tolerated = value is None or isinstance(value, six.string_types)
        if not tolerated:
            raise

    # "cfunits" appends the calendar name to the string representation; strip it off.
    units_text = str(value)
    marker = 'calendar='
    if marker in units_text:
        units_text = units_text.split(marker)[0].strip()
    super(TemporalVariable, self)._set_units_(units_text)
def set_value(self, value, **kwargs):
    """
    Set the variable value, first converting template-unit values (``'day as %Y%m%d.%f'``) to datetimes when
    ``format_time`` is set. In that case the units are replaced with the default temporal units.
    """
    uses_template_units = str(self.units) == 'day as %Y%m%d.%f'
    if uses_template_units and self.format_time:
        value = get_datetime_from_template_time_units(value)
        # The values are now datetimes, so the template units no longer apply.
        self.units = constants.DEFAULT_TEMPORAL_UNITS
    super(TemporalVariable, self).set_value(value, **kwargs)
class TemporalGroupVariable(TemporalVariable):
    """
    A time variable that additionally stores temporal grouping information.

    .. note:: Accepts all parameters to :class:`~ocgis.TemporalVariable`.

    Additional keyword arguments are:

    :keyword grouping: (``=None``) See :meth:`~ocgis.TemporalVariable.get_grouping`.
    :keyword dgroups: (``=None``) Sequence of boolean arrays defining each unique temporal group.
    :type dgroups: `sequence` of :class:`numpy.ndarray`
    :keyword date_parts: (``=None``) Sequence of date part tuples.
    :type date_parts: `sequence` of :class:`tuple`
    """

    _bounds_attribute_name = 'climatology'

    def __init__(self, *args, **kwargs):
        # Remove the grouping keywords before delegating; each defaults to ``None``.
        for attr_name in ('grouping', 'dgroups', 'date_parts'):
            setattr(self, attr_name, kwargs.pop(attr_name, None))
        super(TemporalGroupVariable, self).__init__(*args, **kwargs)
def get_datetime_conversion_state(archetype):
    """
    Decide whether ``archetype`` needs conversion to a ``datetime``-like object.

    :param archetype: The object to test for conversion to datetime.
    :type archetype: float, :class:`datetime.datetime`, or :class:`netcdftime.datetime`
    :returns: ``False`` if the object has both ``year`` and ``month`` attributes, otherwise ``True``.
    :rtype: bool
    """
    is_datetime_like = hasattr(archetype, 'year') and hasattr(archetype, 'month')
    return not is_datetime_like
def get_datetime_from_months_time_units(vec, units, month_centroid=16):
    """
    Convert month offsets into :class:`datetime.datetime` objects.

    Each offset is added to the origin parsed from ``units`` using whole-month arithmetic, so offsets may be in
    any order, may be negative, and may be spaced by any number of months.

    :param vec: Month offsets. Values are cast to integers.
    :type vec: :class:`numpy.ndarray`
    :param str units: Source units to parse.
    :param month_centroid: The day of the month to use when creating the :class:`datetime.datetime` objects.
    :returns: ``object`` array of datetimes with the same shape as ``vec`` (at least one-dimensional)
    :rtype: :class:`numpy.ndarray`

    >>> units = "months since 1978-12"
    >>> vec = np.array([0,1,2,3])
    >>> get_datetime_from_months_time_units(vec,units)
    array([1978-12-16 00:00:00, 1979-01-16 00:00:00, 1979-02-16 00:00:00,
           1979-03-16 00:00:00], dtype=object)
    """
    # Only work with integer inputs.
    offsets = np.atleast_1d(np.array(vec, dtype=int))

    origin = get_origin_datetime_from_months_units(units)
    # Express the origin as a count of months so each offset becomes a single divmod.
    origin_months = origin.year * 12 + (origin.month - 1)

    ret = np.empty(offsets.shape, dtype=object)
    for idx, offset in enumerate(offsets.flat):
        year, zero_based_month = divmod(origin_months + int(offset), 12)
        ret.flat[idx] = datetime.datetime(year, zero_based_month + 1, month_centroid)
    return ret
def get_datetime_from_template_time_units(vec):
    """
    Convert floats of the form ``YYYYMMDD.fraction-of-day`` into ``datetime``-like objects.

    The integer part supplies year, month, and day. The fractional part is multiplied by 24 to get the hour,
    and the remaining fraction of the hour gives the minute. Seconds are not computed.

    :param vec: An array of floats (any shape).
    :type vec: :class:`numpy.ndarray`
    :returns: An object array with same shape as ``vec`` containing datetime objects.
    :rtype: :class:`numpy.ndarray`
    """
    vec = np.asarray(vec)
    make_datetime = get_datetime_or_netcdftime
    fill = np.empty_like(vec, dtype=object)
    for idx, element in enumerate(vec.flat):
        whole = int(element)
        ymd = str(whole)
        day_fraction = element - whole
        year, month, day = int(ymd[0:4]), int(ymd[4:6]), int(ymd[6:8])
        hour = 24 * day_fraction
        minute = int((Decimal(hour) % 1) * 60)
        # ``idx`` is a flat index, so assign through ``flat`` to support multi-dimensional input.
        fill.flat[idx] = make_datetime(year, month, day, int(hour), minute)
    return fill
def get_datetime_or_netcdftime(*args, **kwargs):
    """
    Create a ``datetime``-like object, falling back to the alternate type if the preferred one raises
    ``ValueError``.

    The preferred type is :class:`netcdftime.datetime` when ``env.PREFER_NETCDFTIME`` is set, otherwise
    :class:`datetime.datetime`. Arguments are passed unchanged to the constructor.
    """
    if env.PREFER_NETCDFTIME:
        preferred, fallback = netcdftime.datetime, datetime.datetime
    else:
        preferred, fallback = datetime.datetime, netcdftime.datetime
    try:
        return preferred(*args, **kwargs)
    except ValueError:
        # Assume the preferred type is not compatible with the arguments.
        return fallback(*args, **kwargs)
def get_difference_in_months(origin, target):
    """
    Get the integer difference in months between an origin and target datetime.

    Only the ``year`` and ``month`` attributes are used; days and times are ignored. The result is negative
    when the target month precedes the origin month.

    :param origin: The origin datetime object.
    :type origin: :class:`datetime.datetime`
    :param target: The target datetime object.
    :type target: :class:`datetime.datetime`
    :rtype: int

    >>> get_difference_in_months(datetime.datetime(1978, 12, 1), datetime.datetime(1979, 3, 1))
    3
    >>> get_difference_in_months(datetime.datetime(1978, 12, 1), datetime.datetime(1978, 7, 1))
    -5
    """
    # Closed form of stepping one month at a time from origin to target.
    return (target.year - origin.year) * 12 + (target.month - origin.month)
def get_is_interannual(sequence):
    """
    Test whether a season's month sequence contains both December (``12``) and January (``1``), meaning the
    season crosses a year boundary.

    >>> sequence = [11,12,1]
    >>> get_is_interannual(sequence)
    True

    :rtype: bool
    """
    return True if (12 in sequence and 1 in sequence) else False
def get_num_from_months_time_units(vec, units, dtype=None):
    """
    Convert :class:`datetime.datetime` objects into month offsets from the origin parsed from ``units``.

    :param vec: Input vector to convert.
    :type vec: :class:`numpy.ndarray`
    :param str units: Source units to parse.
    :param type dtype: Output vector array type.
    :rtype: :class:`numpy.ndarray`

    >>> units = "months since 1978-12"
    >>> vec = np.array([datetime.datetime(1978,12,1),datetime.datetime(1979,1,1)])
    >>> get_num_from_months_time_units(vec,units)
    array([0, 1])
    """
    origin = get_origin_datetime_from_months_units(units)
    offsets = []
    for target in vec:
        offsets.append(get_difference_in_months(origin, target))
    return np.array(offsets, dtype=dtype)
def get_origin_datetime_from_months_units(units):
    """
    Get the origin Python :class:`datetime.datetime` object from a month units string.

    Everything after the second space-delimited token (e.g. after ``"months since"``) is parsed as the origin
    using the formats ``'%Y-%m'`` and ``'%Y-%m-%d %H'``, in that order.

    :param str units: Source units to parse.
    :returns: :class:`datetime.datetime`
    :raises ValueError: if the origin matches none of the supported formats

    >>> units = "months since 1978-12"
    >>> get_origin_datetime_from_months_units(units)
    datetime.datetime(1978, 12, 1, 0, 0)
    """
    origin = ' '.join(units.split(' ')[2:])
    last_error = None
    for fmt in ('%Y-%m', '%Y-%m-%d %H'):
        try:
            return datetime.datetime.strptime(origin, fmt)
        except ValueError as e:
            # The name bound by ``except ... as`` is cleared when the clause ends on Python 3, so keep our own
            # reference for re-raising after all formats fail.
            last_error = e
    raise last_error
def get_sorted_seasons(seasons, method='max'):
    """
    Sort ``seasons`` in ascending order of the ``method`` (minimum or maximum) of each season's months.

    Seasons sharing the same key keep their input order; none are dropped. The returned list is a deep copy,
    so modifying it does not affect ``seasons``.

    >>> seasons = [[9,10,11],[12,1,2],[6,7,8]]
    >>> get_sorted_seasons(seasons)
    [[6,7,8],[9,10,11],[12,1,2]]

    :type seasons: list[list[int]]
    :param str method: Either ``'min'`` or ``'max'``.
    :rtype: list[list[int]]
    :raises KeyError: if ``method`` is not ``'min'`` or ``'max'``
    """
    methods = {'min': min, 'max': max}
    key_function = methods[method]
    # ``sorted`` is stable, so ties cannot overwrite each other as they would in a key-to-season lookup.
    return deepcopy(sorted(seasons, key=key_function))
def get_time_regions(seasons, dates, raise_if_incomplete=True):
    """
    Build the time regions selecting each season in ``seasons`` for each year present in ``dates``.

    Every element of the returned list is a list of time region dictionaries with ``'year'`` and ``'month'`` keys. When
    ``dates`` spans more than one year and at least one season is interannual (see :func:`get_is_interannual`), each
    interannual season is split where its months are not consecutive. The first part is assigned to the current year
    and the following parts to the following years. Time regions that select nothing from ``dates`` are left out of the
    result.

    Example inputs: ``seasons = [[6,7,8],[9,10,11],[12,1,2]]`` and ``dates`` a vector of datetime objects.

    :param seasons: Sequence of month sequences, one per season.
    :param dates: Sequence of datetime-like objects exposing a ``year`` attribute.
    :param bool raise_if_incomplete: If ``True``, raise an exception when an interannual season needs a year that is
     not available in ``dates``. If ``False``, such seasons are skipped.
    :returns: Nested time regions.
    :rtype: list[list[dict]]
    :raises: IncompleteSeasonError
    """
    # extract the years from the data vector collapsing them to a unique set then sort in ascending order
    years = sorted({d.year for d in dates})
    # holds the candidate time regions prior to validation
    time_regions = []
    # the interannual cases requires two time region sequences to properly extract. there must be at least two years to
    # care about interannual seasons.
    if len(years) > 1:
        # determine if any of the seasons are interannual
        interannual_check = list(map(get_is_interannual, seasons))
    else:
        interannual_check = [False]
    if any(interannual_check):
        # loop over years first to ensure each year is accounted for in the time region output
        for ii_year, year in enumerate(years):
            # the interannual flag is used internally for simple optimization
            for ic, cg in zip(interannual_check, seasons):
                # if no exception is raised for an incomplete season, this flag indicates whether to append to the
                # output
                append_to_time_regions = True
                if ic:
                    # sort a copy of the season in descending order because december of the current year should be
                    # first.
                    _cg = sorted(cg, reverse=True)
                    # look for the interannual break and split the season into the current year and next year.
                    diff = np.abs(np.diff(_cg))
                    split_base = np.arange(1, len(_cg))
                    split_indices = split_base[diff > 1]
                    split = np.split(_cg, split_indices)
                    # will hold the sub-element time regions
                    sub_time_region = []
                    for ii_split, s in enumerate(split):
                        try:
                            to_append_sub = {'year': [years[ii_year + ii_split]], 'month': s.tolist()}
                            sub_time_region.append(to_append_sub)
                        # there may not be another year of data for an interannual season. we DO NOT keep incomplete
                        # seasons.
                        except IndexError:
                            # don't just blow through an incomplete season unless asked to
                            if raise_if_incomplete:
                                raise IncompleteSeasonError(None, None, None)
                            else:
                                append_to_time_regions = False
                    to_append = sub_time_region
                else:
                    to_append = [{'year': [year], 'month': cg}]
                if append_to_time_regions:
                    time_regions.append(to_append)
    # without interannual seasons the time regions are unique combos of the years and seasons designations
    else:
        for year, season in itertools.product(years, seasons):
            time_regions.append([{'year': [year], 'month': season}])
    # ensure each time region is valid. only valid regions are collected into the returned list; popping invalid
    # entries by index from the list being filtered would shift the remaining indices and remove the wrong elements.
    td = TemporalVariable(value=dates, dimensions=constants.DimensionName.TEMPORAL)
    valid_time_regions = []
    for time_region in time_regions:
        try:
            for sub_time_region in time_region:
                td.get_time_region(sub_time_region)
        except EmptySubsetError:
            continue
        valid_time_regions.append(time_region)
    return valid_time_regions
def iter_boolean_groups_from_time_regions(time_regions, tvar, yield_subset=False,
                                          raise_if_incomplete=True):
    """
    Yield one boolean selection vector per complete group of time regions.

    For each group in ``time_regions``, every time region is extracted from ``tvar`` and the months found are compared
    with the months requested. Groups with missing months either raise or are skipped depending on
    ``raise_if_incomplete``.

    :param time_regions: Sequence of nested time region dictionaries.

    >>> [[{'month':[1,2],'year':[2024]},...],...]

    :param tvar: A temporal variable object.
    :type tvar: :class:`ocgis.TemporalVariable`
    :param bool yield_subset: If ``True``, yield a tuple with the boolean vector and the subset of ``tvar``.
    :param bool raise_if_incomplete: If ``True``, raise an exception if the season is incomplete. If ``False``,
     incomplete seasons are skipped.
    :returns: boolean ndarray vector with yld.shape == tvar.shape
    :raises: IncompleteSeasonError
    """
    for sub_time_regions in time_regions:
        # incomplete seasons are searched for in the nested loop. this indicates if a time region group should be
        # considered a season.
        is_complete = True
        idx_append = np.array([], dtype=int)
        for time_region in sub_time_regions:
            sub, idx = tvar.get_time_region(time_region, return_indices=True)
            # ensure there are months present for each time region. this is an explicit comparison rather than an
            # assert statement so the check is still performed when Python runs with optimizations enabled (-O).
            months = set([d.month for d in sub.value_datetime])
            if months != set(time_region['month']):
                if raise_if_incomplete:
                    # report the first requested month that is missing
                    for m in time_region['month']:
                        if m not in months:
                            raise IncompleteSeasonError(time_region, month=m)
                else:
                    is_complete = False
            idx_append = np.append(idx_append, idx)
        # if the season is complete build the selection, otherwise pass to next iteration.
        if not is_complete:
            continue
        dgroup = np.zeros(tvar.shape[0], dtype=bool)
        dgroup[idx_append] = True
        if yield_subset:
            yld = (dgroup, tvar[dgroup])
        else:
            yld = dgroup
        yield yld