"""Dagster definitions for the PUDL ETL and Output tables."""
import importlib.resources
import itertools
import warnings
import pandera as pr
from dagster import (
AssetCheckResult,
AssetChecksDefinition,
AssetKey,
AssetsDefinition,
AssetSelection,
Definitions,
ExperimentalWarning,
SourceAsset,
asset_check,
define_asset_job,
load_asset_checks_from_modules,
load_assets_from_modules,
)
from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition
import pudl
from pudl.io_managers import (
epacems_io_manager,
ferc1_dbf_sqlite_io_manager,
ferc1_xbrl_sqlite_io_manager,
pudl_mixed_format_io_manager,
)
from pudl.metadata import PUDL_PACKAGE
from pudl.resources import dataset_settings, datastore, ferc_to_sqlite_settings
from pudl.settings import EtlSettings
from . import (
check_foreign_keys,
cli,
eia_bulk_elec_assets,
epacems_assets,
glue_assets,
static_assets,
)
[docs]
logger = pudl.logging_helpers.get_logger(__name__)
# Dagster still labels asset checks as experimental. This module builds asset
# checks for many assets, so the ExperimentalWarning would fire constantly;
# ignore that warning category globally.
warnings.filterwarnings(action="ignore", category=ExperimentalWarning)
[docs]
# Raw extraction asset groups. Each ``raw_<dataset>`` group contains the assets
# defined in the matching ``pudl.extract.<dataset>`` module. Insertion order is
# kept identical to the explicit listing so the asset load order is unchanged.
raw_module_groups = {
    f"raw_{dataset}": [getattr(pudl.extract, dataset)]
    for dataset in (
        "eia176",
        "eia191",
        "eia757a",
        "eia860",
        "eia860m",
        "eia861",
        "eia923",
        "eia930",
        "eiaaeo",
        "ferc1",
        "ferc714",
        "gridpathratoolkit",
        "phmsagas",
        "nrelatb",
    )
}
[docs]
# Core-layer asset groups. Each key is the Dagster group name passed as
# ``group_name`` to ``load_assets_from_modules`` for the listed modules when
# ``default_assets`` is built below.
core_module_groups = {
    "_core_eia860": [pudl.transform.eia860],
    "_core_eia923": [pudl.transform.eia923],
    # A single group may collect assets from more than one module.
    "core_censusdp1tract": [
        pudl.convert.censusdp1tract_to_sqlite,
        pudl.output.censusdp1tract,
    ],
    # The next entries use asset modules from this package (pudl.etl) rather
    # than pudl.transform.
    "core_assn": [glue_assets],
    "core_codes": [static_assets],
    "core_eia": [pudl.transform.eia],
    "core_eia_bulk_elec": [eia_bulk_elec_assets],
    "core_eia860m": [pudl.transform.eia860m],
    "core_eia861": [pudl.transform.eia861],
    "core_epacems": [epacems_assets],
    "core_ferc1": [pudl.transform.ferc1],
    "core_ferc714": [pudl.transform.ferc714],
    "core_gridpathratoolkit": [pudl.transform.gridpathratoolkit],
}
[docs]
# Output/analysis asset groups, drawn from ``pudl.output`` and ``pudl.analysis``
# modules. Each key is the Dagster group name used when loading the listed
# modules' assets into ``default_assets`` below.
out_module_groups = {
    "eia_ferc1_record_linkage": [
        pudl.analysis.plant_parts_eia,
        pudl.analysis.record_linkage.eia_ferc1_record_linkage,
    ],
    "out_allocate_gen_fuel": [pudl.analysis.allocate_gen_fuel],
    "out_derived_gen_attributes": [pudl.analysis.mcoe],
    "out_eia": [
        pudl.output.eia,
        pudl.output.eia860,
        pudl.output.eia923,
        pudl.output.eia_bulk_elec,
    ],
    # Mixes an output module and an analysis module in one group.
    "out_ferc1": [
        pudl.output.ferc1,
        pudl.analysis.record_linkage.classify_plants_ferc1,
    ],
    "out_respondents_ferc714": [pudl.output.ferc714],
    "out_service_territory_eia861": [pudl.analysis.service_territory],
    "out_state_demand_ferc714": [pudl.analysis.state_demand],
}
[docs]
# Every asset group in load order: raw groups, then core, then output groups.
all_asset_modules = {**raw_module_groups, **core_module_groups, **out_module_groups}
[docs]
# Flat list of every asset definition, each tagged with the group name it is
# registered under in ``all_asset_modules``.
default_assets = [
    asset
    for group_name, modules in all_asset_modules.items()
    for asset in load_assets_from_modules(modules, group_name=group_name)
]
[docs]
# Asset checks declared directly in the asset modules. Schema-derived checks
# are appended to this list further down.
default_asset_checks = [
    check
    for modules in all_asset_modules.values()
    for check in load_asset_checks_from_modules(modules)
]
[docs]
def asset_check_from_schema(
    asset_key: AssetKey,
    package: pudl.metadata.classes.Package,
) -> AssetChecksDefinition | None:
    """Build a pandera-backed Dagster asset check for ``asset_key``.

    The asset key's user string is looked up as a resource in ``package``. When
    ``package.get_resource`` raises ``ValueError`` (no such resource), no check
    is built.

    Args:
        asset_key: Key of the asset to validate.
        package: Metadata package whose resource schemas define the checks.

    Returns:
        An asset check validating the asset against the resource's pandera
        schema, or ``None`` when the package has no matching resource.
    """
    resource_name = asset_key.to_user_string()
    try:
        resource = package.get_resource(resource_name)
    except ValueError:
        return None
    schema = resource.schema.to_pandera()

    @asset_check(asset=asset_key)
    def pandera_schema_check(asset_value) -> AssetCheckResult:
        # lazy=True makes pandera gather every violation into one SchemaErrors
        # exception rather than stopping at the first failure.
        try:
            schema.validate(asset_value, lazy=True)
        except pr.errors.SchemaErrors as exc:
            error_details = [
                {"failure_cases": str(error.failure_cases), "data": str(error.data)}
                for error in exc.schema_errors
            ]
            return AssetCheckResult(passed=False, metadata={"errors": error_details})
        return AssetCheckResult(passed=True)

    return pandera_schema_check
