Source code for pudl.extract.excel
"""Load excel metadata CSV files form a python data package."""
import importlib.resources
import pathlib
import re
from collections import defaultdict
from io import BytesIO
import dbfread
import pandas as pd
from dagster import (
AssetsDefinition,
DynamicOut,
DynamicOutput,
OpDefinition,
graph_asset,
op,
)
import pudl
[docs]
class Metadata:
    """Load Excel metadata from Python package data.

    Excel sheet files may contain many different tables. When we load those
    into dataframes, metadata tells us how to do this. Metadata generally informs
    us about the position of a given page in the file (which sheet and which row)
    and it informs us how to translate excel column names into standardized
    column names.

    When a metadata object is instantiated, it is given a ${dataset} name and it
    will attempt to load csv files from the pudl.package_data.${dataset} package.

    It expects the following kinds of files:

    * skiprows.csv tells us how many initial rows should be skipped when loading
      data for a given (partition, page).
    * skipfooter.csv tells us how many bottom rows should be skipped when
      loading data for a given (partition, page).
    * page_map.csv tells us the name of the excel sheet that should be read
      when loading data for a given (partition, page).
    * file_map.csv tells us the name of the file that holds the data for a
      given (partition, page).
    * column_maps/${page}.csv informs us how to translate input column
      names to standardized pudl names for a given (partition, input_col_name).
      The relevant page is encoded in the filename.

    Optional file:

    * page_part_map.csv tells us what secondary partition (e.g. "form") needs to be
      specified to correctly identify the file housing the desired page. This is only
      required when a file can only be uniquely located using a combination of
      partitions (e.g. form and year).
    """

    # TODO: we could validate whether metadata is valid for all years. We should have
    # existing records for each (year, page) -> sheet_name, (year, page) -> skiprows
    # and for all (year, page) -> column map

    def __init__(self, dataset_name: str):
        """Create Metadata object and load metadata from python package.

        Args:
            dataset_name: Name of the package/dataset to load the metadata from.
                Files will be loaded from pudl.package_data.${dataset_name}
        """
        pkg = f"pudl.package_data.{dataset_name}"
        self._dataset_name = dataset_name
        self._skiprows = self._load_csv(pkg, "skiprows.csv")
        self._skipfooter = self._load_csv(pkg, "skipfooter.csv")
        self._sheet_name = self._load_csv(pkg, "page_map.csv")
        self._file_name = self._load_csv(pkg, "file_map.csv")
        # Most excel extracted datasets do not have a page to part map. If they
        # don't, fall back to an empty DataFrame so callers can test `.empty`.
        try:
            self._page_part_map = self._load_csv(pkg, "page_part_map.csv")
        except FileNotFoundError:
            self._page_part_map = pd.DataFrame()

        # One column map per page, keyed by the CSV file stem (the page name).
        column_map_pkg = pkg + ".column_maps"
        self._column_map = {}
        for res_path in importlib.resources.files(column_map_pkg).iterdir():
            # res_path is expected to end with ${page}.csv
            if res_path.suffix == ".csv":
                column_map = self._load_csv(column_map_pkg, res_path.name)
                self._column_map[res_path.stem] = column_map

    def get_dataset_name(self):
        """Returns the name of the dataset described by this metadata."""
        return self._dataset_name

    def _lookup(self, table: pd.DataFrame, page, partition: dict):
        """Return the cell of a (page x partition) metadata table.

        Rows of ``table`` are page names and columns are partition values stored
        as strings, hence the ``str()`` around the partition key.
        """
        return table.loc[page, str(self._get_partition_key(partition))]

    def get_sheet_name(self, page, **partition):
        """Return name of Excel sheet containing data for given partition and page."""
        return self._lookup(self._sheet_name, page, partition)

    def get_skiprows(self, page, **partition):
        """Return number of header rows to skip when loading a partition and page."""
        return self._lookup(self._skiprows, page, partition)

    def get_skipfooter(self, page, **partition):
        """Return number of footer rows to skip when loading a partition and page."""
        return self._lookup(self._skipfooter, page, partition)

    def get_file_name(self, page, **partition):
        """Returns file name of given partition and page."""
        return self._lookup(self._file_name, page, partition)

    def get_column_map(self, page, **partition):
        """Return dictionary for renaming columns in a given partition and page.

        The returned mapping goes from raw (input) column name to pudl column
        name. Entries whose raw name is ``-1`` (column absent in that partition)
        are dropped.
        """
        return {
            v: k
            for k, v in self._column_map[page]
            .T.loc[str(self._get_partition_key(partition))]
            .to_dict()
            .items()
            if v != -1
        }

    def get_all_columns(self, page):
        """Returns list of all pudl columns for a given page across all partitions."""
        return sorted(self._column_map[page].T.columns)

    def get_all_pages(self):
        """Returns list of all known pages."""
        return sorted(self._column_map.keys())

    def get_form(self, page) -> str:
        """Returns the form name for a given page."""
        return self._page_part_map.loc[page, "form"]

    @staticmethod
    def _load_csv(package, filename):
        """Load metadata from a filename that is found in a package."""
        return pd.read_csv(
            importlib.resources.files(package) / filename, index_col=0, comment="#"
        )

    @staticmethod
    def _get_partition_key(partition):
        """Return the single partition value.

        Raises:
            AssertionError: if ``partition`` does not contain exactly one item.
        """
        if len(partition) != 1:
            raise AssertionError(
                f"Expecting exactly one partition attribute (found: {partition})"
            )
        return list(partition.values())[0]
