
Spark YouTube

Spark Concepts and Code

Lecture 1 : What is Apache Spark

image

Unified :

image

Computing Engine:

image

Spark is not a storage platform; we can store the data in HDFS, RDBMS, etc.

Spark can process terabytes of data in a distributed manner.

Compute Cluster:

image

  • each slave has 16 GB RAM, 1 TB storage and a 4-core CPU
  • even the master has some storage and RAM
  • the above cluster can compute 64 GB of data at a time
  • the master divides the data among the slave nodes and then the slaves process the data

Lecture 2 : Why Apache Spark?

Different Databases image

New formats like video, audio, JSON and Avro started coming in, but an RDBMS cannot handle them.

The volume of data also increased.

What is Big Data?

image

Data Lake works on Extract Load Transform architecture

Issues with RDBMS

  • Storage
  • Processing - RAM and CPU

Enter Spark...

image

Monolith vs Microservice Architecture

image

Lecture 3 : Hadoop vs Spark

Misconception:

  • Hadoop is a database - it is not a database, just a filesystem (HDFS)
  • Spark is 100 times faster than Hadoop
  • Spark processes data in RAM but Hadoop doesn't

Differences

Performance

image

Hadoop does a lot of read/write I/O operations and sends data back and forth to the disk. image

But in Spark each executor has its own memory. image

Where is there no difference?

When we have very little data, say 10 GB, there is no difference, because the data fits in memory the first time and even the Hadoop cluster does not need to write intermediate results to disk.

Batch vs Stream Processing

image

Ease of Use

image

Spark has both low-level and high-level APIs in Python, which is easier than using Hive. Low-level programming is at the RDD level.

Security
  • Hadoop has built-in Kerberos authentication via YARN, whereas Spark does not have its own security mechanism.

  • The authentication helps create ACLs at the directory level in HDFS.

  • Spark uses HDFS storage, so it gets the ACL feature, and when it runs on YARN it gets Kerberos authentication.

Fault Tolerance

image

Data Replication in Hadoop image

HDFS keeps track of which node / rack has the data from A B C and D

image

DAG in Spark

  • So Spark computes / transforms in multiple processes: Process 1 -> Process 2 -> Process 3 ...
  • After each process the data is stored in an immutable data structure called an RDD. So even if there is a failure, the Spark engine knows how to reconstruct the data for a particular process from the RDD at that stage.

Lecture 4 : Spark Ecosystem

image

High Level API : We can write SQL queries in Python, Java, etc.; there are also ML and GraphX libraries.

We can write code in many languages. Low Level API : we can create RDDs and work on them.

image

Where does Spark run?

image

Spark Engine would need some memory for transformation.

  • suppose it needs 4 worker nodes of 20 GB each and a driver node of 20 GB.
  • it goes to the cluster manager and asks for a total of 100 GB of memory; if available, the manager will assign that much memory.
  • the cluster manager can be YARN, Kubernetes (K8s) or the Standalone manager

Lecture 5 : Read Modes in Spark

image

  • format -> data file format: csv, json, jdbc/odbc connection. Format is an optional parameter; by default it is parquet.
  • option -> inferSchema, mode and header [optional]
  • schema -> a manual schema can be passed here
  • load -> path from where we need to read the data [not optional]

DataframeReader API

Access it using 'spark.read' in the spark session

image

mode in Spark

image
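A minimal sketch of the DataFrameReader pattern described above, assuming a SparkSession named spark already exists; the path, column names and the chosen read mode are placeholders, not taken from the notes.

emp_df = (spark.read
    .format("csv")                          # default is parquet if format() is omitted
    .option("header", "true")
    .option("inferSchema", "true")
    .option("mode", "FAILFAST")             # or PERMISSIVE (default) / DROPMALFORMED
    .load("/FileStore/tables/employee.csv"))  # hypothetical path

emp_df.show()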

Lecture 6 : Spark Architecture

Spark Cluster

image

  • 20 cores and 100 GB RAM per machine
  • Total cluster : 200 cores and 1 TB RAM

image

  • The master is controlled by Resource Manager and the workers are controlled by Node Manager.

What happens when user submits code?

image

  • The user submits some Spark code for execution to the Resource Manager. It needs a 20 GB driver, 25 GB per executor, 5 executors in total and 5 CPU cores per executor.
  • So the manager goes to W5 and asks it to create a 20 GB container as the driver node.

What happens inside the container?

Driver Allocation

Now this 20 GB driver is called Application Master image

There are two main() functions inside the master: one is for PySpark and the other is for the JVM (Java, Scala, etc.).

The JVM main() is called Application Driver.

The Spark Core has a Java Wrapper and the Java Wrapper has a Python Wrapper.

When we write code in PySpark it gets converted to Java Wrapper.

The PySpark driver is not a requirement but the Java Wrapper is required to run any code.

Worker Allocation

image

  • Now the Application master asks for the executors to be assigned and the resource manager allocates.

Executor Container

image

Each executor has a 5-core CPU and 25 GB RAM.

The above is the case when we have pure JVM code and don't use Python UDFs.

But what if we use Python UDF functions?

image We need a Python worker inside the executor to be able to run the code.

Lecture 7 : Schema in Spark

StructType and StructField

image

Example:

image

image
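A minimal sketch of defining a manual schema with StructType and StructField and passing it to the reader; the column names and path are placeholders, assuming a SparkSession named spark.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

emp_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", IntegerType(), True),
])

emp_df = (spark.read
    .format("csv")
    .option("header", "true")
    .schema(emp_schema)          # manual schema instead of inferSchema
    .load("/FileStore/tables/employee.csv"))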

How to skip the header row?

df = spark.read.option("skipRows", 2).csv("file.csv")

Lecture 8 : Handling Corrupted Records in Spark

image

How many records in each mode?

image

Permissive Mode

image

DropMalformed

image

How to Print Corrupted Records

image

image

Output image

How to Store Corrupted Records

image

The corrupted records are in json format image
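A minimal sketch of the three read modes and of storing bad rows, assuming a SparkSession named spark; the schema, paths and file names are placeholders. badRecordsPath is a Databricks-specific option.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

base_fields = [
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
]

# PERMISSIVE mode: keep bad rows and capture their raw text in _corrupt_record
permissive_schema = StructType(base_fields + [StructField("_corrupt_record", StringType(), True)])
emp_df = (spark.read.format("csv")
    .option("header", "true")
    .option("mode", "PERMISSIVE")            # or DROPMALFORMED / FAILFAST
    .schema(permissive_schema)
    .load("/FileStore/tables/employee.csv"))

# Databricks option: write the bad rows out to a path (as JSON) instead of keeping them
bad_df = (spark.read.format("csv")
    .option("header", "true")
    .option("badRecordsPath", "/FileStore/tables/bad_records/")
    .schema(StructType(base_fields))
    .load("/FileStore/tables/employee.csv"))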

Lecture 9 : Transformations and Actions in Spark

image

Types of Transformations

  • Narrow Transformation
  • Wide Transformation

image

Example: image

Suppose data is of 200MB. 200MB / 128MB = 2 partitions

image

Let's say both partitions go to separate executors.

Q1 : Filtering Records image There is no data movement here.

Q2: Find Total Income of each employee image

One record with id = 2 is in one partition and the other is in the second partition, so we need a wide transformation image

Data needs to be shuffled and records with same id must be moved to same partition.

  • filter, select, union, etc. are narrow transformations
  • join, groupBy, distinct, etc. are wide transformations (see the sketch below)
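A minimal sketch contrasting the two kinds of transformation on the employee example above; emp_df and its columns are assumed placeholders.

# Narrow: each output partition depends on exactly one input partition, no shuffle
high_salary_df = emp_df.filter(emp_df.salary > 50000).select("id", "name", "salary")

# Wide: records with the same id must be moved to the same partition, so a shuffle happens
total_income_df = emp_df.groupBy("id").sum("salary")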

Lecture 10 : DAG and Lazy Evaluation in Spark

image

  • For every action there is a new job; here jobs are created for read, inferSchema, the groupBy().sum() aggregation and show
  • When used with groupBy().sum(): It is considered an action because it triggers computation to aggregate data across partitions and produce a result. This operation forces Spark to execute the transformations leading up to it, effectively creating a job.
  • When used as a column expression df.select(sum("value")): It acts more like a transformation in Spark's context, especially if part of a larger query or pipeline that does not immediately trigger execution. In this case, it only defines the operation and does not create a job until an action (like show() or collect()) is called.

  • Job for reading file image Whole Stage Codegen - generate Java ByteCode

  • Inferschema image

  • GroupBy and Count As explained above this is an action.

  • Show Final action to display df

image After we read the csv and inferSchema there are no jobs created since filter and repartition both are transformations not actions.

When there are two filters on same dataset

image

This is the job image

Optimizations on the Filter

Both the filters are on the same task image The optimizations can be applied because Spark is lazily evaluated.
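A minimal sketch of this lazy-evaluation behaviour, assuming a dataframe emp_df with age and salary columns (placeholders): nothing runs until the action, and the two filters are typically combined into a single predicate by the optimizer.

filtered_df = (emp_df
    .filter(emp_df.age > 25)
    .filter(emp_df.salary > 90000))

filtered_df.explain()   # the plan usually shows one combined Filter
filtered_df.show()      # only this action triggers a job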

Lecture 11: Working with JSON Data in Spark

image

Two types of JSON notation:

  • Line Delimited JSON image

  • Multi Line JSON image

[
{
  "name": "Manish",
  "age": 20,
  "salary": 20000
},
{
  "name": "Nikita",
  "age": 25,
  "salary": 21000
},
{
  "name": "Pritam",
  "age": 16,
  "salary": 22000
},
{
  "name": "Prantosh",
  "age": 35,
  "salary": 25000
},
{
  "name": "Vikash",
  "age": 67,
  "salary": 40000
}
]

Line Delimited JSON is more efficient in terms of performance because the reader knows that each line has exactly one JSON record, whereas in multiline JSON the reader needs to keep track of where one record ends and the next one starts.
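A minimal sketch of reading both layouts, assuming a SparkSession named spark; the file paths are placeholders.

# line-delimited JSON: one JSON object per line, no surrounding array
line_df = spark.read.format("json").load("/FileStore/tables/line_delimited.json")

# multi-line JSON: a single JSON array spanning many lines needs multiLine=true
multi_df = (spark.read
    .format("json")
    .option("multiLine", "true")
    .load("/FileStore/tables/multi_line.json"))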

Different number of keys in each line

image

Here the record that has the extra key gets a value for that column, while for the rest of the records the column is null image

Multiline Incorrect JSON

Here we don't pass a list; it is just a sequence of dictionaries, which is not valid as a single JSON document

{
  "name": "Manish",
  "age": 20,
  "salary": 20000
},
{
  "name": "Nikita",
  "age": 25,
  "salary": 21000
},
{
  "name": "Pritam",
  "age": 16,
  "salary": 22000
},
{
  "name": "Prantosh",
  "age": 35,
  "salary": 25000
},
{
  "name": "Vikash",
  "age": 67,
  "salary": 40000
}
When we process this JSON, only the first dictionary is read as a record and the rest is not processed.

image

Corrupted Records

We don't need to define _corrupt_record in the schema; Spark will add the column on its own

image

Lecture 12: Spark SQL Engine

image

How is Spark Code compiled?

  • The catalyst optimizer creates a plan and creates RDD lineage

Phases in Catalyst Optimizer

image

Workflow Diagram
  • Unresolved Logical Plan : a bunch of crude steps to execute the SQL code
  • Catalog : the table, file and database metadata information is stored in the catalog. Suppose we call read.csv on a file that doesn't exist; the check that throws the error is assisted by the catalog. In the Analysis phase we go through these steps, and if some file/table is not found we get an AnalysisException. This error occurs when the logical plan cannot be resolved.
  • Resolved Logical Plan : the phase reached once we have finished analysing the catalog objects.
  • Logical Optimization : there are many examples. Suppose we need just two columns in the select output; the Spark engine does not fetch all the columns, it fetches only the two columns we need. Another example is when we use multiple filters on the same column in different lines of code; when we execute this code, we see that they are all combined into a single filter condition.
  • Physical Plan : this involves taking decisions like the type of join to use, Broadcast Join being one example. From the logical plan we can build multiple physical plans. The best physical plan is a set of RDDs to be run on different executors in the cluster.

image
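A minimal sketch for inspecting these plans yourself, assuming a SparkSession named spark; the path and columns are placeholders. explain(True) prints the parsed (unresolved), analyzed, optimized logical plans and the physical plan.

emp_df = (spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/FileStore/tables/employee.csv"))

result_df = emp_df.filter("salary > 20000").select("name", "salary")
result_df.explain(True)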

Lecture 13: Resilient Distributed Dataset

image

Data Storage of List

image

Data Storage in RDD

Suppose we have 500MB of data and 128MB partition, so we will have 4 partitions.

The data is scattered on various executors. image

It is not in a single contiguous location like the elements of a list. The data structure used to process this data is called an RDD image

image

Why is RDD recoverable?

  • RDD is immutable. If we apply multiple filters, each dataset after a filter is a different dataset image

  • In the case below, if rdd2 is lost it can be rebuilt from rdd1 because of the lineage (see the sketch below). image
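A minimal sketch of an RDD lineage, assuming a SparkSession named spark; the sample data is made up. toDebugString() prints the chain of parent RDDs that Spark would use to recompute a lost partition.

rdd1 = spark.sparkContext.parallelize([("Manish", 20000), ("Nikita", 21000), ("Vikash", 40000)])
rdd2 = rdd1.filter(lambda row: row[1] > 20000)          # rdd2 derived from rdd1
rdd3 = rdd2.map(lambda row: (row[0], row[1] * 2))       # rdd3 derived from rdd2

print(rdd3.collect())
print(rdd3.toDebugString().decode())                    # shows the lineage graph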

Disadvantage of RDD

  • No optimization is done by Spark on RDDs. The developer must specify explicitly how to optimize the RDD code.

Advantage

  • Works well with unstructured data where there are no rows and columns, just key-value pairs
  • RDD is type safe; we get errors at compile time rather than at runtime, which is what happens with the Dataframe API.

Avoiding RDDs

image

  • RDD : you specify how to do it. Dataframe API : you just specify what to do.

image You can see in the above case that we have a join and a filter, but we are explicitly saying to join first and then filter, so a shuffle is triggered first and only then the filter runs, which is not beneficial.

Lecture 14 : Parquet File Internals

image

There are two types of file formats:

  • Columnar Based and Row Based

Physical Storage of Data on Disk

image

Write Once Read Many

The funda of big data is write once read many.

  • We dont need all the columns for analytics of big data, so columnar storage is the best.
  • If we store in row based format then we need to jump many memory racks to be able to get the data we need.

image

  • OLTP systems generally use a row-based file format.
  • I/O should be reduced, so OLAP uses a columnar format.

Why Columnar format may not be the best?

image

In above case we can get col1 and col2 easily but for col10 we still need to scan the entire file.

To tackle this:

Let's say we have 100 million total rows.

We can store 100,000 records at a time, continuously in one row, then the next 100,000 records in next row and so on in hybrid format.

image

Logical Partitioning in Parquet

image

Let's say we have 500 MB of data; each row group by default holds 128 MB, so we will have 4 row groups. Each row group has some metadata attached to it.

In our example let's say one row group has 100,000 records. Each column inside a row group is further stored as pages.

Runlength Encoding and Bitpacking

image

Suppose we have 10 lakh (1 million) records but only, say, 4 distinct countries.

So Parquet creates a dictionary of key-value pairs with integer keys from 0 to 3, and in the dictionary-encoded data we see the keys being used instead of the country names.

Demo

parquet-tools inspect <filename>

image image

Gives some file and brief column level metadata.

parquet_file.metadata.row_group(0).column(0) image

Compression is GZIP image

Encoding is explained on top.
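The demo above uses parquet-tools; a similar inspection can be done with pyarrow, assuming it is installed. The file name below is a placeholder.

import pyarrow.parquet as pq

pf = pq.ParquetFile("part-00000.snappy.parquet")   # hypothetical file name
meta = pf.metadata
print(meta.num_row_groups, meta.num_rows)

col_chunk = meta.row_group(0).column(0)
print(col_chunk.compression)   # e.g. GZIP or SNAPPY
print(col_chunk.encodings)     # e.g. dictionary / RLE encodings
print(col_chunk.statistics)    # min / max / null_count used for predicate pushdown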

Bitpacking Advantage

  • Bitpacking helps in compressing the bits; in the above case we have just 4 unique values, so each value needs only 2 bits.
  • Query time in seconds for running the same select on csv, parquet, etc.

image

Summary

image

  • Here the actual data is stored in the pages and it has metadata like min,max and count.

  • Let's say we need to find out people less than 18 years age

image

Here, when we divide the data into row groups, we don't need to do any I/O read on row group 2; this saves a lot of time and optimizes performance.

The above concept is called Predicate Pushdown.

Projection Pruning

Projection pruning means we don't do I/O reads for columns that are not part of the select query or that aren't required for any join.
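A minimal sketch showing both ideas in the physical plan, assuming a SparkSession named spark; the path and columns are placeholders. Look for PushedFilters (predicate pushdown) and ReadSchema (projection pruning) in the scan node.

sales_df = spark.read.parquet("/FileStore/tables/sales_parquet/")

minors_df = sales_df.select("name", "age").filter("age < 18")
minors_df.explain()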

Lecture 15 : How to write data on the disk?

image

image

Modes to write data

image

Create three files image

  # .mode("overwrite") is used instead of .option() to set the save mode
  read_df.repartition(3).write.format("csv") \
      .option("header", "true") \
      .mode("overwrite") \
      .option("path", "/FileStore/tables/Write_Data/") \
      .save()

Lecture 16: Partitioning and Bucketing

image

In the above data we cannot partition by any column because there is no similarity in the values, but we can bucket the data.

image

image In the above data we can partition by country, but then we might have more data in the India partition and less in the Japan partition.

Example of Partitioning

image

image

The advantage is that the entire data is not scanned and only few partitions are scanned based on the filters.
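A minimal sketch of writing a partitioned dataset, assuming a dataframe emp_df with a country column; the path is a placeholder. Each distinct country value becomes its own folder, so a filter on country only scans that folder.

(emp_df.write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("country")
    .option("path", "/FileStore/tables/partition_by_country/")
    .save())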

Partitioning by Id

image

Here we can see that we have created partitions by ID, and since ID is a high-cardinality column (one tiny partition per ID), partitioning is not efficient and we need to use bucketing instead.

Partitioning by Address and Gender

image

Bucketing by Id

Dividing into 3 buckets image

Tasks vs Buckets

image

  • If we have 200 partitions we will have 200 tasks and each task will create 5 buckets each; to tackle this we first repartition into 5 partitions and then bucketBy 5 (see the sketch below).
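A minimal sketch of that repartition-then-bucket pattern, assuming a dataframe emp_df with an id column; the table name and path are placeholders. bucketBy requires saveAsTable.

(emp_df.repartition(5)
    .write.format("parquet")
    .mode("overwrite")
    .bucketBy(5, "id")
    .sortBy("id")
    .option("path", "/FileStore/tables/bucket_by_id/")
    .saveAsTable("emp_bucketed"))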

How does bucketing help with joins?

image

  • Here, since both tables are bucketed on the same column into the same number of buckets, the ids can be mapped bucket-to-bucket and there is no shuffling.

Bucket Pruning

image Suppose we have 1M Aadhaar IDs divided into buckets of 100,000 each; when we divide a given Aadhaar ID by 100,000 we get the exact bucket the number lies in, so only that bucket needs to be scanned.

Lecture 17 : Spark Session vs Spark Context

  • Spark Session is the entry point to the Spark cluster, where we provide the parameters to create and operate our cluster.
  • A Spark session holds different contexts, like one for SQL, PySpark, etc.

image
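A minimal sketch of building a SparkSession and reaching the underlying SparkContext; the app name and master are placeholders.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("SessionVsContext")
    .master("local[4]")
    .getOrCreate())

sc = spark.sparkContext          # the lower-level entry point lives inside the session
print(spark.version, sc.applicationId)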

Lecture 18: Job, Stage and Tasks

image

  • One application is created per Spark program.
  • One job is created per action.
  • One stage is created per wide (shuffle) transformation like groupBy; narrow transformations such as filter are pipelined into the same stage.
  • A task is the actual unit of work executed on a partition of the data.

image

Example of Job,Action and Task

image

Complete flow diagram

image

Every job has minimum one stage and one task.

image Repartition to filter is one job because we dont hit an action in between.

Every wide dependency transformation has its own stage. All narrow dependency transformations come in one stage as a DAG.

image

How do tasks get created? [Read and Write Exchange]

image

  • The repartition stage is a wide dependency transformation and creates two partitions from one; it is a write exchange of data.
  • The filter-and-select stage reads this repartitioned data (read exchange), and filter creates two tasks because we have two partitions.
  • Next we need to find out how many people earn > 90000 and are older than 25, so we need a groupBy; that is a wide dependency transformation and creates another stage. By default 200 shuffle partitions are created.
  • So some partitions may have data and some won't.

image

Lecture 17: Dataframe Transformations in Spark Part 1

image Data gets stored in Row() format in the form of bytes

image

Columns are expressions. An expression is a set of transformations on one or more values in a record.

Ways to select values / columns

image

image

Column Manipulations

image

Other methods image

selectExpr image

Aliasing Columns image
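A minimal sketch of the selection styles shown above, assuming a dataframe emp_df with name, age and salary columns (placeholders).

from pyspark.sql.functions import col, expr

emp_df.select("name", col("salary"), emp_df.age).show()          # string, col() and attribute styles
emp_df.select(expr("salary * 12 as yearly_salary")).show()       # expression on a column
emp_df.selectExpr("name", "salary * 12 as yearly_salary").show() # selectExpr shorthand
emp_df.select(col("name").alias("employee_name")).show()         # aliasing a column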

Lecture 18 : Dataframe Transformations in Spark Part II

filter() / where() no difference

image

image

Multiple filter conditions

image

Literals in spark

Used to pass the same value in every row of a column image

Adding Columns

If the column already exists then it gets overwritten. image

Renaming Columns

image
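A minimal sketch combining the operations from this lecture, assuming a dataframe emp_df with name, age and salary columns (placeholders).

from pyspark.sql.functions import col, lit

result_df = (emp_df
    .filter((col("salary") > 20000) & (col("age") < 30))   # where() would behave identically
    .withColumn("country", lit("India"))                    # same literal value for every row
    .withColumn("yearly_salary", col("salary") * 12)        # overwrites the column if it already exists
    .withColumnRenamed("name", "employee_name"))

result_df.show()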

Lecture 19: union vs unionAll()

image

We can see that here we have a duplicate id image

In PySpark union and unionAll behave in the same way: both retain duplicates image

But in Spark SQL when we do union it drops the duplicate records image

image

Selecting data and unioning the same table

image

What happens when we change the order of the columns?

wrong_manager_df actually has the columns in the wrong order, but we still get a union output, just with values landing in the wrong columns. image

If we give a different number of columns, an exception is thrown. image

If we use unionByName then the column names on both dfs must be the same. image
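A minimal sketch of the three variants, assuming two dataframes manager_df and extra_manager_df with the same columns (placeholders).

combined_df = manager_df.union(extra_manager_df)        # keeps duplicates, matches columns by position
by_name_df = manager_df.unionByName(extra_manager_df)   # matches columns by name instead of position
deduped_df = combined_df.distinct()                     # mimics SQL UNION, which drops duplicates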

Lecture 19: Repartitioning and Coalesce

Suppose we have 5 partitions and one of them is heavily skewed at 100 MB; let's say it holds the best-selling product's records. This partition takes a lot of time to compute, so the other executors have to wait until this executor finishes processing. image

Repartitioning vs Coalesce

Repartitioning

Suppose we have the above partitions and the total data is 100 MB. Let's say we do repartition(5); we will then have 5 partitions with 100 MB / 5 = 20 MB per partition.

Coalesce

In case of coalesce there is no equal splitting of partition memory, rather the already existing partitions get merged together. image

There is no shuffling in coalesce but in repartitioning there is shuffling of data.

Pros and Cons of Repartitioning vs Coalesce

  • Pro of repartitioning: the data is evenly distributed.
  • Con of repartitioning: more I/O operations; it is expensive because of the full shuffle.
  • Con of coalesce: the data may be unevenly distributed.

Repartitioning can increase or decrease the partitions but coalescing can only decrease the partitions.

How to get number of partitions?

flight_df.rdd.getNumPartitions() gets the initial number of partitions and then we can repartition flight_df.repartition(4). Data is evenly distributed.

image

Repartitioning based on columns

image

Since we asked for 300 partitions and we have only 255 records, some partitions will remain empty. image

Coalescing

image Suppose we have 8 partitions and we coalesce into 3 partitions. Coalesce takes only one argument (the target number of partitions).

Uneven distribution of data in partitions. image
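A minimal sketch of both operations, assuming a dataframe flight_df with a country column (placeholders).

print(flight_df.rdd.getNumPartitions())

repart_df = flight_df.repartition(4)                    # full shuffle, evenly sized partitions
repart_col_df = flight_df.repartition(300, "country")   # repartition by a column
coalesced_df = flight_df.coalesce(3)                    # merges existing partitions, can only decrease

print(coalesced_df.rdd.getNumPartitions())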

Lecture 20 : Case when / if else in Spark

image

image

Apply logic on one column first, then apply the if/else logic on top of it

image

Spark SQL Logic

image
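A minimal sketch of when/otherwise and the equivalent SQL-style expression, assuming a dataframe emp_df with an age column (placeholder).

from pyspark.sql.functions import when, col, expr

adult_df = emp_df.withColumn(
    "adult",
    when(col("age") < 18, "No")
    .when(col("age") > 18, "Yes")
    .otherwise("Unknown"))

# the same logic as a Spark SQL expression
adult_sql_df = emp_df.withColumn(
    "adult",
    expr("case when age < 18 then 'No' when age > 18 then 'Yes' else 'Unknown' end"))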

Lecture 21 : Unique and Sorted Records

image

distinct()

Original Data image

Distinct Data image

Distinct Based on certain columns image

⚠️ distinct() takes no arguments; to deduplicate on specific columns we need to select those columns first and then apply distinct().

Dropping duplicate records

Note that the dataframe manager_df itself is not changed; dropDuplicates just returns a new dataframe with the duplicate records removed. image

sort()

image

Descending order image

Sorting on multiple columns

Here the salary is first arranged in descending order; then, for records with the same salary, the name is arranged in ascending order. image
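A minimal sketch of distinct(), dropDuplicates() and multi-column sorting, assuming a dataframe manager_df with name and salary columns (placeholders).

from pyspark.sql.functions import asc, desc

unique_df = manager_df.distinct()                            # no arguments allowed
unique_subset_df = manager_df.select("name", "salary").distinct()
deduped_df = manager_df.dropDuplicates(["name", "salary"])   # manager_df itself is unchanged
sorted_df = manager_df.sort(desc("salary"), asc("name"))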

Lecture 22 : Aggregate functions

Count as both Action and Transformation

image

⚠️ When we do a count on a single column and there is a null in it, the null is not considered in the count. But when counting all columns with count('*'), rows containing nulls are included. image

A job is created in the first case (count() called as an action) and not in the second case below (count used as a column expression inside select). image
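A minimal sketch of the two uses of count, assuming a dataframe emp_df with a name column (placeholder).

from pyspark.sql.functions import count

total_rows = emp_df.count()               # action: triggers a job immediately
emp_df.select(count("name")).show()       # nulls in "name" are skipped in this count
emp_df.select(count("*")).show()          # counts every row, nulls included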

Lecture 23: Group By In Spark

Sample Data

image

Questions

image

Salary per department using groupBy() image

Where do we use window functions?

Suppose we need to find out what percentage of a department's total salary a person is earning. We can use a window function to put the total salary per department on each record itself, as shown below. image

This way we dont need to perform a join.

image
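A minimal sketch of that window, assuming a dataframe emp_df with dept and salary columns (placeholders).

from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum as sum_

dept_window = Window.partitionBy("dept")

salary_pct_df = (emp_df
    .withColumn("total_dept_salary", sum_("salary").over(dept_window))
    .withColumn("salary_pct", col("salary") / col("total_dept_salary") * 100))

salary_pct_df.show()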

Grouping by two columns

image

Lecture 24 : Joins in Spark part 1

image

Which customers joined the platform but never bought anything?

image

Whenever we need information from another table, we use joins and there should be some common column.

Join is a costly wide dependency operation.

How do joins work?

How many records do we get after inner joining the below two tables. image

We get a total of 9 records. image

Sometimes data gets duplicated when we do joins, so we should use distinct(), but remember distinct is a wide dependency transformation.

Lecture 25 : Types of Join in Spark

image

Inner Join

image

Left Join

image All records in the left table + those that match the right table; wherever we don't get a match on the right table, those columns become null.

Right Join

image

Full Outer Join

image

Left Semi Join

image

image

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LeftSemiJoinExample").getOrCreate()

# Left DataFrame: Orders
orders = spark.createDataFrame([
    (1, "iPhone"),
    (2, "Pixel"),
    (3, "OnePlus"),
    (4, "Nokia")
], ["customer_id", "product"])

# Right DataFrame: Valid Customers
valid_customers = spark.createDataFrame([
    (1,), (3,)
], ["customer_id"])

# Perform left semi join
filtered_orders = orders.join(valid_customers, on="customer_id", how="left_semi")
filtered_orders.show()

Output

+-----------+--------+
|customer_id|product |
+-----------+--------+
|          1|iPhone  |
|          3|OnePlus |
+-----------+--------+

Left Anti Join

image

Find out all customers who have never purchased any product.
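A minimal sketch of that query with a left anti join, assuming dataframes customer_df and sales_df that share a customer_id column (placeholders).

never_bought_df = customer_df.join(sales_df, on="customer_id", how="left_anti")
never_bought_df.show()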

Cross Join

Never use cross join! image

image

Lecture 26 : Join Strategies in Spark

image

Joins are expensive due to shuffling.

4 partitions are there in each dataframe.

image

Executors in the cluster

image

Now we need to join employee and salary df to get the output but they are on different executors, so we need to do data shuffling.

After the shuffle there are 200 partitions by default; the goal is to get all records with the same key into the same partition on one executor. image

image

image

  • Since we want to find where id = 1 goes, we compute 1 mod 200 = 1 and send all the data for that key to partition/executor 1.

image

Suppose we want to map the salary for id = 7; the data from the employee df with id = 7 and from the salary df with id = 7 will both land in partition 7.

Similarly, id = 201 goes to 201 mod 200 = partition 1.
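A minimal sketch of this partition assignment in plain Python; Spark's hash partitioner actually uses hash(key) % numPartitions, so plain modulo is a simplification for illustration.

NUM_SHUFFLE_PARTITIONS = 200

def partition_for(key: int) -> int:
    # simplified model of hash partitioning on an integer key
    return key % NUM_SHUFFLE_PARTITIONS

print(partition_for(1))     # 1
print(partition_for(7))     # 7
print(partition_for(201))   # 1 -> same partition as key 1, so matching rows meet in one place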

Types of Join Strategies

image