
PySpark

PySpark Internals for Large Scale Data Processing

Internal Working and Architecture of Spark

High Level Overview

image

Flow of the Logic

image

Worker Node Architecture

image

  • There are 64 cores in total, so 64 tasks can execute in parallel, with 16 of them running on each executor.
  • Cores within an executor can sit idle with no task running on them, so we need a parameter that defines the minimum number of tasks to run on each executor at once.
  • To avoid idle cores, keep the number of partitions a multiple of the total core count, as shown in the sketch below.
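
A minimal sketch of this sizing rule, assuming an existing SparkSession named spark and a DataFrame df (both hypothetical here): pick a partition count that is a multiple of the total cores so that no core sits idle.

total_cores = spark.sparkContext.defaultParallelism   # e.g. 64 on a 4 x 16-core cluster
target_partitions = total_cores * 2                   # a small multiple keeps every core busy
df = df.repartition(target_partitions)
print(df.rdd.getNumPartitions())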

Summary of Spark Application

image

Attributes of Spark

  • It is scalable.
  • It is fault tolerant and follows lazy evaluation: Spark first makes a logical plan for processing the data and builds a DAG from it, so if a worker node goes down the lost partitions can be recomputed.
  • Spark supports multiple programming languages such as Python and Scala.
  • Spark can also handle streaming data.
  • Spark is known for its speed, thanks to its in-memory computation model.
  • Spark has rich libraries for ML and data processing.

Common Terminologies

image

image

As mentioned above, Spark does not process the data immediately. When the developer calls an action (for example to store or retrieve results), Spark executes the transformations according to the DAG and returns the output to the user.
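
A minimal sketch of this behaviour, assuming a SparkSession spark and a JSON path events_json_path: the filter and select calls only extend the logical plan, and nothing runs until the action at the end.

df = spark.read.json(events_json_path)
filtered_df = df.filter(df.device == "Android")   # transformation: just added to the plan
projected_df = filtered_df.select("user_id")      # transformation: still nothing executed
projected_df.count()                              # action: Spark builds the DAG and runs the job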

image

To store data in off-heap memory, Spark needs to serialize it; on-heap storage can keep deserialized Java objects.

Stages and Tasks

image

Libraries Supported by Spark

  • SQL
  • Streaming
  • MLLib
  • GraphX

Driver Node Architecture

image

Worker Node Architecture

image

On Heap Memory Architecture

image

  • Out of 32 GB, around 300 MB is reserved by Spark for its own internal objects and cannot be used by the user or the processes.
  • Of the remaining (32 GB - 300 MB), 40% of the heap is allocated as user memory and 60% as unified memory.
  • Within the unified memory, 50% is used for storage and 50% for execution.
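
A back-of-the-envelope sketch of that split for a 32 GB heap, using the default fractions (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5):

heap_gb = 32
usable_gb = heap_gb - 0.3            # ~300 MB reserved by Spark -> 31.7 GB usable
user_gb = usable_gb * 0.4            # user memory       ~12.7 GB
unified_gb = usable_gb * 0.6         # unified memory    ~19.0 GB
storage_gb = unified_gb * 0.5        # storage portion    ~9.5 GB
execution_gb = unified_gb * 0.5      # execution portion  ~9.5 GB
print(user_gb, unified_gb, storage_gb, execution_gb)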

API Architecture in Spark

image

Dataset = Best of RDD + Best of DataFrame

image

  • RDDs are always stored in on-heap memory, but DataFrames can also be stored in off-heap memory.

  • All types of datasets here are immutable.

image

  • The strongly typed nature of Datasets ensures that issues such as datatype mismatches are detected at compile time.

Transformations and Actions

image

image

Lazy Evaluation and DAG Logic Flow

image

Narrow Transformation

Each executor can work on its share of the data independently, without requiring anything from the other executors.

image

Wide Transformation

This type of transformation involves the shuffling of data.

image

df.collect() is an action; it is called to trigger execution after various transformations have been applied to the data (see the sketch below).
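
A minimal sketch contrasting the two kinds of transformation, assuming a DataFrame df with hypothetical columns state and amount:

from pyspark.sql.functions import col

narrow_df = df.filter(col("amount") > 0)                           # narrow: each partition filtered independently
narrow_df = narrow_df.withColumn("amount_x2", col("amount") * 2)   # narrow as well
wide_df = narrow_df.groupBy("state").sum("amount")                 # wide: rows with the same state must be shuffled together
rows = wide_df.collect()                                           # action: only now does the job actually run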

On heap vs Off Heap Memory

On Heap Memory Architecture

Each executor has its own on-heap memory.

image

Off Heap Memory Architecture

image

The off-heap memory is managed by the OS and shared by all the executors; it is used when the on-heap memory runs out of space.

Performance can take a hit when we rely only on on-heap memory: if the on-heap memory fills up in the middle of a process, the garbage collector has to scan the entire heap to find and free unused objects before the process can resume.
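
Off-heap memory is disabled by default; a minimal sketch of enabling it when the SparkSession is built (the size is illustrative):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("offheap-demo")
    .config("spark.memory.offHeap.enabled", "true")   # allow executors to use off-heap memory
    .config("spark.memory.offHeap.size", "2g")        # must be set when off-heap is enabled
    .getOrCreate())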

image

Clusters in PySpark

image

  • If multiple users are using an All-Purpose Cluster, the resources are shared between them.

  • It is used in notebooks, where we have to check the output of every command after executing it.

  • Job Clusters are used for scheduled jobs; the cluster is created at runtime and exists only for the duration of the job.

  • Pools keep a configurable number of instances warm at all times, so clusters created from the pool have no boot-up time; this is used for cost cutting.

Cluster Modes

image

Spark Architecture Diagram

image

Apache Spark Internals

How does Spark Execute Queries?

Spark Cluster Execution

  • An executor is a JVM process that runs on a worker node.
  • The 625 MB file is divided into memory partitions, which are then sent to the workers for execution.
  • This parallelism happens automatically, as in the sketch below.
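
A minimal sketch of this, with a hypothetical file path: Spark splits the file into partitions automatically, and one task runs per partition.

df = spark.read.csv("/data/sales_625mb.csv", header=True)   # hypothetical path
print("cores available:", spark.sparkContext.defaultParallelism)
print("memory partitions:", df.rdd.getNumPartitions())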

Hive Metastore

Parquet File Format

  • We prefer the Parquet format because it is columnar: if a dataset has 100 columns and we only want the first three, Parquet can read just those columns, which is much faster than CSV (see the sketch after this list).

  • Search the running processes for "java" from a shell cell: %sh ps | grep 'java'

  • Preview the head of a file: %fs head /databricks-dataset/README.md

  • Display all mount points: %fs mounts

  • Declare a Python variable in the Spark context so that SQL commands can access it: spark.sql(f"SET c.events_path = {events_path}")

  • Creating a table from the files and loading it as a table

    CREATE TABLE IF NOT EXISTS events
    USING DELTA OPTIONS (path = "${c.events_path}")


  • Add notebook params as widgets

CREATE WIDGET TEXT state default "KA"
SELECT * FROM events WHERE state = getArgument("state")
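
A minimal sketch of the column-pruning point from the Parquet bullet above (the paths and column names are hypothetical):

# Parquet is columnar: selecting 3 of 100 columns reads only those column chunks from disk.
parquet_df = spark.read.parquet("/data/events_parquet").select("col_1", "col_2", "col_3")

# CSV is row-oriented: every row has to be read and parsed in full even for 3 columns.
csv_df = spark.read.option("header", True).csv("/data/events_csv").select("col_1", "col_2", "col_3")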

What is there in Spark SQL?

Lazy Evaluation

Explicit vs Implicit vs Infer Schema

The explicit schema is the fastest, because Spark does not need to read the data files to determine the schema (see the sketch below).
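
A minimal sketch of the three approaches on a hypothetical CSV file:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema: fastest, Spark never scans the files to work out the types.
schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("age", IntegerType(), True),
])
explicit_df = spark.read.schema(schema).option("header", True).csv("/data/users.csv")

# Inferred schema: Spark reads the data first to guess the types, which costs an extra job.
inferred_df = spark.read.option("header", True).option("inferSchema", True).csv("/data/users.csv")

# Implicit (no schema, no inference): every column is read as a string.
implicit_df = spark.read.option("header", True).csv("/data/users.csv")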

Query Execution Process

image (query execution engine)

DataFrame Action

  • Anything we specify as options is an action.

Inferring JSON Schema

from pyspark.sql.types import ArrayType, DoubleType, IntegerType, LongType, StringType, StructType, StructField

user_defined_schema = StructType([
    StructField("device", StringType(), True),
    StructField("ecommerce", StructType([
        StructField("purchaseRevenue", DoubleType(), True),
        StructField("total_item_quantity", LongType(), True),
        StructField("unique_items", LongType(), True)
    ]), True),
    StructField("event_name", StringType(), True),
    StructField("event_previous_timestamp", LongType(), True),
    StructField("event_timestamp", LongType(), True),
    StructField("geo", StructType([
        StructField("city", StringType(), True),
        StructField("state", StringType(), True)
    ]), True),
    StructField("items", ArrayType(
        StructType([
            StructField("coupon", StringType(), True),
            StructField("item_id", StringType(), True),
            StructField("item_name", StringType(), True),
            StructField("item_revenue_in_usd", DoubleType(), True),
            StructField("price_in_usd", DoubleType(), True),
            StructField("quantity", LongType(), True)
        ])
    ), True),
    StructField("traffic_source", StringType(), True),
    StructField("user_first_touch_timestamp", LongType(), True),
    StructField("user_id", StringType(), True)
])

events_df = (spark
    .read
    .schema(user_defined_schema)
    .json(events_json_path)
)

No jobs are spawned while running the above code, because we provide Spark with all the schema information it would otherwise have to infer.

Write Dataframes to tables

events_df.write.mode("overwrite").saveAsTable("events")

Reading Complex JSON and performing operations

from pyspark.sql.functions import col

rev_df = (events_df
    .filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
    .withColumn("purchase_revenue", (col("ecommerce.purchase_revenue_in_usd") * 100).cast("int"))
    .withColumn("avg_purchase_revenue", col("ecommerce.purchase_revenue_in_usd") / col("ecommerce.total_item_quantity"))
    .sort(col("avg_purchase_revenue").desc())
)
display(rev_df)

selectExpr()

apple_df = events_df.selectExpr("user_id", "device in ('macOS', 'iOS') as apple_user")

display(apple_df)

Drop multiple columns

anonymous_df = events_df.drop("user_id", "geo", "device")
display(anonymous_df)

Create New Columns

mobile_df = events_df.withColumn("mobile", col("device").isin("iOS", "Android"))
display(mobile_df)

filter() to subset rows

purchases_df = events_df.filter("ecommerce.total_item_quantity > 0")
display(purchases_df)   
revenue_df = events_df.filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
display(revenue_df)

sort vs order_by

In Spark SQL, SORT BY sorts rows within each memory partition, while ORDER BY produces a total ordering across all partitions. In the DataFrame API, sort() and orderBy() are aliases for the total sort; sortWithinPartitions() is the per-partition version (see the sketch below).

revenue_df = events_df.filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
display(revenue_df)
decrease_sessions_df = events_df.sort(col("user_first_touch_timestamp").desc(), col("event_timestamp"))
display(decrease_sessions_df)
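
A minimal sketch of the per-partition sort versus the total sort (column names taken from the schema above):

from pyspark.sql.functions import col

within_df = events_df.sortWithinPartitions(col("event_timestamp"))   # sorts each partition locally, no shuffle
total_df = events_df.orderBy(col("event_timestamp").desc())          # total ordering, shuffles data across partitions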

Aggregations

Group By Operations

df.groupBy("geo.state", "geo.city")

How Group By Works?

Group Data Methods

Average Purchase Revenue for each state

avg_state_purchases_df = df.groupBy("geo.state").avg("ecommerce.purchase_revenue_in_usd")
display(avg_state_purchases_df)

Total Quantity and sum of purchase revenue and quantity for each combo of city and state

city_purchase_quantities_df = df.groupBy("geo.state", "geo.city").sum("ecommerce.total_item_quantity", "ecommerce.purchase_revenue_in_usd")
display(city_purchase_quantities_df)

List of Aggregation Functions

Multiple Aggregate Functions

from pyspark.sql.functions import avg, approx_count_distinct

state_aggregates_df = (df
    .groupBy("geo.state")
    .agg(avg("ecommerce.total_item_quantity").alias("avg_quantity"),
         approx_count_distinct("user_id").alias("distinct_users"))
)
display(state_aggregates_df)

Unix Timestamps

Datetime Functions

Refer to the docs.

Add and Subtract Dates

plus_2_df = timestamp_df.withColumn("plus_two_days", date_add(col("timestamp"), 2))

String Built In Functions

Complex Data types

Review

Collection Functions

array_contains

explode()

element_at

collect_set

Split to extract email address

display(df.select(split(df.email, '@', 0).alias('email_handle')))

Collection Functions Review

Create a column for the size of mattress

mattress_df = (details_df
.filter(array_contains(col("details"), "Mattress"))
.withColumn("size", element_at(col("details"), 2)))
display(mattress_df)

For each email, check what mattress type they purchased.

size_df = mattress_df.groupBy("email").agg(collect_set("size").alias("size options"))
display(size_df)

Miscellaneous Functions

Joins Demo

Handling Null Values

How UDFs run in Scala and Python?

  • In the case of Scala UDFs, the UDF runs inside the executor JVM, so no inter-process communication is required.
  • In the case of Python UDFs, we still have a driver program and executors, but the Python UDF runs outside the executor, in a separate Python process.
  • This means the Spark DataFrame rows are serialized, sent row by row to the Python process that runs the UDF, and the results are serialized again and sent back to the executor.
  • The UDFs are registered on the Python interpreters (see the sketch below).
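
A minimal sketch of a row-at-a-time Python UDF versus a vectorized pandas UDF, which reduces the serialization overhead by exchanging Arrow batches instead of single rows (the email column is hypothetical):

import pandas as pd
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def first_letter(email):                     # plain Python UDF: rows are pickled one at a time
    return email[0] if email else None

@pandas_udf(StringType())
def first_letter_vectorized(emails: pd.Series) -> pd.Series:   # pandas UDF: operates on whole batches
    return emails.str[0]

plain_df = df.withColumn("first_letter", first_letter("email"))
vectorized_df = df.withColumn("first_letter", first_letter_vectorized("email"))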

Transform function execution

  • The custom UDF that was written took roughly twice as long to execute because of the extra work involved.
  • The problem with Python UDFs is the overhead of serializing rows, shipping them to a separate Python process, and deserializing the results back in the executor JVM; built-in functions avoid this because they run entirely inside the executor.

How to register UDFs?

Python vs Pandas UDF

SQL UDF

Apache Spark Architecture

Cluster Architecture

  • Each worker will have only one executor

There is one task for each memory partition.

Driver's work

The rest of the memory partitions do not have any cores to execute on and wait in the queue.

Cores A, E, H and J have finished their work. Since they are idle, the driver assigns memory partitions 13, 14, 15 and 16 to them.

Once all of them complete the work, the driver sends the answer set to the client.

  • The intermediate result sets, referred to as shuffle write and shuffle read, are written to and read from the worker nodes' local disks.

Deep Dive Into Shuffle

  • The memory used went from 20 MB down to 560 bytes because we only store key/value pairs, not the entire data; the key/value pairs record each color and the number of rows it appears in. This data is written to disk as the shuffle write.

  • In stage 2 we build the shuffle partitions containing the Green, Red and Yellow rows.
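
A minimal sketch (with a hypothetical colors_df) of spotting the shuffle boundary in the physical plan; the Exchange operator is where the shuffle write and shuffle read happen between stages:

counts_df = colors_df.groupBy("color").count()
counts_df.explain()
# The plan shows a partial HashAggregate, then an Exchange (the shuffle),
# then a final HashAggregate that combines the per-partition key/value pairs.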

Summary

Query Optimization

  • An RDD (Resilient Distributed Dataset) is essentially a distributed collection of data split into partitions.

Example

Example 2

Adaptive Query Optimization

Shuffle Partitions : With and Without AQE

Predicate Pushdown

  • In this case, less data is loaded into RAM, so the query is faster.

  • If we do not remove data from the cache when it is no longer needed, then when a query comes in and there is only one core in the executor, the query is given precedence and the cached data gets evicted.

Memory Partitioning Guidelines

In the below example we have 8 memory partitions and 200 shuffle partitions.

Check the number of cores in the cluster

Check the number of memory partitions
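
The two checks above, as a minimal sketch (assuming a SparkSession spark and a DataFrame df):

print(spark.sparkContext.defaultParallelism)             # total cores available to the application
print(df.rdd.getNumPartitions())                         # current number of memory partitions
print(spark.conf.get("spark.sql.shuffle.partitions"))    # shuffle partitions, 200 by default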

Repartitioning Dataset

Repartitioning is always a wide transformation.
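
A minimal sketch of the two ways to change the partition count: repartition is a wide transformation (full shuffle), while coalesce only merges existing partitions (narrow) and can only reduce the count.

repartitioned_df = df.repartition(8)   # wide: full shuffle, can increase or decrease the count
coalesced_df = df.coalesce(4)          # narrow: merges partitions locally, decrease only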

AQE and Shuffle Partitions

We can set the spark.sql.shuffle.partitions based on the largest dataset that our application can process.
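
A minimal sketch of these settings (the values are illustrative):

spark.conf.set("spark.sql.shuffle.partitions", "64")                      # sized for the largest dataset we expect
spark.conf.set("spark.sql.adaptive.enabled", "true")                      # let AQE re-optimize plans at runtime
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")   # AQE merges small shuffle partitions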