Spark Adaptive Query Execution

Lecture 29: Adaptive Query Execution

Features of AQE

image

Dynamically Coalescing Shuffle Partitions

Suppose Sugar is the best-selling product, so the Sugar key has far more rows than any other key in its partition.

image

Now a GroupBy triggers a shuffle, and all the Sugar data ends up in one partition.

image

By default the shuffle creates 200 partitions (`spark.sql.shuffle.partitions`), but here 195 of them are empty.

Resources are wasted because these 195 empty partitions still have to be handled in the shuffle.

The 5 partitions that hold data become 5 tasks, but Partition 1 takes a long time to run.

image

Now AQE coalesces the small partitions into fewer, larger ones.

image

The number of tasks drops by two, which also frees 2 cores.

But even after coalescing we may still end up with data skew.

image

image

Once we coalesce, we end up with 2 partitions. One of the two completes quickly, while the one with Sugar takes much longer.
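The coalescing step can be pictured as greedily merging neighbouring shuffle partitions until a target size is reached. The following is a simplified sketch in plain Python, not Spark's actual implementation. The partition sizes and the `coalesce_partitions` helper are assumptions made for illustration, and the 64MB target is chosen to mirror the default of `spark.sql.adaptive.advisoryPartitionSizeInBytes`.

```python
def coalesce_partitions(sizes_mb, target_mb=64):
    """Greedily merge adjacent partitions; return lists of original partition ids."""
    groups, current, current_size = [], [], 0
    for pid, size in enumerate(sizes_mb):
        # Close the current group if adding this partition would exceed the target.
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(pid)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical sizes: partition 0 holds Sugar, 4 small partitions, 195 empty ones.
sizes_mb = [100, 10, 10, 10, 10] + [0] * 195
groups = coalesce_partitions(sizes_mb)
group_sizes = [sum(sizes_mb[pid] for pid in group) for group in groups]
print(len(groups), group_sizes)  # 2 [100, 40]
```

With these made-up sizes the 200 partitions collapse into 2, but the Sugar partition is still 100MB, so coalescing alone does not remove the skew.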

Data Splitting

image

If the median partition size is 5MB and one partition is larger than 25MB (more than 5 times the median), AQE splits that partition into smaller pieces.
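The rule above can be sketched in plain Python. This is an illustration under stated assumptions, not Spark's code: `find_skewed` and `split_count` are hypothetical helpers and the sizes are made up. In Spark the factor corresponds to `spark.sql.adaptive.skewJoin.skewedPartitionFactor` (default 5), and a partition must also exceed `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` (default 256MB), so with default settings a 25MB partition would not be treated as skewed. The sketch lowers that threshold to 0 so the median rule is visible on small numbers.

```python
import math
from statistics import median


def find_skewed(sizes_mb, factor=5, threshold_mb=256):
    """Return ids of partitions larger than factor * median AND larger than the threshold."""
    med = median(sizes_mb)
    return [pid for pid, size in enumerate(sizes_mb)
            if size > factor * med and size > threshold_mb]


def split_count(size_mb, target_mb):
    """Number of roughly target-sized pieces a skewed partition is cut into."""
    return math.ceil(size_mb / target_mb)


sizes_mb = [5, 4, 30, 6, 5]  # hypothetical sizes; the median is 5MB
# Threshold lowered to 0 so only the "5 times the median" rule applies here.
skewed = find_skewed(sizes_mb, threshold_mb=0)
pieces = split_count(sizes_mb[skewed[0]], target_mb=5)
print(skewed, pieces)  # [2] 6
```

Calling `find_skewed(sizes_mb)` with the default 256MB threshold returns an empty list for the same data, which is why the threshold matters when testing on small datasets.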

Dynamically Switching Join Strategy

image

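By default, Spark uses a sort-merge join when neither table is small enough to broadcast.

Now suppose table2 shrinks to about 10MB at runtime. Even though the DAG was already built for a sort-merge join, with AQE enabled Spark can check the runtime statistics and revise the plan.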

image

Since the data is only 10MB, Spark can broadcast it instead. The shuffle still happens; only the sorting and merging are avoided.
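The settings involved in this switch can be written down as a short PySpark configuration sketch. The app name is a placeholder and the values shown are the usual defaults in recent Spark 3.x releases, so treat this as a starting point to check against your own version rather than a required setup.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("aqe-join-switch-sketch")  # hypothetical app name
    # Turn on Adaptive Query Execution (on by default since Spark 3.2).
    .config("spark.sql.adaptive.enabled", "true")
    # Tables at or below this size (in bytes) may be broadcast; 10MB is the default.
    .config("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))
    # Lets AQE read shuffle files locally after converting to a broadcast join.
    .config("spark.sql.adaptive.localShuffleReader.enabled", "true")
    .getOrCreate()
)
```

After running a join with these settings, calling `explain()` on the resulting DataFrame or opening the SQL tab in the Spark UI is one way to see which join strategy was finally used.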

Dynamically Optimizing Skew Join

image

Consider two tables, each with a partition for key = Sugar that holds just 128MB of data.

Let's show the other partitions as well.

image

When we run the sort-merge join, all rows with the same key are brought together, so the Sugar partition grows much larger than the others.

image

All tasks except the one with Sugar complete quickly.

image

The Sugar task can then run out of memory, which leads to an OOM error.

Solutions
  • Salting
  • AQE
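Salting, the first option in the list, can be sketched in plain Python. This is a minimal illustration under assumptions, not a Spark implementation: the `sales` and `products` rows, `NUM_SALTS`, and the round-robin salt (standing in for a random one) are all made up for the example.

```python
from collections import defaultdict

NUM_SALTS = 4  # hypothetical number of salt buckets

# Hypothetical data: a large side skewed on Sugar and a small lookup side.
sales = [("Sugar", i) for i in range(8)] + [("Tea", 100), ("Rice", 200)]
products = [("Sugar", "grocery"), ("Tea", "beverage"), ("Rice", "grain")]

# Large side: attach a salt to every key (round-robin stands in for a random salt).
salted_sales = [((key, i % NUM_SALTS), amount)
                for i, (key, amount) in enumerate(sales)]

# Small side: replicate each row once per salt so every salted key finds its match.
salted_products = defaultdict(list)
for key, category in products:
    for salt in range(NUM_SALTS):
        salted_products[(key, salt)].append(category)

# Join on the salted key, then drop the salt from the output.
joined = [(key, amount, category)
          for (key, salt), amount in salted_sales
          for category in salted_products[(key, salt)]]

# Count how the Sugar rows are spread across the salted keys.
sugar_buckets = defaultdict(int)
for (key, salt), _ in salted_sales:
    if key == "Sugar":
        sugar_buckets[salt] += 1
print(len(joined), dict(sugar_buckets))  # 10 {0: 2, 1: 2, 2: 2, 3: 2}
```

The join result is the same as an unsalted join, but the 8 Sugar rows are now spread over 4 salted keys instead of one. The cost is that the small side is replicated `NUM_SALTS` times.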

AQE uses a shuffle reader that has statistics on the size of each shuffle partition. The skewed partition is automatically split in both tables, so the join runs as several smaller tasks instead of one large one.
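Why splitting is safe can be shown with a small plain-Python sketch: joining every piece of one side with every piece of the other gives the same rows as joining the whole partitions in one task. The `join` and `split` helpers and the sample rows are hypothetical, and this is a simplified picture rather than Spark's implementation. In Spark the behaviour is controlled by `spark.sql.adaptive.skewJoin.enabled`.

```python
def join(left_rows, right_rows):
    """Plain equi-join of (key, value) rows from one pair of shuffle partitions."""
    return [(k, lv, rv) for k, lv in left_rows for rk, rv in right_rows if k == rk]


def split(rows, num_pieces):
    """Cut a partition into contiguous chunks of roughly equal size."""
    size = -(-len(rows) // num_pieces)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


# Hypothetical skewed partition pair for key = Sugar.
left = [("Sugar", f"order-{i}") for i in range(6)]
right = [("Sugar", f"store-{j}") for j in range(4)]

# One large task joining the whole partitions.
single_task = join(left, right)

# Split both sides, then join every combination of pieces as a separate small task.
tasks = [(lp, rp) for lp in split(left, 3) for rp in split(right, 2)]
split_result = [row for lp, rp in tasks for row in join(lp, rp)]
print(len(tasks), len(single_task), len(split_result))  # 6 24 24
```

Each of the 6 small tasks handles only part of the Sugar data, which is what keeps any single task from running out of memory.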

image