Source code for pyprom.feature_discovery

"""
pyProm: Copyright 2016.

This software is distributed under a license that is described in
the LICENSE file that accompanies it.
"""

from __future__ import division

import numpy
import logging

from collections import defaultdict
from timeit import default_timer
from datetime import timedelta
from .lib.locations.gridpoint import GridPoint
from .lib.locations.saddle import Saddle
from .lib.locations.summit import Summit
from .lib.locations.runoff import Runoff
from .lib.locations.base_gridpoint import BaseGridPoint
from .lib.containers.saddles import SaddlesContainer
from .lib.containers.summits import SummitsContainer
from .lib.containers.runoffs import RunoffsContainer
from .lib.containers.perimeter import Perimeter
from .lib.logic.equalheight import equalHeightBlob


[docs]class AnalyzeData: """ Analyze Data is responsible for discovering the following features: :class:`pyprom.lib.locations.saddle.Saddle`, :class:`pyprom.lib.locations.summit.Summit`, :class:`pyprom.lib.locations.runoff.Runoff` """
[docs] def __init__(self, datamap): """ :param datamap: datamap to discover features on. :type: :class:`pyprom.lib.datamap.DataMap` object. """ self.logger = logging.getLogger('{}'.format(__name__)) self.datamap = datamap self.data = self.datamap.numpy_map self.max_y = self.datamap.max_y self.max_x = self.datamap.max_x self.x_mapEdge = {0: True, self.max_x: True} self.y_mapEdge = {0: True, self.max_y: True} self.explored = defaultdict(dict)
[docs] def run(self): """ Shortcut for running analysis. This will find all features on the datamap, as well as rebuild all :class:`pyprom.lib.locations.saddle.Saddle` into a more digestible format with accurate midpoints and only 2 high edges a piece. :return: Containers with features :rtype: :class:`pyprom.lib.containers.saddles.SaddlesContainer` :class:`pyprom.lib.containers.summits.SummitsContainer` :class:`pyprom.lib.containers.runoffs.RunoffsContainer` """ _, _, _ = self.analyze() self.logger.info("Rebuilding Saddles") self.saddleObjects = self.saddleObjects.rebuildSaddles(self.datamap) return self.summitObjects, self.saddleObjects, self.runoffObjects
[docs] def analyze(self): """ Analyze Routine. Looks for :class:`pyprom.lib.locations.summit.Summit`, :class:`pyprom.lib.locations.saddle.Saddle` and :class:`pyprom.lib.locations.runoff.Runoff` features :return: Containers :rtype: :class:`pyprom.lib.containers.saddles.SaddlesContainer`, :class:`pyprom.lib.containers.summits.SummitsContainer`, :class:`pyprom.lib.containers.runoffs.RunoffsContainer`, """ self.logger.info("Initiating Saddle, Summit, Runoff Identification") self.summitObjects = SummitsContainer([]) self.saddleObjects = SaddlesContainer([]) self.runoffObjects = RunoffsContainer([]) iterator = numpy.nditer(self.data, flags=['multi_index']) index = 0 start = default_timer() then = start # Iterate through numpy grid, and keep track of GridPoint coordinates. while not iterator.finished: x, y = iterator.multi_index # core storage is always in metric. if self.datamap.unit == "FEET": self.elevation = float(.3048 * iterator[0]) else: self.elevation = float(iterator[0]) # Quick Progress Meter. Needs refinement, index += 1 if not index % 100000: now = default_timer() pointsPerSec = round(index / (now - start), 2) self.logger.info( "Points per second: {} - {}%" " runtime: {}, split: {}".format( pointsPerSec, round(index / self.data.size * 100, 2), (str(timedelta(seconds=round(now - start, 2)))), round(now - then, 2) )) then = now # skip if this is a nodata point. if self.elevation == self.datamap.nodata: iterator.iternext() continue # Check for summit, saddle, or runoff results = self.summit_and_saddle(x, y) if results: for result in results: if isinstance(result, Summit): self.summitObjects.append(result) if isinstance(result, Runoff): self.runoffObjects.append(result) elif isinstance(result, Saddle): self.saddleObjects.append(result) # Reset variables, and go to next gridpoint. iterator.iternext() # Free some memory. del(self.explored) return self.summitObjects, self.saddleObjects, self.runoffObjects
[docs] def analyze_multipoint(self, x, y, ptElevation): """ Logic for analyzing a feature which fits the definition of a multipoint. :param int x: x coordinate in raster data. :param int y: y coordinate in raster data. :param ptElevation: Elevation of Multipoint Blob :type ptElevation: int, float :return: Discovered feature or None :rtype: :class:`pyprom.lib.locations.saddle.Saddle`, :class:`pyprom.lib.locations.summit.Summit`, or None. """ blob, edgePoints = equalHeightBlob(self.datamap, x, y, ptElevation) edge = blob.perimeter.mapEdge for exemptPoint in blob: self.explored[exemptPoint.x][exemptPoint.y] = True return self.consolidatedFeatureLogic(x, y, blob.perimeter, blob, edge, edgePoints)
[docs] def summit_and_saddle(self, x, y): """ summit_and_saddle does that actual discovery of :class:`pyprom.lib.locations.saddle.Saddle`, or :class:`pyprom.lib.locations.summit.Summit` :param int x: x coordinate in raster data. :param int y: y coordinate in raster data. :return: Disocvered Feature, or None :rtype: :class:`pyprom.lib.locations.saddle.Saddle` :class:`pyprom.lib.locations.summit.Summit` or :class:`pyprom.lib.locations.runoff.Runoff`, or None. """ # Exempt! bail out! if self.explored[x].get(y, False): return None edge = False # Label this as an mapEdge under the following condition edgePoints = [] if self.x_mapEdge.get(x) or self.y_mapEdge.get(y): edge = True edgePoints = [BaseGridPoint(x, y)] # Begin the ardous task of analyzing points and multipoints neighbor = self.datamap.iterateDiagonal(x, y) shoreSetIndex = defaultdict(dict) shoreMapEdge = [] for _x, _y, elevation in neighbor: # Nothing there? move along. if elevation == self.datamap.nodata: continue # If we have equal neighbors, we need to kick off analysis to # a special MultiPoint analysis function and return the result. if elevation == self.elevation and\ not self.explored[_x].get(_y, False): return self.analyze_multipoint(_x, _y, elevation) gp = GridPoint(_x, _y, elevation) if elevation > self.elevation: shoreSetIndex[_x][_y] = gp if self.x_mapEdge.get(_x) or self.y_mapEdge.get(_y): shoreMapEdge.append(gp) shoreSet = Perimeter(pointIndex=shoreSetIndex, datamap=self.datamap, mapEdge=edge, mapEdgePoints=shoreMapEdge) return self.consolidatedFeatureLogic(x, y, shoreSet, None, edge, edgePoints)
[docs] def consolidatedFeatureLogic(self, x, y, perimeter, multipoint, edge, edgePoints): """ Consolidated Feature Logic analyzes the highEdges around a point or multipoint and determines if the pattern matches a :class:`pyprom.lib.locations.saddle.Saddle`, :class:`pyprom.lib.locations.summit.Summit` or :class:`pyprom.lib.locations.runoff.Runoff` :param int x: x coordinate in raster data. :param int y: y coordinate in raster data. :param perimeter: Perimeter container :type perimeter: :class:`pyprom.lib.containers.perimeter.Perimeter` :param multipoint: MultiPoint container :type multipoint: :class:`pyprom.lib.containers.multipoint.MultiPoint` :param bool edge: is this feature on the map edge? :return: List of Container Objects. :rtype: :class:`pyprom.lib.containers.spot_elevation.SpotElevationContainer` child objects. """ returnableLocations = [] highPerimeter = perimeter.findHighEdges( self.elevation) if not len(highPerimeter): lat, long = self.datamap.xy_to_latlong(x, y) summit = Summit(lat, long, self.elevation, multiPoint=multipoint, edge=edge, edgePoints=edgePoints ) returnableLocations.append(summit) # edge summits are inherently Runoffs if edge: runoff = Runoff(lat, long, self.elevation, multiPoint=multipoint, edge=edge, highShores=highPerimeter, edgePoints=edgePoints) returnableLocations.append(runoff) return returnableLocations elif (len(highPerimeter) > 1): lat, long = self.datamap.xy_to_latlong(x, y) # if we're an edge and all edgepoints are lower than our point. if edge and len([a for a in perimeter.mapEdgePoints if a.elevation < self.elevation]) ==\ len(perimeter.mapEdgePoints): runoff = Runoff(lat, long, self.elevation, multiPoint=multipoint, highShores=highPerimeter, edgePoints=edgePoints) returnableLocations.append(runoff) return returnableLocations saddle = Saddle(lat, long, self.elevation, multiPoint=multipoint, edge=edge, highShores=highPerimeter, edgePoints=edgePoints) returnableLocations.append(saddle) # if we're an edge and all edgepoints are lower than our point. if edge and len([a for a in perimeter.mapEdgePoints if a.elevation < self.elevation]) ==\ len(perimeter.mapEdgePoints): lat, long = self.datamap.xy_to_latlong(x, y) runoff = Runoff(lat, long, self.elevation, multiPoint=multipoint, highShores=highPerimeter, edgePoints=edgePoints) returnableLocations.append(runoff) return returnableLocations