Source code for pudl.etl

"""Dagster definitions for the PUDL ETL and Output tables."""

import importlib.resources
import itertools
import warnings

import pandera as pr
from dagster import (
    AssetCheckResult,
    AssetChecksDefinition,
    AssetKey,
    AssetsDefinition,
    AssetSelection,
    Definitions,
    ExperimentalWarning,
    SourceAsset,
    asset_check,
    define_asset_job,
    load_asset_checks_from_modules,
    load_assets_from_modules,
)
from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition

import pudl
from pudl.io_managers import (
    epacems_io_manager,
    ferc1_dbf_sqlite_io_manager,
    ferc1_xbrl_sqlite_io_manager,
    pudl_mixed_format_io_manager,
)
from pudl.metadata import PUDL_PACKAGE
from pudl.resources import dataset_settings, datastore, ferc_to_sqlite_settings
from pudl.settings import EtlSettings

from . import (
    check_foreign_keys,
    cli,
    eia_bulk_elec_assets,
    epacems_assets,
    glue_assets,
    static_assets,
)

logger = pudl.logging_helpers.get_logger(__name__)

# Asset checks are still experimental in Dagster. Silence the resulting
# ExperimentalWarning, since asset checks are used throughout the PUDL ETL.
# The call below must stay on its own line: folded into this comment, it would
# be commented out and never run.
warnings.filterwarnings("ignore", category=ExperimentalWarning)
# Mapping of Dagster asset group name -> modules whose assets are loaded into
# that group (see ``default_assets`` below). Insertion order is preserved and
# determines the order in which assets are collected.

# Raw (extracted) data, one group per source dataset.
raw_module_groups = {
    "raw_eia176": [pudl.extract.eia176],
    "raw_eia191": [pudl.extract.eia191],
    "raw_eia757a": [pudl.extract.eia757a],
    "raw_eia860": [pudl.extract.eia860],
    "raw_eia860m": [pudl.extract.eia860m],
    "raw_eia861": [pudl.extract.eia861],
    "raw_eia923": [pudl.extract.eia923],
    "raw_eia930": [pudl.extract.eia930],
    "raw_eiaaeo": [pudl.extract.eiaaeo],
    "raw_ferc1": [pudl.extract.ferc1],
    "raw_ferc714": [pudl.extract.ferc714],
    "raw_gridpathratoolkit": [pudl.extract.gridpathratoolkit],
    "raw_phmsagas": [pudl.extract.phmsagas],
    "raw_nrelatb": [pudl.extract.nrelatb],
}

# Core (transformed) tables, plus the glue/static/bulk-elec/CEMS asset modules.
core_module_groups = {
    "_core_eia860": [pudl.transform.eia860],
    "_core_eia923": [pudl.transform.eia923],
    "core_censusdp1tract": [
        pudl.convert.censusdp1tract_to_sqlite,
        pudl.output.censusdp1tract,
    ],
    "core_assn": [glue_assets],
    "core_codes": [static_assets],
    "core_eia": [pudl.transform.eia],
    "core_eia_bulk_elec": [eia_bulk_elec_assets],
    "core_eia860m": [pudl.transform.eia860m],
    "core_eia861": [pudl.transform.eia861],
    "core_epacems": [epacems_assets],
    "core_ferc1": [pudl.transform.ferc1],
    "core_ferc714": [pudl.transform.ferc714],
    "core_gridpathratoolkit": [pudl.transform.gridpathratoolkit],
}

# Output tables and analyses built from the core tables.
out_module_groups = {
    "eia_ferc1_record_linkage": [
        pudl.analysis.plant_parts_eia,
        pudl.analysis.record_linkage.eia_ferc1_record_linkage,
    ],
    "out_allocate_gen_fuel": [pudl.analysis.allocate_gen_fuel],
    "out_derived_gen_attributes": [pudl.analysis.mcoe],
    "out_eia": [
        pudl.output.eia,
        pudl.output.eia860,
        pudl.output.eia923,
        pudl.output.eia_bulk_elec,
    ],
    "out_ferc1": [
        pudl.output.ferc1,
        pudl.analysis.record_linkage.classify_plants_ferc1,
    ],
    "out_respondents_ferc714": [pudl.output.ferc714],
    "out_service_territory_eia861": [pudl.analysis.service_territory],
    "out_state_demand_ferc714": [pudl.analysis.state_demand],
}

