
Boosting Spark Union Operator Performance: Optimization Tips for Improved Query Speed | by Chengzhi Zhao | Apr, 2023



Photo by Fahrul Azmi on Unsplash

The union operator is one of the set operators that merges two input data frames into one. Union is a convenient operation in Apache Spark for combining rows that share the same column order. A frequent pattern is applying different transformations to subsets of the data and then unioning the results back together.

How to use the union operation in Spark is widely discussed. What is discussed far less is the performance caveat that comes with the union operator. If we don't understand this caveat, we can fall into the trap of doubling the execution time to get the result.

In this story, we will focus on the Apache Spark DataFrame union operator with examples, show you the physical query plan, and share techniques for optimization.

As in relational database (RDBMS) SQL, union is a direct way to combine rows. One important thing to note when dealing with the union operator is to ensure the rows follow the same structure:

  • The number of columns must be identical. The union operation won't silently fill missing columns with NULL; it fails when the number of columns differs between the data frames.
  • Columns are resolved by position, so the data types should match position by position. The column names should ideally follow the same order in each data frame, but that's not mandatory: the result simply takes its column names from the first data frame, so mixing the column order can produce an undesired result. Spark's unionByName is intended to resolve this issue, as the sketch below shows.
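
Here is a minimal sketch of the difference between positional and name-based resolution (the column names id and score are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

a = spark.createDataFrame([(1, 10)], ["id", "score"])
b = spark.createDataFrame([(20, 2)], ["score", "id"])  # same columns, order swapped

## union resolves by position: b's score value (20) lands in the id column
a.union(b).show()

## unionByName aligns columns by name, avoiding the mix-up
a.unionByName(b).show()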

In Spark, unionAll is an alias of union; neither removes duplicates. To get SQL-like UNION semantics, where duplicates are removed, we need to add distinct after performing the union.
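
As a quick sketch of the difference, reusing the df1 and df2 from below:

## UNION ALL semantics: duplicates are kept
df_all = df1.union(df2)

## SQL-like UNION semantics: add distinct to drop duplicates
df_dedup = df1.union(df2).distinct()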

We can also combine multiple data frames to produce a final data frame.

df = df1.union(df2).union(df3)
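
When the number of data frames isn't fixed, a common idiom (a sketch, assuming all frames share the same schema) is to fold union over a list:

from functools import reduce
from pyspark.sql import DataFrame

## Fold union over the list pairwise: union(union(df1, df2), df3)
final_df = reduce(DataFrame.union, [df1, df2, df3])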

One typical pattern of using the union operator is splitting a single data frame into multiple, then applying different transformations, and eventually combining them into the final one.

Here is an example: we have two big tables (fact tables) that need to be joined, and the best strategy for that join in Spark is a sort-merge join. Once we get the joined data frame, we split it into four subsets, apply a different transformation to each subset, and eventually combine those 4 data frames into the final one.

Union Operation in Spark | Image By Author

A Spark data frame leverages the Catalyst optimizer, which takes your data frame code and performs code analysis, logical optimization, physical planning, and code generation. Catalyst tries to create an optimal plan that executes your Spark job efficiently.
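
You can watch Catalyst at work on any data frame by printing its plans before triggering an action; a minimal sketch (the mode argument requires Spark 3.0+):

## Show the parsed, analyzed, and optimized logical plans plus the physical plan
df.explain(mode="extended")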

In recent years, much of the optimization work on Catalyst has focused on Spark join operations. The join operation covers more scenarios than union, so less effort has been put into optimizing the union operation.

Unless the union is performed on entirely different data sources, the union operator faces a potential performance bottleneck: Catalyst isn't "smart" enough to identify shared upstream data frames and reuse them.

In this case, Spark treats each data frame as a separate branch and performs everything from the root multiple times. In our example, Spark will perform the two-big-table join four times! That is a huge bottleneck.

It's straightforward to reproduce a non-optimized physical query plan for the union operator in Spark. We will do the following:

  1. Create two data frames with the integers 0 to 999,999. Let's call them df1 and df2
  2. Perform an inner join on df1 and df2
  3. Split the joined result into two data frames: one containing only the odd numbers, the other only the even numbers
  4. Add a transformation with a field called magic_value, which is generated by two dummy transformations
  5. Union the odd and even number data frames
from pyspark.sql.types import IntegerType

## Create two data frames with the integers 0 to 999999. Let's call them df1 and df2
## (assumes spark is an existing SparkSession)
df1 = spark.createDataFrame([i for i in range(1000000)], IntegerType())
df2 = spark.createDataFrame([i for i in range(1000000)], IntegerType())

## Perform inner join on df1 and df2
df = df1.join(df2, how="inner", on="value")

## Split the joined result into two data frames: one only contains the odd numbers, another one for the even numbers
df_odd = df.filter(df.value % 2 == 1)
df_even = df.filter(df.value % 2 == 0)

## Add a transformation with a field called magic_value, generated by two dummy transformations
df_odd = df_odd.withColumn("magic_value", df.value + 1)
df_even = df_even.withColumn("magic_value", df.value / 2)

## Union the odd and even number data frames
df_odd.union(df_even).count()
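
To reproduce the plans discussed below yourself, you can print the physical plan before the action runs; a small sketch (spark.sql.adaptive.enabled is the AQE switch, on by default since Spark 3.2):

## Make sure AQE is on to match the stage counts discussed below
spark.conf.set("spark.sql.adaptive.enabled", "true")

## Print the physical plan without triggering execution
df_odd.union(df_even).explain(mode="formatted")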

Here is a high-level view of what the DAG looks like. Reading the DAG bottom-up, one thing stands out: the join happens twice, and the two upstream branches look almost identical.

This is where Spark's union operator needs optimization: much time is wasted on unnecessary recomputation when the data source could be reused.

The DAG for non-optimized query plan for Union Operation | Image By Author

Here is the physical plan: with AQE enabled, 50 stages are scheduled. Looking at node ids 13 and 27, we can see that Spark performed the join once in each branch, recomputing each branch from the root.

The non-optimized Physical query plan for Union Operation | Image By Author

Now that we can see this potential bottleneck, how do we resolve it? One option is to double the number of executors to run more concurrent tasks, but there is a better way: hint to Catalyst so it reuses the joined data frame from memory.

To resolve this union performance issue, we can explicitly call cache to persist the joined data frame in memory, so Catalyst has a shortcut to fetch the data instead of going back to the source.

Where should we add the cache()? The recommended place is the joined data frame: after the join completes and before the filtering.

Let’s see it in action:

# ...........................
## Perform inner join on df1 and df2
df = df1.join(df2, how="inner", on="value")

## add cache here
df.cache()

## Split the joined result into two data frames: one only contains the odd numbers, another one for the even numbers
df_odd = df.filter(df.value % 2 == 1)
# ...........................

Here is the query plan: InMemoryTableScan is now present, so the joined data frame is reused and the extra computation is saved.

The DAG for optimized query plan for Union Operation | Image By Author

The physical plan is now reduced to only 32 stages, and if we check, ids 1 and 15 both leverage InMemoryTableScan. The savings would be even greater if we split the original data frame into more subsets and then unioned them.
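
Once the final result has been materialized, it's good practice to release the cached data so the executors' memory can be reused; a small sketch:

## Free the cached join result after the union has been computed
df.unpersist()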

Optimized Physical query plan for Union Operation | Image By Author

I hope this story provides some insight into why the union operation sometimes becomes a bottleneck for your Spark performance. Given the lack of optimization in Catalyst for the union operator, users need to be aware of this caveat to develop Spark code more effectively.

Adding cache saves time in our example, but it won't help if the union is performed on two completely different data sources, since there is no shared computation to cache.

This story was inspired by Kazuaki Ishizaki's talk — Goodbye Hell of Unions in Spark SQL — and by my experience handling a similar issue in my own projects.

Goodbye Hell of Unions in Spark SQL

ps: If you are interested in how to handle the data skew part of Spark performance, I have another story on TDS about it.