[docs]
def _get_keys_from_assets(
asset_def: AssetsDefinition | SourceAsset | CacheableAssetsDefinition,
) -> list[AssetKey]:
"""Get a list of asset keys.
Most assets have one key, which can be retrieved as a list from
``asset.keys``.
Multi-assets have multiple keys, which can also be retrieved as a list from
``asset.keys``.
SourceAssets always only have one key, and don't have ``asset.keys``. So we
look for ``asset.key`` and wrap it in a list.
We don't handle CacheableAssetsDefinitions yet.
"""
if isinstance(asset_def, AssetsDefinition):
return list(asset_def.keys)
if isinstance(asset_def, SourceAsset):
return [asset_def.key]
return []
[docs]
# Metadata package whose resource schemas are passed to asset_check_from_schema
# when the schema-based asset checks are built below.
_package = PUDL_PACKAGE
[docs]
# Every asset key defined by ``default_assets``. Materialized as a list rather
# than left as a one-shot ``itertools.chain`` iterator, so the module-level name
# still holds all keys after the comprehension below has iterated over it.
_asset_keys = list(
    itertools.chain.from_iterable(
        _get_keys_from_assets(asset_def) for asset_def in default_assets
    )
)
# Assets that do not get a pandera schema check.
# NOTE(review): presumably excluded because the hourly EPA CEMS table is too
# large to load for in-memory validation — confirm.
_schema_check_excluded_assets = {"core_epacems__hourly_emissions"}
# Add a schema check for each asset whose key matches a resource in the metadata
# package; asset_check_from_schema returns None for keys with no resource.
default_asset_checks += [
    check
    for check in (
        asset_check_from_schema(asset_key, _package)
        for asset_key in _asset_keys
        if asset_key.to_user_string() not in _schema_check_excluded_assets
    )
    if check is not None
]
[docs]
# Resources and IO managers passed as ``resources`` to the ``Definitions``
# object at the bottom of this module.
default_resources = {
    "datastore": datastore,
    # Registered under the generic name "pudl_io_manager".
    "pudl_io_manager": pudl_mixed_format_io_manager,
    "ferc1_dbf_sqlite_io_manager": ferc1_dbf_sqlite_io_manager,
    "ferc1_xbrl_sqlite_io_manager": ferc1_xbrl_sqlite_io_manager,
    "dataset_settings": dataset_settings,
    "ferc_to_sqlite_settings": ferc_to_sqlite_settings,
    "epacems_io_manager": epacems_io_manager,
}
# Limit the number of concurrent workers when launching assets that use a lot of memory.
[docs]
# Passed as ``tag_concurrency_limits`` to get_dagster_execution_config below.
default_tag_concurrency_limits = [
    # At most ``limit`` (4) ops/assets carrying the tag ``memory-use: high``
    # should run at the same time.
    {
        "key": "memory-use",
        "value": "high",
        "limit": 4,
    },
]
[docs]
# Base run config shared by the ETL jobs: the Dagster execution config carrying
# the tag concurrency limits above, with the ML model config merged in.
default_config = pudl.helpers.get_dagster_execution_config(
    tag_concurrency_limits=default_tag_concurrency_limits,
)
# In-place shallow merge: on a top-level key collision the ML config wins.
default_config.update(pudl.analysis.ml_tools.get_ml_models_config())
[docs]
def create_non_cems_selection(all_assets: list[AssetsDefinition]) -> AssetSelection:
    """Select every asset in ``all_assets`` except EPA CEMS and its dependents.

    Args:
        all_assets: Asset definitions to build the selection from.

    Returns:
        A selection of all keys in ``all_assets`` minus
        ``core_epacems__hourly_emissions`` and every asset downstream of it.
    """
    everything = AssetSelection.keys(*pudl.helpers.get_asset_keys(all_assets))
    cems_and_dependents = AssetSelection.keys(
        AssetKey("core_epacems__hourly_emissions")
    ).downstream()
    return everything - cems_and_dependents
