"""
Routines for generating PUDL tabular data package and resource metadata.

This module enables the generation and use of the metadata for tabular data
packages. It also saves and validates the datapackage once the metadata is
compiled. In general the routines in this module can only be used **after**
the referenced CSVs have been generated by the top level PUDL ETL module and
written out to the datapackage data directory by the `pudl.load.csv` module.

The metadata comes from three basic sources: the datapkg_settings that are
read in from the YAML file specifying the datapackage or bundle of
datapackages to be generated; the CSV files themselves (their names, sizes,
and hash values); and the stored metadata template, which ultimately
determines the structure of the relational database that these output tabular
data packages represent and encodes field-specific table schemas. This
"megadata" is stored in `src/pudl/package_data/meta/datapkg/datapackage.json`.

For unpartitioned tables which are contained in a single tabular data
resource, this is a relatively straightforward process. However, larger
tables that have been partitioned into smaller tabular data resources which
are part of a resource group (e.g. EPA CEMS) have additional complexities.
We have tried to say "resource" when referring to an individual output CSV
that has its own metadata entry, and "table" when referring to whole database
tables, which typically contain only a single resource but may be composed of
hundreds or even thousands of individual resources.

See https://frictionlessdata.io for more details on the tabular data package
standards.

In addition, we have included PUDL specific metadata fields that document the
ETL parameters which were used to process the data, temporal and spatial
coverage for each resource, Zenodo DOIs where appropriate, and UUIDs
identifying the individual data packages as well as the co-generated bundles
of data packages that can be used together to instantiate a single database.
"""
import datetime
import hashlib
import importlib
import json
import logging
import pathlib
import re
import uuid
import datapackage
import goodtables
import pkg_resources
import pudl
from pudl import constants as pc
logger = logging.getLogger(__name__)
##############################################################################
# CREATING PACKAGES AND METADATA
##############################################################################
def hash_csv(csv_path):
"""Calculates a SHA-256 hash of the CSV file for data integrity checking.
Args:
csv_path (path-like) : Path the CSV file to hash.
Returns:
str: the hexdigest of the hash, with a 'sha256:' prefix.
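
    Example:
        A minimal usage sketch (the path below is hypothetical)::

            csv_hash = hash_csv("/path/to/datapkg/data/plants_eia860.csv")
            assert csv_hash.startswith("sha256:")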
"""
    # Read the file in 64 kB blocks rather than loading it all into memory:
    blocksize = 65536
    # sha256 is a relatively fast yet secure hashing algorithm:
    hasher = hashlib.sha256()
    # Read the file incrementally, updating the hash as we go:
    with open(csv_path, 'rb') as afile:
        buf = afile.read(blocksize)
        while len(buf) > 0:
            hasher.update(buf)
            buf = afile.read(blocksize)
    return f"sha256:{hasher.hexdigest()}"
def compile_partitions(datapkg_settings):
"""
Given a datapackage settings dictionary, extract dataset partitions.
Iterates through all the datasets enumerated in the datapackage settings,
and compiles a dictionary indicating which datasets should be partitioned
and on what basis when they are output as tabular data resources. Currently
this only applies to the epacems dataset. Datapackage settings must be
validated because currently we inject EPA CEMS partitioning variables
(epacems_years, epacems_states) during the validation process.
Args:
datapkg_settings (dict): a dictionary containing validated datapackage
settings, mostly read in from a PUDL ETL settings file.
Returns:
        dict: Uses table names (e.g. hourly_emissions_epacems) as keys, and
lists of partition variables (e.g. ["epacems_years", "epacems_states"])
as the values. If no datasets within the datapackage are being
partitioned, this is an empty dictionary.
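
    Example:
        A minimal sketch with a hypothetical settings dictionary::

            datapkg_settings = {
                "datasets": [
                    {"epacems": {
                        "partition": {
                            "hourly_emissions_epacems":
                                ["epacems_years", "epacems_states"],
                        },
                    }},
                ],
            }
            compile_partitions(datapkg_settings)
            # {'hourly_emissions_epacems': ['epacems_years', 'epacems_states']}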
"""
partitions = {}
for dataset in datapkg_settings['datasets']:
for dataset_name in dataset:
try:
partitions.update(dataset[dataset_name]['partition'])
except KeyError:
pass
return partitions
def get_unpartitioned_tables(resources, datapkg_settings):
"""
Generate a list of database table names from a list of data resources.

    In the case of EPA CEMS and potentially other large datasets, we
    partition a single table into many tabular data resources that are part
    of a resource group. However, in some contexts we want to refer to the
    list of corresponding database tables rather than the list of resources.
    The partition key in the datapackage settings is the name of the table
    without the partition elements, so in the case of partitioned tables we
    use that key as the name of the table. Otherwise we just use the name of
    the resource.

    Args:
        resources (iterable): A list of tabular data resource names, which
            are expected to appear in the datapackage specified by
            datapkg_settings.
        datapkg_settings (dict): a dictionary containing validated datapackage
            settings, mostly read in from a PUDL ETL settings file.

Returns:
list: The names of the database tables corresponding to the tabular
datapackage resource names that were passed in.
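
    Example:
        A minimal sketch, reusing the hypothetical settings dictionary shown
        in the compile_partitions example above::

            resources = [
                "plants_eia860",
                "hourly_emissions_epacems_2019_co",
            ]
            get_unpartitioned_tables(resources, datapkg_settings)
            # {'plants_eia860', 'hourly_emissions_epacems'}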
"""
    partitions = compile_partitions(datapkg_settings)
    if not partitions:
        return set(resources)
    tables_unpartitioned = set()
    for resource in resources:
        for table in partitions:
            if table in resource:
                # Use the unpartitioned table name from the partition key:
                tables_unpartitioned.add(table)
                break
        else:
            # No partition key matched, so the resource is its own table:
            tables_unpartitioned.add(resource)
    return tables_unpartitioned
def data_sources_from_tables(table_names):
"""
Look up data sources used by the given list of PUDL database tables.
Args:
        table_names (iterable): a list of names of 'seed' tables, whose
            dependencies we are seeking to find.

Returns:
set: The set of data sources for the list of PUDL table names.
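
    Example:
        A minimal sketch; the full set of sources depends on the foreign key
        relationships encoded in the stored PUDL megadata::

            data_sources_from_tables(["plants_eia860"])
            # A set including at least {'pudl', 'eia860'}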
"""
all_tables = get_dependent_tables_from_list(table_names)
table_sources = set()
# All tables get PUDL:
table_sources.add('pudl')
for t in all_tables:
for src in pc.data_sources:
if re.match(f".*_{src}$", t):
table_sources.add(src)
return table_sources
def get_datapkg_fks(datapkg_json):
"""
Get a dictionary of foreign key relationships from datapackage metadata.
Args:
datapkg_json (path-like): Path to the datapackage.json
containing the schema from which the foreign key relationships
will be read.
Returns:
        dict: table names (keys) mapped to lists of the table names (values)
            with which the key table has foreign key relationships.
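
    Example:
        A sketch of the returned structure, with hypothetical table names::

            get_datapkg_fks("/path/to/datapkg/datapackage.json")
            # {'plants_entity_eia': [],
            #  'plants_eia860': ['plants_entity_eia', 'utilities_eia860'],
            #  ...}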
"""
with open(datapkg_json) as md:
metadata = json.load(md)
fk_relash = {}
for tbl in metadata['resources']:
fk_relash[tbl['name']] = []
if 'foreignKeys' in tbl['schema']:
fk_tables = []
for fk in tbl['schema']['foreignKeys']:
fk_tables.append(fk['reference']['resource'])
fk_relash[tbl['name']] = fk_tables
return fk_relash
def get_dependent_tables(table_name, fk_relash):
"""
For a given table, get the list of all the other tables it depends on.
Args:
table_name (str): The table whose dependencies we are looking for.
        fk_relash (dict): table names (keys) mapped to lists of the table
            names (values) with which the key table has foreign key
            relationships.

Returns:
set: the set of all the tables the specified table depends upon.
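
    Example:
        A minimal sketch with a small, hypothetical set of foreign key
        relationships::

            fk_relash = {
                "a": ["b"],
                "b": ["c"],
                "c": [],
            }
            get_dependent_tables("a", fk_relash)
            # {'a', 'b', 'c'}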
"""
# Add the initial table
dependent_tables = set()
dependent_tables.add(table_name)
# Get the list of tables this table depends on:
new_table_names = set()
new_table_names.update(fk_relash[table_name])
# Recursively call this function on the tables our initial
# table depends on:
for table_name in new_table_names:
logger.debug(f"Finding dependent tables for {table_name}")
dependent_tables.add(table_name)
for t in get_dependent_tables(table_name, fk_relash):
dependent_tables.add(t)
return dependent_tables
def get_dependent_tables_from_list(table_names):
"""
Given a list of tables, find all the other tables they depend on.
Iterate over a list of input tables, adding them and all of their dependent
tables to a set, and return that set. Useful for determining which tables
need to be exported together to yield a self-contained subset of the PUDL
database.
Args:
table_names (iterable): a list of names of 'seed' tables, whose
dependencies we are seeking to find.
Returns:
set: All tables with which any of the input tables have ForeignKey
relations.
"""
with importlib.resources.path('pudl.package_data.meta.datapkg',
'datapackage.json') as datapkg_json:
fk_relash = get_datapkg_fks(datapkg_json)
all_the_tables = set()
for t in table_names:
for x in get_dependent_tables(t, fk_relash):
all_the_tables.add(x)
return all_the_tables
def pull_resource_from_megadata(resource_name):
"""
Read metadata for a given data resource from the stored PUDL megadata.
Args:
resource_name (str): the name of the tabular data resource whose JSON
descriptor we are reading.
Returns:
dict: A Python dictionary containing the resource descriptor portion of
a data package descriptor, not expected to be valid or complete.
Raises:
        ValueError: If resource_name is not found exactly once in the PUDL
            metadata library.
"""
with importlib.resources.open_text('pudl.package_data.meta.datapkg',
'datapackage.json') as datapkg_json:
metadata_mega = json.load(datapkg_json)
    # Because we partition the EPA CEMS output, partitioned resource names
    # include the state, year, or other partition values. For the purposes of
    # grabbing metadata, assume any resource name containing the CEMS table
    # name refers to EPA CEMS:
if "hourly_emissions_epacems" in resource_name:
table_name_mega = "hourly_emissions_epacems"
else:
table_name_mega = resource_name
table_resource = [
x for x in metadata_mega['resources'] if x['name'] == table_name_mega
]
if len(table_resource) == 0:
raise ValueError(f"{resource_name} not found in stored metadata.")
if len(table_resource) > 1:
raise ValueError(f"{resource_name} found multiple times in metadata.")
table_resource = table_resource[0]
    # Rename the resource to the requested (possibly partitioned) resource
    # name. This matters for partitioned tables in particular, whose megadata
    # entries use the unpartitioned table name:
table_resource['name'] = resource_name
return table_resource
def spatial_coverage(resource_name):
"""
Extract spatial coverage (country and state) for a given source.
Args:
resource_name (str): The name of the (potentially partitioned) resource
for which we are enumerating the spatial coverage. Currently this
is the only place we are able to access the partitioned spatial
coverage after the ETL process has completed.
Returns:
dict: A dictionary containing country and potentially state level
spatial coverage elements. Country keys are "country" for the full name
of country, "iso_3166-1_alpha-2" for the 2-letter ISO code, and
"iso_3166-1_alpha-3" for the 3-letter ISO code. State level elements
are "state" (a two letter ISO code for sub-national jurisdiction) and
"iso_3166-2" for the combined country-state code conforming to that
standard.
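
    Example:
        A minimal sketch for a hypothetical partitioned EPA CEMS resource::

            spatial_coverage("hourly_emissions_epacems_2019_co")
            # {'country': 'United States of America',
            #  'iso_3166-1_alpha-2': 'US',
            #  'iso_3166-1_alpha-3': 'USA',
            #  'state': 'CO',
            #  'iso_3166-2': 'US-CO'}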
"""
coverage = {
"country": "United States of America",
# More generally... ISO 3166-1 2-letter country code:
"iso_3166-1_alpha-2": "US",
# More generally... ISO 3166-1 3-letter country code:
"iso_3166-1_alpha-3": "USA",
}
if "hourly_emissions_epacems" in resource_name:
us_state = resource_name.split("_")[4].upper()
coverage["state"] = us_state
# ISO3166-2:US code for the relevant state or outlying area:
coverage["iso_3166-2"] = f"US-{us_state}"
return coverage
def temporal_coverage(resource_name, datapkg_settings):
"""Extract start and end dates from ETL parameters for a given source.

    Args:
        resource_name (str): The name of the (potentially partitioned)
            resource for which we are enumerating the temporal coverage.
            Currently this is the only place we are able to access the
            partitioned temporal coverage after the ETL process has
            completed.
        datapkg_settings (dict): Python dictionary representing the ETL
            parameters read in from the settings file, pertaining to the
            tabular datapackage this resource is part of.

Returns:
dict: A dictionary of two items, keys "start_date" and "end_date" with
values in ISO 8601 YYYY-MM-DD format, indicating the extent of the
time series data contained within the resource. If the resource does
not contain time series data, the dates are null.
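
    Example:
        A minimal sketch for a hypothetical partitioned EPA CEMS resource,
        where the year is read from the resource name itself rather than the
        settings::

            temporal_coverage("hourly_emissions_epacems_2019_co", {})
            # {'start_date': '2019-01-01', 'end_date': '2019-12-31'}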
"""
start_date = None
end_date = None
if "hourly_emissions_epacems" in resource_name:
year = resource_name.split("_")[3]
start_date = f"{year}-01-01"
end_date = f"{year}-12-31"
else:
source_years = f"{resource_name.split('_')[-1]}_years"
for dataset in datapkg_settings["datasets"]:
etl_params = list(dataset.values())[0]
try:
start_date = f"{min(etl_params[source_years])}-01-01"
end_date = f"{max(etl_params[source_years])}-12-31"
break
except KeyError:
continue
return {"start_date": start_date, "end_date": end_date}
def get_tabular_data_resource(resource_name, datapkg_dir,
datapkg_settings, partitions=False):
"""
Create a Tabular Data Resource descriptor for a PUDL table.
Based on the information in the database, and some additional metadata this
function will generate a valid Tabular Data Resource descriptor, according
to the Frictionless Data specification, which can be found here:
https://frictionlessdata.io/specs/tabular-data-resource/
Args:
resource_name (string): name of the tabular data resource for which you
want to generate a Tabular Data Resource descriptor. This is the
resource name, rather than the database table name, because we
partition large tables into resource groups consisting of many
files.
datapkg_dir (path-like): The location of the directory for this
package. The data package directory will be a subdirectory in the
`datapkg_dir` directory, with the name of the package as the name
of the subdirectory.
        datapkg_settings (dict): Python dictionary representing the ETL
            parameters read in from the settings file, pertaining to the
            tabular datapackage this resource is part of.
        partitions (dict): A dictionary with PUDL database table names as the
            keys (e.g. hourly_emissions_epacems), and lists of partition
            variables (e.g. ["epacems_years", "epacems_states"]) as the
            values.

Returns:
dict: A Python dictionary representing a tabular data resource
descriptor that complies with the Frictionless Data specification.
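
    Example:
        A minimal usage sketch; the paths and settings are hypothetical, and
        the referenced CSV must already have been written out by
        pudl.load.csv::

            descriptor = get_tabular_data_resource(
                "plants_eia860",
                datapkg_dir="/path/to/datapkg",
                datapkg_settings=datapkg_settings,
            )
            descriptor["path"]  # 'data/plants_eia860.csv'
            descriptor["hash"]  # 'sha256:...'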
"""
# Only some datasets have meaningful temporal coverage:
# temporal_data = ["eia860", "eia923", "ferc1", "eia861", "epacems"]
    # EPA CEMS resources are always output as gzipped CSVs:
if "hourly_emissions_epacems" in resource_name:
abs_path = pathlib.Path(datapkg_dir, "data", f"{resource_name}.csv.gz")
else:
abs_path = pathlib.Path(datapkg_dir, "data", f"{resource_name}.csv")
# pull the skeleton of the descriptor from the megadata file
descriptor = pull_resource_from_megadata(resource_name)
descriptor["path"] = str(abs_path.relative_to(abs_path.parent.parent))
descriptor["bytes"] = abs_path.stat().st_size
descriptor["hash"] = hash_csv(abs_path)
descriptor["created"] = (
datetime.datetime.utcnow()
.replace(microsecond=0)
.isoformat() + "Z"
)
unpartitioned_tables = get_unpartitioned_tables([resource_name],
datapkg_settings)
data_sources = data_sources_from_tables(unpartitioned_tables)
descriptor["sources"] = [pc.data_source_info[src] for src in data_sources]
descriptor["coverage"] = {
"temporal": temporal_coverage(resource_name, datapkg_settings),
"spatial": spatial_coverage(resource_name),
}
if partitions:
for part in partitions.keys():
if part in resource_name:
descriptor["group"] = part
resource = datapackage.Resource(descriptor)
if resource.valid:
logger.debug(f"{resource_name} is a valid resource")
else:
logger.info(resource)
raise ValueError(
f"""
Invalid tabular data resource descriptor: {resource.name}
Errors:
{resource.errors}
"""
)
return descriptor
def compile_keywords(data_sources):
"""Compile the set of all keywords associated with given data sources.
The list of keywords we associate with each data source is stored in
the ``pudl.constants.keywords_by_data_source`` dictionary.
Args:
data_sources (iterable): List of data source codes (eia923, ferc1,
etc.) from which to gather keywords.
Returns:
list: the set of all unique keywords associated with any of the input
data sources.
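
    Example:
        A minimal sketch; the exact keywords come from
        pudl.constants.keywords_by_data_source::

            keywords = compile_keywords(["eia923", "ferc1"])
            isinstance(keywords, list)  # True; a deduplicated list of strings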
"""
keywords = set()
for src in data_sources:
keywords.update(pc.keywords_by_data_source[src])
return list(keywords)
def get_autoincrement_columns(unpartitioned_tables):
"""Grab the autoincrement columns for pkg tables."""
with importlib.resources.open_text('pudl.package_data.meta.datapkg',
'datapackage.json') as md:
metadata_mega = json.load(md)
autoincrement = {}
for table in unpartitioned_tables:
try:
autoincrement[table] = metadata_mega['autoincrement'][table]
except KeyError:
pass
return autoincrement
def validate_save_datapkg(datapkg_descriptor, datapkg_dir,
row_limit=1000, table_limit=10):
"""
Validate datapackage descriptor, save it, and validate some sample data.
Args:
datapkg_descriptor (dict): A Python dictionary representation of a
(hopefully valid) tabular datapackage descriptor.
datapkg_dir (path-like): Directory into which the datapackage.json
file containing the tabular datapackage descriptor should be
written.
row_limit (int): Number of rows to validate in each table. Passed in to
goodtables.validate()
        table_limit (int): Number of different tables to validate within the
            datapackage. Passed in to goodtables.validate(). Note that
            validating larger numbers of tables has caused memory issues,
            for reasons we don't yet understand.

Returns:
dict: A dictionary containing the goodtables datapackage validation
report. Note that this will only be returned if there are no errors,
otherwise it is output as an error message.
Raises:
ValueError: if the datapackage descriptor passed in is invalid, or if
any of the tables has a data validation error.
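
    Example:
        A minimal usage sketch; the descriptor and output directory are
        hypothetical and must already exist::

            report = validate_save_datapkg(
                datapkg_descriptor, "/path/to/datapkg", row_limit=100)
            report["valid"]  # True (the function raises on validation errors)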
"""
# Use that descriptor to instantiate a Package object
datapkg = datapackage.Package(datapkg_descriptor)
    # Validate the data package descriptor before going any further:
logger.info(
f"Validating JSON descriptor for {datapkg.descriptor['name']} "
f"tabular data package...")
if not datapkg.valid:
raise ValueError(
f"Invalid tabular data package: {datapkg.descriptor['name']} "
f"Errors: {datapkg.errors}")
logger.info('JSON descriptor appears valid!')
# datapkg_json is the datapackage.json that we ultimately output:
datapkg_json = pathlib.Path(datapkg_dir, "datapackage.json")
datapkg.save(str(datapkg_json))
logger.info(
f"Validating a sample of data from {datapkg.descriptor['name']} "
f"tabular data package using goodtables...")
# Validate the data within the package using goodtables:
    report = goodtables.validate(
        datapkg_json,
        # TODO: check which checks are applied... and uncomment the line
        # below when the checks are integrated
        # checks=['structure', 'schema', 'foreign-key'],
        table_limit=table_limit,
        row_limit=row_limit)
if not report["valid"]:
goodtables_errors = ""
for table in report["tables"]:
if not table["valid"]:
goodtables_errors += str(table["source"])
goodtables_errors += str(table["errors"])
raise ValueError(
f"Data package data validation failed with goodtables. "
f"Errors: {goodtables_errors}"
)
logger.info("Congrats! You made a valid data package!")
logger.info("============================================================")
logger.info(" If you like PUDL (or not!) we'd love to hear from you...")
logger.info(" Let us know you are using PUDL at: hello@catalyst.coop")
logger.info(" Sign up for our newsletter: https://catalyst.coop/updates/")
logger.info("============================================================")
return report