pudl.io_managers#

Dagster IO Managers.

Module Contents#

Classes#

SQLiteIOManager

IO Manager that writes and retrieves dataframes from a SQLite database.

PudlSQLiteIOManager

IO Manager that writes and retrieves dataframes from a SQLite database.

FercSQLiteIOManager

IO Manager for reading tables from FERC databases.

FercDBFSQLiteIOManager

IO Manager for only reading tables from the FERC 1 database.

FercXBRLSQLiteIOManager

IO Manager for only reading tables from the XBRL database.

PandasParquetIOManager

An IO Manager that dumps outputs to a parquet file.

Functions#

pudl_sqlite_io_manager(→ PudlSQLiteIOManager)

Create a SQLiteManager dagster resource for the pudl database.

ferc1_dbf_sqlite_io_manager(→ FercDBFSQLiteIOManager)

Create a SQLiteManager dagster resource for the ferc1 dbf database.

ferc1_xbrl_sqlite_io_manager(→ FercXBRLSQLiteIOManager)

Create a SQLiteManager dagster resource for the ferc1 xbrl database.

epacems_io_manager(→ PandasParquetIOManager)

IO Manager that writes EPA CEMS partitions to individual parquet files.

Attributes#

pudl.io_managers.logger[source]#
pudl.io_managers.MINIMUM_SQLITE_VERSION = '3.32.0'[source]#
exception pudl.io_managers.ForeignKeyError(child_table: str, parent_table: str, foreign_key: str, rowids: list[int])[source]#

Bases: sqlalchemy.exc.SQLAlchemyError

Raised when data in a database violates a foreign key constraint.

__str__()[source]#

Create string representation of ForeignKeyError object.

__eq__(other)[source]#

Compare a ForeignKeyError with another object.

exception pudl.io_managers.ForeignKeyErrors(fk_errors: list[ForeignKeyError])[source]#

Bases: sqlalchemy.exc.SQLAlchemyError

Raised when data in a database violates multiple foreign key constraints.

__str__()[source]#

Create string representation of ForeignKeyErrors object.

__iter__()[source]#

Iterate over the fk errors.

__getitem__(idx)[source]#

Index the fk errors.

class pudl.io_managers.SQLiteIOManager(base_dir: str, db_name: str, md: sqlalchemy.MetaData | None = None, timeout: float = 1000.0)[source]#

Bases: dagster.IOManager

IO Manager that writes and retrieves dataframes from a SQLite database.

_get_table_name(context) str[source]#

Get asset name from dagster context object.

_setup_database(timeout: float = 1000.0) sqlalchemy.Engine[source]#

Create database and metadata if they don’t exist.

Parameters:

timeout – How many seconds the connection should wait before raising an exception, if the database is locked by another connection. If another connection opens a transaction to modify the database, it will be locked until that transaction is committed.

Returns:

SQL Alchemy engine that connects to a database in the base_dir.

Return type:

engine
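The busy-timeout behavior described above can be sketched with the standard library's sqlite3 module (the file layout and helper name here are illustrative, not PUDL's actual implementation):

```python
import os
import sqlite3
import tempfile

def connect_with_timeout(base_dir: str, db_name: str, timeout: float = 1000.0) -> sqlite3.Connection:
    """Open (creating if needed) a SQLite DB under base_dir, waiting up to
    `timeout` seconds for another connection's write lock to clear before
    raising sqlite3.OperationalError."""
    path = os.path.join(base_dir, f"{db_name}.sqlite")
    return sqlite3.connect(path, timeout=timeout)

tmpdir = tempfile.mkdtemp()
conn = connect_with_timeout(tmpdir, "example", timeout=5.0)
conn.execute("CREATE TABLE t (x INTEGER)")
conn.commit()
conn.close()
```

A long timeout matters here because many assets may try to write to the same database file; rather than failing immediately on a lock, each connection waits its turn.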

_get_sqlalchemy_table(table_name: str) sqlalchemy.Table[source]#

Get SQL Alchemy Table object from metadata given a table_name.

Parameters:

table_name – The name of the table to look up.

Returns:

Corresponding SQL Alchemy Table in SQLiteIOManager metadata.

Return type:

table

Raises:

ValueError – if table_name does not exist in the SQLiteIOManager metadata.

_get_fk_list(table: str) pandas.DataFrame[source]#

Retrieve a dataframe of foreign keys for a table.

Description from the SQLite Docs: ‘This pragma returns one row for each foreign key constraint created by a REFERENCES clause in the CREATE TABLE statement of table “table-name”.’

The PRAGMA returns one row for each field in a foreign key constraint. This method collapses foreign keys with multiple fields into one record for readability.
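The collapsing step can be illustrated directly with the PRAGMA. This is a minimal sketch, not the method's actual implementation (which returns a dataframe); it groups the per-column rows by constraint id:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE parent (a INTEGER, b INTEGER, PRIMARY KEY (a, b));
    CREATE TABLE child (
        a INTEGER, b INTEGER,
        FOREIGN KEY (a, b) REFERENCES parent (a, b)
    );
""")
# PRAGMA foreign_key_list returns one row per *column* of each constraint:
# (id, seq, table, from, to, on_update, on_delete, match)
rows = conn.execute("PRAGMA foreign_key_list(child)").fetchall()
# Collapse rows sharing a constraint id into one record per constraint.
constraints: dict[int, dict] = {}
for fk_id, _seq, table, col_from, col_to, *_ in rows:
    c = constraints.setdefault(fk_id, {"table": table, "from": [], "to": []})
    c["from"].append(col_from)
    c["to"].append(col_to)
print(constraints)
```

The two-column foreign key above arrives as two PRAGMA rows but collapses into a single record, which is the readability improvement the method provides.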

check_foreign_keys() None[source]#

Check foreign key relationships in the database.

The order assets are loaded into the database will not satisfy foreign key constraints so we can’t enable foreign key constraints. However, we can check for foreign key failures once all of the data has been loaded into the database using the foreign_key_check and foreign_key_list PRAGMAs.

You can learn more about the PRAGMAs in the SQLite docs.

Raises:

ForeignKeyErrors – if data in the database violate foreign key constraints.
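The check-after-load pattern can be demonstrated with the PRAGMA alone. A minimal sketch (toy table names, not PUDL's schema): because SQLite leaves foreign key enforcement off by default, an orphaned row loads without error, and foreign_key_check finds it afterwards:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE parent (id INTEGER PRIMARY KEY);
    CREATE TABLE child (
        id INTEGER PRIMARY KEY,
        parent_id INTEGER REFERENCES parent (id)
    );
    -- Insert a child row whose parent does not exist. Foreign keys are not
    -- enforced (PRAGMA foreign_keys defaults to OFF), so this succeeds,
    -- mirroring how assets can load in any order.
    INSERT INTO child VALUES (1, 99);
""")
# foreign_key_check reports one row per violation:
# (table, rowid, referenced_table, fk_constraint_index)
violations = conn.execute("PRAGMA foreign_key_check").fetchall()
print(violations)  # [('child', 1, 'parent', 0)]
```

In the real manager, rows like these are gathered into ForeignKeyError objects and raised together as a ForeignKeyErrors exception.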

_handle_pandas_output(context: dagster.OutputContext, df: pandas.DataFrame)[source]#

Write dataframe to the database.

SQLite does not support concurrent writes to the database. Instead, SQLite queues write transactions and executes them one at a time. This allows the assets to be processed in parallel. See the SQLAlchemy docs to learn more about SQLite concurrency.

Parameters:
  • context – dagster keyword that provides access to output information like asset name.

  • df – dataframe to write to the database.
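The write path can be sketched with plain pandas and sqlite3 (the table and column names are illustrative, not PUDL's actual schema):

```python
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")
df = pd.DataFrame({"plant_id": [1, 2], "capacity_mw": [100.0, 250.0]})
# Each asset writes its table in its own transaction; SQLite serializes
# these writes, so assets computed in parallel queue up rather than
# corrupting the database.
df.to_sql("plants_demo", conn, index=False, if_exists="replace")
out = pd.read_sql("SELECT * FROM plants_demo", conn)
print(len(out))  # 2
```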

_handle_str_output(context: dagster.OutputContext, query: str)[source]#

Execute a sql query on the database.

This is used for creating output views in the database.

Parameters:
  • context – dagster keyword that provides access to output information like asset name.

  • query – sql query to execute in the database.

handle_output(context: dagster.OutputContext, obj: pandas.DataFrame | str)[source]#

Handle an op or asset output.

If the output is a dataframe, write it to the database. If it is a string, execute it as a SQL query.

Parameters:
  • context – dagster keyword that provides access to output information like asset name.

  • obj – a sql query or dataframe to add to the database.

Raises:

Exception – if an asset or op returns an unsupported datatype.
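The type-based dispatch described above can be sketched as follows (the function name and return strings are illustrative stand-ins for the real write/execute calls):

```python
import pandas as pd

def handle_output_sketch(obj) -> str:
    """Dispatch on output type: a DataFrame becomes a table write,
    a string is executed as SQL (e.g. to create a view)."""
    if isinstance(obj, pd.DataFrame):
        return "write_table"
    if isinstance(obj, str):
        return "execute_view_sql"
    raise TypeError(f"Unsupported output type: {type(obj)}")

print(handle_output_sketch(pd.DataFrame()))
print(handle_output_sketch("CREATE VIEW v AS SELECT 1"))
```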

load_input(context: dagster.InputContext) pandas.DataFrame[source]#

Load a dataframe from a sqlite database.

Parameters:

context – dagster keyword that provides access to output information like asset name.

class pudl.io_managers.PudlSQLiteIOManager(base_dir: str, db_name: str, package: pudl.metadata.classes.Package | None = None, timeout: float = 1000.0)[source]#

Bases: SQLiteIOManager

IO Manager that writes and retrieves dataframes from a SQLite database.

This class extends the SQLiteIOManager class to manage database metadata and dtypes using the pudl.metadata.classes.Package class.

_handle_str_output(context: dagster.OutputContext, query: str)[source]#

Execute a sql query on the database.

This is used for creating output views in the database.

Parameters:
  • context – dagster keyword that provides access to output information like asset name.

  • query – sql query to execute in the database.

_handle_pandas_output(context: dagster.OutputContext, df: pandas.DataFrame)[source]#

Enforce PUDL DB schema and write dataframe to SQLite.

load_input(context: dagster.InputContext) pandas.DataFrame[source]#

Load a dataframe from a sqlite database.

Parameters:

context – dagster keyword that provides access to output information like asset name.

pudl.io_managers.pudl_sqlite_io_manager(init_context) PudlSQLiteIOManager[source]#

Create a SQLiteManager dagster resource for the pudl database.

class pudl.io_managers.FercSQLiteIOManager(base_dir: str = None, db_name: str = None, md: sqlalchemy.MetaData = None, timeout: float = 1000.0)[source]#

Bases: SQLiteIOManager

IO Manager for reading tables from FERC databases.

This class should be subclassed and the load_input and handle_output methods should be implemented.

This IOManager expects the database to already exist.

_setup_database(timeout: float = 1000.0) sqlalchemy.Engine[source]#

Create database engine and read the metadata.

Parameters:

timeout – How many seconds the connection should wait before raising an exception, if the database is locked by another connection. If another connection opens a transaction to modify the database, it will be locked until that transaction is committed.

Returns:

SQL Alchemy engine that connects to a database in the base_dir.

Return type:

engine

abstract handle_output(context: dagster.OutputContext, obj)[source]#

Handle an op or asset output.

abstract load_input(context: dagster.InputContext) pandas.DataFrame[source]#

Load a dataframe from a sqlite database.

Parameters:

context – dagster keyword that provides access to output information like asset name.

class pudl.io_managers.FercDBFSQLiteIOManager(base_dir: str = None, db_name: str = None, md: sqlalchemy.MetaData = None, timeout: float = 1000.0)[source]#

Bases: FercSQLiteIOManager

IO Manager for only reading tables from the FERC 1 database.

This IO Manager is for reading data only. It does not handle outputs because the raw FERC tables are not known prior to running the ETL and are not recorded in our metadata.

abstract handle_output(context: dagster.OutputContext, obj: pandas.DataFrame | str)[source]#

Handle an op or asset output.

load_input(context: dagster.InputContext) pandas.DataFrame[source]#

Load a dataframe from a sqlite database.

Parameters:

context – dagster keyword that provides access to output information like asset name.

pudl.io_managers.ferc1_dbf_sqlite_io_manager(init_context) FercDBFSQLiteIOManager[source]#

Create a SQLiteManager dagster resource for the ferc1 dbf database.

class pudl.io_managers.FercXBRLSQLiteIOManager(base_dir: str = None, db_name: str = None, md: sqlalchemy.MetaData = None, timeout: float = 1000.0)[source]#

Bases: FercSQLiteIOManager

IO Manager for only reading tables from the XBRL database.

This IO Manager is for reading data only. It does not handle outputs because the raw FERC tables are not known prior to running the ETL and are not recorded in our metadata.

static filter_for_freshest_data(table: pandas.DataFrame, primary_key: list[str]) pandas.DataFrame[source]#

Get most updated values for each XBRL context.

An XBRL context includes an entity ID, the time period the data applies to, and other dimensions such as utility type. Each context has its own ID, but the same contents are frequently redefined under different IDs, so we identify contexts by their actual content.

Each row in our SQLite database includes all the facts for one context/filing pair.

If one context is represented in multiple filings, we take the facts from the most recently-published filing.

This means that if a recently-published filing does not include a value for a fact that was previously reported, then that value will remain null. We do not forward-fill facts on a fact-by-fact basis.
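The "most recent filing wins" rule can be sketched with a pandas sort-and-group (column names here are a toy stand-in, not the actual XBRL schema):

```python
import pandas as pd

# Toy facts table: one row per (context, filing) pair.
df = pd.DataFrame({
    "entity_id": ["C1", "C1", "C2"],
    "date": ["2021-12-31", "2021-12-31", "2021-12-31"],
    "publication_time": ["2022-01-01", "2022-03-01", "2022-01-15"],
    "value": [10.0, 12.0, 7.0],
})
primary_key = ["entity_id", "date"]
# Sort by publication time so that, within each context, the last row
# comes from the most recently published filing; then keep that row.
freshest = (
    df.sort_values("publication_time")
      .groupby(primary_key, as_index=False)
      .last()
)
print(sorted(freshest["value"].tolist()))  # [7.0, 12.0]
```

Context C1 appears in two filings; only the value from the March filing (12.0) survives. Note this simple `.last()` works row-wise; it does not forward-fill individual null facts from earlier filings, matching the behavior described above.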

static refine_report_year(df: pandas.DataFrame, xbrl_years: list[int]) pandas.DataFrame[source]#

Set a fact’s report year by its actual dates.

Sometimes a fact belongs to a context which has no ReportYear associated with it; other times there are multiple ReportYears associated with a single filing. In these cases the report year of a specific fact may be associated with the other years in the filing.

In many cases we can infer the actual report year from the fact’s associated time period - either duration or instant.
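The inference from a fact's own dates can be sketched as follows (column names are illustrative; the instant-fact case is shown, using the date's calendar year when it falls inside the requested XBRL years):

```python
import pandas as pd

df = pd.DataFrame({
    "entity_id": ["C1", "C1"],
    "report_year": [None, 2020],           # sometimes missing or inconsistent
    "date": ["2021-12-31", "2020-12-31"],  # instant facts carry a real date
})
xbrl_years = [2020, 2021]
# Infer the report year from the fact's own date; discard out-of-range years.
inferred = pd.to_datetime(df["date"]).dt.year
df["report_year"] = inferred.where(inferred.isin(xbrl_years))
print(df["report_year"].tolist())  # [2021, 2020]
```

The first row had no ReportYear at all, but its instant date pins it to 2021; duration facts can be handled analogously from their start/end dates.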

_get_primary_key(sched_table_name: str) list[str][source]#
abstract handle_output(context: dagster.OutputContext, obj: pandas.DataFrame | str)[source]#

Handle an op or asset output.

load_input(context: dagster.InputContext) pandas.DataFrame[source]#

Load a dataframe from a sqlite database.

Parameters:

context – dagster keyword that provides access to output information like asset name.

pudl.io_managers.ferc1_xbrl_sqlite_io_manager(init_context) FercXBRLSQLiteIOManager[source]#

Create a SQLiteManager dagster resource for the ferc1 xbrl database.

class pudl.io_managers.PandasParquetIOManager(base_path: upath.UPath, schema: pyarrow.Schema)[source]#

Bases: dagster.UPathIOManager

An IO Manager that dumps outputs to a parquet file.

extension: str = '.parquet'[source]#
abstract dump_to_path(context: dagster.OutputContext, obj: dask.dataframe.DataFrame, path: upath.UPath)[source]#

Write dataframe to parquet file.

load_from_path(context: dagster.InputContext, path: upath.UPath) dask.dataframe.DataFrame[source]#

Load a directory of parquet files to a dask dataframe.

pudl.io_managers.epacems_io_manager(init_context: dagster.InitResourceContext) PandasParquetIOManager[source]#

IO Manager that writes EPA CEMS partitions to individual parquet files.