pudl.workspace.datastore#

Datastore manages file retrieval for PUDL datasets.

Module Contents#

Classes#

DatapackageDescriptor

A simple wrapper providing access to datapackage.json contents.

ZenodoDoiSettings

Digital Object Identifiers pointing to currently used Zenodo archives.

ZenodoFetcher

API for fetching datapackage descriptors and resource contents from Zenodo.

Datastore

Handle connections and downloading of Zenodo Source archives.

Functions#

print_partitions(→ None)

Prints known partition keys and their values for each of the datasets.

validate_cache(→ None)

Validate elements in the datastore cache.

fetch_resources(→ None)

Retrieve all matching resources and store them in the cache.

_parse_key_values(→ dict[str, str])

Parse key-value pairs into a Python dictionary.

pudl_datastore(dataset, validate, list_partitions, ...)

Manage the raw data inputs to the PUDL data processing pipeline.

Attributes#

pudl.workspace.datastore.logger[source]#
pudl.workspace.datastore.ZenodoDoi[source]#
exception pudl.workspace.datastore.ChecksumMismatchError[source]#

Bases: ValueError

Resource checksum (md5) does not match.

class pudl.workspace.datastore.DatapackageDescriptor(datapackage_json: dict, dataset: str, doi: ZenodoDoi)[source]#

A simple wrapper providing access to datapackage.json contents.

get_resource_path(name: str) str[source]#

Returns the Zenodo URL that holds the contents of the given named resource.

_get_resource_metadata(name: str) dict[source]#
get_download_size() int[source]#

Returns the total download size of all the resources in MB.

validate_checksum(name: str, content: str) bool[source]#

Returns True if the content matches the checksum of the given named resource.

_matches(res: dict, **filters: Any)[source]#
_match_from_partition(parts: dict[str, str], k: str, v: str | list[str, str])[source]#
get_resources(name: str = None, **filters: Any) collections.abc.Iterator[pudl.workspace.resource_cache.PudlResourceKey][source]#

Returns a series of PudlResourceKey identifiers for matching resources.

Parameters:
  • name – if specified, find resource(s) with this name.

  • filters (dict) – if specified, find resource(s) matching these key=value constraints. The constraints are matched against the ‘parts’ field of the resource entry in the datapackage.json.

get_partitions(name: str = None) dict[str, set[str]][source]#

Return mapping of known partition keys to their allowed known values.

get_partition_filters(**filters: Any) collections.abc.Iterator[dict[str, str]][source]#

Returns a list of all known partition mappings.

This can be used to iterate over all resources, as each mapping can be used directly as a filter and should correspond to a unique resource.

Parameters:

filters – additional constraints for selecting relevant partitions.

_validate_datapackage(datapackage_json: dict)[source]#

Checks the correctness of datapackage.json metadata.

Raises ValueError if invalid.

get_json_string() str[source]#

Exports the underlying JSON as a normalized (sorted, indented) JSON string.
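
A descriptor is typically obtained from the ZenodoFetcher or Datastore documented below. A minimal sketch of exploring one, assuming network access to Zenodo and reusing the epacems dataset and its state partition from the CLI examples at the end of this page:

from pudl.workspace.datastore import ZenodoFetcher

# Download the datapackage.json for a dataset and wrap it in a descriptor.
descriptor = ZenodoFetcher().get_descriptor("epacems")

# Mapping of known partition keys to their allowed values, e.g. {"state": {...}, ...}.
print(descriptor.get_partitions())

# Iterate over resource identifiers whose "parts" match a filter.
for key in descriptor.get_resources(state="ca"):
    print(key)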

class pudl.workspace.datastore.ZenodoDoiSettings(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: pydantic_settings.sources.DotenvType | None = ENV_FILE_SENTINEL, _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_parse_none_str: str | None = None, _secrets_dir: str | pathlib.Path | None = None, **values: Any)[source]#

Bases: pydantic_settings.BaseSettings

Digital Object Identifiers pointing to currently used Zenodo archives.

censusdp1tract: ZenodoDoi = '10.5281/zenodo.4127049'[source]#
eia176: ZenodoDoi = '10.5281/zenodo.10607843'[source]#
eia191: ZenodoDoi = '10.5281/zenodo.10607837'[source]#
eia757a: ZenodoDoi = '10.5281/zenodo.10607839'[source]#
eia860: ZenodoDoi = '10.5281/zenodo.10067566'[source]#
eia860m: ZenodoDoi = '10.5281/zenodo.10603998'[source]#
eia861: ZenodoDoi = '10.5281/zenodo.10204708'[source]#
eia923: ZenodoDoi = '10.5281/zenodo.10603997'[source]#
eiawater: ZenodoDoi = '10.5281/zenodo.10806016'[source]#
eia_bulk_elec: ZenodoDoi = '10.5281/zenodo.10603995'[source]#
epacamd_eia: ZenodoDoi = '10.5281/zenodo.7900974'[source]#
epacems: ZenodoDoi = '10.5281/zenodo.10603994'[source]#
ferc1: ZenodoDoi = '10.5281/zenodo.8326634'[source]#
ferc2: ZenodoDoi = '10.5281/zenodo.8326697'[source]#
ferc6: ZenodoDoi = '10.5281/zenodo.8326696'[source]#
ferc60: ZenodoDoi = '10.5281/zenodo.8326695'[source]#
ferc714: ZenodoDoi = '10.5281/zenodo.8326694'[source]#
phmsagas: ZenodoDoi = '10.5281/zenodo.10493790'[source]#
model_config[source]#
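
Because this is a pydantic settings class, the DOIs above are defaults that can be read programmatically and overridden through the usual pydantic-settings mechanisms (environment variables or a dotenv file, subject to model_config). A minimal sketch:

from pudl.workspace.datastore import ZenodoDoiSettings

# Read the DOI currently pinned for a dataset, e.g. EIA-860.
dois = ZenodoDoiSettings()
print(dois.eia860)  # '10.5281/zenodo.10067566'
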
class pudl.workspace.datastore.ZenodoFetcher(zenodo_dois: ZenodoDoiSettings | None = None, timeout: float = 15.0)[source]#

API for fetching datapackage descriptors and resource contents from Zenodo.

_descriptor_cache: dict[str, DatapackageDescriptor][source]#
zenodo_dois: ZenodoDoiSettings[source]#
timeout: float[source]#
get_doi(dataset: str) ZenodoDoi[source]#

Returns DOI for given dataset.

get_known_datasets() list[str][source]#

Returns list of supported datasets.

_get_url(doi: ZenodoDoi) pydantic.HttpUrl[source]#

Construct a Zenodo deposition URL based on its Zenodo DOI.

_fetch_from_url(url: pydantic.HttpUrl) requests.Response[source]#
get_descriptor(dataset: str) DatapackageDescriptor[source]#

Returns the DatapackageDescriptor for the given dataset.

get_resource(res: pudl.workspace.resource_cache.PudlResourceKey) bytes[source]#

Given a resource key, retrieve the contents of the file from Zenodo.
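
A minimal sketch of fetching a resource directly with the fetcher, assuming network access to Zenodo; the ferc2 dataset and its year partition come from the CLI examples at the end of this page:

from pudl.workspace.datastore import ZenodoFetcher

fetcher = ZenodoFetcher()
print(fetcher.get_known_datasets())  # e.g. ['censusdp1tract', 'eia176', ...]
print(fetcher.get_doi("ferc2"))      # DOI pinned in ZenodoDoiSettings

# Resolve a matching resource key from the descriptor, then download its bytes.
descriptor = fetcher.get_descriptor("ferc2")
key = next(descriptor.get_resources(year=2021))
contents = fetcher.get_resource(key)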

class pudl.workspace.datastore.Datastore(local_cache_path: pathlib.Path | None = None, gcs_cache_path: str | None = None, timeout: float = 15.0)[source]#

Handle connections and downloading of Zenodo Source archives.

get_known_datasets() list[str][source]#

Returns list of supported datasets.

get_datapackage_descriptor(dataset: str) DatapackageDescriptor[source]#

Fetch datapackage descriptor for dataset either from cache or Zenodo.

get_resources(dataset: str, cached_only: bool = False, skip_optimally_cached: bool = False, **filters: Any) collections.abc.Iterator[tuple[pudl.workspace.resource_cache.PudlResourceKey, bytes]][source]#

Return content of the matching resources.

Parameters:
  • dataset – name of the dataset to query.

  • cached_only – if True, only retrieve resources that are present in the cache.

  • skip_optimally_cached – if True, only retrieve resources that are not optimally cached. This triggers an attempt to optimally cache these resources.

  • filters (key=val) – only return resources that match the key-value mapping in their metadata["parts"].

Yields:

(PudlResourceKey, bytes) pairs holding the content of each matching resource
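
A short sketch of a typical call, assuming the ferc2 dataset and its year partition from the CLI examples below, and a writable local cache directory (the path here is hypothetical):

from pathlib import Path
from pudl.workspace.datastore import Datastore

# Cache downloads locally so repeated calls do not re-fetch from Zenodo.
ds = Datastore(local_cache_path=Path("~/pudl_cache").expanduser())

# Yields (PudlResourceKey, bytes) pairs for every matching resource.
for key, contents in ds.get_resources("ferc2", year=2021):
    print(key, len(contents))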

remove_from_cache(res: pudl.workspace.resource_cache.PudlResourceKey) None[source]#

Remove given resource from the associated cache.

get_unique_resource(dataset: str, **filters: Any) bytes[source]#

Returns the content of a resource, assuming exactly one matches the given filters.

get_zipfile_resource(dataset: str, **filters: Any) zipfile.ZipFile[source]#

Retrieves a unique resource and opens it as a ZipFile.

get_zipfile_resources(dataset: str, **filters: Any) collections.abc.Iterator[tuple[pudl.workspace.resource_cache.PudlResourceKey, zipfile.ZipFile]][source]#

Iterates over resources matching the filters, opening each as a ZipFile.

get_zipfile_file_names(zip_file: zipfile.ZipFile)[source]#

Given a zipfile, return a list of the file names in it.
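
Most archives are zip files, so the helpers above are the usual way to get at their contents. A minimal sketch, assuming the ferc2 dataset and that the year=2021 filter selects exactly one resource:

from pudl.workspace.datastore import Datastore

ds = Datastore()

# Open the single matching resource as a ZipFile and list its members.
zf = ds.get_zipfile_resource("ferc2", year=2021)
print(ds.get_zipfile_file_names(zf))

# Or iterate over every matching archive.
for key, zf in ds.get_zipfile_resources("ferc2", year=2021):
    print(key, ds.get_zipfile_file_names(zf))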

pudl.workspace.datastore.print_partitions(dstore: Datastore, datasets: list[str]) None[source]#

Prints known partition keys and their values for each of the datasets.

pudl.workspace.datastore.validate_cache(dstore: Datastore, datasets: list[str], partition: dict[str, str]) None[source]#

Validate elements in the datastore cache.

Delete invalid entries from the cache.

pudl.workspace.datastore.fetch_resources(dstore: Datastore, datasets: list[str], partition: dict[str, int | str], gcs_cache_path: str, bypass_local_cache: bool) None[source]#

Retrieve all matching resources and store them in the cache.

pudl.workspace.datastore._parse_key_values(ctx: click.core.Context, param: click.Option, values: str) dict[str, str][source]#

Parse key-value pairs into a Python dictionary.

Transforms a command line argument of the form: k1=v1,k2=v2,k3=v3… into: {k1:v1, k2:v2, k3:v3, …}
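
A minimal sketch of the equivalent transformation, ignoring the click-specific ctx and param arguments (an illustration, not the actual implementation):

def parse_key_values(values: str) -> dict[str, str]:
    # "year=2021,state=ca" -> {"year": "2021", "state": "ca"}
    return dict(pair.split("=", maxsplit=1) for pair in values.split(","))

print(parse_key_values("year=2021,state=ca"))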

pudl.workspace.datastore.pudl_datastore(dataset: list[str], validate: bool, list_partitions: bool, partition: dict[str, int | str], gcs_cache_path: str, bypass_local_cache: bool, logfile: pathlib.Path, loglevel: str)[source]#

Manage the raw data inputs to the PUDL data processing pipeline.

Download all the raw FERC Form 2 data:

pudl_datastore --dataset ferc2

Download the raw FERC Form 2 data only for 2021:

pudl_datastore --dataset ferc2 --partition year=2021

Re-download the raw FERC Form 2 data for 2021 even if you already have it:

pudl_datastore --dataset ferc2 --partition year=2021 --bypass-local-cache

Validate all California EPA CEMS data in the local datastore:

pudl_datastore --dataset epacems --validate --partition state=ca

List the available partitions in the EIA-860 and EIA-923 datasets:

pudl_datastore --dataset eia860 --dataset eia923 --list-partitions