I have a parquet dataset stored on s3, and I would like to query specific rows from the dataset. I was able to do that using petastorm, but now I want to do that using only pyarrow.
Here's my attempt:
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    'analytics.xxx',
    filesystem=fs,
    validate_schema=False,
    filters=[('event_name', '=', 'SomeEvent')]
)

df = dataset.read_pandas().to_pandas()
But that returns a pandas DataFrame as if the filter didn't work, i.e. I have rows with various values of event_name. Is there something I am missing or something I misunderstood? I could filter after getting the pandas DataFrame, but that would use much more memory than needed.
Note: I’ve expanded this into a comprehensive guide to Python and Parquet in this post
In order to use filters you need to store your data in Parquet format using partitions. Loading a few Parquet columns and partitions out of many can result in massive improvements in I/O performance with Parquet versus CSV. Parquet can partition files based on values of one or more fields and it creates a directory tree for the unique combinations of the nested values, or just one set of directories for one partition column. The PySpark Parquet documentation explains how Parquet works fairly well.
A partition on gender and country would look like this:
path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
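To make the layout concrete, here is a minimal standard-library sketch of how the key=value directory names can be read back into partition values. The helper name parse_partition_path is made up for illustration; it is not a PyArrow function, and real readers do more (type inference, escaping).

```python
from pathlib import PurePosixPath


def parse_partition_path(path):
    """Return the key=value pairs encoded in a Hive-style partition path."""
    parts = {}
    for segment in PurePosixPath(path).parts:
        # Only path segments of the form key=value carry partition information.
        if "=" in segment:
            key, _, value = segment.partition("=")
            parts[key] = value
    return parts


example = "path/to/table/gender=male/country=US/data.parquet"
print(parse_partition_path(example))
# {'gender': 'male', 'country': 'US'}
```

Note that the values come back as strings, because that is all a directory name can hold. A reader that knows these values can skip whole directories without opening a single file.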
There is also row group partitioning if you need to further partition your data, but most tools only support specifying the row group size, and you have to do the key-to-row-group lookup yourself, which is ugly (happy to answer about that in another question).
You need to partition your data using Parquet and then you can load it using filters. You can write the data in partitions using PyArrow, pandas or Dask or PySpark for large datasets.
For example, to write partitions in pandas:
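# DataFrame.to_parquet() has no `columns` argument, so select the columns on df first.
# The partition columns must be part of the selection.
df[['col1', 'col5', 'event_name', 'event_category']].to_parquet(
    path='analytics.xxx',
    engine='pyarrow',
    compression='snappy',
    partition_cols=['event_name', 'event_category']
)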
This lays the files out like:
analytics.xxx/event_name=SomeEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=SomeEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
To grab events by one property using the partition columns, you put a tuple filter in a list:
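import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

# validate_schema is left out: it belongs to the legacy ParquetDataset
# implementation, and recent PyArrow releases reject it.
dataset = pq.ParquetDataset(
    's3://analytics.xxx',
    filesystem=fs,
    filters=[('event_name', '=', 'SomeEvent')]
)

# ParquetDataset exposes read() and read_pandas(), not to_table().
df = dataset.read(
    columns=['col1', 'col5']
).to_pandas()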
To grab an event with two or more properties using AND you just create a list of filter tuples:
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

# Tuples in one flat list are combined with AND.
dataset = pq.ParquetDataset(
    's3://analytics.xxx',
    filesystem=fs,
    filters=[
        ('event_name', '=', 'SomeEvent'),
        ('event_category', '=', 'SomeCategory')
    ]
)

# ParquetDataset exposes read() and read_pandas(), not to_table().
df = dataset.read(
    columns=['col1', 'col5']
).to_pandas()
To grab two events using OR you need to nest the filter tuples in their own lists:
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

# Each inner list is an AND group; the groups are combined with OR.
dataset = pq.ParquetDataset(
    's3://analytics.xxx',
    filesystem=fs,
    filters=[
        [('event_name', '=', 'SomeEvent')],
        [('event_name', '=', 'OtherEvent')]
    ]
)

# ParquetDataset exposes read() and read_pandas(), not to_table().
df = dataset.read(
    columns=['col1', 'col5']
).to_pandas()
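The AND/OR rules are easier to see in isolation. Below is a plain-Python sketch of how filters in this shape can be evaluated against partition values: a flat list of tuples is one AND group, and a list of lists is an OR over AND groups. The matches helper and the OPS table are my own illustration, not PyArrow internals, and they cover only the basic comparison operators.

```python
import operator

# Comparison operators handled by this sketch (PyArrow accepts more, e.g. "in").
OPS = {
    "=": operator.eq,
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    ">": operator.gt,
    "<=": operator.le,
    ">=": operator.ge,
}


def matches(partition, filters):
    """Evaluate list-of-tuples / list-of-lists filters against one partition dict."""
    # A flat list of tuples is a single AND group; normalise it to [[...]].
    if filters and isinstance(filters[0], tuple):
        filters = [filters]
    # OR across groups, AND within each group.
    return any(
        all(OPS[op](partition[key], value) for key, op, value in group)
        for group in filters
    )


partitions = [
    {"event_name": "SomeEvent", "event_category": "SomeCategory"},
    {"event_name": "SomeEvent", "event_category": "OtherCategory"},
    {"event_name": "OtherEvent", "event_category": "SomeCategory"},
    {"event_name": "OtherEvent", "event_category": "OtherCategory"},
]

and_filter = [("event_name", "=", "SomeEvent"), ("event_category", "=", "SomeCategory")]
or_filter = [[("event_name", "=", "SomeEvent")], [("event_name", "=", "OtherEvent")]]

and_hits = [p for p in partitions if matches(p, and_filter)]
or_hits = [p for p in partitions if matches(p, or_filter)]
print(len(and_hits), len(or_hits))  # 1 4
```

With the four partitions from the layout above, the AND filter keeps exactly one directory and the OR filter keeps all four.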
As another answer mentioned, if you're using S3 the easiest way to load just certain columns from certain partitions is the awswrangler module: check out the documentation for awswrangler.s3.read_parquet() and awswrangler.s3.to_parquet(). Older releases accepted the same filters argument as the examples above; recent releases instead take dataset=True together with a partition_filter callable that receives each partition's values as a dict of strings.
import awswrangler as wr

df = wr.s3.read_parquet(
    path="s3://analytics.xxx",
    dataset=True,  # treat the path as a partitioned dataset
    columns=["event_name"],
    partition_filter=lambda part: part["event_name"] == "SomeEvent"
)
pyarrow.parquet.read_table()
If you're using PyArrow, you can also use pyarrow.parquet.read_table():
import pyarrow.parquet as pq

# Column names changed to match the partition columns used throughout this answer.
table = pq.read_table(
    source='analytics.xxx',
    use_threads=True,
    columns=['event_name', 'event_category'],
    filters=[('event_name', '=', 'SomeEvent')]
)

df = table.to_pandas()
Finally, in PySpark you can use pyspark.sql.DataFrameReader.parquet():
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
    .appName('Stack Overflow Example Parquet Column Load') \
    .getOrCreate()

# Spark uses the Parquet layout to load only the selected columns and partitions
df = spark.read.parquet('s3://analytics.xxx') \
    .select('event_name', 'event_category') \
    .filter(F.col('event_name') == 'SomeEvent')
Hopefully this helps you work with Parquet :)