pudl.dfc

Implementation of DataFrameCollection.

The PUDL ETL needs to exchange collections of named tables (pandas.DataFrame) between ETL tasks, and the volume of data contained in these tables can far exceed the memory of a single machine.

The Prefect framework currently caches task results in memory, which can lead to out-of-memory problems when dealing with large datasets (e.g. during the full data release). To alleviate this, the Prefect team recommends passing “references” to data that is stored separately.

DataFrameCollection does exactly this. It keeps lightweight references to named data frames and stores the data itself either locally or on cloud storage (we use the pandas.to_pickle method, which supports these storage backends out of the box).

Think of DataFrameCollection as a dict-like structure backed by disk storage.
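To make the "dict-like structure backed by disk" idea concrete, here is a minimal sketch of the pattern. It is not the pudl implementation: the class name `TinyDiskCollection`, the `<storage_path>/<table_id>/<name>` file layout, and the use of the standard-library `pickle` module as a stand-in for `pandas.to_pickle` (so the sketch runs without pandas and accepts any picklable value) are all assumptions. The only per-table state kept in memory is a mapping from table names to `uuid.UUID` table ids; the data itself lives in files and is read back on demand.

```python
import os
import pickle
import tempfile
import uuid
from typing import Any, Dict, Iterator, Tuple


class TableExistsError(Exception):
    """Local stand-in for pudl.dfc.TableExistsError (hypothetical)."""


class TinyDiskCollection:
    """Hypothetical dict-like container that keeps only table references in memory."""

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        # name -> table_id; this small mapping is the only per-table state in memory.
        self._refs: Dict[str, uuid.UUID] = {}

    def _filename(self, name: str, table_id: uuid.UUID) -> str:
        # Assumed layout: <storage_path>/<table_id>/<name>.
        return os.path.join(self.storage_path, str(table_id), name)

    def __setitem__(self, name: str, data: Any) -> None:
        # Refuse to overwrite an existing table, mirroring TableExistsError.
        if name in self._refs:
            raise TableExistsError(name)
        table_id = uuid.uuid4()
        path = self._filename(name, table_id)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(data, f)  # stand-in for pandas.DataFrame.to_pickle
        self._refs[name] = table_id

    def __getitem__(self, name: str) -> Any:
        # Read from storage on every access; nothing is cached in memory.
        with open(self._filename(name, self._refs[name]), "rb") as f:
            return pickle.load(f)

    def __len__(self) -> int:
        return len(self._refs)

    def items(self) -> Iterator[Tuple[str, Any]]:
        # Load one table at a time, in sorted name order.
        for name in sorted(self._refs):
            yield name, self[name]


# Usage: values are written to a temporary directory and read back on demand.
dfc = TinyDiskCollection(tempfile.mkdtemp())
dfc["plants"] = {"plant_id": [1, 2, 3]}
print(len(dfc), dfc["plants"])
```

Because `items()` is a generator, iterating over the collection holds at most one loaded table at a time, which is the property that keeps memory use bounded.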

Module Contents

Classes

DataFrameCollection

This class can hold named pandas.DataFrame objects that are stored on disk or GCS.

Functions

merge(left: DataFrameCollection, right: DataFrameCollection)

Merges two DataFrameCollection instances.

merge_list(list_of_dfc: List[DataFrameCollection])

Merges a list of DataFrameCollection instances.

fanout(dfc: DataFrameCollection, chunk_size=1) → List[DataFrameCollection]

Splits a large DataFrameCollection into a list of fixed-size DataFrameCollections.

filter_by_name(dfc: DataFrameCollection, condition: Callable[[str], bool]) → DataFrameCollection

Returns DataFrameCollection containing only tables that match the condition.

Attributes

logger

pudl.dfc.logger
exception pudl.dfc.TableExistsError

Bases: Exception

The table already exists.

Either the table already exists in the DataFrameCollection when it is added or the file containing the serialized form is found on disk.

class pudl.dfc.DataFrameCollection(storage_path: Optional[str] = None, **data_frames: Dict[str, pandas.DataFrame])

This class can hold named pandas.DataFrame objects that are stored on disk or GCS.

This should be used whenever dictionaries of named pandas.DataFrame objects are passed between Prefect tasks. Because task results are implicitly cached in memory, it is important to keep the in-memory footprint of the exchanged data small.

This wrapper achieves this by maintaining references to tables that are themselves stored on a persistent medium such as a local disk or a GCS bucket.

This is intended to be used from within Prefect flows, and new instances can be configured by setting the relevant prefect.context variables.

_get_filename(self, name: str, table_id: uuid.UUID) → str

Returns the filename where the given dataframe should be stored.

get(self, name: str) → pandas.DataFrame

Returns the content of the named dataframe.

_create_file(self, name: str) → fsspec.core.OpenFile

Opens the file that should hold the serialized contents of the table.

Raises

TableExistsError if the underlying file already exists.
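The "refuse to overwrite" behaviour can be sketched with Python's exclusive-creation file mode `"x"`, which raises `FileExistsError` instead of truncating an existing file. This is an illustrative sketch only: it uses the built-in `open` on local paths rather than `fsspec`, and the function name `create_file` and the local `TableExistsError` class are hypothetical stand-ins for the private `_create_file` method and `pudl.dfc.TableExistsError`.

```python
import os
import tempfile


class TableExistsError(Exception):
    """Local stand-in for pudl.dfc.TableExistsError (hypothetical)."""


def create_file(path: str):
    """Hypothetical: open path for binary writing, refusing to overwrite it."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    try:
        # Mode "xb" checks for existence and creates the file in a single step.
        return open(path, "xb")
    except FileExistsError as err:
        raise TableExistsError(f"{path} already exists") from err


# Usage: the first call creates the file, a second call would raise TableExistsError.
target = os.path.join(tempfile.mkdtemp(), "some_table_id", "plants")
with create_file(target) as f:
    f.write(b"serialized table")
```

Using the exclusive mode, rather than checking `os.path.exists` and then opening, avoids a window in which another task could create the same file between the check and the write.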

store(self, name: str, data: pandas.DataFrame)

Adds named dataframe to collection and stores its contents on disk.

add_reference(self, name: str, table_id: uuid.UUID)

Adds reference to a named dataframe to this collection.

This assumes that the data is already present on disk.

__getitem__(self, name: str) → pandas.DataFrame

Allows accessing dataframes via self[name].

__setitem__(self, name: str, data: pandas.DataFrame)

Allows adding dataframes via self[name] = value.

__len__(self)

Returns number of tables that are stored in this DataFrameCollection.

__bool__(self)

Returns True if this collection contains at least one table.

items(self) → Iterator[Tuple[str, pandas.DataFrame]]

Iterates over table names and the corresponding pd.DataFrame objects.

get_table_names(self) → List[str]

Returns a sorted list of the names of the dataframes contained in this collection.

references(self) → Iterator[Tuple[str, uuid.UUID]]

Returns a set-like object with (name, table_id) tuples.

static from_dict(d: Dict[str, pandas.DataFrame])

Constructs a new DataFrameCollection from a dictionary of dataframes.

to_dict(self) → Dict[str, pandas.DataFrame]

Loads the entire collection into memory as a dictionary.

update(self, other)

Adds references to tables from the other DataFrameCollection.

union(self, *others)

Returns a new DataFrameCollection that is the union of self and others.
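add_reference, update, and union appear to work purely at the level of references, so combining collections should not need to rewrite or reload the stored tables. The sketch below models a collection's references as a plain dict mapping table names to `uuid.UUID` table ids. The helper functions are hypothetical, and raising a local `TableExistsError` on a name clash is an assumption based on the description of TableExistsError in this module, not confirmed behaviour of `pudl.dfc`.

```python
import uuid
from typing import Dict

# Hypothetical model of a collection: table name -> table_id.
Refs = Dict[str, uuid.UUID]


class TableExistsError(Exception):
    """Local stand-in for pudl.dfc.TableExistsError (hypothetical)."""


def add_reference(refs: Refs, name: str, table_id: uuid.UUID) -> None:
    # Only the pointer is recorded; the serialized table is assumed to exist already.
    if name in refs:
        raise TableExistsError(name)
    refs[name] = table_id


def update(refs: Refs, other: Refs) -> None:
    # Copy every reference from other into refs, in place.
    for name, table_id in other.items():
        add_reference(refs, name, table_id)


def union(refs: Refs, *others: Refs) -> Refs:
    # Build a new mapping so that neither input is modified.
    result: Refs = dict(refs)
    for other in others:
        update(result, other)
    return result
```

Since only UUIDs are copied, the cost of union grows with the number of tables, not with the size of their data.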

pudl.dfc.merge(left: DataFrameCollection, right: DataFrameCollection)

Merges two DataFrameCollection instances.

pudl.dfc.merge_list(list_of_dfc: List[DataFrameCollection])

Merges a list of DataFrameCollection instances.
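One natural way to think about merge_list is as a left fold of the pairwise merge over the list. The sketch below is hypothetical: it models each collection as a dict from table names to `uuid.UUID` table ids, and the decision to raise a local `TableExistsError` when both inputs contain the same table name is an assumption, not documented behaviour of `pudl.dfc.merge`.

```python
import functools
import uuid
from typing import Dict, List

# Hypothetical model of a collection: table name -> table_id.
Refs = Dict[str, uuid.UUID]


class TableExistsError(Exception):
    """Local stand-in for pudl.dfc.TableExistsError (hypothetical)."""


def merge(left: Refs, right: Refs) -> Refs:
    """Hypothetical reference-level merge that rejects duplicate table names."""
    clash = left.keys() & right.keys()
    if clash:
        raise TableExistsError(f"tables present in both inputs: {sorted(clash)}")
    return {**left, **right}


def merge_list(list_of_refs: List[Refs]) -> Refs:
    # Fold the pairwise merge over the list; an empty list yields an empty mapping.
    return functools.reduce(merge, list_of_refs, {})
```

Starting the fold from an empty mapping means merge_list behaves sensibly for empty and single-element lists without special cases.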

pudl.dfc.fanout(dfc: DataFrameCollection, chunk_size=1) → List[DataFrameCollection]

Splits a large DataFrameCollection into a list of fixed-size DataFrameCollections.

This breaks the input DataFrameCollection into a list of smaller DataFrameCollection objects that each hold chunk_size tables. This allows the contents of a large DFC to be processed in parallel.
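The chunking step can be sketched on references alone, again modelling a collection as a hypothetical dict from table names to `uuid.UUID` table ids. Sorting the names first (as get_table_names does) makes the split deterministic. In this sketch the final chunk holds fewer than `chunk_size` tables when the table count is not a multiple of `chunk_size`; whether `pudl.dfc.fanout` behaves the same way is an assumption.

```python
import uuid
from typing import Dict, List

# Hypothetical model of a collection: table name -> table_id.
Refs = Dict[str, uuid.UUID]


def fanout(refs: Refs, chunk_size: int = 1) -> List[Refs]:
    """Hypothetical sketch: split references into groups of at most chunk_size tables."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    names = sorted(refs)  # deterministic order, like get_table_names()
    return [
        {name: refs[name] for name in names[i:i + chunk_size]}
        for i in range(0, len(names), chunk_size)
    ]
```

Each returned chunk is small enough to pass to a separate task (for example, one task per element of a Prefect map), and no table data is read while splitting.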

pudl.dfc.filter_by_name(dfc: DataFrameCollection, condition: Callable[[str], bool]) → DataFrameCollection

Returns DataFrameCollection containing only tables that match the condition.

The condition receives the table name as its only argument. The dfc itself could also be passed, but that currently seems unnecessary.
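Because the condition only sees table names, filtering can be done without loading any data. The sketch below is hypothetical: it models a collection as a dict from table names to `uuid.UUID` table ids, and the table names used in the usage line are illustrative placeholders.

```python
import uuid
from typing import Callable, Dict

# Hypothetical model of a collection: table name -> table_id.
Refs = Dict[str, uuid.UUID]


def filter_by_name(refs: Refs, condition: Callable[[str], bool]) -> Refs:
    """Hypothetical sketch: keep only references whose table name satisfies condition."""
    # Only names are inspected, so no table data is read from storage.
    return {name: table_id for name, table_id in refs.items() if condition(name)}


# Usage with placeholder table names: keep only the "plants_*" tables.
refs = {"plants_example": uuid.uuid4(), "utilities_example": uuid.uuid4()}
plants_only = filter_by_name(refs, lambda name: name.startswith("plants_"))
```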