[docs]
class GenericExtractor:
    """Logic for extracting :class:`pd.DataFrame` from Excel spreadsheets.

    This class implements the generic dataset agnostic logic to load data
    from excel spreadsheet simply by using excel Metadata for given dataset.

    It is expected that individual datasets will subclass this code and add
    custom business logic by overriding necessary methods.

    When implementing custom business logic, the following should be modified:

    1. METADATA class attribute controls which excel metadata should be loaded.

    2. BLACKLISTED_PAGES class attribute specifies which pages should not
       be loaded from the underlying excel files even if the metadata is
       available. This can be used for experimental/new code that should not be
       run yet.

    3. get_dtypes() should return dict with {column_name: pandas_datatype} if you
       need to specify which datatypes should be used upon loading.

    4. If data cleanup is necessary, you can apply custom logic by overriding
       one of the following functions (they all return the modified dataframe):

       * process_raw() is applied right after loading the excel DataFrame
         from the disk.
       * process_renamed() is applied after input columns were renamed to
         standardized pudl columns.
       * process_final_page() is applied when data from all available years
         is merged into single DataFrame for a given page.

    5. zipfile_resource_partitions() should be overridden if the partitions
       needed to locate a zipfile in the datastore differ from the partition
       passed to the extractor.
    """

    METADATA: "Metadata | None" = None
    """Instance of metadata object to use with this extractor."""

    BLACKLISTED_PAGES: list[str] = []
    """List of supported pages that should not be extracted."""

    def __init__(self, ds):
        """Create new extractor object and load metadata.

        Args:
            ds (datastore.Datastore): An initialized datastore, or subclass

        Raises:
            NotImplementedError: if the subclass did not set ``METADATA``.
        """
        if not self.METADATA:
            raise NotImplementedError("self.METADATA must be set.")
        self._metadata = self.METADATA
        self._dataset_name = self._metadata.get_dataset_name()
        # Maps file name -> loaded ExcelFile; only successful loads are stored.
        self._file_cache = {}
        self.ds = ds
        # Columns added by this extractor on top of those in the column map.
        self.cols_added: list[str] = []

    def process_raw(self, df, page, **partition):
        """Add the data_maturity column, then rename raw columns to pudl names."""
        df = self.add_data_maturity(df, page, **partition)
        return df.rename(columns=self._metadata.get_column_map(page, **partition))

    def add_data_maturity(self, df: pd.DataFrame, page, **partition) -> pd.DataFrame:
        """Add data_maturity column to indicate the maturity of partition data.

        The four possible values are ``final``, ``provisional``,
        ``monthly_update`` and ``incremental_ytd``:

        * ``provisional`` if the file name contains ``early_release``
          (case-insensitive), since EIA includes ``Early_Release`` in the names
          of those files.
        * ``monthly_update`` if ``self._dataset_name`` is ``eia860m``.
        * ``incremental_ytd`` for ``EIA923_Schedules_2_3_4_5_M_<MM>`` files
          whose two-digit release month ``<MM>`` is not ``12``.
        * ``final`` otherwise.

        This method adds a column and thus records ``data_maturity`` in
        ``self.cols_added`` (once, no matter how often it is called).
        """
        maturity = "final"
        file_name = self.excel_filename(page, **partition)
        if "early_release" in file_name.lower():
            maturity = "provisional"
        elif self._dataset_name == "eia860m":
            maturity = "monthly_update"
        elif "EIA923_Schedules_2_3_4_5_M_" in file_name:
            release_month = re.search(
                r"EIA923_Schedules_2_3_4_5_M_(\d{2})",
                file_name,
            )
            # Guard against names without a two-digit month after the prefix,
            # which previously raised AttributeError on ``None.group``.
            if release_month is not None and release_month.group(1) != "12":
                maturity = "incremental_ytd"
        df = df.assign(data_maturity=maturity)
        # This runs once per (page, partition); avoid growing the list forever.
        if "data_maturity" not in self.cols_added:
            self.cols_added.append("data_maturity")
        return df

    @staticmethod
    def process_renamed(df, page, **partition):
        """Transforms dataframe after columns are renamed."""
        return df

    @staticmethod
    def get_dtypes(page, **partition):
        """Provide custom dtypes for given page and partition."""
        return {}

    def process_final_page(self, df, page):
        """Final processing stage applied to a page DataFrame."""
        return df

    def zipfile_resource_partitions(self, page, **partition) -> dict:
        """Specify the partitions used for returning a zipfile from the datastore.

        By default, this method appends any page to partition mapping in
        :attr:`METADATA._page_part_map`. Most datasets do not have page to part
        maps and just return the same partition that is passed in. If you have
        dataset-specific partition mappings that are needed to return a zipfile from the
        datastore, override this method to return the desired partitions.
        """
        if not self.METADATA._page_part_map.empty:
            partition.update(self.METADATA._page_part_map.loc[page])
        return partition

    def extract(self, **partitions):
        """Extracts dataframes.

        Returns dict where keys are page names and values are
        DataFrames containing data across given years.

        Args:
            partitions (list, tuple or string): list of partitions to
                extract. (Ex: [2009, 2010] if dataset is partitioned by years
                or '2020-08' if dataset is partitioned by year_month)
        """
        raw_dfs = {}
        if not partitions:
            logger.warning(
                f"No partitions were given. Not extracting {self._dataset_name} "
                "spreadsheet data."
            )
            return raw_dfs
        logger.info(f"Extracting {self._dataset_name} spreadsheet data.")

        for page in self._metadata.get_all_pages():
            if page in self.BLACKLISTED_PAGES:
                logger.debug(f"Skipping blacklisted page {page}.")
                continue
            # Seeded with an empty frame, presumably so pd.concat below never
            # receives an empty list when no partition has data for this page.
            dfs = [
                pd.DataFrame(),
            ]
            for partition in pudl.helpers.iterate_multivalue_dict(**partitions):
                # An excel file name of "-1" marks (page, partition) pairs that
                # have no data; skip them.
                if self.excel_filename(page, **partition) == "-1":
                    logger.debug(f"No page for {self._dataset_name} {page} {partition}")
                    continue
                logger.debug(
                    f"Loading dataframe for {self._dataset_name} {page} {partition}"
                )
                newdata = pd.read_excel(
                    self.load_excel_file(page, **partition),
                    sheet_name=self._metadata.get_sheet_name(page, **partition),
                    skiprows=self._metadata.get_skiprows(page, **partition),
                    skipfooter=self._metadata.get_skipfooter(page, **partition),
                    dtype=self.get_dtypes(page, **partition),
                    engine="calamine",
                )
                newdata = pudl.helpers.simplify_columns(newdata)
                newdata = self.process_raw(newdata, page, **partition)
                newdata = self.process_renamed(newdata, page, **partition)
                dfs.append(newdata)
                self._warn_on_column_mismatch(newdata, page, partition)

            df = pd.concat(dfs, sort=True, ignore_index=True)
            # After all years are loaded, add empty columns that could appear
            # in other years so that df matches the database schema
            missing_cols = list(
                set(self._metadata.get_all_columns(page)).difference(df.columns)
            )
            df = pd.concat([df, pd.DataFrame(columns=missing_cols)], sort=True)
            raw_dfs[page] = self.process_final_page(df=df, page=page)
        return raw_dfs

    def _warn_on_column_mismatch(self, newdata: pd.DataFrame, page, partition: dict):
        """Log warnings if ``newdata`` has extra or missing columns.

        Expected columns are those mapped (non-null, not ``-1``) for this
        partition in the page's column map, plus ``self.cols_added``.
        """
        str_part = str(list(partition.values())[0])
        col_map = self._metadata._column_map[page]
        page_cols = col_map.loc[
            (col_map[str_part].notnull()) & (col_map[str_part] != -1),
            [str_part],
        ].index
        expected_cols = set(page_cols.union(self.cols_added))
        found_cols = set(newdata.columns)
        if found_cols == expected_cols:
            return
        # TODO (bendnorman): Enforce canonical fields for all raw fields?
        extra_raw_cols = found_cols.difference(expected_cols)
        missing_raw_cols = expected_cols.difference(found_cols)
        if extra_raw_cols:
            logger.warning(
                f"{page}/{str_part}:Extra columns found in extracted table:"
                f"\n{extra_raw_cols}"
            )
        if missing_raw_cols:
            logger.warning(
                f"{page}/{str_part}: Expected columns not found in extracted table:"
                f"\n{missing_raw_cols}"
            )

    def load_excel_file(self, page: str, **partition) -> pd.ExcelFile:
        """Produce the ExcelFile object for the given (partition, page).

        Args:
            page: pudl name for the dataset contents, eg "boiler_generator_assn" or
                "coal_stocks".
            partition: partition to load. (ex: 2009 for year partition or "2020-08" for
                year_month partition)

        Returns:
            Parsed Excel spreadsheet page.
        """
        xlsx_filename = self.excel_filename(page, **partition)
        # TODO(rousik): this _file_cache could be replaced with @cache or @memoize annotations
        if xlsx_filename in self._file_cache:
            return self._file_cache[xlsx_filename]
        try:
            # eia860m exports the resources as raw xlsx files that are not
            # embedded in zip archives. To support this, we will first try
            # to retrieve the resource directly. If this fails, we will attempt
            # to open zip archive and locate the xlsx file inside that.
            # TODO(rousik): if we can make it so, it would be useful to normalize
            # the eia860m and zip the xlsx files. Then we could simplify this code.
            res = self.ds.get_unique_resource(self._dataset_name, name=xlsx_filename)
            excel_file = pd.ExcelFile(res, engine="calamine")
        except KeyError:
            excel_file = self._read_from_zipfile(page, xlsx_filename, partition)
        # Cache only after a successful load. Caching in a ``finally`` clause
        # stored ``None`` on failure, which later calls then returned silently.
        self._file_cache[xlsx_filename] = excel_file
        return excel_file

    def _read_from_zipfile(self, page: str, filename: str, partition: dict):
        """Read ``filename`` from the zipfile resource for this page/partition.

        Files with a ``.dbf`` extension are parsed with ``dbfread`` and
        converted via ``pudl.helpers.convert_df_to_excel_file``; any other file
        is opened as an Excel workbook with the calamine engine.
        """
        with self.ds.get_zipfile_resource(
            self._dataset_name,
            **self.zipfile_resource_partitions(page, **partition),
        ) as zf:
            extension = pathlib.Path(filename).suffix.lower()
            if extension == ".dbf":
                with zf.open(filename) as dbf_filepath:
                    df = pd.DataFrame(iter(dbfread.DBF(filename, filedata=dbf_filepath)))
                    return pudl.helpers.convert_df_to_excel_file(df, index=False)
            return pd.ExcelFile(BytesIO(zf.read(filename)), engine="calamine")

    def excel_filename(self, page, **partition):
        """Produce the xlsx document file name as it will appear in the archive.

        Args:
            page: pudl name for the dataset contents, eg "boiler_generator_assn" or
                "coal_stocks"
            partition: partition to load. (ex: 2009 for year partition or "2020-08" for
                year_month partition)

        Returns:
            string name of the xlsx file
        """
        return self.METADATA.get_file_name(page, **partition)
