
Lecture 26: Join Strategies in Spark


Joins are expensive because they typically require shuffling data across executors.

Each dataframe has 4 partitions.


Executors in the cluster


Now we need to join the employee and salary dataframes to get the output, but their partitions sit on different executors, so the data must be shuffled.

After the shuffle, each dataframe is split into 200 partitions by default, spread across the executors. The goal is to get all rows with the same key into the same partition.


  • Since we want the rows for id = 1, we compute 1 % 200 = 1 and send all matching rows to partition 1 on executor 1.


Suppose we want to map the salary for id = 7: the rows from the employee dataframe with id = 7 and from the salary dataframe with id = 7 both land in partition 7 (7 % 200 = 7).

Similarly, id = 201 goes to 201 % 200 = partition 1.
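
A toy sketch of this routing rule in plain Python. Note that real Spark hash-partitions on a Murmur3 hash of the key before taking the modulo; the plain id % 200 here is the lecture's simplification:

NUM_SHUFFLE_PARTITIONS = 200

def target_partition(key, n=NUM_SHUFFLE_PARTITIONS):
    # Rows with the same key always map to the same partition.
    return key % n

print(target_partition(1))    # 1
print(target_partition(7))    # 7
print(target_partition(201))  # 201 % 200 = 1 -> same partition as id = 1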

Types of Join Strategies


Joins generally result in shuffling.

There are two dataframes, df1 and df2, each with 4 partitions.


We have two executors.

In a join, the goal is to bring rows with the same keys together.


We can see that red P1's matching salary ids sit in a partition on the other executor.


We need the matching keys to be fetched from the other executors.

When a dataframe is shuffled, 200 partitions are created per dataframe by default (spark.sql.shuffle.partitions = 200).
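
This default comes from spark.sql.shuffle.partitions. A short PySpark snippet to inspect and tune it (the app name is arbitrary):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join-demo").getOrCreate()

# Number of partitions produced by any shuffle (joins, aggregations).
print(spark.conf.get("spark.sql.shuffle.partitions"))  # "200" by default

# The setting can be tuned per workload:
spark.conf.set("spark.sql.shuffle.partitions", 400)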


Now let's say we want to find the salary for id = 1: we compute 1 % 200 = 1 on the blue side and 1 % 200 = 1 on the red side, so both sets of rows end up in partition 1 on executor 1.


Similarly, for id = 7 the data from both the blue and red sides goes to P7.

But if id = 201, then 201 % 200 = 1, so this id also lands in P1.

If we have id = 102, then 102 % 200 = 102, i.e. partition 102, which sits on the 2nd executor.


The executors can also be on different worker nodes; in that case the data has to move across the network from one worker node to another.

Strategies


Broadcast nested loop join is costly because it is not a straight equi-join: the match is based on conditions such as < and >, so every row of one table must be compared with every row of the other, giving O(n^2).
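
A sketch of a query that falls back to this strategy, assuming the spark session from the earlier snippet and made-up data; the join condition is a range predicate, not an equality:

# A range condition (<) instead of equality: no equi-join strategy applies.
events = spark.createDataFrame([(1, 50), (2, 150)], ["event_id", "amount"])
tiers = spark.createDataFrame([(100, "silver"), (200, "gold")],
                              ["threshold", "tier"])

events.join(tiers, events.amount < tiers.threshold).explain()
# The physical plan shows BroadcastNestedLoopJoin: every row of one side
# is compared with every broadcast row of the other, hence O(n^2).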

Shuffle Sort Merge Join


Time complexity: O(n log n), dominated by sorting both sides on the join key after the shuffle; the final merge pass is linear.
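
A minimal way to see this plan in PySpark, assuming the spark session from the earlier snippet and toy dataframes (names are made up); broadcasting is disabled so Spark cannot shortcut to a broadcast join:

# Disable automatic broadcasting so a shuffle-based join is chosen.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

emp = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
sal = spark.createDataFrame([(1, 1000), (2, 2000)], ["id", "salary"])

emp.join(sal, "id").explain()
# Plan: Exchange hashpartitioning(id, 200) on both sides, then Sort,
# then SortMergeJoin -- shuffle, O(n log n) sort, linear merge.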

Shuffle Hash Join🔗

After the shuffle, a hash table keyed by the join key is built in memory from the smaller table.

Rows from df1 are then checked against that hash table, so each key match is an O(1) lookup.
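
Spark 3.x exposes a join hint to request this strategy explicitly; a short sketch, reusing the emp and sal dataframes from the sort merge snippet above:

# Build the in-memory hash table from the hinted side (sal).
emp.join(sal.hint("shuffle_hash"), "id").explain()
# The plan shows ShuffledHashJoin ... BuildRight: sal's shuffled
# partitions are hashed, and emp's rows probe them with O(1) lookups.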


Broadcast Join


Tables smaller than roughly 100 MB can be broadcast. (Spark's default spark.sql.autoBroadcastJoinThreshold is 10 MB, but it can be raised.)

Scenario: suppose one table is 1 GB; with 128 MB partitions that is 1000 MB / 128 MB ≈ 8 partitions. There is another table of size 5 MB.

So if we don't broadcast, the 1 GB dataframe has to be shuffled across executors along with the 5 MB one just to join them. Instead, we send the small dataframe to every executor so that no shuffling is needed at all.


The amount of data that can be broadcast depends on the memory of the executors and the driver: the broadcast table is first collected to the driver, so make sure there is no case where the driver has 2 GB of memory and we are trying to broadcast 1 GB of data.
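
In code the broadcast is requested with the broadcast() function; a sketch assuming sales and countries dataframes shaped like the ones in the demo below:

from pyspark.sql.functions import broadcast

# Ship the small countries table to every executor; the large sales
# table stays in place, so no shuffle is needed for the join.
sales.join(broadcast(countries), "country_id").explain()
# The plan shows BroadcastHashJoin instead of SortMergeJoin.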

Demo

There are 200 partitions in total when we join (the default).
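
This can be verified on the joined result; a sketch assuming the same sales and countries dataframes, and that AQE's automatic partition coalescing is off (with AQE on, Spark may merge small shuffle partitions):

joined = sales.join(countries, "country_id")
print(joined.rdd.getNumPartitions())  # 200 with the stock default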


Normal Sort Merge Join Execution Plan

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   Project [sale_id#10484L, sale_date#10485, amount#10486L, country_name#10514]
   +- SortMergeJoin [country_id#10487L], [country_id#10513L], Inner
      :- ColumnarToRow
      :  +- PhotonResultStage
      :     +- PhotonSort [country_id#10487L ASC NULLS FIRST]
      :        +- PhotonShuffleExchangeSource
      :           +- PhotonShuffleMapStage
      :              +- PhotonShuffleExchangeSink hashpartitioning(country_id#10487L, 1024)
      :                 +- PhotonFilter isnotnull(country_id#10487L)
      :                    +- PhotonRowToColumnar
      :                       +- LocalTableScan [sale_id#10484L, sale_date#10485, amount#10486L, country_id#10487L]
      +- ColumnarToRow
         +- PhotonResultStage
            +- PhotonSort [country_id#10513L ASC NULLS FIRST]
               +- PhotonShuffleExchangeSource
                  +- PhotonShuffleMapStage
                     +- PhotonShuffleExchangeSink hashpartitioning(country_id#10513L, 1024)
                        +- PhotonFilter isnotnull(country_id#10513L)
                           +- PhotonRowToColumnar
                              +- LocalTableScan [country_id#10513L, country_name#10514]

== Photon Explanation ==
Photon does not fully support the query because:
        Unsupported node: SortMergeJoin [country_id#10487L], [country_id#10513L], Inner.

Reference node:
    SortMergeJoin [country_id#10487L], [country_id#10513L], Inner

Spark UI Diagram


Broadcast Join Execution Plan

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonProject [sale_id#10484L, sale_date#10485, amount#10486L, country_name#10514]
         +- PhotonBroadcastHashJoin [country_id#10487L], [country_id#10513L], Inner, BuildRight, false, true
            :- PhotonFilter isnotnull(country_id#10487L)
            :  +- PhotonRowToColumnar
            :     +- LocalTableScan [sale_id#10484L, sale_date#10485, amount#10486L, country_id#10487L]
            +- PhotonShuffleExchangeSource
               +- PhotonShuffleMapStage
                  +- PhotonShuffleExchangeSink SinglePartition
                     +- PhotonFilter isnotnull(country_id#10513L)
                        +- PhotonRowToColumnar
                           +- LocalTableScan [country_id#10513L, country_name#10514]

== Photon Explanation ==
The query is fully supported by Photon.
