Skip to content

1. Role of the PySpark β€œmain method” (driver)πŸ”—

Your script (the driver) performs four key responsibilities when a UDF is present:

1.1 Define the Python functionπŸ”—

def my_func(x):
    return x * 2
  • This function exists only in the driver’s Python process initially.

1.2 Wrap the function as a Spark UDFπŸ”—

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

my_udf = udf(my_func, IntegerType())

What happens internally:

  • Spark creates a UDF expression object
  • Associates:

  • the Python function

  • return type
  • evaluation type (regular UDF vs Pandas UDF)
  • Marks this as a Python execution boundary

1.3 Build the logical and physical planπŸ”—

df = df.withColumn("new_col", my_udf("age"))

At this stage:

  • No computation happens
  • Spark constructs a logical plan

Example logical plan (simplified):

Project [age, pythonUDF(age) AS new_col]

Then Spark’s optimizer (Catalyst) creates a physical plan. With a UDF, it inserts special nodes such as:

BatchEvalPython / ArrowEvalPython

This indicates:

  • Data must leave JVM
  • Be processed in Python
  • Return back to JVM

1.4 Serialize the UDF and ship itπŸ”—

When an action is triggered:

df.show()

The driver:

  • Serializes my_func using cloudpickle
  • Packages it with the task
  • Sends it to executors via the Spark scheduler

2. What the driver does NOT doπŸ”—

Even with UDFs, the driver does not:

  • Execute my_func
  • Process rows or partitions
  • Handle actual data transformations

It only:

  • Defines logic
  • Plans execution
  • Distributes work

3. End-to-end execution flow with UDFπŸ”—

Consider this full example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

data = [(1,), (2,), (3,)]
df = spark.createDataFrame(data, ["age"])

def my_func(x):
    return x * 2

my_udf = udf(my_func, IntegerType())

df = df.withColumn("double_age", my_udf("age"))

df.show()

Step-by-step executionπŸ”—

Step 1: Driver builds planπŸ”—

  • Creates DataFrame
  • Adds UDF transformation
  • Builds logical + physical plan

Step 2: Task creationπŸ”—

  • Driver splits data into partitions
  • Creates tasks
  • Attaches serialized UDF

Step 3: Executor receives taskπŸ”—

  • Executor JVM starts processing
  • Encounters Python UDF node

Step 4: Python worker is launchedπŸ”—

  • A separate Python process starts inside executor
  • This is the Python worker

Step 5: Data transferπŸ”—

  • JVM sends data (rows or batches) to Python worker
  • Uses sockets (via Py4J or Arrow)

Step 6: UDF executionπŸ”—

Inside Python worker:

result = my_func(x)
  • Runs for each row (or batch in Pandas UDF)

Step 7: Return resultsπŸ”—

  • Python worker sends results back to JVM
  • JVM continues execution (projection, write, etc.)

4. Key architectural consequenceπŸ”—

When a UDF is used, the driver introduces a cross-language boundary in the plan.

Without UDF:

  • Entire execution stays in JVM
  • Optimized via Catalyst + Tungsten
  • Whole-stage code generation applies

With UDF:

  • Execution includes:

  • JVM β†’ Python β†’ JVM transitions

  • Spark inserts:

  • BatchEvalPython or ArrowEvalPython

  • Optimization is limited across this boundary

5. Important implicationsπŸ”—

5.1 Serialization overheadπŸ”—

  • Driver serializes function
  • Executors deserialize it
  • Data is serialized/deserialized between JVM and Python

5.2 Loss of optimizationπŸ”—

  • Catalyst cannot optimize inside UDF
  • No predicate pushdown inside UDF logic
  • No code generation for UDF body

5.3 Performance impactπŸ”—

  • Slower than built-in functions
  • Additional process (Python worker)
  • Network/socket communication overhead

6. SummaryπŸ”—

When a UDF is used, the PySpark main method (driver):

  1. Defines the Python function
  2. Wraps it as a Spark UDF object
  3. Embeds it into the execution plan as a Python operation
  4. Serializes the function using cloudpickle
  5. Ships it to executors along with tasks

It does not execute the function. The execution happens in Python workers inside executors, after the driver has completed planning and scheduling.