"""
Download the original public data sources used by PUDL.
This module provides programmatic, platform-independent access to the original
data sources which are used to populate the PUDL database. Those sources
currently include: FERC Form 1, EIA Form 860, and EIA Form 923. The module
can be used to download the data, and populate a local data store which is
organized such that the rest of the PUDL package knows where to find all the
raw data it needs.
Support for selectively downloading portions of the EPA's large Continuous
Emissions Monitoring System dataset will be added in the future.
"""
import concurrent.futures
import ftplib # nosec: B402 We sadly need ftplib for ferc1 & epacems. :-(
import logging
import os
import shutil
import urllib
import warnings
import zipfile
import pudl.constants as pc
logger = logging.getLogger(__name__)
[docs]def assert_valid_param(source, year, # noqa: 901
month=None, state=None, check_month=None):
"""
Check whether parameters used in various datastore functions are valid.
Args:
source (str): A string indicating which data source we are going to be
downloading. Currently it must be one of the following: eia860,
eia861, eia923, ferc1, epacems.
year (int or None): the year for which data should be downloaded. Must
be within the range of valid data years, which is specified for
each data source in the pudl.constants module. Use None for data
sources that do not have multiple years.
month (int): the month for which data should be downloaded. Only used
for EPA CEMS.
state (str): the state for which data should be downloaded. Only used
for EPA CEMS.
check_month (bool): Check whether the input month is valid? This is
automaticlaly set to True for EPA CEMS.
Raises:
AssertionError: If the source is not among the list of valid sources.
AssertionError: If the source is not found in the valid data years.
AssertionError: If the year is not valid for the specified source.
AssertionError: If the source is not found in valid base download URLs.
AssertionError: If the month is not valid (1-12).
AssertionError: If the state is not a valid US state abbreviation.
"""
if source not in pc.data_sources:
raise AssertionError(
f"Source '{source}' not found in valid data sources.")
if year is not None:
if source not in pc.data_years:
raise AssertionError(
f"Source '{source}' not found in valid data years.")
if year not in pc.data_years[source]:
raise AssertionError(
f"Year {year} is not valid for source {source}.")
if source not in pc.base_data_urls:
raise AssertionError(
f"Source '{source}' not found in valid base download URLs.")
if check_month is None:
check_month = source == 'epacems'
if source == 'epacems':
valid_states = pc.cems_states.keys()
else:
valid_states = pc.us_states.keys()
if check_month:
if month not in range(1, 13):
raise AssertionError(f"Month {month} is not valid (must be 1-12)")
if state.upper() not in valid_states:
raise AssertionError(
f"Invalid state '{state}'. Must use US state abbreviations.")
[docs]def source_url(source, year, month=None, state=None, table=None): # noqa: C901
"""Construct a download URL for the specified federal data source and year.
Args:
source (str): A string indicating which data source we are going to be
downloading. Currently it must be one of the following:
- 'eia860'
- 'eia861'
- 'eia923'
- 'ferc1'
- 'epacems'
year (int or None): the year for which data should be downloaded. Must
be within the range of valid data years, which is specified for
each data source in the pudl.constants module. Use None for data
sources that do not have multiple years.
month (int): the month for which data should be downloaded. Only used
for EPA CEMS.
state (str): the state for which data should be downloaded. Only used
for EPA CEMS.
table (str): the table for which data should be downloaded. Only used
for EPA IPM.
Returns:
download_url (str): a full URL from which the requested data may be
obtained
"""
assert_valid_param(source=source, year=year, month=month, state=state)
base_url = pc.base_data_urls[source]
if source == 'eia860':
if year < max(pc.data_years['eia860']):
download_url = f'{base_url}/archive/xls/eia860{year}.zip'
else:
download_url = f'{base_url}/xls/eia860{year}.zip'
elif source == 'eia861':
if year < 2012:
# Before 2012 they used 2 digit years. Y2K12 FTW!
download_url = f"{base_url}/f861{str(year)[2:]}.zip"
else:
download_url = f"{base_url}/f861{year}.zip"
elif source == 'eia923':
if year < 2008:
prefix = 'f906920_'
else:
prefix = 'f923_'
if year < max(pc.data_years['eia923']):
arch_path = 'archive/xls'
else:
arch_path = 'xls'
download_url = f"{base_url}/{arch_path}/{prefix}{year}.zip"
elif source == 'ferc1':
download_url = f"{base_url}/f1_{year}.zip"
elif (source == 'epacems'):
# lowercase the state and zero-pad the month
download_url = (
f"{base_url}/{year}/"
f"{year}{state.lower()}{str(month).zfill(2)}.zip"
)
elif source == 'epaipm':
table_url_ext = pc.epaipm_url_ext[table]
download_url = f"{base_url}/{table_url_ext}"
else:
# we should never ever get here because of the assert statement.
raise AssertionError(f"Bad data source '{source}' requested.")
return download_url
[docs]def path(source, data_dir, # noqa: C901
year=None, month=None, state=None, file=True):
"""Construct a variety of local datastore paths for a given data source.
PUDL expects the original data it ingests to be organized in a particular
way. This function allows you to easily construct useful paths that refer
to various parts of the data store, by specifying the data source you are
interested in, and optionally the year of data you're seeking, as well as
whether you want the originally downloaded files for that year, or the
directory in which a given year's worth of data for a particular data
source can be found.
Note: if you change the default arguments here, you should also change them
for paths_for_year()
Args:
source (str): A string indicating which data source we are going to be
downloading. Currently it must be one of the following: ferc1,
eia923, eia860, epacems.
data_dir (path-like): Path to the top level datastore directory.
year (int or None): the year of data that the returned path should
pertain to. Must be within the range of valid data years, which is
specified for each data source in pudl.constants.data_years, unless
year is set to zero, in which case only the top level directory for
the data source specified in source is returned. If None, no
subdirectory is used for the data source.
month (int): Month of year (1-12). Only applies to epacems.
state (str): Two letter US state abbreviation. Only applies to epacems.
file (bool): If True, return the full path to the originally downloaded
file specified by the data source and year. If file is true, year
must not be set to zero, as a year is required to specify a
particular downloaded file.
Returns:
str: the path to requested resource within the local PUDL datastore.
"""
assert_valid_param(source=source, year=year, month=month, state=state,
check_month=False)
if file is True and year is None and source != 'epaipm':
raise AssertionError(
"A year is required to generate full datastore file path.")
if source == 'eia860':
dstore_path = os.path.join(data_dir, 'eia', 'form860')
if year is not None:
dstore_path = os.path.join(dstore_path, f"eia860{year}")
elif source == 'eia861':
dstore_path = os.path.join(data_dir, 'eia', 'form861')
if year is not None:
dstore_path = os.path.join(dstore_path, f"eia861{year}")
if year > 2011:
folder = f'f861{year}'
elif year in list(range(2001, 2006)) + list(range(2007, 2010)):
folder = str(year)
elif year in list(range(1990, 2001)) + [2006, 2010, 2011]:
folder = f'f861{str(year)[-2:]}'
dstore_path = os.path.join(dstore_path, folder)
elif source == 'eia923':
dstore_path = os.path.join(data_dir, 'eia', 'form923')
if year is not None:
if year < 2008:
prefix = 'f906920_'
else:
prefix = 'f923_'
dstore_path = os.path.join(dstore_path, f"{prefix}{year}")
elif source == 'ferc1':
dstore_path = os.path.join(data_dir, 'ferc', 'form1')
if year is not None:
dstore_path = os.path.join(dstore_path, f"f1_{year}")
elif (source == 'epacems'):
dstore_path = os.path.join(data_dir, 'epa', 'cems')
if year is not None:
dstore_path = os.path.join(dstore_path, f"epacems{year}")
elif source == 'epaipm':
dstore_path = os.path.join(data_dir, 'epa', 'ipm', 'epaipm')
else:
# we should never ever get here because of the assert statement.
raise AssertionError(f"Bad data source '{source}' requested.")
# Handle month and state, if they're provided
if month is None:
month_str = ''
else:
month_str = str(month).zfill(2)
if state is None:
state_str = ''
else:
state_str = state.lower()
if file is True:
# Current naming convention requires the name of the directory to which
# an original data source is downloaded to be the same as the basename
# of the file itself...
basename = os.path.basename(dstore_path)
# For all the non-CEMS data, state_str and month_str are '',
# but this should work for other monthly data too.
dstore_path = os.path.join(
dstore_path, f"{basename}{state_str}{month_str}.zip")
return dstore_path
[docs]def paths_for_year(source, data_dir, year=None, states=None, file=True):
"""Derive all paths for a given source and year. See path() for details.
Args:
source (str): A string indicating which data source we are going to be
downloading. Currently it must be one of the following: ferc1,
eia923, eia860, epacems.
data_dir (path-like): Path to the top level datastore directory.
year (int or None): the year of data that the returned path should
pertain to. Must be within the range of valid data years, which is
specified for each data source in pudl.constants.data_years, unless
year is set to zero, in which case only the top level directory for
the data source specified in source is returned. If None, no
subdirectory is used for the data source.
month (int): Month of year (1-12). Only applies to epacems.
state (str): Two letter US state abbreviation. Only applies to epacems.
file (bool): If True, return the full path to the originally downloaded
file specified by the data source and year. If file is true, year
must not be set to zero, as a year is required to specify a
particular downloaded file.
Returns:
str: the path to requested resource within the local PUDL datastore.
"""
# TODO: I'm not sure this is the best construction, since it relies on
# the order being the same here as in the url list comprehension
if states is None:
states = pc.cems_states.keys()
if source == 'epacems':
paths = [path(source, data_dir, year=year, month=month,
state=state, file=file)
# For consistency, it's important that this is state, then
# month
for state in states
for month in range(1, 13)]
else:
paths = [path(source, data_dir, year=year, file=file)]
return paths
[docs]def download(source, year, states, data_dir):
"""Download the original data for the specified data source and year.
Given a data source and the desired year of data, download the original
data files from the appropriate federal website, and place them in a
temporary directory within the data store. This function does not do any
checking to see whether the file already exists, or needs to be updated,
and does not do any of the organization of the datastore after download,
it simply gets the requested file.
Args:
source (str): the data source to retrieve. Must be one of: 'eia860',
'eia923', 'ferc1', or 'epacems'.
year (int or None): the year of data that the returned path should
pertain to. Must be within the range of valid data years, which is
specified for each data source in pudl.constants.data_years. Note
that for data (like EPA CEMS) that have multiple datasets per year,
this function will download all the files for the specified year.
Use None for data sources that do not have multiple years.
states (iterable): List of two letter US state abbreviations indicating
which states data should be downloaded for.
data_dir (path-like): Path to the top level datastore directory.
Returns:
path-like: The path to the local downloaded file.
"""
assert_valid_param(source=source, year=year, check_month=False)
tmp_dir = os.path.join(data_dir, 'tmp')
# Ensure that the temporary download directory exists:
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir)
if source == 'epacems':
src_urls = [source_url(source, year, month=month, state=state)
# For consistency, it's important that this is state, then
# month
for state in states
for month in range(1, 13)]
tmp_files = [os.path.join(tmp_dir, os.path.basename(f))
for f in paths_for_year(
source, data_dir, year=year, states=states)]
elif source == 'epaipm':
# This is going to download all of the IPM tables listed in
# pudl.constants.epaipm_pudl_tables and pc.epaipm_url_ext.
# I'm finding it easier to
# code the url and temp files than use provided functions.
fns = pc.epaipm_url_ext.values()
base_url = pc.base_data_urls['epaipm']
src_urls = [f'{base_url}/{f}' for f in fns]
tmp_files = [os.path.join(tmp_dir, f) for f in fns]
else:
src_urls = [source_url(source, year)]
tmp_files = [os.path.join(
tmp_dir, os.path.basename(path(source, data_dir, year)))]
if source == 'epacems':
logger.info(f"Downloading {source} data for {year}.")
elif year is None:
logger.info(f"Downloading {source} data.")
else:
logger.info(f"Downloading {source} data for {year} from {src_urls[0]}")
url_schemes = {urllib.parse.urlparse(url).scheme for url in src_urls}
# Pass all the URLs at once, rather than looping here, because that way
# we can use the same FTP connection for all of the src_urls
# (without going all the way to a global FTP cache)
if url_schemes == {"ftp"}:
_download_ftp(src_urls, tmp_files)
else:
_download_default(src_urls, tmp_files)
return tmp_files
def _download_ftp(src_urls, tmp_files, allow_retry=True): # noqa: C901
"""
Download a source data using FTP, retrying as necessary.
Args:
src_urls (list): A list of complete source URLs for the files to be
downloaded.
tmp_files (list): A corresponding list of local temporary files to
which the downloaded files should be saved.
allow_retry (bool): If True, retry on errors. Otherwise do not retry.
Returns:
None
"""
if len(src_urls) != len(tmp_files):
raise ValueError(
"The number of source URLs and temporary files are not equal.")
if len(src_urls) == 0:
raise ValueError("Got zero source URLs!")
parsed_urls = [urllib.parse.urlparse(url) for url in src_urls]
domains = {url.netloc for url in parsed_urls}
within_domain_paths = [url.path for url in parsed_urls]
if len(domains) > 1:
# This should never be true, but it seems good to check
raise NotImplementedError(
"I don't yet know how to download from multiple domains")
domain = domains.pop()
ftp = ftplib.FTP(domain) # nosec: B321 Sadly required for ferc1 & epacems
login_result = ftp.login()
assert login_result.startswith("230"), \
f"Failed to login to {domain}: {login_result}"
url_to_retry = []
tmp_to_retry = []
error_messages = []
for path, tmp_file, src_url in zip(within_domain_paths, tmp_files, src_urls):
with open(tmp_file, "wb") as f:
try:
ftp.retrbinary(f"RETR {path}", f.write)
except ftplib.all_errors as e:
error_messages.append(e)
url_to_retry.append(src_url)
tmp_to_retry.append(tmp_file)
# Now retry failures recursively
num_failed = len(url_to_retry)
if num_failed > 0:
if allow_retry and len(src_urls) == 1:
# If there was only one URL and it failed, retry once.
return _download_ftp(url_to_retry, tmp_to_retry, allow_retry=False)
elif allow_retry and src_urls != url_to_retry:
# If there were multiple URLs and at least one didn't fail,
# keep retrying until all fail or all succeed.
return _download_ftp(url_to_retry,
tmp_to_retry,
allow_retry=allow_retry)
if url_to_retry == src_urls:
err_msg = (
f"Download failed for all {num_failed} URLs. " +
"Maybe the server is down?\n" +
"Here are the failure messages:\n " +
" \n".join(error_messages)
)
if not allow_retry:
err_msg = (
f"Download failed for {num_failed} URLs and no more " +
"retries are allowed.\n" +
"Here are the failure messages:\n " +
" \n".join(error_messages)
)
warnings.warn(err_msg)
def _download_default(src_urls, tmp_files, allow_retry=True):
"""Download URLs to files. Designed to be called by `download` function.
If the file cannot be downloaded, the program will issue a warning.
Args:
src_urls (list of str): the source URLs to download.
tmp_files (list of str): the corresponding files to save.
allow_retry (bool): Should the function call itself again to
retry the download? (Default will try twice for a single file, or
until all files fail)
Returns:
None
Todo:
Replace assert statement
"""
assert len(src_urls) == len(tmp_files) > 0
url_to_retry = []
tmp_to_retry = []
for src_url, tmp_file in zip(src_urls, tmp_files):
try:
# While we aren't auditing the url to ensure it's http/https, the
# URLs are hard-coded in pudl.constants, so we ought to know what
# we are connecting to. Thus the # nosec comment to avoid the
# security linter (bandit) from complaining.
_ = urllib.request.urlretrieve( # nosec
src_url, filename=tmp_file)
except urllib.error.URLError:
url_to_retry.append(src_url)
tmp_to_retry.append(tmp_to_retry)
# Now retry failures recursively
num_failed = len(url_to_retry)
if num_failed > 0:
if allow_retry and len(src_urls) == 1:
# If there was only one URL and it failed, retry once.
return _download_default(url_to_retry,
tmp_to_retry,
allow_retry=False)
elif allow_retry and src_urls != url_to_retry:
# If there were multiple URLs and at least one didn't fail,
# keep retrying until all fail or all succeed.
return _download_default(url_to_retry,
tmp_to_retry,
allow_retry=allow_retry)
if url_to_retry == src_urls:
err_msg = f"""ERROR: Download failed for all {num_failed} URLs.
Maybe the server is down?"""
if not allow_retry:
err_msg = f"""ERROR: Download failed for {num_failed}
URLs and no more retries are allowed."""
warnings.warn(err_msg)
[docs]def organize(source, year, states, data_dir, # noqa: C901
unzip=True, dl=True):
"""Put downloaded original data file where it belongs in the datastore.
Once we've downloaded an original file from the public website it lives on
we need to put it where it belongs in the datastore. Optionally, we also
unzip it and clean up the directory hierarchy that results from unzipping.
Args:
source (str): the data source to retrieve. Must be one of: 'eia860',
'eia923', 'ferc1', or 'epacems'.
year (int or None): the year of data that the returned path should
pertain to. Must be within the range of valid data years, which is
specified for each data source in pudl.constants.data_years. Use
None for data sources that do not have multiple years.
data_dir (path-like): Path to the top level datastore directory.
unzip (bool): If True, unzip the file once downloaded, and place the
resulting data files where they ought to be in the datastore.
dl (bool): If False, the files were not downloaded in this run.
Returns:
None
Todo:
Replace 4 assert statements
"""
assert source in pc.data_sources, \
f"Source '{source}' not found in valid data sources."
if year is not None:
assert source in pc.data_years, \
f"Source '{source}' not found in valid data years."
assert year in pc.data_years[source], \
f"Year {year} is not valid for source {source}."
assert source in pc.base_data_urls, \
f"Source '{source}' not found in valid base download URLs."
assert_valid_param(source=source, year=year, check_month=False)
tmpdir = os.path.join(data_dir, 'tmp')
# For non-CEMS, the newfiles and destfiles lists will have length 1.
if source == 'epaipm':
fns = list(pc.epaipm_url_ext.values())
epaipm_files = [os.path.join(tmpdir, f) for f in fns]
# Create a ZIP file for epaipm so that it can behave more like the
# other sources, including having a well defined "path" to check
# whether it exists in the datastore.
zip_path = os.path.join(tmpdir, 'epaipm.zip')
with zipfile.ZipFile(zip_path, mode='w') as epaipm_zip:
for f in epaipm_files:
epaipm_zip.write(f, arcname=os.path.basename(f))
for file in epaipm_files:
os.remove(file)
newfiles = [os.path.join(tmpdir, os.path.basename(f))
for f in paths_for_year(source=source,
year=year,
states=states,
data_dir=data_dir)]
destfiles = paths_for_year(source=source,
year=year,
states=states,
file=True, data_dir=data_dir)
# If we've gotten to this point, we're wiping out the previous version of
# the data for this source and year... so lets wipe it! Scary!
destdir = path(source=source, year=year, file=False, data_dir=data_dir)
if dl:
if os.path.exists(destdir) and source != 'epacems':
shutil.rmtree(destdir)
# move the new file from wherever it is, to its rightful home.
if not os.path.exists(destdir):
os.makedirs(destdir)
for newfile, destfile in zip(newfiles, destfiles):
# paranoid safety check to make sure these files match...
assert os.path.basename(newfile) == os.path.basename(destfile)
shutil.move(newfile, destfile) # works more cases than os.rename
# If download is False, then we already did this rmtree and move
# The last time this program ran.
# If we're unzipping the downloaded file, then we may have some
# reorganization to do. Currently all data sources will get unzipped,
# except the CEMS, because they're really big and take up 92% less space.
if(unzip and source != 'epacems'):
# Unzip the downloaded file in its new home:
zip_ref = zipfile.ZipFile(destfile, 'r')
logger.info(f"unzipping {destfile}")
zip_ref.extractall(destdir)
zip_ref.close()
# Most of the data sources can just be unzipped in place and be done
# with it, but FERC Form 1 requires some special attention:
# data source we're working with:
if source == 'ferc1':
topdirs = [os.path.join(destdir, td)
for td in ['UPLOADERS', 'FORMSADMIN']]
for td in topdirs:
if os.path.exists(td):
bottomdir = os.path.join(td, 'FORM1', 'working')
tomove = os.listdir(bottomdir)
for fn in tomove:
shutil.move(os.path.join(bottomdir, fn), destdir)
shutil.rmtree(td)
[docs]def check_if_need_update(source, year, states, data_dir, clobber=False):
"""Check to see if the file is already downloaded and clobber is False.
Do we really need to download the requested data? Only case in which
we don't have to do anything is when the downloaded file already exists
and clobber is False.
Args:
source (str): the data source to retrieve. Must be one of: eia860,
eia923, ferc1, or epacems.
year (int or None): the year of data that the returned path should
pertain to. Must be within the range of valid data years, which is
specified for each data source in pudl.constants.data_years. Note
that for data (like EPA CEMS) that have multiple datasets per year,
this function will download all the files for the specified year.
Use None for data sources that do not have multiple years.
states (iterable): List of two letter US state abbreviations indicating
which states data should be downloaded for.
data_dir (path-like): Path to the top level datastore directory.
clobber (bool): If True, clobber the existing file and note that the
file will need to be replaced with an updated file.
Returns:
bool: Whether an update is needed (True) or not (False)
"""
paths = paths_for_year(source=source, year=year, states=states,
data_dir=data_dir)
need_update = False
msg = None
for path in paths:
if os.path.exists(path):
if clobber:
msg = f"{source} data for {year} already present, CLOBBERING."
need_update = True
else:
msg = f"{source} data for {year} already present, skipping."
else:
need_update = True
if msg is not None:
logger.info(msg)
return need_update
[docs]def update(source, year, states, data_dir, clobber=False, unzip=True,
dl=True):
"""Update the local datastore for the given source and year.
If necessary, pull down a new copy of the data for the specified data
source and year. If we already have the requested data, do nothing,
unless clobber is True -- in which case remove the existing data and
replace it with a freshly downloaded copy.
Note that update_datastore.py runs this function in parallel, so files
multiple sources and years may be in progress simultaneously.
Args:
source (str): the data source to retrieve. Must be one of: 'eia860',
'eia923', 'ferc1', or 'epacems'.
year (int): the year of data that the returned path should pertain to.
Must be within the range of valid data years, which is specified
for each data source in pudl.constants.data_years.
states (iterable): List of two letter US state abbreviations indicating
which states data should be downloaded for. Currently only affects
the epacems dataset.
clobber (bool): If true, replace existing copy of the requested data
if we have it, with freshly downloaded data.
unzip (bool): If true, unzip the file once downloaded, and place the
resulting data files where they ought to be in the datastore.
EPA CEMS files will never be unzipped.
data_dir (str): The ``data`` directory which holds the PUDL datastore.
dl (bool): If False, don't download the files, only unzip ones
that are already present. If True, do download the files. Either
way, still obey the unzip and clobber settings. (unzip=False and
dl=False will do nothing.)
Returns:
None
"""
need_update = check_if_need_update(
source=source, year=year, states=states,
data_dir=data_dir, clobber=clobber)
if need_update:
# Otherwise we're downloading:
if dl:
download(source=source, year=year,
states=states, data_dir=data_dir)
organize(source=source, year=year, states=states, unzip=unzip,
data_dir=data_dir, dl=dl)
[docs]def parallel_update(sources,
years_by_source,
states,
data_dir,
clobber=False,
unzip=True,
dl=True):
"""Download many original source data files in parallel using threads."""
with concurrent.futures.ThreadPoolExecutor() as executor:
for source in sources:
for year in years_by_source[source]:
executor.submit(update, source, year, states,
clobber=clobber,
unzip=unzip,
data_dir=data_dir,
dl=download)