
Spark YouTube

Spark Concepts and Code.

Lecture 1 : What is Apache Spark

image

Unified :

image

Computing Engine:

image

Spark is not a storage platform; the data can be stored in HDFS, RDBMS, etc.

Spark can process terabytes of data in a distributed manner.

Compute Cluster:

image

  • each slave has 16 GB RAM, 1 TB storage and a 4-core CPU
  • even the master has some storage and RAM
  • the above cluster can compute 64 GB of data at a time
  • the master divides the data among the slave nodes and then the slaves process the data

Lecture 2 : Why Apache Spark?

Different Databases image

New formats like video, audio, JSON and Avro started coming in, but an RDBMS cannot handle them.

The volume of data also increased.

What is Big Data?

image

A Data Lake works on the Extract-Load-Transform (ELT) architecture.

Issues with RDBMS

  • Storage
  • Processing - RAM and CPU

Enter Spark...

image

Monolith vs Microservice Architecture

image

Lecture 3 : Hadoop vs Spark

Misconception:

  • Hadoop is a database: it is not a database, just a filesystem (HDFS)
  • Spark is 100 times faster than Hadoop
  • Spark processes data in RAM but Hadoop doesn't

Differences

Performance

image

Hadoop does a lot of read/write I/O operations and sends data back and forth to disk. image

But in Spark each executor has its own memory. image

Where is there no difference?

When we have very little data, say 10 GB, there is no difference, because in the Hadoop cluster the data also fits in memory the first time and doesn't have to be written back to disk.

Batch vs Stream Processing

image

Ease of Use

image

Spark has both a low-level and a high-level API in Python, which is easier than using Hive. Low-level programming is at the RDD level.

Security
  • Hadoop has built-in Kerberos authentication via YARN, whereas Spark doesn't have any security mechanism of its own.

  • The authentication helps create ACLs at the directory level in HDFS.

  • Spark uses HDFS storage, so it gets the ACL feature, and when it runs on YARN it gets Kerberos authentication.

Fault Tolerance

image

Data Replication in Hadoop image

HDFS keeps track of which node / rack has the data from A B C and D

image

DAG in Spark

  • Spark computes / transforms in multiple processes: Process 1 -> Process 2 -> Process 3 ...
  • After each process the data is stored in a data structure called an RDD, which is immutable. So even if there is a failure, the Spark engine knows how to reconstruct the data for a particular process from the RDD at that stage.

Lecture 4 : Spark Ecosystem

image

High Level API : We can write SQL queries in Python, Java, etc.; there are ML and GraphX libraries as well.

We can write code in many languages. Low Level API : we can make RDDs and work on them.

image

Where does Spark run?

image

Spark Engine would need some memory for transformation.

  • suppose it needs 4 worker nodes of 20 GB each and a driver node of 20 GB.
  • it goes to the cluster manager and asks for a total of 100 GB of memory; if available, the manager assigns that much.
  • the cluster manager can be YARN, Kubernetes (K8s) or the Standalone manager

Lecture 5 : Read Modes in Spark

image

  • format : the data file format (csv, json, a jdbc/odbc connection, ...). Format is an optional parameter; by default it is parquet.
  • option : inferSchema, mode and header [optional]
  • schema : a manual schema can be passed here [optional]
  • load : the path from where we need to read the data [not optional]

DataframeReader API

Access it using 'spark.read' in the spark session
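
A minimal sketch of the DataFrameReader pattern described above (the path and option values are placeholders):

# format, option and load as described above; the path is hypothetical
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/FileStore/tables/sample.csv")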

image

mode in Spark

image

Lecture 6 : Spark Architecture

Spark Cluster

image

  • 20 cores and 100 GB RAM per machine
  • Total Cluster : 200 cores and 1TB RAM

image

  • The master is controlled by Resource Manager and the workers are controlled by Node Manager.

What happens when user submits code?

image

  • The user submits some Spark code for execution to the Resource Manager, asking for a 20 GB driver, 5 executors of 25 GB each and 5 CPU cores per executor.
  • So the manager goes to W5 and asks it to create a 20 GB container as the driver node.

What happens inside the container?

Driver Allocation

Now this 20 GB driver is called the Application Master image

There are two main() functions inside the Application Master: one is for PySpark and the other is for the JVM (Java, Scala, etc.).

The JVM main() is called the Application Driver.

Spark Core has a Java wrapper, and the Java wrapper has a Python wrapper.

When we write code in PySpark, it gets converted through the Java wrapper.

The PySpark driver is not always required, but the JVM driver is required to run any code.

Worker Allocation

image

  • Now the Application Master asks for the executors to be assigned, and the resource manager allocates them.

Executor Container

image

Each executor has a 5-core CPU and 25 GB RAM. Each of them runs in a separate container.

The above holds when we have pure JVM code and don't use Python UDFs.

But what if we use Python UDF functions?

image We need a Python worker inside the executor to be able to run the code.

Lecture 7 : Schema in Spark

StructType and StructField

image

Example:

image

image

How to skip the header row?

df = spark.read.option("skipRows", 2).csv("file.csv")
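
If the goal is only to treat the first row as column names (rather than skipping an arbitrary number of rows), the standard header option can be used instead:

df = spark.read \
    .option("header", "true") \
    .csv("file.csv")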

Lecture 8 : Handling Corrupted Records in Spark

image

How many records in each mode?

image

Permissive Mode

image

DropMalformed

image
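
A minimal sketch of the three read modes, assuming a file emp.csv that contains some malformed rows:

# PERMISSIVE (default): keep malformed rows, set unparseable fields to null
permissive_df = spark.read.option("header", "true") \
    .option("mode", "PERMISSIVE").csv("emp.csv")

# DROPMALFORMED: silently drop malformed rows
dropped_df = spark.read.option("header", "true") \
    .option("mode", "DROPMALFORMED").csv("emp.csv")

# FAILFAST: throw an exception on the first malformed row
failfast_df = spark.read.option("header", "true") \
    .option("mode", "FAILFAST").csv("emp.csv")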

How to Print Corrupted Records

image

image

Output image

How to Store Corrupted Records

image

The corrupted records are in json format image
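
On Databricks this is typically done with the badRecordsPath option, which writes the rejected rows out as JSON files (a sketch; both paths are placeholders):

bad_records_df = spark.read \
    .option("header", "true") \
    .option("badRecordsPath", "/FileStore/tables/bad_records/") \
    .csv("/FileStore/tables/emp.csv")
# the corrupted rows are written under /FileStore/tables/bad_records/ as JSON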

Lecture 9 : Transformations and Actions in Spark

image

Types of Transformations

  • Narrow Transformation
  • Wide Transformation

image

Example: image

Suppose data is of 200MB. 200MB / 128MB = 2 partitions

image

Let's say both partitions go to separate executors.

Q1 : Filtering Records image There is no data movement here.

Q2: Find Total Income of each employee image

One record with id = 2 is in one partition and the other is in the second partition, so we need to do a wide transformation image

Data needs to be shuffled and records with same id must be moved to same partition.

  • filter, select, union, etc. are narrow transformations
  • join, groupBy, distinct, etc. are wide transformations
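
A small sketch of the two kinds, assuming an emp_df with id and income columns:

# Narrow: each output partition depends on exactly one input partition
filtered = emp_df.filter(emp_df.income > 50000)

# Wide: rows with the same id must be shuffled to the same partition
total_income = emp_df.groupBy("id").sum("income")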

Lecture 10 : DAG and Lazy Evaluation in Spark

image

  • For every action there is a new job; here jobs are created for the read, inferSchema, the groupBy().sum() aggregation and show.
  • When used with groupBy().sum(): it is considered an action because it triggers computation to aggregate data across partitions and produce a result. This operation forces Spark to execute the transformations leading up to it, effectively creating a job.
  • When used as a column expression, e.g. df.select(sum("value")): it acts more like a transformation in Spark's context, especially if it is part of a larger query or pipeline that does not immediately trigger execution. In this case it only defines the operation and does not create a job until an action (like show() or collect()) is called.

  • Job for reading file image Whole Stage Codegen generates Java bytecode.

  • inferSchema image

  • GroupBy and count: as explained above, this is an action.

  • Show: the final action, to display the df.

image After we read the CSV and infer the schema, no new jobs are created, since filter and repartition are both transformations, not actions.

When there are two filters on same dataset

image

This is the job image

Optimizations on the Filter

Both the filters are in the same task image The optimization can be applied because Spark is lazily evaluated.

Lecture 11: Working with JSON Data in Spark

image

Two types of JSON notation:

  • Line Delimited JSON image

  • Multi Line JSON image

[
{
  "name": "Manish",
  "age": 20,
  "salary": 20000
},
{
  "name": "Nikita",
  "age": 25,
  "salary": 21000
},
{
  "name": "Pritam",
  "age": 16,
  "salary": 22000
},
{
  "name": "Prantosh",
  "age": 35,
  "salary": 25000
},
{
  "name": "Vikash",
  "age": 67,
  "salary": 40000
}
]
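
To read a multi-line JSON file like the one above, the multiLine option has to be enabled (the paths are placeholders):

multiline_df = spark.read \
    .option("multiLine", "true") \
    .json("/FileStore/tables/employees_multiline.json")

# line-delimited JSON needs no extra option
line_df = spark.read.json("/FileStore/tables/employees_line_delimited.json")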

Line-delimited JSON is more efficient in terms of performance because the parser knows that each line holds one JSON record, whereas with multi-line JSON the parser needs to keep track of where one record ends and the next one starts.

Different number of keys in each line

image

Here, the line with the extra key has a value for it, while for the rest the value is null image

Multiline Incorrect JSON

We don't pass a list here; it's just dictionaries one after another.

{
  "name": "Manish",
  "age": 20,
  "salary": 20000
},
{
  "name": "Nikita",
  "age": 25,
  "salary": 21000
},
{
  "name": "Pritam",
  "age": 16,
  "salary": 22000
},
{
  "name": "Prantosh",
  "age": 35,
  "salary": 25000
},
{
  "name": "Vikash",
  "age": 67,
  "salary": 40000
}
When we process this JSON, only the first dictionary is read as a record and the rest is not processed.

image

Corrupted Records

We don't need to define _corrupt_record in the schema; Spark will add the column on its own.

image
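
A small sketch of inspecting those corrupt records (the path is hypothetical); caching is needed when filtering only on the internal _corrupt_record column:

from pyspark.sql.functions import col

json_df = spark.read.option("mode", "PERMISSIVE") \
    .json("/FileStore/tables/corrupted.json")
json_df.cache()  # required to query the internal _corrupt_record column on its own
json_df.filter(col("_corrupt_record").isNotNull()).show(truncate=False)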

Lecture 12: Spark SQL Engine

image

How is Spark Code compiled?

  • The Catalyst optimizer creates a plan and builds the RDD lineage

Phases in Catalyst Optimizer

image

Workflow Diagram
  • Unresolved Logical Plan : a bunch of crude steps to execute the SQL code.
  • Catalog : the table, file and database metadata information is stored in the catalog. Suppose we call read.csv on a file that doesn't exist; the procedure that throws the error is assisted by the catalog. In the Analysis phase we go through these steps, and if some file/table is not found we get an AnalysisException. This error occurs when the logical plan provided cannot be resolved.
  • Resolved Logical Plan : the phase reached once we have finished analysing the catalog objects.
  • Logical Optimization : there are many examples. Suppose we need just two columns in the select output; the Spark engine does not fetch all the columns, it fetches only the two columns we need. Another example is when we use multiple filters on the same column in different lines of code; when we execute this code we see that they are all combined and executed as a single filter condition.
  • Physical Plan : this involves taking decisions such as the type of join to use, Broadcast Join being one example. From the logical plan we can build multiple physical plans; the best physical plan is a set of RDDs to be run on different executors of the cluster.

image

Lecture 13: Resilient Distributed Dataset

image

Data Storage of List

image

Data Storage in RDD

Suppose we have 500MB of data and 128MB partition, so we will have 4 partitions.

The data is scattered on various executors. image

It's not in a single contiguous location like the elements of a list. The data structure used to process this data is called an RDD image

image

Why is RDD recoverable?

  • RDD is immutable. If we apply multiple filters, each dataset after filtering is a different dataset image

  • In the below case, if rdd2 fails, we can rebuild it from rdd1 because of the lineage. image

Disadvantage of RDD

  • No optimization is done by Spark on RDDs. The developer must specify explicitly how to optimize the RDD operations.

Advantage

  • Works well with unstructured data where there are no columns and rows / key-value pairs
  • RDD is type safe: we get errors at compile time rather than at runtime, which is what happens with the Dataframe API.

Avoiding RDDs

image

  • RDD : we specify how to do it. Dataframe API : we just specify what to do.

image You can see in the above case that we have a join and a filter, but we are explicitly saying first join then filter, so it triggers a shuffle first and then the filter, which is not beneficial.

Lecture 14 : Parquet File Internals

image

There are two types of file formats:

  • Columnar Based and Row Based

Physical Storage of Data on Disk

image

Write Once Read Many

The fundamental principle of big data is write once, read many.

  • We don't need all the columns for analytics on big data, so columnar storage is the best.
  • If we store in a row-based format, then we need to jump across many memory blocks to get the data we need.

image

  • OLTP systems generally use a row-based file format.
  • I/O should be reduced, so OLAP uses a columnar format.

Why Columnar format may not be the best?

image

In the above case we can get col1 and col2 easily, but for col10 we still need to scan almost the entire file.

To tackle this:

Let's say we have 100 million total rows.

We can store 100,000 records contiguously in one row group, then the next 100,000 records in the next row group, and so on, in a hybrid format.

image

Logical Partitioning in Parquet

image

Let's say we have 500 MB of data; each row group by default holds 128 MB, so we will have 4 row groups. Each row group has some metadata attached to it.

In our example, let's say one row group has 100,000 records. Each column within it is further stored as pages.

Runlength Encoding and Bitpacking

image

Suppose we have 10 lakh (1 million) records but only, say, 4 countries.

So Parquet creates a dictionary of key-value pairs, with integer keys from 0 to 3, and in the dictionary-encoded data we see the keys being used instead of the country names.

image

More details on Parquet

The row-wise formats store data as records, one after another. This format works well when accessing entire records frequently. However, it can be inefficient when dealing with analytics, where you often only need specific columns from a large dataset.

image

Imagine a table with 50 columns and millions of rows. If you’re only interested in analyzing 3 of those columns, a row-wise format would still require you to read all 50 columns for each row.

Columnar formats address this issue by storing data in columns instead of rows. This means that when you need specific columns, you can read only the column data you need, significantly reducing the amount of data scanned.

image

However, storing data in a columnar format has some downsides. The record write or update operation requires touching multiple column segments, resulting in numerous I/O operations. This can significantly slow the write performance, especially when dealing with high-throughput writes.

When queries involve multiple columns, the database system must reconstruct the records from separate columns. The cost of this reconstruction increases with the number of columns involved in the query.

The hybrid format combines the best of both worlds. The format groups data into "row groups," each containing a subset of rows (aka horizontal partition). Within each row group, data for each column is called a “column chunk" (aka vertical partition).

image

In the row group, these chunks are guaranteed to be stored contiguously on disk.

Terminologies

image

A Parquet file is composed of:

Row Groups: Each row group contains a subset of the rows in the dataset. Data is organized into columns within each row group, each stored in a column chunk.

Column Chunk: A chunk is the data for a particular column in the row group. Column chunk is further divided into pages.

Pages: A page is the smallest data unit in Parquet. There are several types of pages, including data pages (actual data), dictionary pages (dictionary-encoded values), and index pages (used for faster data lookup).

Metadata Types in Parquet

Magic number: The magic number is a specific sequence of bytes (PAR1) located at the beginning and end of the file. It is used to verify whether it is a valid Parquet file.

FileMetadata: Parquet stores FileMetadata in the footer of the file. This metadata provides information like the number of rows, data schema, and row group metadata. Each row group metadata contains information about its column chunks (ColumnMetadata), such as the encoding and compression scheme, the size, the page offset, the min/max value of the column chunk, etc. The application can use information in this metadata to prune unnecessary data.

PageHeader: The page header metadata is stored with the page data and includes information such as value, definition, and repetition encoding. Parquet also stores definition and repetition levels to handle nested data. The application uses the header to read and decode the data.

How is data written into Parquet?

image

  • The application issues a written request with parameters like the data, the compression and encoding scheme for each column (optional), the file scheme (write to one or multiple files), etc.

  • The Parquet Writer first collects information, such as the data schema, the null appearance, the encoding scheme, and all the column types recorded in FileMetadata.

  • The Writer writes the magic number at the beginning of the file.

  • Then, it calculates the number of row groups based on the row group’s max size (configurable) and the data’s size. This step also determines which subset of data belongs to which row group.

  • For each row group, it iterates through the column list to write each column chunk for the row group. This step will use the compression scheme specified by the user (the default is none) to compress the data when writing the chunks.

  • The chunk writing process begins by calculating the number of rows per page using the max page size and the chunk size. Next, it will try to calculate the column's min/max statistic. (This calculation is only applied to columns with a measurable type, such as integer or float.)

  • Then, the column chunk is written page by page sequentially. Each page has a header that includes the page’s number of rows, the page’s encoding for data, repetition, and definition. The dictionary page is stored with its header before the data page if dictionary encoding is used.

  • After writing all the pages for the column chunk, the Parquet Writer constructs the metadata for that chunk, which includes information like the column's min/max, the uncompressed/compressed size, the first data page offset, and the first dictionary page offset.

  • The column chunk writing process continues until all columns in the row group are written to disk contiguously. The metadata for each column chunk is recorded in the row group metadata.

  • After writing all the row groups, all row groups’ metadata is recorded in the FileMetadata.

  • The FileMetadata is written to the footer.

  • The process finishes by writing the magic number at the end of the file.

How is data read from Parquet?

image

  • The application issues a read request with parameters such as the input file, filters to limit the number of read row groups, the set of desired columns, etc.

  • If the application requires verification that it’s reading a valid Parquet file, the reader will check if there is a magic number at the beginning and end of the file by seeking the first and last four bytes.

  • It then tries to read the FileMetadata from the footer. It extracts information for later use, such as the file schema and the row group metadata.

  • If filters are specified, they will limit the scanned row groups by iterating over every row group and checking the filters against each chunk’s statistics. If it satisfies the filters, this row group is appended to the list, which is later used to read.

  • The reader defines the column list to read. If the application specifies a subset of columns it wants to read, the list only contains these columns.

  • The next step is reading the row groups. The reader will iterate through the row group list and read each row group.

  • The reader will read the column chunks for each row group based on the column list. It uses ColumnMetadata to read the chunk.

  • When reading the column chunk for the first time, the reader locates the position of the first data page (or dictionary page if dictionary encoding is used) using the first page offset in the column metadata. From this position, the reader reads the pages sequentially until no pages are left.

  • To determine whether any data remains, the reader tracks the current number of read rows and compares it to the chunk’s total number of rows. If the two numbers are equal, the reader has read all the chunk data.

  • To read and decode each data page, the reader visits the page header to collect information like the value encoding, the definition, and the repetition level encoding.

  • After reading all the row groups’ column chunks, the reader moves to read the following row groups.

  • The process continues until all the row groups in the row group list are read.

Because the Parquet file can be stored in multiple files, the application can read them simultaneously.

In addition, a single Parquet file is partitioned horizontally (row groups) and vertically (column chunks), which allows the application to read data in parallel at the row group or column level.

OLAP Workload Example

image

image

Demo

parquet-tools inspect <filename>

image image

Gives some file and brief column level metadata.

parquet_file.metadata.row_group(0).column_group(0) image

Compression is GZIP image

Encoding is explained on top.

Bitpacking Advantage

  • Bit packing helps compress the values; in the above case we have just 4 unique values, so we need only 2 bits per value.
  • The comparison shows query time in seconds for running a select on CSV, Parquet, etc.

image

Summary

image

  • Here the actual data is stored in the pages, and each page has metadata like min, max and count.

  • Let's say we need to find all people less than 18 years of age

image

Here, when we divide the data into row groups, we don't need to do any I/O read on Row Group 2; this saves a lot of time and optimizes performance.

The above concept is called Predicate Pushdown.

Projection Pruning

Projection pruning means we don't spend I/O reading columns that are not part of the select query or that aren't required for any join.
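
A sketch of both optimizations with the DataFrame API, assuming a Parquet file at a hypothetical path; Spark pushes the filter down to the Parquet row-group statistics and reads only the selected column chunks:

people_df = spark.read.parquet("/FileStore/tables/people.parquet")

# Projection pruning: only the name and age column chunks are read
# Predicate pushdown: row groups whose min/max age statistics exclude <18 are skipped
minors_df = people_df.select("name", "age").filter("age < 18")
minors_df.explain()  # the scan node typically shows the pushed filter on age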

Lecture 15 : How to write data on the disk?

image

image

Modes to write data

image

Create three files image

  write_df = read_df.repartition(3).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .option("path", "/FileStore/tables/Write_Data/") \
    .save()

Lecture 16: Partitioning and Bucketing

image

In the above data we cannot partition by any column because there is no column with repeating values, but we can bucket the data.

image

image In the above data we can partition by country, but again we might have more data in the India partition and less in the Japan partition.

Example of Partitioning

image

image

The advantage is that the entire data is not scanned and only few partitions are scanned based on the filters.

Partitioning by Id

image

Here we can see that we have created partitions by ID, and since ID is a high-cardinality column, partitioning is not efficient and we need to use bucketing.

Partitioning by Address and Gender

image

Bucketing by Id

Dividing into 3 buckets image

Tasks vs Buckets

image

  • If we have 200 partitions we will have 200 tasks, and each task will create 5 buckets (200 x 5 = 1000 files); to tackle this we first repartition into 5 partitions and then bucketBy 5, as sketched below.
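
A sketch of that pattern, assuming an emp_df with an id column (bucketBy requires saveAsTable):

emp_df.repartition(5) \
    .write \
    .format("parquet") \
    .mode("overwrite") \
    .bucketBy(5, "id") \
    .sortBy("id") \
    .saveAsTable("emp_bucketed")  # repartition first so 200 tasks don't each write 5 bucket files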

How does bucketing help with joins?

image

  • Here we can see that since both tables are bucketed on the same column, the ids can be mapped directly and there is no shuffling.

Bucket Pruning

image Suppose we have 1M Aadhaar IDs and we divide them into buckets of 100,000 each; when we divide a given Aadhaar ID by 100,000 we get the exact bucket that the number lies in.

Lecture 17 : Spark Session vs Spark Context

  • Spark Session is the entry point to the Spark cluster, where we provide the parameters to create and operate our cluster.
  • A Spark session will have different contexts, like one for SQL, one for PySpark, etc.

image

Lecture 18: Job, Stage and Tasks

image

  • One application is created per Spark session / spark-submit.
  • One job is created per action.
  • One stage is created for every wide transformation; narrow transformations like filter share a stage.
  • A task is the actual activity happening on the data, at the level of a partition.

image

Example of Job,Action and Task

image

Complete flow diagram

image

Every job has minimum one stage and one task.

image Repartition to filter is one job because we don't hit an action in between.

Every wide dependency transformation has its own stage. All narrow dependency transformations come in one stage as a DAG.

image

How do tasks get created? [Read and Write Exchange]

image

  • The repartition stage is a wide dependency transformation and creates two partitions from one; it is a write exchange of data.
  • Now the filter and select stage reads this repartitioned data (read exchange), and the filter creates two tasks because we have two partitions.
  • Next we need to find out how many people earn > 90000 and are aged > 25, so we need to do a groupBy; that's a wide dependency transformation and it creates another stage. By default, 200 shuffle partitions are created.
  • So some partitions may have data and some won't.

image

Lecture 17: Dataframe Transformations in Spark Part 1

image Data gets stored in Row() format in the form of bytes

image

Columns are expressions. An expression is a set of transformations on one or more values in a record.

Ways to select values / columns

image

image
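
A sketch of the common ways to refer to a column, assuming a df with a name column:

from pyspark.sql.functions import col, expr

df.select("name").show()         # by string
df.select(col("name")).show()    # col() function
df.select(df["name"]).show()     # dataframe indexing
df.select(expr("name")).show()   # SQL expression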

Column Manipulations

image

Other methods image

selectExpr image

Aliasing Columns image

Lecture 18 : Dataframe Transformations in Spark Part II

filter() / where() no difference

image

image

Multiple filter conditions

image

Literals in spark

Used to pass the same value to every row of a column image

Adding Columns

If the column already exists then it gets overwritten. image
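
A sketch of both, using lit() for the constant and withColumn() to add or overwrite a column:

from pyspark.sql.functions import lit, col

df = df.withColumn("country", lit("India"))      # same literal value in every row
df = df.withColumn("salary", col("salary") * 2)  # overwrites the existing salary column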

Renaming Columns

image

Lecture 19: union vs unionAll()

image

We can see that here we have a duplicate id image

In PySpark, union and unionAll behave in the same way: both retain duplicates image

But in Spark SQL, when we do a UNION it drops the duplicate records image

image

Selecting data and unioning the same table

image

What happens when we change the order of the columns?

wrong_manager_df actually has the wrong order of columns, but we still get the union output, just with values landing in the wrong columns. image

If we give a different number of columns, an exception is thrown. image

If we use unionByName, the columns of both dfs are matched by name, so the column names must be the same. image
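
A sketch of the difference, assuming two dataframes with the same columns but in a different order:

df1 = spark.createDataFrame([(1, "Manish")], ["id", "name"])
df2 = spark.createDataFrame([("Nikita", 2)], ["name", "id"])

df1.union(df2).show()        # positional: values land in the wrong columns
df1.unionByName(df2).show()  # matched by column name: values land correctly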

Lecture 19: Repartitioning and Coalesce

Suppose we have 5 partitions and one of them is heavily skewed at 100 MB; let's say it holds the best-selling product's records. This partition takes a lot of time to compute, so the other executors have to wait until this executor finishes processing. image

Repartitioning vs Coalesce

Repartitioning

Suppose we have the above partitions and the total data is 100 MB. Let's say we do repartition(5); we will now have 5 partitions of 20 MB each.

Coalesce

In the case of coalesce there is no equal splitting of partition memory; rather, the already existing partitions get merged together. image

There is no shuffling in coalesce but in repartitioning there is shuffling of data.

Pros and Cons in repartitioning

  • Repartitioning gives evenly distributed data.
  • The con is that the I/O operations are more; it's expensive.
  • The con of coalesce is that the data is unevenly distributed.

Repartitioning can increase or decrease the partitions but coalescing can only decrease the partitions.

How to get number of partitions?

flight_df.rdd.getNumPartitions() gets the initial number of partitions, and then we can repartition with flight_df.repartition(4). The data is evenly distributed.
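
A sketch of checking and changing the partition count:

print(flight_df.rdd.getNumPartitions())       # initial number of partitions

repartitioned_df = flight_df.repartition(4)   # full shuffle, evenly sized partitions
coalesced_df = flight_df.coalesce(2)          # merges existing partitions, no shuffle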

image

Repartitioning based on columns

image

Since we asked for 300 partitions and we have only 255 records, some partitions will have no records. image

Coalescing

image Suppose we have 8 partitions and we coalesce into 3 partitions. Coalesce takes only one argument.

Uneven distribution of data in partitions. image

Lecture 20 : Case when / if else in Spark

image

image

Apply logic on one column then process if else logic

image
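
A sketch of the same if/else logic with when()/otherwise(), assuming an age column:

from pyspark.sql.functions import when, col

df = df.withColumn(
    "adult",
    when(col("age") < 18, "No")
    .when(col("age") >= 18, "Yes")
    .otherwise("Unknown")  # e.g. when age is null
)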

Spark SQL Logic

image

Lecture 21 : Unique and Sorted Records

image

distinct()

Original Data image

Distinct Data image

Distinct Based on certain columns image

⚠️ distinct() takes no arguments; we need to select the columns first and then apply distinct.

Dropping duplicate records

Point to note: the dataframe manager_df itself is unchanged; it just shows the records after the dups have been dropped. image

sort()

image

Descending order image

Sorting on multiple columns

Here the salary is first arranged in descending order; then, among records with the same salary, the names are arranged in ascending order. image

Lecture 22 : Aggregate functions

Count as both Action and Transformation

image

⚠️ When we do a count on a single column and there is a null in it, the null is not considered in the count. But when we count all columns, rows with nulls are included in the count. image

A job is created in the first case and it is not created in the second case below. image

Lecture 23: Group By In Spark

Sample Data

image

Questions

image

Salary per department using groupBy() image

Where do we use window functions?

Suppose we need to find the percentage of the total salary of a particular department that a person is earning. We can use a window function to put the total salary per department on the particular record itself, like I've shown below. image

This way we dont need to perform a join.

image
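
A sketch of that window, assuming an emp_df with dept and salary columns; the per-department total is attached to every row, so no join is needed:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

dept_window = Window.partitionBy("dept")

emp_df = emp_df.withColumn("total_dept_salary", F.sum("salary").over(dept_window)) \
               .withColumn("salary_pct", F.col("salary") / F.col("total_dept_salary") * 100)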

Grouping by two columns

image

Lecture 24 : Joins in Spark part 1

image

Which customers joined platform but never brought anything?

image

Whenever we need information from another table, we use joins and there should be some common column.

Join is a costly wide dependency operation.

How do joins work?

How many records do we get after inner joining the below two tables. image

We get a total of 9 records. image

Sometimes data gets duplicated when we do joins, so we should use distinct(), but remember that distinct is a wide dependency transformation.

Lecture 25 : Types of Join in Spark

image

Inner Join

image

Left Join

image All records in the left table + those that match the right table; wherever we don't get a match on the right table, the right-side columns become null.

Right Join

image

Full Outer Join

image

Left Semi Join

image

image

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LeftSemiJoinExample").getOrCreate()

# Left DataFrame: Orders
orders = spark.createDataFrame([
    (1, "iPhone"),
    (2, "Pixel"),
    (3, "OnePlus"),
    (4, "Nokia")
], ["customer_id", "product"])

# Right DataFrame: Valid Customers
valid_customers = spark.createDataFrame([
    (1,), (3,)
], ["customer_id"])

# Perform left semi join
filtered_orders = orders.join(valid_customers, on="customer_id", how="left_semi")
filtered_orders.show()

Output

+-----------+--------+
|customer_id|product |
+-----------+--------+
|          1|iPhone  |
|          3|OnePlus |
+-----------+--------+

Left Anti Join

image

Find out all customers who have never purchased any product.
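
A sketch using a left anti join on the customers/orders example (the dataframe and column names are assumed):

# customers with no matching row in orders, i.e. never purchased anything
never_purchased = customers_df.join(orders_df, on="customer_id", how="left_anti")
never_purchased.show()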

Cross Join

Never use cross join! image

image

Lecture 26 : Join Strategies in Spark

image

Joins are expensive due to shuffling.

4 partitions are there in each dataframe.

image

Executors in the cluster

image

Now we need to join employee and salary df to get the output but they are on different executors, so we need to do data shuffling.

After the shuffle, each dataframe has 200 partitions by default. The goal is to get all the same keys into one executor. image

image

image

  • Since we want to get the data for id = 1, we compute 1 mod 200 = 1 and then send all the data for that key to partition 1 on executor 1.

image

Suppose we want to map the salary for id = 7: the data from the employee df with id = 7 and from the salary df with id = 7 will both come into partition 7.

Similarly, id = 201 will go into 201 mod 200 = partition 1.

Types of Join Strategies

image

image

Joins generally result in shuffling

There are two dataframes df1 and df2 each with 4 partitions.

image

We have two executors.

In join goal is to join with same keys.

image

We can see that red P1 has corresponding id for salary in the other executor.

image

We need to get same keys fetched from other executors.

When a dataframe is sent to executors by default 200 partitions are created per dataframe.

image

Now let's say we want to find the salary for id = 1: we compute 1 mod 200 = 1 on blue and 1 mod 200 = 1 on red, so both sets of data come into partition 1 on executor 1.

image

Similarly, for id = 7 we send the data on blue and red to P7.

But if id = 201 then 201 mod 200 = 1, so this id will also come into P1.

If we have id = 102 then 102 mod 200 = 102, which is a partition on the 2nd executor.

image

The executors can be on different worker nodes as well; we then need to move data across from one worker node to the other.

Strategies

image

Broadcast nested loop join is costly because we don't do an equality join; rather it is based on < and > conditions, and it is O(n^2).

Shuffle Sort Merge Join

image

TC : O(nlogn)

Shuffle Hash Join

A hash table with hashed keys is built in memory from the smaller table.

Then for each row of df1 we check which keys match, with an O(1) lookup into the hash table.

image

Broadcast Join

image

The tables that are less than 100mb can be broadcast.

Scenario : Suppose one table is 1 GB in size, so we will have 1000 MB / 128 MB = 8 partitions, and there is another table of size 5 MB.

If we don't broadcast, the big df would have to be shuffled along with the 5 MB df across executors for joining. Instead of that, we just send the small df to all the executors so that there is no shuffling of the big table.

image

The amount of data that can be broadcast depends on the memory of the executors and the driver. Make sure there is no case where the driver memory is 2 GB and we are trying to broadcast 1 GB of data.
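
A sketch of forcing a broadcast join with the broadcast() hint, assuming a large sales_df and a small country_df sharing a country_id column:

from pyspark.sql.functions import broadcast

joined_df = sales_df.join(broadcast(country_df), on="country_id", how="inner")
joined_df.explain()  # should show a BroadcastHashJoin instead of a SortMergeJoin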

Demo

There are total 200 partitions when we join

image

Normal Sort Merge Join Execution Plan

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   Project [sale_id#10484L, sale_date#10485, amount#10486L, country_name#10514]
   +- SortMergeJoin [country_id#10487L], [country_id#10513L], Inner
      :- ColumnarToRow
      :  +- PhotonResultStage
      :     +- PhotonSort [country_id#10487L ASC NULLS FIRST]
      :        +- PhotonShuffleExchangeSource
      :           +- PhotonShuffleMapStage
      :              +- PhotonShuffleExchangeSink hashpartitioning(country_id#10487L, 1024)
      :                 +- PhotonFilter isnotnull(country_id#10487L)
      :                    +- PhotonRowToColumnar
      :                       +- LocalTableScan [sale_id#10484L, sale_date#10485, amount#10486L, country_id#10487L]
      +- ColumnarToRow
         +- PhotonResultStage
            +- PhotonSort [country_id#10513L ASC NULLS FIRST]
               +- PhotonShuffleExchangeSource
                  +- PhotonShuffleMapStage
                     +- PhotonShuffleExchangeSink hashpartitioning(country_id#10513L, 1024)
                        +- PhotonFilter isnotnull(country_id#10513L)
                           +- PhotonRowToColumnar
                              +- LocalTableScan [country_id#10513L, country_name#10514]

== Photon Explanation ==
Photon does not fully support the query because:
        Unsupported node: SortMergeJoin [country_id#10487L], [country_id#10513L], Inner.

Reference node:
    SortMergeJoin [country_id#10487L], [country_id#10513L], Inner

Spark UI Diagram

image

image

Broadcast Join Execution Plan

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonProject [sale_id#10484L, sale_date#10485, amount#10486L, country_name#10514]
         +- PhotonBroadcastHashJoin [country_id#10487L], [country_id#10513L], Inner, BuildRight, false, true
            :- PhotonFilter isnotnull(country_id#10487L)
            :  +- PhotonRowToColumnar
            :     +- LocalTableScan [sale_id#10484L, sale_date#10485, amount#10486L, country_id#10487L]
            +- PhotonShuffleExchangeSource
               +- PhotonShuffleMapStage
                  +- PhotonShuffleExchangeSink SinglePartition
                     +- PhotonFilter isnotnull(country_id#10513L)
                        +- PhotonRowToColumnar
                           +- LocalTableScan [country_id#10513L, country_name#10514]

== Photon Explanation ==
The query is fully supported by Photon.

image

Window functions in Spark

Rank vs Dense Rank

image

Dense rank does not leave any gaps between the ranks.

image
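
A sketch of the two functions side by side, assuming an emp_df with dept and salary columns:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("dept").orderBy(F.desc("salary"))

emp_df.withColumn("rank", F.rank().over(w)) \
      .withColumn("dense_rank", F.dense_rank().over(w)) \
      .show()
# with salaries 500, 400, 400, 300: rank gives 1,2,2,4 while dense_rank gives 1,2,2,3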

Lead and Lag

image

image

Range and Row Between

image

Q1

image

Using the first and last functions, let's try to achieve this.

Data:

image

This solution is wrong; ideally we should get 111000 in all rows of the latest_sales column.

image

Let's look at explain plan.

We can see that the window here is unbounded preceeding and current row

image

What do these terms mean?

image

  • unbounded preceding : if I'm standing at the current row in a window, the operation is computed over all rows from the start of the window up to me.
  • current row : the row I'm standing at.
  • unbounded following : the opposite of unbounded preceding; from the current row to the end of the window.
  • rowsBetween(start_row, end_row) : the row we are currently at is 0, rows before it are negative offsets and rows after it are positive offsets.

image

If we don't give anything, the frame by default runs from unbounded preceding (the first row of the window) to the current row when an ordering is specified.
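
To fix the earlier latest_sales example, the frame has to be widened explicitly so that last() sees the whole window (the dataframe and column names are assumed):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("product") \
          .orderBy("sale_date") \
          .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

sales_df = sales_df.withColumn("latest_sales", F.last("sales").over(w))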

Converting from string to unixtime when we have two fields date and time.

image

emp_df = emp_df.withColumn("timestamp",from_unixtime(unix_timestamp(expr("CONCAT(date,' ',time)"),"dd-MM-yyyy HH:mm")))

The timestamp column is a string.

image

Spark Memory Management

image

If we do spark.range(100000) and then df.collect() on a 1 GB driver, we can get an OOM error

image

Spark Architecture

image

Driver memory is of two types:

  • spark.driver.memory
  • spark.driver.memoryOverhead

image

With collect all records go into the driver. But with show just one partition gets sent to the heap space.

🎯 Think of the Spark Driver Like a Worker

Imagine the Spark driver is a person doing a big task at a desk.

The desk = spark.driver.memory (main memory)

The room around the desk = spark.driver.memoryOverhead (extra space to move, store tools, use side tables)

🧠 Why Just the Desk Isn’t Enough

Let’s say the driver (person) is:

Writing on paper (standard Spark tasks)

Using a laptop (Python/PySpark or native code)

Holding tools and files (temporary data, buffers, network stuff)

Only giving them a desk (spark.driver.memory) isn't enough:

The laptop (native code, Python UDFs) might need space outside the desk

The tools (Spark internals, shuffle, serialization) don’t fit on the desk — they use off-heap memory

If you don’t give them enough room around the desk (memoryOverhead), they might trip over stuff and fail the task.

🧪 Real Spark Example When you run PySpark like this:

df.withColumn("double", my_udf(df["col"]))

That Python UDF runs outside the JVM. It needs extra native memory, not regular Java memory.

Spark says:

“I’ll use driver.memory for my JVM, but I need some memoryOverhead for the native stuff.”

✅ Summary (in 1 line)

spark.driver.memory is for Spark's own work (Java),
spark.driver.memoryOverhead is for everything outside the JVM — like Python, shuffle, native code.

The memory overhead is max(384mb,10% of driver memory)

image

Let's say there is df1 and we want to join it with two small tables df2 and df3.

We send both df2 and df3 to the driver.

image

Let's say we now give 5 datasets worth 250 MB and the total driver space is 1 GB.

If the remaining 750 MB is not enough for the other processes, the driver will give an OOM exception.

💥 So… How Can GC Cause Out of Memory (OOM)?

You’d think GC helps prevent OOMs — and it does! But in high-memory-pressure situations, it can actually cause or worsen them.

🚨 Here’s how it happens: 1. Too Many Objects / Too Much Data in Memory You load huge datasets or perform wide transformations (e.g., groupBy, join).

Spark stores a lot of intermediate data in RAM (JVM heap).

👉 JVM tries to make space by running GC again and again.

  2. GC Takes Too Long If GC runs too often or too long (e.g., > 30s), the JVM thinks something’s wrong.

You get:

java.lang.OutOfMemoryError: GC overhead limit exceeded

This means:

“GC is using 98% of the CPU but only recovering 2% of memory — I give up.”

  1. GC Can’t Free Anything Some objects (like cached RDDs or references from your code) stay in memory.

GC runs but can't collect them because they're still "referenced".

Eventually, JVM runs out of space and crashes with:

java.lang.OutOfMemoryError: Java heap space
⚠️ Common Scenarios in Spark

| Cause | Result |
| --- | --- |
| Large shuffles / joins | Too many objects in memory |
| Caching huge RDDs | Heap filled, GC can't recover |
| Improper partitions | Few tasks → huge memory per task |
| Memory leaks (bad code) | Uncollectable references |

Example code

from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
import random

spark = SparkSession.builder \
    .appName("OOM-GC-Demo") \
    .config("spark.driver.memory", "1g") \
    .getOrCreate()

# Create a large DataFrame with few partitions (causes memory pressure)
data = [(i % 10, random.randint(1, 1000)) for i in range(10_000_000)]  # 10 million rows
df = spark.createDataFrame(data, ["group_id", "value"])

# Force a wide transformation + cache
result = df.groupBy("group_id").count().persist(StorageLevel.MEMORY_ONLY)

# Trigger action
result.count()

✅ How to Fix

Increase spark.executor.memory or spark.driver.memory

Use persist(StorageLevel.DISK_ONLY) if RAM is tight

Avoid huge wide transformations without enough partitions

Tune GC (G1GC is often better for large heaps)

Executor Memory OOM

image

image

10 GB per executor and 4 cores

Expanding one executor

image

image

Exceeding either 10GB or 1GB leads to OOM

image

How is 10GB divided?

image

What does each part of the user memory do?

  1. Reserved Memory

A minimum of roughly 450 MB of executor memory is required (1.5x the 300 MB reserved memory).

image

  2. User Memory

image

  3. Storage Memory Usage

image

  4. Executor Memory Usage

image

What does each part of the spark memory do?

image

⚙️ Background: Memory in Spark Executors

Each executor in Spark has a limited memory budget. This memory is split for:

  • Execution Memory: used for joins, aggregations, shuffles

  • Storage Memory: used for caching RDDs or DataFrames

  • User Memory: everything else (broadcast vars, UDFs, JVM overhead)

🔄 1. Static Memory Manager (Old)

This was Spark's memory model before Spark 1.6.

🔧 How It Works:

  • Fixed memory boundaries set in config.
  • You manually allocate how much memory goes to:
  • Storage (RDD cache)
  • Execution (shuffles, joins)
  • If storage fills up → cached blocks are evicted.
  • No sharing between execution and storage.

Example fractions

spark.storage.memoryFraction = 0.6
spark.shuffle.memoryFraction = 0.2

🔄 2. Unified Memory Manager (Modern - Default)

Introduced in Spark 1.6+ and is default since Spark 2.0.

🔧 How It Works:

Combines execution + storage into a single unified memory pool.

Dynamic memory sharing: if execution needs more, storage can give up memory — and vice versa.

Much more flexible and efficient.

✅ Benefits:

  • Less tuning needed
  • Avoids wasted memory in one region while another needs more
  • Better stability under pressure

In the below case the execution memory is empty, so the storage memory borrows some of the execution memory for caching

image

Now executor does some work in blue boxes

image

Now entire memory is full, so we need to evict some data that has been cached. This happens in LRU fashion.

image

Now let's say the executor has its entire memory of about 2.9 GB in use... but it needs more memory.

image

If the storage pool memory is free it can utilize that.

image

If the storage pool is also full, then we get OOM!!!

When can we neither evict the data nor spill to disk?

Suppose we have two dataframes df1 and df2, and the key id = 1 is heavily skewed in both dataframes, amounting to 3 GB.

Since we need to get all the data from df1 and df2 with id = 1 onto the same executor to perform the join, and we have just about 2.9 GB of execution memory while the data is 3 GB, we get an OOM.

image

image

We can handle 3-4 cores per executor; beyond that we can get executor memory errors.

❓ When can Spark neither evict nor spill data from executor memory?

This happens when both eviction and spilling are not possible, and it leads to:

💥 OutOfMemoryError in executors.

✅ These are the main scenarios:

🧱 1. Execution Memory Pressure with No Spill Support

Execution memory is used for:

  • Joins (SortMergeJoin, HashJoin)
  • Aggregations (groupByKey, reduceByKey)
  • Sorts

Some operations (like hash-based aggregations) need a lot of memory, and not all are spillable.

🔥 Example:

df.groupBy("user_id").agg(collect_set("event"))
If collect_set() builds a huge in-memory structure (e.g., millions of unique events per user)

And that structure can’t be spilled to disk

And execution memory is full

👉 Spark can’t evict (no caching), and can’t spill (not supported for this op) → 💣 OOM

🔁 2. Execution Takes Priority, So Storage Can't Evict Enough

In Unified Memory Manager, execution gets priority over storage.

But sometimes, even after evicting all cache, execution still doesn’t get enough memory.

🔥 Example: - You cached a large DataFrame. - Then you do a massive join.

Spark evicts all cached data, but still can't free enough memory.

👉 No more memory to give → 💥

User Code holding References

🍕 Imagine Spark is a Pizza Party Spark is throwing a pizza party. You and your friends (the executors) are each given a plate (memory) to hold some pizza slices (data).

The rule is:

“Eat your slice, then give your plate back so someone else can use it.”

😬 But You Keep Holding Your Plate You finish your slice, but instead of giving the plate back, you say:

“Hmm… I might want to lick the plate later,” so you hold on to it.

And you keep doing this with every plate 🍽️.

Now, you have 10 plates stacked up, all empty, but you're still holding them.

🍕 But There’s a Problem… Spark wants to serve more pizza (more data), but now there are no plates left. Even though you’re not using yours, Spark can’t take them back, because you’re still holding on.

💥 Result? Spark gets frustrated and says:

“I’m out of plates! I can’t serve any more pizza!”

That’s when Spark crashes with a memory error (OOM) — because it can’t clean up the memory you're holding onto.

✅ What Should You Do? Let go of the plates as soon as you're done eating (i.e., don’t store data in variables or lists forever).

That way, Spark can reuse memory and everyone gets more pizza. 🍕

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HoldingReferencesOOM") \
    .config("spark.driver.memory", "1g") \
    .getOrCreate()

# Create a large DataFrame
df = spark.range(1_000_000)  # 1 million rows

# ❌ BAD: Holding all rows in a Python list
all_data = df.collect()  # Loads entire DataFrame into driver memory

# Still holding reference to a big object
# Spark can't clean this up because Python is holding it

# Do more operations
df2 = df.selectExpr("id * 2 as double_id")
df2.show()

Spark wants to free memory, but it can't, because your code is still holding a reference to the list. all_data is still referenced, and even though we may not use it later, the garbage collector doesn't know that. It's like finishing playing with a teddy bear but still holding onto it: the teacher thinks we are still playing with it, so they can't take it back.

df = spark.range(1_000_000)

# ✅ Process data without collecting everything into memory
df.filter("id % 2 == 0").show(10)  # only shows first 10 rows

Lecture 27: Spark Submit

image

spark-submit is a command-line tool to run Spark applications; it packages the Spark code and runs it on a cluster.

The Spark cluster can be standalone, local, K8s or YARN.

Spark Submit Command

image

image

image

Master can be yarn, local or k8s.

deploy-mode -> specifies where the driver runs (client or cluster)

--class -> not required for Python, only for Scala or Java

--jars -> e.g. MySQL connector jar files

spark.dynamicAllocation.enabled -> frees up executors if we are not using them
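
A representative spark-submit invocation using the flags above (the paths and resource numbers are placeholders):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 5 \
  --executor-memory 25g \
  --executor-cores 5 \
  --driver-memory 20g \
  --conf spark.dynamicAllocation.enabled=true \
  --jars mysql-connector-java.jar \
  main.py arg1 arg2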

image

We provide two arguments to main.py file.

image

We can provide options to generate a log file. image

The local system / computer from which we run the command is called the edge node.

Lecture 28 : Deployment Modes in Spark

image

Below is the normal Spark Architecture

image

Here we have a separate EC2 instance called the edge node. Its configuration is not as large as that of the other nodes.

image

The user does not connect directly to the cluster; rather, they connect to the edge node.

They can log in to the edge node and perform tasks. Kerberos is used for authentication and authorization.

Any data that needs to be submitted to the cluster also goes through the edge node.

The /bin/spark-submit folder is on the edge node; it contains the Hadoop client libraries. YARN is not installed here.

image

client mode deployment

image

Driver is made on the edge node.

cluster mode

image

In cluster mode, the driver is created on the cluster.

pros and cons of client mode

Pro :

  • The user can see the cluster logs on their own system.

Con :

  • Once the driver on the local system shuts down, the executors also go down.
  • When we submit in client mode we will have network latency; the two-way communication creates a lot of delay.

image

In cluster mode, we are given an application id, and using that we can see the Spark UI details.

image

Lecture 29: Adaptive Query Execution

Features of AQE

image

Dynamically Coalescing Shuffle Partitions

Sugar is the best-selling product; it has the most data in its partition.

image

Now there is a GroupBy / Shuffling of data. All the Sugar data comes to one partition.

image

By default there are 200 shuffle partitions, but 195 of them are empty.

The resources are wasted because these 195 empty partitions still have to be handled in the shuffle.

The 5 non-empty partitions become 5 tasks, but Partition 1 (Sugar) takes a lot of time to run.

image

Now AQE coalesces the partitions.

image

Two tasks are eliminated and 2 cores also become free.

But even after coalescing we may end up with data skew.

image

image

Once we coalesce we end up with 2 partitions; one completes fast, while the one with Sugar takes time.

Data Splitting

image

If median is 5MB and one partition is > 25MB then the data splits.

Dynamically Switching Join Strategy

image

By default Spark does a sort merge join.

Now if table2 compresses down to 10 MB at runtime, then even though a sort merge join DAG was built, with AQE enabled Spark can check the runtime statistics.

image

Since the data is only 10 MB we can broadcast it; the shuffle still happens, but the sorting and merging is avoided.

Dynamically Optimizing Skew Join

image

We are considering two tables where key = Sugar and just 128MB of data.

Let's show other partitions also

image

Now when we do the Sort Merge Join and get all keys together the Sugar partition size increases.

image

All tasks except the one with Sugar complete fast.

image

This leads to OOM error.

Solutions
  • Salting
  • AQE

AQE has a shuffle reader which has statistics on the memory and size of each partition. The skewed partition gets automatically split in both tables.

image
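
These behaviours are controlled by the AQE settings (enabled by default in recent Spark versions); a sketch of turning them on explicitly:

spark.conf.set("spark.sql.adaptive.enabled", "true")                     # AQE itself
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # dynamically coalesce shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # dynamically split skewed partitions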

Lecture 30 : Cache vs Persist

image

Spark Memory Management

image

The Spark Memory is further expanded into Storage Memory Pool and Executor Memory Pool.

image

The cache is stored in Storage Memory Pool.

image

Not all data stays in executor memory. Once the line of code creating df has run, df is removed from the executor, but in a subsequent step df is used to calculate df2. In this case df would ideally have to be computed again, but if we have cached it we can fetch it directly from the cache.

image

When we cache, the data goes from the executor's short-lived execution memory to the storage pool. The cache is evicted on an LRU basis.

image

Now let's say we have 15 partitions but only 14.5 fit in storage memory; 1 partition does not get stored.

By default the storage level for df.cache() is MEMORY_AND_DISK: if the data does not fit in memory, move it to disk. But data takes time to read back from disk, so relying on this is not always recommended.

If a partition gets lost during I/O, the DAG will recalculate it.

persist() gives us more flexibility.

image

persist() -> when we pass MEMORY_AND_DISK to persist, it behaves the same as cache(). cache() is just a wrapper over persist().
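
A sketch of the two calls on some already-defined dataframes df1 and df2:

from pyspark.storagelevel import StorageLevel

df1.cache()                             # wrapper for persist(StorageLevel.MEMORY_AND_DISK)
df2.persist(StorageLevel.MEMORY_ONLY)   # explicit storage level, recompute lost partitions
df1.unpersist()                         # release the cached data when done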

How many partitions get stored in cache?

When we use show() just one partition gets cached.

image

When we use count() all partitions get cached.

image

Storage Level in Persist

image

In MEMORY_AND_DISK, data that doesn't fit in memory is stored on disk in serialized form; it must be deserialized before Spark can process it, hence CPU utilization is higher.

image

image

MEMORY_ONLY_2

This data is replicated twice. Just to ensure that if we have complex computations and things fail, we dont need to do computations again.

MEMORY_ONLY_SER and MEMORY_AND_DISK_SER works only in Scala and Java.

Lecture 31 : Dynamic Resource Allocation

image

Cluster Configuration

image

There is no problem with above configuration.

But let's say another user comes and asks for more resources...

image

The red user can't get assigned any more memory since the cluster is already full.

The resource manager works on a FIFO basis.

Now this small process that needs only 25 GB may have to wait for hours.

image

In dynamic resource allocation, the resources that are not being used are released.

The Resource Manager has no role here; Spark's internal algorithm does this.

image

Let's say 750 GB is free and the driver demands 500 GB from the resource manager; there might be other processes waiting for memory, so it may not get it.

We can use min executors and max executors to get around this. We set min executors in such a way that the process does not fail.

Now let's say an executor has completed its work, so the Dynamic Resource Allocator frees it. But we want its shuffle data for further calculations. Do we calculate it again? No.

We can use the External Shuffle Service. This runs independently on every worker node, and the shuffle data held in it doesn't get deleted when executors are removed.

image

If executors are idle for 60 s, they are released.

How does executor ask for resources?

If the application does not get its required resources within about a second, it starts requesting them in a two-fold (exponential) manner.

image

First it asks for 1 executor, then 2, then 4, and so on...

spark.dynamicAllocation.schedulerBacklogTimeout (e.g. 2s): the driver waits this long with backlogged tasks before requesting more executors.
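
A sketch of the dynamic allocation settings discussed above, set when building the session (the config names are the standard Spark ones; the values are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("dynamic-allocation-demo") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "20") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "2s") \
    .config("spark.shuffle.service.enabled", "true") \
    .getOrCreate()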

Parallel Execution and Multi Threading

image

When to avoid dynamic resource allocation?

Avoid it for critical jobs that need to run within a certain SLA.

Lecture 32 : Dynamic Partition Pruning

image

In below code we have a filter applied to select only 19th April 2023 data,

image

Below we can see that only one file that is for 19th April 2023 is read, not all of them.

image

image

DPP with 2 tables

image

Partition pruning does not happen on the first table, but it does happen on table 2 where the filter is applied. Dynamic Partition Pruning lets Spark build and apply that filter to the first table at runtime.

Two conditions:

  • Data should be partitioned.
  • 2nd Table should be broadcasted.
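
A sketch of a query shape that benefits from DPP, assuming a sales table partitioned on date_id and a small dimdate table (all paths and column names are hypothetical):

from pyspark.sql.functions import broadcast

spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")  # on by default

sales_df = spark.read.parquet("/FileStore/tables/sales/")      # partitioned by date_id
dimdate_df = spark.read.parquet("/FileStore/tables/dimdate/")  # small enough to broadcast

result = sales_df.join(broadcast(dimdate_df), on="date_id") \
                 .filter(dimdate_df.calendar_year == 2023)
# the filter on dimdate becomes a runtime subquery that prunes sales partitions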

image

image

Without Dynamic Partition Pruning

A total of 123 files are read from the first table, not one as in the previous case.

image

With Dynamic Partition Pruning

image

image

The smaller dimdate table is broadcasted and hash join performed. Only 3 files are read this time.

image

At runtime a subquery is run...

image

image

Now because of the runtime filter only 4 partitions are read/scanned.