@op
def concat_pages(paged_dfs: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]:
    """Concatenate similar pages of data from different years into single dataframes.

    Transform a list of dictionaries of dataframes into a single dictionary of
    dataframes, where each dataframe is the concatenation of dataframes with identical
    keys from the input list.

    Args:
        paged_dfs: A list of dictionaries whose keys are page names, and values are
            extracted DataFrames. Each element of the list corresponds to a single
            year of the dataset being extracted.

    Returns:
        A plain dictionary of DataFrames keyed by page name, where the DataFrame
        contains that page's data from all extracted years concatenated together
        (with a fresh 0..n-1 index).
    """
    # Group the dataframes by page: every list holds different instances of the
    # same page of data, one per extracted year.
    all_data = defaultdict(list)
    for dfs in paged_dfs:
        for page, df in dfs.items():
            all_data[page].append(df)
    # Return a plain dict rather than the defaultdict, so that looking up a page
    # that was never extracted raises KeyError instead of yielding an empty list.
    return {
        page: pd.concat(page_dfs).reset_index(drop=True)
        for page, page_dfs in all_data.items()
    }
[docs]
def year_extractor_factory(
    extractor_cls: type[GenericExtractor], name: str
) -> OpDefinition:
    """Build a Dagster op that pulls a single year of data with ``extractor_cls``.

    The op is named ``extract_single_<name>_year`` and requires the
    ``datastore`` and ``dataset_settings`` resources.

    Args:
        extractor_cls: Class of type :class:`GenericExtractor` used to extract the data.
        name: Name of an Excel based dataset (e.g. "eia860").
    """

    def extract_single_year(context, year: int) -> dict[str, pd.DataFrame]:
        """A function that extracts a year of spreadsheet data from an Excel file.

        This function will be decorated with a Dagster op and returned.

        Args:
            context: Dagster keyword that provides access to resources and config.
            year: Year of data to extract.

        Returns:
            A dictionary of DataFrames extracted from Excel, keyed by page name.
        """
        extractor = extractor_cls(context.resources.datastore)
        return extractor.extract(year=[year])

    make_op = op(
        required_resource_keys={"datastore", "dataset_settings"},
        name=f"extract_single_{name}_year",
    )
    return make_op(extract_single_year)
