PySpark
PySpark Internals for Large Scale Data Processing
Internal Working and Architecture of Spark
High Level Overview
Flow of the Logic
Worker Node Architecture
- There are a total of 64 cores, so 64 tasks can execute in parallel, and each executor will have 16 tasks running.
- Partitions inside an executor can sit idle with no task running on them, so we need a parameter that defines the minimum number of tasks that should run on each executor at once.
- To avoid idle cores, keep the number of partitions a multiple of the total core count (see the sketch below).
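A minimal sketch of this guideline (the numbers are illustrative; in a notebook `spark` already exists, otherwise it is created below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

total_cores = spark.sparkContext.defaultParallelism   # e.g. 64 cores across all executors
df = spark.range(0, 1_000_000)
df = df.repartition(total_cores * 2)                  # 2 "waves" of tasks, no core left idle
print(df.rdd.getNumPartitions())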
Summary of Spark Application
Attributes of Spark
- It is scalable.
- It is fault tolerant and follows lazy evaluation: Spark first builds a logical plan for processing the data and then a DAG, so if one worker node goes down the data can still be recovered.
- Spark supports multiple programming languages like Python and Scala.
- Spark can also handle streaming data.
- Spark is known for its speed due to its in-memory computation model.
- Spark has rich libraries for ML and data processing.
Common Terminologies
Spark does not immediately process the data, as mentioned above. When the developer calls an action for storage or transformation purposes, Spark processes the data according to the DAG and returns the output to the user.
To store data in off-heap memory, Spark needs to serialize the data.
Stages and Tasks
Libraries Supported by Spark
- SQL
- Streaming
- MLlib
- GraphX
Driver Node Architecture
Worker Node Architecture
On Heap Memory Architecture
- Out of 32 GB, around 300 MB is reserved by Spark for its own internal use (reserved memory) and cannot be used by the user or by the processes.
- Of the remaining (32 GB - 300 MB), we allocate 40% of the heap as user memory and 60% as unified memory.
- Within unified memory, 50% is used for storage and 50% for execution (see the configuration sketch below).
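A configuration sketch of the split described above, using Spark's standard memory settings; the fractions shown are the defaults, and the 32g executor size is just the example figure from this section:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.executor.memory", "32g")          # on-heap size per executor (example figure)
    .config("spark.memory.fraction", "0.6")          # share of (heap - 300MB) given to unified memory
    .config("spark.memory.storageFraction", "0.5")   # half of unified memory protected for storage
    .getOrCreate())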
API Architecture in Spark
Dataset = Best of RDD + Best Of DataFrame
- RDDs are always stored in on-heap memory, but DataFrames can also be stored in off-heap memory.
- All types of datasets here are immutable.
- The strongly typed feature ensures that issues such as datatype mismatches are detected at compile time.
Transformations and Actions
Lazy Evaluation and DAG Logic Flow
Narrow Transformation
Each executor can work on its partitions independently and does not depend on data held by any other executor.
Wide Transformation
This type of transformation involves the shuffling of data.
df.collect() is used to trigger an action after various transformations have been applied to the data.
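A small sketch of the distinction (the toy DataFrame is hypothetical): the filter is a narrow transformation, the groupBy forces a shuffle, and nothing runs until the collect() action is called.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "a")], ["id", "key"])

narrow_df = df.filter(col("id") > 1)         # narrow: each partition is filtered independently
wide_df = narrow_df.groupBy("key").count()   # wide: rows with the same key must be shuffled together
rows = wide_df.collect()                     # action: the DAG is executed only now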
On heap vs Off Heap Memory
On Heap Memory Architecture
Each executor has its own on-heap memory.
Off Heap Memory Architecture
The Off Heap Memory is managed by the OS and shared by all the executors when the on heap memory runs out of space.
Performance can take a hit when we use on-heap memory because, if the on-heap memory fills up in the middle of a process, the Garbage Collector has to scan the entire heap to find and remove the unwanted objects before the process can resume.
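A sketch of how off-heap memory can be enabled to reduce this GC pressure, using Spark's standard off-heap settings (the 4g size is just an illustrative value):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.memory.offHeap.enabled", "true")   # off by default
    .config("spark.memory.offHeap.size", "4g")        # must be set when off-heap is enabled
    .getOrCreate())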
Clusters in PySpark
- If multiple users are using All-Purpose clusters, the resources are shared between them.
- All-Purpose clusters are used in notebooks where we have to check the output of every command after executing it.
- Job clusters are used for scheduled jobs, and these clusters are created at runtime.
- Pools are used to back multiple clusters with a shared set of instances, where we can configure how many instances must stay active at all times. So there is no boot-up time, and this is used for cost cutting.
Cluster Modes
Spark Architecture Diagram
Apache Spark Internals
How does Spark Execute Queries?
Spark Cluster Execution
- An executor is a JVM process that runs on a worker node.
- The 625 MB file is divided into memory partitions, which are then sent to the workers for execution.
- This parallelism happens automatically.
Hive Metastore
Parquet File Format
- We prefer the Parquet format because it is columnar: if a dataset has 100 columns and we only want to fetch the first three, Parquet lets Spark read just those columns, which is much faster than scanning a CSV (see the sketch below).
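A sketch of the idea, with hypothetical paths and column names; the Parquet read can prune down to the selected columns, while the CSV read has to parse every row in full.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

parquet_df = (spark.read
    .parquet("/data/events_parquet")          # hypothetical path
    .select("user_id", "event_name", "device"))

csv_df = (spark.read
    .option("header", "true")
    .csv("/data/events_csv")                  # hypothetical path
    .select("user_id", "event_name", "device"))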
- List the running processes that contain 'java' using a Linux shell command:
%sh ps | grep 'java'
- Preview a file on DBFS (for example a markdown file) with the %fs magic:
%fs head /databricks-datasets/README.md
- Display all mount points:
%fs mounts
- Declare a Python variable in the Spark context which SQL commands can access:
spark.sql(f"SET c.events_path = {events_path}")
- Create a table from the files and load it as a table (see the sketch after this list).
- Add notebook params as widgets
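A sketch of how the variable set above can be read back from SQL and used to create a table over the files, assuming SQL variable substitution is enabled (it is by default) and assuming the files are Parquet; the table name is hypothetical.

spark.sql("SELECT '${c.events_path}' AS events_path").show(truncate=False)

spark.sql("""
CREATE TABLE IF NOT EXISTS events
USING parquet
OPTIONS (path "${c.events_path}")
""")

And a sketch of notebook widgets, which are Databricks-specific (dbutils only exists inside Databricks notebooks); the widget name and default value are made up.

dbutils.widgets.text("state", "CA", "State")   # name, default value, label
state = dbutils.widgets.get("state")
print(state)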
What is there in Spark SQL?
Lazy Evaluation
Explicit vs Implicit vs Infer Schema
The fastest option is the explicit schema, since Spark does not need to read the data files to infer the schema.
Query Execution Process
from pyspark.sql.types import ArrayType, DoubleType, LongType, StringType, StructField, StructType

user_defined_schema = StructType([
StructField("device", StringType(), True),
StructField("ecommerce", StructType([
StructField("purchaseRevenue", DoubleType(), True),
StructField("total_item_quantity", LongType(), True),
StructField("unique_items", LongType(), True)
]), True),
StructField("event_name", StringType(), True),
StructField("event_previous_timestamp", LongType(), True),
StructField("event_timestamp", LongType(), True),
StructField("geo", StructType([
StructField("city", StringType(), True),
StructField("state", StringType(), True)
]), True),
StructField("items", ArrayType(
StructType([
StructField("coupon", StringType(), True),
StructField("item_id", StringType(), True),
StructField("item_name", StringType(), True),
StructField("item_revenue_in_usd", DoubleType(), True),
StructField("price_in_usd", DoubleType(), True),
StructField("quantity", LongType(), True)
])
), True),
StructField("traffic_source", StringType(), True),
StructField("user_first_touch_timestamp", LongType(), True),
StructField("user_id", StringType(), True)
])
events_df = (spark
.read
.schema(user_defined_schema)
.json(events_json_path)
)
No jobs are spawned while running the above code, since we give Spark all the schema information it would otherwise have to infer.
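For contrast, a sketch of the inferred-schema path (same events_json_path as above): the read itself spawns a job, because Spark must scan the JSON files to work out the columns and their types.

inferred_events_df = spark.read.json(events_json_path)
inferred_events_df.printSchema()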
Write Dataframes to tables
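A minimal sketch of writing the DataFrame out as a managed table; the table name is hypothetical.

(events_df
    .write
    .mode("overwrite")
    .saveAsTable("events_p"))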
Reading Complex JSON and performing operations
rev_df = (events_df
.filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
.withColumn("purchase_revenue", (col("ecommerce.purchase_revenue_in_usd") * 100).cast("int"))
.withColumn("avg_purchase_revenue", col("ecommerce.purchase_revenue_in_usd") / col("ecommerce.total_item_quantity"))
.sort(col("avg_purchase_revenue").desc())
)
display(rev_df)
selectExpr()
apple_df = events_df.selectExpr("user_id", "device in ('macOS', 'iOS') as apple_user")
display(apple_df)

Drop multiple columns
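A sketch, since drop() accepts several column names at once (the chosen columns are arbitrary):

slim_df = events_df.drop("user_id", "geo", "device")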
Create New Columns
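A sketch using withColumn(), which adds (or replaces) a column derived from existing ones:

from pyspark.sql.functions import col

flagged_df = events_df.withColumn("apple_user", col("device").isin("macOS", "iOS"))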
filter() is used to subset rows.
revenue_df = events_df.filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
display(revenue_df)
sort vs orderBy
In the DataFrame API, sort() and orderBy() are aliases and both produce a total ordering across all memory partitions; sortWithinPartitions() is the method that sorts each memory partition on its own.
decrease_sessions_df = events_df.sort(col("user_first_touch_timestamp").desc(), col("event_timestamp"))
display(decrease_sessions_df)
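A sketch contrasting the two behaviours described above:

from pyspark.sql.functions import col

total_order_df = events_df.orderBy(col("event_timestamp"))                 # global ordering (alias of .sort)
per_partition_df = events_df.sortWithinPartitions(col("event_timestamp"))  # each memory partition sorted independently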
Aggregations
Group By Operations
How Group By Works?
Group Data Methods
Average Purchase Revenue for each state
avg_state_purchases_df = df.groupBy("geo.state").avg("ecommerce.purchase_revenue_in_usd")
display(avg_state_purchases_df)

Sum of total item quantity and purchase revenue for each combination of state and city
city_purchase_quantities_df = df.groupBy("geo.state", "geo.city").sum("ecommerce.total_item_quantity", "ecommerce.purchase_revenue_in_usd")
display(city_purchase_quantities_df)

List of Aggregation Functions
Multiple Aggregate Functions
from pyspark.sql.functions import avg, approx_count_distinct
state_aggregates_df = (df
.groupBy("geo.state")
.agg(avg("ecommerce.total_item_quantity").alias("avg_quantity"),
approx_count_distinct("user_id").alias("distinct_users"))
)
display(state_aggregates_df)
Unix Timestamps
Datetime Functions
Refer to the docs
Add and Subtract Dates
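A sketch of the date arithmetic, assuming (as in the events data above) that user_first_touch_timestamp is stored in microseconds since the epoch:

from pyspark.sql.functions import col, date_add, date_sub, to_date

datetime_df = (events_df
    .withColumn("first_touch", (col("user_first_touch_timestamp") / 1e6).cast("timestamp"))
    .withColumn("first_touch_date", to_date(col("first_touch")))
    .withColumn("plus_two_days", date_add(col("first_touch_date"), 2))
    .withColumn("minus_two_days", date_sub(col("first_touch_date"), 2)))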
String Built In Functions
Complex Data types
Review
Collection Functions
array_contains
explode()
element_at
collect_set
Split to extract email address
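A sketch for the split, assuming details_df has a string column named email:

from pyspark.sql.functions import col, split

email_parts_df = (details_df
    .withColumn("email_handle", split(col("email"), "@").getItem(0))
    .withColumn("email_domain", split(col("email"), "@").getItem(1)))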
Collection Functions Review
Create a column for the size of mattress
mattress_df = (details_df
.filter(array_contains(col("details"), "Mattress"))
.withColumn("size", element_at(col("details"), 2)))
display(mattress_df)

For each email, collect the set of mattress sizes they purchased.
size_df = mattress_df.groupBy("email").agg(collect_set("size").alias("size options"))
display(size_df)

Miscellaneous Functions
Joins Demo
Handling Null Values
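A sketch of the common null-handling entry points on a DataFrame (the column names are just examples from the events data):

dropped_df = events_df.na.drop()                                       # drop rows containing any null
dropped_subset_df = events_df.na.drop(subset=["user_id"])              # drop rows where user_id is null
filled_df = events_df.na.fill(0, subset=["event_previous_timestamp"])  # replace nulls with a default value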
How do UDFs run in Scala and Python?
- Scala UDFs run inside the executor JVM, so no inter-process communication is required.
- In the case of a Python UDF, we still have a driver program and executors, but the Python UDF runs outside the executor in a separate Python process.
- This means the Spark DataFrame rows are serialized, sent row by row to the Python process where the UDF transforms them, then serialized again row by row and sent back to the executor.
- The UDFs are executed by Python interpreter processes running alongside the executors.
Transform function execution
- The custom UDF took roughly twice as long to execute due to the extra work involved.
- The problem with Python UDFs is the extra overhead of serializing rows and moving them between the JVM executor and the Python interpreter; unlike built-in functions, the UDF body cannot be optimized by Catalyst or compiled down to JVM bytecode.
How to register UDFs?
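A sketch of both registration paths (the function, users_df and its email column are hypothetical): udf() wraps a Python function for DataFrame use, while spark.udf.register() also makes it callable from SQL.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def first_letter(email):
    return email[0] if email else None

first_letter_udf = udf(first_letter, StringType())                   # for the DataFrame API
letters_df = users_df.select(first_letter_udf(col("email")))

spark.udf.register("first_letter_sql", first_letter, StringType())   # also usable from SQL
spark.sql("SELECT first_letter_sql('user@example.com') AS first_letter")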
Python vs Pandas UDF
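A sketch of the pandas (vectorised) variant: it receives whole pandas Series, so rows move between the JVM and Python in Arrow batches instead of one at a time (users_df is the same hypothetical DataFrame as above).

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def vectorised_first_letter(emails: pd.Series) -> pd.Series:
    return emails.str[0]

letters_df = users_df.select(vectorised_first_letter("email"))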
SQL UDF
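A sketch using the Databricks SQL UDF syntax (the function name and logic are made up):

spark.sql("""
CREATE OR REPLACE FUNCTION sale_price(price DOUBLE)
RETURNS DOUBLE
RETURN round(price * 0.8, 2)
""")
spark.sql("SELECT sale_price(100.0) AS discounted")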
Apache Spark Architecture
Cluster Architecture
- Each worker will have only one executor
There is one task for each memory partition.
Driver's work
The rest of the memory partitions do not have any cores to execute on and wait in the queue.
A, E, H and J have finished working. Since they are idle, the driver assigns memory partitions 13, 14, 15 and 16 to them.
Once all of them complete the work, the driver sends the answer set to the client.
- The intermediate result sets, referred to as shuffle write and shuffle read, are written to the worker nodes' hard drives.
Deep Dive Into Shuffle
- The memory used went from 20 MB to 560 bytes because we are only storing key-value pairs and not the entire data. The key-value pairs record each colour and the number of rows it appears in. This data is written to disk as the shuffle write.
- In stage 2 we build a shuffle partition for each of the Green, Red and Yellow rows.
Summary
Query Optimization
- An RDD (Resilient Distributed Dataset) is essentially just a distributed array of data.
Example
Example 2
Adaptive Query Optimization
Shuffle Partitions : With and Without AQE
Predicate Pushdown
- In this case, less RAM is used, so the query is faster.
- If we don't evict data from the cache when it is no longer needed, then when there is only one core in the executor and a new query comes in, that query is given precedence and the cached data will be evicted.
Memory Partitioning Guidelines
In the below example we have 8 memory partitions and 200 shuffle partitions.
Check the number of cores in the cluster
Check the number of memory partitions
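A sketch of both checks (df stands for any DataFrame created earlier):

print(spark.sparkContext.defaultParallelism)   # number of cores available to the application
print(df.rdd.getNumPartitions())               # number of memory partitions in the DataFrame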
Repartitioning Dataset
Repartitioning is always a wide transformation.
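A sketch of the two ways to change the partition count: repartition() always shuffles, while coalesce() only merges existing partitions and so can only reduce the count.

repartitioned_df = df.repartition(8)   # wide: full shuffle, can increase or decrease the count
coalesced_df = df.coalesce(4)          # narrow: merges partitions, can only decrease the count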
AQE and Shuffle Partitions
We can set spark.sql.shuffle.partitions based on the largest dataset that our application can process.
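A sketch of the related settings: with AQE enabled, the shuffle-partition number acts as an upper bound and Spark can coalesce small shuffle partitions at runtime.

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "200")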