Spark YouTube
Spark Concepts and Code.
Lecture 1 : What is Apache Spark
Unified :
Computing Engine:
Spark is not a storage platform; we store the data in HDFS, an RDBMS, etc.
Spark can process terabytes of data in a distributed manner.
Compute Cluster:
- each slave has 16 GB RAM, 1 TB storage and a 4-core CPU
- the master also has its own RAM and storage
- the above cluster can process 64 GB of data at a time (in memory, across the slaves)
- the master divides the data among the slave nodes, and the slaves process it
Lecture 2 : Why Apache Spark?
Different Databases
new formats like video, audio, JSON and Avro started coming in, but an RDBMS cannot handle them.
the volume of data also increased.
What is Big Data?
Data Lake works on Extract Load Transform architecture
Issues with RDBMS
- Storage
- Processing - RAM and CPU
Enter Spark...
Monolith vs Microservice Architecture
Lecture 3 : Hadoop vs Spark
Misconception:
- Hadoop is a database - it is not a database, just a filesystem (HDFS)
- Spark is 100 times faster than Hadoop
- Spark processes data in RAM but Hadoop doesn't
Differences
Performance
Hadoop does a lot of read/write I/O operations, sending data back and forth to disk.
In Spark, each executor has its own memory, so intermediate data can stay in RAM.
Where is there no difference?
When we have very little data, say 10 GB, there is no real difference, because the Hadoop job also fits the data in memory on the first pass and doesn't keep writing to disk.
Batch vs Stream Processing
Ease of Use
Spark has both low-level and high-level APIs in Python, which is easier than using Hive. Low-level programming is at the RDD level.
Security
- Hadoop has built-in Kerberos authentication via YARN, whereas Spark doesn't have its own security mechanism.
- The authentication helps create ACLs at directory level in HDFS.
- Spark uses HDFS storage, so it gets the ACL feature, and when it runs on YARN it gets Kerberos authentication.
Fault Tolerance
Data Replication in Hadoop
HDFS keeps track of which node / rack has the data from A B C and D
DAG in Spark
- So Spark computes/transforms in multiple steps: Process 1 -> Process 2 -> Process 3 ...
- After each step the data is held in an immutable data structure called an RDD. So even if there is a failure, the Spark engine knows how to reconstruct the data for a particular step from the RDD lineage at that stage.
Lecture 4 : Spark Ecosystem
High Level API : we can write SQL queries from Python, Java, etc.; there are ML and GraphX libraries as well.
We can write code in many languages. Low Level API : we can create RDDs and work on them directly.
Where does Spark run?
Spark Engine would need some memory for transformation.
- suppose it needs 4 worker nodes of 20 GB each and a driver node of 20 GB.
- it goes to the cluster manager and asks for a total of 100 GB of memory; if available, the manager assigns that much.
- the cluster manager can be YARN, Kubernetes or the Standalone manager
Lecture 5 : Read Modes in Spark
format : the data file format (csv, json, a jdbc/odbc connection, ...). Format is an optional parameter; the default is parquet.
option : inferSchema, mode, header, etc. [optional]
schema : a manual schema can be passed here [optional]
load : the path from which we need to read the data [not optional]
DataframeReader API
Access it using 'spark.read' in the spark session
The 'mode' option in Spark controls how corrupt records are handled while reading: PERMISSIVE (default), DROPMALFORMED and FAILFAST.
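A minimal sketch of the DataFrameReader API; the path and option values are assumptions, not from the lecture:
df = (
    spark.read
        .format("csv")                               # optional; default is parquet
        .option("header", "true")                    # options: header, inferSchema, mode, ...
        .option("inferSchema", "true")
        .option("mode", "PERMISSIVE")
        .load("/FileStore/tables/flight_data.csv")   # path is mandatory
)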
Lecture 6 : Spark Architecture
Spark Cluster
- 20 cores and 100 GB RAM per machine
- Total Cluster : 200 cores and 1TB RAM
- The master is controlled by Resource Manager and the workers are controlled by Node Manager.
What happens when user submits code?
- The user submits some Spark code for execution to the Resource Manager, asking for a 20 GB driver and 5 executors with 25 GB and 5 CPU cores each.
- So the Resource Manager goes to W5 and asks it to create a 20 GB container as the driver node.
What happens inside the container?
Driver Allocation
Now this 20 GB driver container is called the Application Master.
There are two main() functions inside the Application Master: one for PySpark and one for the JVM (Java, Scala, etc.).
The JVM main() is called the Application Driver.
Spark Core has a Java wrapper, and the Java wrapper has a Python wrapper.
When we write code in PySpark it gets translated into calls on the Java wrapper.
The PySpark driver is not always required, but the JVM driver is required to run any code.
Worker Allocation
- Now the Application master asks for the executors to be assigned and the resource manager allocates.
Executor Container
Each executor has 5 core CPU and 25GB RAM. Each one of them runs on separate container.
The above applies when we have pure JVM code and don't use Python UDFs.
But what if we use Python UDF functions?
We need a Python worker inside the executor to be able to run the code.
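A small, hedged example of a Python UDF; the function and column names are hypothetical. Because the UDF is plain Python, it runs in a Python worker process inside the executor, outside the JVM:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def double_salary(salary):
    # plain Python logic, executed by the Python worker, not the JVM
    return salary * 2 if salary is not None else None

# emp_df is assumed to be an existing DataFrame with a "salary" column
# emp_df.withColumn("double_salary", double_salary("salary")).show()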
Lecture 7 : Schema in Spark
StructType and StructField
Example:
How to skip the header row?
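A minimal sketch of defining a schema with StructType/StructField and skipping the header row; the file path and column names are assumptions:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

emp_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
])

emp_df = (
    spark.read
        .format("csv")
        .schema(emp_schema)            # manual schema, no inferSchema pass over the file
        .option("header", "true")      # first row is treated as the header, not data
        .load("/FileStore/tables/employee.csv")
)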
Lecture 8 : Handling Corrupted Records in Spark
How many records in each mode?
Permissive Mode
DropMalformed
How to Print Corrupted Records
Output
How to Store Corrupted Records
The corrupted records are stored in JSON format.
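A hedged sketch of the read modes and of capturing corrupt rows; the schema, paths and column names are assumptions (badRecordsPath is a Databricks-specific option):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema_with_corrupt = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True),   # default corrupt-record column name
])

emp_df = (
    spark.read.format("csv")
        .schema(schema_with_corrupt)
        .option("header", "true")
        .option("mode", "PERMISSIVE")          # or DROPMALFORMED / FAILFAST
        .load("/FileStore/tables/employee.csv")
)

# Databricks-only alternative: write the bad rows out as JSON files instead
# spark.read.format("csv").option("badRecordsPath", "/FileStore/tables/bad_records/") ...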
Lecture 9 : Transformations and Actions in Spark
Types of Transformations
- Narrow Transformation
- Wide Transformation
Example:
Suppose data is of 200MB. 200MB / 128MB = 2 partitions
Let's say both partitions go to separate executors.
Q1 : Filtering Records
There is no data movement here.
Q2: Find Total Income of each employee
One record for id = 2 is in one partition and the other is in the second partition, so we need a wide transformation.
Data needs to be shuffled, and records with the same id must be moved to the same partition.
- filter, select, union, etc. are narrow transformations
- join, groupBy, distinct, etc. are wide transformations
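A small sketch of the two questions above (emp_df and its columns are assumptions):
# Q1 - narrow: each output partition depends on exactly one input partition, no shuffle
filtered_df = emp_df.filter("salary > 50000")

# Q2 - wide: rows with the same id must be brought to the same partition, so a shuffle happens
total_income_df = emp_df.groupBy("id").sum("salary")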
Lecture 10 : DAG and Lazy Evaluation in Spark
- For every action there is a new job; here jobs are created for the read, inferSchema, the groupBy().sum() and show.
- When used with groupBy().sum(): it is considered an action because it triggers computation to aggregate data across partitions and produce a result. This forces Spark to execute the transformations leading up to it, effectively creating a job.
- When used as a column expression, df.select(sum("value")): it acts more like a transformation, especially as part of a larger query or pipeline that does not immediately trigger execution. In that case it only defines the operation and does not create a job until an action (like show() or collect()) is called.
- Job for reading the file: Whole Stage Codegen generates Java bytecode.
- InferSchema: a job is created to sample the file and infer the schema.
- GroupBy and count: as explained above, this behaves as an action here.
- Show: the final action, to display the DataFrame.
After we read the csv and inferSchema there are no jobs created since filter and repartition both are transformations not actions.
When there are two filters on same dataset
This is the job
Optimizations on the Filter
Both the filters are on the same task
The optimizations can be applied because Spark is lazily evaluated.
Lecture 11: Working with JSON Data in Spark
Two types of JSON notation:
- Line-delimited JSON
- Multi-line JSON
[
{
"name": "Manish",
"age": 20,
"salary": 20000
},
{
"name": "Nikita",
"age": 25,
"salary": 21000
},
{
"name": "Pritam",
"age": 16,
"salary": 22000
},
{
"name": "Prantosh",
"age": 35,
"salary": 25000
},
{
"name": "Vikash",
"age": 67,
"salary": 40000
}
]
Line-delimited JSON is more efficient in terms of performance because the parser knows that each line holds one JSON record, whereas with multi-line JSON the parser needs to keep track of where one record ends and the next one starts.
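A hedged sketch of reading both notations (the paths are assumptions):
# Line-delimited JSON: one JSON object per line (the default)
line_df = spark.read.format("json").load("/FileStore/tables/line_delimited.json")

# Multi-line JSON: a single JSON array spread across lines needs multiLine=true
multi_df = (
    spark.read.format("json")
        .option("multiLine", "true")
        .load("/FileStore/tables/multi_line.json")
)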
Different number of keys in each line
Here the line with the extra key gets a value for that column, while for the rest of the records it is null.
Multiline Incorrect JSON
Here we don't pass a list (array); it's just dictionaries one after another, which is not valid multi-line JSON.
{
"name": "Manish",
"age": 20,
"salary": 20000
},
{
"name": "Nikita",
"age": 25,
"salary": 21000
},
{
"name": "Pritam",
"age": 16,
"salary": 22000
},
{
"name": "Prantosh",
"age": 35,
"salary": 25000
},
{
"name": "Vikash",
"age": 67,
"salary": 40000
}
Corrupted Records
We don't need to define _corrupt_record in the schema; Spark will add the column on its own.
Lecture 12: Spark SQL Engine
How is Spark Code compiled?
- The catalyst optimizer creates a plan and creates RDD lineage
Phases in Catalyst Optimizer
Workflow Diagram
- Unresolved Logical Plan : Bunch of crude steps to execute the SQL code
- Catalog : the table, file and database metadata is stored in the catalog. Suppose we call read.csv on a file that doesn't exist; the error is thrown with the help of the catalog. In the Analysis phase we go through these lookups, and if some file/table is not found we get an AnalysisException. This error occurs when the logical plan cannot be resolved.
- Resolved Logical Plan : the phase after we have finished analysing the catalog objects.
- Logical Optimization : there are many examples. Suppose we need just two columns in the select output; the Spark engine does not fetch all the columns, it fetches only the two columns we need from storage. Another example is when we use multiple filters on the same column in different lines of code; when we execute, they are all combined into a single filter condition.
- Physical Plan : this involves decisions like the type of join to use (Broadcast Join is one example). From the logical plan we can build multiple physical plans; the best physical plan is chosen, and it is ultimately a set of RDD operations to be run on different executors in the cluster.
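To see the Catalyst phases for ourselves, explain(extended=True) prints the parsed (unresolved), analyzed and optimized logical plans plus the physical plan; employee_df and its columns are assumptions:
result = (
    employee_df.filter("salary > 50000")
               .filter("age > 25")
               .select("id", "name")
)
result.explain(extended=True)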
Lecture 13: Resilient Distributed Dataset
Data Storage of List
Data Storage in RDD
Suppose we have 500MB of data and 128MB partition, so we will have 4 partitions.
The data is scattered on various executors.
It is not in a single contiguous location like the elements of a list. The data structure used to process this distributed data is called an RDD.
Why is RDD recoverable?
- RDD is immutable. If we apply multiple filters, each dataset after filtering is a different dataset.
- In the case below, if rdd2 fails we can rebuild it from rdd1 because of the lineage.
Disadvantage of RDD
- No optimization is done by Spark on RDDs. The developer must specify explicitly how to optimize.
Advantage
- Works well with unstructured data where there are no columns and rows / key-value pairs
- RDD is type safe, we get error on compile time rather than runtime which happens with Dataframe API.
Avoiding RDDs
- RDD: we spell out how to do it. DataFrame API: we just specify what to do.
You can see in the case above that we have a join and a filter, but we are explicitly saying "first join, then filter", so it triggers a shuffle first and only then filters, which is not efficient.
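A small sketch contrasting the two styles (the data is made up):
# RDD: we spell out *how* (no Catalyst optimization is applied)
rdd = spark.sparkContext.parallelize([(1, "Manish", 50000), (2, "Nikita", 60000)])
high_paid_rdd = rdd.filter(lambda row: row[2] > 55000)

# DataFrame: we declare *what*, and Catalyst decides the best order of operations
df = spark.createDataFrame([(1, "Manish", 50000), (2, "Nikita", 60000)],
                           ["id", "name", "salary"])
high_paid_df = df.filter("salary > 55000")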
Lecture 14 : Parquet File Internals
There are two types of file formats:
- Columnar Based and Row Based
Physical Storage of Data on Disk
Write Once Read Many
The funda (core principle) of big data is write once, read many.
- We dont need all the columns for analytics of big data, so columnar storage is the best.
- If we store in row based format then we need to jump many memory racks to be able to get the data we need.
- OLTP systems generally use a row-based file format.
- I/O should be reduced so OLAP uses columnar format.
Why Columnar format may not be the best?
In above case we can get col1 and col2 easily but for col10 we still need to scan the entire file.
To tackle this:
Let's say we have 100 million total rows.
We can store 100,000 records contiguously in one row group, then the next 100,000 records in the next row group, and so on; this is the hybrid format.
Logical Partitioning in Parquet
Let's say we have 500 MB of data; each row group by default holds 128 MB, so we will have 4 row groups. Each row group has some metadata attached to it.
In our example let's say one row group has 100,000 records. Each column within the row group is further stored as pages.
Runlength Encoding and Bitpacking
Suppose we have 1 million (10 lakh) records but only, say, 4 distinct countries.
Parquet creates a dictionary of key-value pairs with integer keys from 0 to 3, and in the dictionary-encoded data the keys are used instead of the country names.
More details on Parquet
The row-wise formats store data as records, one after another. This format works well when accessing entire records frequently. However, it can be inefficient when dealing with analytics, where you often only need specific columns from a large dataset.
Imagine a table with 50 columns and millions of rows. If you’re only interested in analyzing 3 of those columns, a row-wise format would still require you to read all 50 columns for each row.
Columnar formats address this issue by storing data in columns instead of rows. This means that when you need specific columns, you can read only the column data you need, significantly reducing the amount of data scanned.
However, storing data in a columnar format has some downsides. The record write or update operation requires touching multiple column segments, resulting in numerous I/O operations. This can significantly slow the write performance, especially when dealing with high-throughput writes.
When queries involve multiple columns, the database system must reconstruct the records from separate columns. The cost of this reconstruction increases with the number of columns involved in the query.
The hybrid format combines the best of both worlds. The format groups data into "row groups," each containing a subset of rows (aka horizontal partition). Within each row group, data for each column is called a “column chunk" (aka vertical partition).
In the row group, these chunks are guaranteed to be stored contiguously on disk.
Terminologies
A Parquet file is composed of:
Row Groups: Each row group contains a subset of the rows in the dataset. Data is organized into columns within each row group, each stored in a column chunk.
Column Chunk: A chunk is the data for a particular column in the row group. Column chunk is further divided into pages.
Pages: A page is the smallest data unit in Parquet. There are several types of pages, including data pages (actual data), dictionary pages (dictionary-encoded values), and index pages (used for faster data lookup).
Metadata Types in Parquet
Magic number: The magic number is a specific sequence of bytes (PAR1) located at the beginning and end of the file. It is used to verify whether it is a valid Parquet file.
FileMetadata: Parquet stores FileMetadata in the footer of the file. This metadata provides information like the number of rows, data schema, and row group metadata. Each row group metadata contains information about its column chunks (ColumnMetadata), such as the encoding and compression scheme, the size, the page offset, the min/max value of the column chunk, etc. The application can use information in this metadata to prune unnecessary data.
PageHeader: The page header metadata is stored with the page data and includes information such as value, definition, and repetition encoding. Parquet also stores definition and repetition levels to handle nested data. The application uses the header to read and decode the data.
How is data written into Parquet?
-
The application issues a written request with parameters like the data, the compression and encoding scheme for each column (optional), the file scheme (write to one or multiple files), etc.
-
The Parquet Writer first collects information, such as the data schema, the null appearance, the encoding scheme, and all the column types recorded in FileMetadata.
-
The Writer writes the magic number at the beginning of the file.
-
Then, it calculates the number of row groups based on the row group’s max size (configurable) and the data’s size. This step also determines which subset of data belongs to which row group.
-
For each row group, it iterates through the column list to write each column chunk for the row group. This step will use the compression scheme specified by the user (the default is none) to compress the data when writing the chunks.
-
The chunk writing process begins by calculating the number of rows per page using the max page size and the chunk size. Next, it will try to calculate the column's min/max statistic. (This calculation is only applied to columns with a measurable type, such as integer or float.)
-
Then, the column chunk is written page by page sequentially. Each page has a header that includes the page’s number of rows, the page’s encoding for data, repetition, and definition. The dictionary page is stored with its header before the data page if dictionary encoding is used.
-
After writing all the pages for the column chunk, the Parquet Writer constructs the metadata for that chunk, which includes information like the column's min/max, the uncompressed/compressed size, the first data page offset, and the first dictionary page offset.
-
The column chunk writing process continues until all columns in the row group are written to disk contiguously. The metadata for each column chunk is recorded in the row group metadata.
-
After writing all the row groups, all row groups’ metadata is recorded in the FileMetadata.
-
The FileMetadata is written to the footer.
-
The process finishes by writing the magic number at the end of the file.
How is data read from Parquet?
-
The application issues a read request with parameters such as the input file, filters to limit the number of read row groups, the set of desired columns, etc.
-
If the application requires verification that it’s reading a valid Parquet file, the reader will check if there is a magic number at the beginning and end of the file by seeking the first and last four bytes.
-
It then tries to read the FileMetadata from the footer. It extracts information for later use, such as the file schema and the row group metadata.
-
If filters are specified, they will limit the scanned row groups by iterating over every row group and checking the filters against each chunk’s statistics. If it satisfies the filters, this row group is appended to the list, which is later used to read.
-
The reader defines the column list to read. If the application specifies a subset of columns it wants to read, the list only contains these columns.
-
The next step is reading the row groups. The reader will iterate through the row group list and read each row group.
-
The reader will read the column chunks for each row group based on the column list. It uses ColumnMetadata to read the chunk.
-
When reading the column chunk for the first time, the reader locates the position of the first data page (or dictionary page if dictionary encoding is used) using the first page offset in the column metadata. From this position, the reader reads the pages sequentially until no pages are left.
-
To determine whether any data remains, the reader tracks the current number of read rows and compares it to the chunk’s total number of rows. If the two numbers are equal, the reader has read all the chunk data.
-
To read and decode each data page, the reader visits the page header to collect information like the value encoding, the definition, and the repetition level encoding.
-
After reading all the row groups’ column chunks, the reader moves to read the following row groups.
-
The process continues until all the row groups in the row group list are read.
Because the Parquet file can be stored in multiple files, the application can read them simultaneously.
In addition, a single Parquet file is partitioned horizontally (row groups) and vertically (column chunks), which allows the application to read data in parallel at the row group or column level.
OLAP Workload Example
Demo
parquet-tools inspect <filename>
Gives some file and brief column level metadata.
parquet_file.metadata.row_group(0).column(0)
Compression is GZIP
Encoding is explained on top.
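The same metadata can also be inspected from Python with PyArrow; the file path is an assumption:
import pyarrow.parquet as pq

pf = pq.ParquetFile("/dbfs/FileStore/tables/sales.parquet")
print(pf.metadata)                          # rows, row groups, schema, created_by
print(pf.metadata.row_group(0))             # one row group's metadata
print(pf.metadata.row_group(0).column(0))   # column chunk: encoding, compression, min/max stats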
Bitpacking Advantage
- Bitpacking helps compress the values; in the above case we have just 4 unique dictionary keys, so each value needs only 2 bits.
- Query in seconds for running select on csv,parquet etc..
Summary
- Here the actual data is stored in the pages, and each page has metadata like min, max and count.
- Let's say we need to find people less than 18 years of age. When the data is divided into row groups, we don't need to do any I/O read on Row Group 2; this saves a lot of time and optimizes performance.
The above concept is called Predicate Pushdown.
Projection Pruning
Projection pruning means we don't do I/O for columns that are not part of the select query or aren't required for any join.
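A hedged example showing both optimizations together; the file and column names are assumptions:
people_df = (
    spark.read.parquet("/FileStore/tables/people.parquet")
         .select("name", "age")      # projection pruning: only these columns are read
         .filter("age < 18")         # predicate pushdown: row groups with min(age) >= 18 are skipped
)
people_df.explain()                  # the scan node shows ReadSchema and PushedFilters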
Lecture 15 : How to write data on the disk?
Modes to write data
Create three files
# .mode("overwrite") is used instead of .option() for the write mode
read_df.repartition(3).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .option("path", "/FileStore/tables/Write_Data/") \
    .save()
Lecture 16: Partitioning and Bucketing
In the above data we cannot partition by any column because no column has repeating values to group on, but we can bucket the data.
In the above data we can partition by country, but then we might have much more data in the India partition and very little in the Japan partition (skew).
Example of Partitioning
The advantage is that the entire data is not scanned and only few partitions are scanned based on the filters.
Partitioning by Id
Here we can see that we have created partitions by ID, and since ID is a high-cardinality column (one partition per value), partitioning is not efficient and we need to use bucketing.
Partitioning by Address and Gender
Bucketing by Id
Dividing into 3 buckets
Tasks vs Buckets
- If we have 200 partitions we will have 200 tasks, and each task will create 5 bucket files (1000 files in total); to tackle this we first repartition into 5 partitions and then bucketBy 5, as in the sketch below.
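A hedged sketch of the repartition-then-bucket pattern; the table name, path and column are assumptions (bucketBy requires saveAsTable):
(
    emp_df.repartition(5)                     # 5 write tasks instead of 200
          .write.format("parquet")
          .mode("overwrite")
          .bucketBy(5, "id")                  # 5 buckets on the join/filter key
          .option("path", "/FileStore/tables/bucket_by_id/")
          .saveAsTable("emp_bucketed")
)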
How does bucketing help with joins?
- Here we can see that since both tables are bucketed on the same column, the ids map bucket-to-bucket and there is no shuffling.
Bucket Pruning
Suppose we have 1M Aadhaar IDs and we put 100,000 in each bucket; when we divide a given Aadhaar ID by 100,000 we get the exact bucket where that number lies, so only that bucket needs to be scanned.
Lecture 17 : Spark Session vs Spark Context
- Spark Session is the entry point to the Spark cluster, where we provide the parameters to create and operate our application.
- The Spark session wraps the different contexts, like the SQL context, the Spark (core) context used by PySpark, etc.
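A minimal sketch (the app name and master are assumptions):
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
                .appName("demo")
                .master("local[4]")
                .getOrCreate()
)

sc = spark.sparkContext                 # the low-level (RDD) entry point lives inside the session
print(spark.sql("SELECT 1").collect())  # SQL also goes through the same session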
Lecture 18: Job, Stage and Tasks
- One Application is created.
- One job is created per action.
- Stages are carved out of the transformations; a new stage starts at every wide (shuffle) transformation.
- A task is the actual unit of work executed on a partition of the data.
Example of Job,Action and Task
Complete flow diagram
Every job has minimum one stage and one task.
Repartition to filter is one job because we dont hit an action in between.
Every wide dependency transformation has its own stage. All narrow dependency transformations come in one stage as a DAG.
How do tasks get created? [Read and Write Exchange]
- The repartition stage is a wide dependency transformation and creates two partitions from one; it is a write exchange of data.
- Now the filter-and-select stage reads this repartitioned data (read exchange), and the filter creates two tasks because we have two partitions.
- Next we need to find how many people earn > 90000 and are older than 25, so we do a groupBy; that's a wide dependency transformation and it creates another stage. By default 200 shuffle partitions are created.
- So some partitions may have data and some wont.
Lecture 17: Dataframe Transformations in Spark Part 1
Data gets stored in Row() format in the form of bytes
Columns are expressions. An expression is a set of transformations on one or more values in a record.
Ways to select values / columns
Column Manipulations
Other methods
selectExpr
Aliasing Columns
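A few equivalent ways to select and manipulate columns (emp_df and its columns are assumptions):
from pyspark.sql.functions import col, expr

emp_df.select("name", "salary").show()
emp_df.select(col("salary") + 1000, expr("salary * 12 as yearly_salary")).show()
emp_df.selectExpr("name", "salary * 12 as yearly_salary").show()
emp_df.select(col("name").alias("employee_name")).show()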
Lecture 18 : Dataframe Transformations in Spark Part II
filter() / where() : there is no difference between them.
Multiple filter conditions
Literals in spark
Used to pass the same literal value for every row of a column.
Adding Columns
If the column already exists then it gets overwritten.
Renaming Columns
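A hedged sketch of these transformations (emp_df and its columns are assumptions):
from pyspark.sql.functions import col, lit

emp_df.filter(col("salary") > 50000).show()                       # same as .where(...)
emp_df.where((col("salary") > 50000) & (col("age") < 30)).show()  # multiple conditions

emp_df.withColumn("company", lit("ABC Corp")) \
      .withColumnRenamed("salary", "monthly_salary") \
      .show()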
Lecture 19: union vs unionAll()
We can see that here we have a duplicate id
In PySpark, union and unionAll behave the same way: both retain duplicates.
But in Spark SQL, UNION drops the duplicate records (UNION ALL keeps them).
Selecting data and unioning the same table
What happens when we change the order of the columns?
wrong_manager_df actually has the wrong order of columns, but we still get a union output, just with values landing in the wrong columns.
If we give different number of columns an exception is thrown.
If we use unionByName then the column names on both dfs must be the same.
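A hedged sketch using the DataFrame names from these notes:
combined = manager_df.union(manager_df)          # duplicates are kept (same as unionAll)
deduped = combined.distinct()                    # drop duplicates explicitly if needed

# union() matches columns by position; unionByName() matches by column name,
# so a wrong column order does not silently mix up values
safe_union = manager_df.unionByName(wrong_manager_df)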
Lecture 19: Repartitioning and Coalesce
Suppose we have 5 partitions and one of them is heavily skewed at 100 MB; let's say it holds the best-selling product's records. This partition takes a lot of time to compute, so the other executors have to wait until this executor finishes processing.
Repartitioning vs Coalesce
Repartitioning
Suppose we have the above partitions and the total data is 100 MB. If we do repartition(5) we will have 5 partitions with about 20 MB per partition.
Coalesce
In case of coalesce there is no equal splitting of partition memory, rather the already existing partitions get merged together.
There is no shuffling in coalesce but in repartitioning there is shuffling of data.
Pros and Cons in repartitioning
- There is evenly distributed data.
- Con is that IO operations are more, its expensive.
- Con of coalesce is that the data is unevenly distributed.
Repartitioning can increase or decrease the partitions but coalescing can only decrease the partitions.
How to get number of partitions?
flight_df.rdd.getNumPartitions() gives the initial number of partitions, and then we can repartition with flight_df.repartition(4). The data is evenly distributed.
Repartitioning based on columns
Since we asked for 300 partitions and we have only 255 records, some partitions will have no records.
Coalescing
Suppose we have 8 partitions and we coalesce into 3 partitions. Coalesce has only one arg.
Uneven distribution of data in partitions.
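Summarising the above with flight_df (the column name is an assumption):
print(flight_df.rdd.getNumPartitions())                 # current number of partitions

even_df = flight_df.repartition(4)                      # full shuffle, roughly equal partitions
by_country_df = flight_df.repartition(300, "country")   # repartition on a column
merged_df = flight_df.coalesce(3)                       # no shuffle, just merges existing partitions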
Lecture 20 : Case when / if else in Spark
Apply logic on one column then process if else logic
Spark SQL Logic
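A minimal sketch of if/else logic in both APIs (emp_df and its columns are assumptions):
from pyspark.sql.functions import when, col

emp_df.withColumn("adult", when(col("age") >= 18, "Yes").otherwise("No")).show()

emp_df.createOrReplaceTempView("emp")
spark.sql("SELECT *, CASE WHEN age >= 18 THEN 'Yes' ELSE 'No' END AS adult FROM emp").show()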
Lecture 21 : Unique and Sorted Records
distinct()
Original Data
Distinct Data
Distinct Based on certain columns
⚠️ Distinct takes no arguments we need to select the columns first and then apply distinct.
Dropping duplicate records
Point to note: the DataFrame manager_df itself is unchanged; the call just returns the records after the duplicates have been dropped.
sort()
Descending order
Sorting on multiple columns
Here the salary is first arranged in descending order, and then, for records with the same salary, the name is arranged in ascending order.
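A hedged sketch with manager_df (the columns are assumptions):
from pyspark.sql.functions import col, desc

manager_df.distinct().show()                              # distinct across all columns
manager_df.select("name", "salary").distinct().show()     # distinct on chosen columns
deduped_df = manager_df.dropDuplicates(["name"])          # returns a new DataFrame

manager_df.sort(desc("salary"), col("name").asc()).show() # salary desc, then name asc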
Lecture 22 : Aggregate functions
Count as both Action and Transformation
⚠️ When we do a count on a single column, nulls in that column are not counted. But when counting all columns/rows, rows with nulls are included in the count.
A job is created in the first case below and not in the second.
Lecture 23: Group By In Spark
Sample Data
Questions
Salary per department using groupBy()
Where do we use window functions?
Suppose we need to find the percentage of a department's total salary that each person earns. We can use a window function to attach the total salary per department to each record itself, as shown below.
This way we don't need to perform a join.
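A hedged sketch of both approaches (emp_df, department and salary are assumptions):
from pyspark.sql.functions import sum, col, round
from pyspark.sql.window import Window

# Plain aggregation: one row per department
emp_df.groupBy("department").agg(sum("salary").alias("dept_salary")).show()

# Window function: keep every row and attach the department total to it
dept_window = Window.partitionBy("department")
emp_df.withColumn("dept_total", sum("salary").over(dept_window)) \
      .withColumn("pct_of_dept", round(col("salary") / col("dept_total") * 100, 2)) \
      .show()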
Grouping by two columns
Lecture 24 : Joins in Spark part 1
Which customers joined the platform but never bought anything?
Whenever we need information from another table, we use joins and there should be some common column.
Join is a costly wide dependency operation.
How do joins work?
How many records do we get after inner joining the below two tables.
We get a total of 9 records.
Sometimes data gets duplicated when we do joins, so we should use distinct() but remember distinct is wide dependency transform.
Lecture 25 : Types of Join in Spark
Inner Join
Left Join
All records in the left table + those that match the right table; wherever we don't get a match on the right table, the right-side columns become null.
Right Join
Full Outer Join
Left Semi Join
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LeftSemiJoinExample").getOrCreate()
# Left DataFrame: Orders
orders = spark.createDataFrame([
(1, "iPhone"),
(2, "Pixel"),
(3, "OnePlus"),
(4, "Nokia")
], ["customer_id", "product"])
# Right DataFrame: Valid Customers
valid_customers = spark.createDataFrame([
(1,), (3,)
], ["customer_id"])
# Perform left semi join
filtered_orders = orders.join(valid_customers, on="customer_id", how="left_semi")
filtered_orders.show()
Output
+-----------+--------+
|customer_id|product |
+-----------+--------+
| 1|iPhone |
| 3|OnePlus |
+-----------+--------+
Left Anti Join
Find all customers who have never purchased any product (see the sketch below).
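A hedged sketch (customer_df, order_df and the join key are assumptions):
never_purchased = customer_df.join(order_df, on="customer_id", how="left_anti")
never_purchased.show()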
Cross Join
Never use cross join!
Lecture 26 : Join Strategies in Spark
Joins are expensive due to shuffling.
4 partitions are there in each dataframe.
Executors in the cluster
Now we need to join employee and salary df to get the output but they are on different executors, so we need to do data shuffling.
Each executor has 200 partitions. Goal is to get all same keys in one executor.
- Since we want to get the data for id = 1, we compute 1 % 200 = 1 and send all the data for that key to partition 1 on that executor.
Suppose we want to map the salary for id = 7: the data from the employee df with id = 7 and from the salary df with id = 7 will both land in partition 7.
Similarly id = 201 will go into 201 % 200 = partition 1.
Types of Join Strategies
Joins generally result in shuffling
There are two dataframes df1 and df2 each with 4 partitions.
We have two executors.
In join goal is to join with same keys.
We can see that red P1 has corresponding id for salary in the other executor.
We need to get same keys fetched from other executors.
When a dataframe is shuffled for the join, by default 200 shuffle partitions are created per dataframe.
Now let's say we want to find the salary for id = 1: 1 % 200 = 1 on the blue side and on the red side, so both sides' data for that key land in partition 1 on executor 1.
Similarly for id = 7, the blue and red data both go to partition 7.
But if id = 201 then 201 % 200 = 1, so this id also goes to partition 1.
If we have id = 102 then 102 % 200 = 102, which is a partition on the 2nd executor.
The executors can be on different worker nodes also, we need to then move data across from one worker node to other.
Strategies
Broadcast nested loop join is costly because we don't do an equi-join; it is based on < and > conditions, and it is O(n^2).
Shuffle Sort Merge Join
TC : O(nlogn)
Shuffle Hash Join
The smaller table gets a hash table created with hashed keys in memory.
Now from df1 we check which keys match using an O(1) lookup in the hash table.
Broadcast Join
The tables that are less than 100mb can be broadcast.
Scenario : Suppose one table is 1GB size so we will have 1000MB / 128MB = 8 partitions and there is another table of size 5mb.
So if we don't broadcast, the 1 GB df would have to be shuffled across executors along with the 5 MB df for the join. Instead we just send the small df to all the executors so that there is no shuffling of the big one.
The amount of data that can be broadcast depends on the memory of executor and driver. Make sure that there is no case where driver memory is 2GB and we are trying to broadcast 1GB data.
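A hedged sketch of forcing a broadcast join (the DataFrame and column names are assumptions):
from pyspark.sql.functions import broadcast

joined = big_sales_df.join(broadcast(small_country_df), on="country_id", how="inner")
joined.explain()    # should show BroadcastHashJoin instead of SortMergeJoin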
Demo
There are total 200 partitions when we join
Normal Sort Merge Join Execution Plan
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
Project [sale_id#10484L, sale_date#10485, amount#10486L, country_name#10514]
+- SortMergeJoin [country_id#10487L], [country_id#10513L], Inner
:- ColumnarToRow
: +- PhotonResultStage
: +- PhotonSort [country_id#10487L ASC NULLS FIRST]
: +- PhotonShuffleExchangeSource
: +- PhotonShuffleMapStage
: +- PhotonShuffleExchangeSink hashpartitioning(country_id#10487L, 1024)
: +- PhotonFilter isnotnull(country_id#10487L)
: +- PhotonRowToColumnar
: +- LocalTableScan [sale_id#10484L, sale_date#10485, amount#10486L, country_id#10487L]
+- ColumnarToRow
+- PhotonResultStage
+- PhotonSort [country_id#10513L ASC NULLS FIRST]
+- PhotonShuffleExchangeSource
+- PhotonShuffleMapStage
+- PhotonShuffleExchangeSink hashpartitioning(country_id#10513L, 1024)
+- PhotonFilter isnotnull(country_id#10513L)
+- PhotonRowToColumnar
+- LocalTableScan [country_id#10513L, country_name#10514]
== Photon Explanation ==
Photon does not fully support the query because:
Unsupported node: SortMergeJoin [country_id#10487L], [country_id#10513L], Inner.
Reference node:
SortMergeJoin [country_id#10487L], [country_id#10513L], Inner
Spark UI Diagram
Broadcast Join Execution Plan
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
ColumnarToRow
+- PhotonResultStage
+- PhotonProject [sale_id#10484L, sale_date#10485, amount#10486L, country_name#10514]
+- PhotonBroadcastHashJoin [country_id#10487L], [country_id#10513L], Inner, BuildRight, false, true
:- PhotonFilter isnotnull(country_id#10487L)
: +- PhotonRowToColumnar
: +- LocalTableScan [sale_id#10484L, sale_date#10485, amount#10486L, country_id#10487L]
+- PhotonShuffleExchangeSource
+- PhotonShuffleMapStage
+- PhotonShuffleExchangeSink SinglePartition
+- PhotonFilter isnotnull(country_id#10513L)
+- PhotonRowToColumnar
+- LocalTableScan [country_id#10513L, country_name#10514]
== Photon Explanation ==
The query is fully supported by Photon.
Window functions in Spark
Rank vs Dense Rank
Dense rank does not leave any gaps between the ranks.
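A small sketch (emp_df, department and salary are assumptions):
from pyspark.sql.functions import rank, dense_rank, desc
from pyspark.sql.window import Window

# rank() leaves gaps after ties (1, 1, 3, ...); dense_rank() does not (1, 1, 2, ...)
w = Window.partitionBy("department").orderBy(desc("salary"))
emp_df.withColumn("rank", rank().over(w)) \
      .withColumn("dense_rank", dense_rank().over(w)) \
      .show()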
Lead and Lag
Range and Row Between
Q1
Using the first and last functions, let's try to achieve this.
Data:
This solution is wrong; ideally we should get 111000 in all rows of the latest_sales column.
Let's look at explain plan.
We can see that the window frame here is "unbounded preceding to current row".
What do these terms mean?
- unbounded preceding : if I'm standing at the current row of a window, the frame covers every row from the start of the window up to me.
- current row : the row I'm standing at.
- unbounded following : the opposite of unbounded preceding; from the current row to the end of the window.
- rowsBetween(start_row, end_row) : the current row is 0, rows before it are negative offsets and rows after it are positive offsets.
If we don't specify a frame, it defaults to "unbounded preceding to current row" when an orderBy is given, or the whole window (unbounded preceding to unbounded following) otherwise.
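A hedged fix for the wrong latest_sales result above: the frame must span the whole window instead of the default "unbounded preceding to current row" (sales_df and its columns are assumptions):
from pyspark.sql.functions import last, col
from pyspark.sql.window import Window

w = (
    Window.partitionBy("product_id")
          .orderBy("sales_date")
          .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
sales_df.withColumn("latest_sales", last(col("sales")).over(w)).show()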
Converting from string to unixtime when we have two fields date and time.
emp_df = emp_df.withColumn("timestamp",from_unixtime(unix_timestamp(expr("CONCAT(date,' ',time)"),"dd-MM-yyyy HH:mm")))
The timestamp column is a string.
Spark Memory Management
If we do df = spark.range(100000) and then df.collect() on a 1 GB driver, we get an OOM error.
Spark Architecture
Driver memory is of two types:
- spark.driver.memory
- spark.driver.memoryOverhead
With collect all records go into the driver. But with show just one partition gets sent to the heap space.
🎯 Think of the Spark Driver Like a Worker
Imagine the Spark driver is a person doing a big task at a desk.
The desk = spark.driver.memory (main memory)
The room around the desk = spark.driver.memoryOverhead (extra space to move, store tools, use side tables)
🧠 Why Just the Desk Isn’t Enough
Let’s say the driver (person) is:
Writing on paper (standard Spark tasks)
Using a laptop (Python/PySpark or native code)
Holding tools and files (temporary data, buffers, network stuff)
Only giving them a desk (spark.driver.memory) isn't enough:
The laptop (native code, Python UDFs) might need space outside the desk
The tools (Spark internals, shuffle, serialization) don’t fit on the desk — they use off-heap memory
If you don’t give them enough room around the desk (memoryOverhead), they might trip over stuff and fail the task.
🧪 Real Spark Example When you run PySpark like this:
That Python UDF runs outside the JVM. It needs extra native memory, not regular Java memory.
Spark says:
“I’ll use driver.memory for my JVM, but I need some memoryOverhead for the native stuff.”
✅ Summary (in 1 line)
spark.driver.memory is for Spark's own work (Java),
spark.driver.memoryOverhead is for everything outside the JVM — like Python, shuffle, native code.
The memory overhead is max(384mb,10% of driver memory)
Let's say there is df1 and we want to join it with two small tables, df2 and df3.
We send both df2 and df3 to the driver.
Let's say we now broadcast 5 datasets totalling 250 MB and the total driver space is 1 GB.
If the remaining 750 MB is not enough for the other processes, the driver will throw an OOM exception.
💥 So… How Can GC Cause Out of Memory (OOM)?
You’d think GC helps prevent OOMs — and it does! But in high-memory-pressure situations, it can actually cause or worsen them.
🚨 Here's how it happens: 1. Too Many Objects / Too Much Data in Memory: you load huge datasets or perform wide transformations (e.g., groupBy, join).
Spark stores a lot of intermediate data in RAM (JVM heap).
👉 JVM tries to make space by running GC again and again.
2. GC Takes Too Long: if GC runs too often or for too long (e.g. > 30 s), the JVM decides something is wrong.
You get: java.lang.OutOfMemoryError: GC overhead limit exceeded
This means:
"GC is using 98% of the CPU but only recovering 2% of memory - I give up."
3. GC Can't Free Anything: some objects (like cached RDDs or references from your code) stay in memory.
GC runs but can't collect them because they're still "referenced".
Eventually, the JVM runs out of space and crashes with:
java.lang.OutOfMemoryError: Java heap space
⚠️ Common Scenarios in Spark
Cause → Result
- Large shuffles / joins → too many objects in memory
- Caching huge RDDs → heap filled, GC can't recover
- Improper partitioning (too few tasks) → huge memory per task
- Memory leaks (bad code) → uncollectable references
Example code
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
import random
spark = SparkSession.builder \
.appName("OOM-GC-Demo") \
.config("spark.driver.memory", "1g") \
.getOrCreate()
# Create a large DataFrame with few partitions (causes memory pressure)
data = [(i % 10, random.randint(1, 1000)) for i in range(10_000_000)] # 10 million rows
df = spark.createDataFrame(data, ["group_id", "value"])
# Force a wide transformation + cache
result = df.groupBy("group_id").count().persist(StorageLevel.MEMORY_ONLY)
# Trigger action
result.count()
✅ How to Fix
Increase spark.executor.memory or spark.driver.memory
Use persist(StorageLevel.DISK_ONLY) if RAM is tight
Avoid huge wide transformations without enough partitions
Tune GC (G1GC is often better for large heaps)
Executor Memory OOM
10 GB per executor and 4 cores
Expanding one executor
Exceeding either 10GB or 1GB leads to OOM
How is 10GB divided?
What does each part of the user memory do?
- Reserved Memory: 300 MB is reserved for Spark's internal objects; the executor memory must be at least about 1.5x this (450 MB).
- User Memory
- Storage Memory Usage
- Executor Memory Usage
What does each part of the spark memory do?
⚙️ Background: Memory in Spark Executors
Each executor in Spark has a limited memory budget. This memory is split for:
- Execution Memory: used for joins, aggregations, shuffles
- Storage Memory: used for caching RDDs or DataFrames
- User Memory: everything else (broadcast vars, UDFs, JVM overhead)
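A rough sketch of the knobs behind this split, using Spark's default fractions (the 10 GB figure comes from the example above; exact byte counts are approximate):
from pyspark.sql import SparkSession

# Heap = 10 GB: 300 MB reserved, ~60% of the rest is unified Spark memory
# (execution + storage, split 50/50 by default), the remaining ~40% is user memory.
spark = (
    SparkSession.builder
        .config("spark.executor.memory", "10g")
        .config("spark.executor.memoryOverhead", "1g")
        .config("spark.memory.fraction", "0.6")          # unified Spark memory share
        .config("spark.memory.storageFraction", "0.5")   # storage half of the unified pool
        .getOrCreate()
)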
🔄 1. Static Memory Manager (Old)
This was Spark's memory model before Spark 1.6.
🔧 How It Works:
- Fixed memory boundaries set in config.
- You manually allocate how much memory goes to:
- Storage (RDD cache)
- Execution (shuffles, joins)
- If storage fills up → cached blocks are evicted.
- No sharing between execution and storage.
Example fractions
🔄 2. Unified Memory Manager (Modern - Default)
Introduced in Spark 1.6+ and is default since Spark 2.0.
🔧 How It Works:
Combines execution + storage into a single unified memory pool.
Dynamic memory sharing: if execution needs more, storage can give up memory — and vice versa.
Much more flexible and efficient.
✅ Benefits:
- Less tuning needed
- Avoids wasted memory in one region while another needs more
- Better stability under pressure
In the below case the execution memory is empty, so the storage memory borrows from the execution pool for caching.
Now executor does some work in blue boxes
Now entire memory is full, so we need to evict some data that has been cached. This happens in LRU fashion.
Now let's say the executor has used its entire memory (about 2.9 GB) but it needs more.
If the storage pool memory is free it can utilize that.
If the storage pool is also full, then we get OOM!!!
When can we neither evict the data nor spill to disk?
Suppose we have two dataframes df1 and df2 and the key id = 1 is heavily skewed in both dataframes, and its 3GB
Since we need to get all the data from df1 and df2 with id = 1 onto the same executor to perform the join, we have just 2.9GB but the data is 3gb so it gives OOM.
We can typically handle 3-4 cores per executor; beyond that we start seeing executor memory errors.
❓ When can Spark neither evict nor spill data from executor memory?
This happens when both eviction and spilling are not possible, and it leads to:
💥 OutOfMemoryError in executors.
✅ These are the main scenarios:
🧱 1. Execution Memory Pressure with No Spill Support
Execution memory is used for:
- Joins (SortMergeJoin, HashJoin)
- Aggregations (groupByKey, reduceByKey)
- Sorts
Some operations (like hash-based aggregations) need a lot of memory, and not all are spillable.
🔥 Example:
If collect_set() builds a huge in-memory structure (e.g., millions of unique events per user), that structure can't be spilled to disk,
and execution memory is full
👉 Spark can’t evict (no caching), and can’t spill (not supported for this op) → 💣 OOM
🔁 2. Execution Takes Priority, So Storage Can't Evict Enough
In Unified Memory Manager, execution gets priority over storage.
But sometimes, even after evicting all cache, execution still doesn’t get enough memory.
🔥 Example: - You cached a large DataFrame. - Then you do a massive join.
Spark evicts all cached data, but still can't free enough memory.
👉 No more memory to give → 💥
User Code holding References
🍕 Imagine Spark is a Pizza Party Spark is throwing a pizza party. You and your friends (the executors) are each given a plate (memory) to hold some pizza slices (data).
The rule is:
“Eat your slice, then give your plate back so someone else can use it.”
😬 But You Keep Holding Your Plate You finish your slice, but instead of giving the plate back, you say:
“Hmm… I might want to lick the plate later,” so you hold on to it.
And you keep doing this with every plate 🍽️.
Now, you have 10 plates stacked up, all empty, but you're still holding them.
🍕 But There’s a Problem… Spark wants to serve more pizza (more data), but now there are no plates left. Even though you’re not using yours, Spark can’t take them back, because you’re still holding on.
💥 Result? Spark gets frustrated and says:
“I’m out of plates! I can’t serve any more pizza!”
That’s when Spark crashes with a memory error (OOM) — because it can’t clean up the memory you're holding onto.
✅ What Should You Do? Let go of the plates as soon as you're done eating (i.e., don’t store data in variables or lists forever).
That way, Spark can reuse memory and everyone gets more pizza. 🍕
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("HoldingReferencesOOM") \
.config("spark.driver.memory", "1g") \
.getOrCreate()
# Create a large DataFrame
df = spark.range(1_000_000) # 1 million rows
# ❌ BAD: Holding all rows in a Python list
all_data = df.collect() # Loads entire DataFrame into driver memory
# Still holding reference to a big object
# Spark can't clean this up because Python is holding it
# Do more operations
df2 = df.selectExpr("id * 2 as double_id")
df2.show()
Spark wants to free memory, but it can't, because your code is still holding a reference to the list all_data. Even though we may never use it again, the Java GC doesn't know that. It's like finishing playing with a teddy bear but still holding onto it: the teacher thinks we are still playing with it, so they can't take it back.
df = spark.range(1_000_000)
# ✅ Process data without collecting everything into memory
df.filter("id % 2 == 0").show(10) # only shows first 10 rows
Lecture 27: Spark Submit
Spark submit is a command-line tool to run Spark applications; it packages the Spark code and runs it on a cluster.
The spark cluster can be standalone,local,K8s or YARN.
Spark Submit Command
--master -> can be yarn, local or k8s
--deploy-mode -> specifies where the driver runs (client or cluster)
--class -> not required for Python, only for Scala or Java
--jars -> e.g. the MySQL connector jar files
spark.dynamicAllocation.enabled -> frees up executors if we are not using them
We provide two arguments to the main.py file.
We can also provide options to generate a log file.
The local system computer from where we run the command is called edge node.
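A hedged example of what the full command can look like (all values are assumptions):
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 5 \
  --executor-cores 5 \
  --executor-memory 25g \
  --driver-memory 20g \
  --jars mysql-connector-java.jar \
  --conf spark.dynamicAllocation.enabled=true \
  main.py arg1 arg2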
Lecture 28 : Deployment Modes in Spark
Below is the normal Spark Architecture
Here we have a separate EC2 instance called edge node. Its configuration is not as much as the other nodes.
The user does not connect directly to the cluster; they connect to the edge node instead.
They can login to the edge node and perform tasks. Kerberos is used for Authentication and Authorization.
Any data that needs to be submitted to cluster also goes through edge node.
The /bin/spark-submit binary lives on the edge node, along with the Hadoop/Spark client libraries; YARN itself is not installed there.
client mode deployment
Driver is made on the edge node.
cluster mode
In cluster mode, the driver is created on the cluster.
pros and cons of client mode
Pro :
- The user can see the cluster logs on their own system.
Con :
- Once the driver on the local/edge machine shuts down, the executors also go down.
- When we submit in client mode we have network latency; the constant two-way communication between the driver and the cluster creates a lot of delay.
In cluster mode, we are given an application id and using that we can see the spark ui details.
Lecture 29: Adaptive Query Execution
Features of AQE
Dynamically Coalescing Shuffle Partitions
Sugar is the best-selling product, so it has the most data in its partition.
Now there is a GroupBy / Shuffling of data. All the Sugar data comes to one partition.
By default there are 200 partitions, but 195 are empty.
The resources are getting wasted because these 195 partitions also need to be shuffled.
The 5 partitions become 5 tasks but Partition 1 takes lot of time to run.
Now AQE coalesces the partitions.
Two tasks are now reduced and also 2 cores become free.
But even after coalescing we may end up with data skew.
Once we coalesce we end up with 2 partitions; one completes fast, while the one with Sugar takes time.
Data Splitting
If median is 5MB and one partition is > 25MB then the data splits.
Dynamically Switching Join Strategy
By default spark does sort merge join.
Now if table2 shrinks to 10 MB at runtime, then even though a sort-merge join DAG was built, with AQE enabled Spark checks the runtime statistics.
Since the data is only 10 MB we can broadcast it; shuffling still happens, but the sorting and merging are avoided.
Dynamically Optimizing Skew Join
We are considering two tables where key = Sugar and just 128MB of data.
Let's show other partitions also
Now when we do the Sort Merge Join and get all keys together the Sugar partition size increases.
All tasks except the one with Sugar completes fast.
This leads to OOM error.
Solutions
- Salting
- AQE
AQE has a ShuffleReader that keeps statistics on the size of each partition. The skewed partition gets automatically split in both tables.
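The relevant switches, as a hedged sketch (these are standard Spark SQL configs; the values are illustrative):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge tiny shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # split skewed partitions
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")           # used when switching join strategy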
Lecture 30 : Cache vs Persist
Spark Memory Management
The Spark Memory is further expanded into Storage Memory Pool and Executor Memory Pool.
The cache is stored in Storage Memory Pool.
Data does not stay in executor memory forever. Once the line of code that computes df has run, df is removed from executor memory, but a subsequent step uses df to calculate df2. Ideally df would have to be computed again, but if it is cached we can fetch it directly from the cache.
When we cache the data goes from executor short lived memory to the storage pool. The cache is based on LRU mechanism.
Now let's say we have 15 partitions but only 14.5 fit in storage memory; 1 partition does not get stored.
By default the storage level for df.cache() is MEMORY_AND_DISK: if the data does not fit in memory it is moved to disk, but reading from disk is slow, so relying on that is not recommended.
If the partition gets lost during IO, the DAG will recalculate that.
persist() gives us more flexibility: when we pass MEMORY_AND_DISK to persist() it behaves exactly like cache(). cache() is just a wrapper around persist().
How many partitions get stored in cache?
When we use show(), just one partition gets cached (only one partition is evaluated). When we use count(), all partitions get cached.
Storage Level in Persist
In MEMORY_AND_DISK, the data that spills to disk after memory is full is stored in serialized form; it must be deserialized before Spark can process it, so CPU utilization is higher.
MEMORY_ONLY_2: the data is replicated twice, so that if a complex computation's result is lost on a failure we don't need to compute it again.
MEMORY_ONLY_SER and MEMORY_AND_DISK_SER apply only in Scala and Java (in Python the data is always serialized).
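A minimal sketch (df is assumed to be an existing DataFrame):
from pyspark.storagelevel import StorageLevel

df.cache()                              # same as persist(StorageLevel.MEMORY_AND_DISK)
df.count()                              # action that materialises all partitions into the cache

df2 = df.filter("salary > 50000")
df2.persist(StorageLevel.MEMORY_ONLY)   # choose an explicit storage level
df2.count()
df2.unpersist()                         # release the cached data when done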
Lecture 31 : Dynamic Resource Allocation
Cluster Configuration
There is no problem with above configuration.
But let's say another user comes and asks for more resources...
The red person can't get assigned any more memory since cluster is already full.
The resource manager works on FIFO process.
Now this small process that needs only 25GB may have to wait for hours.
In dynamic memory allocation, the data that is not used is released.
Resource Manager has no role, spark internal algo does this.
Let's say we have free 750GB and driver demands 500GB from resource manager but there might be other processes waiting for the memory so it may not get it.
We can use min executors and max executors to get around this. We set min executors in such a way that process does not fail.
Now let's say there is a process which has completed execution and so Dynamic Resource Allocator frees the data. But we want it for further calculations. Do we calculate it again? No.
We can use External Shuffle Service. This works independently on every worker node and data in this doesnt get deleted.
If an executor is idle for 60 seconds (the default timeout), it is released.
How does executor ask for resources?
If the pending tasks are not satisfied within the backlog timeout, the driver starts requesting executors in an exponentially growing (two-fold) manner: first 1, then 2, then 4, and so on.
spark.dynamicAllocation.schedulerBacklogTimeout = 2s means the driver waits 2 seconds of backlog before requesting more executors.
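The usual settings, as a hedged sketch (the values are illustrative):
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.dynamicAllocation.minExecutors", "2")
        .config("spark.dynamicAllocation.maxExecutors", "10")
        .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
        .config("spark.dynamicAllocation.schedulerBacklogTimeout", "2s")
        .config("spark.shuffle.service.enabled", "true")    # external shuffle service
        .getOrCreate()
)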
Parallel Execution and Multi Threading
When to avoid dynamic resource allocation?
For critical jobs that needs to be run within certain SLA avoid it.
Lecture 32 : Dynamic Partition Pruning
In below code we have a filter applied to select only 19th April 2023 data,
Below we can see that only one file that is for 19th April 2023 is read, not all of them.
DPP with 2 tables
Partition pruning does not happen on the first (fact) table by itself, but it does happen on table 2; Dynamic Partition Pruning lets Spark apply that filter to the fact table at runtime.
Two conditions:
- Data should be partitioned.
- 2nd Table should be broadcasted.
Without Dynamic Partition Pruning
A total of 123 files are read from the first table, not just one as in the previous case.
With Dynamic Partition Pruning
The smaller dimdate table is broadcasted and hash join performed. Only 3 files are read this time.
At runtime a subquery is run...
Now because of the runtime filter only 4 partitions are read/scanned.
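A hedged sketch of a query where dynamic partition pruning can kick in; the table names, column names and partitioning are assumptions:
from pyspark.sql.functions import broadcast

# sales_fact_df is partitioned on sales_date; dim_date_df is small enough to broadcast.
result = (
    sales_fact_df
        .join(broadcast(dim_date_df), on="sales_date")
        .filter(dim_date_df["calendar_year"] == 2023)
)
result.explain()   # look for a dynamicpruningexpression in the scan of the fact table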