Apache Flink — Internal Architecture

A developer's guide to how Flink actually works under the hood: the cluster runtime, how a program becomes parallel subtasks, how state lives and survives failures, and how checkpoints, watermarks, and backpressure keep results correct on an unbounded stream.

Apache Flink is a distributed engine for stateful computation over data streams. Where a batch engine processes a finite dataset and stops, Flink treats data as an unbounded stream of events that may never end, and it keeps long-lived state as it goes. Its design rests on three reinforcing ideas: every operator can hold state that survives failures; results are computed in event time (when things happened) rather than the order events happen to arrive; and the combination of checkpoints and replayable sources gives exactly-once semantics for that state. Understanding Flink means understanding how those three pieces fit together on top of a fairly conventional master/worker runtime.

Contents

  1. Design Goals and Core Ideas
  2. Cluster Architecture
  3. The Dataflow Graph
  4. State and State Backends
  5. Checkpointing
  6. Savepoints vs Checkpoints
  7. Time and Windows
  8. Network Shuffle and Backpressure
  9. Summary

1. Design Goals and Core Ideas

Every internal decision in Flink traces back to a small set of goals for processing unbounded streams correctly. Keeping them in mind makes the rest of the architecture predictable.

Goal | How Flink achieves it
Stateful stream processing | Operators keep partitioned, fault-tolerant state in a state backend. State is a first-class citizen, not a side effect bolted on top.
Event-time correctness | Records carry their own timestamps. Watermarks track progress in event time, so results do not depend on the wall-clock order in which events arrive.
Exactly-once semantics | Periodic checkpoints snapshot a globally consistent cut of all state. On failure the job rewinds to the last checkpoint and replays from there, so each event affects state exactly once.
High throughput, low latency | Pipelined, record-at-a-time execution (not micro-batching) with buffered network transfer. Backpressure is handled natively.
Scalability | Every operator runs as many parallel subtasks. Parallelism is set per operator, and state is repartitioned cleanly on rescale via savepoints.
Figure: Flink design goals. An unbounded stream flows through stateful operators driven by an event-time engine; checkpointing and backpressure keep the results correct and the pipeline stable.
A useful mental model: Flink is a stream processor first and a batch processor second. Batch is treated as the special case of a bounded stream — the same runtime, with the knowledge that the input eventually ends.

2. Cluster Architecture

A Flink cluster follows a master/worker pattern. The master is the JobManager and the workers are TaskManagers. Unlike Cassandra's peer-to-peer ring, there is a clear coordination role — but it is split into three distinct responsibilities so that a single failed job cannot take down the whole cluster.

Figure: Flink cluster architecture. The JobManager splits into Dispatcher, ResourceManager, and a per-job JobMaster. TaskManagers expose task slots into which subtasks are deployed.

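The components are:

  1. Dispatcher: provides the REST interface through which jobs are submitted, starts a new JobMaster for each submitted job, and runs the Flink web UI.
  2. ResourceManager: allocates and releases resources and manages task slots, the unit of resource scheduling. Flink ships ResourceManagers for different environments, such as standalone clusters, YARN, and Kubernetes.
  3. JobMaster: manages the execution of a single job (one JobGraph). Several jobs can run in the same cluster, each with its own JobMaster.
  4. TaskManagers: the workers that execute the subtasks of the dataflow and buffer and exchange the data streams between them. Each TaskManager offers a fixed number of task slots.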

A task slot is a fixed share of a TaskManager's resources; it isolates managed memory between slots, though not CPU. A TaskManager with three slots therefore offers three units of parallelism. By default Flink allows slot sharing: subtasks from different operators of the same job can share one slot, so a full slice of the pipeline fits in a single slot. As a result, a job needs as many slots as its highest operator parallelism, and the number of available slots caps that parallelism.
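The slot arithmetic is easy to check by hand. A minimal sketch, assuming a job described only by a mapping from operator name to parallelism, with every operator in the default slot sharing group; both function names are hypothetical, not Flink APIs.

```python
def slots_with_sharing(parallelism: dict[str, int]) -> int:
    # With slot sharing, one slot can hold one subtask of every operator,
    # so the job needs as many slots as its most parallel operator.
    return max(parallelism.values())


def slots_without_sharing(parallelism: dict[str, int]) -> int:
    # Without sharing, every subtask would need a slot of its own.
    return sum(parallelism.values())


job = {"source": 2, "map": 6, "window": 6, "sink": 1}
print(slots_with_sharing(job))     # 6
print(slots_without_sharing(job))  # 15
```

Slot sharing is what lets a lightly parallel source or sink ride along in slots that the heavy operators need anyway.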

Submitting a job proceeds roughly like this:

function submit(job_graph):
  dispatcher.receive(job_graph)            # REST entry point
  jm = dispatcher.start_job_master(job_graph)
  needed = jm.compute_required_slots()     # from operator parallelism
  slots = resource_manager.allocate(needed)
  exec_graph = jm.build_execution_graph(job_graph)
  for subtask in exec_graph.subtasks():
    slot = slots.pick_for(subtask)
    task_manager(slot).deploy(subtask)     # ship code + state handle
  jm.track_status()                        # reschedule on failure

3. The Dataflow Graph

A Flink program is a logical description of operators and the streams connecting them. Before it runs, that description passes through three representations, each lower-level than the last.

Figure: StreamGraph to JobGraph to ExecutionGraph. The program becomes a StreamGraph, then a chained JobGraph sent to the JobManager, then a parallel ExecutionGraph of subtasks deployed onto TaskManagers.

Edges between operators come in two flavors. A forward edge sends each record from subtask i of one operator to subtask i of the next, with no repartitioning, which is what enables chaining. A redistributing edge (produced by keyBy, a rebalance, or a change in parallelism) can send a record to any downstream subtask, typically across the network to another TaskManager. A keyBy in particular partitions the stream by a hash of the key so that all records for a given key always reach the same downstream subtask, which is what makes keyed state possible.
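The three routing behaviors can be sketched as channel selectors. These functions are hypothetical illustrations, not Flink's partitioner classes, and the keyBy variant is simplified: it hashes straight to a subtask, whereas Flink routes through key groups as described in Section 4. CRC32 is used because Python's built-in `hash()` of a string is salted per process, which would break the same-key-same-subtask property across workers.

```python
import zlib
from itertools import count


def forward_channel(upstream_index: int) -> int:
    # Forward edge: upstream subtask i always feeds downstream subtask i.
    return upstream_index


def make_rebalance(num_channels: int):
    # Rebalance: spread records round-robin over all downstream subtasks.
    counter = count()
    return lambda record: next(counter) % num_channels


def key_channel(key: str, num_channels: int) -> int:
    # Simplified keyBy: a stable hash of the key picks the downstream subtask,
    # so every record with this key goes to the same place.
    return zlib.crc32(key.encode("utf-8")) % num_channels


rebalance = make_rebalance(3)
print([rebalance(r) for r in ["r1", "r2", "r3", "r4", "r5"]])  # [0, 1, 2, 0, 1]
print({key_channel("alice", 4) for _ in range(3)})  # a single subtask index
```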

function chain(stream_graph):              # client side
  job_graph = new_job_graph(stream_graph)  # start with one task per operator
  for op in stream_graph.operators():
    if op.has_single_input() and op.input_is_forward() \
       and op.parallelism == op.input.parallelism \
       and chaining_enabled(op):
      job_graph.fuse(op, op.input)         # one task, no network
  return job_graph                         # chained operators

function parallelize(job_graph):           # JobMaster side
  execution_graph = new_execution_graph()
  for task in job_graph.tasks():
    for i in range(task.parallelism):
      execution_graph.add_subtask(task, index=i)
  return execution_graph
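A runnable version of the two passes for the simplest case, a linear pipeline. Assumptions: the hypothetical `Op` record carries only a name, a parallelism, and whether its input edge is forward; Flink's real chaining check also looks at slot sharing groups, chaining strategies, and whether the downstream operator has exactly one input.

```python
from dataclasses import dataclass


@dataclass
class Op:
    name: str
    parallelism: int
    forward_input: bool = True  # is the edge from the previous operator a forward edge?


def chain(ops: list[Op]) -> list[list[Op]]:
    """Client side: fuse consecutive chainable operators into tasks (a toy JobGraph)."""
    tasks: list[list[Op]] = []
    for op in ops:
        prev = tasks[-1][-1] if tasks else None
        if prev is not None and op.forward_input and op.parallelism == prev.parallelism:
            tasks[-1].append(op)  # chained: same thread, records passed by method call
        else:
            tasks.append([op])    # start a new task
    return tasks


def parallelize(tasks: list[list[Op]]) -> list[str]:
    """JobMaster side: one subtask per parallel instance of each task (a toy ExecutionGraph)."""
    subtasks = []
    for task in tasks:
        name = " -> ".join(op.name for op in task)
        p = task[0].parallelism
        subtasks.extend(f"{name} ({i + 1}/{p})" for i in range(p))
    return subtasks


pipeline = [
    Op("source", 2),
    Op("map", 2),
    Op("window", 2, forward_input=False),  # keyBy: redistributing edge
    Op("sink", 1, forward_input=False),    # parallelism change: redistributing edge
]
tasks = chain(pipeline)
print([[op.name for op in t] for t in tasks])  # [['source', 'map'], ['window'], ['sink']]
print(parallelize(tasks))
# ['source -> map (1/2)', 'source -> map (2/2)', 'window (1/2)', 'window (2/2)', 'sink (1/1)']
```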

4. State and State Backends

State is what separates Flink from a stateless stream filter. Anything an operator needs to remember between events — a running count, the contents of an open window, the last seen value per key — is held as Flink-managed state so the runtime can snapshot, restore, and repartition it. There are two kinds.

Figure: Keyed vs operator state and backends. Keyed state is partitioned by key and redistributed on rescale; operator state belongs to a subtask. Two backends trade speed for capacity.
Kind | Scope | Typical use
Keyed state | Bound to a single key on a keyed stream. Each key has its own independent value; a subtask only sees the keys hashed to it. | Per-user counters, per-key aggregations, window contents (ValueState, ListState, MapState).
Operator state | Bound to a parallel subtask, not a key. Redistributed by an explicit scheme when parallelism changes. | Source connector bookkeeping, e.g. a Kafka source storing the read offset per partition.
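Keyed state is read through a handle whose value depends on the key of the record currently being processed. A minimal sketch of that scoping, assuming a hypothetical `KeyedValueState` class that only imitates the shape of Flink's `ValueState` (`value()` / `update()`); the runtime's job of setting the current key before each record is modeled by an explicit `set_current_key` call.

```python
class KeyedValueState:
    """Imitates a ValueState handle: one logical variable, one value per key."""

    def __init__(self):
        self._values = {}          # key -> value, as a heap-based backend might store it
        self._current_key = None

    def set_current_key(self, key):
        # Done by the runtime before each record reaches the user function.
        self._current_key = key

    def value(self):
        return self._values.get(self._current_key)  # None if never set for this key

    def update(self, v):
        self._values[self._current_key] = v


def count_per_key(records, state: KeyedValueState):
    """A per-key counter written only against the state handle."""
    out = []
    for key in records:
        state.set_current_key(key)
        n = (state.value() or 0) + 1
        state.update(n)
        out.append((key, n))
    return out


print(count_per_key(["a", "b", "a", "a"], KeyedValueState()))
# [('a', 1), ('b', 1), ('a', 2), ('a', 3)]
```

The user function never names the key when touching state; that is what lets the runtime move a key's state to another subtask without changing user code.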

Where that state physically lives is the job of the state backend. The choice is purely about capacity and access cost; it does not change semantics.

Backend | Where state lives | Best for
HashMapStateBackend | As live Java objects on the JVM heap. | State that fits in memory. Fastest access (no serialization), but bounded by heap size and subject to GC pressure.
EmbeddedRocksDBStateBackend | Serialized in an embedded RocksDB instance on the local disk of each TaskManager. | Very large state, larger than memory. Pays serialization cost per access but supports incremental checkpoints (only SSTable files created since the previous checkpoint are uploaded).
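The incremental-checkpoint idea can be shown with plain sets. A simplified sketch, assuming each checkpoint is just the set of immutable SSTable file names in the local RocksDB directory; `plan_incremental_upload` is hypothetical, and Flink's real bookkeeping (a shared-state registry with reference counting) is more involved.

```python
def plan_incremental_upload(previous: set[str], current: set[str]):
    """Return (files to upload, files reused from earlier checkpoints)."""
    # SSTables are immutable, so a file uploaded for an earlier checkpoint can be
    # referenced again instead of re-uploaded. Only files new since then move.
    to_upload = current - previous
    reused = current & previous
    return to_upload, reused


cp1 = {"000010.sst", "000011.sst", "000012.sst"}
cp2 = {"000011.sst", "000012.sst", "000013.sst"}  # 000010 compacted away, 000013 new
upload, reuse = plan_incremental_upload(cp1, cp2)
print(sorted(upload))  # ['000013.sst']
print(sorted(reuse))   # ['000011.sst', '000012.sst']
```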

Keyed state is the reason a keyBy matters so much: because every record for a key always lands on the same subtask, that subtask can keep the key's state locally with no coordination. Each key hashes to one of a fixed number of key groups (as many as the job's maximum parallelism), and each subtask owns a contiguous range of key groups. When the job is rescaled, only those ranges are reassigned, so state moves in whole key groups rather than key by key.
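The key-group mapping fits in a few lines. The key-group-to-subtask formula below follows Flink's `KeyGroupRangeAssignment` (`key_group * parallelism // max_parallelism`), but the key hash is an assumption: CRC32 stands in for Flink's murmur hash of the key's `hashCode()`, so concrete assignments will differ from a real job. The function names are hypothetical.

```python
import zlib


def key_group(key: str, max_parallelism: int) -> int:
    # Assumption: CRC32 replaces Flink's murmur hash of key.hashCode().
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def subtask_for_group(group: int, parallelism: int, max_parallelism: int) -> int:
    # Each subtask owns a contiguous range of key groups.
    return group * parallelism // max_parallelism


def group_ranges(parallelism: int, max_parallelism: int) -> dict[int, range]:
    """Which key groups each subtask owns."""
    owners: dict[int, list[int]] = {}
    for g in range(max_parallelism):
        owners.setdefault(subtask_for_group(g, parallelism, max_parallelism), []).append(g)
    return {s: range(gs[0], gs[-1] + 1) for s, gs in owners.items()}


MAX_P = 8
print(group_ranges(2, MAX_P))  # {0: range(0, 4), 1: range(4, 8)}
print(group_ranges(4, MAX_P))  # {0: range(0, 2), 1: range(2, 4), 2: range(4, 6), 3: range(6, 8)}
```

Rescaling from 2 to 4 subtasks splits each old range in half. A key never changes key group, so its state travels as part of a whole group; only the group-to-subtask ranges change.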

5. Checkpointing

Checkpointing is how Flink makes stateful streaming fault-tolerant. The challenge is to snapshot the state of every operator at a point that is mutually consistent, without stopping the stream. Flink solves this with a variant of the Chandy-Lamport distributed snapshot algorithm, using special records called checkpoint barriers.

Figure: Checkpoint barriers and snapshot. The CheckpointCoordinator injects barrier n at the sources; as the barrier flows downstream, each operator snapshots its state to durable storage. When all acknowledge, checkpoint n is complete.

The mechanism works like this:

  1. The CheckpointCoordinator (inside the JobMaster) periodically injects a barrier carrying checkpoint number n into every source subtask.
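  2. A barrier flows through the dataflow alongside normal records and, in the default aligned mode, never overtakes them, so it splits each input stream into the records before it and the records after it. When an operator has received barrier n on all of its input channels, everything before the barrier is "in" checkpoint n and everything after is "out".
  3. At that moment the operator takes a snapshot of its state, then forwards barrier n to its outputs and resumes processing. The snapshot is written to durable storage (S3, HDFS, etc.) asynchronously, so processing does not wait for the upload to finish.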
  4. When every operator — through to the sinks — has acknowledged barrier n, the coordinator marks checkpoint n complete. Sinks that participate in exactly-once then commit their pending output.
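The coordinator's side of steps 1 and 4 is mostly bookkeeping over acknowledgements. A minimal sketch with a hypothetical `PendingCheckpoint` class; Flink's real CheckpointCoordinator also handles timeouts, declined checkpoints, and several checkpoints in flight at once, none of which is modeled here.

```python
class PendingCheckpoint:
    """Tracks which tasks have acknowledged checkpoint n."""

    def __init__(self, checkpoint_id: int, tasks: list[str]):
        self.checkpoint_id = checkpoint_id
        self.waiting = set(tasks)   # tasks that still owe an ack
        self.handles = {}           # task -> handle to its snapshot in durable storage

    def acknowledge(self, task: str, state_handle: str) -> bool:
        """Record one ack; return True once every task has acknowledged."""
        if task not in self.waiting:
            raise ValueError(f"unexpected or duplicate ack from {task}")
        self.waiting.remove(task)
        self.handles[task] = state_handle
        return not self.waiting


cp = PendingCheckpoint(7, ["source-0", "map-0", "sink-0"])
cp.acknowledge("source-0", "chk-7/source-0")
cp.acknowledge("map-0", "chk-7/map-0")
print(cp.acknowledge("sink-0", "chk-7/sink-0"))  # True: checkpoint 7 is complete
```

Only when `acknowledge` returns True would the coordinator record the checkpoint as complete and tell exactly-once sinks to commit.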

An operator with multiple inputs cannot snapshot until it has seen the barrier on all of them. The two strategies for handling that wait are the central tradeoff in checkpointing:

Mode | What it does | Tradeoff
Aligned | Once a barrier arrives on one input, that input is blocked and its later records are buffered ("aligned") until the barrier arrives on all other inputs. The snapshot contains no in-flight data. | Cleanest snapshot, but a slow channel stalls the operator while it waits, so checkpoint latency grows under backpressure.
Unaligned | As soon as the first barrier arrives, the operator forwards it downstream (overtaking queued records) and stores the in-flight records in its input and output buffers as part of the checkpoint. | Checkpoints complete quickly even under heavy backpressure, at the cost of a larger snapshot.
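A toy simulation of aligned mode for a two-input operator whose state is a running sum. The `(channel, item)` event format and `run_aligned` are hypothetical; the point is that a record arriving on an input after that input's barrier is held back, so the snapshot reflects exactly the records that preceded the barrier on each input.

```python
from collections import deque


def run_aligned(arrivals, num_inputs: int):
    """Toy model of aligned checkpointing for one multi-input operator.

    `arrivals` is a list of (input_channel, item) in arrival order, where item is
    ("record", value) or ("barrier", checkpoint_id). The operator's state is a
    running sum. Returns (snapshots taken, final state)."""
    queue = deque(arrivals)
    state = 0
    blocked = set()      # inputs that already delivered the current barrier
    held_back = []       # items from blocked inputs, not yet processed
    snapshots = []       # (checkpoint_id, state at snapshot time)
    while queue:
        channel, (kind, payload) = queue.popleft()
        if channel in blocked:
            held_back.append((channel, (kind, payload)))  # after the barrier: wait
        elif kind == "record":
            state += payload
        else:
            blocked.add(channel)
            if len(blocked) == num_inputs:             # barrier seen on every input
                snapshots.append((payload, state))     # snapshot, then forward the barrier
                blocked.clear()
                queue.extendleft(reversed(held_back))  # release held-back items, in order
                held_back.clear()
    return snapshots, state


arrivals = [
    (0, ("record", 1)),
    (0, ("barrier", 1)),
    (0, ("record", 10)),   # after barrier 1 on input 0: must not be in checkpoint 1
    (1, ("record", 2)),
    (1, ("barrier", 1)),
]
print(run_aligned(arrivals, 2))  # ([(1, 3)], 13)
```

Unaligned mode would instead snapshot the state as soon as barrier 1 arrives on input 0 and store record 2, still in flight on input 1 ahead of that input's barrier, as part of the checkpoint, so neither input is ever blocked.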

Exactly-once falls directly out of this. Recovery rewinds all operators to the same checkpoint and replays sources from the offsets stored in that checkpoint:

on checkpoint trigger (every checkpoint_interval):
  coordinator.inject_barrier(n) into all sources

function on_barrier(op, n):
  wait_for_barrier_on_all_inputs(op, n)    # aligned mode
  handle = op.snapshot_state_to(durable_store)   # async upload
  op.forward_barrier(n)
  coordinator.ack(op, n, handle)

on failure:
  cp = latest_complete_checkpoint()
  for op in all_operators:
    op.restore_state_from(cp.handle[op])   # consistent cut
  sources.seek(cp.offsets)                 # replay from here
  # each event now affects state exactly once

Exactly-once means exactly-once state, not exactly-once side effects in general. End-to-end exactly-once also needs a replayable source (one that can rewind to a stored offset) and a sink that is either idempotent or supports a two-phase, transactional commit tied to checkpoint completion.
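The recovery argument can be checked with a toy single-operator job. A minimal sketch, assuming a list as the replayable source (its index plays the role of a Kafka offset) and a per-key counter as the only state; `run_counting_job` and its parameters are hypothetical. Because the offset and the counts are saved and restored together, replaying after a crash yields the same counts as a run without one.

```python
def run_counting_job(events, checkpoint_every, fail_at=None):
    """Count events per key with periodic checkpoints; optionally crash once and recover."""
    offset, state = 0, {}          # source position and operator state
    checkpoint = (0, {})           # last completed checkpoint: (offset, copy of state)
    failed = False
    while offset < len(events):
        if offset == fail_at and not failed:
            failed = True
            # Recovery: rewind source offset and state to the same checkpoint.
            offset, state = checkpoint[0], dict(checkpoint[1])
            continue
        key = events[offset]
        state[key] = state.get(key, 0) + 1
        offset += 1
        if offset % checkpoint_every == 0:
            checkpoint = (offset, dict(state))  # offset and state from the same point
    return state


events = list("abacabad")
print(run_counting_job(events, 3))  # {'a': 4, 'b': 2, 'c': 1, 'd': 1}
print(run_counting_job(events, 3) == run_counting_job(events, 3, fail_at=7))  # True
```

Restoring only the state, or only the offset, would double-count or skip the events between the checkpoint and the crash; restoring both is what makes the replay safe.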

6. Savepoints vs Checkpoints

Savepoints and checkpoints use the same snapshotting machinery but serve opposite purposes. Checkpoints are an automatic, internal safety net the runtime manages for failure recovery. Savepoints are a manual, user-owned snapshot taken deliberately for operational changes.

Figure: Savepoints versus checkpoints. Same snapshot machinery, different ownership and intent: checkpoints for automatic recovery, savepoints for planned upgrades and rescaling.
Property | Checkpoint | Savepoint
Triggered by | Flink, automatically and periodically | You (a user or operations tooling), manually and on demand
Lifecycle | Owned by Flink; old ones are discarded automatically | Owned by you; it persists until you delete it
Format | Optimized for fast write/restore; may be incremental | Self-contained, portable, always a full snapshot
Primary purpose | Recover from an unexpected failure | Planned stop/restart: upgrades, rescaling, migration

Because a savepoint is a portable, complete snapshot, it is the tool for the two most common operational tasks:

# graceful upgrade or rescale
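savepoint_path = flink.stop_with_savepoint(job_id)   # full savepoint, then stop the job
                                                     # CLI: flink stop --savepointPath <dir> <jobId>
deploy(new_job_jar)                                  # or change -p parallelism
flink.run(new_job_jar, from_savepoint=savepoint_path)
                                                     # CLI: flink run --fromSavepoint <path> <jar>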
# keyed state repartitioned by key group; operators matched by uid
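Matching by uid is what lets a changed program pick up old state. A minimal model, assuming state is just a dictionary keyed by operator uid; `restore_by_uid` is hypothetical. The behavior it imitates is Flink's documented one: state is matched to operators by ID, operators with a new ID start empty, and saved state with no matching operator makes the restore fail unless the job is started with `--allowNonRestoredState`.

```python
def restore_by_uid(savepoint_state: dict[str, str], new_uids: list[str],
                   allow_non_restored_state: bool = False) -> dict[str, str]:
    """Hand each operator of the new job the state saved under its uid."""
    # Operators whose uid appears in the savepoint get their old state back;
    # operators with a new uid simply start with empty state.
    restored = {uid: savepoint_state[uid] for uid in new_uids if uid in savepoint_state}
    # State whose operator no longer exists cannot be placed anywhere.
    orphaned = sorted(set(savepoint_state) - set(new_uids))
    if orphaned and not allow_non_restored_state:
        raise RuntimeError(f"no operator for saved state of: {orphaned}")
    return restored


savepoint = {"kafka-source": "offsets", "dedup": "seen ids", "enricher": "cache"}
upgraded_job = ["kafka-source", "dedup", "new-filter"]  # enricher removed, filter added
try:
    restore_by_uid(savepoint, upgraded_job)
except RuntimeError as e:
    print(e)  # no operator for saved state of: ['enricher']
print(restore_by_uid(savepoint, upgraded_job, allow_non_restored_state=True))
# {'kafka-source': 'offsets', 'dedup': 'seen ids'}
```

This is also why assigning explicit uids matters: without them, IDs are derived from the job's structure, and an innocent-looking change can leave saved state without a home.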

7. Time and Windows

On an unbounded stream you cannot wait for "all the data" before producing a result, and events do not arrive in the order they happened — a mobile client may buffer events offline and deliver them late. Flink's answer is to compute in event time and track progress with watermarks.

Figure: Event time, watermarks, and windows. Events are bucketed into windows by their own timestamps. A watermark asserts how far event time has advanced; when it passes a window's end, the window fires. An event that arrives after its window has fired is late.

on event e arriving at operator:
  window = assign_window(e.event_time)     # by timestamp, not arrival
  if window.end <= current_watermark:
    side_output.emit(e)                    # LATE: window already fired
  else:
    window.state.add(e)

on watermark W arriving:
  current_watermark = W
  for w in windows where w.end <= W:
    emit(trigger(w))                       # fire and produce result
    w.clear()

The watermark is fundamentally a latency-versus-completeness dial. Holding the watermark further behind the largest timestamp seen so far gives late events more time to arrive (more complete results) but delays when windows fire (higher latency). A tight watermark fires sooner but risks dropping stragglers.
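A runnable sketch of tumbling event-time windows makes the dial concrete. Assumptions: events arrive as `(event_time, value)` pairs; the watermark is recomputed after every event as the largest timestamp seen minus a fixed out-of-orderness bound minus 1 (the formula Flink's bounded-out-of-orderness watermark generator uses); and a window [start, end) fires once the watermark reaches end - 1, its last timestamp, as in Flink. `tumbling_windows` itself is hypothetical.

```python
def tumbling_windows(events, size, max_out_of_orderness):
    """Sum values per tumbling window; return (fired windows, late events)."""
    open_windows = {}              # window start -> values collected so far
    fired, late = [], []
    max_ts = float("-inf")
    watermark = float("-inf")
    for ts, value in events:
        start = ts - ts % size                    # assign by event time, not arrival
        if start + size - 1 <= watermark:         # its window has already fired
            late.append((ts, value))
            continue
        open_windows.setdefault(start, []).append(value)
        max_ts = max(max_ts, ts)
        watermark = max_ts - max_out_of_orderness - 1
        for s in sorted(open_windows):            # fire every window the watermark passed
            if s + size - 1 <= watermark:
                fired.append((s, sum(open_windows.pop(s))))
    return fired, late


events = [(1, 1), (4, 1), (12, 1), (7, 1), (25, 1), (3, 1)]
print(tumbling_windows(events, 10, 5))  # ([(0, 3), (10, 1)], [(3, 1)])
print(tumbling_windows(events, 10, 0))  # ([(0, 2), (10, 1)], [(7, 1), (3, 1)])
```

With a bound of 5, the out-of-order event at time 7 still lands in window [0, 10) and only the very late event at time 3 is rejected. With a bound of 0, window [0, 10) fires as soon as time 12 is seen, so the event at time 7 is late too: lower latency, less complete results.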

8. Network Shuffle and Backpressure

When subtasks are connected by a redistributing edge, records cross the network from a producer subtask to a consumer subtask. Flink moves them in buffers (batches of serialized records) rather than one record per packet, for efficiency. The interesting part is what happens when the consumer cannot keep up.

Figure: Network shuffle and credit-based backpressure. Data flows forward in network buffers; credits flow backward. A consumer only ever receives as many buffers as it has announced it can hold.

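Flink uses credit-based flow control to handle this natively, with no explicit throttling logic in user code:

  1. Each input channel on the consumer side has a few exclusive network buffers, and the input gate shares a pool of floating buffers among its channels. The consumer announces its free buffers to the producer as credits.
  2. The producer sends a buffer only when the channel has a credit, and each buffer it sends uses up one credit. Along with each buffer it reports its backlog (how many buffers are still queued), so the consumer can request floating buffers to absorb it.
  3. When the consumer falls behind, its buffers fill up, credits drop to zero, and the producer stops sending on that channel. The producer's own output buffers then fill, the producing task stalls, and the pressure propagates upstream, task by task, to the sources.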

This is what backpressure means in Flink: it is not an error condition but the normal, designed mechanism that keeps a fast producer from overwhelming a slow consumer. No data is dropped, and no unbounded queue grows; the system simply runs at the rate the bottleneck allows.

function producer_send(channel, buffer):
  while channel.credits == 0:
    wait_for_credit(channel)             # backpressure: cannot send yet
  channel.transfer(buffer)               # a credit is available
  channel.credits -= 1                   # each buffer consumes one credit

function consumer_loop():
  while True:
    buf = input_gate.next_buffer()       # blocks if none ready
    process(buf)
    recycle(buf)
    announce_credit(buf.channel, +1)     # I have a free buffer again

Because backpressure flows all the way back to the source, a sustained backlog is visible at the source's intake rate and in Flink's per-task backpressure metrics. That makes the bottleneck easy to spot: walking downstream from the source, it is the first task that is busy rather than backpressured. Tasks upstream of it report backpressure, and tasks downstream of it are mostly idle.
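A tick-based simulation shows the invariant behind credit-based flow control: buffers in flight plus outstanding credits always equal the consumer's buffer count, so nothing queues beyond it. The single channel, the fixed consumer speed, and `simulate_credit_flow` are assumptions for illustration; floating buffers and backlog announcements are not modeled.

```python
from collections import deque


def simulate_credit_flow(num_records, consumer_buffers, consume_every):
    """One producer, one consumer, one channel.

    The consumer owns `consumer_buffers` buffers and processes one buffer every
    `consume_every` ticks; the producer could send one buffer per tick.
    Returns (ticks taken, max buffers in flight, ticks the producer was blocked)."""
    credits = consumer_buffers       # initial announcement: every buffer is free
    inbox = deque()                  # buffers delivered to the consumer, not yet processed
    sent = processed = 0
    max_in_flight = blocked_ticks = 0
    tick = 0
    while processed < num_records:
        tick += 1
        if sent < num_records:
            if credits > 0:
                inbox.append(sent)   # producer ships one buffer...
                sent += 1
                credits -= 1         # ...spending one credit
            else:
                blocked_ticks += 1   # no credit: producer waits (backpressure)
        max_in_flight = max(max_in_flight, len(inbox))
        if tick % consume_every == 0 and inbox:
            inbox.popleft()          # consumer processes and recycles a buffer...
            processed += 1
            credits += 1             # ...and announces it as a new credit
    return tick, max_in_flight, blocked_ticks


print(simulate_credit_flow(10, 2, 3))  # (30, 2, 15)
print(simulate_credit_flow(5, 2, 1))   # (5, 1, 0)
```

With a consumer three times slower than the producer, the producer spends half the run blocked on credits, yet the consumer never holds more than its two buffers and no record is dropped.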

9. Summary

Flink's design is a small set of ideas that compose into correct, fault-tolerant stream processing:

Concern | Mechanism
Who coordinates the cluster? | JobManager split into Dispatcher, ResourceManager, and a per-job JobMaster; TaskManagers provide task slots.
How does a program become work? | StreamGraph → chained JobGraph → parallel ExecutionGraph of subtasks deployed into slots.
What does an operator remember? | Keyed state (per key) and operator state (per subtask), held in a HashMap or RocksDB state backend.
How does state survive failure? | Chandy-Lamport-style barriers snapshot a consistent cut to durable storage; recovery rewinds all operators to the last checkpoint.
How are results correct? | Exactly-once over state, plus replayable sources and transactional/idempotent sinks for end-to-end guarantees.
How do upgrades and rescaling work? | Manual, portable savepoints; keyed state repartitioned by key group.
How is out-of-order data handled? | Event time + watermarks; windows fire when the watermark passes their end; late data is dropped or sent to a side output.
How does a fast stage not swamp a slow one? | Credit-based backpressure: consumers grant credits, producers only send what fits, pressure flows to the source.
The recurring theme: Flink keeps long-lived state as a first-class thing, reasons in event time rather than arrival order, and makes the whole pipeline consistent through barriers that flow with the data. Failure recovery, upgrades, and flow control are all built on those same primitives rather than bolted on.