Create a Spark DataFrame from Pandas or NumPy with Arrow
If you are a Pandas or NumPy user and have ever tried to create a Spark DataFrame from local data, you might have noticed that it is an unbearably slow process. In fact, the time it takes usually makes this impractical for any data set of interesting size. Starting with Spark 2.3, SPARK-22216 enables creating a DataFrame from Pandas using Arrow, which makes this process much more efficient. You can now transfer large data sets from a local Pandas session to Spark almost instantly and be sure that your data types are preserved. This post will demonstrate a simple example of how to do this and walk through the Spark internals of how it is accomplished.
A simple example to create a DataFrame from Pandas
For this example, we will generate a 2D array of random doubles from NumPy that is 1,000,000 x 10. We will then wrap this NumPy data with Pandas, applying a label for each column name, and use this as our input into Spark.
```python
import pandas as pd
import numpy as np

data = np.random.rand(1000000, 10)
pdf = pd.DataFrame(data, columns=list("abcdefghij"))
```
To input this data into Spark with Arrow, we first need to enable it with the config below. This could also be included in spark-defaults.conf to be enabled for all sessions. Spark simply takes the Pandas DataFrame as input and converts it into a Spark DataFrame, which is distributed across the cluster. Using Arrow, the schema is automatically transferred to Spark and data type information will be retained, but you can also manually specify the schema to override it if desired.

Assuming an existing Spark session:

```python
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark.createDataFrame(pdf)
```
That’s all there is to it! The Pandas DataFrame will be sliced up into chunks according to the number from SparkContext.defaultParallelism(), which can be set by the conf "spark.default.parallelism" for the default scheduler. Depending on the size of the data you are importing to Spark, you might need to tweak this setting.
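The slicing itself can be sketched in plain Pandas. Here the parallelism value of 8 is just an assumption for illustration; in Spark it comes from SparkContext.defaultParallelism():

```python
import numpy as np
import pandas as pd

pdf = pd.DataFrame(np.random.rand(1_000_000, 10), columns=list("abcdefghij"))

# Pretend defaultParallelism is 8; the frame is sliced into roughly
# equal chunks, one per partition. Slicing only creates views, no copies.
parallelism = 8
step = -(-len(pdf) // parallelism)  # ceiling division
chunks = [pdf.iloc[start:start + step] for start in range(0, len(pdf), step)]

print(len(chunks), sum(len(c) for c in chunks))  # 8 1000000
```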
The above can be found as a notebook gist here to try out for yourself.
How it Works Behind the Scenes
The code path for this is pretty straightforward and boils down to just a few key steps. All the work is done in SparkSession._create_from_pandas_with_arrow in session.py, which is invoked by createDataFrame after the input is found to be a Pandas DataFrame and Arrow is enabled:
1. Slice the Pandas DataFrame into chunks according to the number for default parallelism
2. Convert each chunk of Pandas data into an Arrow RecordBatch
3. Convert the schema from Arrow to Spark
4. Send the Arrow RecordBatches to the JVM, where they become a JavaRDD[Array[Byte]]
5. Wrap the JavaRDD with the Spark schema to create a DataFrame
Let’s look at these steps in a bit more detail to examine performance. First, slicing the Pandas DataFrame is a cheap operation because it only uses references to the original data and does not make copies. Converting the slices to Arrow record batches will end up copying the data since it came from slices, but it is efficiently copied as chunks. Arrow can perform zero-copy conversions to/from Pandas data and will do so automatically when it is able to safely reference the data.
Step 3 will create a Spark schema from the Arrow schema, which is a simple mapping. Arrow has detailed type definitions and supports all types available in Spark; however, Spark only supports a subset of Arrow types, so you might need to be careful what you are importing. For example, a union type is supported in Arrow, but not in Spark. At the time of this writing, not all Spark types are fully supported; see the Spark documentation for more info.
Step 4 is where the Arrow data is sent to the JVM. This is necessary to actualize the DataFrame and allows Spark to perform SQL operations completely within the JVM. Here the Arrow record batches are written to a temporary file in SparkContext._serialize_to_jvm, where they are read back in chunks by the JVM and then parallelized to an RDD. Writing to a temporary file was done to meld with existing code, and it is definitely much better than transferring the data over a call with Py4J. In practice this works pretty well and doesn't seem to be much of a bottleneck. I'm not sure if setting up a local socket to send the data would do better, but it could be an area to check out in the future.
With all the above complete, the final step is done in ArrowConverters.toDataFrame, which maps the partitions of the JavaRDD[Array[Byte]] containing the Arrow record batches to an InternalRow iterator and uses that along with the schema to construct the DataFrame.
Performance Comparison with Arrow Disabled
Here are a few benchmarks comparing the wall-clock time of calling createDataFrame with and without Arrow enabled. The data used is random doubles similar to the example above; the column Size below is the total number of double values transferred. The runs were done on a laptop in Spark local mode with default Spark settings, and each timing is the best of 3 consecutive iterations.
| Size | With Arrow | Without Arrow |
|---|---|---|
| 50,000 | 14.2 ms | 334 ms |
| 100,000 | 15.6 ms | 643 ms |
| 500,000 | 21.9 ms | 3.13 s |
| 1,000,000 | 29.6 ms | 6.35 s |
| 5,000,000 | 107 ms | 31.5 s |
| 10,000,000 | 245 ms | 63 s |
I won’t get into the details of the code path when Arrow is disabled, but there are a few reasons it is inefficient. First, Spark does not look at the Pandas DataFrame to get data type information; it tries to infer the types itself. It cannot make use of NumPy data chunks and must iterate over each record and read each value as a Python object. When it prepares the data to send to the JVM, it must serialize each scalar value in the pickle format. Finally, once on the JVM, the data goes through another set of conversions to apply the proper Scala types.
Download this notebook to try out the above examples or here for the gist