Polars and Hive Datasets, why I wrote polario

posted on 2023-10-11

I love working with Polars instead of Pandas because of its simpler approach to indexing and its high performance. What I needed as a data engineer was a way to load and store Hive partitioned datasets. This is supported through Arrow, but that support fell short of what I required, so I decided to write and publish Polario: a simple, pure Python library for handling Hive partitioned datasets with Polars.

Overview of issues

Hive partitioning encodes the partition columns on disk as folder names of the form <column_name>=<value>. Each folder contains Parquet files with the data for that partition value.
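To make the layout concrete, here is a minimal sketch that writes such a dataset with pyarrow; the /tmp/example_dataset path is just for illustration, and the exact part file names depend on your pyarrow version:

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"p1": ["1", "1", "2"], "v": [1, 2, 3]})
ds.write_dataset(
    table,
    "/tmp/example_dataset",
    format="parquet",
    partitioning=ds.partitioning(pa.schema([("p1", pa.string())]), flavor="hive"),
)

# Resulting layout, with the partition values moved into the folder names:
# /tmp/example_dataset/p1=1/part-0.parquet
# /tmp/example_dataset/p1=2/part-0.parquet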

Issue one. Your partition column values must be representable as strings and fit in a filesystem folder name. In practice this means making sure the partition columns are cast to plain Arrow strings, so that PyArrow puts the actual value in the folder name instead of an ellipsis:

# Cast all partition columns to string values
table = table.cast(
    pyarrow.schema(
        [
            f.with_type(pyarrow.string())
            if f.name in self.partition_columns
            else f
            for f in table.schema
        ]
    )
)

Issue two. Appending to the dataset is hard. You have to override the basename_template parameter of pyarrow.dataset.write_dataset to make sure new part files are written without colliding with, and destroying, existing data. But if you do that, you also need to know which part files already exist, so that the appended data keeps its order after the existing data.
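A minimal sketch of that workaround, using a uuid-based template to avoid file name collisions; the template string and the existing_data_behavior choice are my own for illustration, not something pyarrow prescribes:

import uuid

import pyarrow as pa
import pyarrow.dataset as ds

new_rows = pa.table({"p1": ["1"], "v": [42]})
ds.write_dataset(
    new_rows,
    "/tmp/example_dataset",
    format="parquet",
    partitioning=ds.partitioning(pa.schema([("p1", pa.string())]), flavor="hive"),
    # Unique prefix so new part files never clash with existing ones
    basename_template=f"part-{uuid.uuid4().hex}-{{i}}.parquet",
    # Keep pyarrow from erroring out on, or deleting, existing files
    existing_data_behavior="overwrite_or_ignore",
)

This only avoids collisions; keeping the appended rows ordered after the existing ones still means listing the part files that are already there.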

Issue three. If you decide to change the schema of the dataset, you can't do schema migrations programmatically. One way to do this would be to load a partition, check its schema, fix the schema, and write the partition back. This can't be done with pyarrow datasets, because they assume the schema of the whole dataset is determined by the first part file and crash if other part files don't adhere to it.
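What I wanted is roughly the following, sketched here with plain Polars on the raw partition folders rather than any pyarrow or polario API; the base path and the hypothetical v2 column are purely illustrative:

from pathlib import Path

import polars as pl

base = Path("/tmp/example_dataset")
for partition_dir in base.glob("p1=*"):
    for part_file in partition_dir.glob("*.parquet"):
        df = pl.read_parquet(part_file)
        # Hypothetical migration: older part files are missing a v2 column
        if "v2" not in df.columns:
            df = df.with_columns(pl.lit(0).alias("v2"))
        df.write_parquet(part_file)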

Issue four. Even if you already know which partitions you want to load, pyarrow still scans all objects before letting you filter the dataset. Because you can't tell pyarrow up front what the partition columns are, it has to list the storage to discover them, which can take a long time when you already know exactly which partition value you want.
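That is the core of what polario does differently: if you already know the partition columns and values, you can build the folder path yourself and scan only that folder. A sketch, with an illustrative path:

import polars as pl

# No dataset-wide listing needed: the folder path is known up front.
df = (
    pl.scan_parquet("/tmp/example_dataset/p1=1/*.parquet")
    # The partition value lives in the folder name, so re-attach it as a column.
    .with_columns(pl.lit("1").alias("p1"))
    .collect()
)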

Using polario

To solve these issues, polario takes a different approach to Hive dataset access. It still uses pyarrow and Polars scan with concat under the hood to produce your data, so it remains fast, but you have to specify the partition column names before you can use the dataset:

from polario.hive_dataset import HiveDataset
import polars as pl

df = pl.from_dicts(
    [
        {"p1": 1, "v": 1},
        {"p1": 2, "v": 1},
    ]
)

ds = HiveDataset("file:///tmp/", partition_columns=["p1"])

ds.write(df)

As you can see, you have to set both the base path and the partition_columns when initializing the HiveDataset. This makes it possible to read a specific partition as well:

df = ds.read_partition({"p1": "1"})
df
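Under the hood, the scan-with-concat approach mentioned above boils down to something like this sketch; the paths follow the example dataset written above, and this is not polario's actual internal code:

import polars as pl

# Scan each partition folder lazily, re-attach the partition value,
# and concatenate the lazy frames into one DataFrame.
lazy_frames = [
    pl.scan_parquet(f"/tmp/p1={value}/*.parquet").with_columns(
        pl.lit(value).alias("p1")
    )
    for value in ["1", "2"]
]
df = pl.concat(lazy_frames).collect()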

You can also iterate over partitions and append data. For the full set of features, read the documentation. Great fun!

If you end up wanting to use polario as well, feel free to pull it in and post issues if you run into anything. If you don't require Hive partitioning for technical reasons, do check out Delta tables from the Delta Lake project. That approach to storage solves all of the issues I have with the pyarrow Hive dataset, so if you use it, you don't need polario.

Happy hacking!