Spark YouTube
Spark Concepts and Code
Lecture 1 : What is Apache Spark
Unified :
Computing Engine:
Spark is not a storage platform; we store the data in HDFS, RDBMS, etc.
Spark can process terabytes of data in a distributed manner.
Compute Cluster:
- each slave has 16 GB RAM, 1 TB storage and a 4-core CPU
- even the master has some storage and RAM
- the above cluster (4 slaves) can compute 64 GB of data in memory at a time
- the master divides the data among the slave nodes and the slaves then process it
Lecture 2 : Why Apache Spark?
Different Databases
New formats like video, audio, JSON and Avro started coming in, but an RDBMS cannot handle them.
The volume of data also increased.
What is Big Data?
A Data Lake works on an Extract-Load-Transform (ELT) architecture.
Issues with RDBMS
- Storage
- Processing - RAM and CPU
Enter Spark...
Monolith vs Microservice Architecture
Lecture 3 : Hadoop vs Spark
Misconception:
- Hadoop is a database: it is not a database, just a filesystem (HDFS)
- Spark is 100 times faster than Hadoop
- Spark processes data in RAM but Hadoop doesn't
Differences
Performance
Hadoop does a lot of read/write I/O operations, sending data back and forth to the disk between steps.
In Spark, by contrast, each executor keeps intermediate data in its own memory.
Where is there no difference?
When we have very little data, say 10 GB, there is no real difference, because the data fits in memory the first time and the Hadoop cluster doesn't have to write intermediate results to disk either.
Batch vs Stream Processing
Ease of Use
Spark has both a low-level and a high-level API in Python, which is easier than using Hive. Low-level programming is at the RDD level.
Security
- Hadoop has built-in Kerberos authentication via YARN, whereas Spark doesn't have any security mechanism of its own.
- The authentication helps create ACLs at the directory level in HDFS.
- Since Spark uses HDFS storage it gets the ACL feature, and when it runs on YARN it gets Kerberos authentication.
Fault Tolerance
Data Replication in Hadoop
HDFS keeps track of which node / rack has the data from A B C and D
DAG in Spark
- So Spark computes / transforms in multiple steps: Process 1 -> Process 2 -> Process 3 ...
- After each step the data is held in an immutable data structure called an RDD. So even if there is a failure, the Spark engine knows how to reconstruct the data for a particular step from the RDD at that stage.
Lecture 4 : Spark Ecosystem
High Level API : we can write SQL queries from Python, Java, etc.; there are also ML and GraphX libraries.
We can write code in many languages. Low Level API : we can create RDDs and work on them directly.
Where does Spark run?
Spark Engine would need some memory for transformation.
- suppose it needs 4 worker nodes of 20 GB each and a driver node of 20 GB.
- it goes to the cluster manager and asks for a total of 100 GB of memory; if available, the manager assigns that much memory.
- the cluster manager can be YARN, Kubernetes (K8s) or Spark's Standalone manager.
Lecture 5 : Read Modes in Spark
format -> the data file format: csv, json, jdbc/odbc connection, etc. Format is an optional parameter; by default it is Parquet.
option -> inferSchema, mode and header [optional]
schema -> a manual schema can be passed here
load -> the path from which to read the data [not optional]
DataframeReader API
Access it using 'spark.read' in the spark session
mode in Spark: permissive (the default), dropMalformed or failFast.
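A minimal sketch of the DataFrameReader API, assuming a hypothetical CSV file path and the options described above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read-demo").getOrCreate()

# format, option, schema (skipped here) and load, chained on spark.read
flight_df = (spark.read.format("csv")
             .option("header", "true")
             .option("inferSchema", "true")
             .option("mode", "PERMISSIVE")                 # read mode; covered in Lecture 8
             .load("/FileStore/tables/flight_data.csv"))   # hypothetical path

flight_df.show(5)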
Lecture 6 : Spark Architecture
Spark Cluster
- 20 cores and 100 GB RAM per machine
- Total cluster: 200 cores and 1 TB RAM (10 machines)
- The master is controlled by the Resource Manager and the workers are controlled by Node Managers.
What happens when user submits code?
- The user submits some Spark code for execution to the Resource Manager, asking for a 20 GB driver, 5 executors of 25 GB each and 5 CPU cores per executor.
- So the manager goes to W5 and asks it to create a 20 GB container as the driver node.
What happens inside the container?
Driver Allocation
Now this 20 GB driver container is called the Application Master.
There are two main() functions inside the Application Master: one for PySpark and one for the JVM languages like Java, Scala, etc.
The JVM main() is called the Application Driver.
Spark Core has a Java wrapper, and the Java wrapper has a Python wrapper.
When we write code in PySpark, it gets converted into calls on the Java wrapper.
The PySpark driver is not always required, but the Java wrapper (JVM driver) is required to run any code.
Worker Allocation
- Now the Application master asks for the executors to be assigned and the resource manager allocates.
Executor Container
Each executor has 5 core CPU and 25GB RAM.
The above is the case when we have pure Java/Scala code and don't use Python UDFs.
But what if we use Python UDF functions?
We need a Python worker inside the executor to be able to run the code.
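A rough sketch of how the resources described above could be requested when building the session; the config keys are standard Spark properties, the numbers simply mirror the lecture's example, and whether they are honoured depends on the cluster manager:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("resource-demo")
         .config("spark.driver.memory", "20g")       # the Application Master / driver container
         .config("spark.executor.instances", "5")    # 5 executor containers
         .config("spark.executor.memory", "25g")     # 25 GB per executor
         .config("spark.executor.cores", "5")        # 5 CPU cores per executor
         .getOrCreate())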
Lecture 7 : Schema in Spark
StructType and StructField
Example:
How to skip the header row?
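A minimal sketch, assuming a hypothetical employee CSV: the StructType/StructField schema is passed explicitly, and header=true makes Spark skip the first row and treat it as column names:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

emp_schema = StructType([
    StructField("id", IntegerType(), True),        # True = nullable
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", IntegerType(), True)
])

emp_df = (spark.read.format("csv")
          .option("header", "true")                 # skip the header row, use it for column names
          .schema(emp_schema)
          .load("/FileStore/tables/employee.csv"))  # hypothetical path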
Lecture 8 : Handling Corrupted Records in Spark
How many records in each mode?
Permissive Mode
DropMalformed
How to Print Corrupted Records
Output
How to Store Corrupted Records
The corrupted records are in json format
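A hedged sketch of the three read modes and of capturing bad rows, assuming the hypothetical employee CSV from above; note that for CSV the _corrupt_record column must be declared in the schema, and badRecordsPath is a Databricks-specific option:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

emp_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)   # PERMISSIVE mode puts unparsable rows here
])

# mode can be PERMISSIVE (default), DROPMALFORMED or FAILFAST
emp_df = (spark.read.format("csv")
          .option("header", "true")
          .option("mode", "PERMISSIVE")
          .schema(emp_schema)
          .load("/FileStore/tables/employee.csv"))
emp_df.show(truncate=False)    # corrupted rows show nulls plus the raw line in _corrupt_record

# On Databricks, badRecordsPath writes the corrupted rows out as JSON files instead
bad_df = (spark.read.format("csv")
          .option("header", "true")
          .option("badRecordsPath", "/FileStore/tables/bad_records/")
          .load("/FileStore/tables/employee.csv"))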
Lecture 9 : Transformations and Actions in Spark
Types of Transformations
- Narrow Transformation
- Wide Transformation
Example:
Suppose data is of 200MB. 200MB / 128MB = 2 partitions
Let's say both partitions go to separate executors.
Q1 : Filtering Records
There is no data movement here.
Q2: Find Total Income of each employee
One record for id = 2 is in one partition and the other is in the second partition, so we need a wide transformation.
Data needs to be shuffled and records with same id must be moved to same partition.
- filter, select, union, etc. are narrow transformations
- join, groupBy, distinct, etc. are wide transformations (see the sketch below)
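A small sketch of the two kinds of transformations, assuming the hypothetical emp_df from earlier:

from pyspark.sql.functions import col, sum as _sum

# Narrow: each output partition depends on exactly one input partition, no data movement
filtered_df = emp_df.filter(col("salary") > 50000).select("id", "salary")

# Wide: rows with the same id must be shuffled into the same partition before aggregating
total_income_df = emp_df.groupBy("id").agg(_sum("salary").alias("total_income"))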
Lecture 10 : DAG and Lazy Evaluation in Spark
- For every action there is a new job; here jobs are created for read, inferSchema, the groupBy().sum() aggregation and show.
- When used with groupBy().sum(): it is considered an action because it triggers computation to aggregate data across partitions and produce a result. This operation forces Spark to execute the transformations leading up to it, effectively creating a job.
- When used as a column expression df.select(sum("value")): it acts more like a transformation in Spark's context, especially if part of a larger query or pipeline that does not immediately trigger execution. In this case, it only defines the operation and does not create a job until an action (like show() or collect()) is called.
The jobs created here:
- Job for reading the file (Whole Stage Codegen generates Java bytecode).
- InferSchema.
- GroupBy and count: as explained above, this is an action.
- Show: the final action to display the dataframe.
After we read the CSV with inferSchema, no further jobs are created, since filter and repartition are both transformations, not actions.
When there are two filters on same dataset
This is the job
Optimizations on the Filter
Both the filters are on the same task
The optimizations can be applied because Spark is lazily evaluated.
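A sketch of the same idea in code, assuming the hypothetical emp_df from earlier: both filters are lazy, and explain() shows Catalyst merging them into a single predicate before any job runs:

from pyspark.sql.functions import col

df1 = emp_df.filter(col("salary") > 50000)    # transformation only, no job
df2 = df1.filter(col("age") > 25)             # still no job

df2.explain(True)   # optimized plan shows one combined filter: (salary > 50000) AND (age > 25)
df2.show()          # the action that finally triggers a job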
Lecture 11: Working with JSON Data in Spark
Two types of JSON notation:
- Line-delimited JSON
- Multiline JSON
[
{
"name": "Manish",
"age": 20,
"salary": 20000
},
{
"name": "Nikita",
"age": 25,
"salary": 21000
},
{
"name": "Pritam",
"age": 16,
"salary": 22000
},
{
"name": "Prantosh",
"age": 35,
"salary": 25000
},
{
"name": "Vikash",
"age": 67,
"salary": 40000
}
]
Line-delimited JSON is more efficient in terms of performance because the parser knows that each line holds exactly one JSON record, whereas with multiline JSON it has to keep track of where one record ends and the next one starts.
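A minimal sketch of reading both flavours, with hypothetical paths:

# Line-delimited JSON: one complete JSON object per line (the default)
line_df = spark.read.format("json").load("/FileStore/tables/line_delimited.json")

# Multiline JSON: the file is one JSON array, so Spark must be told a record can span lines
multi_df = (spark.read.format("json")
            .option("multiLine", "true")
            .load("/FileStore/tables/multi_line.json"))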
Different number of keys in each line
Here the line with the extra key has a value for it, while for all the other rows that column is null.
Multiline Incorrect JSON
Here we don't pass a list (a JSON array); it is just a series of dictionaries, which is not valid multiline JSON.
{
"name": "Manish",
"age": 20,
"salary": 20000
},
{
"name": "Nikita",
"age": 25,
"salary": 21000
},
{
"name": "Pritam",
"age": 16,
"salary": 22000
},
{
"name": "Prantosh",
"age": 35,
"salary": 25000
},
{
"name": "Vikash",
"age": 67,
"salary": 40000
}
Corrupted Records
We don't need to define _corrupt_record in the schema; for JSON, Spark adds the column on its own.
Lecture 12: Spark SQL Engine
How is Spark Code compiled?
- The catalyst optimizer creates a plan and creates RDD lineage
Phases in Catalyst Optimizer
Workflow Diagram
- Unresolved Logical Plan : a bunch of crude steps to execute the SQL code.
- Catalog : the metadata about tables, files and databases is stored in the catalog. Suppose we call read.csv on a file that doesn't exist; the check that throws the error is assisted by the catalog. The Analysis phase goes through these steps, and if some file/table is not found we get an AnalysisException. This error occurs when the logical plan cannot be resolved.
- Resolved Logical Plan : the phase reached once we have finished analysing the catalog objects.
- Logical Optimization : there are many examples. Suppose we need just two columns in the select output; the Spark engine does not fetch all the columns, it fetches only the two columns we need. Another example is using multiple filters on the same column in different lines of code; when we execute this code, we see that they are all combined into a single filter condition.
- Physical Plan : this involves decisions like which type of join to use (Broadcast Join is one example). From the logical plan we can build multiple physical plans; the best physical plan is a set of RDDs to be run on different executors of the cluster (see the sketch below).
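To see all of these phases for a query, explain(True) prints the parsed (unresolved), analyzed, optimized and physical plans; a quick sketch assuming the hypothetical emp_df:

from pyspark.sql.functions import col

result_df = (emp_df
             .select("id", "salary")
             .filter(col("salary") > 50000)
             .filter(col("salary") < 90000))

result_df.explain(True)   # prints Parsed, Analyzed, Optimized Logical Plans and the Physical Plan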
Lecture 13: Resilient Distributed Dataset
Data Storage of List
Data Storage in RDD
Suppose we have 500MB of data and 128MB partition, so we will have 4 partitions.
The data is scattered on various executors.
It is not in a single contiguous location like the elements of a list. The data structure used to process this distributed data is called an RDD.
Why is RDD recoverable?
- RDD is immutable. If we apply multiple filters, each dataset after filtering is a different dataset.
- In the case below, if rdd2 fails it can be recomputed from rdd1 because of the lineage (see the sketch below).
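A tiny sketch of such a lineage; toDebugString() prints the chain Spark would use to recompute a lost partition:

# Each transformation returns a new, immutable RDD
rdd1 = spark.sparkContext.parallelize(range(1, 101))
rdd2 = rdd1.filter(lambda x: x % 2 == 0)
rdd3 = rdd2.map(lambda x: x * 10)

# If rdd2's data is lost, Spark recomputes it from rdd1 using this recorded lineage
print(rdd3.toDebugString().decode())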
Disadvantage of RDD
- Spark does no optimization on RDDs; the developer must explicitly specify how to optimize them.
Advantage
- Works well with unstructured data where there are no columns and rows / key-value pairs
- RDD is type safe: we get errors at compile time rather than at runtime, which is what happens with the Dataframe API.
Avoiding RDDs
- RDD: you spell out how to do it. Dataframe API: you just specify what to do.
In the case below we have a join and a filter, but with RDDs we are explicitly saying join first and then filter, so a shuffle is triggered first and the filter runs afterwards, which is not beneficial.
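A hedged sketch of the contrast with toy data: with RDDs the join-then-filter order is executed literally, while the DataFrame version leaves Catalyst free to push the filter below the join:

# Pair RDDs keyed by dept_id: (dept_id, salary) and (dept_id, dept_name)
emp_rdd = spark.sparkContext.parallelize([(10, 50000), (20, 80000)])
dept_rdd = spark.sparkContext.parallelize([(10, "Sales"), (20, "IT")])

joined_rdd = emp_rdd.join(dept_rdd)                              # shuffle happens first, as written
high_paid_rdd = joined_rdd.filter(lambda kv: kv[1][0] > 60000)   # kv = (dept_id, (salary, dept_name))

# DataFrame API: we only declare what we want; the optimizer decides the order
emp_df2 = spark.createDataFrame([(10, 50000), (20, 80000)], ["dept_id", "salary"])
dept_df2 = spark.createDataFrame([(10, "Sales"), (20, "IT")], ["dept_id", "dept_name"])
result_df = emp_df2.join(dept_df2, "dept_id").filter("salary > 60000")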
Lecture 14 : Parquet File Internals
There are two types of file formats:
- Columnar Based and Row Based
Physical Storage of Data on Disk
Write Once Read Many
The fundamental idea of big data is write once, read many.
- We dont need all the columns for analytics of big data, so columnar storage is the best.
- If we store in a row-based format, we need to jump across many memory blocks to get the data we need.
- OLTP systems generally use a row-based file format.
- I/O should be reduced, so OLAP uses a columnar format.
Why Columnar format may not be the best?
In above case we can get col1 and col2 easily but for col10 we still need to scan the entire file.
To tackle this:
Let's say we have 100 million rows in total. We can store 100,000 records at a time contiguously in one row group, then the next 100,000 records in the next row group, and so on; this is the hybrid format.
Logical Partitioning in Parquet
Let's say we have 500 MB of data; each row group holds 128 MB by default, so we will have 4 row groups, and each row group has some metadata attached to it. In our example, say one row group has 100,000 records; within the row group each column is further stored as pages.
Runlength Encoding and Bitpacking
Suppose we have 10 lakh (1 million) records but only, say, 4 distinct countries. Parquet creates a dictionary of key-value pairs with integer keys from 0 to 3, and in the dictionary-encoded data we see the keys being used instead of the country names.
Demo
parquet-tools inspect <filename>
Gives some file and brief column level metadata.
parquet_file.metadata.row_group(0).column(0)
Compression is GZIP
Encoding is explained on top.
Bitpacking Advantage
- Bitpacking helps compress the encoded values: in the above case we have just 4 unique values, so each key needs only 2 bits.
- Query time in seconds for running the same select on CSV, Parquet, etc.
Summary
- Here the actual data is stored in the pages, and each page has metadata like min, max and count.
- Let's say we need to find people less than 18 years of age. Because the data is divided into row groups, we don't need to do any I/O read on Row Group 2 (its min/max metadata already rules it out), which saves a lot of time and improves performance.
The above concept is called Predicate Pushdown.
Projection Pruning
Projection pruning means we don't read (do I/O on) columns that are not part of the select query or that aren't required for any join.
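A short sketch showing both optimizations from the reader's side, with a hypothetical Parquet path; the physical plan's PushedFilters and ReadSchema entries reflect predicate pushdown and projection pruning respectively:

from pyspark.sql.functions import col

young_df = (spark.read.parquet("/FileStore/tables/people.parquet")   # hypothetical path
            .select("name", "age")          # projection pruning: only these columns are read
            .filter(col("age") < 18))       # predicate pushdown: row groups ruled out by min/max are skipped

young_df.explain()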
Lecture 15 : How to write data on the disk?
Modes to write data
Create three files
# Use .mode() (not .option()) to set the overwrite save mode
write_df = read_df.repartition(3).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .option("path", "/FileStore/tables/Write_Data/") \
    .save()
Lecture 16: Partitioning and Bucketing
In the above data we cannot partition by any column because there are no good grouping values, but we can bucket the data.
In the above data we can partition by country, but then we might have a lot of data in the India partition and very little in the Japan partition (skew).
Example of Partitioning
The advantage is that the entire data is not scanned and only few partitions are scanned based on the filters.
Partitioning by Id
Here we can see that we have created partitions by ID, and since ID is a high-cardinality column (almost every value is unique), partitioning is not efficient and we need to use bucketing instead.
Partitioning by Address and Gender
Bucketing by Id
Dividing into 3 buckets
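A hedged sketch of both write styles, assuming an emp_df with address, gender and id columns as in the lecture's example:

# Partitioning: one sub-directory per (address, gender) combination
(emp_df.write.format("parquet")
 .partitionBy("address", "gender")
 .mode("overwrite")
 .save("/FileStore/tables/partition_by_address_gender/"))

# Bucketing: hash(id) % 3 picks the bucket file; bucketBy needs saveAsTable
# because the bucket metadata is stored in the metastore
(emp_df.write.format("parquet")
 .mode("overwrite")
 .bucketBy(3, "id")
 .sortBy("id")
 .saveAsTable("bucket_by_id_table"))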
Tasks vs Buckets
- If we have 200 partitions we will get 200 tasks, and each task will create 5 bucket files (1000 files in total); to tackle this we first repartition into 5 partitions and then bucketBy 5.
How does bucketing help with joins?
- Here we can see that since both tables are bucketed on the same column with the same number of buckets, the ids can be mapped directly and there is no shuffling.
Bucket Pruning
Suppose we have 1M Aadhaar IDs split into buckets of 100,000 each; when we divide a given Aadhaar ID by 100,000 we get the exact bucket in which the number lies, so only that bucket needs to be scanned.
Lecture 17 : Spark Session vs Spark Context
- Spark Session is the entry point to the Spark cluster, where we provide the parameters to create and operate our application.
- A Spark session exposes the different contexts, e.g. one for SQL, the underlying SparkContext for PySpark/RDD work, etc. (see the sketch below).
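A minimal sketch of creating a session and reaching the older contexts through it (the master setting here is just a local placeholder):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("session-demo")
         .master("local[*]")            # on a real cluster this would be YARN / Kubernetes
         .getOrCreate())

sc = spark.sparkContext                 # low-level SparkContext for RDD work
spark.sql("SELECT 1 AS test").show()    # SQL entry point is available directly on the session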
Lecture 18: Job, Stage and Tasks
- One application is created (per spark-submit / session).
- One job is created per action.
- A new stage is created at every wide (shuffle) transformation; narrow transformations like filter stay within the same stage.
- A task is the actual unit of work that runs on a single partition of the data.
Example of Job,Action and Task
Complete flow diagram
Every job has minimum one stage and one task.
Repartition to filter is one job because we don't hit an action in between.
Every wide dependency transformation has its own stage. All narrow dependency transformations come in one stage as a DAG.
How do tasks get created? [Read and Write Exchange]
- The repartition stage is a wide-dependency transformation and creates two partitions from one; it is a write exchange of data.
- Now the filter-and-select stage reads this repartitioned data (read exchange), and the filter creates two tasks because we have two partitions.
- Next we need to find out how many people earn > 90000 and are older than 25, so we do a groupBy; that's a wide-dependency transformation and it creates another stage. By default 200 shuffle partitions are created.
- So some of those partitions may have data and some won't (see the sketch below).
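A sketch of the whole flow with a hypothetical employee CSV; each comment notes what the step contributes in terms of jobs, stages and tasks:

from pyspark.sql.functions import col

emp_df = (spark.read.format("csv")
          .option("header", "true")
          .option("inferSchema", "true")          # inferSchema triggers an extra job to scan the file
          .load("/FileStore/tables/employee.csv"))

result_df = (emp_df
             .repartition(2)                      # wide: write exchange, its own stage
             .filter(col("salary") > 90000)       # narrow: stays in the same stage as select
             .select("id", "age", "salary")
             .filter(col("age") > 25)             # narrow
             .groupBy("age").count())             # wide: shuffles into 200 partitions by default

result_df.show()                                  # the action that creates the job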
Lecture 17: Dataframe Transformations in Spark Part 1
Data gets stored in Row() objects, internally in the form of bytes.
Columns are expressions; an expression is a set of transformations on one or more values in a record.
Ways to select values / columns
Column Manipulations
Other methods
selectExpr
Aliasing Columns
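A quick sketch of the selection styles above, assuming the hypothetical emp_df:

from pyspark.sql.functions import col, expr

# Column strings, col(), dataframe indexing and SQL expressions can be mixed
emp_df.select("name", col("salary"), emp_df["age"], expr("id + 1 as next_id")).show()

# selectExpr takes SQL expression strings directly
emp_df.selectExpr("id", "name", "salary * 12 as yearly_salary").show()

# Aliasing a column
emp_df.select(col("name").alias("employee_name")).show()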
Lecture 18 : Dataframe Transformations in Spark Part II
filter() / where(): there is no difference between the two.
Multiple filter conditions
Literals in spark
lit() is used to pass the same literal value for every row of a column.
Adding Columns
If the column already exists then it gets overwritten.
Renaming Columns
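A short sketch covering filter/where, literals, adding and renaming columns, again assuming the hypothetical emp_df (the date value is just illustrative):

from pyspark.sql.functions import col, lit

emp_df.filter(col("salary") > 50000).show()                          # filter() and where() are identical
emp_df.where((col("salary") > 50000) & (col("age") < 30)).show()     # multiple conditions

emp_df.withColumn("last_updated", lit("2024-01-01")).show()          # lit(): same literal in every row

# withColumn overwrites an existing column; withColumnRenamed only renames it
emp_df.withColumn("salary", col("salary") * 2) \
      .withColumnRenamed("salary", "double_salary") \
      .show()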
Lecture 19: union vs unionAll()
We can see that here we have a duplicate id.
In PySpark, union and unionAll behave in the same way: both retain duplicates.
But in Spark SQL, UNION drops the duplicate records (UNION ALL keeps them).
Selecting data and unioning the same table
What happens when we change the order of the columns?
wrong_manager_df actually has the columns in the wrong order, but union still succeeds because it matches columns by position, so the values end up in the wrong columns.
If we give a different number of columns, an exception is thrown.
If we use unionByName, then the column names in both dataframes must be the same.
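A compact sketch of the behaviours above, assuming the manager_df and wrong_manager_df dataframes from the lecture:

combined_df = manager_df.union(manager_df)       # union == unionAll in PySpark: duplicates are kept
deduped_df = combined_df.distinct()              # mimics SQL UNION, which drops duplicates

# union() matches columns purely by position, so a wrong column order gives wrong values silently;
# unionByName() matches on column names and needs the same names on both sides
safe_df = manager_df.unionByName(wrong_manager_df)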
Lecture 19: Repartitioning and Coalesce
Suppose we have 5 partitions and one of them is heavily skewed at 100 MB; let's say it holds the best-selling product's records. This partition takes a lot of time to compute, so the other executors have to wait until this executor finishes processing.
Repartitioning vs Coalesce
Repartitioning
Suppose we have the above partitions and the total data is 100 MB. If we do repartition(5), we will have 5 partitions with 20 MB of data each.
Coalesce
In case of coalesce there is no equal splitting of partition memory, rather the already existing partitions get merged together.
There is no shuffling in coalesce but in repartitioning there is shuffling of data.
Pros and cons of repartitioning vs coalesce
- Repartitioning gives evenly distributed data.
- Con: repartitioning does more I/O operations; it is expensive.
- Con of coalesce: the data can be unevenly distributed.
Repartitioning can increase or decrease the partitions but coalescing can only decrease the partitions.
How to get number of partitions?
flight_df.rdd.getNumPartitions() gets the current number of partitions, and then we can repartition with flight_df.repartition(4). The data is evenly distributed across the 4 partitions.
Repartitioning based on columns
Since we asked for 300 partitions but have only 255 records, some partitions will be empty.
Coalescing
Suppose we have 8 partitions and we coalesce into 3 partitions. coalesce() takes only one argument (the target number of partitions).
This leads to an uneven distribution of data across the partitions.
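A sketch of both calls, assuming the flight_df from the lecture (the column name used for repartitioning is just an illustration):

print(flight_df.rdd.getNumPartitions())               # current number of partitions

even_df = flight_df.repartition(4)                    # full shuffle, evenly sized partitions
by_col_df = flight_df.repartition(300, "origin")      # column-based; surplus partitions stay empty

merged_df = flight_df.coalesce(3)                     # no shuffle, merges existing partitions, may be uneven
print(merged_df.rdd.getNumPartitions())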
Lecture 20 : Case when / if else in Spark
Apply logic on one column then process if else logic
Spark SQL Logic
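A minimal sketch of the same if/else logic both ways, assuming the hypothetical emp_df with an age column:

from pyspark.sql.functions import when, col, expr

# DataFrame API
emp_df.withColumn("adult",
                  when(col("age") < 18, "No")
                  .when(col("age") > 18, "Yes")
                  .otherwise("Unknown")).show()

# Spark SQL expression
emp_df.withColumn("adult",
                  expr("CASE WHEN age < 18 THEN 'No' WHEN age > 18 THEN 'Yes' ELSE 'Unknown' END")).show()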
Lecture 21 : Unique and Sorted Records
distinct()
Original Data
Distinct Data
Distinct Based on certain columns
⚠️ Distinct takes no arguments we need to select the columns first and then apply distinct.
Dropping duplicate records
Point to note: the dataframe manager_df itself has no changes; dropDuplicates just returns the records after the duplicates have been dropped.
sort()
Descending order
Sorting on multiple columns
Here the salary is first arranged in descending order, and then for records with the same salary the name is arranged in ascending order.
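A short sketch, assuming the manager_df from the lecture:

from pyspark.sql.functions import asc, desc

manager_df.distinct().show()                                   # distinct over all columns, takes no arguments
manager_df.select("name", "salary").distinct().show()          # distinct on chosen columns
deduped_df = manager_df.dropDuplicates(["name", "salary"])     # returns a new df; manager_df is unchanged

manager_df.sort(desc("salary"), asc("name")).show()            # salary desc, then name asc within ties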
Lecture 22 : Aggregate functions
Count as both Action and Transformation
⚠️ When we do a count on a single column and it contains nulls, the nulls are not considered in the count. But a count over all columns (df.count()) does include rows with nulls.
A job is created in the first case and not in the second case below.
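A small sketch of the difference, assuming the hypothetical emp_df:

from pyspark.sql.functions import count

total_rows = emp_df.count()                   # action: a job runs immediately; nulls are included

name_count_df = emp_df.select(count("name"))  # transformation: no job yet; nulls in "name" are skipped
name_count_df.show()                          # the job is only created by this action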
Lecture 23: Group By In Spark
Sample Data
Questions
Salary per department using groupBy()
Where do we use window functions?
Suppose we need to find out what percentage of the total salary of a department each person earns. We can use a window function to attach the department's total salary to each record itself, as shown in the sketch below.
This way we don't need to perform a join.
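A hedged sketch of that window, assuming an emp_df with department and salary columns:

from pyspark.sql.functions import col, sum as _sum
from pyspark.sql.window import Window

dept_window = Window.partitionBy("department")

salary_pct_df = (emp_df
                 .withColumn("dept_total_salary", _sum("salary").over(dept_window))
                 .withColumn("salary_pct", col("salary") / col("dept_total_salary") * 100))

salary_pct_df.show()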
Grouping by two columns
Lecture 24 : Joins in Spark part 1
Which customers joined the platform but never bought anything?
Whenever we need information from another table we use joins, and there should be some common (join key) column.
A join is a costly, wide-dependency operation.
How do joins work?
How many records do we get after inner joining the two tables below?
We get a total of 9 records.
Sometimes data gets duplicated when we do joins, so we may want to use distinct(), but remember distinct is itself a wide-dependency transformation.
Lecture 25 : Types of Join in Spark
Inner Join
Left Join
All records from the left table plus those that match the right table; wherever we don't get a match on the right table, the right-side columns become null.
Right Join
Full Outer Join
Left Semi Join
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LeftSemiJoinExample").getOrCreate()
# Left DataFrame: Orders
orders = spark.createDataFrame([
(1, "iPhone"),
(2, "Pixel"),
(3, "OnePlus"),
(4, "Nokia")
], ["customer_id", "product"])
# Right DataFrame: Valid Customers
valid_customers = spark.createDataFrame([
(1,), (3,)
], ["customer_id"])
# Perform left semi join
filtered_orders = orders.join(valid_customers, on="customer_id", how="left_semi")
filtered_orders.show()
Output
+-----------+--------+
|customer_id|product |
+-----------+--------+
| 1|iPhone |
| 3|OnePlus |
+-----------+--------+
Left Anti Join
Find out all customers who have never purchased any product.
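Reusing the orders and valid_customers dataframes from the left semi example, a left anti join keeps only the left-side rows with no match on the right (here, the orders of customers 2 and 4):

unmatched_orders = orders.join(valid_customers, on="customer_id", how="left_anti")
unmatched_orders.show()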
Cross Join
Never use cross join!
Lecture 26 : Join Strategies in Spark
Joins are expensive due to shuffling.
4 partitions are there in each dataframe.
Executors in the cluster
Now we need to join the employee and salary dataframes to get the output, but their partitions sit on different executors, so we need to shuffle the data.
After the shuffle there are 200 shuffle partitions (the default). The goal is to get all records with the same key into the same partition/executor.
- Since we want the rows for id = 1, we compute 1 mod 200 = 1 and send all of that id's data to shuffle partition 1.
Suppose we want to map the salary for id = 7: the rows with id = 7 from both the employee df and the salary df end up in partition 7.
Similarly, id = 201 goes to 201 mod 200 = partition 1 (see the sketch below).
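A toy sketch of that partition assignment; Spark actually uses its own hash function (Murmur3), so Python's hash only stands in for it here, but the modulo idea is the same:

num_shuffle_partitions = 200          # spark.sql.shuffle.partitions default

for emp_id in (1, 7, 201):
    # rows with this id from BOTH dataframes land in the same shuffle partition
    print(emp_id, "-> partition", hash(emp_id) % num_shuffle_partitions)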