"""Functions for reading data out of the Census DP1 SQLite Database."""importgeopandasasgpdimportpandasaspdimportsqlalchemyassafromdagsterimportAssetIn,AssetsDefinition,assetimportpudl
def census_asset_factory(layer: str) -> AssetsDefinition:
    """Build a Dagster asset that extracts one layer of the Census DP1 database.

    Args:
        layer: Key identifying the Census DP1 layer to extract. It is looked up
            in ``LAYER_NAMES`` to build the asset name, and must (case
            insensitively) be one of ``"state"``, ``"county"`` or ``"tract"``
            for the asset to materialize successfully.

    Returns:
        The asset definition named ``_core_censusdp1tract__<LAYER_NAMES[layer]>``
        which reads the requested layer into a GeoDataFrame.
    """

    @asset(
        ins={
            "raw_censusdp1tract__all_tables": AssetIn("raw_censusdp1tract__all_tables")
        },
        name=f"_core_censusdp1tract__{LAYER_NAMES[layer]}",
    )
    def census_layer(raw_censusdp1tract__all_tables, **kwargs) -> gpd.GeoDataFrame:
        """Select one layer from the Census DP1 database.

        Uses information within the Census DP1 database to set the coordinate
        reference system and to identify the column containing the geometry.
        The geometry column is renamed to "geometry" as that's the default
        within GeoPandas. No other column names or types are altered.

        Args:
            raw_censusdp1tract__all_tables: Location of the Census DP1 SQLite
                database file; it is interpolated into a ``sqlite:///`` URL.
            **kwargs: Accepted for signature compatibility; not used here.

        Returns:
            The selected layer, with its CRS set from the database metadata.

        Raises:
            TypeError: If ``layer`` is not a string.
            ValueError: If ``layer`` is not 'state', 'county' or 'tract'.
            AssertionError: If the database does not describe exactly one
                geometry column for the requested layer's table.
        """
        census_conn = f"sqlite:///{raw_censusdp1tract__all_tables}"
        dp1_engine = sa.create_engine(census_conn)

        def get_layer(layer, dp1_engine):
            """Read the table backing ``layer`` from ``dp1_engine``."""
            if not isinstance(layer, str):
                # Report the offending *type*, as the message promises, rather
                # than the value that was passed in.
                raise TypeError(
                    f"Argument 'layer' must be a string, got arg of type {type(layer)}."
                )
            layer = layer.lower()
            if layer not in ["state", "county", "tract"]:
                raise ValueError(
                    "Census DP1 layer must be one of 'state', 'county' or 'tract', "
                    f"but got {layer}."
                )
            table_name = f"{layer}_2010census_dp1"
            # Look up which column holds the geometry for this table and which
            # CRS authority/SRID it is registered under. The table name is
            # passed as a bound parameter rather than formatted into the SQL.
            df = pd.read_sql(
                """
SELECT geom_cols.f_table_name as table_name,
       geom_cols.f_geometry_column as geom_col,
       crs.auth_name as auth_name,
       crs.auth_srid as auth_srid
  FROM geometry_columns geom_cols
 INNER JOIN spatial_ref_sys crs
    ON geom_cols.srid = crs.srid
 WHERE table_name = ?
""",
                dp1_engine,
                params=(table_name,),
            )
            if len(df) != 1:
                raise AssertionError(
                    f"Expected exactly 1 geometry description, but found {len(df)}"
                )

            geom_col = df.loc[0, "geom_col"]
            # Build an "<authority>:<srid>" identifier from the metadata row.
            crs_auth_str = f"{df.loc[0, 'auth_name']}:{df.loc[0, 'auth_srid']}".lower()

            gdf = gpd.read_postgis(
                table_name, dp1_engine, geom_col=geom_col, crs=crs_auth_str
            )
            gdf = gdf.rename_geometry("geometry")
            return gdf

        return get_layer(layer, dp1_engine)

    return census_layer