[docs]
def load_dataset_settings_from_file(setting_filename: str) -> dict:
    """Load dataset settings from a settings file in `pudl.package_data.settings`.

    The packaged file ``<setting_filename>.yml`` is parsed with
    ``EtlSettings.from_yaml`` and only its ``datasets`` section is returned.

    Args:
        setting_filename: name of settings file, without the ``.yml`` extension.

    Returns:
        Dictionary of dataset settings, produced by ``model_dump()`` on the
        ``datasets`` attribute of the parsed settings.
    """
    # Deliberately not named ``dataset_settings``: that name is the resource
    # imported from ``pudl.resources`` at module level and must not be shadowed.
    settings_path = (
        importlib.resources.files("pudl.package_data.settings")
        / f"{setting_filename}.yml"
    )
    return EtlSettings.from_yaml(settings_path).datasets.model_dump()
[docs]
# Run config shared by both "fast" jobs: the default execution / ML-model config
# plus the dataset settings from the packaged ``etl_fast.yml`` settings file.
# Built once so the settings file is read a single time and both jobs match.
_etl_fast_config = default_config | {
    "resources": {
        "dataset_settings": {"config": load_dataset_settings_from_file("etl_fast")}
    }
}
# All assets except core_epacems__hourly_emissions and everything downstream of
# it; shared by both *_no_cems jobs.
_non_cems_selection = create_non_cems_selection(default_assets)

defs: Definitions = Definitions(
    assets=default_assets,
    asset_checks=default_asset_checks,
    resources=default_resources,
    # Each *_no_cems job uses the same run config as its CEMS-inclusive
    # counterpart, so the memory-use tag concurrency limits and the ML model
    # config in ``default_config`` apply to it as well.
    jobs=[
        define_asset_job(
            name="etl_full",
            description="This job executes all years of all assets.",
            config=default_config,
        ),
        define_asset_job(
            name="etl_full_no_cems",
            selection=_non_cems_selection,
            description="This job executes all years of all assets except the "
            "core_epacems__hourly_emissions asset and all assets downstream.",
            config=default_config,
        ),
        define_asset_job(
            name="etl_fast",
            config=_etl_fast_config,
            description="This job executes the most recent year of each asset.",
        ),
        define_asset_job(
            name="etl_fast_no_cems",
            selection=_non_cems_selection,
            config=_etl_fast_config,
            description="This job executes the most recent year of each asset except the "
            "core_epacems__hourly_emissions asset and all assets downstream.",
        ),
    ],
)
"""A collection of dagster assets, resources, IO managers, and jobs for the PUDL ETL."""