Source code for ocgis.spatial.geom_cabinet

import os
from collections import OrderedDict

import fiona
import ogr
from ocgis import env
from ocgis.collection.field import Field
from ocgis.util.helpers import get_formatted_slice
from ocgis.util.logging_ocgis import ocgis_lh
from shapely import wkb


[docs]class GeomCabinet(object): """ A utility object designed for accessing shapefiles stored in a locally accessible location. >>> # Adjust location of search directory. >>> import ocgis ... >>> ocgis.env.DIR_GEOMCABINET = '/path/to/local/shapefile/directory' >>> sc = GeomCabinet() >>> # List the shapefiles available. >>> sc.keys() ['state_boundaries', 'mi_watersheds', 'world_countries'] >>> # Load geometries from the shapefile. >>> geoms = sc.get_geoms('state_boundaries') :param path: Absolute path the directory holding shapefile folders. Defaults to :attr:`ocgis.env.DIR_GEOMCABINET`. :type path: str """ def __init__(self, path=None): self._path = path or env.get_geomcabinet_path() @property def path(self): if self._path is None: msg = 'A path value is required. Either pass a path to the constructor or set ocgis.env.DIR_GEOMCABINET.' raise ValueError(msg) elif not os.path.exists(self._path): raise ValueError('Specified path to GeomCabinet folder does not exist: {0}'.format(self._path)) return self._path @staticmethod def get_gdal_driver(path): """ Get the GDAL driver used to open a path. :param str path: Path to target dataset. """ ds = ogr.Open(path) try: ret = get_gdal_driver(ds) finally: ds.Destroy() ds = None return ret
[docs] def keys(self): """Return a list of the shapefile keys contained in the search directory. :rtype: list of str """ ret = [] for dirpath, dirnames, filenames in os.walk(self.path): for fn in filenames: if fn.endswith('shp'): ret.append(os.path.splitext(fn)[0]) return ret
def get_meta(self, key=None, path=None, driver_kwargs=None): path = path or self.get_shp_path(key) if self.get_gdal_driver(path) == 'OpenFileGDB': layer = driver_kwargs['feature_class'] else: layer = None with fiona.open(path, 'r', layer=layer) as source: ret = source.meta return ret def get_shp_path(self, key): return self._get_path_(key, ext='shp') def get_cfg_path(self, key): return self._get_path_(key, ext='cfg') def _get_path_(self, key, ext='shp'): ret = None for dirpath, dirnames, filenames in os.walk(self.path): for filename in filenames: if filename.endswith(ext) and os.path.splitext(filename)[0] == key: ret = os.path.join(dirpath, filename) return ret if ret is None: msg = 'a shapefile with key "{0}" was not found under the directory: {1}'.format(key, self.path) raise ValueError(msg)
[docs] def iter_geoms(self, key=None, select_uid=None, path=None, load_geoms=True, as_field=False, uid=None, select_sql_where=None, slc=None, union=False, data_model=None, driver_kwargs=None): """ See documentation for :class:`~ocgis.GeomCabinetIterator`. """ # Get the path to the output shapefile. shp_path = self._get_path_by_key_or_direct_path_(key=key, path=path) # Get the source metadata. meta = self.get_meta(path=shp_path, driver_kwargs=driver_kwargs) if union: gic = GeomCabinetIterator(key=key, select_uid=select_uid, path=path, load_geoms=load_geoms, as_field=False, uid=uid, select_sql_where=select_sql_where, slc=slc, union=False, data_model=data_model, driver_kwargs=driver_kwargs) yld = Field.from_records(gic, meta['schema'], crs=meta['crs'], uid=uid, union=True, data_model=data_model) yield yld else: if slc is not None and (select_uid is not None or select_sql_where is not None): exc = ValueError('Slice is not allowed with other select statements.') ocgis_lh(exc=exc, logger='geom_cabinet') # Format the slice for iteration. We will get the features by index if a slice is provided. if slc is not None: slc = get_index_slice_for_iteration(slc) # Open the target geometry file. ds = ogr.Open(shp_path) try: # Return the features iterator. features = self._get_features_object_(ds, uid=uid, select_uid=select_uid, select_sql_where=select_sql_where, driver_kwargs=driver_kwargs) # Using slicing, we will select the features individually from the object. if slc is None: itr = features else: # The geodatabase API requires iterations to get the given location. if self.get_gdal_driver(shp_path) == 'OpenFileGDB' or isinstance(slc, slice): def _o_itr_(features_object, slice_start, slice_stop): for ctr2, fb in enumerate(features_object): # ... iterate until start is reached. if ctr2 < slice_start: continue # ... stop if we have reached the stop. elif ctr2 == slice_stop: return yield fb itr = _o_itr_(features, slc.start, slc.stop) else: # Convert the slice index to an integer to avoid type conflict in GDAL layer. itr = (features.GetFeature(int(idx)) for idx in slc) # Convert feature objects to record dictionaries. for ctr, feature in enumerate(itr): if load_geoms: yld = {'geom': wkb.loads(feature.geometry().ExportToWkb())} else: yld = {} items = feature.items() properties = OrderedDict([(key, items[key]) for key in feature.keys()]) yld.update({'properties': properties, 'meta': meta}) if ctr == 0: uid, add_uid = get_uid_from_properties(properties, uid) # The properties schema needs to be updated to account for the adding of a unique identifier. if add_uid: meta['schema']['properties'][uid] = 'int' # Add the unique identifier if required if add_uid: properties[uid] = feature.GetFID() # Ensure the unique identifier is an integer else: properties[uid] = int(properties[uid]) if as_field: yld = Field.from_records([yld], schema=meta['schema'], crs=yld['meta']['crs'], uid=uid, data_model=data_model) yield yld try: assert ctr >= 0 except UnboundLocalError: # occurs if there were not feature returned by the iterator. raise a more clear exception. msg = 'No features returned from target data source. Were features appropriately selected?' raise ValueError(msg) finally: # Close or destroy the data source object if it actually exists. if ds is not None: ds.Destroy() ds = None
def _get_path_by_key_or_direct_path_(self, key=None, path=None): """ :param str key: :param str path: """ # path to the target data source if key is None: try: assert path != None except AssertionError: raise ValueError('If no key is passed, then a path must be provided.') shp_path = path else: shp_path = self.get_shp_path(key) # make sure requested geometry exists if not os.path.exists(shp_path): msg = 'Requested geometry with path "{0}" does not exist in the file system.'.format(shp_path) raise RuntimeError(msg) return shp_path @staticmethod def _get_features_object_(ds, uid=None, select_uid=None, select_sql_where=None, driver_kwargs=None): """ :param ds: Open OGR data source object :type ds: :class:`osgeo.ogr.DataSource` :param str uid: The unique identifier to use during SQL selection. :param sequence select_uid: Sequence of integers mapping to unique geometry identifiers. :param str select_sql_where: A string suitable for insertion into a SQL WHERE statement. See http://www.gdal.org/ogr_sql.html for documentation (section titled "WHERE"). >>> select_sql_where = 'STATE_NAME = "Wisconsin"' :param dict driver_kwargs: GDAL driver-specific arguments. +-------------+----------------------+--------------------------------------+ | Driver | Key | Value | +=============+======================+======================================+ | OpenFileGDB | ``'feature_class'`` | String feature class name to choose. | +-------------+----------------------+--------------------------------------+ :returns: A layer object with selection applied if ``select_uid`` is not ``None``. :rtype: :class:`osgeo.ogr.Layer` """ if driver_kwargs is None: driver_kwargs = {} # Get geometries by selecting the appropriate layer. Only single layer shapefiles are supported. For file # geodatabases, this is selected by the feature class name. if get_gdal_driver(ds) == 'OpenFileGDB': feature_class = driver_kwargs.get('feature_class') if feature_class is None: raise ValueError('For file geodatabases, the feature class may not be None.') lyr = ds.GetLayerByName(str(feature_class)) if lyr is None: poss = [lyr.GetName() for lyr in ds] poss.sort() raise IOError( "Feature class not found: '{}'. Possible features classes are: {}".format(feature_class, poss)) else: lyr = ds.GetLayerByIndex(0) # Get the feature object applying any select statements after acessing. lyr.ResetReading() if select_uid is not None or select_sql_where is not None: lyr_name = lyr.GetName() if select_sql_where is not None: sql = 'SELECT * FROM "{0}" WHERE {1}'.format(lyr_name, select_sql_where) elif select_uid is not None: # if no uid is provided, use the default if uid is None: uid = env.DEFAULT_GEOM_UID # format where statement different for singletons if len(select_uid) == 1: sql_where = '{0} = {1}'.format(uid, select_uid[0]) else: sql_where = '{0} IN {1}'.format(uid, tuple(select_uid)) sql = 'SELECT * FROM "{0}" WHERE {1}'.format(lyr_name, sql_where) features = ds.ExecuteSQL(sql) else: features = lyr return features
[docs]class GeomCabinetIterator(object): """ Iterate over geometries from a shapefile specified by ``key`` or ``path``. >>> sc = GeomCabinet() >>> geoms = sc.iter_geoms('state_boundaries', select_uid=[1, 48]) >>> len(list(geoms)) 2 :param key: Unique key identifier for a shapefile contained in the :class:`~ocgis.GeomCabinet` directory. :type key: str >>> key = 'state_boundaries' :param select_uid: Sequence of unique identifiers to select from the target data source. :type select_uid: sequence >>> select_uid = [23, 24] :param path: Path to the target data source to iterate over. If ``key`` is provided it will override ``path``. :type path: str >>> path = '/path/to/shapefile.shp' :param bool load_geoms: If ``False``, do not load geometries, excluding the ``'geom'`` key from the output dictionary. :param bool as_field: If ``True``, yield field objects. :param str uid: The name of the attribute containing the unique identifier. If ``None``, :attr:`ocgis.env.DEFAULT_GEOM_UID` will be used if present. If no unique identifier is found, add one with name :attr:`ocgis.env.DEFAULT_GEOM_UID`. :param str select_sql_where: SINGLE QUOTES MUST BE USED INSIDE DOUBLE QUOTES FOR PYTHON 3! A string suitable for insertion into a SQL WHERE statement. See http://www.gdal.org/ogr_sql.html for documentation (section titled "WHERE"). >>> select_sql_where = "STATE_NAME = 'Wisconsin'" :param slc: A two-element integer sequence: [start, stop]. >>> slc = [0, 5] :type slc: sequence :param str data_model: The target data model for the iteration. >>> data_model = 'NETCDF3' :param dict driver_kwargs: Format specific keyword arguments to use for driver creation. :raises: ValueError, RuntimeError :rtype: dict """ def __init__(self, key=None, select_uid=None, path=None, load_geoms=True, as_field=False, uid=None, select_sql_where=None, slc=None, union=False, data_model=None, driver_kwargs=None): self.key = key self.path = path self.select_uid = select_uid self.load_geoms = load_geoms self.as_field = as_field self.uid = uid self.select_sql_where = select_sql_where self.slc = slc self.union = union self.data_model = data_model self.driver_kwargs = driver_kwargs self.sc = GeomCabinet()
[docs] def __iter__(self): """ Return an iterator as from :meth:`ocgis.GeomCabinet.iter_geoms`. """ for row in self.sc.iter_geoms(key=self.key, select_uid=self.select_uid, path=self.path, load_geoms=self.load_geoms, as_field=self.as_field, uid=self.uid, select_sql_where=self.select_sql_where, slc=self.slc, union=self.union, data_model=self.data_model, driver_kwargs=self.driver_kwargs): yield row
def __len__(self): if self.union: # Assuming everything works as expected, unioning will always return 1. ret = 1 else: # Get the path to the output shapefile. shp_path = self.sc._get_path_by_key_or_direct_path_(key=self.key, path=self.path) if self.slc is not None: return len(get_index_slice_for_iteration(self.slc)) elif self.select_uid is not None: ret = len(self.select_uid) else: # Get the geometries using a select statement. ds = ogr.Open(shp_path) try: features = self.sc._get_features_object_(ds, uid=self.uid, select_uid=self.select_uid, select_sql_where=self.select_sql_where, driver_kwargs=self.driver_kwargs) ret = len(features) finally: ds.Destroy() ds = None return ret
def get_uid_from_properties(properties, uid): """ :param dict properties: A dictionary of properties with key corresponding to property names. :param str uid: The unique identifier to search for. If ``None``, default to :attr:`~ocgis.env.DEFAULT_GEOM_UID`. :returns: A tuple containing the name of the unique identifier and a boolean indicating if a unique identifier needs to be generated. :rtype: (str, bool) :raises: ValueError """ if uid is None: if env.DEFAULT_GEOM_UID in properties: uid = env.DEFAULT_GEOM_UID else: if uid not in properties: msg = 'The unique identifier "{0}" was not found in the properties dictionary: {1}'.format(uid, properties) raise ValueError(msg) # if there is a unique identifier in the properties dictionary, ensure it may be converted to an integer data type. if uid is not None: try: int(properties[uid]) except ValueError: msg = 'The unique identifier "{0}" may not be converted to an integer data type.'.format(uid) raise ValueError(msg) # if there is no unique identifier, the default identifier name will be assigned. if uid is None: uid = env.DEFAULT_GEOM_UID add_uid = True else: add_uid = False return uid, add_uid class ShpCabinet(GeomCabinet): """Left in for backwards compatibility.""" pass class ShpCabinetIterator(GeomCabinetIterator): """Left in for backwards compatibility.""" pass def get_gdal_driver(ds): driver = ds.GetDriver() return driver.GetName() def get_index_slice_for_iteration(slc): slc = get_formatted_slice(slc, 1)[0] return slc