Operator Chaining and Slot Parallelism

This page explains how Flink chains operators, how slots are assigned, and how Flink's execution model compares with Spark executor cores.


1. How Flink Chains Operators

Flink optimizes performance through operator chaining.

What is chaining?

If multiple operators can run in the same thread, Flink merges them into one physical task.

Example:

source → map → filter → flatMap → sink

If they share the same parallelism and no shuffle is needed, Flink chains them into one task, executed inside a single slot.
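
As a rough sketch of what such a fully chained pipeline looks like in the Java DataStream API (the source and the transformation logic are made up for illustration), every operator below ends up in a single chained task per parallel instance:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ChainedPipeline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSequence(1, 100)                                // source (parallel)
           .map(n -> "item-" + n)                               // map: chains with the source
           .filter(s -> !s.isEmpty())                           // filter: chains with map
           .flatMap((String s, Collector<String> out) -> {      // flatMap: chains with filter
               for (String part : s.split("-")) {
                   out.collect(part);
               }
           })
           .returns(Types.STRING)                               // output type hint needed for the flatMap lambda
           .print();                                            // sink: chains as well

        env.execute("one chained task per parallel instance");
    }
}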

When are operators chained?

Operators are chained when:

  • They have the same parallelism
  • They don't require data redistribution (no keyBy, window, join, shuffle)
  • They run in the same task slot
  • They are compatible for chaining

Example of chainable operators: map, flatMap, filter, process, async I/O

When are operators NOT chained?

Operators are not chained when:

  • You use keyBy (introduces a network shuffle)
  • You use windowing (watermark alignment)
  • Join operations
  • Repartitioning operators (rebalance, broadcast, global)
  • Parallelism changes between operators

These break the chain and create a new task.
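
Chaining can also be influenced explicitly. A minimal sketch using the DataStream API's chaining hints (the pipeline itself is arbitrary; disableOperatorChaining, startNewChain, and disableChaining are the relevant calls):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingHints {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining();   // global switch: never chain anything in this job

        env.fromSequence(1, 100)
           .map(n -> n * 2)                 // would normally chain with the source
           .startNewChain()                 // hint: begin a new chain at this map
           .filter(n -> n % 3 == 0)
           .disableChaining()               // hint: never chain this filter with anything
           .print();

        env.execute("explicit chaining hints");
    }
}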


2. How Slots Are Assigned (Important Concept)

Main rule:

One slot holds one operator chain (one pipeline of tasks).

Example:

TM1 slot1: source → map → filter
TM1 slot2: keyBy → window → reduce
TM2 slot1: keyBy → window → reduce
TM2 slot2: sink

Slots DO NOT correspond to individual operators.

If parallelism = 4:

  • Each operator chain runs as 4 parallel subtasks
  • Flink places those 4 subtasks into available slots across all TMs

If you have a cluster with:

  • 3 Task Managers
  • Each with 2 slots
  • Total slots = 6

You cannot run a job with parallelism > 6.

Slots are the capacity limit for subtasks.
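
A small sketch of how the two sides fit together (the slot count per Task Manager comes from the cluster's taskmanager.numberOfTaskSlots setting; the job itself only declares its parallelism):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotBudget {
    public static void main(String[] args) throws Exception {
        // Cluster side (flink-conf.yaml on each Task Manager), not part of the job code:
        //   taskmanager.numberOfTaskSlots: 2     -> 3 TMs x 2 slots = 6 slots in total
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Default parallelism for every operator in this job.
        // Anything up to 6 fits into the cluster above; a higher value
        // would leave subtasks without slots and the job could not start.
        env.setParallelism(4);

        env.fromSequence(1, 1000)
           .map(n -> n + 1)
           .print();

        env.execute("parallelism bounded by available slots");
    }
}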


3. Comparison with Spark Executors

Flink and Spark look similar at first, but they are fundamentally different.


Spark Executor (batch or micro-batch)

A Spark executor is a JVM process. Inside it:

  • N CPU cores
  • M memory
  • Runs multiple task threads concurrently

Executors are long-running, but they process batches, not continuous events.


Flink Task Manager (continuous streaming)

A Flink Task Manager is a JVM process. Inside it:

  • N slots (logical)
  • Slots run continuous streaming subtasks
  • Each subtask is like a small dedicated worker that never stops

Slots ≠ cores. Slots run operator chains continuously.


Key Differences Table

Concept              Spark                      Flink
Processing model     Micro-batch (mostly)       True continuous streaming
Execution units      Executors and tasks        Task Managers and slots
Task lifetime        Short-lived tasks          Long-running subtasks
Cores                Dedicated per executor     Shared across all slots
Operator chaining    Rare                       Core optimization
Backpressure         Coarse                     Very fine-grained
State                Mostly external            Native, large, fault-tolerant state
Checkpoints          RDD lineage recompute      Consistent snapshots

4. Putting it all together (Process Flow)

Example job:

source → map → filter → keyBy → window → reduce → sink

Step-by-step:

  1. Flink chains source + map + filter (same parallelism, no shuffle between them).

  2. With parallelism = 4, each operator chain runs as 4 subtasks:

     • 4 chain subtasks for source → map → filter
     • 4 subtasks for keyBy → window → reduce
     • 4 subtasks for the sink

  3. Total subtasks = 12.

  4. These 12 subtasks are placed into available slots.

  5. Each subtask runs continuously and maintains state.
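
A runnable sketch of this layout in the Java DataStream API (the source, key, and window size are arbitrary choices; the sink is explicitly kept out of the window chain so the physical plan matches the three chains described above):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwelveSubtasks {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);                                          // 4 subtasks per operator chain

        env.fromSequence(0, 9999)                                       // source  \
           .map(n -> Tuple2.of("key-" + (n % 10), 1))                   // map      } one chain, 4 subtasks
           .returns(Types.TUPLE(Types.STRING, Types.INT))               //          |
           .filter(t -> t.f1 > 0)                                       // filter  /
           .keyBy(t -> t.f0)                                            // network shuffle: the chain ends here
           .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))   // keyBy -> window -> reduce: 4 subtasks
           .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
           .print()                                                     // sink: 4 subtasks
           .disableChaining();                                          // keep the sink as its own chain

        env.execute("3 chains x parallelism 4 = 12 subtasks");
    }
}

In the Flink web UI this job should show up as three vertices (one per chain), each with 4 running subtasks.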


5. Short Summary

  • Operators are chained when possible to reduce overhead.
  • One slot = one chain = one subtask, not one operator.
  • Spark executors run many short tasks; Flink slots run long-lived pipelines.
  • Slots determine parallelism; cores determine actual compute power.

Why 12 subtasks and not 3? You get 4 subtasks for each operator (or operator chain) because you set parallelism = 4.

So Flink creates:

  • 4 parallel instances of the source → map → filter chain
  • 4 parallel instances of the keyBy → window → reduce chain
  • 4 parallel instances of the sink

This gives 12 total subtasks, not 3.


Why? Because parallelism applies per operator (or operator chain), not per whole flow

In Flink:

  • The program is a DAG of operators
  • Each operator runs in parallel
  • Parallelism defines how many copies of each operator you run

Think of it like this:

If parallelism = 4:

  • You are telling Flink: "Run 4 copies of every operator independently."

So even if you write:

stream
  .map(...)
  .filter(...)
  .keyBy(...)
  .window(...)
  .reduce(...)
  .addSink(...)

Flink actually constructs parallel pipelines.


Let's visualize it

Logical code (what you write):

source → map → filter → keyBy → window → reduce → sink

Physical execution (parallelism = 4):

Step 1: Chainable operators

source, map, and filter get chained. This becomes one operator chain, but it is still parallel:

Chain 1
Chain 2
Chain 3
Chain 4

These run in 4 different slots (distributed across Task Managers).


Why do keyBy → window → reduce also get 4 subtasks?

Because they also have parallelism = 4.

They are not chained with the previous chain because keyBy requires a network shuffle.

So Flink generates:

  • 4 separate receiving tasks
  • Each gets a partition of keyed data

Then why does sink also have 4 subtasks?

Same reason: parallelism = 4.

Unless you explicitly set:

sink.setParallelism(1)

the sink uses the job's default parallelism (4 in this example), just like every other operator.
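
For example, a minimal sketch (the job itself is arbitrary) where only the sink is forced down to a single subtask while everything else keeps the default of 4:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkParallelismOverride {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);              // default: 4 subtasks per operator

        env.fromSequence(1, 100)
           .map(n -> "value-" + n)          // runs with 4 subtasks
           .print()                         // sink
           .setParallelism(1);              // override: a single sink subtask collects everything

        env.execute("per-operator parallelism override");
    }
}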


Key Idea

Parallelism applies per-operator, not per-entire-pipeline.

So if your DAG has 3 physical operators (chains), and each has parallelism = 4:

Operator or chain          Parallelism   Subtasks
Source → Map → Filter      4             4
KeyBy → Window → Reduce    4             4
Sink                       4             4

Total = 12 subtasks


Analogy

Imagine a factory line:

  • You have 4 workers doing Step 1 (source → map → filter)
  • 4 workers doing Step 2 (keyBy → window → reduce)
  • 4 workers doing Step 3 (sink)

Even though it's one "flow", each step needs 4 workers to keep up.

This is exactly how Flink distributes work.