Source code for pudl.analysis.spatial

"""Spatial operations for demand allocation."""

import itertools
import warnings
from collections.abc import Callable, Iterable
from typing import Literal

import geopandas as gpd
import pandas as pd
import shapely.ops
from shapely.geometry import GeometryCollection, MultiPolygon, Polygon
from shapely.geometry.base import BaseGeometry


[docs] def check_gdf(gdf: gpd.GeoDataFrame) -> None: """Check that GeoDataFrame contains (Multi)Polygon geometries with non-zero area. Args: gdf: GeoDataFrame. Raises: TypeError: Object is not a GeoDataFrame. AttributeError: GeoDataFrame has no geometry. TypeError: Geometry is not a GeoSeries. ValueError: Geometry contains null geometries. ValueError: Geometry contains non-(Multi)Polygon geometries. ValueError: Geometry contains (Multi)Polygon geometries with zero area. ValueError: MultiPolygon contains Polygon geometries with zero area. """ if not isinstance(gdf, gpd.GeoDataFrame): raise TypeError("Object is not a GeoDataFrame") if not hasattr(gdf, "geometry"): raise AttributeError("GeoDataFrame has no geometry") if not isinstance(gdf.geometry, gpd.GeoSeries): raise TypeError("Geometry is not a GeoSeries") warnings.filterwarnings("ignore", "GeoSeries.isna", UserWarning) if gdf.geometry.isna().any(): raise ValueError("Geometry contains null geometries") if not gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"]).all(): raise ValueError("Geometry contains non-(Multi)Polygon geometries") if not gdf.geometry.area.all(): raise ValueError("Geometry contains (Multi)Polygon geometries with zero area") is_mpoly = gdf.geometry.geom_type == "MultiPolygon" for mpoly in gdf.geometry[is_mpoly]: for poly in mpoly.geoms: if not poly.area: raise ValueError( "MultiPolygon contains Polygon geometries with zero area" )
[docs] def polygonize(geom: BaseGeometry) -> Polygon | MultiPolygon: """Convert geometry to (Multi)Polygon. Args: geom: Geometry to convert to (Multi)Polygon. Returns: Geometry converted to (Multi)Polygon, with all zero-area components removed. Raises: ValueError: Geometry has zero area. """ polys = [] # Explode geometries to polygons if isinstance(geom, GeometryCollection): for g in geom.geoms: if isinstance(g, Polygon): polys.append(g) elif isinstance(g, MultiPolygon): polys.extend(g) elif isinstance(geom, MultiPolygon): polys.extend(geom) elif isinstance(geom, Polygon): polys.append(geom) # Remove zero-area polygons polys = [p for p in polys if p.area] if not polys: raise ValueError("Geometry has zero area") if len(polys) == 1: return polys[0] return MultiPolygon(polys)
[docs] def explode(gdf: gpd.GeoDataFrame, ratios: Iterable[str] = None) -> gpd.GeoDataFrame: """Explode MultiPolygon to multiple Polygon geometries. Args: gdf: GeoDataFrame with non-zero-area (Multi)Polygon geometries. ratios: Names of columns to rescale by the area fraction of the Polygon relative to the MultiPolygon. If provided, MultiPolygon cannot self-intersect. By default, the original value is used unchanged. Raises: ValueError: Geometry contains self-intersecting MultiPolygon. Returns: GeoDataFrame with each Polygon as a separate row in the GeoDataFrame. The index is the number of the source row in the input GeoDataFrame. """ check_gdf(gdf) gdf = gdf.reset_index(drop=True) is_mpoly = gdf.geometry.geom_type == "MultiPolygon" if ratios and is_mpoly.any(): union_area = gdf.geometry[is_mpoly].apply(shapely.ops.unary_union).area if (union_area != gdf.geometry[is_mpoly].area).any(): raise ValueError("Geometry contains self-intersecting MultiPolygon") result = gdf.explode(index_parts=False) if ratios: fraction = ( result.geometry.area.to_numpy() / gdf.geometry.area[result.index].to_numpy() ) result[ratios] = result[ratios].multiply(fraction, axis="index") return result[gdf.columns]
[docs] def self_union(gdf: gpd.GeoDataFrame, ratios: Iterable[str] = None) -> gpd.GeoDataFrame: """Calculate the geometric union of a feature layer with itself. Areas of overlap are split into two or more geometrically-identical features: one for each of the original overlapping features. Each split feature contains the attributes of the original feature. Args: gdf: GeoDataFrame with non-zero-area MultiPolygon geometries. ratios: Names of columns to rescale by the area fraction of the split feature relative to the original. By default, the original value is used unchanged. Returns: GeoDataFrame representing the union of the input features with themselves. Its index contains tuples of the index of the original overlapping features. Raises: NotImplementedError: MultiPolygon geometries are not yet supported. """ check_gdf(gdf) gdf = gdf.reset_index(drop=True) is_mpoly = gdf.geometry.geom_type == "MultiPolygon" if is_mpoly.any(): raise NotImplementedError("MultiPolygon geometries are not yet supported") # Calculate all pairwise intersections # https://nbviewer.jupyter.org/gist/jorisvandenbossche/3a55a16fda9b3c37e0fb48b1d4019e65 pairs = itertools.combinations(gdf.geometry, 2) intersections = gpd.GeoSeries([a.intersection(b) for a, b in pairs]) # Form polygons from the boundaries of the original polygons and their intersections boundaries = pd.concat([gdf.geometry, intersections]).boundary.unary_union polygons = gpd.GeoSeries(shapely.ops.polygonize(boundaries)) # Determine origin of each polygon by a spatial join on representative points points = gpd.GeoDataFrame(geometry=polygons.representative_point()) oids = gpd.sjoin( points, gdf[["geometry"]], how="left", predicate="within", )["index_right"] # Build new dataframe columns = get_data_columns(gdf) df = gpd.GeoDataFrame( data=gdf.loc[oids, columns].reset_index(drop=True), geometry=polygons[oids.index].to_numpy(), ) if ratios: fraction = df.area.to_numpy() / gdf.area[oids].to_numpy() df[ratios] = df[ratios].multiply(fraction, axis="index") # Add original row indices to index df.index = oids.groupby(oids.index).agg(tuple)[oids.index] df.index.name = None # Return with original column order return df[gdf.columns]
[docs] def dissolve( gdf: gpd.GeoDataFrame, by: Iterable[str], func: Callable | str | list | dict, how: ( Literal["union", "first"] | Callable[[gpd.GeoSeries], BaseGeometry] ) = "union", ) -> gpd.GeoDataFrame: """Dissolve layer by aggregating features based on common attributes. Args: gdf: GeoDataFrame with non-empty (Multi)Polygon geometries. by: Names of columns to group features by. func: Aggregation function for data columns (see :meth:`pd.DataFrame.groupby`). how: Aggregation function for geometry column. Either 'union' (:meth:`gpd.GeoSeries.unary_union`), 'first' (first geometry in group), or a function aggregating multiple geometries into one. Returns: GeoDataFrame with dissolved geometry and data columns, and grouping columns set as the index. """ check_gdf(gdf) merges = {"union": lambda x: x.unary_union, "first": lambda x: x.iloc[0]} data = gdf.drop(columns=gdf.geometry.name).groupby(by=by).aggregate(func) geometry = gdf.groupby(by=by, group_keys=False)[gdf.geometry.name].aggregate( merges.get(how, how) ) return gpd.GeoDataFrame(geometry, geometry=gdf.geometry.name, crs=gdf.crs).join( data )
[docs] def overlay( *gdfs: gpd.GeoDataFrame, how: Literal[ "intersection", "union", "identity", "symmetric_difference", "difference" ] = "intersection", ratios: Iterable[str] = None, ) -> gpd.GeoDataFrame: """Overlay multiple layers incrementally. When a feature from one layer overlaps the feature of another layer, the area of overlap is split into two geometrically-identical features: one for each of the original overlapping features. Each split feature contains the attributes of the original feature. TODO: To identify the source of output features, the user can ensure that each layer contains a column to index by. Alternatively, tuples of indices of the overlapping feature from each layer (null if none) could be returned as the index. Args: gdfs: GeoDataFrames with non-empty (Multi)Polygon geometries assumed to contain no self-overlaps (see :func:`self_union`). Names of (non-geometry) columns cannot be used more than once. Any index colums are ignored. how: Spatial overlay method (see :func:`gpd.overlay`). ratios: Names of columns to rescale by the area fraction of the split feature relative to the original. By default, the original value is used unchanged. Raises: ValueError: Duplicate column names in layers. Returns: GeoDataFrame with the geometries and attributes resulting from the overlay. """ for gdf in gdfs: check_gdf(gdf) if ratios is None: ratios = [] # Check for duplicate non-geometry column names seen = set() duplicates = { c for df in gdfs for c in get_data_columns(df) if c in seen or seen.add(c) } if duplicates: raise ValueError(f"Duplicate column names in layers: {duplicates}") # Drop index columns and replace with default index of known name # NOTE: Assumes that default index name not already names of columns keys = [f"__id{i}__" for i in range(len(gdfs))] gdfs = [ df.reset_index(drop=True).rename_axis(k) for df, k in zip(gdfs, keys, strict=True) ] overlay = None for i in range(len(gdfs) - 1): a, b = overlay if i else gdfs[i], gdfs[i + 1] # Perform overlay with geometry and constant fields constants = [ [c for c in df.columns if c == df.geometry.name or c not in ratios] for df in (a, b) ] overlay = gpd.overlay( a[constants[0]].reset_index(), b[constants[1]].reset_index(), how=how ) # For uniform fields, compute area fraction of originals and merge by index # new_value = (old_value / old_area) * new_area dfs = [] for j, df in enumerate((a, b)): df_ratios = [c for c in df.columns if c != df.geometry.name and c in ratios] if df_ratios: dfs.append( df[df_ratios] .div(df.area, axis="index") .reindex(overlay[keys[j]]) .reset_index(drop=True) .mul(overlay.area, axis="index") ) if dfs: # Assumed to be faster than incremental concat overlay = pd.concat([overlay] + dfs, axis="columns") return overlay.drop(columns=keys)
[docs] def get_data_columns(df: pd.DataFrame) -> list: """Return list of columns, ignoring geometry.""" if isinstance(df, gpd.GeoDataFrame) and hasattr(df, "geometry"): return [col for col in df.columns if col != df.geometry.name] return list(df.columns)