Write Polars DataFrame as parquet dataset
Writing a Polars DataFrame as a PyArrow parquet dataset onto remote storage turned out to be more than a ten-minute lookup, so I decided to do a quick write-up of my findings.
Update: I eventually created a Python package called polario that includes this functionality.
A Dataset is a collection of parquet files, partitioned into folders, on some storage, for example AWS S3. A common example is a dataset of incoming events partitioned on the event date. If you have a large number of events per day, reading only the relevant partitions can perform better than reading the parquet metadata of each file.
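The examples below write to a local /tmp path to keep things simple. To target remote storage like S3, pyarrow lets you pass an explicit filesystem object instead of encoding it in the path. A minimal sketch, assuming a hypothetical bucket name and region:

import pyarrow.dataset
import pyarrow.fs

# Hypothetical bucket and region, replace with your own
s3 = pyarrow.fs.S3FileSystem(region="eu-west-1")
events = pyarrow.dataset.dataset(
    "my-bucket/events",  # no s3:// prefix when a filesystem is given
    format="parquet",
    partitioning="hive",
    filesystem=s3,
)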
We start with a DataFrame:
import polars as pl
import pyarrow
import pyarrow.dataset
from datetime import date
example_df = pl.DataFrame(
    {
        "event_date_utc": [date(2022, 1, 1), date(2022, 1, 2)],
        "event_type": ["b", "c"],
        "event_id": [3, 4],
    }
)
example_df
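Note that event_date_utc is a real date column at this point, which will matter later on. You can check the dtypes before converting to Arrow; the exact repr differs per Polars version, but it should look roughly like this:

example_df.schema
# roughly: {'event_date_utc': Date, 'event_type': Utf8, 'event_id': Int64}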
If we want to write this out as a parquet dataset, we use:
pyarrow.dataset.write_dataset(
    example_df.to_arrow(),
    "/tmp/bneijt/events",
    format="parquet",
    partitioning=["event_date_utc"],
    partitioning_flavor="hive",
    existing_data_behavior="delete_matching",
)
The result is two parquet files on disk:
/tmp/bneijt/events/event_date_utc=2022-01-02/part-0.parquet
/tmp/bneijt/events/event_date_utc=2022-01-01/part-0.parquet
Each parquet file contains only the data columns, so no event_date_utc column; that value is encoded in the directory name instead.
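You can verify this by reading one of the part files directly with pyarrow; the file only stores the remaining columns:

import pyarrow.parquet

# The partition value lives in the directory name, not in the file
pyarrow.parquet.read_table(
    "/tmp/bneijt/events/event_date_utc=2022-01-01/part-0.parquet"
).column_names
# -> ['event_type', 'event_id']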
To read them back in, you do:
df = pl.DataFrame(
    pyarrow.dataset.dataset(
        "/tmp/bneijt/events",
        format="parquet",
        partitioning="hive",
    ).to_table()
)
df
The result is probably not what you expected: the partition column is read back as a str instead of a date. So if you want to filter the dataset, you have to compare against a string, like so:
df = pl.DataFrame(
    pyarrow.dataset.dataset(
        "/tmp/bneijt/events",
        format="parquet",
        partitioning="hive",
    ).to_table(filter=pyarrow.dataset.field("event_date_utc") == "2022-01-01")
)
df
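An alternative is to tell pyarrow what the partition column should look like, so the value is parsed back into a date instead of a string. A sketch of that, using pyarrow.dataset.partitioning with an explicit schema:

date_partitioning = pyarrow.dataset.partitioning(
    pyarrow.schema([("event_date_utc", pyarrow.date32())]),
    flavor="hive",
)
df = pl.DataFrame(
    pyarrow.dataset.dataset(
        "/tmp/bneijt/events",
        format="parquet",
        partitioning=date_partitioning,
    ).to_table(filter=pyarrow.dataset.field("event_date_utc") == date(2022, 1, 1))
)

The downside is that you now have to keep this partition schema in sync with whatever wrote the dataset.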
If you are like me, you think: OK, that makes sense, so let's make sure we only use strings as partition keys in the first place:
df = example_df.with_columns(pl.col("event_date_utc").cast(pl.Utf8))
pyarrow.dataset.write_dataset(
    df.to_arrow(),
    "/tmp/bneijt/large_str",
    format="parquet",
    partitioning=["event_date_utc"],
    partitioning_flavor="hive",
    existing_data_behavior="delete_matching",
)
But then, on disk, you get:
/tmp/bneijt/large_str/event_date_utc=.../part-0.parquet
Not the actual string value of the column. The issue here is that pl.Utf8 becomes a large_string in the Arrow storage model, and that is what ends up on disk as the triple dots.
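You can see this conversion by looking at the schema of the converted Arrow table. The exact output depends on your Polars and pyarrow versions, but at the time of writing the string columns showed up as large_string:

example_df.with_columns(pl.col("event_date_utc").cast(pl.Utf8)).to_arrow().schema
# event_date_utc: large_string
# event_type: large_string
# event_id: int64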
To make this work, any partition key that is a large string in the Arrow table should first be cast to a simple string:
partitioning = ["event_date_utc"]
df = example_df.with_columns(pl.col("event_date_utc").cast(pl.Utf8))
table = df.to_arrow()
table = table.cast(
    pyarrow.schema(
        [
            f.with_type(pyarrow.string()) if f.name in partitioning else f
            for f in table.schema
        ]
    )
)
pyarrow.dataset.write_dataset(
    table,
    "/tmp/bneijt/small_str",
    format="parquet",
    partitioning=partitioning,
    partitioning_flavor="hive",
    existing_data_behavior="delete_matching",
)
By casting the columns we use for partitioning back to a plain string, we end up with the desired result:
/tmp/bneijt/small_str/event_date_utc=2022-01-02/part-0.parquet
/tmp/bneijt/small_str/event_date_utc=2022-01-01/part-0.parquet
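Reading this string-partitioned dataset back works the same as before, and if you want a real date column again you can parse the partition values after loading. A sketch, assuming the partition values keep the %Y-%m-%d format used above:

df = pl.DataFrame(
    pyarrow.dataset.dataset(
        "/tmp/bneijt/small_str",
        format="parquet",
        partitioning="hive",
    ).to_table()
).with_columns(pl.col("event_date_utc").str.strptime(pl.Date, "%Y-%m-%d"))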
The basics are there, but it's not really nice to work with yet and not efficient either.
Current support for Arrow Datasets is still pretty limited, but with experimental features like scan_ds
we know that better support is on its way.
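As an illustration of where that is heading: scan_ds wraps a pyarrow dataset in a lazy Polars query, so filters can be pushed down into the dataset scan instead of loading everything first. A sketch, assuming the experimental scan_ds API as it existed at the time (newer Polars versions call it scan_pyarrow_dataset):

ds = pyarrow.dataset.dataset(
    "/tmp/bneijt/small_str",
    format="parquet",
    partitioning="hive",
)
# Filter on the partition column; Polars can push this down to the dataset scan
pl.scan_ds(ds).filter(pl.col("event_date_utc") == "2022-01-01").collect()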