
Spark Performance Tuning: From "It's Slow" to Actually Knowing Why

"The Spark job is slow" is one of the least actionable problem statements in data engineering. Slow compared to what? Which stage? Is it CPU-bound, I/O-bound, shuffle-bound, or skew-bound? Is it Spark's fault, or is it the cluster configuration, the storage layer, or the query plan? Without understanding Spark's execution model, performance tuning is just superstition — restarting the cluster and hoping.

This article builds the mental model you need to actually diagnose Spark performance problems: the execution model (stages, tasks, shuffles), partitioning (the root cause of most problems), the join strategies (when Spark picks the wrong one and why), Adaptive Query Execution (AQE) in Spark 3.x, memory configuration, and what Photon on Databricks actually accelerates.

The Execution Model: Jobs, Stages, and Tasks

When you call an action (count(), collect(), or a write such as df.write.save()), Spark creates a job. The job is divided into stages at shuffle boundaries. Within each stage, Spark creates one task per input partition and runs the tasks in parallel across executors. Each task processes one partition of data entirely on a single executor core.

graph LR
    subgraph Stage1["Stage 1 (read + filter + map)"]
        T1["Task 1\nPartition 0"]
        T2["Task 2\nPartition 1"]
        T3["Task 3\nPartition 2"]
        T4["Task N\nPartition N"]
    end

    Shuffle["SHUFFLE\n(network exchange\nby key)"]

    subgraph Stage2["Stage 2 (aggregate + write)"]
        T5["Task 1\nReduced partition 0"]
        T6["Task 2\nReduced partition 1"]
        T7["Task M\nReduced partition M"]
    end

    Stage1 --> Shuffle --> Stage2

    style Shuffle fill:#3a1a1a,stroke:#e55,color:#faa
          

A shuffle boundary forces all tasks in Stage 1 to complete before Stage 2 can start — it's a global synchronization barrier. Data is written to disk by Stage 1 tasks, transferred over the network, and read by Stage 2 tasks. This is why shuffles are expensive: they involve disk I/O, network transfer, and a full pipeline stall.
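To make the stage-splitting rule concrete, here is a toy model in plain Python. It is not Spark's DAG scheduler: the operation names and the WIDE_OPS set are illustrative assumptions, and the only rule it encodes is that a shuffle-producing operation starts a new stage.

```python
# Toy model: split a linear chain of operations into stages at shuffle boundaries.
# WIDE_OPS is an illustrative, incomplete set of shuffle-producing operations.
WIDE_OPS = {"groupBy", "repartition", "orderBy", "distinct"}

def split_into_stages(ops):
    """Return a list of stages, each a list of operation names."""
    stages = [[]]
    for op in ops:
        # A wide operation needs data from all upstream partitions,
        # so it cannot be pipelined with the operations before it.
        if op in WIDE_OPS and stages[-1]:
            stages.append([])
        stages[-1].append(op)
    return stages

pipeline = ["read", "filter", "map", "groupBy", "write"]
for i, stage in enumerate(split_into_stages(pipeline), start=1):
    print(f"Stage {i}: {' + '.join(stage)}")
```

For the pipeline in the diagram this yields two stages: read + filter + map, then groupBy + write.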

The Spark UI's Stage tab is your primary diagnostic tool. Look for: stages with dramatically different task durations (data skew), stages with high shuffle read/write bytes (shuffle bottleneck), and stages where most tasks finish quickly but a few take 10x longer (straggler tasks, often caused by skew or GC pressure).
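The straggler check can be done by eye in the UI, but a small helper makes the comparison explicit. This is a sketch in plain Python; the durations are hypothetical values you would copy from the Stage tab, and the 10x cutoff (straggler_ratio) is an assumption taken from the rule of thumb above, not a Spark setting.

```python
from statistics import median

def summarize_tasks(durations_s, straggler_ratio=10.0):
    """Summarize task durations (seconds) for one stage.

    straggler_ratio is an assumed cutoff: a task counts as a straggler
    when it runs at least this many times longer than the median task.
    """
    med = median(durations_s)
    slowest = max(durations_s)
    stragglers = [i for i, d in enumerate(durations_s) if d >= straggler_ratio * med]
    return {
        "median_s": med,
        "max_s": slowest,
        "max_over_median": slowest / med,
        "straggler_task_indexes": stragglers,
    }

# Hypothetical durations for one stage: seven quick tasks and one slow one.
durations = [12, 11, 13, 12, 14, 12, 11, 190]
print(summarize_tasks(durations))
```

A max-over-median ratio near 1 suggests the stage is evenly balanced; a ratio in the double digits points toward skew or GC pressure on specific tasks.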

Partitioning: The Root Cause of Most Performance Problems

Spark parallelism is determined by partition count. Too few partitions → underutilized cluster, each task processes too much data, OOM errors. Too many partitions → overhead dominates (scheduling, metadata, small file problem on write), tasks finish in milliseconds but the scheduler can't keep up.

The right partition size: 128–256 MB of data per partition for most workloads. Calculate target partition count as: total_data_bytes / 134217728 (128 MB). For a 100 GB dataset, target 800 partitions.
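The arithmetic above is easy to wrap in a helper. A minimal sketch, where target_partition_count is a hypothetical name and the 128 MB default is the lower end of the range quoted above:

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024 * 1024  # 128 MB

def target_partition_count(total_data_bytes, partition_bytes=TARGET_PARTITION_BYTES):
    """Number of partitions needed so each holds at most partition_bytes."""
    return max(1, math.ceil(total_data_bytes / partition_bytes))

hundred_gb = 100 * 1024**3
print(target_partition_count(hundred_gb))                     # 800
print(target_partition_count(hundred_gb, 256 * 1024 * 1024))  # 400
```

The result is a starting point for repartition(N) or spark.sql.shuffle.partitions, to be adjusted after looking at actual task durations.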

Repartition vs Coalesce

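repartition(N) triggers a full shuffle: all data moves across the network. Use it when increasing the partition count or when you need a specific key-based distribution. coalesce(N) reduces the partition count without a full shuffle by merging existing partitions, preferring ones that already sit on the same executor. Use it only for reducing partitions (it cannot increase them), and only when the current partitions are already well balanced, because merging uneven partitions preserves the imbalance. The common mistake: using coalesce(1) to write a single output file. Because coalesce is a narrow transformation, the reduced parallelism propagates upstream, so everything back to the previous shuffle boundary runs as a single task on one core. Use repartition(1) instead, which adds a shuffle but keeps the upstream work parallel, or write with controlled parallelism.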

The Skew Problem

Data skew is when a small number of keys hold a disproportionate fraction of the data. In a grouped aggregation on a skewed column, one task processes 80% of the data while 99 other tasks finish quickly. The job takes as long as that one slow task — and no amount of adding executors helps, because the work is serialized on one core.
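The claim that extra executors do not help can be checked with a small simulation. This is a simplified scheduler in plain Python, not Spark's: it assigns each task to whichever core frees up first, and the task costs are illustrative units of work.

```python
import heapq

def stage_wall_time(task_costs, cores):
    """Greedy schedule: each task goes to the core that frees up first.

    Returns the time at which the last task finishes (the stage's wall time).
    """
    core_free_at = [0] * cores
    heapq.heapify(core_free_at)
    for cost in sorted(task_costs, reverse=True):
        start = heapq.heappop(core_free_at)
        heapq.heappush(core_free_at, start + cost)
    return max(core_free_at)

# Illustrative numbers: one hot-key task holds about 80% of the work.
skewed = [800] + [2] * 99   # total work = 998
balanced = [10] * 100       # total work = 1000

for cores in (10, 100):
    print(cores, stage_wall_time(skewed, cores), stage_wall_time(balanced, cores))
```

With 10 cores the skewed stage takes 800 units against 100 for the balanced one; with 100 cores the balanced stage drops to 10 while the skewed stage stays at 800.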

from pyspark.sql import functions as F

# Detect skew: check key distribution before a join
df.groupBy("customer_id") \
  .count() \
  .orderBy(F.desc("count")) \
  .show(20)

# Salting technique: distribute a hot key across multiple partitions
SALT_FACTOR = 20

# Add a random salt (0 .. SALT_FACTOR-1) to each row of the large table
large_df = large_df.withColumn(
    "salted_key",
    F.concat(F.col("customer_id"), F.lit("_"), (F.rand() * SALT_FACTOR).cast("int").cast("string"))
)

# Replicate each row of the small table once per salt value
small_df = small_df.withColumn("salt", F.array([F.lit(i) for i in range(SALT_FACTOR)])) \
                   .withColumn("salt", F.explode("salt")) \
                   .withColumn("salted_key", F.concat(F.col("customer_id"), F.lit("_"), F.col("salt").cast("string"))) \
                   .drop("customer_id", "salt")  # avoid a duplicate customer_id column after the join

# Join on the salted key: no single partition holds all rows for customer "AMAZON"
result = large_df.join(small_df, on="salted_key", how="inner") \
                 .drop("salted_key")
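To see why salting helps without running a cluster, the effect on partition sizes can be simulated in plain Python. This is a sketch under assumptions: zlib.crc32 stands in for Spark's hash partitioner (it is not the same function), and the row counts, NUM_PARTITIONS, and the "AMAZON" hot key are illustrative.

```python
import random
import zlib
from collections import Counter

NUM_PARTITIONS = 50
SALT_FACTOR = 20

def partition_of(key):
    # Stable stand-in for a hash partitioner (not Spark's actual hash).
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS

def max_partition_rows(keys):
    """Rows in the fullest partition after hash-partitioning by key."""
    return max(Counter(partition_of(k) for k in keys).values())

# Illustrative data: one hot customer with 8,000 rows, 200 others with 10 rows each.
rows = ["AMAZON"] * 8000 + [f"cust_{i}" for i in range(200) for _ in range(10)]

rng = random.Random(0)  # fixed seed so the run is repeatable
salted = [f"{k}_{rng.randrange(SALT_FACTOR)}" for k in rows]

unsalted_max = max_partition_rows(rows)
salted_max = max_partition_rows(salted)
print(f"largest partition without salt: {unsalted_max} rows")
print(f"largest partition with salt:    {salted_max} rows")
```

Without the salt, every "AMAZON" row hashes to the same partition; with it, the hot key becomes up to 20 distinct keys that land in different partitions.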

Join Strategies: When Spark Picks the Wrong One

Spark's query planner chooses a join strategy based on table size estimates. Understanding the strategies helps you recognize when the planner makes the wrong choice (which it does when statistics are stale or unavailable).

| Strategy | When used | Mechanism | Performance |
|---|---|---|---|
| Broadcast Hash Join | Small table ≤ autoBroadcastJoinThreshold (10 MB default) | Small table broadcast to every executor; large table scanned once, no shuffle | Fastest: no shuffle at all |
| Sort-Merge Join | Both tables large | Both sides shuffled by join key, sorted, then merged | Expensive (2 shuffles) but handles any size |
| Shuffle Hash Join | One table moderately small, build side fits in memory | Shuffle by key, build hash table from smaller side | Faster than SMJ if build side fits in executor memory |

from pyspark.sql import functions as F

# Force broadcast join when Spark underestimates table size
result = large_orders_df.join(
    F.broadcast(small_country_lookup),   # force broadcast
    on="country_code",
    how="left"
)

# Increase threshold to allow larger broadcasts (use with care)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50 MB

# Collect statistics so Spark can make better decisions
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR ALL COLUMNS")
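The size-based choice in the table can be written down as a small decision function. This is a simplified, size-only heuristic in plain Python, not Spark's planner logic: pick_join_strategy and hash_build_limit are hypothetical names, and the real planner also considers join type, hints, and other settings.

```python
AUTO_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # default of spark.sql.autoBroadcastJoinThreshold

def pick_join_strategy(left_bytes, right_bytes,
                       broadcast_threshold=AUTO_BROADCAST_THRESHOLD,
                       hash_build_limit=None):
    """Simplified, size-only heuristic mirroring the table above.

    hash_build_limit is a hypothetical parameter: the largest build side
    assumed to fit in executor memory as a hash table. None disables
    shuffle hash join, so the fallback is always sort-merge.
    """
    smaller = min(left_bytes, right_bytes)
    if smaller <= broadcast_threshold:
        return "BroadcastHashJoin"
    if hash_build_limit is not None and smaller <= hash_build_limit:
        return "ShuffledHashJoin"
    return "SortMergeJoin"

MB = 1024 * 1024
print(pick_join_strategy(500_000 * MB, 4 * MB))    # BroadcastHashJoin
print(pick_join_strategy(500_000 * MB, 40 * MB))   # SortMergeJoin
# Raising the threshold to 50 MB makes the 40 MB table broadcastable.
print(pick_join_strategy(500_000 * MB, 40 * MB, broadcast_threshold=50 * MB))  # BroadcastHashJoin
```

The function is only as good as its inputs, which is the point: if the size estimate for a 4 MB table is stale and reads 40 MB, the same logic returns SortMergeJoin.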

Adaptive Query Execution (AQE) — Spark 3.x

AQE is Spark 3.x's answer to the static query plan problem. Traditional Spark plans queries before execution using estimated statistics, which are often wrong. AQE re-plans the query at shuffle boundaries using actual statistics gathered from completed stages.

AQE provides three automatic optimizations:

  • Coalescing shuffle partitions: If spark.sql.shuffle.partitions=200 but the actual shuffle produces mostly empty partitions (common for filtered datasets), AQE coalesces them down automatically — reducing task overhead without manual tuning.
  • Converting sort-merge join to broadcast join: If after the first stage Spark sees the build side is actually small enough to broadcast, it switches strategies. This is hugely valuable when filters dramatically reduce table size mid-query.
  • Skew join optimization: AQE detects skewed partitions and splits them into sub-partitions, distributing the work across multiple tasks. This is the automatic version of the salting technique, and it works without any code changes.

# Enable AQE (default in Spark 3.2+, but verify)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# AQE skew thresholds (tune based on your data)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256mb")
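The two skew thresholds work together: as documented for Spark 3.x, a shuffle partition is treated as skewed only when it is larger than skewedPartitionFactor times the median partition size and also larger than skewedPartitionThresholdInBytes. The sketch below restates that rule in plain Python; it is not AQE's implementation, and the partition sizes are illustrative.

```python
from statistics import median

MB = 1024 * 1024

def skewed_partitions(sizes_bytes, factor=5.0, threshold_bytes=256 * MB):
    """Indexes of partitions that exceed both the relative and absolute cutoffs."""
    med = median(sizes_bytes)
    return [
        i for i, size in enumerate(sizes_bytes)
        if size > factor * med and size > threshold_bytes
    ]

# Nine 64 MB partitions and one 1 GB partition: the last one is flagged.
print(skewed_partitions([64 * MB] * 9 + [1024 * MB]))  # [9]
# Ten times the median, but under the 256 MB floor, so it is left alone.
print(skewed_partitions([10 * MB] * 9 + [100 * MB]))   # []
```

The second case is why lowering only the factor often changes nothing: on small shuffles the byte threshold is the binding condition.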

Memory Configuration

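Spark executor heap memory is divided into: execution memory (for shuffles, joins, sorts, aggregations), storage memory (for cached DataFrames and broadcast data), and user memory (for user-defined data structures and Spark's internal metadata on the JVM), plus a fixed 300 MB reserved for Spark itself. In PySpark, Python objects live in a separate Python worker process, outside the JVM heap. The unified memory manager (default since Spark 1.6) allows execution and storage to borrow from each other dynamically.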

# Typical executor memory configuration
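# Memory requested per executor =
#   executor.memory + executor.memoryOverhead
#   (+ executor.pyspark.memory and memory.offHeap.size, if set)
# executor.memoryOverhead covers JVM overheads, native allocations,
# and Python workers when executor.pyspark.memory is not set.

spark_config = {
    "spark.executor.memory":        "8g",    # JVM heap
    "spark.executor.memoryOverhead": "2g",   # non-heap overhead (default: max(10% of heap, 384 MB))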
    "spark.executor.cores":         "4",     # tasks per executor
    "spark.memory.fraction":        "0.6",   # fraction of heap for execution+storage
    "spark.memory.storageFraction": "0.5",   # fraction of above for storage cache
    # For Python (PySpark) workloads:
    "spark.executor.pyspark.memory": "2g",   # Python worker memory (separate from JVM)
}
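To see what these settings mean in megabytes, the split can be worked out by hand. The sketch below is approximate arithmetic in plain Python, assuming the documented rule that memory.fraction applies to the heap minus a fixed 300 MB reservation; heap_regions_mb and container_request_mb are hypothetical helper names, and the JVM's usable heap is in practice slightly below the configured value.

```python
RESERVED_MB = 300  # fixed reservation subtracted from the heap before memory.fraction applies

def heap_regions_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate on-heap regions under the unified memory manager, in MB."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction      # execution + storage, shared
    storage = unified * storage_fraction    # storage share protected from eviction
    return {
        "unified": unified,
        "storage_protected": storage,
        "execution_initial": unified - storage,
        "user": usable - unified,
        "reserved": RESERVED_MB,
    }

def container_request_mb(heap_mb, overhead_mb, pyspark_mb=0, offheap_mb=0):
    """Memory requested from the cluster manager for one executor."""
    return heap_mb + overhead_mb + pyspark_mb + offheap_mb

regions = heap_regions_mb(8 * 1024)
print({name: round(mb) for name, mb in regions.items()})
print(container_request_mb(8 * 1024, 2 * 1024, pyspark_mb=2 * 1024), "MB per executor")
```

With the configuration above, an 8g heap leaves roughly 4.6 GB shared between execution and storage, and each executor asks the cluster manager for 12 GB in total.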

GC pressure is a common cause of straggler tasks. Signs: executor logs showing long GC pauses, tasks completing slowly with no obvious data reason, executor OOM errors. Fix: increase executor memory, reduce partition size (less data per task = smaller objects), use Kryo serialization instead of Java serialization, or switch to Databricks with Photon (C++ runtime has no JVM GC).

Photon on Databricks: What It Actually Accelerates

Photon is Databricks's native vectorized execution engine written in C++, operating at the Spark physical plan layer. It vectorizes columnar operations — scanning Parquet, filtering, hashing for joins and aggregations, and sorting. The result: 2–8x faster performance on SQL and DataFrame operations compared to vanilla Spark JVM code, with no GC overhead.

What Photon does accelerate: SQL queries on Parquet/Delta, aggregations (groupBy, sum, count), sort-merge joins, window functions, string operations. What it does not accelerate: Python UDFs (these run in a Python worker outside Photon), RDD operations, and Pandas UDFs that return complex objects. The most common Photon anti-pattern: a DataFrame pipeline with three SQL operations (fast) followed by a Python UDF, which falls back to the JVM and a Python worker and loses the Photon benefit for that stage.

The single most impactful Spark tuning action: run df.explain(mode="cost") and look at where the plan chooses BroadcastHashJoin versus SortMergeJoin. Then check whether ANALYZE TABLE ... COMPUTE STATISTICS has been run on your Delta/Hive tables. Stale statistics that lead the planner to pick a sort-merge join where a broadcast join would work are among the most common sources of unnecessary shuffles, and refreshing them is usually a quick fix.