Write Polars DataFrame as parquet dataset

posted on 2022-09-24

Writing a Polars DataFrame as a PyArrow parquet dataset to remote storage turned out to be more than a ten-minute look-up-and-write job. So I decided to do a quick write-up of my findings.

Update: I eventually created a Python package called polario that implements this.

A dataset is a collection of parquet files on some storage, for example AWS S3, partitioned into folders. A common example is a dataset of incoming events partitioned on the event date. If you have a large number of events per day, filtering on the partition folders can perform better than opening every file and reading its parquet metadata.
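With the hive partitioning flavor, such an events dataset looks something like this on storage (the bucket name here is just an illustration):

s3://some-bucket/events/event_date_utc=2022-01-01/part-0.parquet
s3://some-bucket/events/event_date_utc=2022-01-02/part-0.parquet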

We start with a DataFrame:

import polars as pl
import pyarrow
import pyarrow.dataset
from datetime import date

example_df = pl.DataFrame(
    {
        "event_date_utc": [date(2022, 1, 1), date(2022, 1, 2)],
        "event_type": ["b", "c"],
        "event_id": [3, 4],
    }
)
example_df

If we want to write this out as a parquet dataset, we use:

pyarrow.dataset.write_dataset(
    example_df.to_arrow(),
    "/tmp/bneijt/events",
    format="parquet",
    partitioning=["event_date_utc"],
    partitioning_flavor="hive",
    existing_data_behavior="delete_matching",
)
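Since the original goal was remote storage: write_dataset also takes a filesystem argument, so the same call can point at S3 directly. A minimal sketch, assuming a bucket named some-bucket and credentials that PyArrow can resolve from the environment:

import pyarrow.fs

s3 = pyarrow.fs.S3FileSystem()  # region and credentials resolved from the environment
pyarrow.dataset.write_dataset(
    example_df.to_arrow(),
    "some-bucket/events",
    filesystem=s3,
    format="parquet",
    partitioning=["event_date_utc"],
    partitioning_flavor="hive",
    existing_data_behavior="delete_matching",
)

For the rest of this post I will stick to a local path under /tmp.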

The result is two parquet files on disk:

/tmp/bneijt/events/event_date_utc=2022-01-02/part-0.parquet
/tmp/bneijt/events/event_date_utc=2022-01-01/part-0.parquet

Each parquet file contains only the non-partition columns, so there is no event_date_utc column inside the files; that value lives in the directory name.
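You can check this by inspecting the schema of one of the fragments directly (path taken from the listing above):

import pyarrow.parquet

pyarrow.parquet.read_schema(
    "/tmp/bneijt/events/event_date_utc=2022-01-01/part-0.parquet"
)
# lists event_type and event_id, but no event_date_utc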

To read them back in, you do:

df = pl.DataFrame(
    pyarrow.dataset.dataset(
        "/tmp/bneijt/events",
        format="parquet",
        partitioning="hive",
    ).to_table()
)
df

The result is probably not what you expected: the partition column is read back as a str instead of a date. So if you want to filter on the dataset, you have to compare against a string, like so:

df = pl.DataFrame(
    pyarrow.dataset.dataset(
        "/tmp/bneijt/events",
        format="parquet",
        partitioning="hive",
    ).to_table(filter=pyarrow.dataset.field("event_date_utc") == "2022-01-01")
)
df
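Another option is to tell the dataset reader what type the partition column has. This is a sketch of passing an explicit hive partitioning schema instead of the "hive" shorthand, which should give you a proper date column back:

ds = pyarrow.dataset.dataset(
    "/tmp/bneijt/events",
    format="parquet",
    partitioning=pyarrow.dataset.partitioning(
        pyarrow.schema([("event_date_utc", pyarrow.date32())]),
        flavor="hive",
    ),
)
df = pl.DataFrame(
    ds.to_table(filter=pyarrow.dataset.field("event_date_utc") == date(2022, 1, 1))
)

That works, but it means every reader has to know the partition types up front.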

If you are like me, you think: OK, makes sense, let's make sure we only use strings as partition keys when writing anyway:

df = example_df.with_column(pl.col("event_date_utc").cast(pl.Utf8))
pyarrow.dataset.write_dataset(
    df.to_arrow(),
    "/tmp/bneijt/large_str",
    format="parquet",
    partitioning=["event_date_utc"],
    partitioning_flavor="hive",
    existing_data_behavior="delete_matching",
)

But then, on disk, you get:

/tmp/bneijt/large_str/event_date_utc=.../part-0.parquet

Not the actual string value of the column. The issue here is that pl.Utf8 becomes a large_string in the Arrow type system, and when that is used as a hive partition key it ends up on disk as the triple dots instead of the value.
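You can see this by looking at the Arrow schema that Polars produces; the cast to pl.Utf8 comes out as large_string (exact output depends on the Polars version):

example_df.with_column(pl.col("event_date_utc").cast(pl.Utf8)).to_arrow().schema
# event_date_utc: large_string
# event_type: large_string
# event_id: int64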

To make this work, any partition key that is a large string in the Arrow table should first be cast to a simple string:

partitioning = ["event_date_utc"]
df = example_df.with_column(pl.col("event_date_utc").cast(pl.Utf8))
table = df.to_arrow()
# Cast every partition column from large_string to string
table = table.cast(
    pyarrow.schema(
        [
            f.with_type(pyarrow.string()) if f.name in partitioning else f
            for f in table.schema
        ]
    )
)
pyarrow.dataset.write_dataset(
    table,
    "/tmp/bneijt/small_str",
    format="parquet",
    partitioning=partitioning,
    partitioning_flavor="hive",
    existing_data_behavior="delete_matching",
)

By casting the columns we use for partitioning back to a plain string, we end up with the required result:

/tmp/bneijt/small_str/event_date_utc=2022-01-02/part-0.parquet
/tmp/bneijt/small_str/event_date_utc=2022-01-01/part-0.parquet
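Reading that dataset back with the hive partitioning gives the event_date_utc values as strings, as intended:

pl.DataFrame(
    pyarrow.dataset.dataset(
        "/tmp/bneijt/small_str",
        format="parquet",
        partitioning="hive",
    ).to_table()
)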

The basics are there, but it is not really nice to work with yet, and not particularly efficient either.

Current support for Arrow Datasets in Polars is still pretty limited, but with experimental features like scan_ds we know that better support is on its way.
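For example, scan_ds can wrap a PyArrow dataset as a lazy Polars frame, so filters can be pushed down into the dataset scan. A minimal sketch, assuming a Polars version in which the experimental scan_ds is available:

ds = pyarrow.dataset.dataset(
    "/tmp/bneijt/events",
    format="parquet",
    partitioning="hive",
)
lazy_df = pl.scan_ds(ds)
lazy_df.filter(pl.col("event_date_utc") == "2022-01-01").collect()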