Source code for ocgis.calc.library.index.duration

from collections import OrderedDict

import numpy as np
from ocgis import env
from ocgis.calc import base
from ocgis.exc import DefinitionValidationError
from ocgis.util.helpers import iter_array


[docs]class Duration(base.AbstractUnivariateSetFunction, base.AbstractParameterizedFunction): key = 'duration' parms_definition = {'threshold': float, 'operation': str, 'summary': str} # Output data type will vary by the summary operation (e.g. float for mean, int for max) description = 'Summarizes consecutive occurrences in a sequence where the logical operation returns TRUE. The summary operation is applied to the sequences within a temporal aggregation.' standard_name = 'duration' long_name = 'Duration'
[docs] def calculate(self, values, threshold=None, operation=None, summary='mean'): """ :param threshold: The threshold value to use for the logical operation. :type threshold: float :param operation: The logical operation. One of 'gt','gte','lt', or 'lte'. :type operation: str :param summary: The summary operation to apply the durations. One of 'mean','median','std','max', or 'min'. :type summary: str """ assert (len(values.shape) == 3) # storage array for counts shp_out = values.shape[-2:] store = np.zeros(shp_out, dtype=self.dtype).flatten() # get the summary operation from the numpy library summary_operation = getattr(np, summary) # find longest sequence for each geometry across time dimension for ii, fill in enumerate(self._iter_consecutive_(values, threshold, operation)): # case of only a singular occurrence if len(fill) > 1: fill = summary_operation(fill) else: try: fill = fill[0] # value is likely masked except IndexError: fill = 0 store[ii] = fill store.resize(shp_out) # update the output mask. this only applies to geometries so pick the # first masked time field store = np.ma.array(store, mask=values.mask[0, :, :]) return store
def _iter_consecutive_(self, values, threshold, operation): # time index reference ref = np.arange(0, values.shape[0]) # perform requested logical operation if operation == 'gt': arr = values > threshold elif operation == 'lt': arr = values < threshold elif operation == 'gte': arr = values >= threshold elif operation == 'lte': arr = values <= threshold # find longest sequence for each geometry across time dimension for rowidx, colidx in iter_array(values[0, :, :], use_mask=False): vec = arr[:, rowidx, colidx].reshape(-1) # check first if there is a longer series than 1 if np.any(np.diff(ref[vec]) == 1): # find locations where the values switch diff_idx = np.diff(vec) if diff_idx.shape != ref.shape: diff_idx = np.append(diff_idx, [False]) split_idx = ref[diff_idx] + 1 splits = np.array_split(vec, split_idx) fill = [a.sum() for a in splits if np.all(a)] # case of only a singular occurrence elif np.any(vec): fill = [1] # case for no occurrence else: fill = [0] yield (fill) @classmethod def validate(cls, ops): if 'year' not in ops.calc_grouping: msg = 'Calculation grouping must include "year" for duration calculations.' raise DefinitionValidationError('calc', msg)
[docs]class FrequencyDuration(base.AbstractKeyedOutputFunction, Duration): key = 'freq_duration' description = 'Count the frequency of spell durations within the temporal aggregation.' dtype_default = object structure_dtype = OrderedDict([['names', ['duration', 'count']], ['formats', [env.NP_INT, env.NP_INT]]]) parms_definition = {'threshold': float, 'operation': str} standard_name = 'frequency_duration' long_name = 'Frequency Duration'
[docs] def calculate(self, values, threshold=None, operation=None): """ :param threshold: The threshold value to use for the logical operation. :type threshold: float :param operation: The logical operation. One of 'gt','gte','lt', or 'lte'. :type operation: str """ shp_out = values.shape[-2:] store = np.zeros(shp_out, dtype=object).flatten() for ii, duration in enumerate(self._iter_consecutive_(values, threshold, operation)): summary = self._get_summary_(duration) store[ii] = summary store.resize(shp_out) # Update the output mask. this only applies to geometries so pick the first masked time field store = np.ma.array(store, mask=values.mask[0, :, :]) return store
@classmethod def get_dtype(cls, *args, **kwargs): return cls.dtype_default @classmethod def validate(cls, ops): Duration.validate(ops) def _get_summary_(self, duration): """ :param duration: List of duration elements for frequency target. :type duration: list >>> duration = [3, 5, 2, 2] :returns: NumPy structure with dimension equal to the count of unique elements in the `duration` sequence. """ set_duration = set(duration) ret = np.empty(len(set_duration), dtype=self.structure_dtype) for ii, sd in enumerate(set_duration): idx = np.array(duration) == sd count = idx.sum() ret[ii]['duration'] = sd ret[ii]['count'] = count return ret