"""
This module takes a bundle of datapackages and flattens them.
Because we have enabled the generation of multiple data packages as a part of a
data package "bundle", we need to squish the multiple data packages together in
order to put all of the pudl data into one data package. This is especially
useful for converting the data package to a SQLite database or any other format.
The module does two main things:
- squish the csv's together
- squish the metadata (datapackage.json) files together
The CSV squishing is pretty simple and is all being done in
`flatten_data_packages_csvs`. We are assuming and enforcing that if two data
packages include the same dataset, that dataset has the same ETL parameters
(years, tables, states, etc.). The metadata is slightly more complicated to
compile because each element of the metadata is structured differently. Most of
that work is being done in `flatten_data_package_metadata`.
"""
import json
import logging
import os
import pathlib
import shutil
import pudl
# Module-level logger named after this module, per the stdlib logging convention.
logger = logging.getLogger(__name__)
##############################################################################
# Flattening PUDL Data Packages
##############################################################################
def flatten_data_packages_csvs(pkg_bundle_dir, pkg_name='pudl-all'):
    """
    Copy the CSVs from all data packages into one flattened package directory.

    Args:
        pkg_bundle_dir (path-like): the subdirectory where the bundle of data
            packages live.
        pkg_name (str): the name you choose for the flattened data package.
    """
    # set where the flattened datapackage is going to live
    all_dir = pathlib.Path(pkg_bundle_dir, pkg_name)
    # delete the subdirectory if it exists so we start from a clean slate
    if all_dir.exists():
        shutil.rmtree(all_dir)
    # make the flattened package directory and its data sub-subdirectory
    all_data_dir = all_dir / 'data'
    all_data_dir.mkdir(parents=True)
    # for each of the package directories, copy over the csv's
    for pkg_dir in pathlib.Path(pkg_bundle_dir).iterdir():
        # skip all_dir itself -- copying from it would create duplicates
        if pkg_dir != all_dir:
            for csv in pathlib.Path(pkg_dir, 'data').iterdir():
                # if the csv already exists, shutil.copy will overwrite. this
                # is fine because we've already checked that the ETL
                # parameters are the same across data packages.
                shutil.copy(csv, all_data_dir)
def get_all_sources(pkg_descriptor_elements):
    """Grab the set of all dataset titles in a data package bundle.

    Args:
        pkg_descriptor_elements (dict): compiled metadata elements; its
            'sources' key maps to an iterable of per-package source lists,
            where each source is a dict with a 'title' entry.

    Returns:
        set: the unique source titles across all of the data packages.
    """
    return {
        source['title']
        for package_sources in pkg_descriptor_elements['sources']
        for source in package_sources
    }
def check_for_matching_parameters(pkg_bundle_dir, pkg_name):
    """
    Check to see if the ETL parameters for datasets are the same across dp's.

    Args:
        pkg_bundle_dir (path-like): the subdirectory where the bundle of data
            packages live.
        pkg_name (str): the name you choose for the flattened data package.

    Raises:
        AssertionError: if two data packages share a source but their ETL
            parameters differ.
    """
    logger.info('Checking for matching ETL parameters across data packages')
    # grab all of the metadata components for the bundle
    pkg_descriptor_elements = compile_data_packages_metadata(
        pkg_bundle_dir, pkg_name=pkg_name)
    # for every "title" (read: source), each package's source metadata must
    # match the first one found -- otherwise the parameters diverge
    for title in get_all_sources(pkg_descriptor_elements):
        source_metas = get_same_source_meta(pkg_descriptor_elements, title)
        reference = source_metas[0]
        for source_meta in source_metas:
            if source_meta != reference:
                raise AssertionError(f'parameters do not match for {title}')
def flatten_pudl_datapackages(pudl_settings,
                              pkg_bundle_name,
                              pkg_name='pudl-all'):
    """
    Combines a collection of PUDL data packages into one.

    Args:
        pudl_settings (dict): a dictionary filled with settings that mostly
            describe paths to various resources and outputs.
        pkg_bundle_name (str): the name of the subdirectory where the bundle
            of data packages live. Normally, this name will have been
            generated in `generate_data_packages`.
        pkg_name (str): the name you choose for the flattened data package.

    Returns:
        dict: a dictionary of the data package validation report.

    Raises:
        AssertionError: if the data package bundle directory does not exist,
            or if the ETL parameters for a shared dataset differ between
            data packages.
    """
    # determine the subdirectory for the package bundles...
    pkg_bundle_dir = pathlib.Path(pudl_settings['datapackage_dir'],
                                  pkg_bundle_name)
    if not pkg_bundle_dir.exists():
        raise AssertionError(
            "The datapackage bundle directory does not exist. ")
    # check that data packages that have the same sources have the same
    # parameters before we clobber same-named CSVs with each other
    check_for_matching_parameters(pkg_bundle_dir, pkg_name)
    # copy the csv's into a new data package directory
    flatten_data_packages_csvs(pkg_bundle_dir, pkg_name=pkg_name)
    # generate a flattened dp metadata descriptor
    pkg_descriptor = flatten_data_package_metadata(pkg_bundle_dir,
                                                  pkg_name=pkg_name)
    # using the pkg_descriptor, validate and save the data package metadata
    report = pudl.load.metadata.validate_save_pkg(
        pkg_descriptor,
        pkg_dir=pathlib.Path(pkg_bundle_dir, pkg_name))
    return report