Deep Dive into creating a Dask DataFrame Collection with from_map
By Rick Zamora
Dask DataFrame provides dedicated IO functions for several popular tabular-data formats, like CSV and Parquet. If you are working with a supported format, then the corresponding function (e.g. read_csv) is likely to be the most reliable way to create a new Dask DataFrame collection. For other workflows, from_map now offers a convenient way to define a DataFrame collection as an arbitrary function mapping. While these kinds of workflows have historically required users to adopt the Dask Delayed API, from_map now makes custom collection creation both easier and more performant.
What is from_map?
The from_map API was added to Dask DataFrame in v2022.05.1 with the intention of replacing from_delayed as the recommended means of custom DataFrame creation. At its core, from_map simply converts each element of an iterable object (iterable) into a distinct Dask DataFrame partition, using a common function (func):
dd.from_map(func: Callable, iterable: Iterable) -> dd.DataFrame
The overall behavior is essentially the Dask DataFrame equivalent of the standard-Python map function:
map(func: Callable, iterable: Iterable) -> Iterator
Note that both from_map and map actually support an arbitrary number of iterable inputs. However, we will only focus on the use of a single iterable argument in this post.
A simple example
To better understand the behavior of from_map, let’s consider the simple case that we want to interact with Feather-formatted data created with the following Pandas code:
import pandas as pd
size = 3
paths = ["./data.0.feather", "./data.1.feather"]
for i, path in enumerate(paths):
    index = range(i * size, i * size + size)
    a = [i] * size
    b = list("xyz")
    df = pd.DataFrame({"A": a, "B": b, "index": index})
    df.to_feather(path)
Since Dask does not yet offer a dedicated read_feather function (as of dask-2023.3.1), most users would assume that the only option to create a Dask DataFrame collection is to use dask.delayed. The “best practice” for creating a collection in this case, however, is to wrap pd.read_feather or cudf.read_feather in a from_map call like so:
>>> import dask.dataframe as dd
>>> ddf = dd.from_map(pd.read_feather, paths)
>>> ddf
Dask DataFrame Structure:
                   A       B  index
npartitions=2
               int64  object  int64
                 ...     ...    ...
                 ...     ...    ...
Dask Name: read_feather, 1 graph layer
This produces the following Pandas (or cuDF) object after computation:
>>> ddf.compute()
   A  B  index
0  0  x      0
1  0  y      1
2  0  z      2
0  1  x      3
1  1  y      4
2  1  z      5
Although the same output can be achieved using the conventional dd.from_delayed strategy, using from_map will improve the available opportunities for task-graph optimization within Dask.
Performance considerations: Specifying meta and divisions
Although func and iterable are the only required arguments to from_map, one can significantly improve the overall performance of a workflow by specifying optional arguments like meta and divisions.
Due to the lazy nature of Dask DataFrame, each collection is required to carry around a schema (column name and dtype information) in the form of an empty Pandas (or cuDF) object. If meta is not directly provided to the from_map function, the schema will need to be populated by eagerly materializing the first partition, which can increase the apparent latency of the from_map API call itself. For this reason, it is always recommended to specify an explicit meta argument if the expected column names and dtypes are known a priori.
While passing in a meta argument is likely to reduce the from_map API call latency, passing in a divisions argument makes it possible to reduce the end-to-end compute time. This is because, by specifying divisions, we are allowing Dask DataFrame to track useful per-partition min/max statistics for the index. Therefore, if the overall workflow involves grouping or joining on the index, Dask can avoid unnecessary shuffling operations.
Using from_map to implement a custom API
Although it is currently difficult to automatically extract division information from the metadata of an arbitrary Feather dataset, from_map makes it relatively easy to implement your own highly-functional read_feather API using PyArrow. For example, the following code is all that one needs to enable lazy Feather IO with both column projection and index selection:
def from_arrow(table):
    """(Optional) Utility to enforce 'backend' configuration"""
    from dask import config
    if config.get("dataframe.backend") == "cudf":
        import cudf
        return cudf.DataFrame.from_arrow(table)
    else:
        return table.to_pandas()
def read_feather(paths, columns=None, index=None):
    """Create a Dask DataFrame from Feather files
    Example of a "custom" `from_map` IO function
    Parameters
    ----------
    paths: list
        List of Feather-formatted paths. Each path will
        be mapped to a distinct DataFrame partition.
    columns: list or None, default None
        Optional list of columns to select from each file.
    index: str or None, default None
        Optional column name to set as the DataFrame index.
    Returns
    -------
    dask.dataframe.DataFrame
    """
    import dask.dataframe as dd
    import pyarrow.dataset as ds
    # Step 1: Extract `meta` from the dataset
    dataset = ds.dataset(paths, format="feather")
    meta = from_arrow(dataset.schema.empty_table())
    meta = meta.set_index(index) if index else meta
    columns = columns or list(meta.columns)
    meta = meta[columns]
    # Step 2: Define the `func` argument
    def func(frag, columns=None, index=None):
        # Create a Pandas DataFrame from a dataset fragment
        # NOTE: In practice, this function should
        # always be defined outside `read_feather`
        assert columns is not None
        read_columns = columns
        if index and index not in columns:
            read_columns = columns + [index]
        df = from_arrow(frag.to_table(columns=read_columns))
        df = df.set_index(index) if index else df
        return df[columns] if columns else df
    # Step 3: Define the `iterable` argument
    iterable = dataset.get_fragments()
    # Step 4: Call `from_map`
    return dd.from_map(
        func,
        iterable,
        meta=meta,
        index=index,  # `func` kwarg
        columns=columns,  # `func` kwarg
    )
Here we see that using from_map to enable completely-lazy collection creation only requires four steps. First, we use pyarrow.dataset to define a meta argument for from_map, so that we can avoid the unnecessary overhead of an eager read operation. For some file formats and/or applications, it may also be possible to calculate divisions at this point. However, as explained above, such information is not readily available for this particular example.
The second step is to define the underlying function (func) that we will use to produce each of our final DataFrame partitions. Third, we define one or more iterable objects containing the unique information needed to produce each partition (iterable). In this case, the only iterable object corresponds to a generator of pyarrow.dataset fragments, which is essentially a wrapper around the input path list.
The fourth and final step is to use the final func, iterable, and meta information to call the from_map API. Note that we also use this opportunity to specify additional keyword arguments, like columns and index. In contrast to the iterable positional arguments, whose elements are mapped to func one partition at a time, these keyword arguments are broadcast to every func call.
Using the read_feather implementation above, it becomes both easy and efficient to convert an arbitrary Feather dataset into a lazy Dask DataFrame collection:
>>> ddf = read_feather(paths, columns=["A"], index="index")
>>> ddf
Dask DataFrame Structure:
                   A
npartitions=2
               int64
                 ...
                 ...
Dask Name: func, 1 graph layer
>>> ddf.compute()
       A
index
0      0
1      0
2      0
3      1
4      1
5      1
Advanced: Enhancing column projection
Although a read_feather implementation like the one above is likely to meet the basic needs of most applications, users will often leave out the columns argument in practice. For example:
a = read_feather(paths)["A"]
For code like this, as the implementation currently stands, each IO task would be forced to read an entire Feather file, and then select the "A" column from a Pandas/cuDF DataFrame only after it had already been read into memory. The additional overhead is insignificant for the toy dataset used here. However, avoiding this kind of unnecessary IO can lead to dramatic performance improvements in real-world applications.
So, how can we modify our read_feather implementation to take advantage of external column-projection operations (like ddf["A"])? The good news is that from_map is already equipped with the necessary graph-optimization hooks to handle this, so long as the func object satisfies the DataFrameIOFunction protocol:
from typing import Protocol, runtime_checkable
@runtime_checkable
class DataFrameIOFunction(Protocol):
    """DataFrame IO function with projectable columns
    Enables column projection in ``DataFrameIOLayer``.
    """
    @property
    def columns(self):
        """Return the current column projection"""
        raise NotImplementedError
    def project_columns(self, columns):
        """Return a new DataFrameIOFunction object
        with a new column projection
        """
        raise NotImplementedError
    def __call__(self, *args, **kwargs):
        """Return a new DataFrame partition"""
        raise NotImplementedError
That is, all we need to do is change “Step 2” of our implementation to use the following code instead:
    from dask.dataframe.io.utils import DataFrameIOFunction
    class ReadFeather(DataFrameIOFunction):
        """Create a Pandas/cuDF DataFrame from a dataset fragment"""
        def __init__(self, columns, index):
            self._columns = columns
            self.index = index
        @property
        def columns(self):
            return self._columns
        def project_columns(self, columns):
            # Replace this object with one that will only read `columns`
            if columns != self.columns:
                return ReadFeather(columns, self.index)
            return self
        def __call__(self, frag):
            # Same logic as original `func`
            read_columns = self.columns
            if self.index and self.index not in self.columns:
                read_columns = self.columns + [self.index]
            df = from_arrow(frag.to_table(columns=read_columns))
            df = df.set_index(self.index) if self.index else df
            return df[self.columns] if self.columns else df
    func = ReadFeather(columns, index)
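To see what the protocol asks of func, the following sketch exercises the same three members by hand, without involving Dask's optimizer. ReadColumns is a hypothetical in-memory analogue of ReadFeather that reads from a plain dictionary instead of a dataset fragment. Calling project_columns manually is only an approximation of what the graph optimization is assumed to do on the user's behalf.

```python
import pandas as pd


class ReadColumns:
    """Hypothetical in-memory analogue of ReadFeather"""

    def __init__(self, columns):
        self._columns = columns

    @property
    def columns(self):
        # Current column projection
        return self._columns

    def project_columns(self, columns):
        # Return a new object that will only read `columns`
        if columns != self.columns:
            return ReadColumns(columns)
        return self

    def __call__(self, source):
        # `source` is a dict of column name -> values,
        # standing in for a file on disk
        return pd.DataFrame({name: source[name] for name in self.columns})


source = {"A": [0, 0, 0], "B": list("xyz")}

full = ReadColumns(["A", "B"])
projected = full.project_columns(["A"])  # narrower reader, `full` is unchanged

full_df = full(source)
projected_df = projected(source)
```

The key design point is that project_columns returns a new object rather than mutating the existing one, so the original reader remains valid for any other part of the graph that still needs every column.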
Conclusion
It is now easier than ever to create a Dask DataFrame collection from an arbitrary data source. Although the dask.delayed API has already enabled similar functionality for many years, from_map now makes it possible to implement a custom IO function without sacrificing any of the high-level graph optimizations leveraged by the rest of the Dask DataFrame API.
Start experimenting with from_map today, and let us know how it goes!