# All groups combined, raw first, then core, then out.
all_asset_modules = {
    **raw_module_groups,
    **core_module_groups,
    **out_module_groups,
}
# Every asset from every module, tagged with the Dagster group it belongs to.
default_assets = [
    asset
    for group_name, modules in all_asset_modules.items()
    for asset in load_assets_from_modules(modules, group_name=group_name)
]

# Every asset check explicitly defined in those same modules. Schema-derived
# checks are appended to this list further down.
default_asset_checks = [
    check
    for modules in all_asset_modules.values()
    for check in load_asset_checks_from_modules(modules)
]
def asset_check_from_schema(
    asset_key: AssetKey,
    package: pudl.metadata.classes.Package,
) -> AssetChecksDefinition | None:
    """Build a Dagster asset check that validates an asset against its schema.

    The asset key's user string is used as the resource name looked up in
    ``package``. If that lookup raises ``ValueError``, no check is built.

    Args:
        asset_key: key of the asset to be validated.
        package: metadata package that may describe the asset.

    Returns:
        An asset check named ``pandera_schema_check`` that validates the asset
        value against the resource's pandera schema (``lazy=True``, so all
        errors are collected) and reports each schema error's failure cases
        and data in the check metadata; or ``None`` if ``package`` has no
        matching resource.
    """
    resource_name = asset_key.to_user_string()
    try:
        resource = package.get_resource(resource_name)
    except ValueError:
        return None

    schema = resource.schema.to_pandera()

    # The inner function name becomes the check name in Dagster, so keep it.
    @asset_check(asset=asset_key)
    def pandera_schema_check(asset_value) -> AssetCheckResult:
        try:
            schema.validate(asset_value, lazy=True)
        except pr.errors.SchemaErrors as exc:
            failures = [
                {
                    "failure_cases": str(error.failure_cases),
                    "data": str(error.data),
                }
                for error in exc.schema_errors
            ]
            return AssetCheckResult(passed=False, metadata={"errors": failures})
        else:
            return AssetCheckResult(passed=True)

    return pandera_schema_check
[docs] def _get_keys_from_assets( asset_def: AssetsDefinition | SourceAsset | CacheableAssetsDefinition, ) -> list[AssetKey]: """Get a list of asset keys. Most assets have one key, which can be retrieved as a list from ``asset.keys``. Multi-assets have multiple keys, which can also be retrieved as a list from ``asset.keys``. SourceAssets always only have one key, and don't have ``asset.keys``. So we look for ``asset.key`` and wrap it in a list. We don't handle CacheableAssetsDefinitions yet. """ if isinstance(asset_def, AssetsDefinition): return list(asset_def.keys) if isinstance(asset_def, SourceAsset): return [asset_def.key] return []
_package = PUDL_PACKAGE

# Materialize the keys as a list. A bare ``itertools.chain`` would be exhausted
# by the comprehension below, leaving a module-level name that silently yields
# nothing to any later reader.
_asset_keys = list(
    itertools.chain.from_iterable(
        _get_keys_from_assets(asset_def) for asset_def in default_assets
    )
)

# Add a schema-based pandera check for every asset whose key matches a resource
# in the PUDL package (assets without one yield ``None`` and are dropped).
# ``core_epacems__hourly_emissions`` is deliberately excluded.
# NOTE(review): presumably because it is too large to validate in memory. Confirm.
default_asset_checks += [
    check
    for check in (
        asset_check_from_schema(asset_key, _package)
        for asset_key in _asset_keys
        if asset_key.to_user_string() != "core_epacems__hourly_emissions"
    )
    if check is not None
]
# Resource key -> resource definition made available to every job in ``defs``.
default_resources = {
    "datastore": datastore,
    # IO managers
    "pudl_io_manager": pudl_mixed_format_io_manager,
    "ferc1_dbf_sqlite_io_manager": ferc1_dbf_sqlite_io_manager,
    "ferc1_xbrl_sqlite_io_manager": ferc1_xbrl_sqlite_io_manager,
    # Settings resources
    "dataset_settings": dataset_settings,
    "ferc_to_sqlite_settings": ferc_to_sqlite_settings,
    # EPA CEMS has its own IO manager
    "epacems_io_manager": epacems_io_manager,
}
# Limit the number of concurrent workers when launching assets that use a lot of memory.
# At most 4 concurrently running assets may carry the tag memory-use=high.
default_tag_concurrency_limits = [
    {"key": "memory-use", "value": "high", "limit": 4},
]

