pudl.workspace.datastore#

Datastore manages file retrieval for PUDL datasets.

Module Contents#

Classes#

DatapackageDescriptor

A simple wrapper providing access to datapackage.json contents.

ZenodoDoiSettings

Digital Object Identifiers pointing to currently used Zenodo archives.

ZenodoFetcher

API for fetching datapackage descriptors and resource contents from Zenodo.

Datastore

Handle connections and downloading of Zenodo Source archives.

ParseKeyValues

Transforms k1=v1,k2=v2,...

Functions#

parse_command_line()

Collect the command line arguments.

print_partitions(→ None)

Prints known partition keys and their values for each of the datasets.

validate_cache(→ None)

Validate elements in the datastore cache.

fetch_resources(→ None)

Retrieve all matching resources and store them in the cache.

main()

Cache datasets.

Attributes#

pudl.workspace.datastore.logger[source]#
pudl.workspace.datastore.PUDL_YML[source]#
pudl.workspace.datastore.ZenodoDoi[source]#
exception pudl.workspace.datastore.ChecksumMismatchError[source]#

Bases: ValueError

Resource checksum (md5) does not match.

class pudl.workspace.datastore.DatapackageDescriptor(datapackage_json: dict, dataset: str, doi: ZenodoDoi)[source]#

A simple wrapper providing access to datapackage.json contents.

get_resource_path(name: str) str[source]#

Returns the Zenodo URL that holds the contents of the given named resource.

_get_resource_metadata(name: str) dict[source]#
get_download_size() int[source]#

Returns the total download size of all the resources in MB.

validate_checksum(name: str, content: str) bool[source]#

Returns True if content matches checksum for given named resource.

_matches(res: dict, **filters: Any)[source]#
get_resources(name: str = None, **filters: Any) collections.abc.Iterator[pudl.workspace.resource_cache.PudlResourceKey][source]#

Returns an iterator of PudlResourceKey identifiers for matching resources.

Parameters:
  • name – if specified, find resource(s) with this name.

  • filters (dict) – if specified, find resource(s) matching these key=value constraints. The constraints are matched against the ‘parts’ field of the resource entry in the datapackage.json.
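For example, a minimal sketch of selecting resources by partition, assuming the descriptor was obtained via Datastore.get_datapackage_descriptor and that eia860 is partitioned by year (actual partition keys and values vary by dataset, and the cache path is hypothetical):

    from pathlib import Path

    from pudl.workspace.datastore import Datastore

    # Hypothetical local cache directory; any writable path works.
    ds = Datastore(local_cache_path=Path("~/pudl-cache").expanduser())
    desc = ds.get_datapackage_descriptor("eia860")

    # Yields PudlResourceKey identifiers whose "parts" match year=2022.
    for key in desc.get_resources(year=2022):
        print(key)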

get_partitions(name: str = None) dict[str, set[str]][source]#

Return mapping of known partition keys to their allowed known values.

get_partition_filters(**filters: Any) collections.abc.Iterator[dict[str, str]][source]#

Returns an iterator of all known partition mappings.

This can be used to iterate over all resources, since each mapping can be used directly as a filter and should correspond to a unique resource.

Parameters:

filters – additional constraints for selecting relevant partitions.
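Continuing from the sketch above, an illustrative way to walk resources one partition at a time (partition keys depend on the dataset):

    # Each mapping should select exactly one resource, so it can be passed
    # straight back into get_resources() as a set of filters.
    for partition in desc.get_partition_filters():
        key = next(desc.get_resources(**partition))
        print(partition, key)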

_validate_datapackage(datapackage_json: dict)[source]#

Checks the correctness of datapackage.json metadata.

Raises ValueError if invalid.

get_json_string() str[source]#

Exports the underlying JSON as a normalized (sorted, indented) JSON string.

class pudl.workspace.datastore.ZenodoDoiSettings(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: pydantic_settings.sources.DotenvType | None = ENV_FILE_SENTINEL, _env_file_encoding: str | None = None, _env_nested_delimiter: str | None = None, _secrets_dir: str | pathlib.Path | None = None, **values: Any)[source]#

Bases: pydantic_settings.BaseSettings

Digital Object Identifiers pointing to currently used Zenodo archives.

censusdp1tract: ZenodoDoi = '10.5281/zenodo.4127049'[source]#
eia860: ZenodoDoi = '10.5281/zenodo.10067566'[source]#
eia860m: ZenodoDoi = '10.5281/zenodo.10204686'[source]#
eia861: ZenodoDoi = '10.5281/zenodo.10204708'[source]#
eia923: ZenodoDoi = '10.5281/zenodo.10067550'[source]#
eia_bulk_elec: ZenodoDoi = '10.5281/zenodo.7067367'[source]#
epacamd_eia: ZenodoDoi = '10.5281/zenodo.7900974'[source]#
epacems: ZenodoDoi = '10.5281/zenodo.8235497'[source]#
ferc1: ZenodoDoi = '10.5281/zenodo.8326634'[source]#
ferc2: ZenodoDoi = '10.5281/zenodo.8326697'[source]#
ferc6: ZenodoDoi = '10.5281/zenodo.8326696'[source]#
ferc60: ZenodoDoi = '10.5281/zenodo.8326695'[source]#
ferc714: ZenodoDoi = '10.5281/zenodo.8326694'[source]#
phmsagas: ZenodoDoi = '10.5281/zenodo.8346646'[source]#
model_config[source]#
class pudl.workspace.datastore.ZenodoFetcher(zenodo_dois: ZenodoDoiSettings | None = None, timeout: float = 15.0)[source]#

API for fetching datapackage descriptors and resource contents from Zenodo.

_descriptor_cache: dict[str, DatapackageDescriptor][source]#
zenodo_dois: ZenodoDoiSettings[source]#
timeout: float[source]#
http: requests.Session[source]#
get_doi(dataset: str) ZenodoDoi[source]#

Returns DOI for given dataset.

get_known_datasets() list[str][source]#

Returns list of supported datasets.

_get_url(doi: ZenodoDoi) pydantic.HttpUrl[source]#

Construct a Zenodo deposition URL based on its Zenodo DOI.

_fetch_from_url(url: pydantic.HttpUrl) requests.Response[source]#
get_descriptor(dataset: str) DatapackageDescriptor[source]#

Returns the DatapackageDescriptor for the given dataset.

get_resource(res: pudl.workspace.resource_cache.PudlResourceKey) bytes[source]#

Given a resource key, retrieves the contents of the file from Zenodo.
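A brief usage sketch (network access to Zenodo is assumed; eia923 is one of the datasets listed in ZenodoDoiSettings):

    from pudl.workspace.datastore import ZenodoFetcher

    fetcher = ZenodoFetcher(timeout=30.0)
    print(fetcher.get_known_datasets())  # datasets with configured DOIs
    print(fetcher.get_doi("eia923"))

    # Download the raw bytes of the first resource in the datapackage.
    descriptor = fetcher.get_descriptor("eia923")
    first_key = next(descriptor.get_resources())
    content = fetcher.get_resource(first_key)
    print(len(content))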

class pudl.workspace.datastore.Datastore(local_cache_path: pathlib.Path | None = None, gcs_cache_path: str | None = None, timeout: float = 15.0)[source]#

Handle connections and downloading of Zenodo Source archives.

get_known_datasets() list[str][source]#

Returns list of supported datasets.

get_datapackage_descriptor(dataset: str) DatapackageDescriptor[source]#

Fetch the datapackage descriptor for a dataset, either from the cache or from Zenodo.

get_resources(dataset: str, cached_only: bool = False, skip_optimally_cached: bool = False, **filters: Any) collections.abc.Iterator[tuple[pudl.workspace.resource_cache.PudlResourceKey, bytes]][source]#

Return content of the matching resources.

Parameters:
  • dataset – name of the dataset to query.

  • cached_only – if True, only retrieve resources that are present in the cache.

  • skip_optimally_cached – if True, only retrieve resources that are not optimally cached. This triggers an attempt to optimally cache these resources.

  • filters (key=val) – only return resources that match the key-value mapping in their metadata["parts"].

Yields:

(PudlResourceKey, bytes) pairs holding the content of each matching resource
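For example, a minimal sketch that downloads (or reads from the local cache) every matching resource; the cache path and the year filter are illustrative:

    from pathlib import Path

    from pudl.workspace.datastore import Datastore

    ds = Datastore(local_cache_path=Path("~/pudl-cache").expanduser())

    # Yields (PudlResourceKey, bytes) pairs for eia860 resources whose
    # "parts" metadata matches year=2022.
    for key, content in ds.get_resources("eia860", year=2022):
        print(key, len(content))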

remove_from_cache(res: pudl.workspace.resource_cache.PudlResourceKey) None[source]#

Remove given resource from the associated cache.

get_unique_resource(dataset: str, **filters: Any) bytes[source]#

Returns content of a resource assuming there is exactly one that matches.
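For instance, continuing with the Datastore instance from the sketch above (the filters are illustrative and must select exactly one resource):

    # The filters are expected to identify exactly one eia860m resource.
    raw = ds.get_unique_resource("eia860m", year_month="2023-11")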

get_zipfile_resource(dataset: str, **filters: Any) zipfile.ZipFile[source]#

Retrieves a unique resource and opens it as a ZipFile.

get_zipfile_resources(dataset: str, **filters: Any) collections.abc.Iterator[tuple[pudl.workspace.resource_cache.PudlResourceKey, zipfile.ZipFile]][source]#

Iterates over resources that match the filters and opens each as a ZipFile.

get_zipfile_file_names(zip_file: zipfile.ZipFile)[source]#

Given a zipfile, return a list of the file names in it.
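A sketch of working with zipped resources, again using the Datastore instance from the sketch above (the dataset and year filter are illustrative):

    # Open the single matching resource as a ZipFile and list its contents.
    with ds.get_zipfile_resource("ferc1", year=2021) as zf:
        print(ds.get_zipfile_file_names(zf))

    # Or iterate over every matching resource as a ZipFile.
    for key, zf in ds.get_zipfile_resources("ferc1"):
        print(key, ds.get_zipfile_file_names(zf))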

class pudl.workspace.datastore.ParseKeyValues(option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=False, help=None, metavar=None)[source]#

Bases: argparse.Action

Transforms k1=v1,k2=v2,… into dict(k1=v1, k2=v2, …).

__call__(parser, namespace, values, option_string=None)[source]#

Parses the argument value into a dict.
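An illustrative sketch of wiring this action into an argparse parser (the --partition flag name is hypothetical):

    import argparse

    from pudl.workspace.datastore import ParseKeyValues

    parser = argparse.ArgumentParser()
    parser.add_argument("--partition", action=ParseKeyValues, default={})
    args = parser.parse_args(["--partition", "year=2022,state=CO"])
    print(args.partition)  # expected: {'year': '2022', 'state': 'CO'}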

pudl.workspace.datastore.parse_command_line()[source]#

Collect the command line arguments.

pudl.workspace.datastore.print_partitions(dstore: Datastore, datasets: list[str]) None[source]#

Prints known partition keys and their values for each of the datasets.

pudl.workspace.datastore.validate_cache(dstore: Datastore, datasets: list[str], args: argparse.Namespace) None[source]#

Validate elements in the datastore cache.

Deletes invalid entries from the cache.

pudl.workspace.datastore.fetch_resources(dstore: Datastore, datasets: list[str], args: argparse.Namespace) None[source]#

Retrieve all matching resources and store them in the cache.

pudl.workspace.datastore.main()[source]#

Cache datasets.