Source code for pudl.workspace.datastore

"""Datastore manages file retrieval for PUDL datasets."""

import argparse
import hashlib
import io
import json
import logging
import re
import sys
import zipfile
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple

import coloredlogs
import datapackage
import requests
import yaml
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

from pudl.workspace import resource_cache
from pudl.workspace.resource_cache import PudlResourceKey

logger = logging.getLogger(__name__)
PUDL_YML = Path.home() / ".pudl.yml"
class ChecksumMismatch(ValueError):
    """Resource checksum (md5) does not match."""

    pass
class DatapackageDescriptor:
    """A simple wrapper providing access to datapackage.json contents."""

    def __init__(self, datapackage_json: dict, dataset: str, doi: str):
        """Constructs DatapackageDescriptor.

        Args:
            datapackage_json (dict): parsed datapackage.json describing this datapackage.
            dataset (str): name of the dataset.
            doi (str): DOI (aka version) of the dataset.
        """
        self.datapackage_json = datapackage_json
        self.dataset = dataset
        self.doi = doi
        self._validate_datapackage(datapackage_json)
    def get_resource_path(self, name: str) -> str:
        """Returns zenodo url that holds contents of given named resource."""
        res = self._get_resource_metadata(name)
        # remote_url is sometimes set on the local cached version of datapackage.json
        # so we should be using that if it exists.
        return res.get("remote_url") or res.get("path")
    def _get_resource_metadata(self, name: str) -> dict:
        for res in self.datapackage_json["resources"]:
            if res["name"] == name:
                return res
        raise KeyError(f"Resource {name} not found for {self.dataset}/{self.doi}")
    def validate_checksum(self, name: str, content: bytes) -> None:
        """Raises ChecksumMismatch if content does not match the checksum for the named resource."""
        expected_checksum = self._get_resource_metadata(name)["hash"]
        m = hashlib.md5()  # nosec
        m.update(content)
        if m.hexdigest() != expected_checksum:
            raise ChecksumMismatch(
                f"Checksum for resource {name} does not match. "
                f"Expected {expected_checksum}, got {m.hexdigest()}.")
    def _matches(self, res: dict, **filters: Any):
        parts = res.get('parts', {})
        return all(str(parts.get(k)) == str(v) for k, v in filters.items())
    def get_resources(self, name: str = None, **filters: Any) -> Iterator[PudlResourceKey]:
        """Returns series of PudlResourceKey identifiers for matching resources.

        Args:
            name (str): if specified, find resource(s) with this name.
            filters (dict): if specified, find resource(s) matching these key=value
                constraints. The constraints are matched against the 'parts' field
                of the resource entry in the datapackage.json.
        """
        for res in self.datapackage_json["resources"]:
            if name and res["name"] != name:
                continue
            if self._matches(res, **filters):
                yield PudlResourceKey(
                    dataset=self.dataset,
                    doi=self.doi,
                    name=res["name"])
    def get_partitions(self, name: str = None) -> Dict[str, Set[str]]:
        """Returns a mapping of each known partition key to its set of known values."""
        partitions = defaultdict(set)  # type: Dict[str, Set[str]]
        for res in self.datapackage_json["resources"]:
            if name and res["name"] != name:
                continue
            for k, v in res.get('parts', {}).items():
                partitions[k].add(v)
        return partitions
    def _validate_datapackage(self, datapackage_json: dict):
        """Checks the correctness of datapackage.json metadata.

        Throws ValueError if invalid.
        """
        dp = datapackage.Package(datapackage_json)
        if not dp.valid:
            msg = f"Found {len(dp.errors)} datapackage validation errors:\n"
            for e in dp.errors:
                msg = msg + f" * {e}\n"
            raise ValueError(msg)
    def get_json_string(self) -> str:
        """Exports the underlying json as normalized (sorted, indented) json string."""
        return json.dumps(self.datapackage_json, sort_keys=True, indent=4)
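
# A minimal usage sketch of DatapackageDescriptor, assuming a datapackage.json file is
# already on disk; the year=2019 partition filter is a hypothetical illustration:
#
#   with open("datapackage.json") as f:
#       desc = DatapackageDescriptor(
#           json.load(f), dataset="eia860", doi="10.5281/zenodo.5534934")
#   for key in desc.get_resources(year=2019):
#       print(key.name, desc.get_resource_path(key.name))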
class ZenodoFetcher:
    """API for fetching datapackage descriptors and resource contents from zenodo."""

    # Zenodo tokens recorded here should have read-only access to our archives.
    # Including them here is correct in order to allow public use of this tool, so
    # long as we stick to read-only keys.
    TOKEN = {
        # Read-only personal access tokens for pudl@catalyst.coop:
        "sandbox": "qyPC29wGPaflUUVAv1oGw99ytwBqwEEdwi4NuUrpwc3xUcEwbmuB4emwysco",
        "production": "KXcG5s9TqeuPh1Ukt5QYbzhCElp9LxuqAuiwdqHP0WS4qGIQiydHn6FBtdJ5",
    }
    DOI = {
        "sandbox": {
            "censusdp1tract": "10.5072/zenodo.674992",
            "eia860": "10.5072/zenodo.926292",
            "eia860m": "10.5072/zenodo.926659",
            "eia861": "10.5072/zenodo.687052",
            "eia923": "10.5072/zenodo.926301",
            "epacems": "10.5072/zenodo.672963",
            "ferc1": "10.5072/zenodo.926302",
            "ferc714": "10.5072/zenodo.926660",
        },
        "production": {
            "censusdp1tract": "10.5281/zenodo.4127049",
            "eia860": "10.5281/zenodo.5534934",
            "eia860m": "10.5281/zenodo.6321197",
            "eia861": "10.5281/zenodo.5602102",
            "eia923": "10.5281/zenodo.5596977",
            "epacems": "10.5281/zenodo.4660268",
            "ferc1": "10.5281/zenodo.5534788",
            "ferc714": "10.5281/zenodo.5076672",
        },
    }
    API_ROOT = {
        "sandbox": "https://sandbox.zenodo.org/api",
        "production": "https://zenodo.org/api",
    }
    def __init__(self, sandbox: bool = False, timeout: float = 15.0):
        """Constructs ZenodoFetcher instance.

        Args:
            sandbox (bool): controls whether production or sandbox zenodo backends
                and associated DOIs should be used.
            timeout (float): timeout (in seconds) for http requests.
        """
        backend = "sandbox" if sandbox else "production"
        self._api_root = self.API_ROOT[backend]
        self._token = self.TOKEN[backend]
        self._dataset_to_doi = self.DOI[backend]
        self._descriptor_cache = {}  # type: Dict[str, DatapackageDescriptor]

        self.timeout = timeout
        retries = Retry(backoff_factor=2, total=3,
                        status_forcelist=[429, 500, 502, 503, 504])
        adapter = HTTPAdapter(max_retries=retries)

        self.http = requests.Session()
        self.http.mount("http://", adapter)
        self.http.mount("https://", adapter)
    def _fetch_from_url(self, url: str) -> requests.Response:
        logger.info(f"Retrieving {url} from zenodo")
        response = self.http.get(
            url,
            params={"access_token": self._token},
            timeout=self.timeout)
        if response.status_code == requests.codes.ok:
            logger.debug(f"Successfully downloaded {url}")
            return response
        else:
            raise ValueError(f"Could not download {url}: {response.text}")
    def _doi_to_url(self, doi: str) -> str:
        """Returns url that holds the datapackage for given doi."""
        match = re.search(r"zenodo.([\d]+)", doi)
        if match is None:
            raise ValueError(f"Invalid doi {doi}")

        zen_id = int(match.groups()[0])
        return f"{self._api_root}/deposit/depositions/{zen_id}"
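    # For example, the production ferc1 DOI "10.5281/zenodo.5534788" from the table
    # above maps to "https://zenodo.org/api/deposit/depositions/5534788".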
    def get_descriptor(self, dataset: str) -> DatapackageDescriptor:
        """Returns DatapackageDescriptor for given dataset."""
        doi = self._dataset_to_doi.get(dataset)
        if not doi:
            raise KeyError(f"No doi found for dataset {dataset}")
        if doi not in self._descriptor_cache:
            dpkg = self._fetch_from_url(self._doi_to_url(doi))
            for f in dpkg.json()["files"]:
                if f["filename"] == "datapackage.json":
                    resp = self._fetch_from_url(f["links"]["download"])
                    self._descriptor_cache[doi] = DatapackageDescriptor(
                        resp.json(), dataset=dataset, doi=doi)
                    break
            else:
                raise RuntimeError(
                    f"Zenodo datapackage for {dataset}/{doi} does not contain valid datapackage.json")
        return self._descriptor_cache[doi]
    def get_resource_key(self, dataset: str, name: str) -> PudlResourceKey:
        """Returns PudlResourceKey for given resource."""
        return PudlResourceKey(dataset, self._dataset_to_doi[dataset], name)
    def get_doi(self, dataset: str) -> str:
        """Returns DOI for given dataset."""
        return self._dataset_to_doi[dataset]
    def get_resource(self, res: PudlResourceKey) -> bytes:
        """Given resource key, retrieve contents of the file from zenodo."""
        desc = self.get_descriptor(res.dataset)
        url = desc.get_resource_path(res.name)
        content = self._fetch_from_url(url).content
        desc.validate_checksum(res.name, content)
        return content
    def get_known_datasets(self) -> List[str]:
        """Returns list of supported datasets."""
        return sorted(self._dataset_to_doi)
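
# A short sketch of using ZenodoFetcher directly (the Datastore class below wraps it
# with caching). The dataset name comes from the DOI table above; the resource file
# name is hypothetical:
#
#   fetcher = ZenodoFetcher(sandbox=False)
#   desc = fetcher.get_descriptor("eia860")   # fetches and caches datapackage.json
#   key = fetcher.get_resource_key("eia860", "eia860-2019.zip")
#   content = fetcher.get_resource(key)       # downloads the bytes and verifies the md5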
class Datastore:
    """Handle connections and downloading of Zenodo Source archives."""

    def __init__(
        self,
        local_cache_path: Optional[Path] = None,
        gcs_cache_path: Optional[str] = None,
        sandbox: bool = False,
        timeout: float = 15,
    ):
        # TODO(rousik): figure out an efficient way to configure datastore caching
        """Datastore manages file retrieval for PUDL datasets.

        Args:
            local_cache_path (Path): if provided, LocalFileCache pointed at the data
                subdirectory of this path will be used with this Datastore.
            gcs_cache_path (str): if provided, GoogleCloudStorageCache will be used
                to retrieve data files. The path is expected to have the following
                format: gs://bucket[/path_prefix]
            sandbox (bool): if True, use sandbox zenodo backend when retrieving files,
                otherwise use production. This affects which zenodo servers are
                contacted as well as dois used for each dataset.
            timeout (float): connection timeouts (in seconds) to use when connecting
                to Zenodo servers.
        """
        self._cache = resource_cache.LayeredCache()
        self._datapackage_descriptors: Dict[str, DatapackageDescriptor] = {}

        if local_cache_path:
            self._cache.add_cache_layer(
                resource_cache.LocalFileCache(local_cache_path))
        if gcs_cache_path:
            self._cache.add_cache_layer(
                resource_cache.GoogleCloudStorageCache(gcs_cache_path))

        self._zenodo_fetcher = ZenodoFetcher(
            sandbox=sandbox,
            timeout=timeout)
    def get_known_datasets(self) -> List[str]:
        """Returns list of supported datasets."""
        return self._zenodo_fetcher.get_known_datasets()
    def get_datapackage_descriptor(self, dataset: str) -> DatapackageDescriptor:
        """Fetch datapackage descriptor for given dataset either from cache or from zenodo."""
        doi = self._zenodo_fetcher.get_doi(dataset)
        if doi not in self._datapackage_descriptors:
            res = PudlResourceKey(dataset, doi, "datapackage.json")
            if self._cache.contains(res):
                self._datapackage_descriptors[doi] = DatapackageDescriptor(
                    json.loads(self._cache.get(res).decode('utf-8')),
                    dataset=dataset,
                    doi=doi)
            else:
                desc = self._zenodo_fetcher.get_descriptor(dataset)
                self._datapackage_descriptors[doi] = desc
                self._cache.add(res, bytes(desc.get_json_string(), "utf-8"))
        return self._datapackage_descriptors[doi]
    def get_resources(
            self,
            dataset: str,
            cached_only: bool = False,
            skip_optimally_cached: bool = False,
            **filters: Any) -> Iterator[Tuple[PudlResourceKey, bytes]]:
        """Return content of the matching resources.

        Args:
            dataset (str): name of the dataset to query.
            cached_only (bool): if True, only retrieve resources that are present
                in the cache.
            skip_optimally_cached (bool): if True, skip resources that are already
                optimally cached and yield only the rest; retrieving them adds them
                to the optimal cache layer.
            filters (key=val): only return resources that match the key-value
                mapping in their metadata["parts"].

        Yields:
            (PudlResourceKey, bytes) tuples holding the content of each matching
            resource.
        """
        desc = self.get_datapackage_descriptor(dataset)
        for res in desc.get_resources(**filters):
            if self._cache.is_optimally_cached(res) and skip_optimally_cached:
                logger.debug(f'{res} is already optimally cached.')
                continue
            if self._cache.contains(res):
                logger.debug(f"Retrieved {res} from cache.")
                yield (res, self._cache.get(res))
            elif not cached_only:
                logger.debug(f"Retrieved {res} from zenodo.")
                contents = self._zenodo_fetcher.get_resource(res)
                self._cache.add(res, contents)
                yield (res, contents)
    def remove_from_cache(self, res: PudlResourceKey):
        """Remove given resource from the associated cache."""
        self._cache.delete(res)
    def get_unique_resource(self, dataset: str, **filters: Any) -> bytes:
        """Returns content of a resource assuming there is exactly one that matches."""
        res = self.get_resources(dataset, **filters)
        try:
            _, content = next(res)
        except StopIteration:
            raise KeyError(f"No resources found for {dataset}: {filters}")
        try:
            next(res)
        except StopIteration:
            return content
        raise KeyError(f"Multiple resources found for {dataset}: {filters}")
    def get_zipfile_resource(self, dataset: str, **filters: Any) -> zipfile.ZipFile:
        """Retrieves unique resource and opens it as a ZipFile."""
        return zipfile.ZipFile(io.BytesIO(self.get_unique_resource(dataset, **filters)))
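
# A minimal end-to-end sketch of the Datastore API. The local cache path and the
# year=2019 partition filter are hypothetical illustrations, not values taken from
# this module:
#
#   ds = Datastore(local_cache_path=Path("~/pudl-work/data").expanduser())
#   for key, blob in ds.get_resources("ferc1", year=2019):
#       print(f"{key}: {len(blob)} bytes")
#   # When a filter is known to match exactly one resource, open it directly:
#   zf = ds.get_zipfile_resource("ferc1", year=2019)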
class ParseKeyValues(argparse.Action):
    """Transforms k1=v1,k2=v2,... into dict(k1=v1, k2=v2, ...)."""
    def __call__(self, parser, namespace, values, option_string=None):
        """Parses the argument value into dict."""
        d = getattr(namespace, self.dest, {})
        if isinstance(values, str):
            values = [values]
        for val in values:
            for kv in val.split(','):
                k, v = kv.split('=')
                d[k] = v
        setattr(namespace, self.dest, d)
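
# Example of what ParseKeyValues produces for a hypothetical argument value:
#
#   parser = argparse.ArgumentParser()
#   parser.add_argument("--partition", default={}, action=ParseKeyValues)
#   args = parser.parse_args(["--partition", "year=2019,state=ID"])
#   assert args.partition == {"year": "2019", "state": "ID"}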
def parse_command_line():
    """Collect the command line arguments."""
    prod_dois = "\n".join([f" - {x}" for x in ZenodoFetcher.DOI["production"].keys()])
    sand_dois = "\n".join([f" - {x}" for x in ZenodoFetcher.DOI["sandbox"].keys()])

    dataset_msg = f"""
Available Production Datasets:
{prod_dois}

Available Sandbox Datasets:
{sand_dois}"""

    parser = argparse.ArgumentParser(
        description="Download and cache ETL source data from Zenodo.",
        epilog=dataset_msg,
        formatter_class=argparse.RawTextHelpFormatter,
    )

    parser.add_argument(
        "--dataset",
        help="Download the specified dataset only. See below for available options. "
        "The default is to download all datasets, which may take an hour or more.",
    )
    parser.add_argument(
        "--pudl_in",
        help="Override pudl_in directory, defaults to setting in ~/.pudl.yml",
    )
    parser.add_argument(
        "--validate",
        help="Validate locally cached datapackages, but don't download anything.",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--sandbox",
        help="Download data from Zenodo sandbox server. For testing purposes only.",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--loglevel",
        help="Set logging level (DEBUG, INFO, WARNING, ERROR, or CRITICAL).",
        default="INFO",
    )
    parser.add_argument(
        "--quiet",
        help="Do not send logging messages to stdout.",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--populate-gcs-cache",
        default=None,
        help="If specified, upload data resources to this GCS bucket",
    )
    parser.add_argument(
        "--partition",
        default={},
        action=ParseKeyValues,
        metavar="KEY=VALUE,...",
        help="Only retrieve resources matching these conditions.",
    )
    parser.add_argument(
        "--list-partitions",
        action="store_true",
        default=False,
        help="List available partition keys and values for each dataset.",
    )

    return parser.parse_args()
def _get_pudl_in(args: argparse.Namespace) -> Path:
    """Figure out what pudl_in path should be used."""
    if args.pudl_in:
        return Path(args.pudl_in)
    else:
        cfg = yaml.safe_load(PUDL_YML.open())
        return Path(cfg["pudl_in"])
def _create_datastore(args: argparse.Namespace) -> Datastore:
    """Constructs datastore instance."""
    # Use the local filesystem cache unless we are populating the GCS cache instead.
    local_cache_path = None
    if not args.populate_gcs_cache:
        local_cache_path = _get_pudl_in(args) / "data"
    return Datastore(
        sandbox=args.sandbox,
        local_cache_path=local_cache_path,
        gcs_cache_path=args.populate_gcs_cache)
def validate_cache(dstore: Datastore, datasets: List[str], args: argparse.Namespace) -> None:
    """Validate elements in the datastore cache. Delete invalid entries from cache."""
    for single_ds in datasets:
        num_total = 0
        num_invalid = 0
        descriptor = dstore.get_datapackage_descriptor(single_ds)
        for res, content in dstore.get_resources(single_ds, cached_only=True, **args.partition):
            try:
                num_total += 1
                descriptor.validate_checksum(res.name, content)
            except ChecksumMismatch:
                num_invalid += 1
                logger.warning(
                    f"Resource {res} has invalid checksum. Removing from cache.")
                dstore.remove_from_cache(res)
        logger.info(
            f"Checked {num_total} resources for {single_ds}. Removed {num_invalid}.")
def fetch_resources(dstore: Datastore, datasets: List[str], args: argparse.Namespace) -> None:
    """Retrieve all matching resources and store them in the cache."""
    for single_ds in datasets:
        for res, _ in dstore.get_resources(
                single_ds,
                skip_optimally_cached=True,
                **args.partition):
            logger.info(f"Retrieved {res}.")
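
# NOTE: main() below calls print_partitions(), whose definition is missing from this
# listing. The following is a minimal reconstruction based on
# DatapackageDescriptor.get_partitions(), not necessarily the original implementation.
def print_partitions(dstore: Datastore, datasets: List[str]) -> None:
    """List known partition keys and values for each dataset."""
    for single_ds in datasets:
        parts = dstore.get_datapackage_descriptor(single_ds).get_partitions()
        print(f"\nPartitions for dataset {single_ds}:")
        for pkey in sorted(parts):
            print(f"  {pkey}: {', '.join(sorted(str(x) for x in parts[pkey]))}")
        if not parts:
            print("  -- no known partitions --")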
def main():
    """Cache datasets."""
    args = parse_command_line()

    pudl_logger = logging.getLogger("pudl")
    log_format = '%(asctime)s [%(levelname)8s] %(name)s:%(lineno)s %(message)s'
    coloredlogs.install(fmt=log_format, level='INFO', logger=pudl_logger)
    logger.setLevel(args.loglevel)

    dstore = _create_datastore(args)

    if args.dataset:
        datasets = [args.dataset]
    else:
        datasets = dstore.get_known_datasets()

    if args.partition:
        logger.info(f"Only retrieving resources for partition: {args.partition}")

    if args.list_partitions:
        print_partitions(dstore, datasets)
    elif args.validate:
        validate_cache(dstore, datasets, args)
    else:
        fetch_resources(dstore, datasets, args)
if __name__ == "__main__":
    sys.exit(main())