1. Role of the PySpark "main method" (driver)
Your script (the driver) performs four key responsibilities when a UDF is present:
1.1 Define the Python function
def my_func(x):
    return x * 2
- This function exists only in the driver's Python process initially.
1.2 Wrap the function as a Spark UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
my_udf = udf(my_func, IntegerType())
What happens internally:
- Spark creates a UDF expression object
- Associates:
  - the Python function
  - the return type
  - the evaluation type (regular UDF vs Pandas UDF)
- Marks this as a Python execution boundary
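That association can be pictured with a plain-Python sketch. The class and attribute names below are illustrative only, not Spark's internal ones:

```python
# Illustrative sketch: Spark's real UDF wrapper classes differ, but
# conceptually udf() bundles these three pieces together.
class PythonUdfSketch:
    def __init__(self, func, return_type, eval_type="batched"):
        self.func = func                # the driver-side Python function
        self.return_type = return_type  # declared Spark SQL return type
        self.eval_type = eval_type      # regular ("batched") vs Pandas UDF

def my_func(x):
    return x * 2

my_udf = PythonUdfSketch(my_func, "int")
print(my_udf.func(21), my_udf.return_type)  # → 42 int
```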
1.3 Build the logical and physical plan
At this stage:
- No computation happens
- Spark constructs a logical plan in which the UDF call appears as an opaque expression
Then Spark's optimizer (Catalyst) creates a physical plan. With a Python UDF, it inserts special nodes such as BatchEvalPython (or ArrowEvalPython for Pandas UDFs).
This indicates:
- Data must leave the JVM
- Be processed in Python
- Return back to the JVM
1.4 Serialize the UDF and ship it
When an action is triggered, the driver:
- Serializes my_func using cloudpickle
- Packages it with the task
- Sends it to executors via the Spark scheduler
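The serialize-and-ship step can be imitated with the standard library's pickle (Spark actually uses cloudpickle, which can also serialize lambdas and closures that plain pickle serializes only by reference):

```python
import pickle

def my_func(x):
    return x * 2

# "Driver" side: serialize the function so it can travel with a task.
payload = pickle.dumps(my_func)

# "Executor" side: deserialize and call it.
restored = pickle.loads(payload)
print(restored(21))  # 21 * 2 = 42
```

Note that pickle serializes a module-level function by its qualified name, whereas cloudpickle ships the function's code object itself; that is why Spark can send functions defined anywhere in the driver script to machines that have never imported them.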
2. What the driver does NOT do
Even with UDFs, the driver does not:
- Execute my_func
- Process rows or partitions
- Handle actual data transformations
It only:
- Defines logic
- Plans execution
- Distributes work
3. End-to-end execution flow with UDF
Consider this full example:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
spark = SparkSession.builder.getOrCreate()
data = [(1,), (2,), (3,)]
df = spark.createDataFrame(data, ["age"])
def my_func(x):
    return x * 2
my_udf = udf(my_func, IntegerType())
df = df.withColumn("double_age", my_udf("age"))
df.show()
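For this data, df.show() should print a small table along these lines (formatting per Spark's default show() layout):

```text
+---+----------+
|age|double_age|
+---+----------+
|  1|         2|
|  2|         4|
|  3|         6|
+---+----------+
```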
Step-by-step execution
Step 1: Driver builds plan
- Creates DataFrame
- Adds UDF transformation
- Builds logical + physical plan
Step 2: Task creation
- Driver splits data into partitions
- Creates tasks
- Attaches serialized UDF
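Task creation can be sketched in plain Python: the driver splits the data into partitions and pairs each partition with the serialized function bytes. The structure below is illustrative, not Spark's real task format:

```python
import pickle

def my_func(x):
    return x * 2

data = [(1,), (2,), (3,), (4,)]

# Split the rows into partitions (round-robin here, for illustration).
num_partitions = 2
partitions = [data[i::num_partitions] for i in range(num_partitions)]

# Serialize the UDF once (Spark uses cloudpickle rather than pickle)
# and attach it to every task.
udf_bytes = pickle.dumps(my_func)
tasks = [(partition, udf_bytes) for partition in partitions]
print(len(tasks))  # 2
```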
Step 3: Executor receives task
- Executor JVM starts processing
- Encounters Python UDF node
Step 4: Python worker is launched
- A separate Python process starts inside executor
- This is the Python worker
Step 5: Data transfer
- JVM sends data (rows or batches) to the Python worker
- Over a local socket, using Spark's pickle-based serialization (or Arrow record batches for Pandas UDFs)
Step 6: UDF execution
Inside the Python worker:
- my_func runs for each row (or once per batch for a Pandas UDF)
Step 7: Return results
- Python worker sends results back to JVM
- JVM continues execution (projection, write, etc.)
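Steps 5 through 7 can be summarized in a small pure-Python simulation of what the worker does (no Spark involved; the batch shape is illustrative):

```python
def my_func(x):
    return x * 2

# Step 5: the "JVM" side hands the Python worker a batch of rows.
batch = [(1,), (2,), (3,)]

# Step 6: the worker applies the UDF to the relevant column of each row.
results = [my_func(row[0]) for row in batch]

# Step 7: the results are sent back to the JVM side of the executor.
print(results)  # [2, 4, 6]
```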
4. Key architectural consequence
When a UDF is used, the driver introduces a cross-language boundary in the plan.
Without UDF:
- Entire execution stays in JVM
- Optimized via Catalyst + Tungsten
- Whole-stage code generation applies
With UDF:
- Execution includes JVM → Python → JVM transitions
- Spark inserts BatchEvalPython or ArrowEvalPython nodes
- Optimization is limited across this boundary
5. Important implications
5.1 Serialization overhead
- Driver serializes function
- Executors deserialize it
- Data is serialized/deserialized between JVM and Python
5.2 Loss of optimization
- Catalyst cannot optimize inside UDF
- No predicate pushdown inside UDF logic
- No code generation for UDF body
5.3 Performance impact
- Slower than built-in functions
- Additional process (Python worker)
- Network/socket communication overhead
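The per-row function-call overhead that makes regular UDFs slower than built-in expressions can be felt even in plain Python. This is a rough illustration, not a Spark benchmark (it omits serialization and socket costs entirely):

```python
import timeit

def my_func(x):
    return x * 2

rows = list(range(100_000))

# Per-row Python function calls (what a regular UDF does per partition):
t_udf_style = timeit.timeit(lambda: [my_func(x) for x in rows], number=5)

# Inlined arithmetic (closer in spirit to JVM whole-stage codegen):
t_inline = timeit.timeit(lambda: [x * 2 for x in rows], number=5)

print(t_udf_style > t_inline)  # function-call overhead dominates
```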
6. Summary
When a UDF is used, the PySpark main method (driver):
- Defines the Python function
- Wraps it as a Spark UDF object
- Embeds it into the execution plan as a Python operation
- Serializes the function using cloudpickle
- Ships it to executors along with tasks
It does not execute the function. The execution happens in Python workers inside executors, after the driver has completed planning and scheduling.