[docs]
def years_from_settings_factory(name: str) -> OpDefinition:
    """Build a Dagster op that yields the configured target years for a dataset.

    The op is named ``<name>_years_from_settings``, requires the
    ``dataset_settings`` resource, and has a dynamic output.

    Args:
        name: Name of an Excel based dataset (e.g. "eia860").
    """

    def years_from_settings(context) -> DynamicOutput:
        """Produce target years for the given dataset from the dataset settings object.

        These will be used to kick off worker processes to extract each year of data in
        parallel.

        Yields:
            A Dagster :class:`DynamicOutput` object representing the year to be
            extracted. See the Dagster API documentation for more details:
            https://docs.dagster.io/_apidocs/dynamic#dagster.DynamicOut
        """
        settings = context.resources.dataset_settings
        # Any dataset whose name contains "eia" is looked up under ``.eia``.
        if "eia" in name:
            settings = settings.eia
        target_years = getattr(settings, name).years
        for year in target_years:
            yield DynamicOutput(year, mapping_key=str(year))

    make_op = op(
        out=DynamicOut(),
        required_resource_keys={"dataset_settings"},
        name=f"{name}_years_from_settings",
    )
    return make_op(years_from_settings)
[docs]
def raw_df_factory(
    extractor_cls: type[GenericExtractor], name: str
) -> AssetsDefinition:
    """Build a Dagster graph asset ``raw_<name>__all_dfs`` of extracted DataFrames.

    The graph fans out one single-year extraction op per configured year,
    then merges the per-year results page by page with ``concat_pages``.

    Args:
        extractor_cls: The dataset-specific Excel extractor used to extract the data.
            Needs to correspond to the dataset identified by ``name``.
        name: Name of an Excel based dataset (e.g. "eia860").
    """
    # Op that extracts one year of data.
    year_extractor = year_extractor_factory(extractor_cls, name)
    # Op that reads the target years from the dataset settings resource.
    years_from_settings = years_from_settings_factory(name)

    def raw_dfs() -> dict[str, pd.DataFrame]:
        """Produce a dictionary of extracted dataframes."""
        target_years = years_from_settings()
        # DynamicOut.map() clones the extraction op once per year; collect()
        # gathers the per-year results so they can be concatenated per page.
        # See https://docs.dagster.io/_apidocs/dynamic#dagster.DynamicOut
        per_year_dfs = target_years.map(lambda year: year_extractor(year))
        return concat_pages(per_year_dfs.collect())

    make_asset = graph_asset(name=f"raw_{name}__all_dfs")
    return make_asset(raw_dfs)