# Execution config honoring the limits above, extended in place with the
# configuration returned by ``pudl.analysis.ml_tools.get_ml_models_config()``.
default_config = pudl.helpers.get_dagster_execution_config(
    tag_concurrency_limits=default_tag_concurrency_limits,
)
default_config.update(pudl.analysis.ml_tools.get_ml_models_config())
def create_non_cems_selection(all_assets: list[AssetsDefinition]) -> AssetSelection:
    """Select all of ``all_assets`` except hourly EPA CEMS and its descendants.

    Args:
        all_assets: asset definitions to build the selection from.

    Returns:
        A selection of every key in ``all_assets``, minus
        ``core_epacems__hourly_emissions`` and everything downstream of it.
    """
    selected_keys = pudl.helpers.get_asset_keys(all_assets)
    everything = AssetSelection.keys(*selected_keys)
    cems_and_descendants = AssetSelection.keys(
        AssetKey("core_epacems__hourly_emissions")
    ).downstream()
    return everything - cems_and_descendants
def load_dataset_settings_from_file(setting_filename: str) -> dict:
    """Read the ``datasets`` section of a packaged ETL settings file.

    Args:
        setting_filename: name of a settings file in
            ``pudl.package_data.settings``, without the ``.yml`` extension.

    Returns:
        The dataset settings from that file, dumped to a dictionary.
    """
    settings_dir = importlib.resources.files("pudl.package_data.settings")
    etl_settings = EtlSettings.from_yaml(settings_dir / f"{setting_filename}.yml")
    return etl_settings.datasets.model_dump()
def _etl_fast_resource_config() -> dict:
    """Return run config that loads ``dataset_settings`` from ``etl_fast.yml``.

    Called once per fast job, so the settings file is read separately for each.
    """
    fast_settings = load_dataset_settings_from_file("etl_fast")
    return {"resources": {"dataset_settings": {"config": fast_settings}}}


# NOTE(review): unlike etl_full / etl_fast, the *_no_cems jobs do not receive
# ``default_config``. They therefore run without the memory-use tag concurrency
# limits and the ML model config. Confirm this is intentional.
defs: Definitions = Definitions(
    assets=default_assets,
    asset_checks=default_asset_checks,
    resources=default_resources,
    jobs=[
        # All years of every asset.
        define_asset_job(
            name="etl_full",
            description="This job executes all years of all assets.",
            config=default_config,
        ),
        # All years, minus hourly EPA CEMS and everything downstream of it.
        define_asset_job(
            name="etl_full_no_cems",
            selection=create_non_cems_selection(default_assets),
            description=(
                "This job executes all years of all assets except the "
                "core_epacems__hourly_emissions asset and all assets downstream."
            ),
        ),
        # Only the years configured in etl_fast.yml.
        define_asset_job(
            name="etl_fast",
            config=default_config | _etl_fast_resource_config(),
            description="This job executes the most recent year of each asset.",
        ),
        # etl_fast years, minus hourly EPA CEMS and everything downstream of it.
        define_asset_job(
            name="etl_fast_no_cems",
            selection=create_non_cems_selection(default_assets),
            config=_etl_fast_resource_config(),
            description=(
                "This job executes the most recent year of each asset except the "
                "core_epacems__hourly_emissions asset and all assets downstream."
            ),
        ),
    ],
)
"""A collection of dagster assets, resources, IO managers, and jobs for the PUDL ETL."""