Spark Join Strategies
Lecture 26 : Join Strategies in Spark🔗
Joins are expensive due to shuffling.
4 partitions are there in each dataframe.
Executors in the cluster
Now we need to join employee and salary df to get the output but they are on different executors, so we need to do data shuffling.
Each executor has 200 partitions. Goal is to get all same keys in one executor.
- Since we want to get id for 1 we divide 1/200 = 1 and then send all the data to that executor 1.
Suppose we want to map the salary for id = 7 so the data from the employee df with id = 7 and also salary df with id=7 will come into the executor 7.
Similarly id = 201 will go into 201/200 = executor no 1.
Types of Join Strategies🔗
Joins generally result in shuffling
There are two dataframes df1 and df2 each with 4 partitions.
We have two executors.
In join goal is to join with same keys.
We can see that red P1 has corresponding id for salary in the other executor.
We need to get same keys fetched from other executors.
When a dataframe is sent to executors by default 200 partitions are created per dataframe.
Now let's say we want to find salary for id = 1 we can divide 1/200 on blue = 1 and 1/200 on red = 1, so both data will come into executor 1 in the partition 1.
Similarly for id = 7 also we will send the data on blue and red P7
But if id = 201 then 201/200 = 1 so this id will come into P1 only.
If we have id = 102 then 102/200 = 102 partition on 2nd executor.
The executors can be on different worker nodes also, we need to then move data across from one worker node to other.
Strategies🔗
Broadcast nested loop join is costly because we dont do a straight join, rather its based on < an > conditions, its O(n^2)
Shuffle Sort Merge Join🔗
TC : O(nlogn)
Shuffle Hash Join🔗
The smaller table gets a hash table created with hashed keys in memory.
Now from df1 we checked which keys match with O(1) lookup using the hash table.
Broadcast Join🔗
The tables that are less than 100mb can be broadcast.
Scenario : Suppose one table is 1GB size so we will have 1000MB / 128MB = 8 partitions and there is another table of size 5mb.
So if we dont broadcast, then the df with 100gb should be shuffled around with 5mb data across executors for joining. Instead of that we will just send the small df in all the executors so that there is no shuffling.
The amount of data that can be broadcast depends on the memory of executor and driver. Make sure that there is no case where driver memory is 2GB and we are trying to broadcast 1GB data.
Demo🔗
There are total 200 partitions when we join
Normal Sort Merge Join Execution Plan
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
Project [sale_id#10484L, sale_date#10485, amount#10486L, country_name#10514]
+- SortMergeJoin [country_id#10487L], [country_id#10513L], Inner
:- ColumnarToRow
: +- PhotonResultStage
: +- PhotonSort [country_id#10487L ASC NULLS FIRST]
: +- PhotonShuffleExchangeSource
: +- PhotonShuffleMapStage
: +- PhotonShuffleExchangeSink hashpartitioning(country_id#10487L, 1024)
: +- PhotonFilter isnotnull(country_id#10487L)
: +- PhotonRowToColumnar
: +- LocalTableScan [sale_id#10484L, sale_date#10485, amount#10486L, country_id#10487L]
+- ColumnarToRow
+- PhotonResultStage
+- PhotonSort [country_id#10513L ASC NULLS FIRST]
+- PhotonShuffleExchangeSource
+- PhotonShuffleMapStage
+- PhotonShuffleExchangeSink hashpartitioning(country_id#10513L, 1024)
+- PhotonFilter isnotnull(country_id#10513L)
+- PhotonRowToColumnar
+- LocalTableScan [country_id#10513L, country_name#10514]
== Photon Explanation ==
Photon does not fully support the query because:
Unsupported node: SortMergeJoin [country_id#10487L], [country_id#10513L], Inner.
Reference node:
SortMergeJoin [country_id#10487L], [country_id#10513L], Inner
Spark UI Diagram
Broadcast Join Execution Plan
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
ColumnarToRow
+- PhotonResultStage
+- PhotonProject [sale_id#10484L, sale_date#10485, amount#10486L, country_name#10514]
+- PhotonBroadcastHashJoin [country_id#10487L], [country_id#10513L], Inner, BuildRight, false, true
:- PhotonFilter isnotnull(country_id#10487L)
: +- PhotonRowToColumnar
: +- LocalTableScan [sale_id#10484L, sale_date#10485, amount#10486L, country_id#10487L]
+- PhotonShuffleExchangeSource
+- PhotonShuffleMapStage
+- PhotonShuffleExchangeSink SinglePartition
+- PhotonFilter isnotnull(country_id#10513L)
+- PhotonRowToColumnar
+- LocalTableScan [country_id#10513L, country_name#10514]
== Photon Explanation ==
The query is fully supported by Photon.